2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
22 import ncclient
.asyncio_manager
32 from collections
import defaultdict
33 from collections
import deque
35 from urllib
.parse
import urlparse
37 # disable unsigned certificate warning
38 from requests
.packages
.urllib3
.exceptions
import InsecureRequestWarning
39 requests
.packages
.urllib3
.disable_warnings(InsecureRequestWarning
)
41 gi
.require_version('RwYang', '1.0')
42 gi
.require_version('NsdBaseYang', '1.0')
43 gi
.require_version('ProjectNsdYang', '1.0')
44 gi
.require_version('RwDts', '1.0')
45 gi
.require_version('RwNsmYang', '1.0')
46 gi
.require_version('RwNsrYang', '1.0')
47 gi
.require_version('NsrYang', '1.0')
48 gi
.require_version('RwTypes', '1.0')
49 gi
.require_version('RwVlrYang', '1.0')
50 gi
.require_version('RwVnfrYang', '1.0')
51 gi
.require_version('VnfrYang', '1.0')
52 gi
.require_version('ProjectVnfdYang', '1.0')
53 from gi
.repository
import (
58 ProjectNsdYang
as NsdYang
,
69 gi
.require_version('RwKeyspec', '1.0')
70 from gi
.repository
.RwKeyspec
import quoted_key
72 from rift
.mano
.utils
.ssh_keys
import ManoSshKey
73 import rift
.mano
.ncclient
74 import rift
.mano
.config_data
.config
75 import rift
.mano
.dts
as mano_dts
77 from rift
.mano
.utils
.project
import (
80 get_add_delete_update_cfgs
,
84 from . import rwnsm_conman
as conman
86 from . import publisher
87 from . import subscriber
89 from . import config_value_pool
90 from . import rwvnffgmgr
91 from . import scale_group
92 from . import rwnsmplugin
93 from . import openmano_nsm
97 class NetworkServiceRecordState(Enum
):
98 """ Network Service Record State """
102 VNFFG_INIT_PHASE
= 104
108 VL_TERMINATE_PHASE
= 111
109 VNF_TERMINATE_PHASE
= 112
110 VNFFG_TERMINATE_PHASE
= 113
117 class NetworkServiceRecordError(Exception):
118 """ Network Service Record Error """
122 class NetworkServiceDescriptorError(Exception):
123 """ Network Service Descriptor Error """
127 class VirtualNetworkFunctionRecordError(Exception):
128 """ Virtual Network Function Record Error """
132 class NetworkServiceDescriptorNotFound(Exception):
133 """ Cannot find Network Service Descriptor"""
137 class NetworkServiceDescriptorNotFound(Exception):
138 """ Network Service Descriptor reference count exists """
141 class NsrInstantiationFailed(Exception):
142 """ Failed to instantiate network service """
146 class VnfInstantiationFailed(Exception):
147 """ Failed to instantiate virtual network function"""
151 class VnffgInstantiationFailed(Exception):
152 """ Failed to instantiate virtual network function"""
156 class VnfDescriptorError(Exception):
157 """Failed to instantiate virtual network function"""
161 class ScalingOperationError(Exception):
165 class ScaleGroupMissingError(Exception):
169 class PlacementGroupError(Exception):
173 class NsrNsdUpdateError(Exception):
177 class NsrVlUpdateError(NsrNsdUpdateError
):
180 class VirtualLinkRecordError(Exception):
181 """ Virtual Links Record Error """
185 class VlRecordState(Enum
):
186 """ VL Record State """
188 INSTANTIATION_PENDING
= 102
190 TERMINATE_PENDING
= 104
195 class VnffgRecordState(Enum
):
196 """ VNFFG Record State """
198 INSTANTIATION_PENDING
= 102
200 TERMINATE_PENDING
= 104
205 class VnffgRecord(object):
206 """ Vnffg Records class"""
209 def __init__(self
, dts
, log
, loop
, vnffgmgr
, nsr
, nsr_name
, vnffgd_msg
, sdn_account_name
,cloud_account_name
):
214 self
._vnffgmgr
= vnffgmgr
216 self
._nsr
_name
= nsr_name
217 self
._vnffgd
_msg
= vnffgd_msg
218 self
._cloud
_account
_name
= cloud_account_name
219 if sdn_account_name
is None:
220 self
._sdn
_account
_name
= ''
222 self
._sdn
_account
_name
= sdn_account_name
224 self
._vnffgr
_id
= str(uuid
.uuid4())
225 self
._vnffgr
_rsp
_id
= list()
226 self
._vnffgr
_state
= VnffgRecordState
.INIT
231 return self
._vnffgr
_id
235 """ state of this VNF """
236 return self
._vnffgr
_state
238 def fetch_vnffgr(self
):
240 Get VNFFGR message to be published
243 if self
._vnffgr
_state
== VnffgRecordState
.INIT
:
244 vnffgr_dict
= {"id": self
._vnffgr
_id
,
245 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
246 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
247 "sdn_account": self
._sdn
_account
_name
,
248 "operational_status": 'init',
250 vnffgr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
251 elif self
._vnffgr
_state
== VnffgRecordState
.TERMINATED
:
252 vnffgr_dict
= {"id": self
._vnffgr
_id
,
253 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
254 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
255 "sdn_account": self
._sdn
_account
_name
,
256 "operational_status": 'terminated',
258 vnffgr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
261 vnffgr
= self
._vnffgmgr
.fetch_vnffgr(self
._vnffgr
_id
)
263 self
._log
.exception("Fetching VNFFGR for VNFFG with id %s failed", self
._vnffgr
_id
)
264 self
._vnffgr
_state
= VnffgRecordState
.FAILED
265 vnffgr_dict
= {"id": self
._vnffgr
_id
,
266 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
267 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
268 "sdn_account": self
._sdn
_account
_name
,
269 "operational_status": 'failed',
271 vnffgr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
276 def vnffgr_create_msg(self
):
277 """ Virtual Link Record message for Creating VLR in VNS """
278 vnffgr_dict
= {"id": self
._vnffgr
_id
,
279 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
280 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
281 "sdn_account": self
._sdn
_account
_name
,
282 "cloud_account": self
._cloud
_account
_name
,
284 vnffgr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
285 for rsp
in self
._vnffgd
_msg
.rsp
:
286 vnffgr_rsp
= vnffgr
.rsp
.add()
287 vnffgr_rsp
.id = str(uuid
.uuid4())
288 vnffgr_rsp
.name
= self
._nsr
.name
+ '.' + rsp
.name
289 self
._vnffgr
_rsp
_id
.append(vnffgr_rsp
.id)
290 vnffgr_rsp
.vnffgd_rsp_id_ref
= rsp
.id
291 vnffgr_rsp
.vnffgd_rsp_name_ref
= rsp
.name
292 for rsp_cp_ref
in rsp
.vnfd_connection_point_ref
:
293 vnfd
= [vnfr
.vnfd
for vnfr
in self
._nsr
.vnfrs
.values() if vnfr
.vnfd
.id == rsp_cp_ref
.vnfd_id_ref
]
294 self
._log
.debug("VNFD message during VNFFG instantiation is %s",vnfd
)
295 if len(vnfd
) > 0 and vnfd
[0].has_field('service_function_type'):
296 self
._log
.debug("Service Function Type for VNFD ID %s is %s",
297 rsp_cp_ref
.vnfd_id_ref
, vnfd
[0].service_function_type
)
299 self
._log
.error("Service Function Type not available for VNFD ID %s; Skipping in chain",
300 rsp_cp_ref
.vnfd_id_ref
)
303 vnfr_cp_ref
= vnffgr_rsp
.vnfr_connection_point_ref
.add()
304 vnfr_cp_ref
.member_vnf_index_ref
= rsp_cp_ref
.member_vnf_index_ref
305 vnfr_cp_ref
.hop_number
= rsp_cp_ref
.order
306 vnfr_cp_ref
.vnfd_id_ref
=rsp_cp_ref
.vnfd_id_ref
307 vnfr_cp_ref
.service_function_type
= vnfd
[0].service_function_type
308 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
309 if (nsr_vnfr
.vnfd
.id == vnfr_cp_ref
.vnfd_id_ref
and
310 nsr_vnfr
.member_vnf_index
== vnfr_cp_ref
.member_vnf_index_ref
):
311 vnfr_cp_ref
.vnfr_id_ref
= nsr_vnfr
.id
312 vnfr_cp_ref
.vnfr_name_ref
= nsr_vnfr
.name
313 vnfr_cp_ref
.vnfr_connection_point_ref
= rsp_cp_ref
.vnfd_connection_point_ref
315 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
316 self
._log
.debug(" Received VNFR is %s", vnfr
)
317 while vnfr
.operational_status
!= 'running':
318 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
319 if vnfr
.operational_status
== 'failed':
320 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
321 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
323 yield from asyncio
.sleep(2, loop
=self
._loop
)
324 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
325 self
._log
.debug("Received VNFR is %s", vnfr
)
327 vnfr_cp_ref
.connection_point_params
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
328 for cp
in vnfr
.connection_point
:
329 if cp
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
330 vnfr_cp_ref
.connection_point_params
.port_id
= cp
.connection_point_id
331 vnfr_cp_ref
.connection_point_params
.name
= self
._nsr
.name
+ '.' + cp
.name
332 for vdu
in vnfr
.vdur
:
333 for intf
in vdu
.interface
:
334 if intf
.type_yang
== "EXTERNAL" and intf
.external_connection_point_ref
== vnfr_cp_ref
.vnfr_connection_point_ref
:
335 vnfr_cp_ref
.connection_point_params
.vm_id
= vdu
.vim_id
336 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
337 vnfr_cp_ref
.connection_point_params
.vm_id
)
340 vnfr_cp_ref
.connection_point_params
.address
= cp
.ip_address
341 vnfr_cp_ref
.connection_point_params
.port
= VnffgRecord
.SFF_DP_PORT
343 for vnffgd_classifier
in self
._vnffgd
_msg
.classifier
:
344 _rsp
= [rsp
for rsp
in vnffgr
.rsp
if rsp
.vnffgd_rsp_id_ref
== vnffgd_classifier
.rsp_id_ref
]
346 rsp_id_ref
= _rsp
[0].id
347 rsp_name
= _rsp
[0].name
349 self
._log
.error("RSP with ID %s not found during classifier creation for classifier id %s",
350 vnffgd_classifier
.rsp_id_ref
,vnffgd_classifier
.id)
352 vnffgr_classifier
= vnffgr
.classifier
.add()
353 vnffgr_classifier
.id = vnffgd_classifier
.id
354 vnffgr_classifier
.name
= self
._nsr
.name
+ '.' + vnffgd_classifier
.name
355 _rsp
[0].classifier_name
= vnffgr_classifier
.name
356 vnffgr_classifier
.rsp_id_ref
= rsp_id_ref
357 vnffgr_classifier
.rsp_name
= rsp_name
358 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
359 if (nsr_vnfr
.vnfd
.id == vnffgd_classifier
.vnfd_id_ref
and
360 nsr_vnfr
.member_vnf_index
== vnffgd_classifier
.member_vnf_index_ref
):
361 vnffgr_classifier
.vnfr_id_ref
= nsr_vnfr
.id
362 vnffgr_classifier
.vnfr_name_ref
= nsr_vnfr
.name
363 vnffgr_classifier
.vnfr_connection_point_ref
= vnffgd_classifier
.vnfd_connection_point_ref
365 if nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER':
366 vnffgr_classifier
.sff_name
= nsr_vnfr
.name
368 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
369 self
._log
.debug(" Received VNFR is %s", vnfr
)
370 while vnfr
.operational_status
!= 'running':
371 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
372 if vnfr
.operational_status
== 'failed':
373 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
374 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
376 yield from asyncio
.sleep(2, loop
=self
._loop
)
377 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
378 self
._log
.debug("Received VNFR is %s", vnfr
)
380 for cp
in vnfr
.connection_point
:
381 if cp
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
382 vnffgr_classifier
.port_id
= cp
.connection_point_id
383 vnffgr_classifier
.ip_address
= cp
.ip_address
384 for vdu
in vnfr
.vdur
:
385 for intf
in vdu
.interface
:
386 if intf
.type_yang
== "EXTERNAL" and intf
.external_connection_point_ref
== vnffgr_classifier
.vnfr_connection_point_ref
:
387 vnffgr_classifier
.vm_id
= vdu
.vim_id
388 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",
390 vnfr_cp_ref
.connection_point_params
.vm_id
)
393 self
._log
.info("VNFFGR msg to be sent is %s", vnffgr
)
397 def vnffgr_nsr_sff_list(self
):
398 """ SFF List for VNFR """
400 sf_list
= [nsr_vnfr
.name
for nsr_vnfr
in self
._nsr
.vnfrs
.values() if nsr_vnfr
.vnfd
.service_function_chain
== 'SF']
402 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
403 if (nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER' or nsr_vnfr
.vnfd
.service_function_chain
== 'SFF'):
404 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
405 self
._log
.debug(" Received VNFR is %s", vnfr
)
406 while vnfr
.operational_status
!= 'running':
407 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
408 if vnfr
.operational_status
== 'failed':
409 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
410 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
411 yield from asyncio
.sleep(2, loop
=self
._loop
)
412 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
413 self
._log
.debug("Received VNFR is %s", vnfr
)
415 sff
= RwsdnalYang
.YangData_RwProject_Project_Vnffgs_VnffgChain_Sff()
416 sff_list
[nsr_vnfr
.vnfd
.id] = sff
417 sff
.name
= nsr_vnfr
.name
418 sff
.function_type
= nsr_vnfr
.vnfd
.service_function_chain
420 sff
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
421 sff
.mgmt_port
= VnffgRecord
.SFF_MGMT_PORT
422 for cp
in vnfr
.connection_point
:
423 sff_dp
= sff
.dp_endpoints
.add()
424 sff_dp
.name
= self
._nsr
.name
+ '.' + cp
.name
425 sff_dp
.address
= cp
.ip_address
426 sff_dp
.port
= VnffgRecord
.SFF_DP_PORT
427 if nsr_vnfr
.vnfd
.service_function_chain
== 'SFF':
428 for sf_name
in sf_list
:
429 _sf
= sff
.vnfr_list
.add()
430 _sf
.vnfr_name
= sf_name
435 def instantiate(self
):
436 """ Instantiate this VNFFG """
438 self
._log
.info("Instaniating VNFFGR with vnffgd %s",
442 vnffgr_request
= yield from self
.vnffgr_create_msg()
443 vnffg_sff_list
= yield from self
.vnffgr_nsr_sff_list()
446 vnffgr
= self
._vnffgmgr
.create_vnffgr(vnffgr_request
,self
._vnffgd
_msg
.classifier
,vnffg_sff_list
)
447 except Exception as e
:
448 self
._log
.exception("VNFFG instantiation failed: %s", str(e
))
449 self
._vnffgr
_state
= VnffgRecordState
.FAILED
450 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self
.id, vnffgr_request
.id))
452 self
._vnffgr
_state
= VnffgRecordState
.INSTANTIATION_PENDING
454 self
._log
.info("Instantiated VNFFGR :%s", vnffgr
)
455 self
._vnffgr
_state
= VnffgRecordState
.ACTIVE
457 self
._log
.info("Invoking update_state to update NSR state for NSR ID: %s", self
._nsr
.id)
458 yield from self
._nsr
.update_state()
460 def vnffgr_in_vnffgrm(self
):
461 """ Is there a VNFR record in VNFM """
462 if (self
._vnffgr
_state
== VnffgRecordState
.ACTIVE
or
463 self
._vnffgr
_state
== VnffgRecordState
.INSTANTIATION_PENDING
or
464 self
._vnffgr
_state
== VnffgRecordState
.FAILED
):
471 """ Terminate this VNFFGR """
472 if not self
.vnffgr_in_vnffgrm():
473 self
._log
.error("Ignoring terminate request for id %s in state %s",
474 self
.id, self
._vnffgr
_state
)
477 self
._log
.info("Terminating VNFFGR id:%s", self
.id)
478 self
._vnffgr
_state
= VnffgRecordState
.TERMINATE_PENDING
480 self
._vnffgmgr
.terminate_vnffgr(self
._vnffgr
_id
)
482 self
._vnffgr
_state
= VnffgRecordState
.TERMINATED
483 self
._log
.debug("Terminated VNFFGR id:%s", self
.id)
486 class VirtualLinkRecord(object):
487 """ Virtual Link Records class"""
488 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
491 def create_record(dts
, log
, loop
, project
, nsr_name
, vld_msg
,
492 datacenter
, ip_profile
, nsr_id
, restart_mode
=False):
493 """Creates a new VLR object based on the given data.
495 If restart mode is enabled, then we look for existing records in the
496 DTS and create a VLR records using the exiting data(ID)
501 vlr_obj
= VirtualLinkRecord(
514 res_iter
= yield from dts
.query_read(
515 project
.add_project("D,/vlr:vlr-catalog/vlr:vlr"),
516 rwdts
.XactFlag
.MERGE
)
519 response
= yield from fut
520 vlr
= response
.result
522 # Check if the record is already present, if so use the ID of
523 # the existing record. Since the name of the record is uniquely
524 # formed we can use it as a search key!
525 if vlr
.name
== vlr_obj
.name
:
526 vlr_obj
.reset_id(vlr
.id)
531 def __init__(self
, dts
, log
, loop
, project
, nsr_name
, vld_msg
,
532 datacenter
, ip_profile
, nsr_id
):
536 self
._project
= project
537 self
._nsr
_name
= nsr_name
538 self
._vld
_msg
= vld_msg
539 self
._datacenter
_name
= datacenter
540 self
._assigned
_subnet
= None
541 self
._nsr
_id
= nsr_id
542 self
._ip
_profile
= ip_profile
543 self
._vlr
_id
= str(uuid
.uuid4())
544 self
._state
= VlRecordState
.INIT
545 self
._prev
_state
= None
546 self
._create
_time
= int(time
.time())
547 self
.state_failed_reason
= None
551 """ path for this object """
552 return self
._project
.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]".
553 format(quoted_key(self
._vlr
_id
)))
562 """ Get NSR name for this VL """
567 """ Virtual Link Desciptor """
571 def assigned_subnet(self
):
572 """ Subnet assigned to this VL"""
573 return self
._assigned
_subnet
578 Get the name for this VLR.
579 VLR name is "nsr name:VLD name"
581 if self
.vld_msg
.vim_network_name
:
582 return self
.vld_msg
.vim_network_name
583 elif self
.vld_msg
.name
== "multisite":
584 # This is a temporary hack to identify manually provisioned inter-site network
585 return self
.vld_msg
.name
587 return self
._project
.name
+ "." +self
._nsr
_name
+ "." + self
.vld_msg
.name
590 def datacenter_name(self
):
591 """ Datacenter that this VLR should be created in """
592 return self
._datacenter
_name
596 """ Get the VLR path from VLR """
597 return (VirtualLinkRecord
.XPATH
+ "[vlr:id={}]").format(quoted_key(vlr
.id))
605 def state(self
, value
):
606 """ VLR set state """
610 def prev_state(self
):
611 """ VLR previous state """
612 return self
._prev
_state
615 def prev_state(self
, value
):
616 """ VLR set previous state """
617 self
._prev
_state
= value
621 """ Virtual Link Record message for Creating VLR in VNS """
622 vld_fields
= ["short_name",
630 vld_copy_dict
= {k
: v
for k
, v
in self
.vld_msg
.as_dict().items()
633 vlr_dict
= {"id": self
._vlr
_id
,
634 "nsr_id_ref": self
._nsr
_id
,
635 "vld_ref": self
.vld_msg
.id,
637 "create_time": self
._create
_time
,
638 "datacenter": self
._datacenter
_name
,
641 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
642 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
645 vlr_dict
.update(vld_copy_dict
)
646 vlr
= RwVlrYang
.YangData_RwProject_Project_VlrCatalog_Vlr
.from_dict(vlr_dict
)
648 if self
.vld_msg
.has_field('virtual_connection_points'):
649 for cp
in self
.vld_msg
.virtual_connection_points
:
650 vcp
= vlr
.virtual_connection_points
.add()
651 vcp
.from_dict(cp
.as_dict())
654 def reset_id(self
, vlr_id
):
655 self
._vlr
_id
= vlr_id
657 def create_nsr_vlr_msg(self
, vnfrs
):
658 """ The VLR message"""
659 nsr_vlr
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vlr()
660 nsr_vlr
.vlr_ref
= self
._vlr
_id
661 nsr_vlr
.assigned_subnet
= self
.assigned_subnet
662 nsr_vlr
.datacenter
= self
._datacenter
_name
664 for conn
in self
.vld_msg
.vnfd_connection_point_ref
:
666 if (vnfr
.vnfd
.id == conn
.vnfd_id_ref
and
667 vnfr
.member_vnf_index
== conn
.member_vnf_index_ref
and
668 self
._datacenter
_name
== vnfr
._datacenter
_name
):
669 cp_entry
= nsr_vlr
.vnfr_connection_point_ref
.add()
670 cp_entry
.vnfr_id
= vnfr
.id
671 cp_entry
.connection_point
= conn
.vnfd_connection_point_ref
676 def instantiate(self
):
677 """ Instantiate this VL """
678 self
._log
.debug("Instaniating VLR key %s, vld %s",
679 self
.xpath
, self
._vld
_msg
)
681 self
._state
= VlRecordState
.INSTANTIATION_PENDING
682 self
._log
.debug("Executing VL create path:%s msg:%s",
683 self
.xpath
, self
.vlr_msg
)
685 with self
._dts
.transaction(flags
=0) as xact
:
686 block
= xact
.block_create()
687 block
.add_query_create(self
.xpath
, self
.vlr_msg
)
688 self
._log
.debug("Executing VL create path:%s msg:%s",
689 self
.xpath
, self
.vlr_msg
)
690 res_iter
= yield from block
.execute(now
=True)
696 self
._state
= VlRecordState
.FAILED
697 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self
.id)
699 if vlr
.operational_status
== 'failed':
700 self
._log
.debug("NS Id:%s VL creation failed for vlr id %s", self
.id, vlr
.id)
701 self
._state
= VlRecordState
.FAILED
702 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr
.id, vlr
.operational_status_details
))
704 self
._log
.info("Instantiated VL with xpath %s and vlr:%s",
706 self
._assigned
_subnet
= vlr
.assigned_subnet
708 def vlr_in_vns(self
):
709 """ Is there a VLR record in VNS """
710 if (self
._state
== VlRecordState
.ACTIVE
or
711 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
712 self
._state
== VlRecordState
.TERMINATE_PENDING
or
713 self
._state
== VlRecordState
.FAILED
):
720 """ Terminate this VL """
721 if not self
.vlr_in_vns():
722 self
._log
.debug("Ignoring terminate request for id %s in state %s",
723 self
.id, self
._state
)
726 self
._log
.debug("Terminating VL id:%s", self
.id)
727 self
._state
= VlRecordState
.TERMINATE_PENDING
729 with self
._dts
.transaction(flags
=0) as xact
:
730 block
= xact
.block_create()
731 block
.add_query_delete(self
.xpath
)
732 yield from block
.execute(flags
=0, now
=True)
734 self
._state
= VlRecordState
.TERMINATED
735 self
._log
.debug("Terminated VL id:%s", self
.id)
737 def set_state_from_op_status(self
, operational_status
):
738 """ Set the state of this VL based on operational_status"""
740 self
._log
.debug("set_state_from_op_status called for vlr id %s with value %s", self
.id, operational_status
)
741 if operational_status
== 'running':
742 self
._state
= VlRecordState
.ACTIVE
743 elif operational_status
== 'failed':
744 self
._state
= VlRecordState
.FAILED
745 elif operational_status
== 'vl_alloc_pending':
746 self
._state
= VlRecordState
.INSTANTIATION_PENDING
748 raise VirtualLinkRecordError("Unknown operational_status %s" % (operational_status
))
750 class VnfRecordState(Enum
):
751 """ Vnf Record State """
753 INSTANTIATION_PENDING
= 102
755 TERMINATE_PENDING
= 104
760 class VirtualNetworkFunctionRecord(object):
761 """ Virtual Network Function Record class"""
762 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
766 def create_record(dts
, log
, loop
, project
, vnfd
, nsr_config
, const_vnfd_msg
, nsd_id
, nsr_name
,
767 datacenter_name
, nsr_id
, group_name
, group_instance_id
,
768 placement_groups
, cloud_config
, restart_mode
=False):
769 """Creates a new VNFR object based on the given data.
771 If restart mode is enabled, then we look for existing records in the
772 DTS and create a VNFR records using the exiting data(ID)
775 VirtualNetworkFunctionRecord
778 vnfr_obj
= VirtualNetworkFunctionRecord(
794 restart_mode
=restart_mode
)
797 res_iter
= yield from dts
.query_read(
798 project
.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr"),
799 rwdts
.XactFlag
.MERGE
)
802 response
= yield from fut
803 vnfr
= response
.result
805 if vnfr
.name
== vnfr_obj
.name
:
806 vnfr_obj
.reset_id(vnfr
.id)
824 group_instance_id
=None,
825 placement_groups
= [],
827 restart_mode
= False):
831 self
._project
= project
833 self
._nsr
_config
= nsr_config
834 self
._const
_vnfd
_msg
= const_vnfd_msg
835 self
._nsd
_id
= nsd_id
836 self
._nsr
_name
= nsr_name
837 self
._nsr
_id
= nsr_id
838 self
._datacenter
_name
= datacenter_name
839 self
._group
_name
= group_name
840 self
._group
_instance
_id
= group_instance_id
841 self
._placement
_groups
= placement_groups
842 self
._cloud
_config
= cloud_config
843 self
.restart_mode
= restart_mode
845 self
._config
_status
= NsrYang
.ConfigStates
.INIT
846 self
._create
_time
= int(time
.time())
848 self
._prev
_state
= VnfRecordState
.INIT
849 self
._state
= VnfRecordState
.INIT
850 self
._state
_failed
_reason
= None
852 self
._active
_vdus
= 0
854 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
857 self
._vnfr
_id
= str(uuid
.uuid4())
860 self
.substitute_vnf_input_parameters
= VnfInputParameterSubstitution(self
._log
,
861 self
._const
_vnfd
_msg
,
863 self
._vnfr
_msg
= self
.create_vnfr_msg()
864 self
._log
.debug("Set VNFR {} config type to {}".
865 format(self
.name
, self
.config_type
))
868 if group_name
is None and group_instance_id
is not None:
869 raise ValueError("Group instance id must not be provided with an empty group name")
879 return self
._project
.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]"
880 .format(quoted_key(self
.id)))
885 return self
._vnfr
_msg
888 def const_vnfr_msg(self
):
890 return RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConstituentVnfrRef(
891 vnfr_id
=self
.id, datacenter
=self
._datacenter
_name
)
899 def datacenter_name(self
):
900 """ Datacenter that this VNF should be created in """
901 return self
._datacenter
_name
906 """ Is this VNF actve """
907 return True if self
._state
== VnfRecordState
.ACTIVE
else False
911 """ state of this VNF """
915 def state_failed_reason(self
):
916 """ Error message in case this VNF is in failed state """
917 return self
._state
_failed
_reason
920 def member_vnf_index(self
):
921 """ Member VNF index """
922 return self
._const
_vnfd
_msg
.member_vnf_index
927 return self
._nsr
_name
931 """ Name of this VNFR """
932 if self
._name
is not None:
935 name_tags
= [self
._project
.name
, self
._nsr
_name
]
937 if self
._group
_name
is not None:
938 name_tags
.append(self
._group
_name
)
940 if self
._group
_instance
_id
is not None:
941 name_tags
.append(str(self
._group
_instance
_id
))
943 name_tags
.extend([self
.vnfd
.name
, str(self
.member_vnf_index
)])
945 self
._name
= "__".join(name_tags
)
950 def vnfr_xpath(vnfr
):
951 """ Get the VNFR path from VNFR """
952 return (VirtualNetworkFunctionRecord
.XPATH
+
953 "[vnfr:id={}]").format(quoted_key(vnfr
.id))
956 def config_type(self
):
957 cfg_types
= ['netconf', 'juju', 'script']
958 for method
in cfg_types
:
959 if self
._vnfd
.vnf_configuration
.has_field(method
):
964 def config_status(self
):
965 """Return the config status as YANG ENUM string"""
966 self
._log
.debug("Map VNFR {} config status {} ({})".
967 format(self
.name
, self
._config
_status
, self
.config_type
))
968 if self
.config_type
== 'none':
969 return 'config_not_needed'
970 elif self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
972 elif self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
977 def set_state(self
, state
):
978 """ set the state of this object """
979 self
._prev
_state
= self
._state
982 def reset_id(self
, vnfr_id
):
983 self
._vnfr
_id
= vnfr_id
984 self
._vnfr
_msg
= self
.create_vnfr_msg()
987 self
.config_store
.merge_vnfd_config(
991 self
.member_vnf_index
,
994 def create_vnfr_msg(self
):
995 """ VNFR message for this VNFR """
1003 vnfd_copy_dict
= {k
: v
for k
, v
in self
._vnfd
.as_dict().items() if k
in vnfd_fields
}
1006 "nsr_id_ref": self
._nsr
_id
,
1008 "datacenter": self
._datacenter
_name
,
1009 "config_status": self
.config_status
1011 vnfr_dict
.update(vnfd_copy_dict
)
1013 vnfr
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1014 vnfr
.vnfd
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd
. \
1015 from_dict(self
.vnfd
.as_dict())
1016 vnfr
.member_vnf_index_ref
= self
.member_vnf_index
1017 vnfr
.vnf_configuration
.from_dict(self
._vnfd
.vnf_configuration
.as_dict())
1019 if self
._vnfd
.mgmt_interface
.has_field("port"):
1020 vnfr
.mgmt_interface
.port
= self
._vnfd
.mgmt_interface
.port
1022 for group_info
in self
._placement
_groups
:
1023 group
= vnfr
.placement_groups_info
.add()
1024 group
.from_dict(group_info
.as_dict())
1026 if self
._cloud
_config
and len(self
._cloud
_config
.as_dict()):
1027 self
._log
.debug("Cloud config during vnfr create is {}".format(self
._cloud
_config
))
1028 vnfr
.cloud_config
= self
._cloud
_config
1030 # UI expects the monitoring param field to exist
1031 vnfr
.monitoring_param
= []
1033 self
._log
.debug("Get vnfr_msg for VNFR {} : {}".format(self
.name
, vnfr
))
1035 if self
.restart_mode
:
1036 vnfr
.operational_status
= 'init'
1038 # Set Operational Status as pre-init for Input Param Substitution
1039 vnfr
.operational_status
= 'pre_init'
1044 def update_vnfm(self
):
1045 self
._log
.debug("Send an update to VNFM for VNFR {} with {}".
1046 format(self
.name
, self
.vnfr_msg
))
1047 yield from self
._dts
.query_update(
1049 rwdts
.XactFlag
.REPLACE
,
1053 def get_config_status(self
):
1054 """Return the config status as YANG ENUM"""
1055 return self
._config
_status
1058 def set_config_status(self
, status
):
1060 def status_to_string(status
):
1062 NsrYang
.ConfigStates
.INIT
: 'init',
1063 NsrYang
.ConfigStates
.CONFIGURING
: 'configuring',
1064 NsrYang
.ConfigStates
.CONFIG_NOT_NEEDED
: 'config_not_needed',
1065 NsrYang
.ConfigStates
.CONFIGURED
: 'configured',
1066 NsrYang
.ConfigStates
.FAILED
: 'failed',
1069 return status_dc
[status
]
1071 self
._log
.debug("Update VNFR {} from {} ({}) to {}".
1072 format(self
.name
, self
._config
_status
,
1073 self
.config_type
, status
))
1074 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1075 self
._log
.warning("Updating already configured VNFR {}".
1079 if self
._config
_status
!= status
:
1081 self
._config
_status
= status
1082 # I don't think this is used. Original implementor can check.
1083 # Caused Exception, so corrected it by status_to_string
1084 # But not sure whats the use of this variable?
1085 self
.vnfr_msg
.config_status
= status_to_string(status
)
1086 except Exception as e
:
1087 self
._log
.exception("Exception=%s", str(e
))
1089 self
._log
.debug("Updated VNFR {} status to {}".format(self
.name
, status
))
1091 if self
._config
_status
!= NsrYang
.ConfigStates
.INIT
:
1093 # Publish only after VNFM has the VNFR created
1094 yield from self
.update_vnfm()
1095 except Exception as e
:
1096 self
._log
.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1097 format(status
, self
.name
, e
))
1098 self
._log
.exception(e
)
1100 def is_configured(self
):
1101 if self
.config_type
== 'none':
1104 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1110 def update_config_primitives(self
, vnf_config
, nsr
):
1111 # Update only after we are configured
1112 if self
._config
_status
== NsrYang
.ConfigStates
.INIT
:
1115 if not vnf_config
.as_dict():
1118 self
._log
.debug("Update VNFR {} config: {}".
1119 format(self
.name
, vnf_config
.as_dict()))
1121 # Update config primitive
1123 for prim
in self
._vnfd
.vnf_configuration
.config_primitive
:
1124 for p
in vnf_config
.config_primitive
:
1125 if prim
.name
== p
.name
:
1126 for param
in prim
.parameter
:
1127 for pa
in p
.parameter
:
1128 if pa
.name
== param
.name
:
1129 if pa
.default_value
and \
1130 (pa
.default_value
!= param
.default_value
):
1131 param
.default_value
= pa
.default_value
1132 param
.read_only
= pa
.read_only
1135 self
._log
.debug("Prim: {}".format(prim
.as_dict()))
1139 self
._log
.debug("Updated VNFD {} config: {}".
1140 format(self
._vnfd
.name
,
1141 self
._vnfd
.vnf_configuration
))
1142 self
._vnfr
_msg
= self
.create_vnfr_msg()
1145 yield from nsr
.nsm_plugin
.update_vnfr(self
)
1146 except Exception as e
:
1147 self
._log
.error("Exception updating VNFM with new config "
1148 "primitive for VNFR {}: {}".
1149 format(self
.name
, e
))
1150 self
._log
.exception(e
)
1153 def instantiate(self
, nsr
):
1154 """ Instantiate this VNFR"""
1156 self
._log
.debug("Instaniating VNFR key %s, vnfd %s",
1157 self
.xpath
, self
._vnfd
)
1159 self
._log
.debug("Create VNF with xpath %s and vnfr %s",
1160 self
.xpath
, self
.vnfr_msg
)
1162 self
.set_state(VnfRecordState
.INSTANTIATION_PENDING
)
1164 def find_vlr_for_cp(conn
):
1165 """ Find VLR for the given connection point """
1166 for vlr_id
, vlr
in nsr
.vlrs
.items():
1167 for vnfd_cp
in vlr
.vld_msg
.vnfd_connection_point_ref
:
1168 if (vnfd_cp
.vnfd_id_ref
== self
._vnfd
.id and
1169 vnfd_cp
.vnfd_connection_point_ref
== conn
.name
and
1170 vnfd_cp
.member_vnf_index_ref
== self
.member_vnf_index
and
1171 vlr
._datacenter
_name
== self
._datacenter
_name
):
1172 self
._log
.debug("Found VLR for cp_name:%s and vnf-index:%d",
1173 conn
.name
, self
.member_vnf_index
)
1177 # For every connection point in the VNFD fill in the identifier
1178 self
._log
.debug("Add connection point for VNF %s: %s",
1179 self
.vnfr_msg
.name
, self
._vnfd
.connection_point
)
1180 for conn_p
in self
._vnfd
.connection_point
:
1181 cpr
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint()
1182 cpr
.name
= conn_p
.name
1183 cpr
.type_yang
= conn_p
.type_yang
1184 if conn_p
.has_field('port_security_enabled'):
1185 cpr
.port_security_enabled
= conn_p
.port_security_enabled
1187 vlr_ref
= find_vlr_for_cp(conn_p
)
1189 msg
= "Failed to find VLR for cp = %s" % conn_p
.name
1190 self
._log
.debug("%s", msg
)
1191 # raise VirtualNetworkFunctionRecordError(msg)
1194 cpr
.vlr_ref
= vlr_ref
.id
1196 self
.vnfr_msg
.connection_point
.append(cpr
)
1197 self
._log
.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1198 cpr
, self
.vnfr_msg
.id, self
.vnfr_msg
.vnfd
.id)
1200 self
._log
.debug("VNFR {} restart mode {}".
1201 format(self
.vnfr_msg
.id, self
.restart_mode
))
1202 if not self
.restart_mode
:
1203 # Checking for NS Terminate.
1204 if nsr
._ns
_terminate
_received
== False:
1205 # Create with pre-init operational state publishes the vnfr for substitution.
1206 yield from self
._dts
.query_create(self
.xpath
, 0, self
.vnfr_msg
)
1207 # Call to substitute VNF Input Parameter
1208 self
.substitute_vnf_input_parameters(self
.vnfr_msg
, self
._nsr
_config
)
1209 # Calling Update with pre-init operational data after Param substitution to instatntiate vnfr
1210 yield from self
._dts
.query_update(self
.xpath
, 0, self
.vnfr_msg
)
1213 yield from self
._dts
.query_update(self
.xpath
,
1217 self
._log
.info("Created VNF with xpath %s and vnfr %s",
1218 self
.xpath
, self
.vnfr_msg
)
1221 def update_state(self
, vnfr_msg
):
1222 """ Update this VNFR"""
1223 if vnfr_msg
.operational_status
== "running":
1224 if self
.vnfr_msg
.operational_status
!= "running":
1225 yield from self
.is_active()
1226 elif vnfr_msg
.operational_status
== "failed":
1227 yield from self
.instantiation_failed(failed_reason
=vnfr_msg
.operational_status_details
)
1230 def is_active(self
):
1231 """ This VNFR is active """
1232 self
._log
.debug("VNFR %s is active", self
._vnfr
_id
)
1233 self
.set_state(VnfRecordState
.ACTIVE
)
1236 def instantiation_failed(self
, failed_reason
=None):
1237 """ This VNFR instantiation failed"""
1238 self
._log
.debug("VNFR %s instantiation failed", self
._vnfr
_id
)
1239 self
.set_state(VnfRecordState
.FAILED
)
1240 self
._state
_failed
_reason
= failed_reason
1242 def vnfr_in_vnfm(self
):
1243 """ Is there a VNFR record in VNFM """
1244 if (self
._state
== VnfRecordState
.ACTIVE
or
1245 self
._state
== VnfRecordState
.INSTANTIATION_PENDING
or
1246 self
._state
== VnfRecordState
.FAILED
):
1252 def terminate(self
):
1253 """ Terminate this VNF """
1254 if not self
.vnfr_in_vnfm():
1255 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1256 self
.id, self
._state
)
1259 self
._log
.debug("Terminating VNF id:%s", self
.id)
1260 self
.set_state(VnfRecordState
.TERMINATE_PENDING
)
1261 with self
._dts
.transaction(flags
=0) as xact
:
1262 block
= xact
.block_create()
1263 block
.add_query_delete(self
.xpath
)
1264 yield from block
.execute(flags
=0)
1265 self
.set_state(VnfRecordState
.TERMINATED
)
1266 self
._log
.debug("Terminated VNF id:%s", self
.id)
1269 class NetworkServiceStatus(object):
1270 """ A class representing the Network service's status """
1271 MAX_EVENTS_RECORDED
= 10
1272 """ Network service Status class"""
1273 def __init__(self
, dts
, log
, loop
):
1278 self
._state
= NetworkServiceRecordState
.INIT
1279 self
._events
= deque([])
1282 def create_notification(self
, evt
, evt_desc
, evt_details
):
1283 xp
= "N,/rw-nsr:nsm-notification"
1284 notif
= RwNsrYang
.YangNotif_RwNsr_NsmNotification()
1286 notif
.description
= evt_desc
1287 notif
.details
= evt_details
if evt_details
is not None else None
1289 yield from self
._dts
.query_create(xp
, rwdts
.XactFlag
.ADVISE
, notif
)
1290 self
._log
.info("Notification called by creating dts query: %s", notif
)
1292 def record_event(self
, evt
, evt_desc
, evt_details
):
1293 """ Record an event """
1294 self
._log
.debug("Recording event - evt %s, evt_descr %s len = %s",
1295 evt
, evt_desc
, len(self
._events
))
1296 if len(self
._events
) >= NetworkServiceStatus
.MAX_EVENTS_RECORDED
:
1297 self
._events
.popleft()
1298 self
._events
.append((int(time
.time()), evt
, evt_desc
,
1299 evt_details
if evt_details
is not None else None))
1301 self
._loop
.create_task(self
.create_notification(evt
,evt_desc
,evt_details
))
1303 def set_state(self
, state
):
1304 """ set the state of this status object """
1308 """ Return the state as a yang enum string """
1309 state_to_str_map
= {"INIT": "init",
1310 "VL_INIT_PHASE": "vl_init_phase",
1311 "VNF_INIT_PHASE": "vnf_init_phase",
1312 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1313 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1314 "RUNNING": "running",
1315 "SCALING_OUT": "scaling_out",
1316 "SCALING_IN": "scaling_in",
1317 "TERMINATE_RCVD": "terminate_rcvd",
1318 "TERMINATE": "terminate",
1319 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1320 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1321 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1322 "TERMINATED": "terminated",
1324 "VL_INSTANTIATE": "vl_instantiate",
1325 "VL_TERMINATE": "vl_terminate",
1327 return state_to_str_map
[self
._state
.name
]
1331 """ State of this status object """
1336 """ Network Service Record as a message"""
1339 for entry
in self
._events
:
1340 event
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_OperationalEvents()
1343 event
.timestamp
, event
.event
, event
.description
, event
.details
= entry
1344 event_list
.append(event
)
1348 class NetworkServiceRecord(object):
1349 """ Network service record """
1350 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
1352 def __init__(self
, dts
, log
, loop
, nsm
, nsm_plugin
, nsr_cfg_msg
,
1353 sdn_account_name
, key_pairs
, project
, restart_mode
=False,
1359 self
._nsr
_cfg
_msg
= nsr_cfg_msg
1360 self
._nsm
_plugin
= nsm_plugin
1361 self
._sdn
_account
_name
= sdn_account_name
1362 self
._vlr
_handler
= vlr_handler
1363 self
._project
= project
1366 self
._nsr
_msg
= None
1367 self
._nsr
_regh
= None
1368 self
._key
_pairs
= key_pairs
1369 self
._ssh
_key
_file
= None
1370 self
._ssh
_pub
_key
= None
1375 self
._param
_pools
= {}
1376 self
._scaling
_groups
= {}
1377 self
._create
_time
= int(time
.time())
1378 self
._op
_status
= NetworkServiceStatus(dts
, log
, loop
)
1379 self
._config
_status
= NsrYang
.ConfigStates
.CONFIGURING
1380 self
._config
_status
_details
= None
1382 self
.restart_mode
= restart_mode
1383 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
1384 self
._debug
_running
= False
1385 self
._is
_active
= False
1386 self
._vl
_phase
_completed
= False
1387 self
._vnf
_phase
_completed
= False
1388 self
.instantiated
= set()
1390 # Used for orchestration_progress
1391 self
._active
_vms
= 0
1392 self
._active
_networks
= 0
1394 # A flag to indicate if the NS has failed, currently it is recorded in
1395 # operational status, but at the time of termination this field is
1396 # over-written making it difficult to identify the failure.
1397 self
._is
_failed
= False
1399 # Initalise the state to init
1400 # The NSR moves through the following transitions
1401 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1402 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1403 # 3. VNFS_READY - READY when the NSR is published
1405 self
.set_state(NetworkServiceRecordState
.INIT
)
1407 self
.substitute_input_parameters
= InputParameterSubstitution(self
._log
, self
._project
)
1409 # Create an asyncio loop to know when the virtual links are ready
1410 self
._vls
_ready
= asyncio
.Event(loop
=self
._loop
)
1412 # This variable stores all the terminate events received per NS. This is then used to prevent any
1413 # further nsr non-terminate updates received in case of terminate being called bedore ns in in running state.
1414 self
._ns
_terminate
_received
= False
1417 def nsm_plugin(self
):
1419 return self
._nsm
_plugin
1421 def set_state(self
, state
):
1422 """ Set state for this NSR"""
1423 # We are in init phase and is moving to the next state
1424 # The new state could be a FAILED state or VNF_INIIT_PHASE
1425 if self
.state
== NetworkServiceRecordState
.VL_INIT_PHASE
:
1426 self
._vl
_phase
_completed
= True
1428 if self
.state
== NetworkServiceRecordState
.VNF_INIT_PHASE
:
1429 self
._vnf
_phase
_completed
= True
1431 self
._op
_status
.set_state(state
)
1433 self
._nsm
_plugin
.set_state(self
.id, state
)
1437 """ Get id for this NSR"""
1438 return self
._nsr
_cfg
_msg
.id
1442 """ Name of this network service record """
1443 return self
._nsr
_cfg
_msg
.name
1446 def _datacenter_name(self
):
1447 if self
._nsr
_cfg
_msg
.has_field('datacenter'):
1448 return self
._nsr
_cfg
_msg
.datacenter
1453 """State of this NetworkServiceRecord"""
1454 return self
._op
_status
.state
1458 """ Is this NSR active ?"""
1459 return True if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
else False
1463 """ VLRs associated with this NSR"""
1468 """ VNFRs associated with this NSR"""
1473 """ VNFFGRs associated with this NSR"""
1474 return self
._vnffgrs
1477 def scaling_groups(self
):
1478 """ Scaling groups associated with this NSR """
1479 return self
._scaling
_groups
1482 def param_pools(self
):
1483 """ Parameter value pools associated with this NSR"""
1484 return self
._param
_pools
1487 def nsr_cfg_msg(self
):
1488 return self
._nsr
_cfg
_msg
1491 def nsr_cfg_msg(self
, msg
):
1492 self
._nsr
_cfg
_msg
= msg
1496 """ NSD Protobuf for this NSR """
1497 if self
._nsd
is not None:
1499 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
1504 """ NSD ID for this NSR """
1505 return self
.nsd_msg
.id
1509 ''' Get a new job id for config primitive'''
1514 def config_status(self
):
1515 """ Config status for NSR """
1516 return self
._config
_status
1524 def is_failed(self
):
1525 return self
._is
_failed
1528 def public_key(self
):
1529 return self
._ssh
_pub
_key
1532 def private_key(self
):
1533 return self
._ssh
_key
_file
1535 def resolve_placement_group_cloud_construct(self
, input_group
):
1537 Returns the cloud specific construct for placement group
1539 copy_dict
= ['name', 'requirement', 'strategy']
1541 for group_info
in self
._nsr
_cfg
_msg
.nsd_placement_group_maps
:
1542 if group_info
.placement_group_ref
== input_group
.name
:
1543 group
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1544 group_dict
= {k
:v
for k
,v
in
1545 group_info
.as_dict().items() if k
!= 'placement_group_ref'}
1546 for param
in copy_dict
:
1547 group_dict
.update({param
: getattr(input_group
, param
)})
1548 group
.from_dict(group_dict
)
1554 return "NSR(name={}, nsd_id={}, data center={})".format(
1555 self
.name
, self
.nsd_id
, self
._datacenter
_name
1558 def _get_vnfd(self
, vnfd_id
, config_xact
):
1559 """ Fetch vnfd msg for the passed vnfd id """
1560 return self
._nsm
.get_vnfd(vnfd_id
, config_xact
)
1562 def _get_vnfd_datacenter(self
, vnfd_member_index
):
1563 """ Fetch datacenter for the passed vnfd id """
1564 if self
._nsr
_cfg
_msg
.vnf_datacenter_map
:
1565 vim_accounts
= [vnf
.datacenter
for vnf
in self
._nsr
_cfg
_msg
.vnf_datacenter_map \
1566 if str(vnfd_member_index
) == str(vnf
.member_vnf_index_ref
)]
1567 if vim_accounts
and vim_accounts
[0]:
1568 return vim_accounts
[0]
1569 return self
._datacenter
_name
1571 def _get_constituent_vnfd_msg(self
, vnf_index
):
1572 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1573 if const_vnfd
.member_vnf_index
== vnf_index
:
1576 raise ValueError("Constituent VNF index %s not found" % vnf_index
)
1578 def record_event(self
, evt
, evt_desc
, evt_details
=None, state
=None):
1579 """ Record an event """
1580 self
._op
_status
.record_event(evt
, evt_desc
, evt_details
)
1581 if state
is not None:
1582 self
.set_state(state
)
1584 def scaling_trigger_str(self
, trigger
):
1585 SCALING_TRIGGER_STRS
= {
1586 NsdBaseYang
.ScalingTrigger
.PRE_SCALE_IN
: 'pre-scale-in',
1587 NsdBaseYang
.ScalingTrigger
.POST_SCALE_IN
: 'post-scale-in',
1588 NsdBaseYang
.ScalingTrigger
.PRE_SCALE_OUT
: 'pre-scale-out',
1589 NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
: 'post-scale-out',
1592 return SCALING_TRIGGER_STRS
[trigger
]
1593 except Exception as e
:
1594 self
._log
.error("Scaling trigger mapping error for {} : {}".
1596 self
._log
.exception(e
)
1597 return "Unknown trigger"
1599 def generate_ssh_key_pair(self
, config_xact
):
1600 '''Generate a ssh key pair if required'''
1601 if self
._ssh
_key
_file
:
1602 self
._log
.debug("Key pair already generated")
1606 for cv
in self
.nsd_msg
.constituent_vnfd
:
1607 vnfd
= self
._get
_vnfd
(cv
.vnfd_id_ref
, config_xact
)
1608 if vnfd
and vnfd
.mgmt_interface
.ssh_key
:
1616 key
= ManoSshKey(self
._log
)
1617 path
= tempfile
.mkdtemp()
1618 key
.write_to_disk(name
=self
.id, directory
=path
)
1619 self
._ssh
_key
_file
= "file://{}".format(key
.private_key_file
)
1620 self
._ssh
_pub
_key
= key
.public_key
1621 except Exception as e
:
1622 self
._log
.exception("Error generating ssh key for {}: {}".
1623 format(self
.nsr_cfg_msg
.name
, e
))
1626 def instantiate_vls(self
):
1628 This function instantiates VLs for every VL in this Network Service
1630 self
._log
.debug("Instantiating %d VLs in NSD id %s", len(self
._vlrs
),
1632 for vlr_id
, vlr
in self
._vlrs
.items():
1633 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1635 if not isinstance(self
.nsm_plugin
, rwnsmplugin
.RwNsPlugin
):
1636 self
._vls
_ready
.set()
1638 # Wait for the VLs to be ready before yielding control out
1639 self
._log
.debug("Waitng for %d VLs in NSR id %s to be active",
1640 len(self
._vlrs
), self
.id)
1642 self
._log
.debug("NSR id:%s, name:%s - Waiting for %d VLs to be ready",
1643 self
.id, self
.name
, len(self
._vlrs
))
1644 yield from self
._vls
_ready
.wait()
1646 self
._log
.debug("NSR id:%s, name:%s, No virtual links found",
1648 self
._vls
_ready
.set()
1650 self
._log
.info("All %d VLs in NSR id %s are active, start the VNFs",
1651 len(self
._vlrs
), self
.id)
1653 def create(self
, config_xact
):
1654 """ Create this network service"""
1655 self
._log
.debug("Create NS {} for {}".format(self
.name
, self
._project
.name
))
1656 # Create virtual links for all the external vnf
1657 # connection points in this NS
1658 yield from self
.create_vls()
1660 # Create VNFs in this network service
1661 yield from self
.create_vnfs(config_xact
)
1663 # Create VNFFG for network service
1664 self
.create_vnffgs()
1666 # Create Scaling Groups for each scaling group in NSD
1667 self
.create_scaling_groups()
1669 # Create Parameter Pools
1670 self
.create_param_pools()
1673 def apply_scale_group_config_script(self
, script
, group
, scale_instance
, trigger
, vnfrs
=None):
1674 """ Apply config based on script for scale group """
1675 rift_var_root_dir
= os
.environ
['RIFT_VAR_ROOT']
1678 def add_vnfrs_data(vnfrs_list
):
1679 """ Add as a dict each of the VNFRs data """
1682 for vnfr
in vnfrs_list
:
1683 self
._log
.debug("Add VNFR {} data".format(vnfr
))
1685 vnfr_data
['name'] = vnfr
.name
1686 if trigger
in [NsdBaseYang
.ScalingTrigger
.PRE_SCALE_IN
,
1687 NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
]:
1688 # Get VNF management and other IPs, etc
1689 opdata
= yield from self
.fetch_vnfr(vnfr
.xpath
)
1690 self
._log
.debug("VNFR {} op data: {}".format(vnfr
.name
, opdata
))
1692 vnfr_data
['rw_mgmt_ip'] = opdata
.mgmt_interface
.ip_address
1693 vnfr_data
['rw_mgmt_port'] = opdata
.mgmt_interface
.port
1694 vnfr_data
['member_vnf_index_ref'] = opdata
.member_vnf_index_ref
1695 vnfr_data
['vdur_data'] = []
1696 for vdur
in opdata
.vdur
:
1698 vdur_data
['vm_name'] = vdur
.name
1699 vdur_data
['vm_mgmt_ip'] = vdur
.vm_management_ip
1700 vnfr_data
['vdur_data'].append(vdur_data
)
1701 except Exception as e
:
1702 self
._log
.error("Unable to get management IP for vnfr {}:{}".
1703 format(vnfr
.name
, e
))
1706 vnfr_data
['connection_points'] = []
1707 for cp
in opdata
.connection_point
:
1709 con_pt
['name'] = cp
.name
1710 con_pt
['ip_address'] = cp
.ip_address
1711 vnfr_data
['connection_points'].append(con_pt
)
1712 except Exception as e
:
1713 self
._log
.error("Exception getting connections points for VNFR {}: {}".
1714 format(vnfr
.name
, e
))
1716 vnfrs_data
.append(vnfr_data
)
1717 self
._log
.debug("VNFRs data: {}".format(vnfrs_data
))
1721 def add_nsr_data(nsr
):
1723 nsr_data
['name'] = nsr
.name
1726 if script
is None or len(script
) == 0:
1727 self
._log
.error("Script not provided for scale group config: {}".format(group
.name
))
1730 if script
[0] == '/':
1733 path
= os
.path
.join(rift_var_root_dir
,
1734 'launchpad/packages/nsd',
1736 self
.nsd_id
, 'scripts',
1739 if not os
.path
.exists(path
):
1740 self
._log
.error("Config failed for scale group {}: Script does not exist at {}".
1741 format(group
.name
, path
))
1744 # Build a YAML file with all parameters for the script to execute
1745 # The data consists of 5 sections
1747 # 2. Scale group config
1748 # 3. VNFRs in the scale group
1749 # 4. VNFRs outside scale group
1752 data
['trigger'] = group
.trigger_map(trigger
)
1753 data
['config'] = group
.group_msg
.as_dict()
1756 data
["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs
)
1758 data
["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance
.vnfrs
)
1760 data
["vnfrs_others"] = yield from add_vnfrs_data(self
.vnfrs
.values())
1761 data
["nsr"] = add_nsr_data(self
)
1764 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
1765 tmp_file
.write(yaml
.dump(data
, default_flow_style
=True)
1768 self
._log
.debug("Creating a temp file: {} with input data: {}".
1769 format(tmp_file
.name
, data
))
1771 cmd
= "{} {}".format(path
, tmp_file
.name
)
1772 self
._log
.debug("Running the CMD: {}".format(cmd
))
1773 proc
= yield from asyncio
.create_subprocess_shell(cmd
, loop
=self
._loop
)
1774 rc
= yield from proc
.wait()
1776 self
._log
.error("The script {} for scale group {} config returned: {}".
1777 format(script
, group
.name
, rc
))
1785 def apply_scaling_group_config(self
, trigger
, group
, scale_instance
, vnfrs
=None):
1786 """ Apply the config for the scaling group based on trigger """
1787 if group
is None or scale_instance
is None:
1791 def update_config_status(success
=True, err_msg
=None):
1793 We are trying to determine the scaling instance's config status
1794 as a collation of the config status associated with 4 different triggers
1796 self
._log
.debug("Update %s scaling config status to %r : %s",
1797 scale_instance
, success
, err_msg
)
1798 if (scale_instance
.config_status
== "failed"):
1799 # Do not update the config status if it is already in failed state
1802 if scale_instance
.config_status
== "configured":
1803 # Update only to failed state an already configured scale instance
1805 scale_instance
.config_status
= "failed"
1806 scale_instance
.config_err_msg
= err_msg
1807 yield from self
.update_state()
1809 # We are in configuring state
1810 # Only after post scale out mark instance as configured
1811 if trigger
== NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
:
1813 scale_instance
.config_status
= "configured"
1814 for vnfr
in scale_instance
.vnfrs
:
1815 if vnfr
.config_status
== "configuring":
1816 vnfr
.vnfr_msg
.config_status
= "configured"
1817 yield from vnfr
.update_vnfm()
1819 scale_instance
.config_status
= "failed"
1820 scale_instance
.config_err_msg
= err_msg
1822 yield from self
.update_state()
1823 # Publish config state as update_state seems to care only operational status
1824 yield from self
.publish()
1826 config
= group
.trigger_config(trigger
)
1828 if trigger
== NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
:
1829 self
._log
.debug("No config needed, update %s scaling config status to configured",
1831 scale_instance
.config_status
= "configured"
1834 self
._log
.debug("Scaling group {} config: {}".format(group
.name
, config
))
1835 if config
.has_field("ns_service_primitive_name_ref"):
1836 config_name
= config
.ns_service_primitive_name_ref
1837 nsd_msg
= self
.nsd_msg
1838 config_primitive
= None
1839 for ns_cfg_prim
in nsd_msg
.service_primitive
:
1840 if ns_cfg_prim
.name
== config_name
:
1841 config_primitive
= ns_cfg_prim
1844 if config_primitive
is None:
1845 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name
, self
.name
))
1847 self
._log
.debug("Scaling group {} config primitive: {}".format(group
.name
, config_primitive
))
1848 if config_primitive
.has_field("user_defined_script"):
1849 script_path
= '/'.join(["launchpad/packages/nsd", self
._project
.name
, nsd_msg
.id, "scripts", config_primitive
.user_defined_script
])
1850 rc
= yield from self
.apply_scale_group_config_script(script_path
,
1851 group
, scale_instance
, trigger
, vnfrs
)
1854 err_msg
= "Failed config for trigger {} using config script '{}'". \
1855 format(self
.scaling_trigger_str(trigger
),
1856 config_primitive
.user_defined_script
)
1857 yield from update_config_status(success
=rc
, err_msg
=err_msg
)
1860 err_msg
= "Failed config for trigger {} as config script is not specified". \
1861 format(self
.scaling_trigger_str(trigger
))
1862 yield from update_config_status(success
=False, err_msg
=err_msg
)
1863 raise NotImplementedError("Only script based config support for scale group for now: {}".
1866 err_msg
= "Failed config for trigger {} as config primitive is not specified".\
1867 format(self
.scaling_trigger_str(trigger
))
1868 yield from update_config_status(success
=False, err_msg
=err_msg
)
1869 self
._log
.error("Config primitive not specified for config action in scale group %s" %
1873 def create_scaling_groups(self
):
1874 """ This function creates a NSScalingGroup for every scaling
1875 group defined in he NSD"""
1877 for scaling_group_msg
in self
.nsd_msg
.scaling_group_descriptor
:
1878 self
._log
.debug("Found scaling_group %s in nsr id %s",
1879 scaling_group_msg
.name
, self
.id)
1881 group_record
= scale_group
.ScalingGroup(
1886 self
._scaling
_groups
[group_record
.name
] = group_record
1889 def create_scale_group_instance(self
, group_name
, index
, config_xact
, is_default
=False):
1890 group
= self
._scaling
_groups
[group_name
]
1891 scale_instance
= group
.create_instance(index
, is_default
)
1895 self
._log
.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1896 len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
)
1899 for vnf_index
, count
in group
.vnf_index_count_map
.items():
1900 const_vnfd_msg
= self
._get
_constituent
_vnfd
_msg
(vnf_index
)
1901 vnfd_msg
= self
._get
_vnfd
(const_vnfd_msg
.vnfd_id_ref
, config_xact
)
1903 datacenter_name
= self
._get
_vnfd
_datacenter
(const_vnfd_msg
.member_vnf_index
)
1904 if datacenter_name
is None:
1905 datacenter_name
= self
._datacenter
_name
1906 for _
in range(count
):
1907 vnfr
= yield from self
.create_vnf_record(vnfd_msg
, const_vnfd_msg
, datacenter_name
, group_name
, index
)
1908 scale_instance
.add_vnfr(vnfr
)
1913 def instantiate_instance():
1914 self
._log
.debug("Creating %s VNFRS", scale_instance
)
1915 vnfrs
= yield from create_vnfs()
1916 yield from self
.publish()
1918 self
._log
.debug("Instantiating %s VNFRS for %s", len(vnfrs
), scale_instance
)
1919 scale_instance
.operational_status
= "vnf_init_phase"
1920 yield from self
.update_state()
1923 rc
= yield from self
.apply_scaling_group_config(NsdBaseYang
.ScalingTrigger
.PRE_SCALE_OUT
,
1924 group
, scale_instance
, vnfrs
)
1926 self
._log
.error("Pre scale out config for scale group {} ({}) failed".
1927 format(group
.name
, index
))
1928 scale_instance
.operational_status
= "failed"
1930 yield from self
.instantiate_vnfs(vnfrs
, scaleout
=True)
1933 except Exception as e
:
1934 self
._log
.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1935 format(group
.name
, e
))
1936 self
._log
.exception(e
)
1937 scale_instance
.operational_status
= "failed"
1939 yield from self
.update_state()
1941 yield from instantiate_instance()
1944 def delete_scale_group_instance(self
, group_name
, index
):
1945 group
= self
._scaling
_groups
[group_name
]
1946 scale_instance
= group
.get_instance(index
)
1947 if scale_instance
.is_default
:
1948 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1950 scale_instance
.operational_status
= "terminate"
1951 yield from self
.update_state()
1954 def terminate_instance():
1955 self
._log
.debug("Terminating scaling instance %s VNFRS" % scale_instance
)
1956 rc
= yield from self
.apply_scaling_group_config(NsdBaseYang
.ScalingTrigger
.PRE_SCALE_IN
,
1957 group
, scale_instance
)
1959 self
._log
.error("Pre scale in config for scale group {} ({}) failed".
1960 format(group
.name
, index
))
1962 # Going ahead with terminate, even if there is an error in pre-scale-in config
1963 # as this could be result of scale out failure and we need to cleanup this group
1964 yield from self
.terminate_vnfrs(scale_instance
.vnfrs
, scalein
=True)
1965 group
.delete_instance(index
)
1967 scale_instance
.operational_status
= "vnf_terminate_phase"
1968 yield from self
.update_state()
1970 yield from terminate_instance()
1973 def _update_scale_group_instances_status(self
):
1975 def post_scale_out_task(group
, instance
):
1976 # Apply post scale out config once all VNFRs are active
1977 rc
= yield from self
.apply_scaling_group_config(NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
,
1979 instance
.operational_status
= "running"
1981 self
._log
.debug("Scale out for group {} and instance {} succeeded".
1982 format(group
.name
, instance
.instance_id
))
1984 self
._log
.error("Post scale out config for scale group {} ({}) failed".
1985 format(group
.name
, instance
.instance_id
))
1987 yield from self
.update_state()
1989 group_instances
= {group
: group
.instances
for group
in self
._scaling
_groups
.values()}
1990 for group
, instances
in group_instances
.items():
1991 self
._log
.debug("Updating %s instance status", group
)
1992 for instance
in instances
:
1993 instance_vnf_state_list
= [vnfr
.state
for vnfr
in instance
.vnfrs
]
1994 self
._log
.debug("Got vnfr instance states: %s", instance_vnf_state_list
)
1995 if instance
.operational_status
== "vnf_init_phase":
1996 if all([state
== VnfRecordState
.ACTIVE
for state
in instance_vnf_state_list
]):
1997 instance
.operational_status
= "running"
1999 # Create a task for post scale out to allow us to sleep before attempting
2000 # to configure newly created VM's
2001 self
._loop
.create_task(post_scale_out_task(group
, instance
))
2003 elif any([state
== VnfRecordState
.FAILED
for state
in instance_vnf_state_list
]):
2004 self
._log
.debug("Scale out for group {} and instance {} failed".
2005 format(group
.name
, instance
.instance_id
))
2006 instance
.operational_status
= "failed"
2008 elif instance
.operational_status
== "vnf_terminate_phase":
2009 if all([state
== VnfRecordState
.TERMINATED
for state
in instance_vnf_state_list
]):
2010 instance
.operational_status
= "terminated"
2011 rc
= yield from self
.apply_scaling_group_config(NsdBaseYang
.ScalingTrigger
.POST_SCALE_IN
,
2014 self
._log
.debug("Scale in for group {} and instance {} succeeded".
2015 format(group
.name
, instance
.instance_id
))
2017 self
._log
.error("Post scale in config for scale group {} ({}) failed".
2018 format(group
.name
, instance
.instance_id
))
2020 def create_vnffgs(self
):
2021 """ This function creates VNFFGs for every VNFFG in the NSD
2022 associated with this NSR"""
2024 for vnffgd
in self
.nsd_msg
.vnffgd
:
2025 self
._log
.debug("Found vnffgd %s in nsr id %s", vnffgd
, self
.id)
2026 vnffgr
= VnffgRecord(self
._dts
,
2029 self
._nsm
._vnffgmgr
,
2033 self
._sdn
_account
_name
,
2034 self
._datacenter
_name
2036 self
._vnffgrs
[vnffgr
.id] = vnffgr
2038 def resolve_vld_ip_profile(self
, nsd_msg
, vld
):
2039 self
._log
.debug("Receieved ip profile ref is %s",vld
.ip_profile_ref
)
2040 if not vld
.has_field('ip_profile_ref'):
2042 profile
= [profile
for profile
in nsd_msg
.ip_profiles
if profile
.name
== vld
.ip_profile_ref
]
2043 return profile
[0] if profile
else None
2046 def _create_vls(self
, vld
, datacenter
):
2047 """Create a VLR in the cloud account specified using the given VLD
2051 datacenter : Cloud account name
2056 vlr
= yield from VirtualLinkRecord
.create_record(
2064 self
.resolve_vld_ip_profile(self
.nsd_msg
, vld
),
2066 restart_mode
=self
.restart_mode
)
2070 def _extract_datacenters_for_vl(self
, vld
):
2072 Extracts the list of cloud accounts from the NS Config obj
2075 1. Cloud accounts based connection point (vnf_datacenter_map)
2077 vld : VLD yang object
2082 datacenter_list
= []
2084 if self
._nsr
_cfg
_msg
.vnf_datacenter_map
:
2085 # Handle case where datacenter is None
2086 vnf_datacenter_map
= {}
2087 for vnf
in self
._nsr
_cfg
_msg
.vnf_datacenter_map
:
2088 if vnf
.datacenter
is not None or vnf
.datacenter
is not None:
2089 vnf_datacenter_map
[vnf
.member_vnf_index_ref
] = \
2092 for vnfc
in vld
.vnfd_connection_point_ref
:
2093 datacenter
= vnf_datacenter_map
.get(
2094 vnfc
.member_vnf_index_ref
, self
._datacenter
_name
)
2096 datacenter_list
.append(datacenter
)
2098 if self
._nsr
_cfg
_msg
.vl_datacenter_map
:
2099 for vld_map
in self
._nsr
_cfg
_msg
.vl_datacenter_map
:
2100 if vld_map
.vld_id_ref
== vld
.id:
2101 for datacenter
in vld_map
.datacenters
:
2102 datacenter_list
.append(datacenter
)
2104 # If no config has been provided then fall-back to the default
2106 if not datacenter_list
:
2107 datacenter_list
.append(self
._datacenter
_name
)
2109 self
._log
.debug("VL {} data center list: {}".
2110 format(vld
.name
, datacenter_list
))
2111 return set(datacenter_list
)
2114 def create_vls(self
):
2115 """ This function creates VLs for every VLD in the NSD
2116 associated with this NSR"""
2117 for vld
in self
.nsd_msg
.vld
:
2119 self
._log
.debug("Found vld %s in nsr id %s", vld
, self
.id)
2120 datacenter_list
= self
._extract
_datacenters
_for
_vl
(vld
)
2121 for datacenter
in datacenter_list
:
2122 vlr
= yield from self
._create
_vls
(vld
, datacenter
)
2123 self
._vlrs
[vlr
.id] = vlr
2124 self
._nsm
.add_vlr_id_nsr_map(vlr
.id, self
)
2127 def create_vl_instance(self
, vld
):
2128 self
._log
.error("Create VL for {}: {}".format(self
.id, vld
.as_dict()))
2129 # Check if the VL is already present
2131 for vl_id
, vl
in self
._vlrs
.items():
2132 if vl
.vld_msg
.id == vld
.id:
2133 self
._log
.error("The VLD %s already in NSR %s as VLR %s with status %s",
2134 vld
.id, self
.id, vl
.id, vl
.state
)
2136 if vlr
.state
!= VlRecordState
.TERMINATED
:
2137 err_msg
= "VLR for VL {} in NSR {} already instantiated". \
2138 format(vld
, self
.id)
2139 self
._log
.error(err_msg
)
2140 raise NsrVlUpdateError(err_msg
)
2144 datacenter_list
= self
._extract
_datacenters
_for
_vl
(vld
)
2145 for datacenter
in datacenter_list
:
2146 vlr
= yield from self
._create
_vls
(vld
, account
, datacenter
)
2147 self
._vlrs
[vlr
.id] = vlr
2148 self
._nsm
.add_vlr_id_nsr_map(vlr
.id, self
)
2150 vlr
.state
= VlRecordState
.INSTANTIATION_PENDING
2151 yield from self
.update_state()
2154 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
2156 except Exception as e
:
2157 err_msg
= "Error instantiating VL for NSR {} and VLD {}: {}". \
2158 format(self
.id, vld
.id, e
)
2159 self
._log
.error(err_msg
)
2160 self
._log
.exception(e
)
2161 vlr
.state
= VlRecordState
.FAILED
2163 yield from self
.update_state()
2166 def delete_vl_instance(self
, vld
):
2167 for vlr_id
, vlr
in self
._vlrs
.items():
2168 if vlr
.vld_msg
.id == vld
.id:
2169 self
._log
.debug("Found VLR %s for VLD %s in NSR %s",
2170 vlr
.id, vld
.id, self
.id)
2171 vlr
.state
= VlRecordState
.TERMINATE_PENDING
2172 yield from self
.update_state()
2175 yield from self
.nsm_plugin
.terminate_vl(vlr
)
2176 vlr
.state
= VlRecordState
.TERMINATED
2178 self
.remove_vlr_id_nsr_map(vlr
.id)
2180 except Exception as e
:
2181 err_msg
= "Error terminating VL for NSR {} and VLD {}: {}". \
2182 format(self
.id, vld
.id, e
)
2183 self
._log
.error(err_msg
)
2184 self
._log
.exception(e
)
2185 vlr
.state
= VlRecordState
.FAILED
2187 yield from self
.update_state()
2191 def create_vnfs(self
, config_xact
):
2193 This function creates VNFs for every VNF in the NSD
2194 associated with this NSR
2196 self
._log
.debug("Creating %u VNFs associated with this NS id %s",
2197 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
2199 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
2200 if not const_vnfd
.start_by_default
:
2201 self
._log
.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
2202 const_vnfd
.member_vnf_index
)
2205 vnfd_msg
= self
._get
_vnfd
(const_vnfd
.vnfd_id_ref
, config_xact
)
2206 datacenter_name
= self
._get
_vnfd
_datacenter
(const_vnfd
.member_vnf_index
)
2207 if datacenter_name
is None:
2208 datacenter_name
= self
._datacenter
_name
2209 yield from self
.create_vnf_record(vnfd_msg
, const_vnfd
, datacenter_name
)
2211 def get_placement_groups(self
, vnfd_msg
, const_vnfd
):
2212 placement_groups
= []
2213 for group
in self
.nsd_msg
.placement_groups
:
2214 for member_vnfd
in group
.member_vnfd
:
2215 if (member_vnfd
.vnfd_id_ref
== vnfd_msg
.id) and \
2216 (member_vnfd
.member_vnf_index_ref
== str(const_vnfd
.member_vnf_index
)):
2217 group_info
= self
.resolve_placement_group_cloud_construct(group
)
2218 if group_info
is None:
2219 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
2220 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
2222 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
2225 const_vnfd
.member_vnf_index
)
2226 placement_groups
.append(group_info
)
2227 return placement_groups
2229 def get_cloud_config(self
):
2230 cloud_config
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_CloudConfig()
2231 self
._log
.debug("Received key pair is {}".format(self
._key
_pairs
))
2233 for authorized_key
in self
.nsr_cfg_msg
.ssh_authorized_key
:
2234 if authorized_key
.key_pair_ref
in self
._key
_pairs
:
2235 key_pair
= cloud_config
.key_pair
.add()
2236 key_pair
.from_dict(self
._key
_pairs
[authorized_key
.key_pair_ref
].as_dict())
2237 for nsd_key_pair
in self
.nsd_msg
.key_pair
:
2238 key_pair
= cloud_config
.key_pair
.add()
2239 key_pair
.from_dict(key_pair
.as_dict())
2240 for nsr_cfg_user
in self
.nsr_cfg_msg
.user
:
2241 user
= cloud_config
.user
.add()
2242 user
.name
= nsr_cfg_user
.name
2243 user
.user_info
= nsr_cfg_user
.user_info
2244 for ssh_key
in nsr_cfg_user
.ssh_authorized_key
:
2245 if ssh_key
.key_pair_ref
in self
._key
_pairs
:
2246 key_pair
= user
.key_pair
.add()
2247 key_pair
.from_dict(self
._key
_pairs
[ssh_key
.key_pair_ref
].as_dict())
2248 for nsd_user
in self
.nsd_msg
.user
:
2249 user
= cloud_config
.user
.add()
2250 user
.from_dict(nsd_user
.as_dict())
2252 self
._log
.debug("Formed cloud-config msg is {}".format(cloud_config
))
2256 def create_vnf_record(self
, vnfd_msg
, const_vnfd
, datacenter_name
, group_name
=None, group_instance_id
=None):
2257 # Fetch the VNFD associated with this VNF
2258 placement_groups
= self
.get_placement_groups(vnfd_msg
, const_vnfd
)
2259 cloud_config
= self
.get_cloud_config()
2260 self
._log
.info("Cloud Account for VNF %d is %s",const_vnfd
.member_vnf_index
,datacenter_name
)
2261 self
._log
.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
2263 const_vnfd
.member_vnf_index
,
2264 [ group
.name
for group
in placement_groups
])
2266 vnfr
= yield from VirtualNetworkFunctionRecord
.create_record(self
._dts
,
2281 restart_mode
=self
.restart_mode
,
2283 if vnfr
.id in self
._vnfrs
:
2284 err
= "VNF with VNFR id %s already in vnf list" % (vnfr
.id,)
2285 raise NetworkServiceRecordError(err
)
2287 self
._vnfrs
[vnfr
.id] = vnfr
2288 self
._nsm
.vnfrs
[vnfr
.id] = vnfr
2290 yield from vnfr
.set_config_status(NsrYang
.ConfigStates
.INIT
)
2292 self
._log
.debug("Added VNFR %s to NSM VNFR list with id %s",
2298 def create_param_pools(self
):
2299 for param_pool
in self
.nsd_msg
.parameter_pool
:
2300 self
._log
.debug("Found parameter pool %s in nsr id %s", param_pool
, self
.id)
2302 start_value
= param_pool
.range.start_value
2303 end_value
= param_pool
.range.end_value
2304 if end_value
< start_value
:
2305 raise NetworkServiceRecordError(
2306 "Parameter pool %s has invalid range (start: {}, end: {})".format(
2307 start_value
, end_value
2311 self
._param
_pools
[param_pool
.name
] = config_value_pool
.ParameterValuePool(
2314 range(start_value
, end_value
)
2318 def fetch_vnfr(self
, vnfr_path
):
2319 """ Fetch VNFR record """
2321 self
._log
.debug("Fetching VNFR with key %s while instantiating %s",
2323 res_iter
= yield from self
._dts
.query_read(vnfr_path
, rwdts
.XactFlag
.MERGE
)
2325 for ent
in res_iter
:
2326 res
= yield from ent
2332 def instantiate_vnfs(self
, vnfrs
, scaleout
=False):
2334 This function instantiates VNFs for every VNF in this Network Service
2337 def instantiate_vnf(vnf
):
2338 self
._log
.debug("Instantiating VNF: %s in NS %s", vnf
, self
.id)
2339 vnfd_id
= vnf
.vnfr_msg
.vnfd
.id
2340 for dependency_vnf
in dependencies
[vnfd_id
]:
2341 while dependency_vnf
not in self
.instantiated
:
2342 yield from asyncio
.sleep(1, loop
=self
._loop
)
2344 yield from self
.nsm_plugin
.instantiate_vnf(self
, vnf
,scaleout
)
2345 self
.instantiated
.add(vnfd_id
)
2347 self
._log
.debug("Instantiating %u VNFs in NS %s", len(vnfrs
), self
.id)
2348 dependencies
= collections
.defaultdict(list)
2349 for dependency_vnf
in self
._nsr
_cfg
_msg
.nsd
.vnf_dependency
:
2350 dependencies
[dependency_vnf
.vnf_source_ref
].append(dependency_vnf
.vnf_depends_on_ref
)
2352 # The dictionary copy is to ensure that if a terminate is initiated right after instantiation, the
2353 # Runtime error for "dictionary changed size during iteration" does not occur.
2354 # vnfrs - 'dict_values' object
2355 # vnfrs_copy - list object
2356 vnfrs_copy
= list(vnfrs
)
2358 for vnf
in vnfrs_copy
:
2359 vnf_task
= self
._loop
.create_task(instantiate_vnf(vnf
))
2360 tasks
.append(vnf_task
)
2363 self
._log
.debug("Waiting for %s instantiate_vnf tasks to complete", len(tasks
))
2364 done
, pending
= yield from asyncio
.wait(tasks
, loop
=self
._loop
, timeout
=30)
2366 self
._log
.error("The Instantiate vnf task timed out after 30 seconds.")
2367 raise VirtualNetworkFunctionRecordError("Task tied out : ", pending
)
2370 def instantiate_vnffgs(self
):
2372 This function instantiates VNFFGs for every VNFFG in this Network Service
2374 self
._log
.debug("Instantiating %u VNFFGs in NS %s",
2375 len(self
.nsd_msg
.vnffgd
), self
.id)
2376 for _
, vnfr
in self
.vnfrs
.items():
2377 while vnfr
.state
in [VnfRecordState
.INSTANTIATION_PENDING
, VnfRecordState
.INIT
]:
2378 self
._log
.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr
.name
,vnfr
.state
)
2379 yield from asyncio
.sleep(2, loop
=self
._loop
)
2380 if vnfr
.state
== VnfRecordState
.ACTIVE
:
2381 self
._log
.debug("Received vnfr state for vnfr %s is %s ",vnfr
.name
,vnfr
.state
)
2384 self
._log
.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr
.name
,vnfr
.state
)
2385 self
._vnffgr
_state
= VnffgRecordState
.FAILED
2388 self
._log
.info("Waiting for 90 seconds for VMs to come up")
2389 yield from asyncio
.sleep(90, loop
=self
._loop
)
2390 self
._log
.info("Starting VNFFG orchestration")
2391 for vnffg
in self
._vnffgrs
.values():
2392 self
._log
.debug("Instantiating VNFFG: %s in NS %s", vnffg
, self
.id)
2393 yield from vnffg
.instantiate()
2396 def instantiate_scaling_instances(self
, config_xact
):
2397 """ Instantiate any default scaling instances in this Network Service """
2398 for group
in self
._scaling
_groups
.values():
2399 for i
in range(group
.min_instance_count
):
2400 self
._log
.debug("Instantiating %s default scaling instance %s", group
, i
)
2401 yield from self
.create_scale_group_instance(
2402 group
.name
, i
, config_xact
, is_default
=True
2405 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2406 if group_msg
.scaling_group_name_ref
!= group
.name
:
2409 for instance
in group_msg
.instance
:
2410 self
._log
.debug("Reloading %s scaling instance %s", group_msg
, instance
.id)
2411 yield from self
.create_scale_group_instance(
2412 group
.name
, instance
.id, config_xact
, is_default
=False
2415 def has_scaling_instances(self
):
2416 """ Return boolean indicating if the network service has default scaling groups """
2417 for group
in self
._scaling
_groups
.values():
2418 if group
.min_instance_count
> 0:
2421 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2422 if len(group_msg
.instance
) > 0:
2429 """ This function publishes this NSR """
2431 self
._nsr
_msg
= self
.create_msg()
2433 self
._log
.debug("Publishing the NSR with xpath %s and nsr %s",
2437 if self
._debug
_running
:
2438 self
._log
.debug("Publishing NSR in RUNNING state!")
2441 yield from self
._nsm
.nsr_handler
.update(None, self
.nsr_xpath
, self
._nsr
_msg
)
2442 if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
:
2443 self
._debug
_running
= True
2446 def unpublish(self
, xact
=None):
2447 """ Unpublish this NSR object """
2448 self
._log
.debug("Unpublishing Network service id %s", self
.id)
2450 yield from self
._nsm
.nsr_handler
.delete(xact
, self
.nsr_xpath
)
2453 def nsr_xpath(self
):
2454 """ Returns the xpath associated with this NSR """
2455 return self
._project
.add_project((
2456 "D,/nsr:ns-instance-opdata" +
2457 "/nsr:nsr[nsr:ns-instance-config-ref={}]"
2458 ).format(quoted_key(self
.id)))
2461 def xpath_from_nsr(nsr
):
2462 """ Returns the xpath associated with this NSR op data"""
2463 return (NetworkServiceRecord
.XPATH
+
2464 "[nsr:ns-instance-config-ref={}]").format(quoted_key(nsr
.id))
2467 def nsd_xpath(self
):
2468 """ Return NSD config xpath."""
2469 return self
._project
.add_project((
2470 "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id={}]"
2471 ).format(quoted_key(self
.nsd_id
)))
2474 def instantiate(self
, config_xact
):
2475 """"Instantiates a NetworkServiceRecord.
2477 This function instantiates a Network service
2478 which involves the following steps,
2480 * Instantiate every VL in NSD by sending create VLR request to DTS.
2481 * Instantiate every VNF in NSD by sending create VNF reuqest to DTS.
2482 * Publish the NSR details to DTS
2485 nsr: The NSR configuration request containing nsr-id and nsd
2486 config_xact: The configuration transaction which initiated the instatiation
2489 NetworkServiceRecordError if the NSR creation fails
2495 self
._log
.debug("Instantiating NS - %s xact - %s", self
, config_xact
)
2497 # Move the state to INIITALIZING
2498 self
.set_state(NetworkServiceRecordState
.INIT
)
2500 event_descr
= "Instantiation Request Received NSR Id: %s, NS Name: %s" % (self
.id, self
.name
)
2501 self
.record_event("instantiating", event_descr
)
2504 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
2506 # Merge any config and initial config primitive values
2507 self
.config_store
.merge_nsd_config(self
.nsd_msg
, self
._project
.name
)
2508 self
._log
.debug("Merged NSD: {}".format(self
.nsd_msg
.as_dict()))
2510 event_descr
= "Fetched NSD with descriptor id %s, NS Name: %s" % (self
.nsd_id
, self
.name
)
2511 self
.record_event("nsd-fetched", event_descr
)
2513 if self
._nsd
is None:
2514 msg
= "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
2515 self
._log
.debug(msg
, self
.nsd_id
, self
.id)
2516 raise NetworkServiceRecordError(self
)
2518 self
._log
.debug("Got nsd result %s", self
._nsd
)
2520 # Substitute any input parameters
2521 self
.substitute_input_parameters(self
._nsd
, self
._nsr
_cfg
_msg
)
2524 yield from self
.create(config_xact
)
2526 # Publish the NSR to DTS
2527 yield from self
.publish()
2530 def do_instantiate():
2532 Instantiate network service
2534 self
._log
.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
2535 self
.id, self
.nsd_id
)
2537 # instantiate the VLs
2538 event_descr
= ("Instantiating %s external VLs for NSR id: %s, NS Name: %s " %
2539 (len(self
.nsd_msg
.vld
), self
.id, self
.name
))
2540 self
.record_event("begin-external-vls-instantiation", event_descr
)
2542 self
.set_state(NetworkServiceRecordState
.VL_INIT_PHASE
)
2544 # Publish the NSR to DTS
2545 yield from self
.publish()
2547 if self
._ns
_terminate
_received
:
2548 self
._log
.debug("Terminate Received. Interrupting Instantiation at event : begin-external-vls-instantiation.")
2549 # Setting this flag as False again as this is a state where neither VL or VNF have been instantiated.
2550 self
._ns
_terminate
_received
= False
2551 # At this stage only ns-instance opdata is published. Cleaning up the record.
2552 yield from self
.unpublish()
2555 yield from self
.instantiate_vls()
2557 event_descr
= ("Finished instantiating %s external VLs for NSR id: %s, NS Name: %s " %
2558 (len(self
.nsd_msg
.vld
), self
.id, self
.name
))
2559 self
.record_event("end-external-vls-instantiation", event_descr
)
2561 self
.set_state(NetworkServiceRecordState
.VNF_INIT_PHASE
)
2563 # Publish the NSR to DTS
2564 yield from self
.publish()
2566 self
._log
.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
2567 self
.id, self
.nsd_id
)
2569 # instantiate the VNFs
2570 event_descr
= ("Instantiating %s VNFS for NSR id: %s, NS Name: %s " %
2571 (len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
.name
))
2573 self
.record_event("begin-vnf-instantiation", event_descr
)
2575 if self
._ns
_terminate
_received
:
2576 self
._log
.debug("Terminate Received. Interrupting Instantiation at event : end-external-vls-instantiation.")
2579 yield from self
.instantiate_vnfs(self
._vnfrs
.values())
2581 self
._log
.debug(" Finished instantiating %d VNFs for NSR id: %s, NS Name: %s",
2582 len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
.name
)
2584 event_descr
= ("Finished instantiating %s VNFs for NSR id: %s, NS Name: %s" %
2585 (len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
.name
))
2586 self
.record_event("end-vnf-instantiation", event_descr
)
2588 # Publish the NSR to DTS
2589 yield from self
.publish()
2591 if len(self
.vnffgrs
) > 0:
2592 #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
2593 event_descr
= ("Instantiating %s VNFFGS for NSR id: %s, NS Name: %s" %
2594 (len(self
.nsd_msg
.vnffgd
), self
.id, self
.name
))
2596 self
.record_event("begin-vnffg-instantiation", event_descr
)
2598 if self
._ns
_terminate
_received
:
2599 self
._log
.debug("Terminate Received. Interrupting Instantiation at event : begin-vnffg-instantiation.")
2602 yield from self
.instantiate_vnffgs()
2604 event_descr
= ("Finished instantiating %s VNFFGDs for NSR id: %s, NS Name: %s" %
2605 (len(self
.nsd_msg
.vnffgd
), self
.id, self
.name
))
2606 self
.record_event("end-vnffg-instantiation", event_descr
)
2608 if self
.has_scaling_instances():
2609 event_descr
= ("Instantiating %s Scaling Groups for NSR id: %s, NS Name: %s" %
2610 (len(self
._scaling
_groups
), self
.id, self
.name
))
2612 self
.record_event("begin-scaling-group-instantiation", event_descr
)
2614 if self
._ns
_terminate
_received
:
2615 self
._log
.debug("Terminate Received. Interrupting Instantiation at event : begin-scaling-group-instantiation.")
2618 yield from self
.instantiate_scaling_instances(config_xact
)
2619 self
.record_event("end-scaling-group-instantiation", event_descr
)
2621 # Give the plugin a chance to deploy the network service now that all
2622 # virtual links and vnfs are instantiated
2623 yield from self
.nsm_plugin
.deploy(self
._nsr
_msg
)
2625 self
._log
.debug("Publishing NSR...... nsr[%s], nsd[%s], for NS[%s]",
2626 self
.id, self
.nsd_id
, self
.name
)
2628 # Publish the NSR to DTS
2629 yield from self
.publish()
2631 self
._log
.debug("Published NSR...... nsr[%s], nsd[%s], for NS[%s]",
2632 self
.id, self
.nsd_id
, self
.name
)
2634 def on_instantiate_done(fut
):
2635 # If the do_instantiate fails, then publish NSR with failed result
2638 import traceback
, sys
2639 print(traceback
.format_exception(None,e
, e
.__traceback
__), file=sys
.stderr
, flush
=True)
2640 self
._log
.error("NSR instantiation failed for NSR id %s: %s", self
.id, str(e
))
2641 self
._loop
.create_task(self
.instantiation_failed(failed_reason
=str(e
)))
2643 instantiate_task
= self
._loop
.create_task(do_instantiate())
2644 instantiate_task
.add_done_callback(on_instantiate_done
)
2647 def set_config_status(self
, status
, status_details
=None):
2648 if self
.config_status
!= status
:
2649 self
._log
.debug("Updating NSR {} status for {} to {}".
2650 format(self
.name
, self
.config_status
, status
))
2651 self
._config
_status
= status
2652 self
._config
_status
_details
= status_details
2654 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2655 self
.record_event("config-failed", "NS configuration failed",
2656 evt_details
=self
._config
_status
_details
)
2658 yield from self
.publish()
2660 if status
== NsrYang
.ConfigStates
.TERMINATE
:
2661 yield from self
.terminate_ns_cont()
2664 def is_active(self
):
2665 """ This NS is active """
2666 self
.set_state(NetworkServiceRecordState
.RUNNING
)
2670 # Publish the NSR to DTS
2671 self
._log
.debug("Network service %s is active ", self
.id)
2672 self
._is
_active
= True
2674 event_descr
= "NSR in running state for NSR id: %s, NS Name: %s" % (self
.id, self
.name
)
2675 self
.record_event("ns-running", event_descr
)
2677 yield from self
.publish()
2680 def instantiation_failed(self
, failed_reason
=None):
2681 """ The NS instantiation failed"""
2682 self
._log
.error("Network service id:%s, name:%s instantiation failed",
2684 self
.set_state(NetworkServiceRecordState
.FAILED
)
2685 self
._is
_failed
= True
2687 event_descr
= "Instantiation of NS %s - %s failed" % (self
.id, self
.name
)
2688 self
.record_event("ns-failed", event_descr
, evt_details
=failed_reason
)
2690 # Publish the NSR to DTS
2691 yield from self
.publish()
2694 def terminate_vnfrs(self
, vnfrs
, scalein
=False):
2695 """ Terminate VNFRS in this network service """
2696 self
._log
.debug("Terminating VNFs in network service %s - %s", self
.id, self
.name
)
2698 for vnfr
in list(vnfrs
):
2699 self
._log
.debug("Terminating VNFs in network service %s %s", vnfr
.id, self
.id)
2700 yield from self
.nsm_plugin
.terminate_vnf(self
, vnfr
, scalein
=scalein
)
2701 vnfr_ids
.append(vnfr
.id)
2703 for vnfr_id
in vnfr_ids
:
2704 self
._vnfrs
.pop(vnfr_id
, None)
2707 def terminate(self
):
2708 """Start terminate of a NetworkServiceRecord."""
2709 # Move the state to TERMINATE
2710 self
.set_state(NetworkServiceRecordState
.TERMINATE
)
2711 event_descr
= "Terminate being processed for NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2712 self
.record_event("terminate", event_descr
)
2713 self
._log
.debug("Terminating network service id: %s, NS Name: %s", self
.id, self
.name
)
2715 # Adding the NSR ID on terminate Evet. This will be checked to halt the instantiation if not already finished.
2716 self
._ns
_terminate
_received
= True
2718 yield from self
.publish()
2721 # IN case the instantiation failed, then trigger a cleanup immediately
2722 # don't wait for Cfg manager, as it will have no idea of this NSR.
2723 # Due to the failure
2724 yield from self
.terminate_ns_cont()
2728 def terminate_ns_cont(self
):
2729 """Config script related to terminate finished, continue termination"""
2730 def terminate_vnffgrs():
2731 """ Terminate VNFFGRS in this network service """
2732 self
._log
.debug("Terminating VNFFGRs in network service %s - %s", self
.id, self
.name
)
2733 for vnffgr
in self
.vnffgrs
.values():
2734 yield from vnffgr
.terminate()
2736 def terminate_vlrs():
2737 """ Terminate VLRs in this netork service """
2738 self
._log
.debug("Terminating VLs in network service %s - %s", self
.id, self
.name
)
2739 for vlr_id
, vlr
in self
.vlrs
.items():
2740 yield from self
.nsm_plugin
.terminate_vl(vlr
)
2741 vlr
.state
= VlRecordState
.TERMINATED
2743 # Move the state to VNF_TERMINATE_PHASE
2744 self
._log
.debug("Terminating VNFFGs in NS ID: %s, NS Name: %s", self
.id, self
.name
)
2745 self
.set_state(NetworkServiceRecordState
.VNFFG_TERMINATE_PHASE
)
2746 event_descr
= "Terminating VNFFGS in NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2747 self
.record_event("terminating-vnffgss", event_descr
)
2748 yield from terminate_vnffgrs()
2750 # Move the state to VNF_TERMINATE_PHASE
2751 self
.set_state(NetworkServiceRecordState
.VNF_TERMINATE_PHASE
)
2752 event_descr
= "Terminating VNFS in NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2753 self
.record_event("terminating-vnfs", event_descr
)
2754 yield from self
.terminate_vnfrs(self
.vnfrs
.values())
2756 # Move the state to VL_TERMINATE_PHASE
2757 self
.set_state(NetworkServiceRecordState
.VL_TERMINATE_PHASE
)
2758 event_descr
= "Terminating VLs in NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2759 self
.record_event("terminating-vls", event_descr
)
2760 yield from terminate_vlrs()
2761 yield from self
.nsm_plugin
.terminate_ns(self
)
2762 # Remove the generated SSH key
2763 if self
._ssh
_key
_file
:
2764 p
= urlparse(self
._ssh
_key
_file
)
2766 path
= os
.path
.dirname(p
[2])
2767 self
._log
.debug("NSR {}: Removing keys in {}".format(self
.name
,
2769 shutil
.rmtree(path
, ignore_errors
=True)
2771 # Move the state to TERMINATED
2772 self
.set_state(NetworkServiceRecordState
.TERMINATED
)
2773 event_descr
= "Terminated NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2774 self
.record_event("terminated", event_descr
)
2776 # Unpublish the NSR record
2777 self
._log
.debug("Unpublishing the network service %s - %s", self
.id, self
.name
)
2778 yield from self
.unpublish()
2780 # Finaly delete the NS instance from this NS Manager
2781 self
._log
.debug("Deleting the network service %s - %s", self
.id, self
.name
)
2782 self
.nsm
.delete_nsr(self
.id)
2785 """"Enable a NetworkServiceRecord."""
2789 """"Disable a NetworkServiceRecord."""
2792 def map_config_status(self
):
2793 self
._log
.debug("Config status for ns {} is {}".
2794 format(self
.name
, self
._config
_status
))
2795 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURING
:
2796 return 'configuring'
2797 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2801 def vl_phase_completed(self
):
2802 """ Are VLs created in this NS?"""
2803 return self
._vl
_phase
_completed
2805 def vnf_phase_completed(self
):
2806 """ Are VLs created in this NS?"""
2807 return self
._vnf
_phase
_completed
2809 def create_msg(self
):
2810 """ The network serice record as a message """
2811 nsr_dict
= {"ns_instance_config_ref": self
.id}
2812 nsr
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr
.from_dict(nsr_dict
)
2813 #nsr.datacenter = self.cloud_account_name
2814 nsr
.sdn_account
= self
._sdn
_account
_name
2815 nsr
.name_ref
= self
.name
2816 nsr
.nsd_ref
= self
.nsd_id
2817 nsr
.nsd_name_ref
= self
.nsd_msg
.name
2818 nsr
.operational_events
= self
._op
_status
.msg
2819 nsr
.operational_status
= self
._op
_status
.yang_str()
2820 nsr
.config_status
= self
.map_config_status()
2821 nsr
.config_status_details
= self
._config
_status
_details
2822 nsr
.create_time
= self
._create
_time
2823 nsr
.uptime
= int(time
.time()) - self
._create
_time
2825 # Added for OpenMano
2827 nsr
.orchestration_progress
.networks
.total
= len(self
.nsd_msg
.vld
)
2828 if isinstance(self
.nsm_plugin
, openmano_nsm
.OpenmanoNsPlugin
):
2829 # Taking the last update by OpenMano
2830 nsr
.orchestration_progress
.networks
.active
= self
.nsm_plugin
._openmano
_nsrs
[self
.id]._active
_nets
2832 nsr
.orchestration_progress
.networks
.active
= self
._active
_networks
2834 for vnfr_id
, vnfr
in self
._vnfrs
.items():
2835 no_of_vdus
+= len(vnfr
.vnfd
.vdu
)
2837 nsr
.orchestration_progress
.vms
.total
= no_of_vdus
2838 if isinstance(self
.nsm_plugin
, openmano_nsm
.OpenmanoNsPlugin
):
2839 # Taking the last update by OpenMano
2840 nsr
.orchestration_progress
.vms
.active
= self
.nsm_plugin
._openmano
_nsrs
[self
.id]._active
_vms
2842 nsr
.orchestration_progress
.vms
.active
= self
._active
_vms
2845 if self
._ssh
_pub
_key
:
2846 nsr
.ssh_key_generated
.private_key_file
= self
._ssh
_key
_file
2847 nsr
.ssh_key_generated
.public_key
= self
._ssh
_pub
_key
2849 for cfg_prim
in self
.nsd_msg
.service_primitive
:
2850 cfg_prim
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ServicePrimitive
.from_dict(
2852 nsr
.service_primitive
.append(cfg_prim
)
2854 for init_cfg
in self
.nsd_msg
.initial_service_primitive
:
2855 prim
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_InitialServicePrimitive
.from_dict(
2857 nsr
.initial_service_primitive
.append(prim
)
2859 for term_cfg
in self
.nsd_msg
.terminate_service_primitive
:
2860 prim
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_TerminateServicePrimitive
.from_dict(
2862 nsr
.terminate_service_primitive
.append(prim
)
2864 if self
.vl_phase_completed():
2865 for vlr_id
, vlr
in self
.vlrs
.items():
2866 nsr
.vlr
.append(vlr
.create_nsr_vlr_msg(self
.vnfrs
.values()))
2868 if self
.vnf_phase_completed():
2869 for vnfr_id
in self
.vnfrs
:
2870 nsr
.constituent_vnfr_ref
.append(self
.vnfrs
[vnfr_id
].const_vnfr_msg
)
2871 for vnffgr
in self
.vnffgrs
.values():
2872 nsr
.vnffgr
.append(vnffgr
.fetch_vnffgr())
2873 for scaling_group
in self
._scaling
_groups
.values():
2874 nsr
.scaling_group_record
.append(scaling_group
.create_record_msg())
2878 def all_vnfs_active(self
):
2879 """ Are all VNFS in this NS active? """
2880 for _
, vnfr
in self
.vnfrs
.items():
2881 if vnfr
.active
is not True:
2886 def update_state(self
):
2887 """ Re-evaluate this NS's state """
2888 curr_state
= self
._op
_status
.state
2890 # This means that the terminate has been fired before the NS was UP.
2891 if self
._ns
_terminate
_received
:
2892 # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call.
2893 self
._ns
_terminate
_received
= False
2894 yield from self
.terminate_ns_cont()
2896 if curr_state
== NetworkServiceRecordState
.TERMINATED
:
2897 self
._log
.debug("NS (%s - %s) in terminated state, not updating state", self
.id, self
.name
)
2900 new_state
= NetworkServiceRecordState
.RUNNING
2901 self
._log
.debug("Received update_state for nsr: %s, curr-state: %s",
2902 self
.id, curr_state
)
2905 if (isinstance(self
.nsm_plugin
, rwnsmplugin
.RwNsPlugin
)):
2906 for vlr_id
, vl
in self
.vlrs
.items():
2907 self
._log
.debug("VLR %s state %s", vlr_id
, vl
.state
)
2908 if vl
.state
in [VlRecordState
.ACTIVE
, VlRecordState
.TERMINATED
]:
2910 elif vl
.state
== VlRecordState
.FAILED
:
2911 if vl
.prev_state
!= vl
.state
:
2912 event_descr
= "Instantiation of VL %s failed" % vl
.id
2913 event_error_details
= vl
.state_failed_reason
2914 self
.record_event("vl-failed", event_descr
, evt_details
=event_error_details
)
2915 vl
.prev_state
= vl
.state
2916 new_state
= NetworkServiceRecordState
.FAILED
2919 self
._log
.debug("VL already in failed state")
2921 if vl
.state
in [VlRecordState
.INSTANTIATION_PENDING
, VlRecordState
.INIT
]:
2922 new_state
= NetworkServiceRecordState
.VL_INSTANTIATE
2925 if vl
.state
in [VlRecordState
.TERMINATE_PENDING
]:
2926 new_state
= NetworkServiceRecordState
.VL_TERMINATE
2929 # Check all the VNFRs are present
2930 if new_state
== NetworkServiceRecordState
.RUNNING
:
2931 for _
, vnfr
in self
.vnfrs
.items():
2932 self
._log
.debug("VNFR state %s", vnfr
.state
)
2933 if vnfr
.state
in [VnfRecordState
.ACTIVE
, VnfRecordState
.TERMINATED
]:
2935 for vnfr
in self
.vnfrs
:
2936 active_vdus
+= self
.nsm
._vnfrs
[vnfr
]._active
_vdus
2938 if self
._active
_vms
!= active_vdus
:
2939 self
._active
_vms
= active_vdus
2940 yield from self
.publish()
2944 elif vnfr
.state
== VnfRecordState
.FAILED
:
2945 if vnfr
._prev
_state
!= vnfr
.state
:
2946 event_descr
= "Instantiation of VNF %s for NS: %s failed" % (vnfr
.id, self
.name
)
2947 event_error_details
= vnfr
.state_failed_reason
2948 self
.record_event("vnf-failed", event_descr
, evt_details
=event_error_details
)
2949 vnfr
.set_state(VnfRecordState
.FAILED
)
2951 self
._log
.info("VNF state did not change, curr=%s, prev=%s",
2952 vnfr
.state
, vnfr
._prev
_state
)
2953 new_state
= NetworkServiceRecordState
.FAILED
2956 self
._log
.debug("VNF %s in NSR %s - %s is still not active; current state is: %s",
2957 vnfr
.id, self
.id, self
.name
, vnfr
.state
)
2958 new_state
= curr_state
2960 # If new state is RUNNING; check VNFFGRs are also active
2961 if new_state
== NetworkServiceRecordState
.RUNNING
:
2962 for _
, vnffgr
in self
.vnffgrs
.items():
2963 self
._log
.debug("Checking vnffgr state for nsr %s is: %s",
2964 self
.id, vnffgr
.state
)
2965 if vnffgr
.state
== VnffgRecordState
.ACTIVE
:
2967 elif vnffgr
.state
== VnffgRecordState
.FAILED
:
2968 event_descr
= "Instantiation of VNFFGR %s failed" % vnffgr
.id
2969 self
.record_event("vnffg-failed", event_descr
)
2970 new_state
= NetworkServiceRecordState
.FAILED
2973 self
._log
.info("VNFFGR %s in NSR %s - %s is still not active; current state is: %s",
2974 vnffgr
.id, self
.id, self
.name
, vnffgr
.state
)
2975 new_state
= curr_state
2977 # Update all the scaling group instance operational status to
2978 # reflect the state of all VNFR within that instance
2979 yield from self
._update
_scale
_group
_instances
_status
()
2981 for _
, group
in self
._scaling
_groups
.items():
2982 if group
.state
== scale_group
.ScaleGroupState
.SCALING_OUT
:
2983 new_state
= NetworkServiceRecordState
.SCALING_OUT
2985 elif group
.state
== scale_group
.ScaleGroupState
.SCALING_IN
:
2986 new_state
= NetworkServiceRecordState
.SCALING_IN
2989 if new_state
!= curr_state
:
2990 self
._log
.debug("Changing state of Network service %s - %s from %s to %s",
2991 self
.id, self
.name
, curr_state
, new_state
)
2992 if new_state
== NetworkServiceRecordState
.RUNNING
:
2993 yield from self
.is_active()
2994 elif new_state
== NetworkServiceRecordState
.FAILED
:
2995 # If the NS is already active and we entered scaling_in, scaling_out,
2996 # do not mark the NS as failing if scaling operation failed.
2997 if curr_state
in [NetworkServiceRecordState
.SCALING_OUT
,
2998 NetworkServiceRecordState
.SCALING_IN
] and self
._is
_active
:
2999 new_state
= NetworkServiceRecordState
.RUNNING
3000 self
.set_state(new_state
)
3002 yield from self
.instantiation_failed()
3004 self
.set_state(new_state
)
3006 yield from self
.publish()
3008 def vl_instantiation_state(self
):
3009 """ Check if all VLs in this NS are active """
3010 for vl_id
, vlr
in self
.vlrs
.items():
3011 if vlr
.state
== VlRecordState
.ACTIVE
:
3013 elif vlr
.state
== VlRecordState
.FAILED
:
3014 return VlRecordState
.FAILED
3015 elif vlr
.state
== VlRecordState
.TERMINATED
:
3016 return VlRecordState
.TERMINATED
3017 elif vlr
.state
== VlRecordState
.INSTANTIATION_PENDING
:
3018 return VlRecordState
.INSTANTIATION_PENDING
3020 self
._log
.error("vlr %s still in state %s", vlr
, vlr
.state
)
3021 raise VirtualLinkRecordError("Invalid state %s" %(vlr
.state
))
3022 return VlRecordState
.ACTIVE
3024 def vl_instantiation_successful(self
):
3025 """ Mark that all VLs in this NS are active """
3026 if self
._vls
_ready
.is_set():
3027 self
._log
.error("NSR id %s, vls_ready is already set", self
.id)
3029 if self
.vl_instantiation_state() == VlRecordState
.ACTIVE
:
3030 self
._log
.debug("NSR id %s, All %d vlrs are in active state %s",
3031 self
.id, len(self
.vlrs
), self
.vl_instantiation_state
)
3032 self
._vls
_ready
.set()
3034 def vlr_event(self
, vlr
, action
):
3035 self
._log
.debug("Received VLR %s with action:%s", vlr
, action
)
3037 if vlr
.id not in self
.vlrs
:
3038 self
._log
.error("VLR %s:%s received for unknown id, state:%s",
3039 vlr
.id, vlr
.name
, vlr
.operational_status
)
3042 vlr_local
= self
.vlrs
[vlr
.id]
3044 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
3045 if vlr
.operational_status
== 'running':
3046 vlr_local
.set_state_from_op_status(vlr
.operational_status
)
3047 self
._active
_networks
+= 1
3048 self
._log
.info("VLR %s:%s moving to active state",
3050 elif vlr
.operational_status
== 'failed':
3051 vlr_local
.set_state_from_op_status(vlr
.operational_status
)
3052 vlr_local
.state_failed_reason
= vlr
.operational_status_details
3053 asyncio
.ensure_future(self
.update_state(), loop
=self
._loop
)
3054 self
._log
.info("VLR %s:%s moving to failed state",
3057 self
._log
.warning("VLR %s:%s received state:%s",
3058 vlr
.id, vlr
.name
, vlr
.operational_status
)
3060 if isinstance(self
.nsm_plugin
, rwnsmplugin
.RwNsPlugin
):
3061 self
.vl_instantiation_successful()
3063 # self.update_state() is responsible for publishing the NSR state. Its being called by vlr_event and update_vnfr.
3064 # The call from vlr_event occurs only if vlr reaches a failed state. Hence implementing the check here to handle
3065 # ns terminate received after other vlr states as vl-alloc-pending, vl-init, running.
3066 if self
._ns
_terminate
_received
:
3067 # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call.
3068 if vlr
.operational_status
in ['running', 'failed']:
3069 self
._ns
_terminate
_received
= False
3070 asyncio
.ensure_future(self
.terminate_ns_cont(), loop
=self
._loop
)
3073 class InputParameterSubstitution(object):
3075 This class is responsible for substituting input parameters into an NSD.
3078 def __init__(self
, log
, project
):
3079 """Create an instance of InputParameterSubstitution
3082 log - a logger for this object to use
3086 self
.project
= project
3088 def _fix_xpath(self
, xpath
):
3089 # Fix the parameter.xpath to include project and correct namespace
3090 self
.log
.debug("Provided xpath: {}".format(xpath
))
3091 #Split the xpath at the /
3092 attrs
= xpath
.split('/')
3094 for attr
in attrs
[1:]:
3095 new_ns
= 'project-nsd'
3098 # Includes namespace
3099 ns
, name
= attr
.split(':', 2)
3101 ns
= "rw-project-nsd"
3103 new_xp
= new_xp
+ '/' + new_ns
+ ':' + name
3105 updated_xpath
= self
.project
.add_project(new_xp
)
3107 self
.log
.error("Updated xpath: {}".format(updated_xpath
))
3108 return updated_xpath
3110 def __call__(self
, nsd
, nsr_config
):
3111 """Substitutes input parameters from the NSR config into the NSD
3113 This call modifies the provided NSD with the input parameters that are
3114 contained in the NSR config.
3117 nsd - a GI NSD object
3118 nsr_config - a GI NSR config object
3121 if nsd
is None or nsr_config
is None:
3124 # Create a lookup of the xpath elements that this descriptor allows
3126 optional_input_parameters
= set()
3127 for input_parameter
in nsd
.input_parameter_xpath
:
3128 optional_input_parameters
.add(input_parameter
.xpath
)
3130 # Apply the input parameters to the descriptor
3131 if nsr_config
.input_parameter
:
3132 for param
in nsr_config
.input_parameter
:
3133 if param
.xpath
not in optional_input_parameters
:
3134 msg
= "tried to set an invalid input parameter ({})"
3135 self
.log
.error(msg
.format(param
.xpath
))
3139 "input-parameter:{} = {}".format(
3146 xp
= self
._fix
_xpath
(param
.xpath
)
3147 xpath
.setxattr(nsd
, xp
, param
.value
)
3149 except Exception as e
:
3150 self
.log
.exception(e
)
3153 class VnfInputParameterSubstitution(object):
3155 This class is responsible for substituting input parameters into a VNFD.
3158 def __init__(self
, log
, const_vnfd
, project
):
3159 """Create an instance of VnfInputParameterSubstitution
3162 log - a logger for this object to use
3163 const_vnfd - id refs for vnfs in a ns
3164 project - project for the VNFs
3168 self
.member_vnf_index
= const_vnfd
.member_vnf_index
3169 self
.vnfd_id_ref
= const_vnfd
.vnfd_id_ref
3170 self
.project
= project
3172 def __call__(self
, vnfr
, nsr_config
):
3173 """Substitutes vnf input parameters from the NSR config into the VNFD
3175 This call modifies the provided VNFD with the input parameters that are
3176 contained in the NSR config.
3179 vnfr - a GI VNFR object
3180 nsr_config - a GI NSR Config object
3184 def compose_xpath(xpath
, id):
3185 prefix
= "/rw-project:project[rw-project:name={}]".format(quoted_key(self
.project
.name
)) + \
3186 "/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vnfd/".format(quoted_key(id))
3188 suffix
= '/'.join(xpath
.split('/')[3:]).replace('vnfd', 'vnfr')
3189 return prefix
+ suffix
3191 def substitute_xpath(ip_xpath
, substitute_value
, vnfr
):
3192 vnfr_xpath
= compose_xpath(ip_xpath
, vnfr
.id)
3195 verify_xpath_wildcarded
= xpath
.getxattr(vnfr
, vnfr_xpath
)
3198 "vnf-input-parameter:{} = {}, for VNF : [member-vnf-index : {}, vnfd-id-ref : {}]".format(
3201 self
.member_vnf_index
,
3206 xpath
.setxattr(vnfr
, vnfr_xpath
, substitute_value
)
3208 except Exception as e
:
3209 self
.log
.exception(e
)
3211 except Exception as e
:
3212 self
.log
.exception("Wildcarded xpath {} is listy in nature. Can not update. Exception => {}"
3213 .format(ip_xpath
, e
))
3215 if vnfr
is None or nsr_config
is None:
3218 optional_input_parameters
= set()
3219 for input_parameter
in nsr_config
.nsd
.input_parameter_xpath
:
3220 optional_input_parameters
.add(input_parameter
.xpath
)
3222 # Apply the input parameters to the vnfr
3223 if nsr_config
.vnf_input_parameter
:
3224 for param
in nsr_config
.vnf_input_parameter
:
3225 if (param
.member_vnf_index_ref
== self
.member_vnf_index
and param
.vnfd_id_ref
== self
.vnfd_id_ref
):
3226 if param
.input_parameter
:
3227 for ip
in param
.input_parameter
:
3228 if ip
.xpath
not in optional_input_parameters
:
3229 msg
= "Substitution Failed. Tried to set an invalid vnf input parameter ({}) for vnf [member-vnf-index : {}, vnfd-id-ref : {}]"
3230 self
.log
.error(msg
.format(ip
.xpath
, self
.member_vnf_index
, self
.vnfd_id_ref
))
3234 substitute_xpath(ip
.xpath
, ip
.value
, vnfr
)
3235 except Exception as e
:
3236 self
.log
.exception(e
)
3238 self
.log
.debug("Substituting Xpaths with default Values")
3239 for input_parameter
in nsr_config
.nsd
.input_parameter_xpath
:
3240 if input_parameter
.default_value
is not None:
3242 if "vnfd-catalog" in input_parameter
.xpath
:
3243 substitute_xpath(input_parameter
.xpath
, input_parameter
.default_value
, vnfr
)
3244 except Exception as e
:
3245 self
.log
.exception(e
)
3248 class NetworkServiceDescriptor(object):
3250 Network service descriptor class
3253 def __init__(self
, dts
, log
, loop
, nsd
, nsm
):
3263 """ Returns nsd id """
3268 """ Returns name of nsd """
3269 return self
._nsd
.name
3273 """ Return the message associated with this NetworkServiceDescriptor"""
3277 def path_for_id(nsd_id
):
3278 """ Return path for the passed nsd_id"""
3279 return self
._nsm
._project
.add_project(
3280 "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id = '{}'".
3284 """ Return the message associated with this NetworkServiceDescriptor"""
3285 return NetworkServiceDescriptor
.path_for_id(self
.id)
3287 def update(self
, nsd
):
3288 """ Update the NSD descriptor """
3292 class NsdDtsHandler(object):
3293 """ The network service descriptor DTS handler """
3294 XPATH
= "C,/project-nsd:nsd-catalog/project-nsd:nsd"
3296 def __init__(self
, dts
, log
, loop
, nsm
):
3303 self
._project
= nsm
._project
3307 """ Return registration handle """
3312 """ Register for Nsd create/update/delete/read requests from dts """
3315 self
._log
.warning("DTS handler already registered for project {}".
3316 format(self
._project
.name
))
3319 def on_apply(dts
, acg
, xact
, action
, scratch
):
3320 """Apply the configuration"""
3321 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
3322 self
._log
.debug("Got nsd apply cfg (xact:%s) (action:%s)",
3326 # Create/Update an NSD record
3327 for cfg
in self
._regh
.get_xact_elements(xact
):
3328 # Only interested in those NSD cfgs whose ID was received in prepare callback
3329 if cfg
.id in scratch
.get('nsds', []) or is_recovery
:
3330 self
._nsm
.update_nsd(cfg
)
3333 # This can happen if we do the deregister
3334 # during project delete before this is called
3335 self
._log
.debug("No reg handle for {} for project {}".
3336 format(self
.__class
__, self
._project
.name
))
3338 scratch
.pop('nsds', None)
3340 return RwTypes
.RwStatus
.SUCCESS
3343 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3344 """ Prepare callback from DTS for NSD config """
3346 self
._log
.info("Got nsd prepare - config received nsd id %s, msg %s",
3349 fref
= ProtobufC
.FieldReference
.alloc()
3350 fref
.goto_whole_message(msg
.to_pbcm())
3352 if fref
.is_field_deleted():
3353 # Delete an NSD record
3354 self
._log
.debug("Deleting NSD with id %s", msg
.id)
3355 self
._nsm
.delete_nsd(msg
.id)
3357 # Add this NSD to scratch to create/update in apply callback
3358 nsds
= scratch
.setdefault('nsds', [])
3360 # acg._scratch['nsds'].append(msg.id)
3362 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
3364 xpath
= self
._project
.add_project(NsdDtsHandler
.XPATH
)
3366 "Registering for NSD config using xpath: %s",
3370 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
3371 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
3372 # Need a list in scratch to store NSDs to create/update later
3373 # acg._scratch['nsds'] = list()
3374 self
._regh
= acg
.register(
3376 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
3377 on_prepare
=on_prepare
)
3379 def deregister(self
):
3380 self
._log
.debug("De-register NSD handler for project {}".
3381 format(self
._project
.name
))
3383 self
._regh
.deregister()
3387 class VnfdDtsHandler(object):
3388 """ DTS handler for VNFD config changes """
3389 XPATH
= "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
3391 def __init__(self
, dts
, log
, loop
, nsm
):
3397 self
._project
= nsm
._project
3401 """ DTS registration handle """
3406 """ Register for VNFD configuration"""
3409 self
._log
.warning("DTS handler already registered for project {}".
3410 format(self
._project
.name
))
3414 def on_apply(dts
, acg
, xact
, action
, scratch
):
3415 """Apply the configuration"""
3416 self
._log
.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
3417 xact
, action
, scratch
)
3419 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
3422 # Create/Update a VNFD record
3423 for cfg
in self
._regh
.get_xact_elements(xact
):
3424 # Only interested in those VNFD cfgs whose ID was received in prepare callback
3425 if cfg
.id in scratch
.get('vnfds', []) or is_recovery
:
3426 self
._nsm
.update_vnfd(cfg
)
3428 for cfg
in self
._regh
.elements
:
3429 if cfg
.id in scratch
.get('deleted_vnfds', []):
3430 yield from self
._nsm
.delete_vnfd(cfg
.id)
3433 self
._log
.warning("Reg handle none for {} in project {}".
3434 format(self
.__class
__, self
._project
))
3436 scratch
.pop('vnfds', None)
3437 scratch
.pop('deleted_vnfds', None)
3440 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3441 """ on prepare callback """
3442 xpath
= ks_path
.to_xpath(NsdYang
.get_schema())
3443 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
3444 xpath
, xact_info
.query_action
, msg
)
3446 fref
= ProtobufC
.FieldReference
.alloc()
3447 fref
.goto_whole_message(msg
.to_pbcm())
3449 # Handle deletes in prepare_callback, but adds/updates in apply_callback
3450 if fref
.is_field_deleted():
3451 self
._log
.debug("Adding msg to deleted field")
3452 deleted_vnfds
= scratch
.setdefault('deleted_vnfds', [])
3453 deleted_vnfds
.append(msg
.id)
3455 # Add this VNFD to scratch to create/update in apply callback
3456 vnfds
= scratch
.setdefault('vnfds', [])
3457 vnfds
.append(msg
.id)
3460 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
3461 except rift
.tasklets
.dts
.ResponseError
as e
:
3463 "VnfdDtsHandler in project {} with path {} for action {} failed: {}".
3464 format(self
._project
, xpath
, xact_info
.query_action
, e
))
3467 xpath
= self
._project
.add_project(VnfdDtsHandler
.XPATH
)
3469 "Registering for VNFD config using xpath {} for project {}"
3470 .format(xpath
, self
._project
))
3471 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
3472 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
3473 # Need a list in scratch to store VNFDs to create/update later
3474 # acg._scratch['vnfds'] = list()
3475 # acg._scratch['deleted_vnfds'] = list()
3476 self
._regh
= acg
.register(
3478 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
3479 on_prepare
=on_prepare
)
3481 def deregister(self
):
3482 self
._log
.debug("De-register VNFD handler for project {}".
3483 format(self
._project
.name
))
3485 self
._regh
.deregister()
3489 class NsrRpcDtsHandler(object):
3490 """ The network service instantiation RPC DTS handler """
3491 EXEC_NSR_CONF_XPATH
= "I,/nsr:start-network-service"
3492 EXEC_NSR_CONF_O_XPATH
= "O,/nsr:start-network-service"
3493 NETCONF_IP_ADDRESS
= "127.0.0.1"
3495 RESTCONF_PORT
= 8008
3496 NETCONF_USER
= "@rift"
3498 REST_BASE_V2_URL
= 'https://{}:{}/v2/api/'.format("127.0.0.1",
3501 def __init__(self
, dts
, log
, loop
, nsm
):
3506 self
._project
= nsm
._project
3509 self
._ns
_regh
= None
3511 self
._manager
= None
3512 self
._nsr
_config
_url
= NsrRpcDtsHandler
.REST_BASE_V2_URL
+ \
3513 'project/{}/'.format(self
._project
) + \
3514 'config/ns-instance-config'
3516 self
._model
= RwYang
.Model
.create_libncx()
3517 self
._model
.load_schema_ypbc(RwNsrYang
.get_schema())
3521 """ Return the NS manager instance """
3525 def wrap_netconf_config_xml(xml
):
3526 xml
= '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml
)
3530 def _connect(self
, timeout_secs
=240):
3532 start_time
= time
.time()
3533 while (time
.time() - start_time
) < timeout_secs
:
3536 self
._log
.debug("Attemping NsmTasklet netconf connection.")
3538 manager
= yield from ncclient
.asyncio_manager
.asyncio_connect(
3540 host
=NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
,
3541 port
=NsrRpcDtsHandler
.NETCONF_PORT
,
3542 username
=NsrRpcDtsHandler
.NETCONF_USER
,
3543 password
=NsrRpcDtsHandler
.NETCONF_PW
,
3545 look_for_keys
=False,
3546 hostkey_verify
=False,
3551 except ncclient
.transport
.errors
.SSHError
as e
:
3552 self
._log
.warning("Netconf connection to launchpad %s failed: %s",
3553 NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
, str(e
))
3555 yield from asyncio
.sleep(5, loop
=self
._loop
)
3557 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
3560 def _apply_ns_instance_config(self
,payload_dict
):
3561 req_hdr
= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'}
3562 response
=requests
.post(self
._nsr
_config
_url
,
3564 auth
=(NsrRpcDtsHandler
.NETCONF_USER
, NsrRpcDtsHandler
.NETCONF_PW
),
3571 """ Register for NS monitoring read from dts """
3574 def on_ns_config_prepare(xact_info
, action
, ks_path
, msg
):
3575 """ prepare callback from dts start-network-service"""
3576 assert action
== rwdts
.QueryAction
.RPC
3578 if not self
._project
.rpc_check(msg
, xact_info
):
3582 rpc_op
= NsrYang
.YangOutput_Nsr_StartNetworkService
.from_dict({
3583 "nsr_id":str(uuid
.uuid4())
3586 if not ('name' in rpc_ip
and 'nsd_ref' in rpc_ip
and
3587 ('cloud_account' in rpc_ip
or 'om_datacenter' in rpc_ip
)):
3589 "Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".
3591 self
._log
.error(errmsg
)
3592 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
3593 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3595 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
3596 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
)
3599 self
._log
.debug("start-network-service RPC input: {}".format(rpc_ip
))
3602 # Add used value to the pool
3603 self
._log
.debug("RPC output: {}".format(rpc_op
))
3605 nsd_copy
= self
.nsm
.get_nsd(rpc_ip
.nsd_ref
)
3607 self
._log
.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
3608 rpc_ip
.name
, rpc_ip
.nsd_ref
)
3610 ns_instance_config_dict
= {"id":rpc_op
.nsr_id
, "admin_status":"ENABLED"}
3611 ns_instance_config_copy_dict
= {k
:v
for k
, v
in rpc_ip
.as_dict().items()
3612 if k
in RwNsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr().fields
}
3613 ns_instance_config_dict
.update(ns_instance_config_copy_dict
)
3615 ns_instance_config
= RwNsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr
.from_dict(ns_instance_config_dict
)
3616 ns_instance_config
.nsd
= RwNsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_Nsd()
3617 ns_instance_config
.nsd
.from_dict(nsd_copy
.msg
.as_dict())
3619 payload_dict
= ns_instance_config
.to_json(self
._model
)
3621 self
._log
.debug("Sending configure ns-instance-config json to %s: %s",
3622 self
._nsr
_config
_url
,ns_instance_config
)
3624 response
= yield from self
._loop
.run_in_executor(
3626 self
._apply
_ns
_instance
_config
,
3629 response
.raise_for_status()
3630 self
._log
.debug("Received edit config response: %s", response
.json())
3632 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
3633 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3635 except Exception as e
:
3636 errmsg
= ("Exception processing the "
3637 "start-network-service: {}".format(e
))
3638 self
._log
.exception(errmsg
)
3639 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
3640 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3642 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
3643 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
)
3645 self
._ns
_regh
= yield from self
._dts
.register(
3646 xpath
=NsrRpcDtsHandler
.EXEC_NSR_CONF_XPATH
,
3647 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
3648 on_prepare
=on_ns_config_prepare
),
3649 flags
=rwdts
.Flag
.PUBLISHER
,
3652 def deregister(self
):
3654 self
._ns
_regh
.deregister()
3655 self
._ns
_regh
= None
3658 class NsrDtsHandler(object):
3659 """ The network service DTS handler """
3660 NSR_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr"
3661 SCALE_INSTANCE_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3662 KEY_PAIR_XPATH
= "C,/nsr:key-pair"
3664 def __init__(self
, dts
, log
, loop
, nsm
):
3669 self
._project
= self
._nsm
._project
3671 self
._nsr
_regh
= None
3672 self
._scale
_regh
= None
3673 self
._key
_pair
_regh
= None
3677 """ Return the NS manager instance """
3682 """ Register for Nsr create/update/delete/read requests from dts """
3685 self
._log
.warning("DTS handler already registered for project {}".
3686 format(self
._project
.name
))
3689 def nsr_id_from_keyspec(ks
):
3690 nsr_path_entry
= RwNsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr
.schema().keyspec_to_entry(ks
)
3691 nsr_id
= nsr_path_entry
.key00
.id
3694 def group_name_from_keyspec(ks
):
3695 group_path_entry
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup
.schema().keyspec_to_entry(ks
)
3696 group_name
= group_path_entry
.key00
.scaling_group_name_ref
3699 def is_instance_in_reg_elements(nsr_id
, group_name
, instance_id
):
3700 """ Return boolean indicating if scaling group instance was already commited previously.
3702 By looking at the existing elements in this registration handle (elements not part
3703 of this current xact), we can tell if the instance was configured previously without
3704 keeping any application state.
3706 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3707 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3708 elem_group_name
= group_name_from_keyspec(keyspec
)
3710 if elem_nsr_id
!= nsr_id
or group_name
!= elem_group_name
:
3713 if instance_cfg
.id == instance_id
:
3718 def get_scale_group_instance_delta(nsr_id
, group_name
, xact
):
3719 delta
= {"added": [], "deleted": []}
3720 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3721 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3722 if elem_nsr_id
!= nsr_id
:
3725 elem_group_name
= group_name_from_keyspec(keyspec
)
3726 if elem_group_name
!= group_name
:
3729 delta
["added"].append(instance_cfg
.id)
3731 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(include_keyspec
=True):
3732 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3733 if elem_nsr_id
!= nsr_id
:
3736 elem_group_name
= group_name_from_keyspec(keyspec
)
3737 if elem_group_name
!= group_name
:
3740 if instance_cfg
.id in delta
["added"]:
3741 delta
["added"].remove(instance_cfg
.id)
3743 delta
["deleted"].append(instance_cfg
.id)
3748 def update_nsr_nsd(nsr_id
, xact
, scratch
):
3751 def get_nsr_vl_delta(nsr_id
, xact
, scratch
):
3752 delta
= {"added": [], "deleted": []}
3753 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3754 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3755 if elem_nsr_id
!= nsr_id
:
3758 if 'vld' in instance_cfg
.nsd
:
3759 for vld
in instance_cfg
.nsd
.vld
:
3760 delta
["added"].append(vld
)
3762 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3763 self
._log
.debug("NSR update: %s", instance_cfg
)
3764 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3765 if elem_nsr_id
!= nsr_id
:
3768 if 'vld' in instance_cfg
.nsd
:
3769 for vld
in instance_cfg
.nsd
.vld
:
3770 if vld
in delta
["added"]:
3771 delta
["added"].remove(vld
)
3773 delta
["deleted"].append(vld
)
3777 vl_delta
= yield from get_nsr_vl_delta(nsr_id
, xact
, scratch
)
3778 self
._log
.debug("Got NSR:%s VL instance delta: %s", nsr_id
, vl_delta
)
3780 for vld
in vl_delta
["added"]:
3781 yield from self
._nsm
.nsr_instantiate_vl(nsr_id
, vld
)
3783 for vld
in vl_delta
["deleted"]:
3784 yield from self
._nsm
.nsr_terminate_vl(nsr_id
, vld
)
3786 def get_nsr_key_pairs(dts_member_reg
, xact
):
3788 for instance_cfg
, keyspec
in dts_member_reg
.get_xact_elements(xact
, include_keyspec
=True):
3789 self
._log
.debug("Key pair received is {} KS: {}".format(instance_cfg
, keyspec
))
3790 xpath
= keyspec
.to_xpath(RwNsrYang
.get_schema())
3791 key_pairs
[instance_cfg
.name
] = instance_cfg
3794 def on_apply(dts
, acg
, xact
, action
, scratch
):
3795 """Apply the configuration"""
3796 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3797 xact
, action
, scratch
)
3800 def handle_create_nsr(msg
, key_pairs
=None, restart_mode
=False):
3801 # Handle create nsr requests """
3802 # Do some validations
3803 if not msg
.has_field("nsd"):
3804 err
= "NSD not provided"
3805 self
._log
.error(err
)
3806 raise NetworkServiceRecordError(err
)
3808 self
._log
.debug("Creating NetworkServiceRecord %s from nsr config %s",
3809 msg
.id, msg
.as_dict())
3810 nsr
= yield from self
.nsm
.create_nsr(msg
,
3812 key_pairs
=key_pairs
,
3813 restart_mode
=restart_mode
)
3816 def handle_delete_nsr(msg
):
3818 def delete_instantiation(ns_id
):
3819 """ Delete instantiation """
3820 yield from self
._nsm
.terminate_ns(ns_id
, None)
3822 # Handle delete NSR requests
3823 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3824 # Terminate the NSR instance
3825 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3827 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3828 event_descr
= "Terminate rcvd for NS Id: %s, NS Name: %s" % (msg
.id, msg
.name
)
3829 nsr
.record_event("terminate-rcvd", event_descr
)
3831 self
._loop
.create_task(delete_instantiation(msg
.id))
3834 def begin_instantiation(nsr
):
3835 # Begin instantiation
3836 self
._log
.info("Beginning NS instantiation: %s", nsr
.id)
3838 yield from self
._nsm
.instantiate_ns(nsr
.id, xact
)
3839 except Exception as e
:
3840 self
._log
.exception(e
)
3844 def instantiate_ns(msg
, key_pairs
, restart_mode
=False):
3845 nsr
= yield from handle_create_nsr(msg
, key_pairs
, restart_mode
=restart_mode
)
3846 yield from begin_instantiation(nsr
)
3848 def on_instantiate_done(fut
, msg
):
3849 # If the do_instantiate fails, then publish NSR with failed result
3853 print(traceback
.format_exception(None, e
, e
.__traceback
__), file=sys
.stderr
, flush
=True)
3854 self
._log
.error("NSR instantiation failed for NSR id %s: %s", msg
.id, str(e
))
3855 failed_nsr
= self
._nsm
.nsrs
[msg
.id]
3856 self
._loop
.create_task(failed_nsr
.instantiation_failed(failed_reason
=str(e
)))
3859 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3860 xact
, action
, scratch
)
3862 if action
== rwdts
.AppconfAction
.INSTALL
and xact
.id is None:
3864 if self
._key
_pair
_regh
:
3865 for element
in self
._key
_pair
_regh
.elements
:
3866 key_pairs
.append(element
)
3868 self
._log
.error("Reg handle none for key pair in project {}".
3869 format(self
._project
))
3872 for element
in self
._nsr
_regh
.elements
:
3873 if element
.id not in self
.nsm
._nsrs
:
3874 instantiate_task
= self
._loop
.create_task(instantiate_ns(element
, key_pairs
,
3876 instantiate_task
.add_done_callback(functools
.partial(on_instantiate_done
, msg
=element
))
3878 self
._log
.error("Reg handle none for NSR in project {}".
3879 format(self
._project
))
3881 return RwTypes
.RwStatus
.SUCCESS
3883 (added_msgs
, deleted_msgs
, updated_msgs
) = get_add_delete_update_cfgs(self
._nsr
_regh
,
3886 self
._log
.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs
,
3887 deleted_msgs
, updated_msgs
)
3889 for msg
in added_msgs
:
3890 if msg
.id not in self
._nsm
.nsrs
:
3891 self
._log
.info("Create NSR received in on_apply to instantiate NS:%s", msg
.id)
3892 key_pairs
= get_nsr_key_pairs(self
._key
_pair
_regh
, xact
)
3893 instantiate_task
= self
._loop
.create_task(instantiate_ns(msg
,key_pairs
))
3894 instantiate_task
.add_done_callback(functools
.partial(on_instantiate_done
, msg
=msg
))
3896 for msg
in deleted_msgs
:
3897 self
._log
.info("Delete NSR received in on_apply to terminate NS:%s", msg
.id)
3899 handle_delete_nsr(msg
)
3901 self
._log
.exception("Failed to terminate NS:%s", msg
.id)
3903 for msg
in updated_msgs
:
3904 self
._log
.info("Update NSR received in on_apply: %s", msg
)
3905 self
._nsm
.nsr_update_cfg(msg
.id, msg
)
3908 self
._loop
.create_task(update_nsr_nsd(msg
.id, xact
, scratch
))
3910 for group
in msg
.scaling_group
:
3911 instance_delta
= get_scale_group_instance_delta(msg
.id, group
.scaling_group_name_ref
, xact
)
3912 self
._log
.debug("Got NSR:%s scale group instance delta: %s", msg
.id, instance_delta
)
3914 for instance_id
in instance_delta
["added"]:
3915 self
._nsm
.scale_nsr_out(msg
.id, group
.scaling_group_name_ref
, instance_id
, xact
)
3917 for instance_id
in instance_delta
["deleted"]:
3918 self
._nsm
.scale_nsr_in(msg
.id, group
.scaling_group_name_ref
, instance_id
)
3921 return RwTypes
.RwStatus
.SUCCESS
3924 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3925 """ Prepare calllback from DTS for NSR """
3927 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3928 action
= xact_info
.query_action
3930 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3931 xact
, action
, xact_info
, xpath
, msg
3934 fref
= ProtobufC
.FieldReference
.alloc()
3935 fref
.goto_whole_message(msg
.to_pbcm())
3937 def send_err_msg(err_msg
):
3938 self
._log
.error(errmsg
)
3939 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
3942 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
3945 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
, rwdts
.QueryAction
.DELETE
]:
3946 # if this is an NSR create
3947 if action
!= rwdts
.QueryAction
.DELETE
and msg
.id not in self
._nsm
.nsrs
:
3948 # Ensure the Cloud account/datacenter has been specified
3949 if not msg
.has_field("datacenter") and not msg
.has_field("datacenter"):
3950 errmsg
= ("Cloud account or datacenter not specified in NS {}".
3952 send_err_msg(errmsg
)
3955 # Check if nsd is specified
3956 if not msg
.has_field("nsd"):
3957 errmsg
= ("NSD not specified in NS {}".
3959 send_err_msg(errmsg
)
3963 nsr
= self
._nsm
.nsrs
[msg
.id]
3964 if msg
.has_field("nsd"):
3965 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3966 errmsg
= ("Unable to update VL when NS {} not in running state".
3968 send_err_msg(errmsg
)
3971 if 'vld' not in msg
.nsd
or len(msg
.nsd
.vld
) == 0:
3972 errmsg
= ("NS config {} NSD should have atleast 1 VLD".
3974 send_err_msg(errmsg
)
3977 if msg
.has_field("scaling_group"):
3978 self
._log
.debug("ScaleMsg %s", msg
)
3979 self
._log
.debug("NSSCALINGSTATE %s", nsr
.state
)
3980 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3981 errmsg
= ("Unable to perform scaling action when NS {} not in running state".
3983 send_err_msg(errmsg
)
3986 if len(msg
.scaling_group
) > 1:
3987 errmsg
= ("Only a single scaling group can be configured at a time for NS {}".
3989 send_err_msg(errmsg
)
3992 for group_msg
in msg
.scaling_group
:
3993 num_new_group_instances
= len(group_msg
.instance
)
3994 if num_new_group_instances
> 1:
3995 errmsg
= ("Only a single scaling instance can be modified at a time for NS {}".
3997 send_err_msg(errmsg
)
4000 elif num_new_group_instances
== 1:
4001 scale_group
= nsr
.scaling_groups
[group_msg
.scaling_group_name_ref
]
4002 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
4003 if len(scale_group
.instances
) == scale_group
.max_instance_count
:
4004 errmsg
= (" Max instances for {} reached for NS {}".
4005 format(str(scale_group
), msg
.name
))
4006 send_err_msg(errmsg
)
4009 acg
.handle
.prepare_complete_ok(xact_info
.handle
)
4012 xpath
= self
._project
.add_project(NsrDtsHandler
.NSR_XPATH
)
4013 self
._log
.debug("Registering for NSR config using xpath: {}".
4016 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
4017 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
4018 self
._nsr
_regh
= acg
.register(
4020 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
4021 on_prepare
=on_prepare
4024 self
._scale
_regh
= acg
.register(
4025 xpath
=self
._project
.add_project(NsrDtsHandler
.SCALE_INSTANCE_XPATH
),
4026 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY| rwdts
.Flag
.CACHE
,
4029 self
._key
_pair
_regh
= acg
.register(
4030 xpath
=self
._project
.add_project(NsrDtsHandler
.KEY_PAIR_XPATH
),
4031 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
4034 def deregister(self
):
4035 self
._log
.debug("De-register NSR config for project {}".
4036 format(self
._project
.name
))
4038 self
._nsr
_regh
.deregister()
4039 self
._nsr
_regh
= None
4040 if self
._scale
_regh
:
4041 self
._scale
_regh
.deregister()
4042 self
._scale
_regh
= None
4043 if self
._key
_pair
_regh
:
4044 self
._key
_pair
_regh
.deregister()
4045 self
._key
_pair
_regh
= None
4048 class VnfrDtsHandler(object):
4049 """ The virtual network service DTS handler """
4050 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
4052 def __init__(self
, dts
, log
, loop
, nsm
):
4062 """ Return registration handle """
4067 """ Return the NS manager instance """
4072 """ Register for vnfr create/update/delete/ advises from dts """
4074 self
._log
.warning("VNFR DTS handler already registered for project {}".
4075 format(self
._project
.name
))
4079 def on_prepare(xact_info
, action
, ks_path
, msg
):
4080 """ prepare callback from dts """
4081 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
4083 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
4084 xact_info
, action
, ks_path
, msg
4087 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.schema()
4088 path_entry
= schema
.keyspec_to_entry(ks_path
)
4089 if not path_entry
or (path_entry
.key00
.id not in self
._nsm
._vnfrs
):
4090 # This can happen when using external RO or after delete with monitoring params
4091 self
._log
.debug("%s request for non existent record path %s",
4093 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
, xpath
)
4097 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
4098 yield from self
._nsm
.update_vnfr(msg
)
4099 elif action
== rwdts
.QueryAction
.DELETE
:
4100 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
4102 self
._nsm
.delete_vnfr(path_entry
.key00
.id)
4104 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
)
4106 self
._log
.debug("Registering for VNFR using xpath: %s",
4107 VnfrDtsHandler
.XPATH
)
4109 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
4110 with self
._dts
.group_create() as group
:
4111 self
._regh
= group
.register(xpath
=self
._nsm
._project
.add_project(
4112 VnfrDtsHandler
.XPATH
),
4114 flags
=(rwdts
.Flag
.SUBSCRIBER
),)
4116 def deregister(self
):
4117 self
._log
.debug("De-register VNFR for project {}".
4118 format(self
._nsm
._project
.name
))
4120 self
._regh
.deregister()
4123 class NsManager(object):
4124 """ The Network Service Manager class"""
4125 def __init__(self
, dts
, log
, loop
, project
,
4126 nsr_handler
, vnfr_handler
, vlr_handler
, ro_plugin_selector
,
4127 vnffgmgr
, vnfd_pub_handler
, cloud_account_handler
):
4131 self
._project
= project
4132 self
._nsr
_handler
= nsr_handler
4133 self
._vnfr
_pub
_handler
= vnfr_handler
4134 self
._vlr
_pub
_handler
= vlr_handler
4135 self
._vnffgmgr
= vnffgmgr
4136 self
._vnfd
_pub
_handler
= vnfd_pub_handler
4137 self
._cloud
_account
_handler
= cloud_account_handler
4139 self
._ro
_plugin
_selector
= ro_plugin_selector
4141 # Intialize the set of variables for implementing Scaling RPC using REST.
4142 self
._headers
= {"content-type":"application/json", "accept":"application/json"}
4143 self
._user
= '@rift'
4144 self
._password
= 'rift'
4145 self
._ip
= 'localhost'
4147 self
._conf
_url
= "https://{ip}:{port}/api/config/project/{project}". \
4150 project
=self
._project
.name
)
4156 self
._nsr
_for
_vlr
= {}
4158 self
.cfgmgr_obj
= conman
.ROConfigManager(log
, loop
, dts
, self
)
4160 # TODO: All these handlers should move to tasklet level.
4161 # Passing self is often an indication of bad design
4162 self
._nsd
_dts
_handler
= NsdDtsHandler(dts
, log
, loop
, self
)
4163 self
._vnfd
_dts
_handler
= VnfdDtsHandler(dts
, log
, loop
, self
)
4164 self
._dts
_handlers
= [self
._nsd
_dts
_handler
,
4165 VnfrDtsHandler(dts
, log
, loop
, self
),
4166 NsrDtsHandler(dts
, log
, loop
, self
),
4167 ScalingRpcHandler(log
, dts
, loop
, self
, self
.scale_rpc_callback
),
4168 # NsrRpcDtsHandler(dts, log, loop, self),
4169 self
._vnfd
_dts
_handler
,
4190 def nsr_handler(self
):
4191 """" NSR handler """
4192 return self
._nsr
_handler
4196 """" So Obj handler """
4201 """ NSRs in this NSM"""
4206 """ NSDs in this NSM"""
4211 """ VNFDs in this NSM"""
4216 """ VNFRs in this NSM"""
4220 def nsr_pub_handler(self
):
4221 """ NSR publication handler """
4222 return self
._nsr
_handler
4225 def vnfr_pub_handler(self
):
4226 """ VNFR publication handler """
4227 return self
._vnfr
_pub
_handler
4230 def vlr_pub_handler(self
):
4231 """ VLR publication handler """
4232 return self
._vlr
_pub
_handler
4235 def vnfd_pub_handler(self
):
4236 return self
._vnfd
_pub
_handler
4240 """ Register all static DTS handlers """
4241 self
._log
.debug("Register DTS handlers for project {}".format(self
._project
))
4242 for dts_handle
in self
._dts
_handlers
:
4243 if asyncio
.iscoroutinefunction(dts_handle
.register
):
4244 yield from dts_handle
.register()
4246 dts_handle
.register()
4248 def deregister(self
):
4249 """ Register all static DTS handlers """
4250 for dts_handle
in self
._dts
_handlers
:
4251 dts_handle
.deregister()
4254 def get_ns_by_nsr_id(self
, nsr_id
):
4255 """ get NSR by nsr id """
4256 if nsr_id
not in self
._nsrs
:
4257 raise NetworkServiceRecordError("NSR id %s not found" % nsr_id
)
4259 return self
._nsrs
[nsr_id
]
4261 def scale_nsr_out(self
, nsr_id
, scale_group_name
, instance_id
, config_xact
):
4262 self
.log
.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
4267 nsr
= self
._nsrs
[nsr_id
]
4268 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4269 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
4271 self
._loop
.create_task(nsr
.create_scale_group_instance(scale_group_name
, instance_id
, config_xact
))
4273 def scale_nsr_in(self
, nsr_id
, scale_group_name
, instance_id
):
4274 self
.log
.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
4279 nsr
= self
._nsrs
[nsr_id
]
4280 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4281 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
4283 self
._loop
.create_task(nsr
.delete_scale_group_instance(scale_group_name
, instance_id
))
4285 def scale_rpc_callback(self
, xact
, msg
, action
):
4286 """Callback handler for RPC calls
4288 xact : Transaction Handler
4290 action : Scaling Action
4292 def get_scaling_group_information():
4293 scaling_group_url
= "{url}/ns-instance-config/nsr/{nsr_id}".format(url
=self
._conf
_url
, nsr_id
=msg
.nsr_id_ref
)
4294 output
= requests
.get(scaling_group_url
, headers
=self
._headers
, auth
=(self
._user
, self
._password
), verify
=False)
4295 if output
.text
is None or len(output
.text
) == 0:
4296 self
.log
.error("nsr id %s information not present", self
._nsr
_id
)
4298 scaling_group_info
= json
.loads(output
.text
)
4299 return scaling_group_info
4301 def config_scaling_group_information(scaling_group_info
):
4302 data_str
= json
.dumps(scaling_group_info
)
4304 scale_out_url
= "{url}/ns-instance-config/nsr/{nsr_id}".format(url
=self
._conf
_url
, nsr_id
=msg
.nsr_id_ref
)
4305 response
= requests
.put(scale_out_url
, data
=data_str
, verify
=False,
4306 auth
=(self
._user
, self
._password
), headers
=self
._headers
)
4307 response
.raise_for_status()
4310 scaling_group_info
= get_scaling_group_information()
4311 self
._log
.debug("Scale out info: {}".format(scaling_group_info
))
4312 if scaling_group_info
is None:
4315 scaling_group_present
= False
4316 if "scaling-group" in scaling_group_info
["nsr:nsr"]:
4317 scaling_group_array
= scaling_group_info
["nsr:nsr"]["scaling-group"]
4318 for scaling_group
in scaling_group_array
:
4319 if scaling_group
["scaling-group-name-ref"] == msg
.scaling_group_name_ref
:
4320 scaling_group_present
= True
4321 if 'instance' not in scaling_group
:
4322 scaling_group
['instance'] = []
4323 for instance
in scaling_group
['instance']:
4324 if instance
["id"] == int(msg
.instance_id
):
4325 self
.log
.error("scaling group with instance id %s exists for scale out", msg
.instance_id
)
4327 scaling_group
["instance"].append({"id": int(msg
.instance_id
)})
4329 if not scaling_group_present
:
4330 scaling_group_info
["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg
.scaling_group_name_ref
,
4331 "instance": [{"id": msg
.instance_id
}]}]
4333 config_scaling_group_information(scaling_group_info
)
4337 scaling_group_info
= get_scaling_group_information()
4338 if scaling_group_info
is None:
4341 scaling_group_array
= scaling_group_info
["nsr:nsr"]["scaling-group"]
4342 scaling_group_present
= False
4343 instance_id_present
= False
4344 for scaling_group
in scaling_group_array
:
4345 if scaling_group
["scaling-group-name-ref"] == msg
.scaling_group_name_ref
:
4346 scaling_group_present
= True
4347 if 'instance' in scaling_group
:
4348 instance_array
= scaling_group
["instance"];
4349 for index
in range(len(instance_array
)):
4350 if instance_array
[index
]["id"] == int(msg
.instance_id
):
4351 instance_array
.pop(index
)
4352 instance_id_present
= True
4355 if not scaling_group_present
:
4356 self
.log
.error("Scaling group %s doesnot exists for scale in", msg
.scaling_group_name_ref
)
4359 if not instance_id_present
:
4360 self
.log
.error("Instance id %s doesnot exists for scale in", msg
.instance_id
)
4363 config_scaling_group_information(scaling_group_info
)
4366 if action
== ScalingRpcHandler
.ACTION
.SCALE_OUT
:
4367 self
._loop
.run_in_executor(None, scale_out
)
4369 self
._loop
.run_in_executor(None, scale_in
)
4371 def nsr_update_cfg(self
, nsr_id
, msg
):
4372 nsr
= self
._nsrs
[nsr_id
]
4373 nsr
.nsr_cfg_msg
= msg
4375 def nsr_instantiate_vl(self
, nsr_id
, vld
):
4376 self
.log
.error("NSR {} create VL {}".format(nsr_id
, vld
))
4377 nsr
= self
._nsrs
[nsr_id
]
4378 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4379 raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")
4381 # Not calling in a separate task as this is called from a separate task
4382 yield from nsr
.create_vl_instance(vld
)
4384 def nsr_terminate_vl(self
, nsr_id
, vld
):
4385 self
.log
.debug("NSR {} delete VL {}".format(nsr_id
, vld
.id))
4386 nsr
= self
._nsrs
[nsr_id
]
4387 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4388 raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")
4390 # Not calling in a separate task as this is called from a separate task
4391 yield from nsr
.delete_vl_instance(vld
)
4394 def create_nsr(self
, nsr_msg
, config_xact
, key_pairs
=None,restart_mode
=False):
4395 """ Create an NSR instance """
4396 self
._log
.debug("NSRMSG %s", nsr_msg
)
4397 if nsr_msg
.id in self
._nsrs
:
4398 msg
= "NSR id %s already exists" % nsr_msg
.id
4399 self
._log
.error(msg
)
4400 raise NetworkServiceRecordError(msg
)
4402 self
._log
.debug("Create NetworkServiceRecord nsr id %s from nsd_id %s",
4406 nsm_plugin
= self
._ro
_plugin
_selector
.get_ro_plugin(nsr_msg
.resource_orchestrator
)
4407 #Work Around - openmano expects datacenter id instead of datacenter name
4408 if isinstance(nsm_plugin
, openmano_nsm
.OpenmanoNsPlugin
):
4409 for uuid
, name
in nsm_plugin
._cli
_api
.datacenter_list():
4410 if name
== nsr_msg
.datacenter
:
4411 nsr_msg
.datacenter
= uuid
4413 sdn_account_name
= self
._cloud
_account
_handler
.get_cloud_account_sdn_name(nsr_msg
.datacenter
)
4415 nsr
= NetworkServiceRecord(self
._dts
,
4424 restart_mode
=restart_mode
,
4425 vlr_handler
=self
._vlr
_pub
_handler
4427 self
._nsrs
[nsr_msg
.id] = nsr
4430 # Generate ssh key pair if required
4431 nsr
.generate_ssh_key_pair(config_xact
)
4432 except Exception as e
:
4433 self
._log
.exception("SSH key: {}".format(e
))
4435 self
._log
.debug("NSR {}: SSh key generated: {}".format(nsr_msg
.name
,
4438 ssh_key
= {'private_key': nsr
.private_key
,
4439 'public_key': nsr
.public_key
4442 nsm_plugin
.create_nsr(nsr_msg
, nsr_msg
.nsd
, key_pairs
, ssh_key
=ssh_key
)
4446 def delete_nsr(self
, nsr_id
):
4448 Delete NSR with the passed nsr id
4450 del self
._nsrs
[nsr_id
]
4453 def instantiate_ns(self
, nsr_id
, config_xact
):
4454 """ Instantiate an NS instance """
4455 self
._log
.debug("Instantiating Network service id %s", nsr_id
)
4456 if nsr_id
not in self
._nsrs
:
4457 err
= "NSR id %s not found " % nsr_id
4458 self
._log
.error(err
)
4459 raise NetworkServiceRecordError(err
)
4461 nsr
= self
._nsrs
[nsr_id
]
4463 yield from nsr
.nsm_plugin
.instantiate_ns(nsr
, config_xact
)
4464 except Exception as e
:
4465 self
._log
.exception("NS instantiate: {}".format(e
))
4469 def update_vnfr(self
, vnfr
):
4470 """Create/Update an VNFR """
4472 vnfr_state
= self
._vnfrs
[vnfr
.id].state
4473 self
._log
.debug("Updating VNFR with state %s: vnfr %s", vnfr_state
, vnfr
)
4475 no_of_active_vms
= 0
4476 for vdur
in vnfr
.vdur
:
4477 if vdur
.operational_status
== 'running':
4478 no_of_active_vms
+= 1
4480 self
._vnfrs
[vnfr
.id]._active
_vdus
= no_of_active_vms
4481 yield from self
._vnfrs
[vnfr
.id].update_state(vnfr
)
4482 nsr
= self
.find_nsr_for_vnfr(vnfr
.id)
4484 nsr
._vnf
_inst
_started
= False
4485 yield from nsr
.update_state()
4487 def find_nsr_for_vnfr(self
, vnfr_id
):
4488 """ Find the NSR which )has the passed vnfr id"""
4489 for nsr
in list(self
.nsrs
.values()):
4490 for vnfr
in list(nsr
.vnfrs
.values()):
4491 if vnfr
.id == vnfr_id
:
4495 def delete_vnfr(self
, vnfr_id
):
4496 """ Delete VNFR with the passed id"""
4497 del self
._vnfrs
[vnfr_id
]
4500 def get_nsr_config(self
, nsd_id
):
4501 xpath
= self
._project
.add_project("C,/nsr:ns-instance-config")
4502 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
4504 for result
in results
:
4505 entry
= yield from result
4506 ns_instance_config
= entry
.result
4508 for nsr
in ns_instance_config
.nsr
:
4509 if nsr
.nsd
.id == nsd_id
:
4514 def get_nsd(self
, nsd_id
):
4515 """ Get network service descriptor for the passed nsd_id"""
4516 if nsd_id
not in self
._nsds
:
4517 self
._log
.error("Cannot find NSD id:%s", nsd_id
)
4518 raise NetworkServiceDescriptorError("Cannot find NSD id:%s", nsd_id
)
4520 return self
._nsds
[nsd_id
]
4522 def create_nsd(self
, nsd_msg
):
4523 """ Create a network service descriptor """
4524 self
._log
.debug("Create network service descriptor - %s", nsd_msg
)
4525 if nsd_msg
.id in self
._nsds
:
4526 self
._log
.error("Cannot create NSD %s -NSD ID already exists", nsd_msg
)
4527 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg
.id)
4529 nsd
= NetworkServiceDescriptor(
4536 self
._nsds
[nsd_msg
.id] = nsd
4540 def update_nsd(self
, nsd
):
4541 """ update the Network service descriptor """
4542 self
._log
.debug("Update network service descriptor - %s", nsd
)
4543 if nsd
.id not in self
._nsds
:
4544 self
._log
.debug("No NSD found - creating NSD id = %s", nsd
.id)
4545 self
.create_nsd(nsd
)
4547 self
._log
.debug("Updating NSD id = %s, nsd = %s", nsd
.id, nsd
)
4548 self
._nsds
[nsd
.id].update(nsd
)
4550 def delete_nsd(self
, nsd_id
):
4551 """ Delete the Network service descriptor with the passed id """
4552 self
._log
.debug("Deleting the network service descriptor - %s", nsd_id
)
4553 if nsd_id
not in self
._nsds
:
4554 self
._log
.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id
)
4555 raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id
)
4556 del self
._nsds
[nsd_id
]
4558 def get_vnfd_config(self
, xact
):
4559 vnfd_dts_reg
= self
._vnfd
_dts
_handler
.regh
4560 for cfg
in vnfd_dts_reg
.get_xact_elements(xact
):
4561 if cfg
.id not in self
._vnfds
:
4562 self
.create_vnfd(cfg
)
4564 def get_vnfd(self
, vnfd_id
, xact
):
4565 """ Get virtual network function descriptor for the passed vnfd_id"""
4566 if vnfd_id
not in self
._vnfds
:
4567 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
4568 self
.get_vnfd_config(xact
)
4570 if vnfd_id
not in self
._vnfds
:
4571 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
4572 raise VnfDescriptorError("Cannot find VNFD id:%s", vnfd_id
)
4574 return self
._vnfds
[vnfd_id
]
4576 def create_vnfd(self
, vnfd
):
4577 """ Create a virtual network function descriptor """
4578 self
._log
.debug("Create virtual network function descriptor - %s", vnfd
)
4579 if vnfd
.id in self
._vnfds
:
4580 self
._log
.error("Cannot create VNFD %s -VNFD ID already exists", vnfd
)
4581 raise VnfDescriptorError("VNFD already exists-%s", vnfd
.id)
4583 self
._vnfds
[vnfd
.id] = vnfd
4584 return self
._vnfds
[vnfd
.id]
4586 def update_vnfd(self
, vnfd
):
4587 """ Update the virtual network function descriptor """
4588 self
._log
.debug("Update virtual network function descriptor- %s", vnfd
)
4591 if vnfd
.id not in self
._vnfds
:
4592 self
._log
.debug("No VNFD found - creating VNFD id = %s", vnfd
.id)
4593 self
.create_vnfd(vnfd
)
4595 self
._log
.debug("Updating VNFD id = %s, vnfd = %s", vnfd
.id, vnfd
)
4596 self
._vnfds
[vnfd
.id] = vnfd
4599 def delete_vnfd(self
, vnfd_id
):
4600 """ Delete the virtual network function descriptor with the passed id """
4601 self
._log
.debug("Deleting the virtual network function descriptor - %s", vnfd_id
)
4602 if vnfd_id
not in self
._vnfds
:
4603 self
._log
.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id
)
4604 raise VnfDescriptorError("Cannot find %s", vnfd_id
)
4606 del self
._vnfds
[vnfd_id
]
4609 def publish_nsr(self
, xact
, path
, msg
):
4610 """ Publish a NSR """
4611 self
._log
.debug("Publish NSR with path %s, msg %s",
4613 yield from self
.nsr_handler
.update(xact
, path
, msg
)
4616 def unpublish_nsr(self
, xact
, path
):
4617 """ Un Publish an NSR """
4618 self
._log
.debug("Publishing delete NSR with path %s", path
)
4619 yield from self
.nsr_handler
.delete(path
, xact
)
4621 def vnfr_is_ready(self
, vnfr_id
):
4622 """ VNFR with the id is ready """
4623 self
._log
.debug("VNFR id %s ready", vnfr_id
)
4624 if vnfr_id
not in self
._vnfds
:
4625 err
= "Did not find VNFR ID with id %s" % vnfr_id
4626 self
._log
.critical("err")
4627 raise VirtualNetworkFunctionRecordError(err
)
4628 self
._vnfrs
[vnfr_id
].is_ready()
4632 def terminate_ns(self
, nsr_id
, xact
):
4634 Terminate network service for the given NSR Id
4637 if nsr_id
not in self
._nsrs
:
4640 # Terminate the instances/networks assocaited with this nw service
4641 self
._log
.debug("Terminating the network service %s", nsr_id
)
4643 yield from self
._nsrs
[nsr_id
].terminate()
4644 except Exception as e
:
4645 self
.log
.exception("Failed to terminate NSR[id=%s]", nsr_id
)
4647 def vlr_event(self
, vlr
, action
):
4648 self
._log
.debug("Received VLR %s with action:%s", vlr
, action
)
4649 # Find the NS and see if we can proceed
4650 nsr
= self
.find_nsr_for_vlr_id(vlr
.id)
4652 self
._log
.error("VLR %s:%s received for NSR, state:%s",
4653 vlr
.id, vlr
.name
, vlr
.operational_status
)
4655 nsr
.vlr_event(vlr
, action
)
4657 def add_vlr_id_nsr_map(self
, vlr_id
, nsr
):
4658 """ Add a mapping for vlr_id into NSR """
4659 self
._nsr
_for
_vlr
[vlr_id
] = nsr
4661 def remove_vlr_id_nsr_map(self
, vlr_id
):
4662 """ Remove a mapping for vlr_id into NSR """
4663 if vlr_id
in self
._nsr
_for
_vlr
:
4664 del self
._nsr
_for
_vlr
[vlr_id
]
4666 def find_nsr_for_vlr_id(self
, vlr_id
):
4667 """ Find NSR for VLR id """
4669 if vlr_id
in self
._nsr
_for
_vlr
:
4670 nsr
= self
._nsr
_for
_vlr
[vlr_id
]
4674 class NsmRecordsPublisherProxy(object):
4675 """ This class provides a publisher interface that allows plugin objects
4676 to publish NSR/VNFR/VLR"""
4678 def __init__(self
, dts
, log
, loop
, project
, nsr_pub_hdlr
,
4679 vnfr_pub_hdlr
, vlr_pub_hdlr
,):
4683 self
._project
= project
4684 self
._nsr
_pub
_hdlr
= nsr_pub_hdlr
4685 self
._vlr
_pub
_hdlr
= vlr_pub_hdlr
4686 self
._vnfr
_pub
_hdlr
= vnfr_pub_hdlr
4689 def publish_nsr_opdata(self
, xact
, nsr
):
4690 """ Publish an NSR """
4691 path
= ("D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref={}]"
4692 ).format(quoted_key(nsr
.ns_instance_config_ref
))
4693 return (yield from self
._nsr
_pub
_hdlr
.update(xact
, path
, nsr
))
4696 def publish_nsr(self
, xact
, nsr
):
4697 """ Publish an NSR """
4698 path
= self
._project
.add_project(NetworkServiceRecord
.xpath_from_nsr(nsr
))
4699 return (yield from self
._nsr
_pub
_hdlr
.update(xact
, path
, nsr
))
4702 def unpublish_nsr(self
, xact
, nsr
):
4703 """ Unpublish an NSR """
4704 path
= self
._project
.add_project(NetworkServiceRecord
.xpath_from_nsr(nsr
))
4705 return (yield from self
._nsr
_pub
_hdlr
.delete(xact
, path
))
4708 def publish_vnfr(self
, xact
, vnfr
):
4709 """ Publish an VNFR """
4710 path
= self
._project
.add_project(VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
))
4711 return (yield from self
._vnfr
_pub
_hdlr
.update(xact
, path
, vnfr
))
4714 def unpublish_vnfr(self
, xact
, vnfr
):
4715 """ Unpublish a VNFR """
4716 path
= self
._project
.add_project(VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
))
4717 yield from self
._vnfr
_pub
_hdlr
.delete(xact
, path
)
4718 # NOTE: The regh delete does not send the on_prepare to VNFM tasklet as well
4719 # as remove all the VNFR elements. So need to send this additional delete block.
4720 with self
._dts
.transaction(flags
= 0) as xact
:
4721 block
= xact
.block_create()
4722 block
.add_query_delete(path
)
4723 yield from block
.execute(flags
=0, now
=True)
4726 def publish_vlr(self
, xact
, vlr
):
4727 """ Publish a VLR """
4728 path
= self
._project
.add_project(VirtualLinkRecord
.vlr_xpath(vlr
))
4729 return (yield from self
._vlr
_pub
_hdlr
.update(xact
, path
, vlr
))
4732 def unpublish_vlr(self
, xact
, vlr
):
4733 """ Unpublish a VLR """
4734 path
= self
._project
.add_project(VirtualLinkRecord
.vlr_xpath(vlr
))
4735 return (yield from self
._vlr
_pub
_hdlr
.delete(xact
, path
))
4737 class ScalingRpcHandler(mano_dts
.DtsHandler
):
4738 """ The Network service Monitor DTS handler """
4739 SCALE_IN_INPUT_XPATH
= "I,/nsr:exec-scale-in"
4740 SCALE_IN_OUTPUT_XPATH
= "O,/nsr:exec-scale-in"
4742 SCALE_OUT_INPUT_XPATH
= "I,/nsr:exec-scale-out"
4743 SCALE_OUT_OUTPUT_XPATH
= "O,/nsr:exec-scale-out"
4745 ACTION
= Enum('ACTION', 'SCALE_IN SCALE_OUT')
4747 def __init__(self
, log
, dts
, loop
, nsm
, callback
=None):
4748 super().__init
__(log
, dts
, loop
, nsm
._project
)
4750 self
.callback
= callback
4751 self
.last_instance_id
= defaultdict(int)
4754 self
._reg
_out
= None
4759 def send_err_msg(err_msg
, xact_info
, ks_path
, e
=False):
4760 xpath
= ks_path
.to_xpath(NsrYang
.get_schema())
4762 self
._log
.exception(err_msg
)
4764 self
._log
.error(err_msg
)
4765 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
4768 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
4771 def on_scale_in_prepare(xact_info
, action
, ks_path
, msg
):
4772 assert action
== rwdts
.QueryAction
.RPC
4774 self
._log
.debug("Scale in called: {}".format(msg
.as_dict()))
4775 if not self
.project
.rpc_check(msg
, xact_info
):
4779 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleIn
.from_dict({
4780 "instance_id": msg
.instance_id
})
4782 nsr
= self
._nsm
.nsrs
[msg
.nsr_id_ref
]
4783 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4784 errmsg
= ("Unable to perform scaling action when NS {}({}) not in running state".
4785 format(nsr
.name
, nsr
.id))
4786 send_err_msg(errmsg
, xact_info
, ks_path
)
4789 xact_info
.respond_xpath(
4790 rwdts
.XactRspCode
.ACK
,
4791 self
.__class
__.SCALE_IN_OUTPUT_XPATH
,
4795 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_IN
)
4797 except Exception as e
:
4798 errmsg
= ("Exception doing scale in using {}: {}".
4800 send_err_msg(errmsg
, xact_info
, ks_path
, e
=True)
4803 def on_scale_out_prepare(xact_info
, action
, ks_path
, msg
):
4804 assert action
== rwdts
.QueryAction
.RPC
4806 self
._log
.debug("Scale out called: {}".format(msg
.as_dict()))
4807 if not self
.project
.rpc_check(msg
, xact_info
):
4811 scaling_group
= msg
.scaling_group_name_ref
4812 if not msg
.instance_id
:
4813 last_instance_id
= self
.last_instance_id
[scale_group
]
4814 msg
.instance_id
= last_instance_id
+ 1
4815 self
.last_instance_id
[scale_group
] += 1
4817 nsr
= self
._nsm
.nsrs
[msg
.nsr_id_ref
]
4818 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4819 errmsg
= ("Unable to perform scaling action when NS {}({}) not in running state".
4820 format(nsr
.name
, nsr
.id))
4821 send_err_msg(errmsg
, xact_info
, ks_path
)
4824 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleOut
.from_dict({
4825 "instance_id": msg
.instance_id
})
4827 xact_info
.respond_xpath(
4828 rwdts
.XactRspCode
.ACK
,
4829 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
,
4833 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_OUT
)
4835 except Exception as e
:
4836 errmsg
= ("Exception doing scale in using {}: {}".
4838 send_err_msg(errmsg
, xact_info
, ks_path
, e
=True)
4840 self
._reg
_in
= yield from self
.dts
.register(
4841 xpath
=self
.__class
__.SCALE_IN_INPUT_XPATH
,
4842 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
4843 on_prepare
=on_scale_in_prepare
),
4844 flags
=rwdts
.Flag
.PUBLISHER
)
4846 self
._reg
_out
= yield from self
.dts
.register(
4847 xpath
=self
.__class
__.SCALE_OUT_INPUT_XPATH
,
4848 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
4849 on_prepare
=on_scale_out_prepare
),
4850 flags
=rwdts
.Flag
.PUBLISHER
)
4852 def deregister(self
):
4854 self
._reg
_in
.deregister()
4858 self
._reg
_out
.deregister()
4859 self
._reg
_out
= None
4862 class NsmProject(ManoProject
):
4864 def __init__(self
, name
, tasklet
, **kw
):
4865 super(NsmProject
, self
).__init
__(tasklet
.log
, name
)
4866 self
.update(tasklet
)
4869 self
._ro
_plugin
_selector
= None
4870 self
._vnffgmgr
= None
4872 self
._nsr
_pub
_handler
= None
4873 self
._vnfr
_pub
_handler
= None
4874 self
._vlr
_pub
_handler
= None
4875 self
._vnfd
_pub
_handler
= None
4876 self
._scale
_cfg
_handler
= None
4878 self
._records
_publisher
_proxy
= None
4880 def vlr_event(self
, vlr
, action
):
4881 """ VLR Event callback """
4882 self
.log
.debug("VLR Event received for VLR %s with action %s", vlr
, action
)
4883 self
._nsm
.vlr_event(vlr
, action
)
4887 self
.log
.debug("Register NsmProject for {}".format(self
.name
))
4889 self
._nsr
_pub
_handler
= publisher
.NsrOpDataDtsHandler(
4890 self
._dts
, self
.log
, self
.loop
, self
)
4891 yield from self
._nsr
_pub
_handler
.register()
4893 self
._vnfr
_pub
_handler
= publisher
.VnfrPublisherDtsHandler(
4894 self
._dts
, self
.log
, self
.loop
, self
)
4895 yield from self
._vnfr
_pub
_handler
.register()
4897 self
._vlr
_pub
_handler
= publisher
.VlrPublisherDtsHandler(
4898 self
._dts
, self
.log
, self
.loop
, self
)
4899 yield from self
._vlr
_pub
_handler
.register()
4901 self
._vlr
_sub
_handler
= subscriber
.VlrSubscriberDtsHandler(self
.log
,
4907 yield from self
._vlr
_sub
_handler
.register()
4909 manifest
= self
._tasklet
.tasklet_info
.get_pb_manifest()
4910 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
4911 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
4912 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
4914 self
._vnfd
_pub
_handler
= publisher
.VnfdPublisher(
4915 use_ssl
, ssl_cert
, ssl_key
, self
.loop
, self
)
4917 self
._records
_publisher
_proxy
= NsmRecordsPublisherProxy(
4922 self
._nsr
_pub
_handler
,
4923 self
._vnfr
_pub
_handler
,
4924 self
._vlr
_pub
_handler
,
4927 # Register the NSM to receive the nsm plugin
4928 # when cloud account is configured
4929 self
._ro
_plugin
_selector
= cloud
.ROAccountConfigSubscriber(
4934 self
._records
_publisher
_proxy
4936 yield from self
._ro
_plugin
_selector
.register()
4938 self
._cloud
_account
_handler
= cloud
.CloudAccountConfigSubscriber(
4945 yield from self
._cloud
_account
_handler
.register()
4947 self
._vnffgmgr
= rwvnffgmgr
.VnffgMgr(self
._dts
, self
.log
, self
.log_hdl
, self
.loop
,
4948 self
, self
._cloud
_account
_handler
)
4949 yield from self
._vnffgmgr
.register()
4951 self
._nsm
= NsManager(
4956 self
._nsr
_pub
_handler
,
4957 self
._vnfr
_pub
_handler
,
4958 self
._vlr
_pub
_handler
,
4959 self
._ro
_plugin
_selector
,
4961 self
._vnfd
_pub
_handler
,
4962 self
._cloud
_account
_handler
,
4965 yield from self
._nsm
.register()
4966 self
.log
.debug("Register NsmProject for {} complete".format(self
.name
))
4968 def deregister(self
):
4969 self
._log
.debug("Project {} de-register".format(self
.name
))
4970 self
._nsm
.deregister()
4971 self
._vnffgmgr
.deregister()
4972 self
._cloud
_account
_handler
.deregister()
4973 self
._ro
_plugin
_selector
.deregister()
4974 self
._nsr
_pub
_handler
.deregister()
4975 self
._vnfr
_pub
_handler
.deregister()
4976 self
._vlr
_pub
_handler
.deregister()
4977 self
._vlr
_sub
_handler
.deregister()
4981 def delete_prepare(self
):
4982 if self
._nsm
and self
._nsm
._nsrs
:
4983 delete_msg
= "Project has NSR associated with it. Delete all Project NSR and try again."
4984 return False, delete_msg
4988 class NsmTasklet(rift
.tasklets
.Tasklet
):
4990 The network service manager tasklet
4992 def __init__(self
, *args
, **kwargs
):
4993 super(NsmTasklet
, self
).__init
__(*args
, **kwargs
)
4994 self
.rwlog
.set_category("rw-mano-log")
4995 self
.rwlog
.set_subcategory("nsm")
4998 self
.project_handler
= None
5006 """ The task start callback """
5007 super(NsmTasklet
, self
).start()
5008 self
.log
.info("Starting NsmTasklet")
5010 self
.log
.debug("Registering with dts")
5011 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
5012 RwNsmYang
.get_schema(),
5014 self
.on_dts_state_change
)
5016 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
5022 print("Caught Exception in NSM stop:", sys
.exc_info()[0])
5025 def on_instance_started(self
):
5026 """ Task instance started callback """
5027 self
.log
.debug("Got instance started callback")
5031 """ Task init callback """
5032 self
.log
.debug("Got instance started callback")
5034 self
.log
.debug("creating project handler")
5035 self
.project_handler
= ProjectHandler(self
, NsmProject
)
5036 self
.project_handler
.register()
5042 """ Task run callback """
5046 def on_dts_state_change(self
, state
):
5047 """Take action according to current dts state to transition
5048 application into the corresponding application state
5051 state - current dts state
5054 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
5055 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
5059 rwdts
.State
.INIT
: self
.init
,
5060 rwdts
.State
.RUN
: self
.run
,
5063 # Transition application to next state
5064 handler
= handlers
.get(state
, None)
5065 if handler
is not None:
5066 yield from handler()
5068 # Transition dts to next state
5069 next_state
= switch
.get(state
, None)
5070 if next_state
is not None:
5071 self
.log
.debug("Changing state to %s", next_state
)
5072 self
._dts
.handle
.set_state(next_state
)