2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
22 import ncclient
.asyncio_manager
32 from collections
import defaultdict
33 from collections
import deque
35 from urllib
.parse
import urlparse
37 # disable unsigned certificate warning
38 from requests
.packages
.urllib3
.exceptions
import InsecureRequestWarning
39 requests
.packages
.urllib3
.disable_warnings(InsecureRequestWarning
)
41 gi
.require_version('RwYang', '1.0')
42 gi
.require_version('NsdBaseYang', '1.0')
43 gi
.require_version('ProjectNsdYang', '1.0')
44 gi
.require_version('RwDts', '1.0')
45 gi
.require_version('RwNsmYang', '1.0')
46 gi
.require_version('RwNsrYang', '1.0')
47 gi
.require_version('NsrYang', '1.0')
48 gi
.require_version('RwTypes', '1.0')
49 gi
.require_version('RwVlrYang', '1.0')
50 gi
.require_version('RwVnfrYang', '1.0')
51 gi
.require_version('VnfrYang', '1.0')
52 gi
.require_version('ProjectVnfdYang', '1.0')
53 from gi
.repository
import (
58 ProjectNsdYang
as NsdYang
,
69 gi
.require_version('RwKeyspec', '1.0')
70 from gi
.repository
.RwKeyspec
import quoted_key
72 from rift
.mano
.utils
.ssh_keys
import ManoSshKey
73 import rift
.mano
.ncclient
74 import rift
.mano
.config_data
.config
75 import rift
.mano
.dts
as mano_dts
77 from rift
.mano
.utils
.project
import (
80 get_add_delete_update_cfgs
,
84 from . import rwnsm_conman
as conman
86 from . import publisher
87 from . import subscriber
89 from . import config_value_pool
90 from . import rwvnffgmgr
91 from . import scale_group
92 from . import rwnsmplugin
93 from . import openmano_nsm
97 class NetworkServiceRecordState(Enum
):
98 """ Network Service Record State """
102 VNFFG_INIT_PHASE
= 104
108 VL_TERMINATE_PHASE
= 111
109 VNF_TERMINATE_PHASE
= 112
110 VNFFG_TERMINATE_PHASE
= 113
117 class NetworkServiceRecordError(Exception):
118 """ Network Service Record Error """
122 class NetworkServiceDescriptorError(Exception):
123 """ Network Service Descriptor Error """
127 class VirtualNetworkFunctionRecordError(Exception):
128 """ Virtual Network Function Record Error """
132 class NetworkServiceDescriptorNotFound(Exception):
133 """ Cannot find Network Service Descriptor"""
137 class NetworkServiceDescriptorNotFound(Exception):
138 """ Network Service Descriptor reference count exists """
141 class NsrInstantiationFailed(Exception):
142 """ Failed to instantiate network service """
146 class VnfInstantiationFailed(Exception):
147 """ Failed to instantiate virtual network function"""
151 class VnffgInstantiationFailed(Exception):
152 """ Failed to instantiate virtual network function"""
156 class VnfDescriptorError(Exception):
157 """Failed to instantiate virtual network function"""
161 class ScalingOperationError(Exception):
165 class ScaleGroupMissingError(Exception):
169 class PlacementGroupError(Exception):
173 class NsrNsdUpdateError(Exception):
177 class NsrVlUpdateError(NsrNsdUpdateError
):
180 class VirtualLinkRecordError(Exception):
181 """ Virtual Links Record Error """
185 class VlRecordState(Enum
):
186 """ VL Record State """
188 INSTANTIATION_PENDING
= 102
190 TERMINATE_PENDING
= 104
195 class VnffgRecordState(Enum
):
196 """ VNFFG Record State """
198 INSTANTIATION_PENDING
= 102
200 TERMINATE_PENDING
= 104
205 class VnffgRecord(object):
206 """ Vnffg Records class"""
209 def __init__(self
, dts
, log
, loop
, vnffgmgr
, nsr
, nsr_name
, vnffgd_msg
, sdn_account_name
,cloud_account_name
):
214 self
._vnffgmgr
= vnffgmgr
216 self
._nsr
_name
= nsr_name
217 self
._vnffgd
_msg
= vnffgd_msg
218 self
._cloud
_account
_name
= cloud_account_name
219 if sdn_account_name
is None:
220 self
._sdn
_account
_name
= ''
222 self
._sdn
_account
_name
= sdn_account_name
224 self
._vnffgr
_id
= str(uuid
.uuid4())
225 self
._vnffgr
_rsp
_id
= list()
226 self
._vnffgr
_state
= VnffgRecordState
.INIT
231 return self
._vnffgr
_id
235 """ state of this VNF """
236 return self
._vnffgr
_state
238 def fetch_vnffgr(self
):
240 Get VNFFGR message to be published
243 if self
._vnffgr
_state
== VnffgRecordState
.INIT
:
244 vnffgr_dict
= {"id": self
._vnffgr
_id
,
245 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
246 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
247 "sdn_account": self
._sdn
_account
_name
,
248 "operational_status": 'init',
250 vnffgr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
251 elif self
._vnffgr
_state
== VnffgRecordState
.TERMINATED
:
252 vnffgr_dict
= {"id": self
._vnffgr
_id
,
253 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
254 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
255 "sdn_account": self
._sdn
_account
_name
,
256 "operational_status": 'terminated',
258 vnffgr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
261 vnffgr
= self
._vnffgmgr
.fetch_vnffgr(self
._vnffgr
_id
)
263 self
._log
.exception("Fetching VNFFGR for VNFFG with id %s failed", self
._vnffgr
_id
)
264 self
._vnffgr
_state
= VnffgRecordState
.FAILED
265 vnffgr_dict
= {"id": self
._vnffgr
_id
,
266 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
267 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
268 "sdn_account": self
._sdn
_account
_name
,
269 "operational_status": 'failed',
271 vnffgr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
276 def vnffgr_create_msg(self
):
277 """ Virtual Link Record message for Creating VLR in VNS """
278 vnffgr_dict
= {"id": self
._vnffgr
_id
,
279 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
280 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
281 "sdn_account": self
._sdn
_account
_name
,
282 "cloud_account": self
._cloud
_account
_name
,
284 vnffgr
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
285 for rsp
in self
._vnffgd
_msg
.rsp
:
286 vnffgr_rsp
= vnffgr
.rsp
.add()
287 vnffgr_rsp
.id = str(uuid
.uuid4())
288 vnffgr_rsp
.name
= self
._nsr
.name
+ '.' + rsp
.name
289 self
._vnffgr
_rsp
_id
.append(vnffgr_rsp
.id)
290 vnffgr_rsp
.vnffgd_rsp_id_ref
= rsp
.id
291 vnffgr_rsp
.vnffgd_rsp_name_ref
= rsp
.name
292 for rsp_cp_ref
in rsp
.vnfd_connection_point_ref
:
293 vnfd
= [vnfr
.vnfd
for vnfr
in self
._nsr
.vnfrs
.values() if vnfr
.vnfd
.id == rsp_cp_ref
.vnfd_id_ref
]
294 self
._log
.debug("VNFD message during VNFFG instantiation is %s",vnfd
)
295 if len(vnfd
) > 0 and vnfd
[0].has_field('service_function_type'):
296 self
._log
.debug("Service Function Type for VNFD ID %s is %s",
297 rsp_cp_ref
.vnfd_id_ref
, vnfd
[0].service_function_type
)
299 self
._log
.error("Service Function Type not available for VNFD ID %s; Skipping in chain",
300 rsp_cp_ref
.vnfd_id_ref
)
303 vnfr_cp_ref
= vnffgr_rsp
.vnfr_connection_point_ref
.add()
304 vnfr_cp_ref
.member_vnf_index_ref
= rsp_cp_ref
.member_vnf_index_ref
305 vnfr_cp_ref
.hop_number
= rsp_cp_ref
.order
306 vnfr_cp_ref
.vnfd_id_ref
=rsp_cp_ref
.vnfd_id_ref
307 vnfr_cp_ref
.service_function_type
= vnfd
[0].service_function_type
308 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
309 if (nsr_vnfr
.vnfd
.id == vnfr_cp_ref
.vnfd_id_ref
and
310 nsr_vnfr
.member_vnf_index
== vnfr_cp_ref
.member_vnf_index_ref
):
311 vnfr_cp_ref
.vnfr_id_ref
= nsr_vnfr
.id
312 vnfr_cp_ref
.vnfr_name_ref
= nsr_vnfr
.name
313 vnfr_cp_ref
.vnfr_connection_point_ref
= rsp_cp_ref
.vnfd_connection_point_ref
315 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
316 self
._log
.debug(" Received VNFR is %s", vnfr
)
317 while vnfr
.operational_status
!= 'running':
318 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
319 if vnfr
.operational_status
== 'failed':
320 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
321 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
323 yield from asyncio
.sleep(2, loop
=self
._loop
)
324 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
325 self
._log
.debug("Received VNFR is %s", vnfr
)
327 vnfr_cp_ref
.connection_point_params
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
328 for cp
in vnfr
.connection_point
:
329 if cp
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
330 vnfr_cp_ref
.connection_point_params
.port_id
= cp
.connection_point_id
331 vnfr_cp_ref
.connection_point_params
.name
= self
._nsr
.name
+ '.' + cp
.name
332 for vdu
in vnfr
.vdur
:
333 for intf
in vdu
.interface
:
334 if intf
.type_yang
== "EXTERNAL" and intf
.external_connection_point_ref
== vnfr_cp_ref
.vnfr_connection_point_ref
:
335 vnfr_cp_ref
.connection_point_params
.vm_id
= vdu
.vim_id
336 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
337 vnfr_cp_ref
.connection_point_params
.vm_id
)
340 vnfr_cp_ref
.connection_point_params
.address
= cp
.ip_address
341 vnfr_cp_ref
.connection_point_params
.port
= VnffgRecord
.SFF_DP_PORT
343 for vnffgd_classifier
in self
._vnffgd
_msg
.classifier
:
344 _rsp
= [rsp
for rsp
in vnffgr
.rsp
if rsp
.vnffgd_rsp_id_ref
== vnffgd_classifier
.rsp_id_ref
]
346 rsp_id_ref
= _rsp
[0].id
347 rsp_name
= _rsp
[0].name
349 self
._log
.error("RSP with ID %s not found during classifier creation for classifier id %s",
350 vnffgd_classifier
.rsp_id_ref
,vnffgd_classifier
.id)
352 vnffgr_classifier
= vnffgr
.classifier
.add()
353 vnffgr_classifier
.id = vnffgd_classifier
.id
354 vnffgr_classifier
.name
= self
._nsr
.name
+ '.' + vnffgd_classifier
.name
355 _rsp
[0].classifier_name
= vnffgr_classifier
.name
356 vnffgr_classifier
.rsp_id_ref
= rsp_id_ref
357 vnffgr_classifier
.rsp_name
= rsp_name
358 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
359 if (nsr_vnfr
.vnfd
.id == vnffgd_classifier
.vnfd_id_ref
and
360 nsr_vnfr
.member_vnf_index
== vnffgd_classifier
.member_vnf_index_ref
):
361 vnffgr_classifier
.vnfr_id_ref
= nsr_vnfr
.id
362 vnffgr_classifier
.vnfr_name_ref
= nsr_vnfr
.name
363 vnffgr_classifier
.vnfr_connection_point_ref
= vnffgd_classifier
.vnfd_connection_point_ref
365 if nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER':
366 vnffgr_classifier
.sff_name
= nsr_vnfr
.name
368 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
369 self
._log
.debug(" Received VNFR is %s", vnfr
)
370 while vnfr
.operational_status
!= 'running':
371 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
372 if vnfr
.operational_status
== 'failed':
373 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
374 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
376 yield from asyncio
.sleep(2, loop
=self
._loop
)
377 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
378 self
._log
.debug("Received VNFR is %s", vnfr
)
380 for cp
in vnfr
.connection_point
:
381 if cp
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
382 vnffgr_classifier
.port_id
= cp
.connection_point_id
383 vnffgr_classifier
.ip_address
= cp
.ip_address
384 for vdu
in vnfr
.vdur
:
385 for intf
in vdu
.interface
:
386 if intf
.type_yang
== "EXTERNAL" and intf
.external_connection_point_ref
== vnffgr_classifier
.vnfr_connection_point_ref
:
387 vnffgr_classifier
.vm_id
= vdu
.vim_id
388 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",
390 vnfr_cp_ref
.connection_point_params
.vm_id
)
393 self
._log
.info("VNFFGR msg to be sent is %s", vnffgr
)
397 def vnffgr_nsr_sff_list(self
):
398 """ SFF List for VNFR """
400 sf_list
= [nsr_vnfr
.name
for nsr_vnfr
in self
._nsr
.vnfrs
.values() if nsr_vnfr
.vnfd
.service_function_chain
== 'SF']
402 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
403 if (nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER' or nsr_vnfr
.vnfd
.service_function_chain
== 'SFF'):
404 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
405 self
._log
.debug(" Received VNFR is %s", vnfr
)
406 while vnfr
.operational_status
!= 'running':
407 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
408 if vnfr
.operational_status
== 'failed':
409 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
410 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
411 yield from asyncio
.sleep(2, loop
=self
._loop
)
412 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
413 self
._log
.debug("Received VNFR is %s", vnfr
)
415 sff
= RwsdnalYang
.YangData_RwProject_Project_Vnffgs_VnffgChain_Sff()
416 sff_list
[nsr_vnfr
.vnfd
.id] = sff
417 sff
.name
= nsr_vnfr
.name
418 sff
.function_type
= nsr_vnfr
.vnfd
.service_function_chain
420 sff
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
421 sff
.mgmt_port
= VnffgRecord
.SFF_MGMT_PORT
422 for cp
in vnfr
.connection_point
:
423 sff_dp
= sff
.dp_endpoints
.add()
424 sff_dp
.name
= self
._nsr
.name
+ '.' + cp
.name
425 sff_dp
.address
= cp
.ip_address
426 sff_dp
.port
= VnffgRecord
.SFF_DP_PORT
427 if nsr_vnfr
.vnfd
.service_function_chain
== 'SFF':
428 for sf_name
in sf_list
:
429 _sf
= sff
.vnfr_list
.add()
430 _sf
.vnfr_name
= sf_name
435 def instantiate(self
):
436 """ Instantiate this VNFFG """
438 self
._log
.info("Instaniating VNFFGR with vnffgd %s",
442 vnffgr_request
= yield from self
.vnffgr_create_msg()
443 vnffg_sff_list
= yield from self
.vnffgr_nsr_sff_list()
446 vnffgr
= self
._vnffgmgr
.create_vnffgr(vnffgr_request
,self
._vnffgd
_msg
.classifier
,vnffg_sff_list
)
447 except Exception as e
:
448 self
._log
.exception("VNFFG instantiation failed: %s", str(e
))
449 self
._vnffgr
_state
= VnffgRecordState
.FAILED
450 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self
.id, vnffgr_request
.id))
452 self
._vnffgr
_state
= VnffgRecordState
.INSTANTIATION_PENDING
454 self
._log
.info("Instantiated VNFFGR :%s", vnffgr
)
455 self
._vnffgr
_state
= VnffgRecordState
.ACTIVE
457 self
._log
.info("Invoking update_state to update NSR state for NSR ID: %s", self
._nsr
.id)
458 yield from self
._nsr
.update_state()
460 def vnffgr_in_vnffgrm(self
):
461 """ Is there a VNFR record in VNFM """
462 if (self
._vnffgr
_state
== VnffgRecordState
.ACTIVE
or
463 self
._vnffgr
_state
== VnffgRecordState
.INSTANTIATION_PENDING
or
464 self
._vnffgr
_state
== VnffgRecordState
.FAILED
):
471 """ Terminate this VNFFGR """
472 if not self
.vnffgr_in_vnffgrm():
473 self
._log
.error("Ignoring terminate request for id %s in state %s",
474 self
.id, self
._vnffgr
_state
)
477 self
._log
.info("Terminating VNFFGR id:%s", self
.id)
478 self
._vnffgr
_state
= VnffgRecordState
.TERMINATE_PENDING
480 self
._vnffgmgr
.terminate_vnffgr(self
._vnffgr
_id
)
482 self
._vnffgr
_state
= VnffgRecordState
.TERMINATED
483 self
._log
.debug("Terminated VNFFGR id:%s", self
.id)
486 class VirtualLinkRecord(object):
487 """ Virtual Link Records class"""
488 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
491 def create_record(dts
, log
, loop
, project
, nsr_name
, vld_msg
,
492 datacenter
, ip_profile
, nsr_id
, restart_mode
=False):
493 """Creates a new VLR object based on the given data.
495 If restart mode is enabled, then we look for existing records in the
496 DTS and create a VLR records using the exiting data(ID)
501 vlr_obj
= VirtualLinkRecord(
514 res_iter
= yield from dts
.query_read(
515 project
.add_project("D,/vlr:vlr-catalog/vlr:vlr"),
516 rwdts
.XactFlag
.MERGE
)
519 response
= yield from fut
520 vlr
= response
.result
522 # Check if the record is already present, if so use the ID of
523 # the existing record. Since the name of the record is uniquely
524 # formed we can use it as a search key!
525 if vlr
.name
== vlr_obj
.name
:
526 vlr_obj
.reset_id(vlr
.id)
531 def __init__(self
, dts
, log
, loop
, project
, nsr_name
, vld_msg
,
532 datacenter
, ip_profile
, nsr_id
):
536 self
._project
= project
537 self
._nsr
_name
= nsr_name
538 self
._vld
_msg
= vld_msg
539 self
._datacenter
_name
= datacenter
540 self
._assigned
_subnet
= None
541 self
._nsr
_id
= nsr_id
542 self
._ip
_profile
= ip_profile
543 self
._vlr
_id
= str(uuid
.uuid4())
544 self
._state
= VlRecordState
.INIT
545 self
._prev
_state
= None
546 self
._create
_time
= int(time
.time())
547 self
.state_failed_reason
= None
551 """ path for this object """
552 return self
._project
.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]".
553 format(quoted_key(self
._vlr
_id
)))
562 """ Get NSR name for this VL """
567 """ Virtual Link Desciptor """
571 def assigned_subnet(self
):
572 """ Subnet assigned to this VL"""
573 return self
._assigned
_subnet
578 Get the name for this VLR.
579 VLR name is "nsr name:VLD name"
581 if self
.vld_msg
.vim_network_name
:
582 return self
.vld_msg
.vim_network_name
583 elif self
.vld_msg
.name
== "multisite":
584 # This is a temporary hack to identify manually provisioned inter-site network
585 return self
.vld_msg
.name
587 return self
._project
.name
+ "." +self
._nsr
_name
+ "." + self
.vld_msg
.name
590 def datacenter_name(self
):
591 """ Datacenter that this VLR should be created in """
592 return self
._datacenter
_name
596 """ Get the VLR path from VLR """
597 return (VirtualLinkRecord
.XPATH
+ "[vlr:id={}]").format(quoted_key(vlr
.id))
605 def state(self
, value
):
606 """ VLR set state """
610 def prev_state(self
):
611 """ VLR previous state """
612 return self
._prev
_state
615 def prev_state(self
, value
):
616 """ VLR set previous state """
617 self
._prev
_state
= value
621 """ Virtual Link Record message for Creating VLR in VNS """
622 vld_fields
= ["short_name",
630 vld_copy_dict
= {k
: v
for k
, v
in self
.vld_msg
.as_dict().items()
633 vlr_dict
= {"id": self
._vlr
_id
,
634 "nsr_id_ref": self
._nsr
_id
,
635 "vld_ref": self
.vld_msg
.id,
637 "create_time": self
._create
_time
,
638 "datacenter": self
._datacenter
_name
,
641 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
642 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
645 vlr_dict
.update(vld_copy_dict
)
646 vlr
= RwVlrYang
.YangData_RwProject_Project_VlrCatalog_Vlr
.from_dict(vlr_dict
)
648 if self
.vld_msg
.has_field('virtual_connection_points'):
649 for cp
in self
.vld_msg
.virtual_connection_points
:
650 vcp
= vlr
.virtual_connection_points
.add()
651 vcp
.from_dict(cp
.as_dict())
654 def reset_id(self
, vlr_id
):
655 self
._vlr
_id
= vlr_id
657 def create_nsr_vlr_msg(self
, vnfrs
):
658 """ The VLR message"""
659 nsr_vlr
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vlr()
660 nsr_vlr
.vlr_ref
= self
._vlr
_id
661 nsr_vlr
.assigned_subnet
= self
.assigned_subnet
662 nsr_vlr
.datacenter
= self
._datacenter
_name
664 for conn
in self
.vld_msg
.vnfd_connection_point_ref
:
666 if (vnfr
.vnfd
.id == conn
.vnfd_id_ref
and
667 vnfr
.member_vnf_index
== conn
.member_vnf_index_ref
and
668 self
._datacenter
_name
== vnfr
._datacenter
_name
):
669 cp_entry
= nsr_vlr
.vnfr_connection_point_ref
.add()
670 cp_entry
.vnfr_id
= vnfr
.id
671 cp_entry
.connection_point
= conn
.vnfd_connection_point_ref
676 def instantiate(self
):
677 """ Instantiate this VL """
678 self
._log
.debug("Instaniating VLR key %s, vld %s",
679 self
.xpath
, self
._vld
_msg
)
681 self
._state
= VlRecordState
.INSTANTIATION_PENDING
682 self
._log
.debug("Executing VL create path:%s msg:%s",
683 self
.xpath
, self
.vlr_msg
)
685 with self
._dts
.transaction(flags
=0) as xact
:
686 block
= xact
.block_create()
687 block
.add_query_create(self
.xpath
, self
.vlr_msg
)
688 self
._log
.debug("Executing VL create path:%s msg:%s",
689 self
.xpath
, self
.vlr_msg
)
690 res_iter
= yield from block
.execute(now
=True)
696 self
._state
= VlRecordState
.FAILED
697 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self
.id)
699 if vlr
.operational_status
== 'failed':
700 self
._log
.debug("NS Id:%s VL creation failed for vlr id %s", self
.id, vlr
.id)
701 self
._state
= VlRecordState
.FAILED
702 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr
.id, vlr
.operational_status_details
))
704 self
._log
.info("Instantiated VL with xpath %s and vlr:%s",
706 self
._assigned
_subnet
= vlr
.assigned_subnet
708 def vlr_in_vns(self
):
709 """ Is there a VLR record in VNS """
710 if (self
._state
== VlRecordState
.ACTIVE
or
711 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
712 self
._state
== VlRecordState
.TERMINATE_PENDING
or
713 self
._state
== VlRecordState
.FAILED
):
720 """ Terminate this VL """
721 if not self
.vlr_in_vns():
722 self
._log
.debug("Ignoring terminate request for id %s in state %s",
723 self
.id, self
._state
)
726 self
._log
.debug("Terminating VL id:%s", self
.id)
727 self
._state
= VlRecordState
.TERMINATE_PENDING
729 with self
._dts
.transaction(flags
=0) as xact
:
730 block
= xact
.block_create()
731 block
.add_query_delete(self
.xpath
)
732 yield from block
.execute(flags
=0, now
=True)
734 self
._state
= VlRecordState
.TERMINATED
735 self
._log
.debug("Terminated VL id:%s", self
.id)
737 def set_state_from_op_status(self
, operational_status
):
738 """ Set the state of this VL based on operational_status"""
740 self
._log
.debug("set_state_from_op_status called for vlr id %s with value %s", self
.id, operational_status
)
741 if operational_status
== 'running':
742 self
._state
= VlRecordState
.ACTIVE
743 elif operational_status
== 'failed':
744 self
._state
= VlRecordState
.FAILED
745 elif operational_status
== 'vl_alloc_pending':
746 self
._state
= VlRecordState
.INSTANTIATION_PENDING
748 raise VirtualLinkRecordError("Unknown operational_status %s" % (operational_status
))
750 class VnfRecordState(Enum
):
751 """ Vnf Record State """
753 INSTANTIATION_PENDING
= 102
755 TERMINATE_PENDING
= 104
760 class VirtualNetworkFunctionRecord(object):
761 """ Virtual Network Function Record class"""
762 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
766 def create_record(dts
, log
, loop
, project
, vnfd
, nsr_config
, const_vnfd_msg
, nsd_id
, nsr_name
,
767 datacenter_name
, nsr_id
, group_name
, group_instance_id
,
768 placement_groups
, cloud_config
, restart_mode
=False):
769 """Creates a new VNFR object based on the given data.
771 If restart mode is enabled, then we look for existing records in the
772 DTS and create a VNFR records using the exiting data(ID)
775 VirtualNetworkFunctionRecord
778 vnfr_obj
= VirtualNetworkFunctionRecord(
794 restart_mode
=restart_mode
)
797 res_iter
= yield from dts
.query_read(
798 project
.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr"),
799 rwdts
.XactFlag
.MERGE
)
802 response
= yield from fut
803 vnfr
= response
.result
805 if vnfr
.name
== vnfr_obj
.name
:
806 vnfr_obj
.reset_id(vnfr
.id)
824 group_instance_id
=None,
825 placement_groups
= [],
827 restart_mode
= False):
831 self
._project
= project
833 self
._nsr
_config
= nsr_config
834 self
._const
_vnfd
_msg
= const_vnfd_msg
835 self
._nsd
_id
= nsd_id
836 self
._nsr
_name
= nsr_name
837 self
._nsr
_id
= nsr_id
838 self
._datacenter
_name
= datacenter_name
839 self
._group
_name
= group_name
840 self
._group
_instance
_id
= group_instance_id
841 self
._placement
_groups
= placement_groups
842 self
._cloud
_config
= cloud_config
843 self
.restart_mode
= restart_mode
845 self
._config
_status
= NsrYang
.ConfigStates
.INIT
846 self
._create
_time
= int(time
.time())
848 self
._prev
_state
= VnfRecordState
.INIT
849 self
._state
= VnfRecordState
.INIT
850 self
._state
_failed
_reason
= None
852 self
._active
_vdus
= 0
854 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
857 self
._vnfr
_id
= str(uuid
.uuid4())
860 self
.substitute_vnf_input_parameters
= VnfInputParameterSubstitution(self
._log
,
861 self
._const
_vnfd
_msg
,
863 self
._vnfr
_msg
= self
.create_vnfr_msg()
864 self
._log
.debug("Set VNFR {} config type to {}".
865 format(self
.name
, self
.config_type
))
868 if group_name
is None and group_instance_id
is not None:
869 raise ValueError("Group instance id must not be provided with an empty group name")
879 return self
._project
.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]"
880 .format(quoted_key(self
.id)))
885 return self
._vnfr
_msg
888 def const_vnfr_msg(self
):
890 return RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConstituentVnfrRef(
891 vnfr_id
=self
.id, datacenter
=self
._datacenter
_name
)
899 def datacenter_name(self
):
900 """ Datacenter that this VNF should be created in """
901 return self
._datacenter
_name
906 """ Is this VNF actve """
907 return True if self
._state
== VnfRecordState
.ACTIVE
else False
911 """ state of this VNF """
915 def state_failed_reason(self
):
916 """ Error message in case this VNF is in failed state """
917 return self
._state
_failed
_reason
920 def member_vnf_index(self
):
921 """ Member VNF index """
922 return self
._const
_vnfd
_msg
.member_vnf_index
927 return self
._nsr
_name
931 """ Name of this VNFR """
932 if self
._name
is not None:
935 name_tags
= [self
._project
.name
, self
._nsr
_name
]
937 if self
._group
_name
is not None:
938 name_tags
.append(self
._group
_name
)
940 if self
._group
_instance
_id
is not None:
941 name_tags
.append(str(self
._group
_instance
_id
))
943 name_tags
.extend([self
.vnfd
.name
, str(self
.member_vnf_index
)])
945 self
._name
= "__".join(name_tags
)
950 def vnfr_xpath(vnfr
):
951 """ Get the VNFR path from VNFR """
952 return (VirtualNetworkFunctionRecord
.XPATH
+
953 "[vnfr:id={}]").format(quoted_key(vnfr
.id))
956 def config_type(self
):
957 cfg_types
= ['netconf', 'juju', 'script']
958 for method
in cfg_types
:
959 if self
._vnfd
.vnf_configuration
.has_field(method
):
964 def config_status(self
):
965 """Return the config status as YANG ENUM string"""
966 self
._log
.debug("Map VNFR {} config status {} ({})".
967 format(self
.name
, self
._config
_status
, self
.config_type
))
968 if self
.config_type
== 'none':
969 return 'config_not_needed'
970 elif self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
972 elif self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
977 def set_state(self
, state
):
978 """ set the state of this object """
979 self
._prev
_state
= self
._state
982 def reset_id(self
, vnfr_id
):
983 self
._vnfr
_id
= vnfr_id
984 self
._vnfr
_msg
= self
.create_vnfr_msg()
987 self
.config_store
.merge_vnfd_config(
991 self
.member_vnf_index
,
994 def create_vnfr_msg(self
):
995 """ VNFR message for this VNFR """
1003 vnfd_copy_dict
= {k
: v
for k
, v
in self
._vnfd
.as_dict().items() if k
in vnfd_fields
}
1006 "nsr_id_ref": self
._nsr
_id
,
1008 "datacenter": self
._datacenter
_name
,
1009 "config_status": self
.config_status
1011 vnfr_dict
.update(vnfd_copy_dict
)
1013 vnfr
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1014 vnfr
.vnfd
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd
. \
1015 from_dict(self
.vnfd
.as_dict())
1016 vnfr
.member_vnf_index_ref
= self
.member_vnf_index
1017 vnfr
.vnf_configuration
.from_dict(self
._vnfd
.vnf_configuration
.as_dict())
1019 if self
._vnfd
.mgmt_interface
.has_field("port"):
1020 vnfr
.mgmt_interface
.port
= self
._vnfd
.mgmt_interface
.port
1022 for group_info
in self
._placement
_groups
:
1023 group
= vnfr
.placement_groups_info
.add()
1024 group
.from_dict(group_info
.as_dict())
1026 if self
._cloud
_config
and len(self
._cloud
_config
.as_dict()):
1027 self
._log
.debug("Cloud config during vnfr create is {}".format(self
._cloud
_config
))
1028 vnfr
.cloud_config
= self
._cloud
_config
1030 # UI expects the monitoring param field to exist
1031 vnfr
.monitoring_param
= []
1033 self
._log
.debug("Get vnfr_msg for VNFR {} : {}".format(self
.name
, vnfr
))
1035 if self
.restart_mode
:
1036 vnfr
.operational_status
= 'init'
1038 # Set Operational Status as pre-init for Input Param Substitution
1039 if self
._state
not in [VnfRecordState
.ACTIVE
, VnfRecordState
.TERMINATE_PENDING
,
1040 VnfRecordState
.TERMINATED
, VnfRecordState
.FAILED
]:
1041 # To make sure that an active VNFR is not updated with a previous state.
1042 # This can happen during config state updates.
1043 vnfr
.operational_status
= 'pre_init'
1045 vnfr
.operational_status
= self
._state
1050 def update_vnfm(self
):
1051 self
._log
.debug("Send an update to VNFM for VNFR {} with {}".
1052 format(self
.name
, self
.vnfr_msg
))
1053 yield from self
._dts
.query_update(
1055 rwdts
.XactFlag
.REPLACE
,
1059 def get_config_status(self
):
1060 """Return the config status as YANG ENUM"""
1061 return self
._config
_status
1064 def set_config_status(self
, status
):
1066 def status_to_string(status
):
1068 NsrYang
.ConfigStates
.INIT
: 'init',
1069 NsrYang
.ConfigStates
.CONFIGURING
: 'configuring',
1070 NsrYang
.ConfigStates
.CONFIG_NOT_NEEDED
: 'config_not_needed',
1071 NsrYang
.ConfigStates
.CONFIGURED
: 'configured',
1072 NsrYang
.ConfigStates
.FAILED
: 'failed',
1075 return status_dc
[status
]
1077 self
._log
.debug("Update VNFR {} from {} ({}) to {}".
1078 format(self
.name
, self
._config
_status
,
1079 self
.config_type
, status
))
1080 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1081 self
._log
.warning("Updating already configured VNFR {}".
1085 if self
._config
_status
!= status
:
1087 self
._config
_status
= status
1088 # I don't think this is used. Original implementor can check.
1089 # Caused Exception, so corrected it by status_to_string
1090 # But not sure whats the use of this variable?
1091 self
.vnfr_msg
.config_status
= status_to_string(status
)
1092 except Exception as e
:
1093 self
._log
.exception("Exception=%s", str(e
))
1095 self
._log
.debug("Updated VNFR {} status to {}".format(self
.name
, status
))
1097 if self
._config
_status
!= NsrYang
.ConfigStates
.INIT
:
1099 # Publish only after VNFM has the VNFR created
1100 yield from self
.update_vnfm()
1101 except Exception as e
:
1102 self
._log
.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1103 format(status
, self
.name
, e
))
1104 self
._log
.exception(e
)
1106 def is_configured(self
):
1107 if self
.config_type
== 'none':
1110 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1116 def update_config_primitives(self
, vnf_config
, nsr
):
1117 # Update only after we are configured
1118 if self
._config
_status
== NsrYang
.ConfigStates
.INIT
:
1121 if not vnf_config
.as_dict():
1124 self
._log
.debug("Update VNFR {} config: {}".
1125 format(self
.name
, vnf_config
.as_dict()))
1127 # Update config primitive
1129 for prim
in self
._vnfd
.vnf_configuration
.config_primitive
:
1130 for p
in vnf_config
.config_primitive
:
1131 if prim
.name
== p
.name
:
1132 for param
in prim
.parameter
:
1133 for pa
in p
.parameter
:
1134 if pa
.name
== param
.name
:
1135 if pa
.default_value
and \
1136 (pa
.default_value
!= param
.default_value
):
1137 param
.default_value
= pa
.default_value
1138 param
.read_only
= pa
.read_only
1141 self
._log
.debug("Prim: {}".format(prim
.as_dict()))
1145 self
._log
.debug("Updated VNFD {} config: {}".
1146 format(self
._vnfd
.name
,
1147 self
._vnfd
.vnf_configuration
))
1148 self
._vnfr
_msg
= self
.create_vnfr_msg()
1151 yield from nsr
.nsm_plugin
.update_vnfr(self
)
1152 except Exception as e
:
1153 self
._log
.error("Exception updating VNFM with new config "
1154 "primitive for VNFR {}: {}".
1155 format(self
.name
, e
))
1156 self
._log
.exception(e
)
1159 def instantiate(self
, nsr
):
1160 """ Instantiate this VNFR"""
1162 self
._log
.debug("Instaniating VNFR key %s, vnfd %s",
1163 self
.xpath
, self
._vnfd
)
1165 self
._log
.debug("Create VNF with xpath %s and vnfr %s",
1166 self
.xpath
, self
.vnfr_msg
)
1168 self
.set_state(VnfRecordState
.INSTANTIATION_PENDING
)
1170 def find_vlr_for_cp(conn
):
1171 """ Find VLR for the given connection point """
1172 for vlr_id
, vlr
in nsr
.vlrs
.items():
1173 for vnfd_cp
in vlr
.vld_msg
.vnfd_connection_point_ref
:
1174 if (vnfd_cp
.vnfd_id_ref
== self
._vnfd
.id and
1175 vnfd_cp
.vnfd_connection_point_ref
== conn
.name
and
1176 vnfd_cp
.member_vnf_index_ref
== self
.member_vnf_index
and
1177 vlr
._datacenter
_name
== self
._datacenter
_name
):
1178 self
._log
.debug("Found VLR for cp_name:%s and vnf-index:%d",
1179 conn
.name
, self
.member_vnf_index
)
1183 # For every connection point in the VNFD fill in the identifier
1184 self
._log
.debug("Add connection point for VNF %s: %s",
1185 self
.vnfr_msg
.name
, self
._vnfd
.connection_point
)
1186 for conn_p
in self
._vnfd
.connection_point
:
1187 cpr
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint()
1188 cpr
.name
= conn_p
.name
1189 cpr
.type_yang
= conn_p
.type_yang
1190 if conn_p
.has_field('port_security_enabled'):
1191 cpr
.port_security_enabled
= conn_p
.port_security_enabled
1193 vlr_ref
= find_vlr_for_cp(conn_p
)
1195 msg
= "Failed to find VLR for cp = %s" % conn_p
.name
1196 self
._log
.debug("%s", msg
)
1197 # raise VirtualNetworkFunctionRecordError(msg)
1200 cpr
.vlr_ref
= vlr_ref
.id
1202 self
.vnfr_msg
.connection_point
.append(cpr
)
1203 self
._log
.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1204 cpr
, self
.vnfr_msg
.id, self
.vnfr_msg
.vnfd
.id)
1206 self
._log
.debug("VNFR {} restart mode {}".
1207 format(self
.vnfr_msg
.id, self
.restart_mode
))
1208 if not self
.restart_mode
:
1209 # Checking for NS Terminate.
1210 if nsr
._ns
_terminate
_received
== False:
1211 # Create with pre-init operational state publishes the vnfr for substitution.
1212 yield from self
._dts
.query_create(self
.xpath
, 0, self
.vnfr_msg
)
1213 # Call to substitute VNF Input Parameter
1214 self
.substitute_vnf_input_parameters(self
.vnfr_msg
, self
._nsr
_config
)
1215 # Calling Update with pre-init operational data after Param substitution to instatntiate vnfr
1216 yield from self
._dts
.query_update(self
.xpath
, 0, self
.vnfr_msg
)
1219 yield from self
._dts
.query_update(self
.xpath
,
1223 self
._log
.info("Created VNF with xpath %s and vnfr %s",
1224 self
.xpath
, self
.vnfr_msg
)
1227 def update_state(self
, vnfr_msg
):
1228 """ Update this VNFR"""
1229 if vnfr_msg
.operational_status
== "running":
1230 if self
.vnfr_msg
.operational_status
!= "running":
1231 yield from self
.is_active()
1232 elif vnfr_msg
.operational_status
== "failed":
1233 yield from self
.instantiation_failed(failed_reason
=vnfr_msg
.operational_status_details
)
1236 def is_active(self
):
1237 """ This VNFR is active """
1238 self
._log
.debug("VNFR %s is active", self
._vnfr
_id
)
1239 self
.set_state(VnfRecordState
.ACTIVE
)
1242 def instantiation_failed(self
, failed_reason
=None):
1243 """ This VNFR instantiation failed"""
1244 self
._log
.debug("VNFR %s instantiation failed", self
._vnfr
_id
)
1245 self
.set_state(VnfRecordState
.FAILED
)
1246 self
._state
_failed
_reason
= failed_reason
1248 def vnfr_in_vnfm(self
):
1249 """ Is there a VNFR record in VNFM """
1250 if (self
._state
== VnfRecordState
.ACTIVE
or
1251 self
._state
== VnfRecordState
.INSTANTIATION_PENDING
or
1252 self
._state
== VnfRecordState
.FAILED
):
1258 def terminate(self
):
1259 """ Terminate this VNF """
1260 if not self
.vnfr_in_vnfm():
1261 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1262 self
.id, self
._state
)
1265 self
._log
.debug("Terminating VNF id:%s", self
.id)
1266 self
.set_state(VnfRecordState
.TERMINATE_PENDING
)
1267 with self
._dts
.transaction(flags
=0) as xact
:
1268 block
= xact
.block_create()
1269 block
.add_query_delete(self
.xpath
)
1270 yield from block
.execute(flags
=0)
1271 self
.set_state(VnfRecordState
.TERMINATED
)
1272 self
._log
.debug("Terminated VNF id:%s", self
.id)
1275 class NetworkServiceStatus(object):
1276 """ A class representing the Network service's status """
1277 MAX_EVENTS_RECORDED
= 10
1278 """ Network service Status class"""
1279 def __init__(self
, dts
, log
, loop
):
1284 self
._state
= NetworkServiceRecordState
.INIT
1285 self
._events
= deque([])
1288 def create_notification(self
, evt
, evt_desc
, evt_details
):
1289 xp
= "N,/rw-nsr:nsm-notification"
1290 notif
= RwNsrYang
.YangNotif_RwNsr_NsmNotification()
1292 notif
.description
= evt_desc
1293 notif
.details
= evt_details
if evt_details
is not None else None
1295 yield from self
._dts
.query_create(xp
, rwdts
.XactFlag
.ADVISE
, notif
)
1296 self
._log
.info("Notification called by creating dts query: %s", notif
)
1298 def record_event(self
, evt
, evt_desc
, evt_details
):
1299 """ Record an event """
1300 self
._log
.debug("Recording event - evt %s, evt_descr %s len = %s",
1301 evt
, evt_desc
, len(self
._events
))
1302 if len(self
._events
) >= NetworkServiceStatus
.MAX_EVENTS_RECORDED
:
1303 self
._events
.popleft()
1304 self
._events
.append((int(time
.time()), evt
, evt_desc
,
1305 evt_details
if evt_details
is not None else None))
1307 self
._loop
.create_task(self
.create_notification(evt
,evt_desc
,evt_details
))
1309 def set_state(self
, state
):
1310 """ set the state of this status object """
1314 """ Return the state as a yang enum string """
1315 state_to_str_map
= {"INIT": "init",
1316 "VL_INIT_PHASE": "vl_init_phase",
1317 "VNF_INIT_PHASE": "vnf_init_phase",
1318 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1319 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1320 "RUNNING": "running",
1321 "SCALING_OUT": "scaling_out",
1322 "SCALING_IN": "scaling_in",
1323 "TERMINATE_RCVD": "terminate_rcvd",
1324 "TERMINATE": "terminate",
1325 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1326 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1327 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1328 "TERMINATED": "terminated",
1330 "VL_INSTANTIATE": "vl_instantiate",
1331 "VL_TERMINATE": "vl_terminate",
1333 return state_to_str_map
[self
._state
.name
]
1337 """ State of this status object """
1342 """ Network Service Record as a message"""
1345 for entry
in self
._events
:
1346 event
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_OperationalEvents()
1349 event
.timestamp
, event
.event
, event
.description
, event
.details
= entry
1350 event_list
.append(event
)
1354 class NetworkServiceRecord(object):
1355 """ Network service record """
1356 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
1358 def __init__(self
, dts
, log
, loop
, nsm
, nsm_plugin
, nsr_cfg_msg
,
1359 sdn_account_name
, key_pairs
, project
, restart_mode
=False,
1365 self
._nsr
_cfg
_msg
= nsr_cfg_msg
1366 self
._nsm
_plugin
= nsm_plugin
1367 self
._sdn
_account
_name
= sdn_account_name
1368 self
._vlr
_handler
= vlr_handler
1369 self
._project
= project
1372 self
._nsr
_msg
= None
1373 self
._nsr
_regh
= None
1374 self
._key
_pairs
= key_pairs
1375 self
._ssh
_key
_file
= None
1376 self
._ssh
_pub
_key
= None
1381 self
._param
_pools
= {}
1382 self
._scaling
_groups
= {}
1383 self
._create
_time
= int(time
.time())
1384 self
._op
_status
= NetworkServiceStatus(dts
, log
, loop
)
1385 self
._config
_status
= NsrYang
.ConfigStates
.CONFIGURING
1386 self
._config
_status
_details
= None
1388 self
.restart_mode
= restart_mode
1389 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
1390 self
._debug
_running
= False
1391 self
._is
_active
= False
1392 self
._vl
_phase
_completed
= False
1393 self
._vnf
_phase
_completed
= False
1394 self
.instantiated
= set()
1396 # Used for orchestration_progress
1397 self
._active
_vms
= 0
1398 self
._active
_networks
= 0
1400 # A flag to indicate if the NS has failed, currently it is recorded in
1401 # operational status, but at the time of termination this field is
1402 # over-written making it difficult to identify the failure.
1403 self
._is
_failed
= False
1405 # Initalise the state to init
1406 # The NSR moves through the following transitions
1407 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1408 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1409 # 3. VNFS_READY - READY when the NSR is published
1411 self
.set_state(NetworkServiceRecordState
.INIT
)
1413 self
.substitute_input_parameters
= InputParameterSubstitution(self
._log
, self
._project
)
1415 # Create an asyncio loop to know when the virtual links are ready
1416 self
._vls
_ready
= asyncio
.Event(loop
=self
._loop
)
1418 # This variable stores all the terminate events received per NS. This is then used to prevent any
1419 # further nsr non-terminate updates received in case of terminate being called bedore ns in in running state.
1420 self
._ns
_terminate
_received
= False
1423 def nsm_plugin(self
):
1425 return self
._nsm
_plugin
1427 def set_state(self
, state
):
1428 """ Set state for this NSR"""
1429 # We are in init phase and is moving to the next state
1430 # The new state could be a FAILED state or VNF_INIIT_PHASE
1431 if self
.state
== NetworkServiceRecordState
.VL_INIT_PHASE
:
1432 self
._vl
_phase
_completed
= True
1434 if self
.state
== NetworkServiceRecordState
.VNF_INIT_PHASE
:
1435 self
._vnf
_phase
_completed
= True
1437 self
._op
_status
.set_state(state
)
1439 self
._nsm
_plugin
.set_state(self
.id, state
)
1443 """ Get id for this NSR"""
1444 return self
._nsr
_cfg
_msg
.id
1448 """ Name of this network service record """
1449 return self
._nsr
_cfg
_msg
.name
1452 def _datacenter_name(self
):
1453 if self
._nsr
_cfg
_msg
.has_field('datacenter'):
1454 return self
._nsr
_cfg
_msg
.datacenter
1459 """State of this NetworkServiceRecord"""
1460 return self
._op
_status
.state
1464 """ Is this NSR active ?"""
1465 return True if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
else False
1469 """ VLRs associated with this NSR"""
1474 """ VNFRs associated with this NSR"""
1479 """ VNFFGRs associated with this NSR"""
1480 return self
._vnffgrs
1483 def scaling_groups(self
):
1484 """ Scaling groups associated with this NSR """
1485 return self
._scaling
_groups
1488 def param_pools(self
):
1489 """ Parameter value pools associated with this NSR"""
1490 return self
._param
_pools
1493 def nsr_cfg_msg(self
):
1494 return self
._nsr
_cfg
_msg
1497 def nsr_cfg_msg(self
, msg
):
1498 self
._nsr
_cfg
_msg
= msg
1502 """ NSD Protobuf for this NSR """
1503 if self
._nsd
is not None:
1505 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
1510 """ NSD ID for this NSR """
1511 return self
.nsd_msg
.id
1515 ''' Get a new job id for config primitive'''
1520 def config_status(self
):
1521 """ Config status for NSR """
1522 return self
._config
_status
1530 def is_failed(self
):
1531 return self
._is
_failed
1534 def public_key(self
):
1535 return self
._ssh
_pub
_key
1538 def private_key(self
):
1539 return self
._ssh
_key
_file
1541 def resolve_placement_group_cloud_construct(self
, input_group
):
1543 Returns the cloud specific construct for placement group
1545 copy_dict
= ['name', 'requirement', 'strategy']
1547 for group_info
in self
._nsr
_cfg
_msg
.nsd_placement_group_maps
:
1548 if group_info
.placement_group_ref
== input_group
.name
:
1549 group
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1550 group_dict
= {k
:v
for k
,v
in
1551 group_info
.as_dict().items() if k
!= 'placement_group_ref'}
1552 for param
in copy_dict
:
1553 group_dict
.update({param
: getattr(input_group
, param
)})
1554 group
.from_dict(group_dict
)
1560 return "NSR(name={}, nsd_id={}, data center={})".format(
1561 self
.name
, self
.nsd_id
, self
._datacenter
_name
1564 def _get_vnfd(self
, vnfd_id
, config_xact
):
1565 """ Fetch vnfd msg for the passed vnfd id """
1566 return self
._nsm
.get_vnfd(vnfd_id
, config_xact
)
1568 def _get_vnfd_datacenter(self
, vnfd_member_index
):
1569 """ Fetch datacenter for the passed vnfd id """
1570 if self
._nsr
_cfg
_msg
.vnf_datacenter_map
:
1571 vim_accounts
= [vnf
.datacenter
for vnf
in self
._nsr
_cfg
_msg
.vnf_datacenter_map \
1572 if str(vnfd_member_index
) == str(vnf
.member_vnf_index_ref
)]
1573 if vim_accounts
and vim_accounts
[0]:
1574 return vim_accounts
[0]
1575 return self
._datacenter
_name
1577 def _get_constituent_vnfd_msg(self
, vnf_index
):
1578 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1579 if const_vnfd
.member_vnf_index
== vnf_index
:
1582 raise ValueError("Constituent VNF index %s not found" % vnf_index
)
1584 def record_event(self
, evt
, evt_desc
, evt_details
=None, state
=None):
1585 """ Record an event """
1586 self
._op
_status
.record_event(evt
, evt_desc
, evt_details
)
1587 if state
is not None:
1588 self
.set_state(state
)
1590 def scaling_trigger_str(self
, trigger
):
1591 SCALING_TRIGGER_STRS
= {
1592 NsdBaseYang
.ScalingTrigger
.PRE_SCALE_IN
: 'pre-scale-in',
1593 NsdBaseYang
.ScalingTrigger
.POST_SCALE_IN
: 'post-scale-in',
1594 NsdBaseYang
.ScalingTrigger
.PRE_SCALE_OUT
: 'pre-scale-out',
1595 NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
: 'post-scale-out',
1598 return SCALING_TRIGGER_STRS
[trigger
]
1599 except Exception as e
:
1600 self
._log
.error("Scaling trigger mapping error for {} : {}".
1602 self
._log
.exception(e
)
1603 return "Unknown trigger"
1605 def generate_ssh_key_pair(self
, config_xact
):
1606 '''Generate a ssh key pair if required'''
1607 if self
._ssh
_key
_file
:
1608 self
._log
.debug("Key pair already generated")
1612 for cv
in self
.nsd_msg
.constituent_vnfd
:
1613 vnfd
= self
._get
_vnfd
(cv
.vnfd_id_ref
, config_xact
)
1614 if vnfd
and vnfd
.mgmt_interface
.ssh_key
:
1622 key
= ManoSshKey(self
._log
)
1623 path
= tempfile
.mkdtemp()
1624 key
.write_to_disk(name
=self
.id, directory
=path
)
1625 self
._ssh
_key
_file
= "file://{}".format(key
.private_key_file
)
1626 self
._ssh
_pub
_key
= key
.public_key
1627 except Exception as e
:
1628 self
._log
.exception("Error generating ssh key for {}: {}".
1629 format(self
.nsr_cfg_msg
.name
, e
))
1632 def instantiate_vls(self
):
1634 This function instantiates VLs for every VL in this Network Service
1636 self
._log
.debug("Instantiating %d VLs in NSD id %s", len(self
._vlrs
),
1638 for vlr_id
, vlr
in self
._vlrs
.items():
1639 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1641 if not isinstance(self
.nsm_plugin
, rwnsmplugin
.RwNsPlugin
):
1642 self
._vls
_ready
.set()
1644 # Wait for the VLs to be ready before yielding control out
1645 self
._log
.debug("Waitng for %d VLs in NSR id %s to be active",
1646 len(self
._vlrs
), self
.id)
1648 self
._log
.debug("NSR id:%s, name:%s - Waiting for %d VLs to be ready",
1649 self
.id, self
.name
, len(self
._vlrs
))
1650 yield from self
._vls
_ready
.wait()
1652 self
._log
.debug("NSR id:%s, name:%s, No virtual links found",
1654 self
._vls
_ready
.set()
1656 self
._log
.info("All %d VLs in NSR id %s are active, start the VNFs",
1657 len(self
._vlrs
), self
.id)
1659 def create(self
, config_xact
):
1660 """ Create this network service"""
1661 self
._log
.debug("Create NS {} for {}".format(self
.name
, self
._project
.name
))
1662 # Create virtual links for all the external vnf
1663 # connection points in this NS
1664 yield from self
.create_vls()
1666 # Create VNFs in this network service
1667 yield from self
.create_vnfs(config_xact
)
1669 # Create VNFFG for network service
1670 self
.create_vnffgs()
1672 # Create Scaling Groups for each scaling group in NSD
1673 self
.create_scaling_groups()
1675 # Create Parameter Pools
1676 self
.create_param_pools()
1679 def apply_scale_group_config_script(self
, script
, group
, scale_instance
, trigger
, vnfrs
=None):
1680 """ Apply config based on script for scale group """
1681 rift_var_root_dir
= os
.environ
['RIFT_VAR_ROOT']
1684 def add_vnfrs_data(vnfrs_list
):
1685 """ Add as a dict each of the VNFRs data """
1688 for vnfr
in vnfrs_list
:
1689 self
._log
.debug("Add VNFR {} data".format(vnfr
))
1691 vnfr_data
['name'] = vnfr
.name
1692 if trigger
in [NsdBaseYang
.ScalingTrigger
.PRE_SCALE_IN
,
1693 NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
]:
1694 # Get VNF management and other IPs, etc
1695 opdata
= yield from self
.fetch_vnfr(vnfr
.xpath
)
1696 self
._log
.debug("VNFR {} op data: {}".format(vnfr
.name
, opdata
))
1698 vnfr_data
['rw_mgmt_ip'] = opdata
.mgmt_interface
.ip_address
1699 vnfr_data
['rw_mgmt_port'] = opdata
.mgmt_interface
.port
1700 vnfr_data
['member_vnf_index_ref'] = opdata
.member_vnf_index_ref
1701 vnfr_data
['vdur_data'] = []
1702 for vdur
in opdata
.vdur
:
1704 vdur_data
['vm_name'] = vdur
.name
1705 vdur_data
['vm_mgmt_ip'] = vdur
.vm_management_ip
1706 vnfr_data
['vdur_data'].append(vdur_data
)
1707 except Exception as e
:
1708 self
._log
.error("Unable to get management IP for vnfr {}:{}".
1709 format(vnfr
.name
, e
))
1712 vnfr_data
['connection_points'] = []
1713 for cp
in opdata
.connection_point
:
1715 con_pt
['name'] = cp
.name
1716 con_pt
['ip_address'] = cp
.ip_address
1717 vnfr_data
['connection_points'].append(con_pt
)
1718 except Exception as e
:
1719 self
._log
.error("Exception getting connections points for VNFR {}: {}".
1720 format(vnfr
.name
, e
))
1722 vnfrs_data
.append(vnfr_data
)
1723 self
._log
.debug("VNFRs data: {}".format(vnfrs_data
))
1727 def add_nsr_data(nsr
):
1729 nsr_data
['name'] = nsr
.name
1732 if script
is None or len(script
) == 0:
1733 self
._log
.error("Script not provided for scale group config: {}".format(group
.name
))
1736 if script
[0] == '/':
1739 path
= os
.path
.join(rift_var_root_dir
,
1740 'launchpad/packages/nsd',
1742 self
.nsd_id
, 'scripts',
1745 if not os
.path
.exists(path
):
1746 self
._log
.error("Config failed for scale group {}: Script does not exist at {}".
1747 format(group
.name
, path
))
1750 # Build a YAML file with all parameters for the script to execute
1751 # The data consists of 5 sections
1753 # 2. Scale group config
1754 # 3. VNFRs in the scale group
1755 # 4. VNFRs outside scale group
1758 data
['trigger'] = group
.trigger_map(trigger
)
1759 data
['config'] = group
.group_msg
.as_dict()
1762 data
["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs
)
1764 data
["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance
.vnfrs
)
1766 data
["vnfrs_others"] = yield from add_vnfrs_data(self
.vnfrs
.values())
1767 data
["nsr"] = add_nsr_data(self
)
1770 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
1771 tmp_file
.write(yaml
.dump(data
, default_flow_style
=True)
1774 self
._log
.debug("Creating a temp file: {} with input data: {}".
1775 format(tmp_file
.name
, data
))
1777 cmd
= "{} {}".format(path
, tmp_file
.name
)
1778 self
._log
.debug("Running the CMD: {}".format(cmd
))
1779 proc
= yield from asyncio
.create_subprocess_shell(cmd
, loop
=self
._loop
)
1780 rc
= yield from proc
.wait()
1782 self
._log
.error("The script {} for scale group {} config returned: {}".
1783 format(script
, group
.name
, rc
))
1791 def apply_scaling_group_config(self
, trigger
, group
, scale_instance
, vnfrs
=None):
1792 """ Apply the config for the scaling group based on trigger """
1793 if group
is None or scale_instance
is None:
1797 def update_config_status(success
=True, err_msg
=None):
1799 We are trying to determine the scaling instance's config status
1800 as a collation of the config status associated with 4 different triggers
1802 self
._log
.debug("Update %s scaling config status to %r : %s",
1803 scale_instance
, success
, err_msg
)
1804 if (scale_instance
.config_status
== "failed"):
1805 # Do not update the config status if it is already in failed state
1808 if scale_instance
.config_status
== "configured":
1809 # Update only to failed state an already configured scale instance
1811 scale_instance
.config_status
= "failed"
1812 scale_instance
.config_err_msg
= err_msg
1813 yield from self
.update_state()
1815 # We are in configuring state
1816 # Only after post scale out mark instance as configured
1817 if trigger
== NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
:
1819 scale_instance
.config_status
= "configured"
1820 for vnfr
in scale_instance
.vnfrs
:
1821 if vnfr
.config_status
== "configuring":
1822 vnfr
.vnfr_msg
.config_status
= "configured"
1823 yield from vnfr
.update_vnfm()
1825 scale_instance
.config_status
= "failed"
1826 scale_instance
.config_err_msg
= err_msg
1828 yield from self
.update_state()
1829 # Publish config state as update_state seems to care only operational status
1830 yield from self
.publish()
1832 config
= group
.trigger_config(trigger
)
1834 if trigger
== NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
:
1835 self
._log
.debug("No config needed, update %s scaling config status to configured",
1837 scale_instance
.config_status
= "configured"
1840 self
._log
.debug("Scaling group {} config: {}".format(group
.name
, config
))
1841 if config
.has_field("ns_service_primitive_name_ref"):
1842 config_name
= config
.ns_service_primitive_name_ref
1843 nsd_msg
= self
.nsd_msg
1844 config_primitive
= None
1845 for ns_cfg_prim
in nsd_msg
.service_primitive
:
1846 if ns_cfg_prim
.name
== config_name
:
1847 config_primitive
= ns_cfg_prim
1850 if config_primitive
is None:
1851 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name
, self
.name
))
1853 self
._log
.debug("Scaling group {} config primitive: {}".format(group
.name
, config_primitive
))
1854 if config_primitive
.has_field("user_defined_script"):
1855 script_path
= '/'.join(["launchpad/packages/nsd", self
._project
.name
, nsd_msg
.id, "scripts", config_primitive
.user_defined_script
])
1856 rc
= yield from self
.apply_scale_group_config_script(script_path
,
1857 group
, scale_instance
, trigger
, vnfrs
)
1860 err_msg
= "Failed config for trigger {} using config script '{}'". \
1861 format(self
.scaling_trigger_str(trigger
),
1862 config_primitive
.user_defined_script
)
1863 yield from update_config_status(success
=rc
, err_msg
=err_msg
)
1866 err_msg
= "Failed config for trigger {} as config script is not specified". \
1867 format(self
.scaling_trigger_str(trigger
))
1868 yield from update_config_status(success
=False, err_msg
=err_msg
)
1869 raise NotImplementedError("Only script based config support for scale group for now: {}".
1872 err_msg
= "Failed config for trigger {} as config primitive is not specified".\
1873 format(self
.scaling_trigger_str(trigger
))
1874 yield from update_config_status(success
=False, err_msg
=err_msg
)
1875 self
._log
.error("Config primitive not specified for config action in scale group %s" %
1879 def create_scaling_groups(self
):
1880 """ This function creates a NSScalingGroup for every scaling
1881 group defined in he NSD"""
1883 for scaling_group_msg
in self
.nsd_msg
.scaling_group_descriptor
:
1884 self
._log
.debug("Found scaling_group %s in nsr id %s",
1885 scaling_group_msg
.name
, self
.id)
1887 group_record
= scale_group
.ScalingGroup(
1892 self
._scaling
_groups
[group_record
.name
] = group_record
1895 def create_scale_group_instance(self
, group_name
, index
, config_xact
, is_default
=False):
1896 group
= self
._scaling
_groups
[group_name
]
1897 scale_instance
= group
.create_instance(index
, is_default
)
1901 self
._log
.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1902 len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
)
1905 for vnf_index
, count
in group
.vnf_index_count_map
.items():
1906 const_vnfd_msg
= self
._get
_constituent
_vnfd
_msg
(vnf_index
)
1907 vnfd_msg
= self
._get
_vnfd
(const_vnfd_msg
.vnfd_id_ref
, config_xact
)
1909 datacenter_name
= self
._get
_vnfd
_datacenter
(const_vnfd_msg
.member_vnf_index
)
1910 if datacenter_name
is None:
1911 datacenter_name
= self
._datacenter
_name
1912 for _
in range(count
):
1913 vnfr
= yield from self
.create_vnf_record(vnfd_msg
, const_vnfd_msg
, datacenter_name
, group_name
, index
)
1914 scale_instance
.add_vnfr(vnfr
)
1919 def instantiate_instance():
1920 self
._log
.debug("Creating %s VNFRS", scale_instance
)
1921 vnfrs
= yield from create_vnfs()
1922 yield from self
.publish()
1924 self
._log
.debug("Instantiating %s VNFRS for %s", len(vnfrs
), scale_instance
)
1925 scale_instance
.operational_status
= "vnf_init_phase"
1926 yield from self
.update_state()
1929 rc
= yield from self
.apply_scaling_group_config(NsdBaseYang
.ScalingTrigger
.PRE_SCALE_OUT
,
1930 group
, scale_instance
, vnfrs
)
1932 self
._log
.error("Pre scale out config for scale group {} ({}) failed".
1933 format(group
.name
, index
))
1934 scale_instance
.operational_status
= "failed"
1936 yield from self
.instantiate_vnfs(vnfrs
, scaleout
=True)
1939 except Exception as e
:
1940 self
._log
.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1941 format(group
.name
, e
))
1942 self
._log
.exception(e
)
1943 scale_instance
.operational_status
= "failed"
1945 yield from self
.update_state()
1947 yield from instantiate_instance()
1950 def delete_scale_group_instance(self
, group_name
, index
):
1951 group
= self
._scaling
_groups
[group_name
]
1952 scale_instance
= group
.get_instance(index
)
1953 if scale_instance
.is_default
:
1954 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1956 scale_instance
.operational_status
= "terminate"
1957 yield from self
.update_state()
1960 def terminate_instance():
1961 self
._log
.debug("Terminating scaling instance %s VNFRS" % scale_instance
)
1962 rc
= yield from self
.apply_scaling_group_config(NsdBaseYang
.ScalingTrigger
.PRE_SCALE_IN
,
1963 group
, scale_instance
)
1965 self
._log
.error("Pre scale in config for scale group {} ({}) failed".
1966 format(group
.name
, index
))
1968 # Going ahead with terminate, even if there is an error in pre-scale-in config
1969 # as this could be result of scale out failure and we need to cleanup this group
1970 yield from self
.terminate_vnfrs(scale_instance
.vnfrs
, scalein
=True)
1971 group
.delete_instance(index
)
1973 scale_instance
.operational_status
= "vnf_terminate_phase"
1974 yield from self
.update_state()
1976 yield from terminate_instance()
1979 def _update_scale_group_instances_status(self
):
1981 def post_scale_out_task(group
, instance
):
1982 # Apply post scale out config once all VNFRs are active
1983 rc
= yield from self
.apply_scaling_group_config(NsdBaseYang
.ScalingTrigger
.POST_SCALE_OUT
,
1985 instance
.operational_status
= "running"
1987 self
._log
.debug("Scale out for group {} and instance {} succeeded".
1988 format(group
.name
, instance
.instance_id
))
1990 self
._log
.error("Post scale out config for scale group {} ({}) failed".
1991 format(group
.name
, instance
.instance_id
))
1993 yield from self
.update_state()
1995 group_instances
= {group
: group
.instances
for group
in self
._scaling
_groups
.values()}
1996 for group
, instances
in group_instances
.items():
1997 self
._log
.debug("Updating %s instance status", group
)
1998 for instance
in instances
:
1999 instance_vnf_state_list
= [vnfr
.state
for vnfr
in instance
.vnfrs
]
2000 self
._log
.debug("Got vnfr instance states: %s", instance_vnf_state_list
)
2001 if instance
.operational_status
== "vnf_init_phase":
2002 if all([state
== VnfRecordState
.ACTIVE
for state
in instance_vnf_state_list
]):
2003 instance
.operational_status
= "running"
2005 # Create a task for post scale out to allow us to sleep before attempting
2006 # to configure newly created VM's
2007 self
._loop
.create_task(post_scale_out_task(group
, instance
))
2009 elif any([state
== VnfRecordState
.FAILED
for state
in instance_vnf_state_list
]):
2010 self
._log
.debug("Scale out for group {} and instance {} failed".
2011 format(group
.name
, instance
.instance_id
))
2012 instance
.operational_status
= "failed"
2014 elif instance
.operational_status
== "vnf_terminate_phase":
2015 if all([state
== VnfRecordState
.TERMINATED
for state
in instance_vnf_state_list
]):
2016 instance
.operational_status
= "terminated"
2017 rc
= yield from self
.apply_scaling_group_config(NsdBaseYang
.ScalingTrigger
.POST_SCALE_IN
,
2020 self
._log
.debug("Scale in for group {} and instance {} succeeded".
2021 format(group
.name
, instance
.instance_id
))
2023 self
._log
.error("Post scale in config for scale group {} ({}) failed".
2024 format(group
.name
, instance
.instance_id
))
2026 def create_vnffgs(self
):
2027 """ This function creates VNFFGs for every VNFFG in the NSD
2028 associated with this NSR"""
2030 for vnffgd
in self
.nsd_msg
.vnffgd
:
2031 self
._log
.debug("Found vnffgd %s in nsr id %s", vnffgd
, self
.id)
2032 vnffgr
= VnffgRecord(self
._dts
,
2035 self
._nsm
._vnffgmgr
,
2039 self
._sdn
_account
_name
,
2040 self
._datacenter
_name
2042 self
._vnffgrs
[vnffgr
.id] = vnffgr
2044 def resolve_vld_ip_profile(self
, nsd_msg
, vld
):
2045 self
._log
.debug("Receieved ip profile ref is %s",vld
.ip_profile_ref
)
2046 if not vld
.has_field('ip_profile_ref'):
2048 profile
= [profile
for profile
in nsd_msg
.ip_profiles
if profile
.name
== vld
.ip_profile_ref
]
2049 return profile
[0] if profile
else None
2052 def _create_vls(self
, vld
, datacenter
):
2053 """Create a VLR in the cloud account specified using the given VLD
2057 datacenter : Cloud account name
2062 vlr
= yield from VirtualLinkRecord
.create_record(
2070 self
.resolve_vld_ip_profile(self
.nsd_msg
, vld
),
2072 restart_mode
=self
.restart_mode
)
2076 def _extract_datacenters_for_vl(self
, vld
):
2078 Extracts the list of cloud accounts from the NS Config obj
2081 1. Cloud accounts based connection point (vnf_datacenter_map)
2083 vld : VLD yang object
2088 datacenter_list
= []
2090 if self
._nsr
_cfg
_msg
.vnf_datacenter_map
:
2091 # Handle case where datacenter is None
2092 vnf_datacenter_map
= {}
2093 for vnf
in self
._nsr
_cfg
_msg
.vnf_datacenter_map
:
2094 if vnf
.datacenter
is not None or vnf
.datacenter
is not None:
2095 vnf_datacenter_map
[vnf
.member_vnf_index_ref
] = \
2098 for vnfc
in vld
.vnfd_connection_point_ref
:
2099 datacenter
= vnf_datacenter_map
.get(
2100 vnfc
.member_vnf_index_ref
, self
._datacenter
_name
)
2102 datacenter_list
.append(datacenter
)
2104 if self
._nsr
_cfg
_msg
.vl_datacenter_map
:
2105 for vld_map
in self
._nsr
_cfg
_msg
.vl_datacenter_map
:
2106 if vld_map
.vld_id_ref
== vld
.id:
2107 for datacenter
in vld_map
.datacenters
:
2108 datacenter_list
.append(datacenter
)
2110 # If no config has been provided then fall-back to the default
2112 if not datacenter_list
:
2113 datacenter_list
.append(self
._datacenter
_name
)
2115 self
._log
.debug("VL {} data center list: {}".
2116 format(vld
.name
, datacenter_list
))
2117 return set(datacenter_list
)
2120 def create_vls(self
):
2121 """ This function creates VLs for every VLD in the NSD
2122 associated with this NSR"""
2123 for vld
in self
.nsd_msg
.vld
:
2125 self
._log
.debug("Found vld %s in nsr id %s", vld
, self
.id)
2126 datacenter_list
= self
._extract
_datacenters
_for
_vl
(vld
)
2127 for datacenter
in datacenter_list
:
2128 vlr
= yield from self
._create
_vls
(vld
, datacenter
)
2129 self
._vlrs
[vlr
.id] = vlr
2130 self
._nsm
.add_vlr_id_nsr_map(vlr
.id, self
)
2133 def create_vl_instance(self
, vld
):
2134 self
._log
.error("Create VL for {}: {}".format(self
.id, vld
.as_dict()))
2135 # Check if the VL is already present
2137 for vl_id
, vl
in self
._vlrs
.items():
2138 if vl
.vld_msg
.id == vld
.id:
2139 self
._log
.error("The VLD %s already in NSR %s as VLR %s with status %s",
2140 vld
.id, self
.id, vl
.id, vl
.state
)
2142 if vlr
.state
!= VlRecordState
.TERMINATED
:
2143 err_msg
= "VLR for VL {} in NSR {} already instantiated". \
2144 format(vld
, self
.id)
2145 self
._log
.error(err_msg
)
2146 raise NsrVlUpdateError(err_msg
)
2150 datacenter_list
= self
._extract
_datacenters
_for
_vl
(vld
)
2151 for datacenter
in datacenter_list
:
2152 vlr
= yield from self
._create
_vls
(vld
, account
, datacenter
)
2153 self
._vlrs
[vlr
.id] = vlr
2154 self
._nsm
.add_vlr_id_nsr_map(vlr
.id, self
)
2156 vlr
.state
= VlRecordState
.INSTANTIATION_PENDING
2157 yield from self
.update_state()
2160 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
2162 except Exception as e
:
2163 err_msg
= "Error instantiating VL for NSR {} and VLD {}: {}". \
2164 format(self
.id, vld
.id, e
)
2165 self
._log
.error(err_msg
)
2166 self
._log
.exception(e
)
2167 vlr
.state
= VlRecordState
.FAILED
2169 yield from self
.update_state()
2172 def delete_vl_instance(self
, vld
):
2173 for vlr_id
, vlr
in self
._vlrs
.items():
2174 if vlr
.vld_msg
.id == vld
.id:
2175 self
._log
.debug("Found VLR %s for VLD %s in NSR %s",
2176 vlr
.id, vld
.id, self
.id)
2177 vlr
.state
= VlRecordState
.TERMINATE_PENDING
2178 yield from self
.update_state()
2181 yield from self
.nsm_plugin
.terminate_vl(vlr
)
2182 vlr
.state
= VlRecordState
.TERMINATED
2184 self
.remove_vlr_id_nsr_map(vlr
.id)
2186 except Exception as e
:
2187 err_msg
= "Error terminating VL for NSR {} and VLD {}: {}". \
2188 format(self
.id, vld
.id, e
)
2189 self
._log
.error(err_msg
)
2190 self
._log
.exception(e
)
2191 vlr
.state
= VlRecordState
.FAILED
2193 yield from self
.update_state()
2197 def create_vnfs(self
, config_xact
):
2199 This function creates VNFs for every VNF in the NSD
2200 associated with this NSR
2202 self
._log
.debug("Creating %u VNFs associated with this NS id %s",
2203 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
2205 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
2206 if not const_vnfd
.start_by_default
:
2207 self
._log
.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
2208 const_vnfd
.member_vnf_index
)
2211 vnfd_msg
= self
._get
_vnfd
(const_vnfd
.vnfd_id_ref
, config_xact
)
2212 datacenter_name
= self
._get
_vnfd
_datacenter
(const_vnfd
.member_vnf_index
)
2213 if datacenter_name
is None:
2214 datacenter_name
= self
._datacenter
_name
2215 yield from self
.create_vnf_record(vnfd_msg
, const_vnfd
, datacenter_name
)
2217 def get_placement_groups(self
, vnfd_msg
, const_vnfd
):
2218 placement_groups
= []
2219 for group
in self
.nsd_msg
.placement_groups
:
2220 for member_vnfd
in group
.member_vnfd
:
2221 if (member_vnfd
.vnfd_id_ref
== vnfd_msg
.id) and \
2222 (member_vnfd
.member_vnf_index_ref
== str(const_vnfd
.member_vnf_index
)):
2223 group_info
= self
.resolve_placement_group_cloud_construct(group
)
2224 if group_info
is None:
2225 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
2226 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
2228 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
2231 const_vnfd
.member_vnf_index
)
2232 placement_groups
.append(group_info
)
2233 return placement_groups
2235 def get_cloud_config(self
):
2236 cloud_config
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_CloudConfig()
2237 self
._log
.debug("Received key pair is {}".format(self
._key
_pairs
))
2239 for authorized_key
in self
.nsr_cfg_msg
.ssh_authorized_key
:
2240 if authorized_key
.key_pair_ref
in self
._key
_pairs
:
2241 key_pair
= cloud_config
.key_pair
.add()
2242 key_pair
.from_dict(self
._key
_pairs
[authorized_key
.key_pair_ref
].as_dict())
2243 for nsd_key_pair
in self
.nsd_msg
.key_pair
:
2244 key_pair
= cloud_config
.key_pair
.add()
2245 key_pair
.from_dict(key_pair
.as_dict())
2246 for nsr_cfg_user
in self
.nsr_cfg_msg
.user
:
2247 user
= cloud_config
.user
.add()
2248 user
.name
= nsr_cfg_user
.name
2249 user
.user_info
= nsr_cfg_user
.user_info
2250 for ssh_key
in nsr_cfg_user
.ssh_authorized_key
:
2251 if ssh_key
.key_pair_ref
in self
._key
_pairs
:
2252 key_pair
= user
.key_pair
.add()
2253 key_pair
.from_dict(self
._key
_pairs
[ssh_key
.key_pair_ref
].as_dict())
2254 for nsd_user
in self
.nsd_msg
.user
:
2255 user
= cloud_config
.user
.add()
2256 user
.from_dict(nsd_user
.as_dict())
2258 self
._log
.debug("Formed cloud-config msg is {}".format(cloud_config
))
2262 def create_vnf_record(self
, vnfd_msg
, const_vnfd
, datacenter_name
, group_name
=None, group_instance_id
=None):
2263 # Fetch the VNFD associated with this VNF
2264 placement_groups
= self
.get_placement_groups(vnfd_msg
, const_vnfd
)
2265 cloud_config
= self
.get_cloud_config()
2266 self
._log
.info("Cloud Account for VNF %d is %s",const_vnfd
.member_vnf_index
,datacenter_name
)
2267 self
._log
.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
2269 const_vnfd
.member_vnf_index
,
2270 [ group
.name
for group
in placement_groups
])
2272 vnfr
= yield from VirtualNetworkFunctionRecord
.create_record(self
._dts
,
2287 restart_mode
=self
.restart_mode
,
2289 if vnfr
.id in self
._vnfrs
:
2290 err
= "VNF with VNFR id %s already in vnf list" % (vnfr
.id,)
2291 raise NetworkServiceRecordError(err
)
2293 self
._vnfrs
[vnfr
.id] = vnfr
2294 self
._nsm
.vnfrs
[vnfr
.id] = vnfr
2296 yield from vnfr
.set_config_status(NsrYang
.ConfigStates
.INIT
)
2298 self
._log
.debug("Added VNFR %s to NSM VNFR list with id %s",
2304 def create_param_pools(self
):
2305 for param_pool
in self
.nsd_msg
.parameter_pool
:
2306 self
._log
.debug("Found parameter pool %s in nsr id %s", param_pool
, self
.id)
2308 start_value
= param_pool
.range.start_value
2309 end_value
= param_pool
.range.end_value
2310 if end_value
< start_value
:
2311 raise NetworkServiceRecordError(
2312 "Parameter pool %s has invalid range (start: {}, end: {})".format(
2313 start_value
, end_value
2317 self
._param
_pools
[param_pool
.name
] = config_value_pool
.ParameterValuePool(
2320 range(start_value
, end_value
)
2324 def fetch_vnfr(self
, vnfr_path
):
2325 """ Fetch VNFR record """
2327 self
._log
.debug("Fetching VNFR with key %s while instantiating %s",
2329 res_iter
= yield from self
._dts
.query_read(vnfr_path
, rwdts
.XactFlag
.MERGE
)
2331 for ent
in res_iter
:
2332 res
= yield from ent
2338 def instantiate_vnfs(self
, vnfrs
, scaleout
=False):
2340 This function instantiates VNFs for every VNF in this Network Service
2343 def instantiate_vnf(vnf
):
2344 self
._log
.debug("Instantiating VNF: %s in NS %s", vnf
, self
.id)
2345 vnfd_id
= vnf
.vnfr_msg
.vnfd
.id
2346 for dependency_vnf
in dependencies
[vnfd_id
]:
2347 while dependency_vnf
not in self
.instantiated
:
2348 yield from asyncio
.sleep(1, loop
=self
._loop
)
2350 yield from self
.nsm_plugin
.instantiate_vnf(self
, vnf
,scaleout
)
2351 self
.instantiated
.add(vnfd_id
)
2353 self
._log
.debug("Instantiating %u VNFs in NS %s", len(vnfrs
), self
.id)
2354 dependencies
= collections
.defaultdict(list)
2355 for dependency_vnf
in self
._nsr
_cfg
_msg
.nsd
.vnf_dependency
:
2356 dependencies
[dependency_vnf
.vnf_source_ref
].append(dependency_vnf
.vnf_depends_on_ref
)
2358 # The dictionary copy is to ensure that if a terminate is initiated right after instantiation, the
2359 # Runtime error for "dictionary changed size during iteration" does not occur.
2360 # vnfrs - 'dict_values' object
2361 # vnfrs_copy - list object
2362 vnfrs_copy
= list(vnfrs
)
2364 for vnf
in vnfrs_copy
:
2365 vnf_task
= self
._loop
.create_task(instantiate_vnf(vnf
))
2366 tasks
.append(vnf_task
)
2369 self
._log
.debug("Waiting for %s instantiate_vnf tasks to complete", len(tasks
))
2370 done
, pending
= yield from asyncio
.wait(tasks
, loop
=self
._loop
, timeout
=30)
2372 self
._log
.error("The Instantiate vnf task timed out after 30 seconds.")
2373 raise VirtualNetworkFunctionRecordError("Task tied out : ", pending
)
2376 def instantiate_vnffgs(self
):
2378 This function instantiates VNFFGs for every VNFFG in this Network Service
2380 self
._log
.debug("Instantiating %u VNFFGs in NS %s",
2381 len(self
.nsd_msg
.vnffgd
), self
.id)
2382 for _
, vnfr
in self
.vnfrs
.items():
2383 while vnfr
.state
in [VnfRecordState
.INSTANTIATION_PENDING
, VnfRecordState
.INIT
]:
2384 self
._log
.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr
.name
,vnfr
.state
)
2385 yield from asyncio
.sleep(2, loop
=self
._loop
)
2386 if vnfr
.state
== VnfRecordState
.ACTIVE
:
2387 self
._log
.debug("Received vnfr state for vnfr %s is %s ",vnfr
.name
,vnfr
.state
)
2390 self
._log
.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr
.name
,vnfr
.state
)
2391 self
._vnffgr
_state
= VnffgRecordState
.FAILED
2394 self
._log
.info("Waiting for 90 seconds for VMs to come up")
2395 yield from asyncio
.sleep(90, loop
=self
._loop
)
2396 self
._log
.info("Starting VNFFG orchestration")
2397 for vnffg
in self
._vnffgrs
.values():
2398 self
._log
.debug("Instantiating VNFFG: %s in NS %s", vnffg
, self
.id)
2399 yield from vnffg
.instantiate()
2402 def instantiate_scaling_instances(self
, config_xact
):
2403 """ Instantiate any default scaling instances in this Network Service """
2404 for group
in self
._scaling
_groups
.values():
2405 for i
in range(group
.min_instance_count
):
2406 self
._log
.debug("Instantiating %s default scaling instance %s", group
, i
)
2407 yield from self
.create_scale_group_instance(
2408 group
.name
, i
, config_xact
, is_default
=True
2411 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2412 if group_msg
.scaling_group_name_ref
!= group
.name
:
2415 for instance
in group_msg
.instance
:
2416 self
._log
.debug("Reloading %s scaling instance %s", group_msg
, instance
.id)
2417 yield from self
.create_scale_group_instance(
2418 group
.name
, instance
.id, config_xact
, is_default
=False
2421 def has_scaling_instances(self
):
2422 """ Return boolean indicating if the network service has default scaling groups """
2423 for group
in self
._scaling
_groups
.values():
2424 if group
.min_instance_count
> 0:
2427 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2428 if len(group_msg
.instance
) > 0:
2435 """ This function publishes this NSR """
2437 self
._nsr
_msg
= self
.create_msg()
2439 self
._log
.debug("Publishing the NSR with xpath %s and nsr %s",
2443 if self
._debug
_running
:
2444 self
._log
.debug("Publishing NSR in RUNNING state!")
2447 yield from self
._nsm
.nsr_handler
.update(None, self
.nsr_xpath
, self
._nsr
_msg
)
2448 if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
:
2449 self
._debug
_running
= True
2452 def unpublish(self
, xact
=None):
2453 """ Unpublish this NSR object """
2454 self
._log
.debug("Unpublishing Network service id %s", self
.id)
2456 yield from self
._nsm
.nsr_handler
.delete(xact
, self
.nsr_xpath
)
2459 def nsr_xpath(self
):
2460 """ Returns the xpath associated with this NSR """
2461 return self
._project
.add_project((
2462 "D,/nsr:ns-instance-opdata" +
2463 "/nsr:nsr[nsr:ns-instance-config-ref={}]"
2464 ).format(quoted_key(self
.id)))
2467 def xpath_from_nsr(nsr
):
2468 """ Returns the xpath associated with this NSR op data"""
2469 return (NetworkServiceRecord
.XPATH
+
2470 "[nsr:ns-instance-config-ref={}]").format(quoted_key(nsr
.id))
2473 def nsd_xpath(self
):
2474 """ Return NSD config xpath."""
2475 return self
._project
.add_project((
2476 "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id={}]"
2477 ).format(quoted_key(self
.nsd_id
)))
2480 def instantiate(self
, config_xact
):
2481 """"Instantiates a NetworkServiceRecord.
2483 This function instantiates a Network service
2484 which involves the following steps,
2486 * Instantiate every VL in NSD by sending create VLR request to DTS.
2487 * Instantiate every VNF in NSD by sending create VNF reuqest to DTS.
2488 * Publish the NSR details to DTS
2491 nsr: The NSR configuration request containing nsr-id and nsd
2492 config_xact: The configuration transaction which initiated the instatiation
2495 NetworkServiceRecordError if the NSR creation fails
2501 self
._log
.debug("Instantiating NS - %s xact - %s", self
, config_xact
)
2503 # Move the state to INIITALIZING
2504 self
.set_state(NetworkServiceRecordState
.INIT
)
2506 event_descr
= "Instantiation Request Received NSR Id: %s, NS Name: %s" % (self
.id, self
.name
)
2507 self
.record_event("instantiating", event_descr
)
2510 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
2512 # Merge any config and initial config primitive values
2513 self
.config_store
.merge_nsd_config(self
.nsd_msg
, self
._project
.name
)
2514 self
._log
.debug("Merged NSD: {}".format(self
.nsd_msg
.as_dict()))
2516 event_descr
= "Fetched NSD with descriptor id %s, NS Name: %s" % (self
.nsd_id
, self
.name
)
2517 self
.record_event("nsd-fetched", event_descr
)
2519 if self
._nsd
is None:
2520 msg
= "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
2521 self
._log
.debug(msg
, self
.nsd_id
, self
.id)
2522 raise NetworkServiceRecordError(self
)
2524 self
._log
.debug("Got nsd result %s", self
._nsd
)
2526 # Substitute any input parameters
2527 self
.substitute_input_parameters(self
._nsd
, self
._nsr
_cfg
_msg
)
2530 yield from self
.create(config_xact
)
2532 # Publish the NSR to DTS
2533 yield from self
.publish()
2536 def do_instantiate():
2538 Instantiate network service
2540 self
._log
.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
2541 self
.id, self
.nsd_id
)
2543 # instantiate the VLs
2544 event_descr
= ("Instantiating %s external VLs for NSR id: %s, NS Name: %s " %
2545 (len(self
.nsd_msg
.vld
), self
.id, self
.name
))
2546 self
.record_event("begin-external-vls-instantiation", event_descr
)
2548 self
.set_state(NetworkServiceRecordState
.VL_INIT_PHASE
)
2550 # Publish the NSR to DTS
2551 yield from self
.publish()
2553 if self
._ns
_terminate
_received
:
2554 self
._log
.debug("Terminate Received. Interrupting Instantiation at event : begin-external-vls-instantiation.")
2555 # Setting this flag as False again as this is a state where neither VL or VNF have been instantiated.
2556 self
._ns
_terminate
_received
= False
2557 # At this stage only ns-instance opdata is published. Cleaning up the record.
2558 yield from self
.unpublish()
2561 yield from self
.instantiate_vls()
2563 event_descr
= ("Finished instantiating %s external VLs for NSR id: %s, NS Name: %s " %
2564 (len(self
.nsd_msg
.vld
), self
.id, self
.name
))
2565 self
.record_event("end-external-vls-instantiation", event_descr
)
2567 self
.set_state(NetworkServiceRecordState
.VNF_INIT_PHASE
)
2569 # Publish the NSR to DTS
2570 yield from self
.publish()
2572 self
._log
.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
2573 self
.id, self
.nsd_id
)
2575 # instantiate the VNFs
2576 event_descr
= ("Instantiating %s VNFS for NSR id: %s, NS Name: %s " %
2577 (len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
.name
))
2579 self
.record_event("begin-vnf-instantiation", event_descr
)
2581 if self
._ns
_terminate
_received
:
2582 self
._log
.debug("Terminate Received. Interrupting Instantiation at event : end-external-vls-instantiation.")
2585 yield from self
.instantiate_vnfs(self
._vnfrs
.values())
2587 self
._log
.debug(" Finished instantiating %d VNFs for NSR id: %s, NS Name: %s",
2588 len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
.name
)
2590 event_descr
= ("Finished instantiating %s VNFs for NSR id: %s, NS Name: %s" %
2591 (len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
.name
))
2592 self
.record_event("end-vnf-instantiation", event_descr
)
2594 # Publish the NSR to DTS
2595 yield from self
.publish()
2597 if len(self
.vnffgrs
) > 0:
2598 #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
2599 event_descr
= ("Instantiating %s VNFFGS for NSR id: %s, NS Name: %s" %
2600 (len(self
.nsd_msg
.vnffgd
), self
.id, self
.name
))
2602 self
.record_event("begin-vnffg-instantiation", event_descr
)
2604 if self
._ns
_terminate
_received
:
2605 self
._log
.debug("Terminate Received. Interrupting Instantiation at event : begin-vnffg-instantiation.")
2608 yield from self
.instantiate_vnffgs()
2610 event_descr
= ("Finished instantiating %s VNFFGDs for NSR id: %s, NS Name: %s" %
2611 (len(self
.nsd_msg
.vnffgd
), self
.id, self
.name
))
2612 self
.record_event("end-vnffg-instantiation", event_descr
)
2614 if self
.has_scaling_instances():
2615 event_descr
= ("Instantiating %s Scaling Groups for NSR id: %s, NS Name: %s" %
2616 (len(self
._scaling
_groups
), self
.id, self
.name
))
2618 self
.record_event("begin-scaling-group-instantiation", event_descr
)
2620 if self
._ns
_terminate
_received
:
2621 self
._log
.debug("Terminate Received. Interrupting Instantiation at event : begin-scaling-group-instantiation.")
2624 yield from self
.instantiate_scaling_instances(config_xact
)
2625 self
.record_event("end-scaling-group-instantiation", event_descr
)
2627 # Give the plugin a chance to deploy the network service now that all
2628 # virtual links and vnfs are instantiated
2629 yield from self
.nsm_plugin
.deploy(self
._nsr
_msg
)
2631 self
._log
.debug("Publishing NSR...... nsr[%s], nsd[%s], for NS[%s]",
2632 self
.id, self
.nsd_id
, self
.name
)
2634 # Publish the NSR to DTS
2635 yield from self
.publish()
2637 self
._log
.debug("Published NSR...... nsr[%s], nsd[%s], for NS[%s]",
2638 self
.id, self
.nsd_id
, self
.name
)
2640 def on_instantiate_done(fut
):
2641 # If the do_instantiate fails, then publish NSR with failed result
2644 import traceback
, sys
2645 print(traceback
.format_exception(None,e
, e
.__traceback
__), file=sys
.stderr
, flush
=True)
2646 self
._log
.error("NSR instantiation failed for NSR id %s: %s", self
.id, str(e
))
2647 self
._loop
.create_task(self
.instantiation_failed(failed_reason
=str(e
)))
2649 instantiate_task
= self
._loop
.create_task(do_instantiate())
2650 instantiate_task
.add_done_callback(on_instantiate_done
)
2653 def set_config_status(self
, status
, status_details
=None):
2654 if self
.config_status
!= status
:
2655 self
._log
.debug("Updating NSR {} status for {} to {}".
2656 format(self
.name
, self
.config_status
, status
))
2657 self
._config
_status
= status
2658 self
._config
_status
_details
= status_details
2660 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2661 self
.record_event("config-failed", "NS configuration failed",
2662 evt_details
=self
._config
_status
_details
)
2664 yield from self
.publish()
2666 if status
== NsrYang
.ConfigStates
.TERMINATE
:
2667 yield from self
.terminate_ns_cont()
2670 def is_active(self
):
2671 """ This NS is active """
2672 self
.set_state(NetworkServiceRecordState
.RUNNING
)
2676 # Publish the NSR to DTS
2677 self
._log
.debug("Network service %s is active ", self
.id)
2678 self
._is
_active
= True
2680 event_descr
= "NSR in running state for NSR id: %s, NS Name: %s" % (self
.id, self
.name
)
2681 self
.record_event("ns-running", event_descr
)
2683 yield from self
.publish()
2686 def instantiation_failed(self
, failed_reason
=None):
2687 """ The NS instantiation failed"""
2688 self
._log
.error("Network service id:%s, name:%s instantiation failed",
2690 self
.set_state(NetworkServiceRecordState
.FAILED
)
2691 self
._is
_failed
= True
2693 event_descr
= "Instantiation of NS %s - %s failed" % (self
.id, self
.name
)
2694 self
.record_event("ns-failed", event_descr
, evt_details
=failed_reason
)
2696 # Publish the NSR to DTS
2697 yield from self
.publish()
2700 def terminate_vnfrs(self
, vnfrs
, scalein
=False):
2701 """ Terminate VNFRS in this network service """
2702 self
._log
.debug("Terminating VNFs in network service %s - %s", self
.id, self
.name
)
2705 for vnfr
in list(vnfrs
):
2706 self
._log
.debug("Terminating VNFs in network service %s %s", vnfr
.id, self
.id)
2707 # The below check is added for determining which of the VNFRS are scaling ones
2708 # under OPENMANO. Need to pass scalein True when terminate received to OPENAMNO
2710 if isinstance(self
.nsm_plugin
, openmano_nsm
.OpenmanoNsPlugin
):
2711 for scaling_group
in self
._scaling
_groups
.values():
2712 scaling_instances
= scaling_group
.create_record_msg().instance
2713 for sc
in scaling_instances
:
2714 if vnfr
.id in sc
.vnfrs
:
2716 self
._log
.debug("Found a Scaling VNF for Openmano during Terminate")
2718 yield from self
.nsm_plugin
.terminate_vnf(self
, vnfr
, scalein
=scaleIn
)
2720 vnfr_ids
.append(vnfr
.id)
2722 for vnfr_id
in vnfr_ids
:
2723 self
._vnfrs
.pop(vnfr_id
, None)
2726 def terminate(self
):
2727 """Start terminate of a NetworkServiceRecord."""
2728 # Move the state to TERMINATE
2729 self
.set_state(NetworkServiceRecordState
.TERMINATE
)
2730 event_descr
= "Terminate being processed for NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2731 self
.record_event("terminate", event_descr
)
2732 self
._log
.debug("Terminating network service id: %s, NS Name: %s", self
.id, self
.name
)
2734 # Adding the NSR ID on terminate Evet. This will be checked to halt the instantiation if not already finished.
2735 self
._ns
_terminate
_received
= True
2737 yield from self
.publish()
2740 # IN case the instantiation failed, then trigger a cleanup immediately
2741 # don't wait for Cfg manager, as it will have no idea of this NSR.
2742 # Due to the failure
2743 yield from self
.terminate_ns_cont()
2747 def terminate_ns_cont(self
):
2748 """Config script related to terminate finished, continue termination"""
2749 def terminate_vnffgrs():
2750 """ Terminate VNFFGRS in this network service """
2751 self
._log
.debug("Terminating VNFFGRs in network service %s - %s", self
.id, self
.name
)
2752 for vnffgr
in self
.vnffgrs
.values():
2753 yield from vnffgr
.terminate()
2755 def terminate_vlrs():
2756 """ Terminate VLRs in this netork service """
2757 self
._log
.debug("Terminating VLs in network service %s - %s", self
.id, self
.name
)
2758 for vlr_id
, vlr
in self
.vlrs
.items():
2759 yield from self
.nsm_plugin
.terminate_vl(vlr
)
2760 vlr
.state
= VlRecordState
.TERMINATED
2762 # Move the state to VNF_TERMINATE_PHASE
2763 self
._log
.debug("Terminating VNFFGs in NS ID: %s, NS Name: %s", self
.id, self
.name
)
2764 self
.set_state(NetworkServiceRecordState
.VNFFG_TERMINATE_PHASE
)
2765 event_descr
= "Terminating VNFFGS in NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2766 self
.record_event("terminating-vnffgss", event_descr
)
2767 yield from terminate_vnffgrs()
2769 # Move the state to VNF_TERMINATE_PHASE
2770 self
.set_state(NetworkServiceRecordState
.VNF_TERMINATE_PHASE
)
2771 event_descr
= "Terminating VNFS in NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2772 self
.record_event("terminating-vnfs", event_descr
)
2773 yield from self
.terminate_vnfrs(self
.vnfrs
.values())
2775 # Move the state to VL_TERMINATE_PHASE
2776 self
.set_state(NetworkServiceRecordState
.VL_TERMINATE_PHASE
)
2777 event_descr
= "Terminating VLs in NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2778 self
.record_event("terminating-vls", event_descr
)
2779 yield from terminate_vlrs()
2780 yield from self
.nsm_plugin
.terminate_ns(self
)
2781 # Remove the generated SSH key
2782 if self
._ssh
_key
_file
:
2783 p
= urlparse(self
._ssh
_key
_file
)
2785 path
= os
.path
.dirname(p
[2])
2786 self
._log
.debug("NSR {}: Removing keys in {}".format(self
.name
,
2788 shutil
.rmtree(path
, ignore_errors
=True)
2790 # Move the state to TERMINATED
2791 self
.set_state(NetworkServiceRecordState
.TERMINATED
)
2792 event_descr
= "Terminated NS Id: %s, NS Name: %s" % (self
.id, self
.name
)
2793 self
.record_event("terminated", event_descr
)
2795 # Unpublish the NSR record
2796 self
._log
.debug("Unpublishing the network service %s - %s", self
.id, self
.name
)
2797 yield from self
.unpublish()
2799 # Finaly delete the NS instance from this NS Manager
2800 self
._log
.debug("Deleting the network service %s - %s", self
.id, self
.name
)
2801 self
.nsm
.delete_nsr(self
.id)
2804 """"Enable a NetworkServiceRecord."""
2808 """"Disable a NetworkServiceRecord."""
2811 def map_config_status(self
):
2812 self
._log
.debug("Config status for ns {} is {}".
2813 format(self
.name
, self
._config
_status
))
2814 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURING
:
2815 return 'configuring'
2816 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2820 def vl_phase_completed(self
):
2821 """ Are VLs created in this NS?"""
2822 return self
._vl
_phase
_completed
2824 def vnf_phase_completed(self
):
2825 """ Are VLs created in this NS?"""
2826 return self
._vnf
_phase
_completed
2828 def create_msg(self
):
2829 """ The network serice record as a message """
2830 nsr_dict
= {"ns_instance_config_ref": self
.id}
2831 nsr
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr
.from_dict(nsr_dict
)
2832 #nsr.datacenter = self.cloud_account_name
2833 nsr
.sdn_account
= self
._sdn
_account
_name
2834 nsr
.name_ref
= self
.name
2835 nsr
.nsd_ref
= self
.nsd_id
2836 nsr
.nsd_name_ref
= self
.nsd_msg
.name
2837 nsr
.operational_events
= self
._op
_status
.msg
2838 nsr
.operational_status
= self
._op
_status
.yang_str()
2839 nsr
.config_status
= self
.map_config_status()
2840 nsr
.config_status_details
= self
._config
_status
_details
2841 nsr
.create_time
= self
._create
_time
2842 nsr
.uptime
= int(time
.time()) - self
._create
_time
2844 # Added for OpenMano
2846 nsr
.orchestration_progress
.networks
.total
= len(self
.nsd_msg
.vld
)
2847 if isinstance(self
.nsm_plugin
, openmano_nsm
.OpenmanoNsPlugin
):
2848 # Taking the last update by OpenMano
2849 nsr
.orchestration_progress
.networks
.active
= self
.nsm_plugin
._openmano
_nsrs
[self
.id]._active
_nets
2851 nsr
.orchestration_progress
.networks
.active
= self
._active
_networks
2853 for vnfr_id
, vnfr
in self
._vnfrs
.items():
2854 no_of_vdus
+= len(vnfr
.vnfd
.vdu
)
2856 nsr
.orchestration_progress
.vms
.total
= no_of_vdus
2857 if isinstance(self
.nsm_plugin
, openmano_nsm
.OpenmanoNsPlugin
):
2858 # Taking the last update by OpenMano
2859 nsr
.orchestration_progress
.vms
.active
= self
.nsm_plugin
._openmano
_nsrs
[self
.id]._active
_vms
2861 nsr
.orchestration_progress
.vms
.active
= self
._active
_vms
2864 if self
._ssh
_pub
_key
:
2865 nsr
.ssh_key_generated
.private_key_file
= self
._ssh
_key
_file
2866 nsr
.ssh_key_generated
.public_key
= self
._ssh
_pub
_key
2868 for cfg_prim
in self
.nsd_msg
.service_primitive
:
2869 cfg_prim
= RwNsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ServicePrimitive
.from_dict(
2871 nsr
.service_primitive
.append(cfg_prim
)
2873 for init_cfg
in self
.nsd_msg
.initial_service_primitive
:
2874 prim
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_InitialServicePrimitive
.from_dict(
2876 nsr
.initial_service_primitive
.append(prim
)
2878 for term_cfg
in self
.nsd_msg
.terminate_service_primitive
:
2879 prim
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_TerminateServicePrimitive
.from_dict(
2881 nsr
.terminate_service_primitive
.append(prim
)
2883 if self
.vl_phase_completed():
2884 for vlr_id
, vlr
in self
.vlrs
.items():
2885 nsr
.vlr
.append(vlr
.create_nsr_vlr_msg(self
.vnfrs
.values()))
2887 if self
.vnf_phase_completed():
2888 for vnfr_id
in self
.vnfrs
:
2889 nsr
.constituent_vnfr_ref
.append(self
.vnfrs
[vnfr_id
].const_vnfr_msg
)
2890 for vnffgr
in self
.vnffgrs
.values():
2891 nsr
.vnffgr
.append(vnffgr
.fetch_vnffgr())
2892 for scaling_group
in self
._scaling
_groups
.values():
2893 nsr
.scaling_group_record
.append(scaling_group
.create_record_msg())
2897 def all_vnfs_active(self
):
2898 """ Are all VNFS in this NS active? """
2899 for _
, vnfr
in self
.vnfrs
.items():
2900 if vnfr
.active
is not True:
2905 def update_state(self
):
2906 """ Re-evaluate this NS's state """
2907 curr_state
= self
._op
_status
.state
2909 # This means that the terminate has been fired before the NS was UP.
2910 if self
._ns
_terminate
_received
:
2911 # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call.
2912 self
._ns
_terminate
_received
= False
2913 yield from self
.terminate_ns_cont()
2915 if curr_state
== NetworkServiceRecordState
.TERMINATED
:
2916 self
._log
.debug("NS (%s - %s) in terminated state, not updating state", self
.id, self
.name
)
2919 new_state
= NetworkServiceRecordState
.RUNNING
2920 self
._log
.debug("Received update_state for nsr: %s, curr-state: %s",
2921 self
.id, curr_state
)
2924 if (isinstance(self
.nsm_plugin
, rwnsmplugin
.RwNsPlugin
)):
2925 for vlr_id
, vl
in self
.vlrs
.items():
2926 self
._log
.debug("VLR %s state %s", vlr_id
, vl
.state
)
2927 if vl
.state
in [VlRecordState
.ACTIVE
, VlRecordState
.TERMINATED
]:
2929 elif vl
.state
== VlRecordState
.FAILED
:
2930 if vl
.prev_state
!= vl
.state
:
2931 event_descr
= "Instantiation of VL %s failed" % vl
.id
2932 event_error_details
= vl
.state_failed_reason
2933 self
.record_event("vl-failed", event_descr
, evt_details
=event_error_details
)
2934 vl
.prev_state
= vl
.state
2935 new_state
= NetworkServiceRecordState
.FAILED
2938 self
._log
.debug("VL already in failed state")
2940 if vl
.state
in [VlRecordState
.INSTANTIATION_PENDING
, VlRecordState
.INIT
]:
2941 new_state
= NetworkServiceRecordState
.VL_INSTANTIATE
2944 if vl
.state
in [VlRecordState
.TERMINATE_PENDING
]:
2945 new_state
= NetworkServiceRecordState
.VL_TERMINATE
2948 # Check all the VNFRs are present
2949 if new_state
== NetworkServiceRecordState
.RUNNING
:
2950 for _
, vnfr
in self
.vnfrs
.items():
2951 self
._log
.debug("VNFR state %s", vnfr
.state
)
2952 if vnfr
.state
in [VnfRecordState
.ACTIVE
, VnfRecordState
.TERMINATED
]:
2954 for vnfr
in self
.vnfrs
:
2955 active_vdus
+= self
.nsm
._vnfrs
[vnfr
]._active
_vdus
2957 if self
._active
_vms
!= active_vdus
:
2958 self
._active
_vms
= active_vdus
2959 yield from self
.publish()
2963 elif vnfr
.state
== VnfRecordState
.FAILED
:
2964 if vnfr
._prev
_state
!= vnfr
.state
:
2965 event_descr
= "Instantiation of VNF %s for NS: %s failed" % (vnfr
.id, self
.name
)
2966 event_error_details
= vnfr
.state_failed_reason
2967 self
.record_event("vnf-failed", event_descr
, evt_details
=event_error_details
)
2968 vnfr
.set_state(VnfRecordState
.FAILED
)
2970 self
._log
.info("VNF state did not change, curr=%s, prev=%s",
2971 vnfr
.state
, vnfr
._prev
_state
)
2972 new_state
= NetworkServiceRecordState
.FAILED
2975 self
._log
.debug("VNF %s in NSR %s - %s is still not active; current state is: %s",
2976 vnfr
.id, self
.id, self
.name
, vnfr
.state
)
2977 new_state
= curr_state
2979 # If new state is RUNNING; check VNFFGRs are also active
2980 if new_state
== NetworkServiceRecordState
.RUNNING
:
2981 for _
, vnffgr
in self
.vnffgrs
.items():
2982 self
._log
.debug("Checking vnffgr state for nsr %s is: %s",
2983 self
.id, vnffgr
.state
)
2984 if vnffgr
.state
== VnffgRecordState
.ACTIVE
:
2986 elif vnffgr
.state
== VnffgRecordState
.FAILED
:
2987 event_descr
= "Instantiation of VNFFGR %s failed" % vnffgr
.id
2988 self
.record_event("vnffg-failed", event_descr
)
2989 new_state
= NetworkServiceRecordState
.FAILED
2992 self
._log
.info("VNFFGR %s in NSR %s - %s is still not active; current state is: %s",
2993 vnffgr
.id, self
.id, self
.name
, vnffgr
.state
)
2994 new_state
= curr_state
2996 # Update all the scaling group instance operational status to
2997 # reflect the state of all VNFR within that instance
2998 yield from self
._update
_scale
_group
_instances
_status
()
3000 for _
, group
in self
._scaling
_groups
.items():
3001 if group
.state
== scale_group
.ScaleGroupState
.SCALING_OUT
:
3002 new_state
= NetworkServiceRecordState
.SCALING_OUT
3004 elif group
.state
== scale_group
.ScaleGroupState
.SCALING_IN
:
3005 new_state
= NetworkServiceRecordState
.SCALING_IN
3008 if new_state
!= curr_state
:
3009 self
._log
.debug("Changing state of Network service %s - %s from %s to %s",
3010 self
.id, self
.name
, curr_state
, new_state
)
3011 if new_state
== NetworkServiceRecordState
.RUNNING
:
3012 yield from self
.is_active()
3013 elif new_state
== NetworkServiceRecordState
.FAILED
:
3014 # If the NS is already active and we entered scaling_in, scaling_out,
3015 # do not mark the NS as failing if scaling operation failed.
3016 if curr_state
in [NetworkServiceRecordState
.SCALING_OUT
,
3017 NetworkServiceRecordState
.SCALING_IN
] and self
._is
_active
:
3018 new_state
= NetworkServiceRecordState
.RUNNING
3019 self
.set_state(new_state
)
3021 yield from self
.instantiation_failed()
3023 self
.set_state(new_state
)
3025 yield from self
.publish()
3027 def vl_instantiation_state(self
):
3028 """ Check if all VLs in this NS are active """
3029 for vl_id
, vlr
in self
.vlrs
.items():
3030 if vlr
.state
== VlRecordState
.ACTIVE
:
3032 elif vlr
.state
== VlRecordState
.FAILED
:
3033 return VlRecordState
.FAILED
3034 elif vlr
.state
== VlRecordState
.TERMINATED
:
3035 return VlRecordState
.TERMINATED
3036 elif vlr
.state
== VlRecordState
.INSTANTIATION_PENDING
:
3037 return VlRecordState
.INSTANTIATION_PENDING
3039 self
._log
.error("vlr %s still in state %s", vlr
, vlr
.state
)
3040 raise VirtualLinkRecordError("Invalid state %s" %(vlr
.state
))
3041 return VlRecordState
.ACTIVE
3043 def vl_instantiation_successful(self
):
3044 """ Mark that all VLs in this NS are active """
3045 if self
._vls
_ready
.is_set():
3046 self
._log
.error("NSR id %s, vls_ready is already set", self
.id)
3048 if self
.vl_instantiation_state() == VlRecordState
.ACTIVE
:
3049 self
._log
.debug("NSR id %s, All %d vlrs are in active state %s",
3050 self
.id, len(self
.vlrs
), self
.vl_instantiation_state
)
3051 self
._vls
_ready
.set()
3053 def vlr_event(self
, vlr
, action
):
3054 self
._log
.debug("Received VLR %s with action:%s", vlr
, action
)
3056 if vlr
.id not in self
.vlrs
:
3057 self
._log
.error("VLR %s:%s received for unknown id, state:%s",
3058 vlr
.id, vlr
.name
, vlr
.operational_status
)
3061 vlr_local
= self
.vlrs
[vlr
.id]
3063 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
3064 if vlr
.operational_status
== 'running':
3065 vlr_local
.set_state_from_op_status(vlr
.operational_status
)
3066 self
._active
_networks
+= 1
3067 self
._log
.info("VLR %s:%s moving to active state",
3069 elif vlr
.operational_status
== 'failed':
3070 vlr_local
.set_state_from_op_status(vlr
.operational_status
)
3071 vlr_local
.state_failed_reason
= vlr
.operational_status_details
3072 asyncio
.ensure_future(self
.update_state(), loop
=self
._loop
)
3073 self
._log
.info("VLR %s:%s moving to failed state",
3076 self
._log
.warning("VLR %s:%s received state:%s",
3077 vlr
.id, vlr
.name
, vlr
.operational_status
)
3079 if isinstance(self
.nsm_plugin
, rwnsmplugin
.RwNsPlugin
):
3080 self
.vl_instantiation_successful()
3082 # self.update_state() is responsible for publishing the NSR state. Its being called by vlr_event and update_vnfr.
3083 # The call from vlr_event occurs only if vlr reaches a failed state. Hence implementing the check here to handle
3084 # ns terminate received after other vlr states as vl-alloc-pending, vl-init, running.
3085 if self
._ns
_terminate
_received
:
3086 # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call.
3087 if vlr
.operational_status
in ['running', 'failed']:
3088 self
._ns
_terminate
_received
= False
3089 asyncio
.ensure_future(self
.terminate_ns_cont(), loop
=self
._loop
)
3092 class InputParameterSubstitution(object):
3094 This class is responsible for substituting input parameters into an NSD.
3097 def __init__(self
, log
, project
):
3098 """Create an instance of InputParameterSubstitution
3101 log - a logger for this object to use
3105 self
.project
= project
3107 def _fix_xpath(self
, xpath
):
3108 # Fix the parameter.xpath to include project and correct namespace
3109 self
.log
.debug("Provided xpath: {}".format(xpath
))
3110 #Split the xpath at the /
3111 attrs
= xpath
.split('/')
3113 for attr
in attrs
[1:]:
3114 new_ns
= 'project-nsd'
3117 # Includes namespace
3118 ns
, name
= attr
.split(':', 2)
3120 ns
= "rw-project-nsd"
3122 new_xp
= new_xp
+ '/' + new_ns
+ ':' + name
3124 updated_xpath
= self
.project
.add_project(new_xp
)
3126 self
.log
.error("Updated xpath: {}".format(updated_xpath
))
3127 return updated_xpath
3129 def __call__(self
, nsd
, nsr_config
):
3130 """Substitutes input parameters from the NSR config into the NSD
3132 This call modifies the provided NSD with the input parameters that are
3133 contained in the NSR config.
3136 nsd - a GI NSD object
3137 nsr_config - a GI NSR config object
3140 if nsd
is None or nsr_config
is None:
3143 # Create a lookup of the xpath elements that this descriptor allows
3145 optional_input_parameters
= set()
3146 for input_parameter
in nsd
.input_parameter_xpath
:
3147 optional_input_parameters
.add(input_parameter
.xpath
)
3149 # Apply the input parameters to the descriptor
3150 if nsr_config
.input_parameter
:
3151 for param
in nsr_config
.input_parameter
:
3152 if param
.xpath
not in optional_input_parameters
:
3153 msg
= "tried to set an invalid input parameter ({})"
3154 self
.log
.error(msg
.format(param
.xpath
))
3158 "input-parameter:{} = {}".format(
3165 xp
= self
._fix
_xpath
(param
.xpath
)
3166 xpath
.setxattr(nsd
, xp
, param
.value
)
3168 except Exception as e
:
3169 self
.log
.exception(e
)
3172 class VnfInputParameterSubstitution(object):
3174 This class is responsible for substituting input parameters into a VNFD.
3177 def __init__(self
, log
, const_vnfd
, project
):
3178 """Create an instance of VnfInputParameterSubstitution
3181 log - a logger for this object to use
3182 const_vnfd - id refs for vnfs in a ns
3183 project - project for the VNFs
3187 self
.member_vnf_index
= const_vnfd
.member_vnf_index
3188 self
.vnfd_id_ref
= const_vnfd
.vnfd_id_ref
3189 self
.project
= project
3191 def __call__(self
, vnfr
, nsr_config
):
3192 """Substitutes vnf input parameters from the NSR config into the VNFD
3194 This call modifies the provided VNFD with the input parameters that are
3195 contained in the NSR config.
3198 vnfr - a GI VNFR object
3199 nsr_config - a GI NSR Config object
3203 def compose_xpath(xpath
, id):
3204 prefix
= "/rw-project:project[rw-project:name={}]".format(quoted_key(self
.project
.name
)) + \
3205 "/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vnfd/".format(quoted_key(id))
3207 suffix
= '/'.join(xpath
.split('/')[3:]).replace('vnfd', 'vnfr')
3208 return prefix
+ suffix
3210 def substitute_xpath(ip_xpath
, substitute_value
, vnfr
):
3211 vnfr_xpath
= compose_xpath(ip_xpath
, vnfr
.id)
3214 verify_xpath_wildcarded
= xpath
.getxattr(vnfr
, vnfr_xpath
)
3217 "vnf-input-parameter:{} = {}, for VNF : [member-vnf-index : {}, vnfd-id-ref : {}]".format(
3220 self
.member_vnf_index
,
3225 xpath
.setxattr(vnfr
, vnfr_xpath
, substitute_value
)
3227 except Exception as e
:
3228 self
.log
.exception(e
)
3230 except Exception as e
:
3231 self
.log
.exception("Wildcarded xpath {} is listy in nature. Can not update. Exception => {}"
3232 .format(ip_xpath
, e
))
3234 if vnfr
is None or nsr_config
is None:
3237 optional_input_parameters
= set()
3238 for input_parameter
in nsr_config
.nsd
.input_parameter_xpath
:
3239 optional_input_parameters
.add(input_parameter
.xpath
)
3241 # Apply the input parameters to the vnfr
3242 if nsr_config
.vnf_input_parameter
:
3243 for param
in nsr_config
.vnf_input_parameter
:
3244 if (param
.member_vnf_index_ref
== self
.member_vnf_index
and param
.vnfd_id_ref
== self
.vnfd_id_ref
):
3245 if param
.input_parameter
:
3246 for ip
in param
.input_parameter
:
3247 if ip
.xpath
not in optional_input_parameters
:
3248 msg
= "Substitution Failed. Tried to set an invalid vnf input parameter ({}) for vnf [member-vnf-index : {}, vnfd-id-ref : {}]"
3249 self
.log
.error(msg
.format(ip
.xpath
, self
.member_vnf_index
, self
.vnfd_id_ref
))
3253 substitute_xpath(ip
.xpath
, ip
.value
, vnfr
)
3254 except Exception as e
:
3255 self
.log
.exception(e
)
3257 self
.log
.debug("Substituting Xpaths with default Values")
3258 for input_parameter
in nsr_config
.nsd
.input_parameter_xpath
:
3259 if input_parameter
.default_value
is not None:
3261 if "vnfd-catalog" in input_parameter
.xpath
:
3262 substitute_xpath(input_parameter
.xpath
, input_parameter
.default_value
, vnfr
)
3263 except Exception as e
:
3264 self
.log
.exception(e
)
3267 class NetworkServiceDescriptor(object):
3269 Network service descriptor class
3272 def __init__(self
, dts
, log
, loop
, nsd
, nsm
):
3282 """ Returns nsd id """
3287 """ Returns name of nsd """
3288 return self
._nsd
.name
3292 """ Return the message associated with this NetworkServiceDescriptor"""
3296 def path_for_id(nsd_id
):
3297 """ Return path for the passed nsd_id"""
3298 return self
._nsm
._project
.add_project(
3299 "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id = '{}'".
3303 """ Return the message associated with this NetworkServiceDescriptor"""
3304 return NetworkServiceDescriptor
.path_for_id(self
.id)
3306 def update(self
, nsd
):
3307 """ Update the NSD descriptor """
3311 class NsdDtsHandler(object):
3312 """ The network service descriptor DTS handler """
3313 XPATH
= "C,/project-nsd:nsd-catalog/project-nsd:nsd"
3315 def __init__(self
, dts
, log
, loop
, nsm
):
3322 self
._project
= nsm
._project
3326 """ Return registration handle """
3331 """ Register for Nsd create/update/delete/read requests from dts """
3334 self
._log
.warning("DTS handler already registered for project {}".
3335 format(self
._project
.name
))
3338 def on_apply(dts
, acg
, xact
, action
, scratch
):
3339 """Apply the configuration"""
3340 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
3341 self
._log
.debug("Got nsd apply cfg (xact:%s) (action:%s)",
3345 # Create/Update an NSD record
3346 for cfg
in self
._regh
.get_xact_elements(xact
):
3347 # Only interested in those NSD cfgs whose ID was received in prepare callback
3348 if cfg
.id in scratch
.get('nsds', []) or is_recovery
:
3349 self
._nsm
.update_nsd(cfg
)
3352 # This can happen if we do the deregister
3353 # during project delete before this is called
3354 self
._log
.debug("No reg handle for {} for project {}".
3355 format(self
.__class
__, self
._project
.name
))
3357 scratch
.pop('nsds', None)
3359 return RwTypes
.RwStatus
.SUCCESS
3362 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3363 """ Prepare callback from DTS for NSD config """
3365 self
._log
.info("Got nsd prepare - config received nsd id %s, msg %s",
3368 fref
= ProtobufC
.FieldReference
.alloc()
3369 fref
.goto_whole_message(msg
.to_pbcm())
3371 if fref
.is_field_deleted():
3372 # Delete an NSD record
3373 self
._log
.debug("Deleting NSD with id %s", msg
.id)
3374 self
._nsm
.delete_nsd(msg
.id)
3376 # Add this NSD to scratch to create/update in apply callback
3377 nsds
= scratch
.setdefault('nsds', [])
3379 # acg._scratch['nsds'].append(msg.id)
3381 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
3383 xpath
= self
._project
.add_project(NsdDtsHandler
.XPATH
)
3385 "Registering for NSD config using xpath: %s",
3389 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
3390 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
3391 # Need a list in scratch to store NSDs to create/update later
3392 # acg._scratch['nsds'] = list()
3393 self
._regh
= acg
.register(
3395 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
3396 on_prepare
=on_prepare
)
3398 def deregister(self
):
3399 self
._log
.debug("De-register NSD handler for project {}".
3400 format(self
._project
.name
))
3402 self
._regh
.deregister()
3406 class VnfdDtsHandler(object):
3407 """ DTS handler for VNFD config changes """
3408 XPATH
= "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
3410 def __init__(self
, dts
, log
, loop
, nsm
):
3416 self
._project
= nsm
._project
3420 """ DTS registration handle """
3425 """ Register for VNFD configuration"""
3428 self
._log
.warning("DTS handler already registered for project {}".
3429 format(self
._project
.name
))
3433 def on_apply(dts
, acg
, xact
, action
, scratch
):
3434 """Apply the configuration"""
3435 self
._log
.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
3436 xact
, action
, scratch
)
3438 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
3441 # Create/Update a VNFD record
3442 for cfg
in self
._regh
.get_xact_elements(xact
):
3443 # Only interested in those VNFD cfgs whose ID was received in prepare callback
3444 if cfg
.id in scratch
.get('vnfds', []) or is_recovery
:
3445 self
._nsm
.update_vnfd(cfg
)
3447 for cfg
in self
._regh
.elements
:
3448 if cfg
.id in scratch
.get('deleted_vnfds', []):
3449 yield from self
._nsm
.delete_vnfd(cfg
.id)
3452 self
._log
.warning("Reg handle none for {} in project {}".
3453 format(self
.__class
__, self
._project
))
3455 scratch
.pop('vnfds', None)
3456 scratch
.pop('deleted_vnfds', None)
3459 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3460 """ on prepare callback """
3461 xpath
= ks_path
.to_xpath(NsdYang
.get_schema())
3462 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
3463 xpath
, xact_info
.query_action
, msg
)
3465 fref
= ProtobufC
.FieldReference
.alloc()
3466 fref
.goto_whole_message(msg
.to_pbcm())
3468 # Handle deletes in prepare_callback, but adds/updates in apply_callback
3469 if fref
.is_field_deleted():
3470 self
._log
.debug("Adding msg to deleted field")
3471 deleted_vnfds
= scratch
.setdefault('deleted_vnfds', [])
3472 deleted_vnfds
.append(msg
.id)
3474 # Add this VNFD to scratch to create/update in apply callback
3475 vnfds
= scratch
.setdefault('vnfds', [])
3476 vnfds
.append(msg
.id)
3479 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
3480 except rift
.tasklets
.dts
.ResponseError
as e
:
3482 "VnfdDtsHandler in project {} with path {} for action {} failed: {}".
3483 format(self
._project
, xpath
, xact_info
.query_action
, e
))
3486 xpath
= self
._project
.add_project(VnfdDtsHandler
.XPATH
)
3488 "Registering for VNFD config using xpath {} for project {}"
3489 .format(xpath
, self
._project
))
3490 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
3491 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
3492 # Need a list in scratch to store VNFDs to create/update later
3493 # acg._scratch['vnfds'] = list()
3494 # acg._scratch['deleted_vnfds'] = list()
3495 self
._regh
= acg
.register(
3497 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
3498 on_prepare
=on_prepare
)
3500 def deregister(self
):
3501 self
._log
.debug("De-register VNFD handler for project {}".
3502 format(self
._project
.name
))
3504 self
._regh
.deregister()
3508 class NsrRpcDtsHandler(object):
3509 """ The network service instantiation RPC DTS handler """
3510 EXEC_NSR_CONF_XPATH
= "I,/nsr:start-network-service"
3511 EXEC_NSR_CONF_O_XPATH
= "O,/nsr:start-network-service"
3512 NETCONF_IP_ADDRESS
= "127.0.0.1"
3514 RESTCONF_PORT
= 8008
3515 NETCONF_USER
= "@rift"
3517 REST_BASE_V2_URL
= 'https://{}:{}/v2/api/'.format("127.0.0.1",
3520 def __init__(self
, dts
, log
, loop
, nsm
):
3525 self
._project
= nsm
._project
3528 self
._ns
_regh
= None
3530 self
._manager
= None
3531 self
._nsr
_config
_url
= NsrRpcDtsHandler
.REST_BASE_V2_URL
+ \
3532 'project/{}/'.format(self
._project
) + \
3533 'config/ns-instance-config'
3535 self
._model
= RwYang
.Model
.create_libncx()
3536 self
._model
.load_schema_ypbc(RwNsrYang
.get_schema())
3540 """ Return the NS manager instance """
3544 def wrap_netconf_config_xml(xml
):
3545 xml
= '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml
)
3549 def _connect(self
, timeout_secs
=240):
3551 start_time
= time
.time()
3552 while (time
.time() - start_time
) < timeout_secs
:
3555 self
._log
.debug("Attemping NsmTasklet netconf connection.")
3557 manager
= yield from ncclient
.asyncio_manager
.asyncio_connect(
3559 host
=NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
,
3560 port
=NsrRpcDtsHandler
.NETCONF_PORT
,
3561 username
=NsrRpcDtsHandler
.NETCONF_USER
,
3562 password
=NsrRpcDtsHandler
.NETCONF_PW
,
3564 look_for_keys
=False,
3565 hostkey_verify
=False,
3570 except ncclient
.transport
.errors
.SSHError
as e
:
3571 self
._log
.warning("Netconf connection to launchpad %s failed: %s",
3572 NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
, str(e
))
3574 yield from asyncio
.sleep(5, loop
=self
._loop
)
3576 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
3579 def _apply_ns_instance_config(self
,payload_dict
):
3580 req_hdr
= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'}
3581 response
=requests
.post(self
._nsr
_config
_url
,
3583 auth
=(NsrRpcDtsHandler
.NETCONF_USER
, NsrRpcDtsHandler
.NETCONF_PW
),
3590 """ Register for NS monitoring read from dts """
3593 def on_ns_config_prepare(xact_info
, action
, ks_path
, msg
):
3594 """ prepare callback from dts start-network-service"""
3595 assert action
== rwdts
.QueryAction
.RPC
3597 if not self
._project
.rpc_check(msg
, xact_info
):
3601 rpc_op
= NsrYang
.YangOutput_Nsr_StartNetworkService
.from_dict({
3602 "nsr_id":str(uuid
.uuid4())
3605 if not ('name' in rpc_ip
and 'nsd_ref' in rpc_ip
and
3606 ('cloud_account' in rpc_ip
or 'om_datacenter' in rpc_ip
)):
3608 "Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".
3610 self
._log
.error(errmsg
)
3611 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
3612 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3614 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
3615 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
)
3618 self
._log
.debug("start-network-service RPC input: {}".format(rpc_ip
))
3621 # Add used value to the pool
3622 self
._log
.debug("RPC output: {}".format(rpc_op
))
3624 nsd_copy
= self
.nsm
.get_nsd(rpc_ip
.nsd_ref
)
3626 self
._log
.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
3627 rpc_ip
.name
, rpc_ip
.nsd_ref
)
3629 ns_instance_config_dict
= {"id":rpc_op
.nsr_id
, "admin_status":"ENABLED"}
3630 ns_instance_config_copy_dict
= {k
:v
for k
, v
in rpc_ip
.as_dict().items()
3631 if k
in RwNsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr().fields
}
3632 ns_instance_config_dict
.update(ns_instance_config_copy_dict
)
3634 ns_instance_config
= RwNsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr
.from_dict(ns_instance_config_dict
)
3635 ns_instance_config
.nsd
= RwNsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_Nsd()
3636 ns_instance_config
.nsd
.from_dict(nsd_copy
.msg
.as_dict())
3638 payload_dict
= ns_instance_config
.to_json(self
._model
)
3640 self
._log
.debug("Sending configure ns-instance-config json to %s: %s",
3641 self
._nsr
_config
_url
,ns_instance_config
)
3643 response
= yield from self
._loop
.run_in_executor(
3645 self
._apply
_ns
_instance
_config
,
3648 response
.raise_for_status()
3649 self
._log
.debug("Received edit config response: %s", response
.json())
3651 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
3652 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3654 except Exception as e
:
3655 errmsg
= ("Exception processing the "
3656 "start-network-service: {}".format(e
))
3657 self
._log
.exception(errmsg
)
3658 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
3659 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3661 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
3662 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
)
3664 self
._ns
_regh
= yield from self
._dts
.register(
3665 xpath
=NsrRpcDtsHandler
.EXEC_NSR_CONF_XPATH
,
3666 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
3667 on_prepare
=on_ns_config_prepare
),
3668 flags
=rwdts
.Flag
.PUBLISHER
,
3671 def deregister(self
):
3673 self
._ns
_regh
.deregister()
3674 self
._ns
_regh
= None
3677 class NsrDtsHandler(object):
3678 """ The network service DTS handler """
3679 NSR_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr"
3680 SCALE_INSTANCE_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3681 KEY_PAIR_XPATH
= "C,/nsr:key-pair"
3683 def __init__(self
, dts
, log
, loop
, nsm
):
3688 self
._project
= self
._nsm
._project
3690 self
._nsr
_regh
= None
3691 self
._scale
_regh
= None
3692 self
._key
_pair
_regh
= None
3696 """ Return the NS manager instance """
3701 """ Register for Nsr create/update/delete/read requests from dts """
3704 self
._log
.warning("DTS handler already registered for project {}".
3705 format(self
._project
.name
))
3708 def nsr_id_from_keyspec(ks
):
3709 nsr_path_entry
= RwNsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr
.schema().keyspec_to_entry(ks
)
3710 nsr_id
= nsr_path_entry
.key00
.id
3713 def group_name_from_keyspec(ks
):
3714 group_path_entry
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup
.schema().keyspec_to_entry(ks
)
3715 group_name
= group_path_entry
.key00
.scaling_group_name_ref
3718 def is_instance_in_reg_elements(nsr_id
, group_name
, instance_id
):
3719 """ Return boolean indicating if scaling group instance was already commited previously.
3721 By looking at the existing elements in this registration handle (elements not part
3722 of this current xact), we can tell if the instance was configured previously without
3723 keeping any application state.
3725 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3726 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3727 elem_group_name
= group_name_from_keyspec(keyspec
)
3729 if elem_nsr_id
!= nsr_id
or group_name
!= elem_group_name
:
3732 if instance_cfg
.id == instance_id
:
3737 def get_scale_group_instance_delta(nsr_id
, group_name
, xact
):
3738 delta
= {"added": [], "deleted": []}
3739 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3740 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3741 if elem_nsr_id
!= nsr_id
:
3744 elem_group_name
= group_name_from_keyspec(keyspec
)
3745 if elem_group_name
!= group_name
:
3748 delta
["added"].append(instance_cfg
.id)
3750 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(include_keyspec
=True):
3751 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3752 if elem_nsr_id
!= nsr_id
:
3755 elem_group_name
= group_name_from_keyspec(keyspec
)
3756 if elem_group_name
!= group_name
:
3759 if instance_cfg
.id in delta
["added"]:
3760 delta
["added"].remove(instance_cfg
.id)
3762 delta
["deleted"].append(instance_cfg
.id)
3767 def update_nsr_nsd(nsr_id
, xact
, scratch
):
3770 def get_nsr_vl_delta(nsr_id
, xact
, scratch
):
3771 delta
= {"added": [], "deleted": []}
3772 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3773 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3774 if elem_nsr_id
!= nsr_id
:
3777 if 'vld' in instance_cfg
.nsd
:
3778 for vld
in instance_cfg
.nsd
.vld
:
3779 delta
["added"].append(vld
)
3781 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3782 self
._log
.debug("NSR update: %s", instance_cfg
)
3783 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3784 if elem_nsr_id
!= nsr_id
:
3787 if 'vld' in instance_cfg
.nsd
:
3788 for vld
in instance_cfg
.nsd
.vld
:
3789 if vld
in delta
["added"]:
3790 delta
["added"].remove(vld
)
3792 delta
["deleted"].append(vld
)
3796 vl_delta
= yield from get_nsr_vl_delta(nsr_id
, xact
, scratch
)
3797 self
._log
.debug("Got NSR:%s VL instance delta: %s", nsr_id
, vl_delta
)
3799 for vld
in vl_delta
["added"]:
3800 yield from self
._nsm
.nsr_instantiate_vl(nsr_id
, vld
)
3802 for vld
in vl_delta
["deleted"]:
3803 yield from self
._nsm
.nsr_terminate_vl(nsr_id
, vld
)
3805 def get_nsr_key_pairs(dts_member_reg
, xact
):
3807 for instance_cfg
, keyspec
in dts_member_reg
.get_xact_elements(xact
, include_keyspec
=True):
3808 self
._log
.debug("Key pair received is {} KS: {}".format(instance_cfg
, keyspec
))
3809 xpath
= keyspec
.to_xpath(RwNsrYang
.get_schema())
3810 key_pairs
[instance_cfg
.name
] = instance_cfg
3813 def on_apply(dts
, acg
, xact
, action
, scratch
):
3814 """Apply the configuration"""
3815 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3816 xact
, action
, scratch
)
3819 def handle_create_nsr(msg
, key_pairs
=None, restart_mode
=False):
3820 # Handle create nsr requests """
3821 # Do some validations
3822 if not msg
.has_field("nsd"):
3823 err
= "NSD not provided"
3824 self
._log
.error(err
)
3825 raise NetworkServiceRecordError(err
)
3827 self
._log
.debug("Creating NetworkServiceRecord %s from nsr config %s",
3828 msg
.id, msg
.as_dict())
3829 nsr
= yield from self
.nsm
.create_nsr(msg
,
3831 key_pairs
=key_pairs
,
3832 restart_mode
=restart_mode
)
3835 def handle_delete_nsr(msg
):
3837 def delete_instantiation(ns_id
):
3838 """ Delete instantiation """
3839 yield from self
._nsm
.terminate_ns(ns_id
, None)
3841 # Handle delete NSR requests
3842 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3843 # Terminate the NSR instance
3844 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3846 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3847 event_descr
= "Terminate rcvd for NS Id: %s, NS Name: %s" % (msg
.id, msg
.name
)
3848 nsr
.record_event("terminate-rcvd", event_descr
)
3850 self
._loop
.create_task(delete_instantiation(msg
.id))
3853 def begin_instantiation(nsr
):
3854 # Begin instantiation
3855 self
._log
.info("Beginning NS instantiation: %s", nsr
.id)
3857 yield from self
._nsm
.instantiate_ns(nsr
.id, xact
)
3858 except Exception as e
:
3859 self
._log
.exception(e
)
3863 def instantiate_ns(msg
, key_pairs
, restart_mode
=False):
3864 nsr
= yield from handle_create_nsr(msg
, key_pairs
, restart_mode
=restart_mode
)
3865 yield from begin_instantiation(nsr
)
3867 def on_instantiate_done(fut
, msg
):
3868 # If the do_instantiate fails, then publish NSR with failed result
3872 print(traceback
.format_exception(None, e
, e
.__traceback
__), file=sys
.stderr
, flush
=True)
3873 self
._log
.error("NSR instantiation failed for NSR id %s: %s", msg
.id, str(e
))
3874 failed_nsr
= self
._nsm
.nsrs
[msg
.id]
3875 self
._loop
.create_task(failed_nsr
.instantiation_failed(failed_reason
=str(e
)))
3878 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3879 xact
, action
, scratch
)
3881 if action
== rwdts
.AppconfAction
.INSTALL
and xact
.id is None:
3883 if self
._key
_pair
_regh
:
3884 for element
in self
._key
_pair
_regh
.elements
:
3885 key_pairs
.append(element
)
3887 self
._log
.error("Reg handle none for key pair in project {}".
3888 format(self
._project
))
3891 for element
in self
._nsr
_regh
.elements
:
3892 if element
.id not in self
.nsm
._nsrs
:
3893 instantiate_task
= self
._loop
.create_task(instantiate_ns(element
, key_pairs
,
3895 instantiate_task
.add_done_callback(functools
.partial(on_instantiate_done
, msg
=element
))
3897 self
._log
.error("Reg handle none for NSR in project {}".
3898 format(self
._project
))
3900 return RwTypes
.RwStatus
.SUCCESS
3902 (added_msgs
, deleted_msgs
, updated_msgs
) = get_add_delete_update_cfgs(self
._nsr
_regh
,
3905 self
._log
.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs
,
3906 deleted_msgs
, updated_msgs
)
3908 for msg
in added_msgs
:
3909 if msg
.id not in self
._nsm
.nsrs
:
3910 self
._log
.info("Create NSR received in on_apply to instantiate NS:%s", msg
.id)
3911 key_pairs
= get_nsr_key_pairs(self
._key
_pair
_regh
, xact
)
3912 instantiate_task
= self
._loop
.create_task(instantiate_ns(msg
,key_pairs
))
3913 instantiate_task
.add_done_callback(functools
.partial(on_instantiate_done
, msg
=msg
))
3915 for msg
in deleted_msgs
:
3916 self
._log
.info("Delete NSR received in on_apply to terminate NS:%s", msg
.id)
3918 handle_delete_nsr(msg
)
3920 self
._log
.exception("Failed to terminate NS:%s", msg
.id)
3922 for msg
in updated_msgs
:
3923 self
._log
.info("Update NSR received in on_apply: %s", msg
)
3924 self
._nsm
.nsr_update_cfg(msg
.id, msg
)
3927 self
._loop
.create_task(update_nsr_nsd(msg
.id, xact
, scratch
))
3929 for group
in msg
.scaling_group
:
3930 instance_delta
= get_scale_group_instance_delta(msg
.id, group
.scaling_group_name_ref
, xact
)
3931 self
._log
.debug("Got NSR:%s scale group instance delta: %s", msg
.id, instance_delta
)
3933 for instance_id
in instance_delta
["added"]:
3934 self
._nsm
.scale_nsr_out(msg
.id, group
.scaling_group_name_ref
, instance_id
, xact
)
3936 for instance_id
in instance_delta
["deleted"]:
3937 self
._nsm
.scale_nsr_in(msg
.id, group
.scaling_group_name_ref
, instance_id
)
3940 return RwTypes
.RwStatus
.SUCCESS
3943 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3944 """ Prepare calllback from DTS for NSR """
3946 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3947 action
= xact_info
.query_action
3949 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3950 xact
, action
, xact_info
, xpath
, msg
3953 fref
= ProtobufC
.FieldReference
.alloc()
3954 fref
.goto_whole_message(msg
.to_pbcm())
3956 def send_err_msg(err_msg
):
3957 self
._log
.error(errmsg
)
3958 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
3961 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
3964 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
, rwdts
.QueryAction
.DELETE
]:
3965 # if this is an NSR create
3966 if action
!= rwdts
.QueryAction
.DELETE
and msg
.id not in self
._nsm
.nsrs
:
3967 # Ensure the Cloud account/datacenter has been specified
3968 if not msg
.has_field("datacenter") and not msg
.has_field("datacenter"):
3969 errmsg
= ("Cloud account or datacenter not specified in NS {}".
3971 send_err_msg(errmsg
)
3974 # Check if nsd is specified
3975 if not msg
.has_field("nsd"):
3976 errmsg
= ("NSD not specified in NS {}".
3978 send_err_msg(errmsg
)
3982 nsr
= self
._nsm
.nsrs
[msg
.id]
3983 if msg
.has_field("nsd"):
3984 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3985 errmsg
= ("Unable to update VL when NS {} not in running state".
3987 send_err_msg(errmsg
)
3990 if 'vld' not in msg
.nsd
or len(msg
.nsd
.vld
) == 0:
3991 errmsg
= ("NS config {} NSD should have atleast 1 VLD".
3993 send_err_msg(errmsg
)
3996 if msg
.has_field("scaling_group"):
3997 self
._log
.debug("ScaleMsg %s", msg
)
3998 self
._log
.debug("NSSCALINGSTATE %s", nsr
.state
)
3999 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4000 errmsg
= ("Unable to perform scaling action when NS {} not in running state".
4002 send_err_msg(errmsg
)
4005 if len(msg
.scaling_group
) > 1:
4006 errmsg
= ("Only a single scaling group can be configured at a time for NS {}".
4008 send_err_msg(errmsg
)
4011 for group_msg
in msg
.scaling_group
:
4012 num_new_group_instances
= len(group_msg
.instance
)
4013 if num_new_group_instances
> 1:
4014 errmsg
= ("Only a single scaling instance can be modified at a time for NS {}".
4016 send_err_msg(errmsg
)
4019 elif num_new_group_instances
== 1:
4020 scale_group
= nsr
.scaling_groups
[group_msg
.scaling_group_name_ref
]
4021 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
4022 if len(scale_group
.instances
) == scale_group
.max_instance_count
:
4023 errmsg
= (" Max instances for {} reached for NS {}".
4024 format(str(scale_group
), msg
.name
))
4025 send_err_msg(errmsg
)
4028 acg
.handle
.prepare_complete_ok(xact_info
.handle
)
4031 xpath
= self
._project
.add_project(NsrDtsHandler
.NSR_XPATH
)
4032 self
._log
.debug("Registering for NSR config using xpath: {}".
4035 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
4036 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
4037 self
._nsr
_regh
= acg
.register(
4039 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
4040 on_prepare
=on_prepare
4043 self
._scale
_regh
= acg
.register(
4044 xpath
=self
._project
.add_project(NsrDtsHandler
.SCALE_INSTANCE_XPATH
),
4045 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY| rwdts
.Flag
.CACHE
,
4048 self
._key
_pair
_regh
= acg
.register(
4049 xpath
=self
._project
.add_project(NsrDtsHandler
.KEY_PAIR_XPATH
),
4050 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
4053 def deregister(self
):
4054 self
._log
.debug("De-register NSR config for project {}".
4055 format(self
._project
.name
))
4057 self
._nsr
_regh
.deregister()
4058 self
._nsr
_regh
= None
4059 if self
._scale
_regh
:
4060 self
._scale
_regh
.deregister()
4061 self
._scale
_regh
= None
4062 if self
._key
_pair
_regh
:
4063 self
._key
_pair
_regh
.deregister()
4064 self
._key
_pair
_regh
= None
4067 class VnfrDtsHandler(object):
4068 """ The virtual network service DTS handler """
4069 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
4071 def __init__(self
, dts
, log
, loop
, nsm
):
4081 """ Return registration handle """
4086 """ Return the NS manager instance """
4091 """ Register for vnfr create/update/delete/ advises from dts """
4093 self
._log
.warning("VNFR DTS handler already registered for project {}".
4094 format(self
._project
.name
))
4098 def on_prepare(xact_info
, action
, ks_path
, msg
):
4099 """ prepare callback from dts """
4100 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
4102 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
4103 xact_info
, action
, ks_path
, msg
4106 schema
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr
.schema()
4107 path_entry
= schema
.keyspec_to_entry(ks_path
)
4108 if not path_entry
or (path_entry
.key00
.id not in self
._nsm
._vnfrs
):
4109 # This can happen when using external RO or after delete with monitoring params
4110 self
._log
.debug("%s request for non existent record path %s",
4112 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
, xpath
)
4116 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
4117 yield from self
._nsm
.update_vnfr(msg
)
4118 elif action
== rwdts
.QueryAction
.DELETE
:
4119 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
4121 self
._nsm
.delete_vnfr(path_entry
.key00
.id)
4123 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
)
4125 self
._log
.debug("Registering for VNFR using xpath: %s",
4126 VnfrDtsHandler
.XPATH
)
4128 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
4129 with self
._dts
.group_create() as group
:
4130 self
._regh
= group
.register(xpath
=self
._nsm
._project
.add_project(
4131 VnfrDtsHandler
.XPATH
),
4133 flags
=(rwdts
.Flag
.SUBSCRIBER
),)
4135 def deregister(self
):
4136 self
._log
.debug("De-register VNFR for project {}".
4137 format(self
._nsm
._project
.name
))
4139 self
._regh
.deregister()
4142 class NsManager(object):
4143 """ The Network Service Manager class"""
4144 def __init__(self
, dts
, log
, loop
, project
,
4145 nsr_handler
, vnfr_handler
, vlr_handler
, ro_plugin_selector
,
4146 vnffgmgr
, vnfd_pub_handler
, cloud_account_handler
):
4150 self
._project
= project
4151 self
._nsr
_handler
= nsr_handler
4152 self
._vnfr
_pub
_handler
= vnfr_handler
4153 self
._vlr
_pub
_handler
= vlr_handler
4154 self
._vnffgmgr
= vnffgmgr
4155 self
._vnfd
_pub
_handler
= vnfd_pub_handler
4156 self
._cloud
_account
_handler
= cloud_account_handler
4158 self
._ro
_plugin
_selector
= ro_plugin_selector
4160 # Intialize the set of variables for implementing Scaling RPC using REST.
4161 self
._headers
= {"content-type":"application/json", "accept":"application/json"}
4162 self
._user
= '@rift'
4163 self
._password
= 'rift'
4164 self
._ip
= 'localhost'
4166 self
._conf
_url
= "https://{ip}:{port}/api/config/project/{project}". \
4169 project
=self
._project
.name
)
4175 self
._nsr
_for
_vlr
= {}
4177 self
.cfgmgr_obj
= conman
.ROConfigManager(log
, loop
, dts
, self
)
4179 # TODO: All these handlers should move to tasklet level.
4180 # Passing self is often an indication of bad design
4181 self
._nsd
_dts
_handler
= NsdDtsHandler(dts
, log
, loop
, self
)
4182 self
._vnfd
_dts
_handler
= VnfdDtsHandler(dts
, log
, loop
, self
)
4183 self
._dts
_handlers
= [self
._nsd
_dts
_handler
,
4184 VnfrDtsHandler(dts
, log
, loop
, self
),
4185 NsrDtsHandler(dts
, log
, loop
, self
),
4186 ScalingRpcHandler(log
, dts
, loop
, self
, self
.scale_rpc_callback
),
4187 # NsrRpcDtsHandler(dts, log, loop, self),
4188 self
._vnfd
_dts
_handler
,
4209 def nsr_handler(self
):
4210 """" NSR handler """
4211 return self
._nsr
_handler
4215 """" So Obj handler """
4220 """ NSRs in this NSM"""
4225 """ NSDs in this NSM"""
4230 """ VNFDs in this NSM"""
4235 """ VNFRs in this NSM"""
4239 def nsr_pub_handler(self
):
4240 """ NSR publication handler """
4241 return self
._nsr
_handler
4244 def vnfr_pub_handler(self
):
4245 """ VNFR publication handler """
4246 return self
._vnfr
_pub
_handler
4249 def vlr_pub_handler(self
):
4250 """ VLR publication handler """
4251 return self
._vlr
_pub
_handler
4254 def vnfd_pub_handler(self
):
4255 return self
._vnfd
_pub
_handler
4259 """ Register all static DTS handlers """
4260 self
._log
.debug("Register DTS handlers for project {}".format(self
._project
))
4261 for dts_handle
in self
._dts
_handlers
:
4262 if asyncio
.iscoroutinefunction(dts_handle
.register
):
4263 yield from dts_handle
.register()
4265 dts_handle
.register()
4267 def deregister(self
):
4268 """ Register all static DTS handlers """
4269 for dts_handle
in self
._dts
_handlers
:
4270 dts_handle
.deregister()
4273 def get_ns_by_nsr_id(self
, nsr_id
):
4274 """ get NSR by nsr id """
4275 if nsr_id
not in self
._nsrs
:
4276 raise NetworkServiceRecordError("NSR id %s not found" % nsr_id
)
4278 return self
._nsrs
[nsr_id
]
4280 def scale_nsr_out(self
, nsr_id
, scale_group_name
, instance_id
, config_xact
):
4281 self
.log
.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
4286 nsr
= self
._nsrs
[nsr_id
]
4287 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4288 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
4290 self
._loop
.create_task(nsr
.create_scale_group_instance(scale_group_name
, instance_id
, config_xact
))
4292 def scale_nsr_in(self
, nsr_id
, scale_group_name
, instance_id
):
4293 self
.log
.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
4298 nsr
= self
._nsrs
[nsr_id
]
4299 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4300 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
4302 self
._loop
.create_task(nsr
.delete_scale_group_instance(scale_group_name
, instance_id
))
4304 def scale_rpc_callback(self
, xact
, msg
, action
):
4305 """Callback handler for RPC calls
4307 xact : Transaction Handler
4309 action : Scaling Action
4311 def get_scaling_group_information():
4312 scaling_group_url
= "{url}/ns-instance-config/nsr/{nsr_id}".format(url
=self
._conf
_url
, nsr_id
=msg
.nsr_id_ref
)
4313 output
= requests
.get(scaling_group_url
, headers
=self
._headers
, auth
=(self
._user
, self
._password
), verify
=False)
4314 if output
.text
is None or len(output
.text
) == 0:
4315 self
.log
.error("nsr id %s information not present", self
._nsr
_id
)
4317 scaling_group_info
= json
.loads(output
.text
)
4318 return scaling_group_info
4320 def config_scaling_group_information(scaling_group_info
):
4321 data_str
= json
.dumps(scaling_group_info
)
4323 scale_out_url
= "{url}/ns-instance-config/nsr/{nsr_id}".format(url
=self
._conf
_url
, nsr_id
=msg
.nsr_id_ref
)
4324 response
= requests
.put(scale_out_url
, data
=data_str
, verify
=False,
4325 auth
=(self
._user
, self
._password
), headers
=self
._headers
)
4326 response
.raise_for_status()
4329 scaling_group_info
= get_scaling_group_information()
4330 self
._log
.debug("Scale out info: {}".format(scaling_group_info
))
4331 if scaling_group_info
is None:
4334 scaling_group_present
= False
4335 if "scaling-group" in scaling_group_info
["nsr:nsr"]:
4336 scaling_group_array
= scaling_group_info
["nsr:nsr"]["scaling-group"]
4337 for scaling_group
in scaling_group_array
:
4338 if scaling_group
["scaling-group-name-ref"] == msg
.scaling_group_name_ref
:
4339 scaling_group_present
= True
4340 if 'instance' not in scaling_group
:
4341 scaling_group
['instance'] = []
4342 for instance
in scaling_group
['instance']:
4343 if instance
["id"] == int(msg
.instance_id
):
4344 self
.log
.error("scaling group with instance id %s exists for scale out", msg
.instance_id
)
4346 scaling_group
["instance"].append({"id": int(msg
.instance_id
)})
4348 if not scaling_group_present
:
4349 scaling_group_info
["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg
.scaling_group_name_ref
,
4350 "instance": [{"id": msg
.instance_id
}]}]
4352 config_scaling_group_information(scaling_group_info
)
4356 scaling_group_info
= get_scaling_group_information()
4357 if scaling_group_info
is None:
4360 scaling_group_array
= scaling_group_info
["nsr:nsr"]["scaling-group"]
4361 scaling_group_present
= False
4362 instance_id_present
= False
4363 for scaling_group
in scaling_group_array
:
4364 if scaling_group
["scaling-group-name-ref"] == msg
.scaling_group_name_ref
:
4365 scaling_group_present
= True
4366 if 'instance' in scaling_group
:
4367 instance_array
= scaling_group
["instance"];
4368 for index
in range(len(instance_array
)):
4369 if instance_array
[index
]["id"] == int(msg
.instance_id
):
4370 instance_array
.pop(index
)
4371 instance_id_present
= True
4374 if not scaling_group_present
:
4375 self
.log
.error("Scaling group %s doesnot exists for scale in", msg
.scaling_group_name_ref
)
4378 if not instance_id_present
:
4379 self
.log
.error("Instance id %s doesnot exists for scale in", msg
.instance_id
)
4382 config_scaling_group_information(scaling_group_info
)
4385 if action
== ScalingRpcHandler
.ACTION
.SCALE_OUT
:
4386 self
._loop
.run_in_executor(None, scale_out
)
4388 self
._loop
.run_in_executor(None, scale_in
)
4390 def nsr_update_cfg(self
, nsr_id
, msg
):
4391 nsr
= self
._nsrs
[nsr_id
]
4392 nsr
.nsr_cfg_msg
= msg
4394 def nsr_instantiate_vl(self
, nsr_id
, vld
):
4395 self
.log
.error("NSR {} create VL {}".format(nsr_id
, vld
))
4396 nsr
= self
._nsrs
[nsr_id
]
4397 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4398 raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")
4400 # Not calling in a separate task as this is called from a separate task
4401 yield from nsr
.create_vl_instance(vld
)
4403 def nsr_terminate_vl(self
, nsr_id
, vld
):
4404 self
.log
.debug("NSR {} delete VL {}".format(nsr_id
, vld
.id))
4405 nsr
= self
._nsrs
[nsr_id
]
4406 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4407 raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")
4409 # Not calling in a separate task as this is called from a separate task
4410 yield from nsr
.delete_vl_instance(vld
)
4413 def create_nsr(self
, nsr_msg
, config_xact
, key_pairs
=None,restart_mode
=False):
4414 """ Create an NSR instance """
4415 self
._log
.debug("NSRMSG %s", nsr_msg
)
4416 if nsr_msg
.id in self
._nsrs
:
4417 msg
= "NSR id %s already exists" % nsr_msg
.id
4418 self
._log
.error(msg
)
4419 raise NetworkServiceRecordError(msg
)
4421 self
._log
.debug("Create NetworkServiceRecord nsr id %s from nsd_id %s",
4425 nsm_plugin
= self
._ro
_plugin
_selector
.get_ro_plugin(nsr_msg
.resource_orchestrator
)
4426 #Work Around - openmano expects datacenter id instead of datacenter name
4427 if isinstance(nsm_plugin
, openmano_nsm
.OpenmanoNsPlugin
):
4428 for uuid
, name
in nsm_plugin
._cli
_api
.datacenter_list():
4429 if name
== nsr_msg
.datacenter
:
4430 nsr_msg
.datacenter
= uuid
4432 sdn_account_name
= self
._cloud
_account
_handler
.get_cloud_account_sdn_name(nsr_msg
.datacenter
)
4434 nsr
= NetworkServiceRecord(self
._dts
,
4443 restart_mode
=restart_mode
,
4444 vlr_handler
=self
._vlr
_pub
_handler
4446 self
._nsrs
[nsr_msg
.id] = nsr
4449 # Generate ssh key pair if required
4450 nsr
.generate_ssh_key_pair(config_xact
)
4451 except Exception as e
:
4452 self
._log
.exception("SSH key: {}".format(e
))
4454 self
._log
.debug("NSR {}: SSh key generated: {}".format(nsr_msg
.name
,
4457 ssh_key
= {'private_key': nsr
.private_key
,
4458 'public_key': nsr
.public_key
4461 nsm_plugin
.create_nsr(nsr_msg
, nsr_msg
.nsd
, key_pairs
, ssh_key
=ssh_key
)
4465 def delete_nsr(self
, nsr_id
):
4467 Delete NSR with the passed nsr id
4469 del self
._nsrs
[nsr_id
]
4472 def instantiate_ns(self
, nsr_id
, config_xact
):
4473 """ Instantiate an NS instance """
4474 self
._log
.debug("Instantiating Network service id %s", nsr_id
)
4475 if nsr_id
not in self
._nsrs
:
4476 err
= "NSR id %s not found " % nsr_id
4477 self
._log
.error(err
)
4478 raise NetworkServiceRecordError(err
)
4480 nsr
= self
._nsrs
[nsr_id
]
4482 yield from nsr
.nsm_plugin
.instantiate_ns(nsr
, config_xact
)
4483 except Exception as e
:
4484 self
._log
.exception("NS instantiate: {}".format(e
))
4488 def update_vnfr(self
, vnfr
):
4489 """Create/Update an VNFR """
4491 vnfr_state
= self
._vnfrs
[vnfr
.id].state
4492 self
._log
.debug("Updating VNFR with state %s: vnfr %s", vnfr_state
, vnfr
)
4494 no_of_active_vms
= 0
4495 for vdur
in vnfr
.vdur
:
4496 if vdur
.operational_status
== 'running':
4497 no_of_active_vms
+= 1
4499 self
._vnfrs
[vnfr
.id]._active
_vdus
= no_of_active_vms
4500 yield from self
._vnfrs
[vnfr
.id].update_state(vnfr
)
4501 nsr
= self
.find_nsr_for_vnfr(vnfr
.id)
4503 nsr
._vnf
_inst
_started
= False
4504 yield from nsr
.update_state()
4506 def find_nsr_for_vnfr(self
, vnfr_id
):
4507 """ Find the NSR which )has the passed vnfr id"""
4508 for nsr
in list(self
.nsrs
.values()):
4509 for vnfr
in list(nsr
.vnfrs
.values()):
4510 if vnfr
.id == vnfr_id
:
4514 def delete_vnfr(self
, vnfr_id
):
4515 """ Delete VNFR with the passed id"""
4516 del self
._vnfrs
[vnfr_id
]
4519 def get_nsr_config(self
, nsd_id
):
4520 xpath
= self
._project
.add_project("C,/nsr:ns-instance-config")
4521 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
4523 for result
in results
:
4524 entry
= yield from result
4525 ns_instance_config
= entry
.result
4527 for nsr
in ns_instance_config
.nsr
:
4528 if nsr
.nsd
.id == nsd_id
:
4533 def get_nsd(self
, nsd_id
):
4534 """ Get network service descriptor for the passed nsd_id"""
4535 if nsd_id
not in self
._nsds
:
4536 self
._log
.error("Cannot find NSD id:%s", nsd_id
)
4537 raise NetworkServiceDescriptorError("Cannot find NSD id:%s", nsd_id
)
4539 return self
._nsds
[nsd_id
]
4541 def create_nsd(self
, nsd_msg
):
4542 """ Create a network service descriptor """
4543 self
._log
.debug("Create network service descriptor - %s", nsd_msg
)
4544 if nsd_msg
.id in self
._nsds
:
4545 self
._log
.error("Cannot create NSD %s -NSD ID already exists", nsd_msg
)
4546 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg
.id)
4548 nsd
= NetworkServiceDescriptor(
4555 self
._nsds
[nsd_msg
.id] = nsd
4559 def update_nsd(self
, nsd
):
4560 """ update the Network service descriptor """
4561 self
._log
.debug("Update network service descriptor - %s", nsd
)
4562 if nsd
.id not in self
._nsds
:
4563 self
._log
.debug("No NSD found - creating NSD id = %s", nsd
.id)
4564 self
.create_nsd(nsd
)
4566 self
._log
.debug("Updating NSD id = %s, nsd = %s", nsd
.id, nsd
)
4567 self
._nsds
[nsd
.id].update(nsd
)
4569 def delete_nsd(self
, nsd_id
):
4570 """ Delete the Network service descriptor with the passed id """
4571 self
._log
.debug("Deleting the network service descriptor - %s", nsd_id
)
4572 if nsd_id
not in self
._nsds
:
4573 self
._log
.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id
)
4574 raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id
)
4575 del self
._nsds
[nsd_id
]
4577 def get_vnfd_config(self
, xact
):
4578 vnfd_dts_reg
= self
._vnfd
_dts
_handler
.regh
4579 for cfg
in vnfd_dts_reg
.get_xact_elements(xact
):
4580 if cfg
.id not in self
._vnfds
:
4581 self
.create_vnfd(cfg
)
4583 def get_vnfd(self
, vnfd_id
, xact
):
4584 """ Get virtual network function descriptor for the passed vnfd_id"""
4585 if vnfd_id
not in self
._vnfds
:
4586 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
4587 self
.get_vnfd_config(xact
)
4589 if vnfd_id
not in self
._vnfds
:
4590 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
4591 raise VnfDescriptorError("Cannot find VNFD id:%s", vnfd_id
)
4593 return self
._vnfds
[vnfd_id
]
4595 def create_vnfd(self
, vnfd
):
4596 """ Create a virtual network function descriptor """
4597 self
._log
.debug("Create virtual network function descriptor - %s", vnfd
)
4598 if vnfd
.id in self
._vnfds
:
4599 self
._log
.error("Cannot create VNFD %s -VNFD ID already exists", vnfd
)
4600 raise VnfDescriptorError("VNFD already exists-%s", vnfd
.id)
4602 self
._vnfds
[vnfd
.id] = vnfd
4603 return self
._vnfds
[vnfd
.id]
4605 def update_vnfd(self
, vnfd
):
4606 """ Update the virtual network function descriptor """
4607 self
._log
.debug("Update virtual network function descriptor- %s", vnfd
)
4610 if vnfd
.id not in self
._vnfds
:
4611 self
._log
.debug("No VNFD found - creating VNFD id = %s", vnfd
.id)
4612 self
.create_vnfd(vnfd
)
4614 self
._log
.debug("Updating VNFD id = %s, vnfd = %s", vnfd
.id, vnfd
)
4615 self
._vnfds
[vnfd
.id] = vnfd
4618 def delete_vnfd(self
, vnfd_id
):
4619 """ Delete the virtual network function descriptor with the passed id """
4620 self
._log
.debug("Deleting the virtual network function descriptor - %s", vnfd_id
)
4621 if vnfd_id
not in self
._vnfds
:
4622 self
._log
.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id
)
4623 raise VnfDescriptorError("Cannot find %s", vnfd_id
)
4625 del self
._vnfds
[vnfd_id
]
4628 def publish_nsr(self
, xact
, path
, msg
):
4629 """ Publish a NSR """
4630 self
._log
.debug("Publish NSR with path %s, msg %s",
4632 yield from self
.nsr_handler
.update(xact
, path
, msg
)
4635 def unpublish_nsr(self
, xact
, path
):
4636 """ Un Publish an NSR """
4637 self
._log
.debug("Publishing delete NSR with path %s", path
)
4638 yield from self
.nsr_handler
.delete(path
, xact
)
4640 def vnfr_is_ready(self
, vnfr_id
):
4641 """ VNFR with the id is ready """
4642 self
._log
.debug("VNFR id %s ready", vnfr_id
)
4643 if vnfr_id
not in self
._vnfds
:
4644 err
= "Did not find VNFR ID with id %s" % vnfr_id
4645 self
._log
.critical("err")
4646 raise VirtualNetworkFunctionRecordError(err
)
4647 self
._vnfrs
[vnfr_id
].is_ready()
4651 def terminate_ns(self
, nsr_id
, xact
):
4653 Terminate network service for the given NSR Id
4656 if nsr_id
not in self
._nsrs
:
4659 # Terminate the instances/networks assocaited with this nw service
4660 self
._log
.debug("Terminating the network service %s", nsr_id
)
4662 yield from self
._nsrs
[nsr_id
].terminate()
4663 except Exception as e
:
4664 self
.log
.exception("Failed to terminate NSR[id=%s]", nsr_id
)
4666 def vlr_event(self
, vlr
, action
):
4667 self
._log
.debug("Received VLR %s with action:%s", vlr
, action
)
4668 # Find the NS and see if we can proceed
4669 nsr
= self
.find_nsr_for_vlr_id(vlr
.id)
4671 self
._log
.error("VLR %s:%s received for NSR, state:%s",
4672 vlr
.id, vlr
.name
, vlr
.operational_status
)
4674 nsr
.vlr_event(vlr
, action
)
4676 def add_vlr_id_nsr_map(self
, vlr_id
, nsr
):
4677 """ Add a mapping for vlr_id into NSR """
4678 self
._nsr
_for
_vlr
[vlr_id
] = nsr
4680 def remove_vlr_id_nsr_map(self
, vlr_id
):
4681 """ Remove a mapping for vlr_id into NSR """
4682 if vlr_id
in self
._nsr
_for
_vlr
:
4683 del self
._nsr
_for
_vlr
[vlr_id
]
4685 def find_nsr_for_vlr_id(self
, vlr_id
):
4686 """ Find NSR for VLR id """
4688 if vlr_id
in self
._nsr
_for
_vlr
:
4689 nsr
= self
._nsr
_for
_vlr
[vlr_id
]
4693 class NsmRecordsPublisherProxy(object):
4694 """ This class provides a publisher interface that allows plugin objects
4695 to publish NSR/VNFR/VLR"""
4697 def __init__(self
, dts
, log
, loop
, project
, nsr_pub_hdlr
,
4698 vnfr_pub_hdlr
, vlr_pub_hdlr
,):
4702 self
._project
= project
4703 self
._nsr
_pub
_hdlr
= nsr_pub_hdlr
4704 self
._vlr
_pub
_hdlr
= vlr_pub_hdlr
4705 self
._vnfr
_pub
_hdlr
= vnfr_pub_hdlr
4708 def publish_nsr_opdata(self
, xact
, nsr
):
4709 """ Publish an NSR """
4710 path
= ("D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref={}]"
4711 ).format(quoted_key(nsr
.ns_instance_config_ref
))
4712 return (yield from self
._nsr
_pub
_hdlr
.update(xact
, path
, nsr
))
4715 def publish_nsr(self
, xact
, nsr
):
4716 """ Publish an NSR """
4717 path
= self
._project
.add_project(NetworkServiceRecord
.xpath_from_nsr(nsr
))
4718 return (yield from self
._nsr
_pub
_hdlr
.update(xact
, path
, nsr
))
4721 def unpublish_nsr(self
, xact
, nsr
):
4722 """ Unpublish an NSR """
4723 path
= self
._project
.add_project(NetworkServiceRecord
.xpath_from_nsr(nsr
))
4724 return (yield from self
._nsr
_pub
_hdlr
.delete(xact
, path
))
4727 def publish_vnfr(self
, xact
, vnfr
):
4728 """ Publish an VNFR """
4729 path
= self
._project
.add_project(VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
))
4730 return (yield from self
._vnfr
_pub
_hdlr
.update(xact
, path
, vnfr
))
4733 def unpublish_vnfr(self
, xact
, vnfr
):
4734 """ Unpublish a VNFR """
4735 path
= self
._project
.add_project(VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
))
4736 yield from self
._vnfr
_pub
_hdlr
.delete(xact
, path
)
4737 # NOTE: The regh delete does not send the on_prepare to VNFM tasklet as well
4738 # as remove all the VNFR elements. So need to send this additional delete block.
4739 with self
._dts
.transaction(flags
= 0) as xact
:
4740 block
= xact
.block_create()
4741 block
.add_query_delete(path
)
4742 yield from block
.execute(flags
=0, now
=True)
4745 def publish_vlr(self
, xact
, vlr
):
4746 """ Publish a VLR """
4747 path
= self
._project
.add_project(VirtualLinkRecord
.vlr_xpath(vlr
))
4748 return (yield from self
._vlr
_pub
_hdlr
.update(xact
, path
, vlr
))
4751 def unpublish_vlr(self
, xact
, vlr
):
4752 """ Unpublish a VLR """
4753 path
= self
._project
.add_project(VirtualLinkRecord
.vlr_xpath(vlr
))
4754 return (yield from self
._vlr
_pub
_hdlr
.delete(xact
, path
))
4756 class ScalingRpcHandler(mano_dts
.DtsHandler
):
4757 """ The Network service Monitor DTS handler """
4758 SCALE_IN_INPUT_XPATH
= "I,/nsr:exec-scale-in"
4759 SCALE_IN_OUTPUT_XPATH
= "O,/nsr:exec-scale-in"
4761 SCALE_OUT_INPUT_XPATH
= "I,/nsr:exec-scale-out"
4762 SCALE_OUT_OUTPUT_XPATH
= "O,/nsr:exec-scale-out"
4764 ACTION
= Enum('ACTION', 'SCALE_IN SCALE_OUT')
4766 def __init__(self
, log
, dts
, loop
, nsm
, callback
=None):
4767 super().__init
__(log
, dts
, loop
, nsm
._project
)
4769 self
.callback
= callback
4770 self
.last_instance_id
= defaultdict(int)
4773 self
._reg
_out
= None
4778 def send_err_msg(err_msg
, xact_info
, ks_path
, e
=False):
4779 xpath
= ks_path
.to_xpath(NsrYang
.get_schema())
4781 self
._log
.exception(err_msg
)
4783 self
._log
.error(err_msg
)
4784 xact_info
.send_error_xpath(RwTypes
.RwStatus
.FAILURE
,
4787 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
4790 def on_scale_in_prepare(xact_info
, action
, ks_path
, msg
):
4791 assert action
== rwdts
.QueryAction
.RPC
4793 self
._log
.debug("Scale in called: {}".format(msg
.as_dict()))
4794 if not self
.project
.rpc_check(msg
, xact_info
):
4798 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleIn
.from_dict({
4799 "instance_id": msg
.instance_id
})
4801 nsr
= self
._nsm
.nsrs
[msg
.nsr_id_ref
]
4802 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4803 errmsg
= ("Unable to perform scaling action when NS {}({}) not in running state".
4804 format(nsr
.name
, nsr
.id))
4805 send_err_msg(errmsg
, xact_info
, ks_path
)
4808 xact_info
.respond_xpath(
4809 rwdts
.XactRspCode
.ACK
,
4810 self
.__class
__.SCALE_IN_OUTPUT_XPATH
,
4814 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_IN
)
4816 except Exception as e
:
4817 errmsg
= ("Exception doing scale in using {}: {}".
4819 send_err_msg(errmsg
, xact_info
, ks_path
, e
=True)
4822 def on_scale_out_prepare(xact_info
, action
, ks_path
, msg
):
4823 assert action
== rwdts
.QueryAction
.RPC
4825 self
._log
.debug("Scale out called: {}".format(msg
.as_dict()))
4826 if not self
.project
.rpc_check(msg
, xact_info
):
4830 scaling_group
= msg
.scaling_group_name_ref
4831 if not msg
.instance_id
:
4832 last_instance_id
= self
.last_instance_id
[scale_group
]
4833 msg
.instance_id
= last_instance_id
+ 1
4834 self
.last_instance_id
[scale_group
] += 1
4836 nsr
= self
._nsm
.nsrs
[msg
.nsr_id_ref
]
4837 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
4838 errmsg
= ("Unable to perform scaling action when NS {}({}) not in running state".
4839 format(nsr
.name
, nsr
.id))
4840 send_err_msg(errmsg
, xact_info
, ks_path
)
4843 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleOut
.from_dict({
4844 "instance_id": msg
.instance_id
})
4846 xact_info
.respond_xpath(
4847 rwdts
.XactRspCode
.ACK
,
4848 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
,
4852 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_OUT
)
4854 except Exception as e
:
4855 errmsg
= ("Exception doing scale in using {}: {}".
4857 send_err_msg(errmsg
, xact_info
, ks_path
, e
=True)
4859 self
._reg
_in
= yield from self
.dts
.register(
4860 xpath
=self
.__class
__.SCALE_IN_INPUT_XPATH
,
4861 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
4862 on_prepare
=on_scale_in_prepare
),
4863 flags
=rwdts
.Flag
.PUBLISHER
)
4865 self
._reg
_out
= yield from self
.dts
.register(
4866 xpath
=self
.__class
__.SCALE_OUT_INPUT_XPATH
,
4867 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
4868 on_prepare
=on_scale_out_prepare
),
4869 flags
=rwdts
.Flag
.PUBLISHER
)
4871 def deregister(self
):
4873 self
._reg
_in
.deregister()
4877 self
._reg
_out
.deregister()
4878 self
._reg
_out
= None
4881 class NsmProject(ManoProject
):
4883 def __init__(self
, name
, tasklet
, **kw
):
4884 super(NsmProject
, self
).__init
__(tasklet
.log
, name
)
4885 self
.update(tasklet
)
4888 self
._ro
_plugin
_selector
= None
4889 self
._vnffgmgr
= None
4891 self
._nsr
_pub
_handler
= None
4892 self
._vnfr
_pub
_handler
= None
4893 self
._vlr
_pub
_handler
= None
4894 self
._vnfd
_pub
_handler
= None
4895 self
._scale
_cfg
_handler
= None
4897 self
._records
_publisher
_proxy
= None
4899 def vlr_event(self
, vlr
, action
):
4900 """ VLR Event callback """
4901 self
.log
.debug("VLR Event received for VLR %s with action %s", vlr
, action
)
4902 self
._nsm
.vlr_event(vlr
, action
)
4906 self
.log
.debug("Register NsmProject for {}".format(self
.name
))
4908 self
._nsr
_pub
_handler
= publisher
.NsrOpDataDtsHandler(
4909 self
._dts
, self
.log
, self
.loop
, self
)
4910 yield from self
._nsr
_pub
_handler
.register()
4912 self
._vnfr
_pub
_handler
= publisher
.VnfrPublisherDtsHandler(
4913 self
._dts
, self
.log
, self
.loop
, self
)
4914 yield from self
._vnfr
_pub
_handler
.register()
4916 self
._vlr
_pub
_handler
= publisher
.VlrPublisherDtsHandler(
4917 self
._dts
, self
.log
, self
.loop
, self
)
4918 yield from self
._vlr
_pub
_handler
.register()
4920 self
._vlr
_sub
_handler
= subscriber
.VlrSubscriberDtsHandler(self
.log
,
4926 yield from self
._vlr
_sub
_handler
.register()
4928 manifest
= self
._tasklet
.tasklet_info
.get_pb_manifest()
4929 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
4930 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
4931 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
4933 self
._vnfd
_pub
_handler
= publisher
.VnfdPublisher(
4934 use_ssl
, ssl_cert
, ssl_key
, self
.loop
, self
)
4936 self
._records
_publisher
_proxy
= NsmRecordsPublisherProxy(
4941 self
._nsr
_pub
_handler
,
4942 self
._vnfr
_pub
_handler
,
4943 self
._vlr
_pub
_handler
,
4946 # Register the NSM to receive the nsm plugin
4947 # when cloud account is configured
4948 self
._ro
_plugin
_selector
= cloud
.ROAccountConfigSubscriber(
4953 self
._records
_publisher
_proxy
4955 yield from self
._ro
_plugin
_selector
.register()
4957 self
._cloud
_account
_handler
= cloud
.CloudAccountConfigSubscriber(
4964 yield from self
._cloud
_account
_handler
.register()
4966 self
._vnffgmgr
= rwvnffgmgr
.VnffgMgr(self
._dts
, self
.log
, self
.log_hdl
, self
.loop
,
4967 self
, self
._cloud
_account
_handler
)
4968 yield from self
._vnffgmgr
.register()
4970 self
._nsm
= NsManager(
4975 self
._nsr
_pub
_handler
,
4976 self
._vnfr
_pub
_handler
,
4977 self
._vlr
_pub
_handler
,
4978 self
._ro
_plugin
_selector
,
4980 self
._vnfd
_pub
_handler
,
4981 self
._cloud
_account
_handler
,
4984 yield from self
._nsm
.register()
4985 self
.log
.debug("Register NsmProject for {} complete".format(self
.name
))
4987 def deregister(self
):
4988 self
._log
.debug("Project {} de-register".format(self
.name
))
4989 self
._nsm
.deregister()
4990 self
._vnffgmgr
.deregister()
4991 self
._cloud
_account
_handler
.deregister()
4992 self
._ro
_plugin
_selector
.deregister()
4993 self
._nsr
_pub
_handler
.deregister()
4994 self
._vnfr
_pub
_handler
.deregister()
4995 self
._vlr
_pub
_handler
.deregister()
4996 self
._vlr
_sub
_handler
.deregister()
5000 def delete_prepare(self
):
5001 if self
._nsm
and self
._nsm
._nsrs
:
5002 delete_msg
= "Project has NSR associated with it. Delete all Project NSR and try again."
5003 return False, delete_msg
5007 class NsmTasklet(rift
.tasklets
.Tasklet
):
5009 The network service manager tasklet
5011 def __init__(self
, *args
, **kwargs
):
5012 super(NsmTasklet
, self
).__init
__(*args
, **kwargs
)
5013 self
.rwlog
.set_category("rw-mano-log")
5014 self
.rwlog
.set_subcategory("nsm")
5017 self
.project_handler
= None
5025 """ The task start callback """
5026 super(NsmTasklet
, self
).start()
5027 self
.log
.info("Starting NsmTasklet")
5029 self
.log
.debug("Registering with dts")
5030 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
5031 RwNsmYang
.get_schema(),
5033 self
.on_dts_state_change
)
5035 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
5041 print("Caught Exception in NSM stop:", sys
.exc_info()[0])
5044 def on_instance_started(self
):
5045 """ Task instance started callback """
5046 self
.log
.debug("Got instance started callback")
5050 """ Task init callback """
5051 self
.log
.debug("Got instance started callback")
5053 self
.log
.debug("creating project handler")
5054 self
.project_handler
= ProjectHandler(self
, NsmProject
)
5055 self
.project_handler
.register()
5061 """ Task run callback """
5065 def on_dts_state_change(self
, state
):
5066 """Take action according to current dts state to transition
5067 application into the corresponding application state
5070 state - current dts state
5073 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
5074 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
5078 rwdts
.State
.INIT
: self
.init
,
5079 rwdts
.State
.RUN
: self
.run
,
5082 # Transition application to next state
5083 handler
= handlers
.get(state
, None)
5084 if handler
is not None:
5085 yield from handler()
5087 # Transition dts to next state
5088 next_state
= switch
.get(state
, None)
5089 if next_state
is not None:
5090 self
.log
.debug("Changing state to %s", next_state
)
5091 self
._dts
.handle
.set_state(next_state
)