2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
20 import ncclient
.asyncio_manager
30 from collections
import deque
31 from collections
import defaultdict
35 gi
.require_version('RwYang', '1.0')
36 gi
.require_version('RwNsdYang', '1.0')
37 gi
.require_version('RwDts', '1.0')
38 gi
.require_version('RwNsmYang', '1.0')
39 gi
.require_version('RwNsrYang', '1.0')
40 gi
.require_version('RwTypes', '1.0')
41 gi
.require_version('RwVlrYang', '1.0')
42 gi
.require_version('RwVnfrYang', '1.0')
43 from gi
.repository
import (
59 import rift
.mano
.ncclient
60 import rift
.mano
.config_data
.config
61 import rift
.mano
.dts
as mano_dts
63 from . import rwnsm_conman
as conman
65 from . import publisher
67 from . import config_value_pool
68 from . import rwvnffgmgr
69 from . import scale_group
72 class NetworkServiceRecordState(Enum
):
73 """ Network Service Record State """
77 VNFFG_INIT_PHASE
= 104
83 VL_TERMINATE_PHASE
= 111
84 VNF_TERMINATE_PHASE
= 112
85 VNFFG_TERMINATE_PHASE
= 113
92 class NetworkServiceRecordError(Exception):
93 """ Network Service Record Error """
97 class NetworkServiceDescriptorError(Exception):
98 """ Network Service Descriptor Error """
102 class VirtualNetworkFunctionRecordError(Exception):
103 """ Virtual Network Function Record Error """
107 class NetworkServiceDescriptorNotFound(Exception):
108 """ Cannot find Network Service Descriptor"""
112 class NetworkServiceDescriptorRefCountExists(Exception):
113 """ Network Service Descriptor reference count exists """
117 class NetworkServiceDescriptorUnrefError(Exception):
118 """ Failed to unref a network service descriptor """
122 class NsrInstantiationFailed(Exception):
123 """ Failed to instantiate network service """
127 class VnfInstantiationFailed(Exception):
128 """ Failed to instantiate virtual network function"""
132 class VnffgInstantiationFailed(Exception):
133 """ Failed to instantiate virtual network function"""
137 class VnfDescriptorError(Exception):
138 """Failed to instantiate virtual network function"""
142 class ScalingOperationError(Exception):
146 class ScaleGroupMissingError(Exception):
150 class PlacementGroupError(Exception):
154 class NsrNsdUpdateError(Exception):
158 class NsrVlUpdateError(NsrNsdUpdateError
):
162 class VlRecordState(Enum
):
163 """ VL Record State """
165 INSTANTIATION_PENDING
= 102
167 TERMINATE_PENDING
= 104
172 class VnffgRecordState(Enum
):
173 """ VNFFG Record State """
175 INSTANTIATION_PENDING
= 102
177 TERMINATE_PENDING
= 104
182 class VnffgRecord(object):
183 """ Vnffg Records class"""
186 def __init__(self
, dts
, log
, loop
, vnffgmgr
, nsr
, nsr_name
, vnffgd_msg
, sdn_account_name
):
191 self
._vnffgmgr
= vnffgmgr
193 self
._nsr
_name
= nsr_name
194 self
._vnffgd
_msg
= vnffgd_msg
195 if sdn_account_name
is None:
196 self
._sdn
_account
_name
= ''
198 self
._sdn
_account
_name
= sdn_account_name
200 self
._vnffgr
_id
= str(uuid
.uuid4())
201 self
._vnffgr
_rsp
_id
= list()
202 self
._vnffgr
_state
= VnffgRecordState
.INIT
207 return self
._vnffgr
_id
211 """ state of this VNF """
212 return self
._vnffgr
_state
214 def fetch_vnffgr(self
):
216 Get VNFFGR message to be published
219 if self
._vnffgr
_state
== VnffgRecordState
.INIT
:
220 vnffgr_dict
= {"id": self
._vnffgr
_id
,
221 "nsd_id": self
._nsr
.nsd_id
,
222 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
223 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
224 "sdn_account": self
._sdn
_account
_name
,
225 "operational_status": 'init',
227 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
228 elif self
._vnffgr
_state
== VnffgRecordState
.TERMINATED
:
229 vnffgr_dict
= {"id": self
._vnffgr
_id
,
230 "nsd_id": self
._nsr
.nsd_id
,
231 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
232 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
233 "sdn_account": self
._sdn
_account
_name
,
234 "operational_status": 'terminated',
236 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
239 vnffgr
= self
._vnffgmgr
.fetch_vnffgr(self
._vnffgr
_id
)
241 self
._log
.exception("Fetching VNFFGR for VNFFG with id %s failed", self
._vnffgr
_id
)
242 self
._vnffgr
_state
= VnffgRecordState
.FAILED
243 vnffgr_dict
= {"id": self
._vnffgr
_id
,
244 "nsd_id": self
._nsr
.nsd_id
,
245 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
246 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
247 "sdn_account": self
._sdn
_account
_name
,
248 "operational_status": 'failed',
250 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
255 def vnffgr_create_msg(self
):
256 """ Virtual Link Record message for Creating VLR in VNS """
257 vnffgr_dict
= {"id": self
._vnffgr
_id
,
258 "nsd_id": self
._nsr
.nsd_id
,
259 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
260 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
261 "sdn_account": self
._sdn
_account
_name
,
263 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
264 for rsp
in self
._vnffgd
_msg
.rsp
:
265 vnffgr_rsp
= vnffgr
.rsp
.add()
266 vnffgr_rsp
.id = str(uuid
.uuid4())
267 vnffgr_rsp
.name
= self
._nsr
.name
+ '.' + rsp
.name
268 self
._vnffgr
_rsp
_id
.append(vnffgr_rsp
.id)
269 vnffgr_rsp
.vnffgd_rsp_id_ref
= rsp
.id
270 vnffgr_rsp
.vnffgd_rsp_name_ref
= rsp
.name
271 for rsp_cp_ref
in rsp
.vnfd_connection_point_ref
:
272 vnfd
= [vnfr
.vnfd
for vnfr
in self
._nsr
.vnfrs
.values() if vnfr
.vnfd
.id == rsp_cp_ref
.vnfd_id_ref
]
273 self
._log
.debug("VNFD message during VNFFG instantiation is %s",vnfd
)
274 if len(vnfd
) > 0 and vnfd
[0].has_field('service_function_type'):
275 self
._log
.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref
.vnfd_id_ref
, vnfd
[0].service_function_type
)
277 self
._log
.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref
.vnfd_id_ref
)
280 vnfr_cp_ref
= vnffgr_rsp
.vnfr_connection_point_ref
.add()
281 vnfr_cp_ref
.member_vnf_index_ref
= rsp_cp_ref
.member_vnf_index_ref
282 vnfr_cp_ref
.hop_number
= rsp_cp_ref
.order
283 vnfr_cp_ref
.vnfd_id_ref
=rsp_cp_ref
.vnfd_id_ref
284 vnfr_cp_ref
.service_function_type
= vnfd
[0].service_function_type
285 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
286 if (nsr_vnfr
.vnfd
.id == vnfr_cp_ref
.vnfd_id_ref
and
287 nsr_vnfr
.member_vnf_index
== vnfr_cp_ref
.member_vnf_index_ref
):
288 vnfr_cp_ref
.vnfr_id_ref
= nsr_vnfr
.id
289 vnfr_cp_ref
.vnfr_name_ref
= nsr_vnfr
.name
290 vnfr_cp_ref
.vnfr_connection_point_ref
= rsp_cp_ref
.vnfd_connection_point_ref
292 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
293 self
._log
.debug(" Received VNFR is %s", vnfr
)
294 while vnfr
.operational_status
!= 'running':
295 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
296 if vnfr
.operational_status
== 'failed':
297 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
298 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
299 yield from asyncio
.sleep(2, loop
=self
._loop
)
300 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
301 self
._log
.debug("Received VNFR is %s", vnfr
)
303 vnfr_cp_ref
.connection_point_params
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
304 for cp
in vnfr
.connection_point
:
305 if cp
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
306 vnfr_cp_ref
.connection_point_params
.port_id
= cp
.connection_point_id
307 vnfr_cp_ref
.connection_point_params
.name
= self
._nsr
.name
+ '.' + cp
.name
308 for vdu
in vnfr
.vdur
:
309 for ext_intf
in vdu
.external_interface
:
310 if ext_intf
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
311 vnfr_cp_ref
.connection_point_params
.vm_id
= vdu
.vim_id
312 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
313 vnfr_cp_ref
.connection_point_params
.vm_id
)
316 vnfr_cp_ref
.connection_point_params
.address
= cp
.ip_address
317 vnfr_cp_ref
.connection_point_params
.port
= VnffgRecord
.SFF_DP_PORT
319 for vnffgd_classifier
in self
._vnffgd
_msg
.classifier
:
320 _rsp
= [rsp
for rsp
in vnffgr
.rsp
if rsp
.vnffgd_rsp_id_ref
== vnffgd_classifier
.rsp_id_ref
]
322 rsp_id_ref
= _rsp
[0].id
323 rsp_name
= _rsp
[0].name
325 self
._log
.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier
.rsp_id_ref
,vnffgd_classifier
.id)
327 vnffgr_classifier
= vnffgr
.classifier
.add()
328 vnffgr_classifier
.id = vnffgd_classifier
.id
329 vnffgr_classifier
.name
= self
._nsr
.name
+ '.' + vnffgd_classifier
.name
330 _rsp
[0].classifier_name
= vnffgr_classifier
.name
331 vnffgr_classifier
.rsp_id_ref
= rsp_id_ref
332 vnffgr_classifier
.rsp_name
= rsp_name
333 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
334 if (nsr_vnfr
.vnfd
.id == vnffgd_classifier
.vnfd_id_ref
and
335 nsr_vnfr
.member_vnf_index
== vnffgd_classifier
.member_vnf_index_ref
):
336 vnffgr_classifier
.vnfr_id_ref
= nsr_vnfr
.id
337 vnffgr_classifier
.vnfr_name_ref
= nsr_vnfr
.name
338 vnffgr_classifier
.vnfr_connection_point_ref
= vnffgd_classifier
.vnfd_connection_point_ref
340 if nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER':
341 vnffgr_classifier
.sff_name
= nsr_vnfr
.name
343 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
344 self
._log
.debug(" Received VNFR is %s", vnfr
)
345 while vnfr
.operational_status
!= 'running':
346 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
347 if vnfr
.operational_status
== 'failed':
348 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
349 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
350 yield from asyncio
.sleep(2, loop
=self
._loop
)
351 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
352 self
._log
.debug("Received VNFR is %s", vnfr
)
354 for cp
in vnfr
.connection_point
:
355 if cp
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
356 vnffgr_classifier
.port_id
= cp
.connection_point_id
357 vnffgr_classifier
.ip_address
= cp
.ip_address
358 for vdu
in vnfr
.vdur
:
359 for ext_intf
in vdu
.external_interface
:
360 if ext_intf
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
361 vnffgr_classifier
.vm_id
= vdu
.vim_id
362 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
363 vnfr_cp_ref
.connection_point_params
.vm_id
)
366 self
._log
.info("VNFFGR msg to be sent is %s", vnffgr
)
370 def vnffgr_nsr_sff_list(self
):
371 """ SFF List for VNFR """
373 sf_list
= [nsr_vnfr
.name
for nsr_vnfr
in self
._nsr
.vnfrs
.values() if nsr_vnfr
.vnfd
.service_function_chain
== 'SF']
375 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
376 if (nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER' or nsr_vnfr
.vnfd
.service_function_chain
== 'SFF'):
377 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
378 self
._log
.debug(" Received VNFR is %s", vnfr
)
379 while vnfr
.operational_status
!= 'running':
380 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
381 if vnfr
.operational_status
== 'failed':
382 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
383 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
384 yield from asyncio
.sleep(2, loop
=self
._loop
)
385 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
386 self
._log
.debug("Received VNFR is %s", vnfr
)
388 sff
= RwsdnYang
.VNFFGSff()
389 sff_list
[nsr_vnfr
.vnfd
.id] = sff
390 sff
.name
= nsr_vnfr
.name
391 sff
.function_type
= nsr_vnfr
.vnfd
.service_function_chain
393 sff
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
394 sff
.mgmt_port
= VnffgRecord
.SFF_MGMT_PORT
395 for cp
in vnfr
.connection_point
:
396 sff_dp
= sff
.dp_endpoints
.add()
397 sff_dp
.name
= self
._nsr
.name
+ '.' + cp
.name
398 sff_dp
.address
= cp
.ip_address
399 sff_dp
.port
= VnffgRecord
.SFF_DP_PORT
400 if nsr_vnfr
.vnfd
.service_function_chain
== 'SFF':
401 for sf_name
in sf_list
:
402 _sf
= sff
.vnfr_list
.add()
403 _sf
.vnfr_name
= sf_name
408 def instantiate(self
):
409 """ Instantiate this VNFFG """
411 self
._log
.info("Instaniating VNFFGR with vnffgd %s",
415 vnffgr_request
= yield from self
.vnffgr_create_msg()
416 vnffg_sff_list
= yield from self
.vnffgr_nsr_sff_list()
419 vnffgr
= self
._vnffgmgr
.create_vnffgr(vnffgr_request
,self
._vnffgd
_msg
.classifier
,vnffg_sff_list
)
420 except Exception as e
:
421 self
._log
.exception("VNFFG instantiation failed: %s", str(e
))
422 self
._vnffgr
_state
= VnffgRecordState
.FAILED
423 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self
.id, vnffgr_request
.id))
425 self
._vnffgr
_state
= VnffgRecordState
.INSTANTIATION_PENDING
427 self
._log
.info("Instantiated VNFFGR :%s", vnffgr
)
428 self
._vnffgr
_state
= VnffgRecordState
.ACTIVE
430 self
._log
.info("Invoking update_state to update NSR state for NSR ID: %s", self
._nsr
.id)
431 yield from self
._nsr
.update_state()
433 def vnffgr_in_vnffgrm(self
):
434 """ Is there a VNFR record in VNFM """
435 if (self
._vnffgr
_state
== VnffgRecordState
.ACTIVE
or
436 self
._vnffgr
_state
== VnffgRecordState
.INSTANTIATION_PENDING
or
437 self
._vnffgr
_state
== VnffgRecordState
.FAILED
):
444 """ Terminate this VNFFGR """
445 if not self
.vnffgr_in_vnffgrm():
446 self
._log
.error("Ignoring terminate request for id %s in state %s",
447 self
.id, self
._vnffgr
_state
)
450 self
._log
.info("Terminating VNFFGR id:%s", self
.id)
451 self
._vnffgr
_state
= VnffgRecordState
.TERMINATE_PENDING
453 self
._vnffgmgr
.terminate_vnffgr(self
._vnffgr
_id
)
455 self
._vnffgr
_state
= VnffgRecordState
.TERMINATED
456 self
._log
.debug("Terminated VNFFGR id:%s", self
.id)
459 class VirtualLinkRecord(object):
460 """ Virtual Link Records class"""
463 def create_record(dts
, log
, loop
, nsr_name
, vld_msg
, cloud_account_name
, ip_profile
, nsr_id
, restart_mode
=False):
464 """Creates a new VLR object based on the given data.
466 If restart mode is enabled, then we look for existing records in the
467 DTS and create a VLR records using the exiting data(ID)
472 vlr_obj
= VirtualLinkRecord(
484 res_iter
= yield from dts
.query_read(
485 "D,/vlr:vlr-catalog/vlr:vlr",
486 rwdts
.XactFlag
.MERGE
)
489 response
= yield from fut
490 vlr
= response
.result
492 # Check if the record is already present, if so use the ID of
493 # the existing record. Since the name of the record is uniquely
494 # formed we can use it as a search key!
495 if vlr
.name
== vlr_obj
.name
:
496 vlr_obj
.reset_id(vlr
.id)
501 def __init__(self
, dts
, log
, loop
, nsr_name
, vld_msg
, cloud_account_name
, ip_profile
, nsr_id
):
505 self
._nsr
_name
= nsr_name
506 self
._vld
_msg
= vld_msg
507 self
._cloud
_account
_name
= cloud_account_name
508 self
._assigned
_subnet
= None
509 self
._nsr
_id
= nsr_id
510 self
._ip
_profile
= ip_profile
511 self
._vlr
_id
= str(uuid
.uuid4())
512 self
._state
= VlRecordState
.INIT
513 self
._prev
_state
= None
517 """ path for this object """
518 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
._vlr
_id
)
527 """ Get NSR name for this VL """
532 """ Virtual Link Desciptor """
536 def assigned_subnet(self
):
537 """ Subnet assigned to this VL"""
538 return self
._assigned
_subnet
543 Get the name for this VLR.
544 VLR name is "nsr name:VLD name"
546 if self
.vld_msg
.vim_network_name
:
547 return self
.vld_msg
.vim_network_name
548 elif self
.vld_msg
.name
== "multisite":
549 # This is a temporary hack to identify manually provisioned inter-site network
550 return self
.vld_msg
.name
552 return self
._nsr
_name
+ "." + self
.vld_msg
.name
555 def cloud_account_name(self
):
556 """ Cloud account that this VLR should be created in """
557 return self
._cloud
_account
_name
561 """ Get the VLR path from VLR """
562 return (VirtualLinkRecord
.XPATH
+ "[vlr:id = '{}']").format(vlr
.id)
570 def state(self
, value
):
571 """ VLR set state """
575 def prev_state(self
):
576 """ VLR previous state """
577 return self
._prev
_state
580 def prev_state(self
, value
):
581 """ VLR set previous state """
582 self
._prev
_state
= value
586 """ Virtual Link Record message for Creating VLR in VNS """
587 vld_fields
= ["short_name",
595 vld_copy_dict
= {k
: v
for k
, v
in self
.vld_msg
.as_dict().items()
598 vlr_dict
= {"id": self
._vlr
_id
,
599 "nsr_id_ref": self
._nsr
_id
,
600 "vld_ref": self
.vld_msg
.id,
602 "cloud_account": self
.cloud_account_name
,
605 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
606 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
608 vlr_dict
.update(vld_copy_dict
)
609 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
612 def reset_id(self
, vlr_id
):
613 self
._vlr
_id
= vlr_id
615 def create_nsr_vlr_msg(self
, vnfrs
):
616 """ The VLR message"""
617 nsr_vlr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
618 nsr_vlr
.vlr_ref
= self
._vlr
_id
619 nsr_vlr
.assigned_subnet
= self
.assigned_subnet
620 nsr_vlr
.cloud_account
= self
.cloud_account_name
622 for conn
in self
.vld_msg
.vnfd_connection_point_ref
:
624 if (vnfr
.vnfd
.id == conn
.vnfd_id_ref
and
625 vnfr
.member_vnf_index
== conn
.member_vnf_index_ref
and
626 self
.cloud_account_name
== vnfr
.cloud_account_name
):
627 cp_entry
= nsr_vlr
.vnfr_connection_point_ref
.add()
628 cp_entry
.vnfr_id
= vnfr
.id
629 cp_entry
.connection_point
= conn
.vnfd_connection_point_ref
634 def instantiate(self
):
635 """ Instantiate this VL """
637 self
._log
.debug("Instaniating VLR key %s, vld %s",
638 self
.xpath
, self
._vld
_msg
)
640 self
._state
= VlRecordState
.INSTANTIATION_PENDING
641 self
._log
.debug("Executing VL create path:%s msg:%s",
642 self
.xpath
, self
.vlr_msg
)
644 with self
._dts
.transaction(flags
=0) as xact
:
645 block
= xact
.block_create()
646 block
.add_query_create(self
.xpath
, self
.vlr_msg
)
647 self
._log
.debug("Executing VL create path:%s msg:%s",
648 self
.xpath
, self
.vlr_msg
)
649 res_iter
= yield from block
.execute(now
=True)
655 self
._state
= VlRecordState
.FAILED
656 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self
.id)
658 if vlr
.operational_status
== 'failed':
659 self
._log
.debug("NS Id:%s VL creation failed for vlr id %s", self
.id, vlr
.id)
660 self
._state
= VlRecordState
.FAILED
661 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr
.id, vlr
.operational_status_details
))
663 self
._log
.info("Instantiated VL with xpath %s and vlr:%s",
665 self
._state
= VlRecordState
.ACTIVE
666 self
._assigned
_subnet
= vlr
.assigned_subnet
668 def vlr_in_vns(self
):
669 """ Is there a VLR record in VNS """
670 if (self
._state
== VlRecordState
.ACTIVE
or
671 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
672 self
._state
== VlRecordState
.TERMINATE_PENDING
or
673 self
._state
== VlRecordState
.FAILED
):
680 """ Terminate this VL """
681 if not self
.vlr_in_vns():
682 self
._log
.debug("Ignoring terminate request for id %s in state %s",
683 self
.id, self
._state
)
686 self
._log
.debug("Terminating VL id:%s", self
.id)
687 self
._state
= VlRecordState
.TERMINATE_PENDING
689 with self
._dts
.transaction(flags
=0) as xact
:
690 block
= xact
.block_create()
691 block
.add_query_delete(self
.xpath
)
692 yield from block
.execute(flags
=0, now
=True)
694 self
._state
= VlRecordState
.TERMINATED
695 self
._log
.debug("Terminated VL id:%s", self
.id)
698 class VnfRecordState(Enum
):
699 """ Vnf Record State """
701 INSTANTIATION_PENDING
= 102
703 TERMINATE_PENDING
= 104
708 class VirtualNetworkFunctionRecord(object):
709 """ Virtual Network Function Record class"""
710 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
714 def create_record(dts
, log
, loop
, vnfd
, const_vnfd_msg
, nsd_id
, nsr_name
,
715 cloud_account_name
, nsr_id
, group_name
, group_instance_id
,
716 placement_groups
, restart_mode
=False):
717 """Creates a new VNFR object based on the given data.
719 If restart mode is enabled, then we look for existing records in the
720 DTS and create a VNFR records using the exiting data(ID)
723 VirtualNetworkFunctionRecord
725 vnfr_obj
= VirtualNetworkFunctionRecord(
738 restart_mode
=restart_mode
)
741 res_iter
= yield from dts
.query_read(
742 "D,/vnfr:vnfr-catalog/vnfr:vnfr",
743 rwdts
.XactFlag
.MERGE
)
746 response
= yield from fut
747 vnfr
= response
.result
749 if vnfr
.name
== vnfr_obj
.name
:
750 vnfr_obj
.reset_id(vnfr
.id)
766 group_instance_id
=None,
767 placement_groups
= [],
768 restart_mode
= False):
773 self
._const
_vnfd
_msg
= const_vnfd_msg
774 self
._nsd
_id
= nsd_id
775 self
._nsr
_name
= nsr_name
776 self
._nsr
_id
= nsr_id
777 self
._cloud
_account
_name
= cloud_account_name
778 self
._group
_name
= group_name
779 self
._group
_instance
_id
= group_instance_id
780 self
._placement
_groups
= placement_groups
781 self
._config
_status
= NsrYang
.ConfigStates
.INIT
783 self
._prev
_state
= VnfRecordState
.INIT
784 self
._state
= VnfRecordState
.INIT
785 self
._state
_failed
_reason
= None
787 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
790 self
._vnfr
_id
= str(uuid
.uuid4())
792 self
._vnfr
_msg
= self
.create_vnfr_msg()
793 self
._log
.debug("Set VNFR {} config type to {}".
794 format(self
.name
, self
.config_type
))
795 self
.restart_mode
= restart_mode
798 if group_name
is None and group_instance_id
is not None:
799 raise ValueError("Group instance id must not be provided with an empty group name")
809 return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self
.id)
814 return self
._vnfr
_msg
817 def const_vnfr_msg(self
):
819 return RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(vnfr_id
=self
.id,cloud_account
=self
.cloud_account_name
)
827 def cloud_account_name(self
):
828 """ Cloud account that this VNF should be created in """
829 return self
._cloud
_account
_name
834 """ Is this VNF actve """
835 return True if self
._state
== VnfRecordState
.ACTIVE
else False
839 """ state of this VNF """
843 def state_failed_reason(self
):
844 """ Error message in case this VNF is in failed state """
845 return self
._state
_failed
_reason
848 def member_vnf_index(self
):
849 """ Member VNF index """
850 return self
._const
_vnfd
_msg
.member_vnf_index
855 return self
._nsr
_name
859 """ Name of this VNFR """
860 if self
._name
is not None:
863 name_tags
= [self
._nsr
_name
]
865 if self
._group
_name
is not None:
866 name_tags
.append(self
._group
_name
)
868 if self
._group
_instance
_id
is not None:
869 name_tags
.append(str(self
._group
_instance
_id
))
871 name_tags
.extend([self
.vnfd
.name
, str(self
.member_vnf_index
)])
873 self
._name
= "__".join(name_tags
)
878 def vnfr_xpath(vnfr
):
879 """ Get the VNFR path from VNFR """
880 return (VirtualNetworkFunctionRecord
.XPATH
+ "[vnfr:id = '{}']").format(vnfr
.id)
883 def config_type(self
):
884 cfg_types
= ['netconf', 'juju', 'script']
885 for method
in cfg_types
:
886 if self
._vnfd
.vnf_configuration
.has_field(method
):
891 def config_status(self
):
892 """Return the config status as YANG ENUM string"""
893 self
._log
.debug("Map VNFR {} config status {} ({})".
894 format(self
.name
, self
._config
_status
, self
.config_type
))
895 if self
.config_type
== 'none':
896 return 'config_not_needed'
897 elif self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
899 elif self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
904 def set_state(self
, state
):
905 """ set the state of this object """
906 self
._prev
_state
= self
._state
909 def reset_id(self
, vnfr_id
):
910 self
._vnfr
_id
= vnfr_id
911 self
._vnfr
_msg
= self
.create_vnfr_msg()
914 self
.config_store
.merge_vnfd_config(
917 self
.member_vnf_index
,
920 def create_vnfr_msg(self
):
921 """ VNFR message for this VNFR """
929 vnfd_copy_dict
= {k
: v
for k
, v
in self
._vnfd
.as_dict().items() if k
in vnfd_fields
}
932 "nsr_id_ref": self
._nsr
_id
,
933 "vnfd_ref": self
.vnfd
.id,
935 "cloud_account": self
._cloud
_account
_name
,
936 "config_status": self
.config_status
938 vnfr_dict
.update(vnfd_copy_dict
)
940 vnfr
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
941 vnfr
.member_vnf_index_ref
= self
.member_vnf_index
942 vnfr
.vnf_configuration
.from_dict(self
._vnfd
.vnf_configuration
.as_dict())
944 if self
._vnfd
.mgmt_interface
.has_field("port"):
945 vnfr
.mgmt_interface
.port
= self
._vnfd
.mgmt_interface
.port
947 for group_info
in self
._placement
_groups
:
948 group
= vnfr
.placement_groups_info
.add()
949 group
.from_dict(group_info
.as_dict())
951 # UI expects the monitoring param field to exist
952 vnfr
.monitoring_param
= []
954 self
._log
.debug("Get vnfr_msg for VNFR {} : {}".format(self
.name
, vnfr
))
958 def update_vnfm(self
):
959 self
._log
.debug("Send an update to VNFM for VNFR {} with {}".
960 format(self
.name
, self
.vnfr_msg
))
961 yield from self
._dts
.query_update(
963 rwdts
.XactFlag
.TRACE
,
967 def get_config_status(self
):
968 """Return the config status as YANG ENUM"""
969 return self
._config
_status
972 def set_config_status(self
, status
):
974 def status_to_string(status
):
976 NsrYang
.ConfigStates
.INIT
: 'init',
977 NsrYang
.ConfigStates
.CONFIGURING
: 'configuring',
978 NsrYang
.ConfigStates
.CONFIG_NOT_NEEDED
: 'config_not_needed',
979 NsrYang
.ConfigStates
.CONFIGURED
: 'configured',
980 NsrYang
.ConfigStates
.FAILED
: 'failed',
983 return status_dc
[status
]
985 self
._log
.debug("Update VNFR {} from {} ({}) to {}".
986 format(self
.name
, self
._config
_status
,
987 self
.config_type
, status
))
988 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
989 self
._log
.error("Updating already configured VNFR {}".
993 if self
._config
_status
!= status
:
995 self
._config
_status
= status
996 # I don't think this is used. Original implementor can check.
997 # Caused Exception, so corrected it by status_to_string
998 # But not sure whats the use of this variable?
999 self
.vnfr_msg
.config_status
= status_to_string(status
)
1000 except Exception as e
:
1001 self
._log
.error("Exception=%s", str(e
))
1004 self
._log
.debug("Updated VNFR {} status to {}".format(self
.name
, status
))
1006 if self
._config
_status
!= NsrYang
.ConfigStates
.INIT
:
1008 # Publish only after VNFM has the VNFR created
1009 yield from self
.update_vnfm()
1010 except Exception as e
:
1011 self
._log
.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1012 format(status
, self
.name
, e
))
1013 self
._log
.exception(e
)
1015 def is_configured(self
):
1016 if self
.config_type
== 'none':
1019 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1025 def instantiate(self
, nsr
):
1026 """ Instantiate this VNFR"""
1028 self
._log
.debug("Instaniating VNFR key %s, vnfd %s",
1029 self
.xpath
, self
._vnfd
)
1031 self
._log
.debug("Create VNF with xpath %s and vnfr %s",
1032 self
.xpath
, self
.vnfr_msg
)
1034 self
.set_state(VnfRecordState
.INSTANTIATION_PENDING
)
1036 def find_vlr_for_cp(conn
):
1037 """ Find VLR for the given connection point """
1038 for vlr
in nsr
.vlrs
:
1039 for vnfd_cp
in vlr
.vld_msg
.vnfd_connection_point_ref
:
1040 if (vnfd_cp
.vnfd_id_ref
== self
._vnfd
.id and
1041 vnfd_cp
.vnfd_connection_point_ref
== conn
.name
and
1042 vnfd_cp
.member_vnf_index_ref
== self
.member_vnf_index
and
1043 vlr
.cloud_account_name
== self
.cloud_account_name
):
1044 self
._log
.debug("Found VLR for cp_name:%s and vnf-index:%d",
1045 conn
.name
, self
.member_vnf_index
)
1049 # For every connection point in the VNFD fill in the identifier
1050 for conn_p
in self
._vnfd
.connection_point
:
1051 cpr
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
1052 cpr
.name
= conn_p
.name
1053 cpr
.type_yang
= conn_p
.type_yang
1054 vlr_ref
= find_vlr_for_cp(conn_p
)
1056 msg
= "Failed to find VLR for cp = %s" % conn_p
.name
1057 self
._log
.debug("%s", msg
)
1058 # raise VirtualNetworkFunctionRecordError(msg)
1061 cpr
.vlr_ref
= vlr_ref
.id
1062 self
.vnfr_msg
.connection_point
.append(cpr
)
1063 self
._log
.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1064 cpr
, self
.vnfr_msg
.id, self
.vnfr_msg
.vnfd_ref
)
1066 if not self
.restart_mode
:
1067 yield from self
._dts
.query_create(self
.xpath
,
1071 yield from self
._dts
.query_update(self
.xpath
,
1075 self
._log
.info("Created VNF with xpath %s and vnfr %s",
1076 self
.xpath
, self
.vnfr_msg
)
1078 self
._log
.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
1079 self
.xpath
, self
._vnfd
, self
.vnfr_msg
)
1082 def update_state(self
, vnfr_msg
):
1083 """ Update this VNFR"""
1084 if vnfr_msg
.operational_status
== "running":
1085 if self
.vnfr_msg
.operational_status
!= "running":
1086 yield from self
.is_active()
1087 elif vnfr_msg
.operational_status
== "failed":
1088 yield from self
.instantiation_failed(failed_reason
=vnfr_msg
.operational_status_details
)
1091 def is_active(self
):
1092 """ This VNFR is active """
1093 self
._log
.debug("VNFR %s is active", self
._vnfr
_id
)
1094 self
.set_state(VnfRecordState
.ACTIVE
)
1097 def instantiation_failed(self
, failed_reason
=None):
1098 """ This VNFR instantiation failed"""
1099 self
._log
.error("VNFR %s instantiation failed", self
._vnfr
_id
)
1100 self
.set_state(VnfRecordState
.FAILED
)
1101 self
._state
_failed
_reason
= failed_reason
1103 def vnfr_in_vnfm(self
):
1104 """ Is there a VNFR record in VNFM """
1105 if (self
._state
== VnfRecordState
.ACTIVE
or
1106 self
._state
== VnfRecordState
.INSTANTIATION_PENDING
or
1107 self
._state
== VnfRecordState
.FAILED
):
1113 def terminate(self
):
1114 """ Terminate this VNF """
1115 if not self
.vnfr_in_vnfm():
1116 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1117 self
.id, self
._state
)
1120 self
._log
.debug("Terminating VNF id:%s", self
.id)
1121 self
.set_state(VnfRecordState
.TERMINATE_PENDING
)
1122 with self
._dts
.transaction(flags
=0) as xact
:
1123 block
= xact
.block_create()
1124 block
.add_query_delete(self
.xpath
)
1125 yield from block
.execute(flags
=0)
1126 self
.set_state(VnfRecordState
.TERMINATED
)
1127 self
._log
.debug("Terminated VNF id:%s", self
.id)
1130 class NetworkServiceStatus(object):
1131 """ A class representing the Network service's status """
1132 MAX_EVENTS_RECORDED
= 10
1133 """ Network service Status class"""
1134 def __init__(self
, dts
, log
, loop
):
1139 self
._state
= NetworkServiceRecordState
.INIT
1140 self
._events
= deque([])
1143 def create_notification(self
, evt
, evt_desc
, evt_details
):
1144 xp
= "N,/rw-nsr:nsm-notification"
1145 notif
= RwNsrYang
.YangNotif_RwNsr_NsmNotification()
1147 notif
.description
= evt_desc
1148 notif
.details
= evt_details
if evt_details
is not None else None
1150 yield from self
._dts
.query_create(xp
, rwdts
.XactFlag
.ADVISE
, notif
)
1151 self
._log
.info("Notification called by creating dts query: %s", notif
)
1153 def record_event(self
, evt
, evt_desc
, evt_details
):
1154 """ Record an event """
1155 self
._log
.debug("Recording event - evt %s, evt_descr %s len = %s",
1156 evt
, evt_desc
, len(self
._events
))
1157 if len(self
._events
) >= NetworkServiceStatus
.MAX_EVENTS_RECORDED
:
1158 self
._events
.popleft()
1159 self
._events
.append((int(time
.time()), evt
, evt_desc
,
1160 evt_details
if evt_details
is not None else None))
1162 self
._loop
.create_task(self
.create_notification(evt
,evt_desc
,evt_details
))
1164 def set_state(self
, state
):
1165 """ set the state of this status object """
1169 """ Return the state as a yang enum string """
1170 state_to_str_map
= {"INIT": "init",
1171 "VL_INIT_PHASE": "vl_init_phase",
1172 "VNF_INIT_PHASE": "vnf_init_phase",
1173 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1174 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1175 "RUNNING": "running",
1176 "SCALING_OUT": "scaling_out",
1177 "SCALING_IN": "scaling_in",
1178 "TERMINATE_RCVD": "terminate_rcvd",
1179 "TERMINATE": "terminate",
1180 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1181 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1182 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1183 "TERMINATED": "terminated",
1185 "VL_INSTANTIATE": "vl_instantiate",
1186 "VL_TERMINATE": "vl_terminate",
1188 return state_to_str_map
[self
._state
.name
]
1192 """ State of this status object """
1197 """ Network Service Record as a message"""
1200 for entry
in self
._events
:
1201 event
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
1204 event
.timestamp
, event
.event
, event
.description
, event
.details
= entry
1205 event_list
.append(event
)
1209 class NetworkServiceRecord(object):
1210 """ Network service record """
1211 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
1213 def __init__(self
, dts
, log
, loop
, nsm
, nsm_plugin
, nsr_cfg_msg
, sdn_account_name
, restart_mode
=False):
1218 self
._nsr
_cfg
_msg
= nsr_cfg_msg
1219 self
._nsm
_plugin
= nsm_plugin
1220 self
._sdn
_account
_name
= sdn_account_name
1223 self
._nsr
_msg
= None
1224 self
._nsr
_regh
= None
1229 self
._param
_pools
= {}
1230 self
._scaling
_groups
= {}
1231 self
._create
_time
= int(time
.time())
1232 self
._op
_status
= NetworkServiceStatus(dts
, log
, loop
)
1233 self
._config
_status
= NsrYang
.ConfigStates
.CONFIGURING
1234 self
._config
_status
_details
= None
1236 self
.restart_mode
= restart_mode
1237 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
1238 self
._debug
_running
= False
1239 self
._is
_active
= False
1240 self
._vl
_phase
_completed
= False
1241 self
._vnf
_phase
_completed
= False
1244 # Initalise the state to init
1245 # The NSR moves through the following transitions
1246 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1247 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1248 # 3. VNFS_READY - READY when the NSR is published
1250 self
.set_state(NetworkServiceRecordState
.INIT
)
1252 self
.substitute_input_parameters
= InputParameterSubstitution(self
._log
)
1255 def nsm_plugin(self
):
1257 return self
._nsm
_plugin
1259 def set_state(self
, state
):
1260 """ Set state for this NSR"""
1261 self
._log
.debug("Setting state to %s", state
)
1262 # We are in init phase and is moving to the next state
1263 # The new state could be a FAILED state or VNF_INIIT_PHASE
1264 if self
.state
== NetworkServiceRecordState
.VL_INIT_PHASE
:
1265 self
._vl
_phase
_completed
= True
1267 if self
.state
== NetworkServiceRecordState
.VNF_INIT_PHASE
:
1268 self
._vnf
_phase
_completed
= True
1270 self
._op
_status
.set_state(state
)
1274 """ Get id for this NSR"""
1275 return self
._nsr
_cfg
_msg
.id
1279 """ Name of this network service record """
1280 return self
._nsr
_cfg
_msg
.name
1283 def cloud_account_name(self
):
1284 return self
._nsr
_cfg
_msg
.cloud_account
1288 """State of this NetworkServiceRecord"""
1289 return self
._op
_status
.state
1293 """ Is this NSR active ?"""
1294 return True if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
else False
1298 """ VLRs associated with this NSR"""
1303 """ VNFRs associated with this NSR"""
1308 """ VNFFGRs associated with this NSR"""
1309 return self
._vnffgrs
1312 def scaling_groups(self
):
1313 """ Scaling groups associated with this NSR """
1314 return self
._scaling
_groups
1317 def param_pools(self
):
1318 """ Parameter value pools associated with this NSR"""
1319 return self
._param
_pools
1322 def nsr_cfg_msg(self
):
1323 return self
._nsr
_cfg
_msg
1326 def nsr_cfg_msg(self
, msg
):
1327 self
._nsr
_cfg
_msg
= msg
1331 """ NSD Protobuf for this NSR """
1332 if self
._nsd
is not None:
1334 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
1339 """ NSD ID for this NSR """
1340 return self
.nsd_msg
.id
1344 ''' Get a new job id for config primitive'''
1349 def config_status(self
):
1350 """ Config status for NSR """
1351 return self
._config
_status
1353 def resolve_placement_group_cloud_construct(self
, input_group
):
1355 Returns the cloud specific construct for placement group
1357 copy_dict
= ['name', 'requirement', 'strategy']
1359 for group_info
in self
._nsr
_cfg
_msg
.nsd_placement_group_maps
:
1360 if group_info
.placement_group_ref
== input_group
.name
:
1361 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1362 group_dict
= {k
:v
for k
,v
in
1363 group_info
.as_dict().items() if k
!= 'placement_group_ref'}
1364 for param
in copy_dict
:
1365 group_dict
.update({param
: getattr(input_group
, param
)})
1366 group
.from_dict(group_dict
)
1372 return "NSR(name={}, nsd_id={}, cloud_account={})".format(
1373 self
.name
, self
.nsd_id
, self
.cloud_account_name
1376 def _get_vnfd(self
, vnfd_id
, config_xact
):
1377 """ Fetch vnfd msg for the passed vnfd id """
1378 return self
._nsm
.get_vnfd(vnfd_id
, config_xact
)
1380 def _get_vnfd_cloud_account(self
, vnfd_member_index
):
1381 """ Fetch Cloud Account for the passed vnfd id """
1382 if self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1383 vim_accounts
= [vnf
.cloud_account
for vnf
in self
._nsr
_cfg
_msg
.vnf_cloud_account_map \
1384 if vnfd_member_index
== vnf
.member_vnf_index_ref
]
1385 if vim_accounts
and vim_accounts
[0]:
1386 return vim_accounts
[0]
1387 return self
.cloud_account_name
1389 def _get_constituent_vnfd_msg(self
, vnf_index
):
1390 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1391 if const_vnfd
.member_vnf_index
== vnf_index
:
1394 raise ValueError("Constituent VNF index %s not found" % vnf_index
)
1396 def record_event(self
, evt
, evt_desc
, evt_details
=None, state
=None):
1397 """ Record an event """
1398 self
._op
_status
.record_event(evt
, evt_desc
, evt_details
)
1399 if state
is not None:
1400 self
.set_state(state
)
1402 def scaling_trigger_str(self
, trigger
):
1403 SCALING_TRIGGER_STRS
= {
1404 NsdYang
.ScalingTrigger
.PRE_SCALE_IN
: 'pre-scale-in',
1405 NsdYang
.ScalingTrigger
.POST_SCALE_IN
: 'post-scale-in',
1406 NsdYang
.ScalingTrigger
.PRE_SCALE_OUT
: 'pre-scale-out',
1407 NsdYang
.ScalingTrigger
.POST_SCALE_OUT
: 'post-scale-out',
1410 return SCALING_TRIGGER_STRS
[trigger
]
1411 except Exception as e
:
1412 self
._log
.error("Scaling trigger mapping error for {} : {}".
1414 self
._log
.exception(e
)
1415 return "Unknown trigger"
1418 def instantiate_vls(self
):
1420 This function instantiates VLs for every VL in this Network Service
1422 self
._log
.debug("Instantiating %d VLs in NSD id %s", len(self
._vlrs
),
1424 for vlr
in self
._vlrs
:
1425 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1426 vlr
.state
= VlRecordState
.ACTIVE
1429 def create(self
, config_xact
):
1430 """ Create this network service"""
1431 # Create virtual links for all the external vnf
1432 # connection points in this NS
1433 yield from self
.create_vls()
1435 # Create VNFs in this network service
1436 yield from self
.create_vnfs(config_xact
)
1438 # Create VNFFG for network service
1439 self
.create_vnffgs()
1441 # Create Scaling Groups for each scaling group in NSD
1442 self
.create_scaling_groups()
1444 # Create Parameter Pools
1445 self
.create_param_pools()
1448 def apply_scale_group_config_script(self
, script
, group
, scale_instance
, trigger
, vnfrs
=None):
1449 """ Apply config based on script for scale group """
1452 def add_vnfrs_data(vnfrs_list
):
1453 """ Add as a dict each of the VNFRs data """
1455 for vnfr
in vnfrs_list
:
1456 self
._log
.debug("Add VNFR {} data".format(vnfr
))
1458 vnfr_data
['name'] = vnfr
.name
1459 if trigger
in [NsdYang
.ScalingTrigger
.PRE_SCALE_IN
, NsdYang
.ScalingTrigger
.POST_SCALE_OUT
]:
1460 # Get VNF management and other IPs, etc
1461 opdata
= yield from self
.fetch_vnfr(vnfr
.xpath
)
1462 self
._log
.debug("VNFR {} op data: {}".format(vnfr
.name
, opdata
))
1464 vnfr_data
['rw_mgmt_ip'] = opdata
.mgmt_interface
.ip_address
1465 vnfr_data
['rw_mgmt_port'] = opdata
.mgmt_interface
.port
1466 except Exception as e
:
1467 self
._log
.error("Unable to get management IP for vnfr {}:{}".
1468 format(vnfr
.name
, e
))
1471 vnfr_data
['connection_points'] = []
1472 for cp
in opdata
.connection_point
:
1474 con_pt
['name'] = cp
.name
1475 con_pt
['ip_address'] = cp
.ip_address
1476 vnfr_data
['connection_points'].append(con_pt
)
1477 except Exception as e
:
1478 self
._log
.error("Exception getting connections points for VNFR {}: {}".
1479 format(vnfr
.name
, e
))
1481 vnfrs_data
.append(vnfr_data
)
1482 self
._log
.debug("VNFRs data: {}".format(vnfrs_data
))
1486 def add_nsr_data(nsr
):
1488 nsr_data
['name'] = nsr
.name
1491 if script
is None or len(script
) == 0:
1492 self
._log
.error("Script not provided for scale group config: {}".format(group
.name
))
1495 if script
[0] == '/':
1498 path
= os
.path
.join(os
.environ
['RIFT_INSTALL'], "usr/bin", script
)
1499 if not os
.path
.exists(path
):
1500 self
._log
.error("Config faled for scale group {}: Script does not exist at {}".
1501 format(group
.name
, path
))
1504 # Build a YAML file with all parameters for the script to execute
1505 # The data consists of 5 sections
1507 # 2. Scale group config
1508 # 3. VNFRs in the scale group
1509 # 4. VNFRs outside scale group
1512 data
['trigger'] = group
.trigger_map(trigger
)
1513 data
['config'] = group
.group_msg
.as_dict()
1516 data
["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs
)
1518 data
["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance
.vnfrs
)
1520 data
["vnfrs_others"] = yield from add_vnfrs_data(self
.vnfrs
.values())
1521 data
["nsr"] = add_nsr_data(self
)
1524 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
1525 tmp_file
.write(yaml
.dump(data
, default_flow_style
=True)
1528 self
._log
.debug("Creating a temp file: {} with input data: {}".
1529 format(tmp_file
.name
, data
))
1531 cmd
= "{} {}".format(path
, tmp_file
.name
)
1532 self
._log
.debug("Running the CMD: {}".format(cmd
))
1533 proc
= yield from asyncio
.create_subprocess_shell(cmd
, loop
=self
._loop
)
1534 rc
= yield from proc
.wait()
1536 self
._log
.error("The script {} for scale group {} config returned: {}".
1537 format(script
, group
.name
, rc
))
1545 def apply_scaling_group_config(self
, trigger
, group
, scale_instance
, vnfrs
=None):
1546 """ Apply the config for the scaling group based on trigger """
1547 if group
is None or scale_instance
is None:
1551 def update_config_status(success
=True, err_msg
=None):
1552 self
._log
.debug("Update %s config status to %r : %s",
1553 scale_instance
, success
, err_msg
)
1554 if (scale_instance
.config_status
== "failed"):
1555 # Do not update the config status if it is already in failed state
1558 if scale_instance
.config_status
== "configured":
1559 # Update only to failed state an already configured scale instance
1561 scale_instance
.config_status
= "failed"
1562 scale_instance
.config_err_msg
= err_msg
1563 yield from self
.update_state()
1565 # We are in configuring state
1566 # Only after post scale out mark instance as configured
1567 if trigger
== NsdYang
.ScalingTrigger
.POST_SCALE_OUT
:
1569 scale_instance
.config_status
= "configured"
1571 scale_instance
.config_status
= "failed"
1572 scale_instance
.config_err_msg
= err_msg
1573 yield from self
.update_state()
1575 config
= group
.trigger_config(trigger
)
1579 self
._log
.debug("Scaling group {} config: {}".format(group
.name
, config
))
1580 if config
.has_field("ns_config_primitive_name_ref"):
1581 config_name
= config
.ns_config_primitive_name_ref
1582 nsd_msg
= self
.nsd_msg
1583 config_primitive
= None
1584 for ns_cfg_prim
in nsd_msg
.service_primitive
:
1585 if ns_cfg_prim
.name
== config_name
:
1586 config_primitive
= ns_cfg_prim
1589 if config_primitive
is None:
1590 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name
, self
.name
))
1592 self
._log
.debug("Scaling group {} config primitive: {}".format(group
.name
, config_primitive
))
1593 if config_primitive
.has_field("user_defined_script"):
1594 rc
= yield from self
.apply_scale_group_config_script(config_primitive
.user_defined_script
,
1595 group
, scale_instance
, trigger
, vnfrs
)
1598 err_msg
= "Failed config for trigger {} using config script '{}'". \
1599 format(self
.scaling_trigger_str(trigger
),
1600 config_primitive
.user_defined_script
)
1601 yield from update_config_status(success
=rc
, err_msg
=err_msg
)
1604 err_msg
= "Failed config for trigger {} as config script is not specified". \
1605 format(self
.scaling_trigger_str(trigger
))
1606 yield from update_config_status(success
=False, err_msg
=err_msg
)
1607 raise NotImplementedError("Only script based config support for scale group for now: {}".
1610 err_msg
= "Failed config for trigger {} as config primitive is not specified".\
1611 format(self
.scaling_trigger_str(trigger
))
1612 yield from update_config_status(success
=False, err_msg
=err_msg
)
1613 self
._log
.error("Config primitive not specified for config action in scale group %s" %
1617 def create_scaling_groups(self
):
1618 """ This function creates a NSScalingGroup for every scaling
1619 group defined in he NSD"""
1621 for scaling_group_msg
in self
.nsd_msg
.scaling_group_descriptor
:
1622 self
._log
.debug("Found scaling_group %s in nsr id %s",
1623 scaling_group_msg
.name
, self
.id)
1625 group_record
= scale_group
.ScalingGroup(
1630 self
._scaling
_groups
[group_record
.name
] = group_record
1633 def create_scale_group_instance(self
, group_name
, index
, config_xact
, is_default
=False):
1634 group
= self
._scaling
_groups
[group_name
]
1635 scale_instance
= group
.create_instance(index
, is_default
)
1639 self
._log
.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1640 len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
)
1643 for vnf_index
, count
in group
.vnf_index_count_map
.items():
1644 const_vnfd_msg
= self
._get
_constituent
_vnfd
_msg
(vnf_index
)
1645 vnfd_msg
= self
._get
_vnfd
(const_vnfd_msg
.vnfd_id_ref
, config_xact
)
1647 cloud_account_name
= self
._get
_vnfd
_cloud
_account
(const_vnfd_msg
.member_vnf_index
)
1648 if cloud_account_name
is None:
1649 cloud_account_name
= self
.cloud_account_name
1650 for _
in range(count
):
1651 vnfr
= yield from self
.create_vnf_record(vnfd_msg
, const_vnfd_msg
, cloud_account_name
, group_name
, index
)
1652 scale_instance
.add_vnfr(vnfr
)
1658 def instantiate_instance():
1659 self
._log
.debug("Creating %s VNFRS", scale_instance
)
1660 vnfrs
= yield from create_vnfs()
1661 yield from self
.publish()
1663 self
._log
.debug("Instantiating %s VNFRS for %s", len(vnfrs
), scale_instance
)
1664 scale_instance
.operational_status
= "vnf_init_phase"
1665 yield from self
.update_state()
1668 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.PRE_SCALE_OUT
,
1669 group
, scale_instance
, vnfrs
)
1671 self
._log
.error("Pre scale out config for scale group {} ({}) failed".
1672 format(group
.name
, index
))
1673 scale_instance
.operational_status
= "failed"
1675 yield from self
.instantiate_vnfs(vnfrs
)
1677 except Exception as e
:
1678 self
._log
.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1679 format(group
.name
, e
))
1680 self
._log
.exception(e
)
1681 scale_instance
.operational_status
= "failed"
1683 yield from self
.update_state()
1685 yield from instantiate_instance()
1688 def delete_scale_group_instance(self
, group_name
, index
):
1689 group
= self
._scaling
_groups
[group_name
]
1690 scale_instance
= group
.get_instance(index
)
1691 if scale_instance
.is_default
:
1692 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1694 scale_instance
.operational_status
= "terminate"
1695 yield from self
.update_state()
1698 def terminate_instance():
1699 self
._log
.debug("Terminating %s VNFRS" % scale_instance
)
1700 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.PRE_SCALE_IN
,
1701 group
, scale_instance
)
1703 self
._log
.error("Pre scale in config for scale group {} ({}) failed".
1704 format(group
.name
, index
))
1706 # Going ahead with terminate, even if there is an error in pre-scale-in config
1707 # as this could be result of scale out failure and we need to cleanup this group
1708 yield from self
.terminate_vnfrs(scale_instance
.vnfrs
)
1709 group
.delete_instance(index
)
1711 scale_instance
.operational_status
= "vnf_terminate_phase"
1712 yield from self
.update_state()
1714 yield from terminate_instance()
1717 def _update_scale_group_instances_status(self
):
1719 def post_scale_out_task(group
, instance
):
1720 # Apply post scale out config once all VNFRs are active
1721 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.POST_SCALE_OUT
,
1723 instance
.operational_status
= "running"
1725 self
._log
.debug("Scale out for group {} and instance {} succeeded".
1726 format(group
.name
, instance
.instance_id
))
1728 self
._log
.error("Post scale out config for scale group {} ({}) failed".
1729 format(group
.name
, instance
.instance_id
))
1731 yield from self
.update_state()
1733 group_instances
= {group
: group
.instances
for group
in self
._scaling
_groups
.values()}
1734 for group
, instances
in group_instances
.items():
1735 self
._log
.debug("Updating %s instance status", group
)
1736 for instance
in instances
:
1737 instance_vnf_state_list
= [vnfr
.state
for vnfr
in instance
.vnfrs
]
1738 self
._log
.debug("Got vnfr instance states: %s", instance_vnf_state_list
)
1739 if instance
.operational_status
== "vnf_init_phase":
1740 if all([state
== VnfRecordState
.ACTIVE
for state
in instance_vnf_state_list
]):
1741 instance
.operational_status
= "running"
1743 # Create a task for post scale out to allow us to sleep before attempting
1744 # to configure newly created VM's
1745 self
._loop
.create_task(post_scale_out_task(group
, instance
))
1747 elif any([state
== VnfRecordState
.FAILED
for state
in instance_vnf_state_list
]):
1748 self
._log
.debug("Scale out for group {} and instance {} failed".
1749 format(group
.name
, instance
.instance_id
))
1750 instance
.operational_status
= "failed"
1752 elif instance
.operational_status
== "vnf_terminate_phase":
1753 if all([state
== VnfRecordState
.TERMINATED
for state
in instance_vnf_state_list
]):
1754 instance
.operational_status
= "terminated"
1755 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.POST_SCALE_IN
,
1758 self
._log
.debug("Scale in for group {} and instance {} succeeded".
1759 format(group
.name
, instance
.instance_id
))
1761 self
._log
.error("Post scale in config for scale group {} ({}) failed".
1762 format(group
.name
, instance
.instance_id
))
1764 def create_vnffgs(self
):
1765 """ This function creates VNFFGs for every VNFFG in the NSD
1766 associated with this NSR"""
1768 for vnffgd
in self
.nsd_msg
.vnffgd
:
1769 self
._log
.debug("Found vnffgd %s in nsr id %s", vnffgd
, self
.id)
1770 vnffgr
= VnffgRecord(self
._dts
,
1773 self
._nsm
._vnffgmgr
,
1777 self
._sdn
_account
_name
1779 self
._vnffgrs
[vnffgr
.id] = vnffgr
1781 def resolve_vld_ip_profile(self
, nsd_msg
, vld
):
1782 if not vld
.has_field('ip_profile_ref'):
1784 profile
= [ profile
for profile
in nsd_msg
.ip_profiles
if profile
.name
== vld
.ip_profile_ref
]
1785 return profile
[0] if profile
else None
1788 def _create_vls(self
, vld
, cloud_account
):
1789 """Create a VLR in the cloud account specified using the given VLD
1793 cloud_account : Cloud account name
1798 vlr
= yield from VirtualLinkRecord
.create_record(
1805 self
.resolve_vld_ip_profile(self
.nsd_msg
, vld
),
1807 restart_mode
=self
.restart_mode
)
1811 def _extract_cloud_accounts_for_vl(self
, vld
):
1813 Extracts the list of cloud accounts from the NS Config obj
1816 1. Cloud accounts based connection point (vnf_cloud_account_map)
1818 vld : VLD yang object
1823 cloud_account_list
= []
1825 if self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1826 # Handle case where cloud_account is None
1828 for vnf
in self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1829 if vnf
.cloud_account
is not None:
1830 vnf_cloud_map
[vnf
.member_vnf_index_ref
] = vnf
.cloud_account
1832 for vnfc
in vld
.vnfd_connection_point_ref
:
1833 cloud_account
= vnf_cloud_map
.get(
1834 vnfc
.member_vnf_index_ref
,
1835 self
.cloud_account_name
)
1837 cloud_account_list
.append(cloud_account
)
1839 if self
._nsr
_cfg
_msg
.vl_cloud_account_map
:
1840 for vld_map
in self
._nsr
_cfg
_msg
.vl_cloud_account_map
:
1841 if vld_map
.vld_id_ref
== vld
.id:
1842 cloud_account_list
.extend(vld_map
.cloud_accounts
)
1844 # If no config has been provided then fall-back to the default
1846 if not cloud_account_list
:
1847 cloud_account_list
= [self
.cloud_account_name
]
1849 self
._log
.debug("VL {} cloud accounts: {}".
1850 format(vld
.name
, cloud_account_list
))
1851 return set(cloud_account_list
)
1854 def create_vls(self
):
1855 """ This function creates VLs for every VLD in the NSD
1856 associated with this NSR"""
1857 for vld
in self
.nsd_msg
.vld
:
1858 self
._log
.debug("Found vld %s in nsr id %s", vld
, self
.id)
1859 cloud_account_list
= self
._extract
_cloud
_accounts
_for
_vl
(vld
)
1860 for account
in cloud_account_list
:
1861 vlr
= yield from self
._create
_vls
(vld
, account
)
1862 self
._vlrs
.append(vlr
)
1866 def create_vl_instance(self
, vld
):
1867 self
._log
.debug("Create VL for {}: {}".format(self
.id, vld
.as_dict()))
1868 # Check if the VL is already present
1870 for vl
in self
._vlrs
:
1871 if vl
.vld_msg
.id == vld
.id:
1872 self
._log
.debug("The VLD %s already in NSR %s as VLR %s with status %s",
1873 vld
.id, self
.id, vl
.id, vl
.state
)
1875 if vlr
.state
!= VlRecordState
.TERMINATED
:
1876 err_msg
= "VLR for VL %s in NSR %s already instantiated", \
1878 self
._log
.error(err_msg
)
1879 raise NsrVlUpdateError(err_msg
)
1883 cloud_account_list
= self
._extract
_cloud
_accounts
_for
_vl
(vld
)
1884 for account
in cloud_account_list
:
1885 vlr
= yield from self
._create
_vls
(vld
, account
)
1886 self
._vlrs
.append(vlr
)
1888 vlr
.state
= VlRecordState
.INSTANTIATION_PENDING
1889 yield from self
.update_state()
1892 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1893 vlr
.state
= VlRecordState
.ACTIVE
1895 except Exception as e
:
1896 err_msg
= "Error instantiating VL for NSR {} and VLD {}: {}". \
1897 format(self
.id, vld
.id, e
)
1898 self
._log
.error(err_msg
)
1899 self
._log
.exception(e
)
1900 vlr
.state
= VlRecordState
.FAILED
1902 yield from self
.update_state()
1905 def delete_vl_instance(self
, vld
):
1906 for vlr
in self
._vlrs
:
1907 if vlr
.vld_msg
.id == vld
.id:
1908 self
._log
.debug("Found VLR %s for VLD %s in NSR %s",
1909 vlr
.id, vld
.id, self
.id)
1910 vlr
.state
= VlRecordState
.TERMINATE_PENDING
1911 yield from self
.update_state()
1914 yield from self
.nsm_plugin
.terminate_vl(vlr
)
1915 vlr
.state
= VlRecordState
.TERMINATED
1916 self
._vlrs
.remove(vlr
)
1918 except Exception as e
:
1919 err_msg
= "Error terminating VL for NSR {} and VLD {}: {}". \
1920 format(self
.id, vld
.id, e
)
1921 self
._log
.error(err_msg
)
1922 self
._log
.exception(e
)
1923 vlr
.state
= VlRecordState
.FAILED
1925 yield from self
.update_state()
1929 def create_vnfs(self
, config_xact
):
1931 This function creates VNFs for every VNF in the NSD
1932 associated with this NSR
1934 self
._log
.debug("Creating %u VNFs associated with this NS id %s",
1935 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
1937 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1938 if not const_vnfd
.start_by_default
:
1939 self
._log
.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
1940 const_vnfd
.member_vnf_index
)
1943 vnfd_msg
= self
._get
_vnfd
(const_vnfd
.vnfd_id_ref
, config_xact
)
1944 cloud_account_name
= self
._get
_vnfd
_cloud
_account
(const_vnfd
.member_vnf_index
)
1945 if cloud_account_name
is None:
1946 cloud_account_name
= self
.cloud_account_name
1947 yield from self
.create_vnf_record(vnfd_msg
, const_vnfd
, cloud_account_name
)
1950 def get_placement_groups(self
, vnfd_msg
, const_vnfd
):
1951 placement_groups
= []
1952 for group
in self
.nsd_msg
.placement_groups
:
1953 for member_vnfd
in group
.member_vnfd
:
1954 if (member_vnfd
.vnfd_id_ref
== vnfd_msg
.id) and \
1955 (member_vnfd
.member_vnf_index_ref
== const_vnfd
.member_vnf_index
):
1956 group_info
= self
.resolve_placement_group_cloud_construct(group
)
1957 if group_info
is None:
1958 self
._log
.error("Could not resolve cloud-construct for placement group: %s", group
.name
)
1959 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1961 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
1964 const_vnfd
.member_vnf_index
)
1965 placement_groups
.append(group_info
)
1966 return placement_groups
1969 def create_vnf_record(self
, vnfd_msg
, const_vnfd
, cloud_account_name
, group_name
=None, group_instance_id
=None):
1970 # Fetch the VNFD associated with this VNF
1971 placement_groups
= self
.get_placement_groups(vnfd_msg
, const_vnfd
)
1972 self
._log
.info("Cloud Account for VNF %d is %s",const_vnfd
.member_vnf_index
,cloud_account_name
)
1973 self
._log
.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
1975 const_vnfd
.member_vnf_index
,
1976 [ group
.name
for group
in placement_groups
])
1977 vnfr
= yield from VirtualNetworkFunctionRecord
.create_record(self
._dts
,
1989 restart_mode
=self
.restart_mode
,
1991 if vnfr
.id in self
._vnfrs
:
1992 err
= "VNF with VNFR id %s already in vnf list" % (vnfr
.id,)
1993 raise NetworkServiceRecordError(err
)
1995 self
._vnfrs
[vnfr
.id] = vnfr
1996 self
._nsm
.vnfrs
[vnfr
.id] = vnfr
1998 yield from vnfr
.set_config_status(NsrYang
.ConfigStates
.INIT
)
2000 self
._log
.debug("Added VNFR %s to NSM VNFR list with id %s",
2006 def create_param_pools(self
):
2007 for param_pool
in self
.nsd_msg
.parameter_pool
:
2008 self
._log
.debug("Found parameter pool %s in nsr id %s", param_pool
, self
.id)
2010 start_value
= param_pool
.range.start_value
2011 end_value
= param_pool
.range.end_value
2012 if end_value
< start_value
:
2013 raise NetworkServiceRecordError(
2014 "Parameter pool %s has invalid range (start: {}, end: {})".format(
2015 start_value
, end_value
2019 self
._param
_pools
[param_pool
.name
] = config_value_pool
.ParameterValuePool(
2022 range(start_value
, end_value
)
2026 def fetch_vnfr(self
, vnfr_path
):
2027 """ Fetch VNFR record """
2029 self
._log
.debug("Fetching VNFR with key %s while instantiating %s",
2031 res_iter
= yield from self
._dts
.query_read(vnfr_path
, rwdts
.XactFlag
.MERGE
)
2033 for ent
in res_iter
:
2034 res
= yield from ent
2040 def instantiate_vnfs(self
, vnfrs
):
2042 This function instantiates VNFs for every VNF in this Network Service
2044 self
._log
.debug("Instantiating %u VNFs in NS %s", len(vnfrs
), self
.id)
2046 self
._log
.debug("Instantiating VNF: %s in NS %s", vnf
, self
.id)
2047 yield from self
.nsm_plugin
.instantiate_vnf(self
, vnf
)
2050 def instantiate_vnffgs(self
):
2052 This function instantiates VNFFGs for every VNFFG in this Network Service
2054 self
._log
.debug("Instantiating %u VNFFGs in NS %s",
2055 len(self
.nsd_msg
.vnffgd
), self
.id)
2056 for _
, vnfr
in self
.vnfrs
.items():
2057 while vnfr
.state
in [VnfRecordState
.INSTANTIATION_PENDING
, VnfRecordState
.INIT
]:
2058 self
._log
.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr
.name
,vnfr
.state
)
2059 yield from asyncio
.sleep(2, loop
=self
._loop
)
2060 if vnfr
.state
== VnfRecordState
.ACTIVE
:
2061 self
._log
.debug("Received vnfr state for vnfr %s is %s ",vnfr
.name
,vnfr
.state
)
2064 self
._log
.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr
.name
,vnfr
.state
)
2065 self
._vnffgr
_state
= VnffgRecordState
.FAILED
2068 self
._log
.info("Waiting for 90 seconds for VMs to come up")
2069 yield from asyncio
.sleep(90, loop
=self
._loop
)
2070 self
._log
.info("Starting VNFFG orchestration")
2071 for vnffg
in self
._vnffgrs
.values():
2072 self
._log
.debug("Instantiating VNFFG: %s in NS %s", vnffg
, self
.id)
2073 yield from vnffg
.instantiate()
2076 def instantiate_scaling_instances(self
, config_xact
):
2077 """ Instantiate any default scaling instances in this Network Service """
2078 for group
in self
._scaling
_groups
.values():
2079 for i
in range(group
.min_instance_count
):
2080 self
._log
.debug("Instantiating %s default scaling instance %s", group
, i
)
2081 yield from self
.create_scale_group_instance(
2082 group
.name
, i
, config_xact
, is_default
=True
2085 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2086 if group_msg
.scaling_group_name_ref
!= group
.name
:
2089 for instance
in group_msg
.instance
:
2090 self
._log
.debug("Reloading %s scaling instance %s", group_msg
, instance
.id)
2091 yield from self
.create_scale_group_instance(
2092 group
.name
, instance
.id, config_xact
, is_default
=False
2095 def has_scaling_instances(self
):
2096 """ Return boolean indicating if the network service has default scaling groups """
2097 for group
in self
._scaling
_groups
.values():
2098 if group
.min_instance_count
> 0:
2101 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2102 if len(group_msg
.instance
) > 0:
2109 """ This function publishes this NSR """
2110 self
._nsr
_msg
= self
.create_msg()
2112 self
._log
.debug("Publishing the NSR with xpath %s and nsr %s",
2116 if self
._debug
_running
:
2117 self
._log
.debug("Publishing NSR in RUNNING state!")
2120 with self
._dts
.transaction() as xact
:
2121 yield from self
._nsm
.nsr_handler
.update(xact
, self
.nsr_xpath
, self
._nsr
_msg
)
2122 if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
:
2123 self
._debug
_running
= True
2126 def unpublish(self
, xact
):
2127 """ Unpublish this NSR object """
2128 self
._log
.debug("Unpublishing Network service id %s", self
.id)
2129 yield from self
._nsm
.nsr_handler
.delete(xact
, self
.nsr_xpath
)
2132 def nsr_xpath(self
):
2133 """ Returns the xpath associated with this NSR """
2135 "D,/nsr:ns-instance-opdata" +
2136 "/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
2140 def xpath_from_nsr(nsr
):
2141 """ Returns the xpath associated with this NSR op data"""
2142 return (NetworkServiceRecord
.XPATH
+
2143 "[nsr:ns-instance-config-ref = '{}']").format(nsr
.id)
2146 def nsd_xpath(self
):
2147 """ Return NSD config xpath."""
2149 "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
2150 ).format(self
.nsd_id
)
2153 def instantiate(self
, config_xact
):
2154 """"Instantiates a NetworkServiceRecord.
2156 This function instantiates a Network service
2157 which involves the following steps,
2159 * Instantiate every VL in NSD by sending create VLR request to DTS.
2160 * Instantiate every VNF in NSD by sending create VNF reuqest to DTS.
2161 * Publish the NSR details to DTS
2164 nsr: The NSR configuration request containing nsr-id and nsd
2165 config_xact: The configuration transaction which initiated the instatiation
2168 NetworkServiceRecordError if the NSR creation fails
2174 self
._log
.debug("Instantiating NS - %s xact - %s", self
, config_xact
)
2176 # Move the state to INIITALIZING
2177 self
.set_state(NetworkServiceRecordState
.INIT
)
2179 event_descr
= "Instantiation Request Received NSR Id:%s" % self
.id
2180 self
.record_event("instantiating", event_descr
)
2183 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
2186 # Update ref count if nsd present in catalog
2187 self
._nsm
.get_nsd_ref(self
.nsd_id
)
2189 except NetworkServiceDescriptorError
:
2190 # This could be an NSD not in the nsd-catalog
2193 # Merge any config and initial config primitive values
2194 self
.config_store
.merge_nsd_config(self
.nsd_msg
)
2195 self
._log
.debug("Merged NSD: {}".format(self
.nsd_msg
.as_dict()))
2197 event_descr
= "Fetched NSD with descriptor id %s" % self
.nsd_id
2198 self
.record_event("nsd-fetched", event_descr
)
2200 if self
._nsd
is None:
2201 msg
= "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
2202 self
._log
.debug(msg
, self
.nsd_id
, self
.id)
2203 raise NetworkServiceRecordError(self
)
2205 self
._log
.debug("Got nsd result %s", self
._nsd
)
2207 # Substitute any input parameters
2208 self
.substitute_input_parameters(self
._nsd
, self
._nsr
_cfg
_msg
)
2211 yield from self
.create(config_xact
)
2213 # Publish the NSR to DTS
2214 yield from self
.publish()
2217 def do_instantiate():
2219 Instantiate network service
2221 self
._log
.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
2222 self
.id, self
.nsd_id
)
2224 # instantiate the VLs
2225 event_descr
= ("Instantiating %s external VLs for NSR id %s" %
2226 (len(self
.nsd_msg
.vld
), self
.id))
2227 self
.record_event("begin-external-vls-instantiation", event_descr
)
2229 self
.set_state(NetworkServiceRecordState
.VL_INIT_PHASE
)
2231 yield from self
.instantiate_vls()
2233 # Publish the NSR to DTS
2234 yield from self
.publish()
2236 event_descr
= ("Finished instantiating %s external VLs for NSR id %s" %
2237 (len(self
.nsd_msg
.vld
), self
.id))
2238 self
.record_event("end-external-vls-instantiation", event_descr
)
2240 self
.set_state(NetworkServiceRecordState
.VNF_INIT_PHASE
)
2242 self
._log
.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
2243 self
.id, self
.nsd_id
)
2245 # instantiate the VNFs
2246 event_descr
= ("Instantiating %s VNFS for NSR id %s" %
2247 (len(self
.nsd_msg
.constituent_vnfd
), self
.id))
2249 self
.record_event("begin-vnf-instantiation", event_descr
)
2251 yield from self
.instantiate_vnfs(self
._vnfrs
.values())
2253 self
._log
.debug(" Finished instantiating %d VNFs for NSR id %s",
2254 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
2256 event_descr
= ("Finished instantiating %s VNFs for NSR id %s" %
2257 (len(self
.nsd_msg
.constituent_vnfd
), self
.id))
2258 self
.record_event("end-vnf-instantiation", event_descr
)
2260 if len(self
.vnffgrs
) > 0:
2261 #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
2262 event_descr
= ("Instantiating %s VNFFGS for NSR id %s" %
2263 (len(self
.nsd_msg
.vnffgd
), self
.id))
2265 self
.record_event("begin-vnffg-instantiation", event_descr
)
2267 yield from self
.instantiate_vnffgs()
2269 event_descr
= ("Finished instantiating %s VNFFGDs for NSR id %s" %
2270 (len(self
.nsd_msg
.vnffgd
), self
.id))
2271 self
.record_event("end-vnffg-instantiation", event_descr
)
2273 if self
.has_scaling_instances():
2274 event_descr
= ("Instantiating %s Scaling Groups for NSR id %s" %
2275 (len(self
._scaling
_groups
), self
.id))
2277 self
.record_event("begin-scaling-group-instantiation", event_descr
)
2278 yield from self
.instantiate_scaling_instances(config_xact
)
2279 self
.record_event("end-scaling-group-instantiation", event_descr
)
2281 # Give the plugin a chance to deploy the network service now that all
2282 # virtual links and vnfs are instantiated
2283 yield from self
.nsm_plugin
.deploy(self
._nsr
_msg
)
2285 self
._log
.debug("Publishing NSR...... nsr[%s], nsd[%s]",
2286 self
.id, self
.nsd_id
)
2288 # Publish the NSR to DTS
2289 yield from self
.publish()
2291 self
._log
.debug("Published NSR...... nsr[%s], nsd[%s]",
2292 self
.id, self
.nsd_id
)
2294 def on_instantiate_done(fut
):
2295 # If the do_instantiate fails, then publish NSR with failed result
2296 if fut
.exception() is not None:
2297 self
._log
.error("NSR instantiation failed for NSR id %s: %s", self
.id, str(fut
.exception()))
2298 self
._loop
.create_task(self
.instantiation_failed(failed_reason
=str(fut
.exception())))
2300 instantiate_task
= self
._loop
.create_task(do_instantiate())
2301 instantiate_task
.add_done_callback(on_instantiate_done
)
2304 def set_config_status(self
, status
, status_details
=None):
2305 if self
.config_status
!= status
:
2306 self
._log
.debug("Updating NSR {} status for {} to {}".
2307 format(self
.name
, self
.config_status
, status
))
2308 self
._config
_status
= status
2309 self
._config
_status
_details
= status_details
2311 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2312 self
.record_event("config-failed", "NS configuration failed",
2313 evt_details
=self
._config
_status
_details
)
2315 yield from self
.publish()
2318 def is_active(self
):
2319 """ This NS is active """
2320 self
.set_state(NetworkServiceRecordState
.RUNNING
)
2324 # Publish the NSR to DTS
2325 self
._log
.debug("Network service %s is active ", self
.id)
2326 self
._is
_active
= True
2328 event_descr
= "NSR in running state for NSR id %s" % self
.id
2329 self
.record_event("ns-running", event_descr
)
2331 yield from self
.publish()
2334 def instantiation_failed(self
, failed_reason
=None):
2335 """ The NS instantiation failed"""
2336 self
._log
.error("Network service id:%s, name:%s instantiation failed",
2338 self
.set_state(NetworkServiceRecordState
.FAILED
)
2340 event_descr
= "Instantiation of NS %s failed" % self
.id
2341 self
.record_event("ns-failed", event_descr
, evt_details
=failed_reason
)
2343 # Publish the NSR to DTS
2344 yield from self
.publish()
2347 def terminate_vnfrs(self
, vnfrs
):
2348 """ Terminate VNFRS in this network service """
2349 self
._log
.debug("Terminating VNFs in network service %s", self
.id)
2351 yield from self
.nsm_plugin
.terminate_vnf(vnfr
)
2354 def terminate(self
):
2355 """ Terminate a NetworkServiceRecord."""
2356 def terminate_vnffgrs():
2357 """ Terminate VNFFGRS in this network service """
2358 self
._log
.debug("Terminating VNFFGRs in network service %s", self
.id)
2359 for vnffgr
in self
.vnffgrs
.values():
2360 yield from vnffgr
.terminate()
2362 def terminate_vlrs():
2363 """ Terminate VLRs in this netork service """
2364 self
._log
.debug("Terminating VLs in network service %s", self
.id)
2365 for vlr
in self
.vlrs
:
2366 yield from self
.nsm_plugin
.terminate_vl(vlr
)
2367 vlr
.state
= VlRecordState
.TERMINATED
2369 self
._log
.debug("Terminating network service id %s", self
.id)
2371 # Move the state to TERMINATE
2372 self
.set_state(NetworkServiceRecordState
.TERMINATE
)
2373 event_descr
= "Terminate being processed for NS Id:%s" % self
.id
2374 self
.record_event("terminate", event_descr
)
2376 # Move the state to VNF_TERMINATE_PHASE
2377 self
._log
.debug("Terminating VNFFGs in NS ID: %s", self
.id)
2378 self
.set_state(NetworkServiceRecordState
.VNFFG_TERMINATE_PHASE
)
2379 event_descr
= "Terminating VNFFGS in NS Id:%s" % self
.id
2380 self
.record_event("terminating-vnffgss", event_descr
)
2381 yield from terminate_vnffgrs()
2383 # Move the state to VNF_TERMINATE_PHASE
2384 self
.set_state(NetworkServiceRecordState
.VNF_TERMINATE_PHASE
)
2385 event_descr
= "Terminating VNFS in NS Id:%s" % self
.id
2386 self
.record_event("terminating-vnfs", event_descr
)
2387 yield from self
.terminate_vnfrs(self
.vnfrs
.values())
2389 # Move the state to VL_TERMINATE_PHASE
2390 self
.set_state(NetworkServiceRecordState
.VL_TERMINATE_PHASE
)
2391 event_descr
= "Terminating VLs in NS Id:%s" % self
.id
2392 self
.record_event("terminating-vls", event_descr
)
2393 yield from terminate_vlrs()
2395 yield from self
.nsm_plugin
.terminate_ns(self
)
2397 # Move the state to TERMINATED
2398 self
.set_state(NetworkServiceRecordState
.TERMINATED
)
2399 event_descr
= "Terminated NS Id:%s" % self
.id
2400 self
.record_event("terminated", event_descr
)
2403 """"Enable a NetworkServiceRecord."""
2407 """"Disable a NetworkServiceRecord."""
2410 def map_config_status(self
):
2411 self
._log
.debug("Config status for ns {} is {}".
2412 format(self
.name
, self
._config
_status
))
2413 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURING
:
2414 return 'configuring'
2415 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2419 def vl_phase_completed(self
):
2420 """ Are VLs created in this NS?"""
2421 return self
._vl
_phase
_completed
2423 def vnf_phase_completed(self
):
2424 """ Are VLs created in this NS?"""
2425 return self
._vnf
_phase
_completed
2427 def create_msg(self
):
2428 """ The network serice record as a message """
2429 nsr_dict
= {"ns_instance_config_ref": self
.id}
2430 nsr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr
.from_dict(nsr_dict
)
2431 #nsr.cloud_account = self.cloud_account_name
2432 nsr
.sdn_account
= self
._sdn
_account
_name
2433 nsr
.name_ref
= self
.name
2434 nsr
.nsd_ref
= self
.nsd_id
2435 nsr
.nsd_name_ref
= self
.nsd_msg
.name
2436 nsr
.operational_events
= self
._op
_status
.msg
2437 nsr
.operational_status
= self
._op
_status
.yang_str()
2438 nsr
.config_status
= self
.map_config_status()
2439 nsr
.config_status_details
= self
._config
_status
_details
2440 nsr
.create_time
= self
._create
_time
2442 for cfg_prim
in self
.nsd_msg
.service_primitive
:
2443 cfg_prim
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive
.from_dict(
2445 nsr
.service_primitive
.append(cfg_prim
)
2447 for init_cfg
in self
.nsd_msg
.initial_config_primitive
:
2448 prim
= NsrYang
.NsrInitialConfigPrimitive
.from_dict(
2450 nsr
.initial_config_primitive
.append(prim
)
2452 if self
.vl_phase_completed():
2453 for vlr
in self
.vlrs
:
2454 nsr
.vlr
.append(vlr
.create_nsr_vlr_msg(self
.vnfrs
.values()))
2456 if self
.vnf_phase_completed():
2457 for vnfr_id
in self
.vnfrs
:
2458 nsr
.constituent_vnfr_ref
.append(self
.vnfrs
[vnfr_id
].const_vnfr_msg
)
2459 for vnffgr
in self
.vnffgrs
.values():
2460 nsr
.vnffgr
.append(vnffgr
.fetch_vnffgr())
2461 for scaling_group
in self
._scaling
_groups
.values():
2462 nsr
.scaling_group_record
.append(scaling_group
.create_record_msg())
2466 def all_vnfs_active(self
):
2467 """ Are all VNFS in this NS active? """
2468 for _
, vnfr
in self
.vnfrs
.items():
2469 if vnfr
.active
is not True:
2474 def update_state(self
):
2475 """ Re-evaluate this NS's state """
2476 curr_state
= self
._op
_status
.state
2478 if curr_state
== NetworkServiceRecordState
.TERMINATED
:
2479 self
._log
.debug("NS (%s) in terminated state, not updating state", self
.id)
2482 new_state
= NetworkServiceRecordState
.RUNNING
2483 self
._log
.info("Received update_state for nsr: %s, curr-state: %s",
2484 self
.id, curr_state
)
2486 # Check all the VNFRs are present
2487 for _
, vnfr
in self
.vnfrs
.items():
2488 if vnfr
.state
in [VnfRecordState
.ACTIVE
, VnfRecordState
.TERMINATED
]:
2490 elif vnfr
.state
== VnfRecordState
.FAILED
:
2491 if vnfr
._prev
_state
!= vnfr
.state
:
2492 event_descr
= "Instantiation of VNF %s failed" % vnfr
.id
2493 event_error_details
= vnfr
.state_failed_reason
2494 self
.record_event("vnf-failed", event_descr
, evt_details
=event_error_details
)
2495 vnfr
.set_state(VnfRecordState
.FAILED
)
2497 self
._log
.info("VNF state did not change, curr=%s, prev=%s",
2498 vnfr
.state
, vnfr
._prev
_state
)
2499 new_state
= NetworkServiceRecordState
.FAILED
2502 self
._log
.info("VNF %s in NSR %s is still not active; current state is: %s",
2503 vnfr
.id, self
.id, vnfr
.state
)
2504 new_state
= curr_state
2506 # If new state is RUNNING; check all VLs
2507 if new_state
== NetworkServiceRecordState
.RUNNING
:
2508 for vl
in self
.vlrs
:
2510 if vl
.state
in [VlRecordState
.ACTIVE
, VlRecordState
.TERMINATED
]:
2512 elif vl
.state
== VlRecordState
.FAILED
:
2513 if vl
.prev_state
!= vl
.state
:
2514 event_descr
= "Instantiation of VL %s failed" % vl
.id
2515 event_error_details
= vl
.state_failed_reason
2516 self
.record_event("vl-failed", event_descr
, evt_details
=event_error_details
)
2517 vl
.prev_state
= vl
.state
2519 self
._log
.debug("VL %s already in failed state")
2521 if vl
.state
in [VlRecordState
.INSTANTIATION_PENDING
, VlRecordState
.INIT
]:
2522 new_state
= NetworkServiceRecordState
.VL_INSTANTIATE
2525 if vl
.state
in [VlRecordState
.TERMINATE_PENDING
]:
2526 new_state
= NetworkServiceRecordState
.VL_TERMINATE
2529 # If new state is RUNNING; check VNFFGRs are also active
2530 if new_state
== NetworkServiceRecordState
.RUNNING
:
2531 for _
, vnffgr
in self
.vnffgrs
.items():
2532 self
._log
.info("Checking vnffgr state for nsr %s is: %s",
2533 self
.id, vnffgr
.state
)
2534 if vnffgr
.state
== VnffgRecordState
.ACTIVE
:
2536 elif vnffgr
.state
== VnffgRecordState
.FAILED
:
2537 event_descr
= "Instantiation of VNFFGR %s failed" % vnffgr
.id
2538 self
.record_event("vnffg-failed", event_descr
)
2539 new_state
= NetworkServiceRecordState
.FAILED
2542 self
._log
.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
2543 vnffgr
.id, self
.id, vnffgr
.state
)
2544 new_state
= curr_state
2546 # Update all the scaling group instance operational status to
2547 # reflect the state of all VNFR within that instance
2548 yield from self
._update
_scale
_group
_instances
_status
()
2550 for _
, group
in self
._scaling
_groups
.items():
2551 if group
.state
== scale_group
.ScaleGroupState
.SCALING_OUT
:
2552 new_state
= NetworkServiceRecordState
.SCALING_OUT
2554 elif group
.state
== scale_group
.ScaleGroupState
.SCALING_IN
:
2555 new_state
= NetworkServiceRecordState
.SCALING_IN
2558 if new_state
!= curr_state
:
2559 self
._log
.debug("Changing state of Network service %s from %s to %s",
2560 self
.id, curr_state
, new_state
)
2561 if new_state
== NetworkServiceRecordState
.RUNNING
:
2562 yield from self
.is_active()
2563 elif new_state
== NetworkServiceRecordState
.FAILED
:
2564 # If the NS is already active and we entered scaling_in, scaling_out,
2565 # do not mark the NS as failing if scaling operation failed.
2566 if curr_state
in [NetworkServiceRecordState
.SCALING_OUT
,
2567 NetworkServiceRecordState
.SCALING_IN
] and self
._is
_active
:
2568 new_state
= NetworkServiceRecordState
.RUNNING
2569 self
.set_state(new_state
)
2571 yield from self
.instantiation_failed()
2573 self
.set_state(new_state
)
2575 yield from self
.publish()
2578 class InputParameterSubstitution(object):
2580 This class is responsible for substituting input parameters into an NSD.
2583 def __init__(self
, log
):
2584 """Create an instance of InputParameterSubstitution
2587 log - a logger for this object to use
2592 def __call__(self
, nsd
, nsr_config
):
2593 """Substitutes input parameters from the NSR config into the NSD
2595 This call modifies the provided NSD with the input parameters that are
2596 contained in the NSR config.
2599 nsd - a GI NSD object
2600 nsr_config - a GI NSR config object
2603 if nsd
is None or nsr_config
is None:
2606 # Create a lookup of the xpath elements that this descriptor allows
2608 optional_input_parameters
= set()
2609 for input_parameter
in nsd
.input_parameter_xpath
:
2610 optional_input_parameters
.add(input_parameter
.xpath
)
2612 # Apply the input parameters to the descriptor
2613 if nsr_config
.input_parameter
:
2614 for param
in nsr_config
.input_parameter
:
2615 if param
.xpath
not in optional_input_parameters
:
2616 msg
= "tried to set an invalid input parameter ({})"
2617 self
.log
.error(msg
.format(param
.xpath
))
2621 "input-parameter:{} = {}".format(
2628 xpath
.setxattr(nsd
, param
.xpath
, param
.value
)
2630 except Exception as e
:
2631 self
.log
.exception(e
)
2634 class NetworkServiceDescriptor(object):
2636 Network service descriptor class
2639 def __init__(self
, dts
, log
, loop
, nsd
, nsm
):
2651 """ Returns nsd id """
2656 """ Returns name of nsd """
2657 return self
._nsd
.name
2660 def ref_count(self
):
2661 """ Returns reference count"""
2662 return self
._ref
_count
2665 """ Returns whether nsd is in use or not """
2666 return True if self
.ref_count
> 0 else False
2669 """ Take a reference on this object """
2670 self
._ref
_count
+= 1
2673 """ Release reference on this object """
2674 if self
.ref_count
< 1:
2675 msg
= ("Unref on a NSD object - nsd id %s, ref_count = %s" %
2676 (self
.id, self
.ref_count
))
2677 self
._log
.critical(msg
)
2678 raise NetworkServiceDescriptorError(msg
)
2679 self
._ref
_count
-= 1
2683 """ Return the message associated with this NetworkServiceDescriptor"""
2687 def path_for_id(nsd_id
):
2688 """ Return path for the passed nsd_id"""
2689 return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}'".format(nsd_id
)
2692 """ Return the message associated with this NetworkServiceDescriptor"""
2693 return NetworkServiceDescriptor
.path_for_id(self
.id)
2695 def update(self
, nsd
):
2696 """ Update the NSD descriptor """
2700 class NsdDtsHandler(object):
2701 """ The network service descriptor DTS handler """
2702 XPATH
= "C,/nsd:nsd-catalog/nsd:nsd"
2704 def __init__(self
, dts
, log
, loop
, nsm
):
2714 """ Return registration handle """
2719 """ Register for Nsd create/update/delete/read requests from dts """
2721 def on_apply(dts
, acg
, xact
, action
, scratch
):
2722 """Apply the configuration"""
2723 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
2724 self
._log
.debug("Got nsd apply cfg (xact:%s) (action:%s)",
2726 # Create/Update an NSD record
2727 for cfg
in self
._regh
.get_xact_elements(xact
):
2728 # Only interested in those NSD cfgs whose ID was received in prepare callback
2729 if cfg
.id in scratch
.get('nsds', []) or is_recovery
:
2730 self
._nsm
.update_nsd(cfg
)
2732 scratch
.pop('nsds', None)
2734 return RwTypes
.RwStatus
.SUCCESS
2737 def delete_nsd_libs(nsd_id
):
2738 """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/libs/<id> """
2740 rift_artifacts_dir
= os
.environ
['RIFT_ARTIFACTS']
2741 nsd_dir
= os
.path
.join(rift_artifacts_dir
, 'launchpad/libs', nsd_id
)
2743 if os
.path
.exists (nsd_dir
):
2744 shutil
.rmtree(nsd_dir
, ignore_errors
=True)
2745 except Exception as e
:
2746 self
._log
.error("Exception in cleaning up NSD libs {}: {}".
2748 self
._log
.excpetion(e
)
2751 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2752 """ Prepare callback from DTS for NSD config """
2754 self
._log
.info("Got nsd prepare - config received nsd id %s, msg %s",
2757 fref
= ProtobufC
.FieldReference
.alloc()
2758 fref
.goto_whole_message(msg
.to_pbcm())
2760 if fref
.is_field_deleted():
2761 # Delete an NSD record
2762 self
._log
.debug("Deleting NSD with id %s", msg
.id)
2763 if self
._nsm
.nsd_in_use(msg
.id):
2764 self
._log
.debug("Cannot delete NSD in use - %s", msg
.id)
2765 err
= "Cannot delete an NSD in use - %s" % msg
.id
2766 raise NetworkServiceDescriptorRefCountExists(err
)
2768 yield from delete_nsd_libs(msg
.id)
2769 self
._nsm
.delete_nsd(msg
.id)
2771 # Add this NSD to scratch to create/update in apply callback
2772 nsds
= scratch
.setdefault('nsds', [])
2774 # acg._scratch['nsds'].append(msg.id)
2776 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2779 "Registering for NSD config using xpath: %s",
2780 NsdDtsHandler
.XPATH
,
2783 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2784 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2785 # Need a list in scratch to store NSDs to create/update later
2786 # acg._scratch['nsds'] = list()
2787 self
._regh
= acg
.register(
2788 xpath
=NsdDtsHandler
.XPATH
,
2789 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
2790 on_prepare
=on_prepare
)
2793 class VnfdDtsHandler(object):
2794 """ DTS handler for VNFD config changes """
2795 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2797 def __init__(self
, dts
, log
, loop
, nsm
):
2806 """ DTS registration handle """
2811 """ Register for VNFD configuration"""
2814 def on_apply(dts
, acg
, xact
, action
, scratch
):
2815 """Apply the configuration"""
2816 self
._log
.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2817 xact
, action
, scratch
)
2819 # Create/Update a VNFD record
2820 for cfg
in self
._regh
.get_xact_elements(xact
):
2821 # Only interested in those VNFD cfgs whose ID was received in prepare callback
2822 if cfg
.id in scratch
.get('vnfds', []):
2823 self
._nsm
.update_vnfd(cfg
)
2825 for cfg
in self
._regh
.elements
:
2826 if cfg
.id in scratch
.get('deleted_vnfds', []):
2827 yield from self
._nsm
.delete_vnfd(cfg
.id)
2829 scratch
.pop('vnfds', None)
2830 scratch
.pop('deleted_vnfds', None)
2833 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2834 """ on prepare callback """
2835 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
2836 ks_path
.to_xpath(RwNsmYang
.get_schema()), xact_info
.query_action
, msg
)
2838 fref
= ProtobufC
.FieldReference
.alloc()
2839 fref
.goto_whole_message(msg
.to_pbcm())
2841 # Handle deletes in prepare_callback, but adds/updates in apply_callback
2842 if fref
.is_field_deleted():
2843 self
._log
.debug("Adding msg to deleted field")
2844 deleted_vnfds
= scratch
.setdefault('deleted_vnfds', [])
2845 deleted_vnfds
.append(msg
.id)
2847 # Add this VNFD to scratch to create/update in apply callback
2848 vnfds
= scratch
.setdefault('vnfds', [])
2849 vnfds
.append(msg
.id)
2851 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2854 "Registering for VNFD config using xpath: %s",
2855 VnfdDtsHandler
.XPATH
,
2857 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2858 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2859 # Need a list in scratch to store VNFDs to create/update later
2860 # acg._scratch['vnfds'] = list()
2861 # acg._scratch['deleted_vnfds'] = list()
2862 self
._regh
= acg
.register(
2863 xpath
=VnfdDtsHandler
.XPATH
,
2864 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2865 on_prepare
=on_prepare
)
2867 class NsrRpcDtsHandler(object):
2868 """ The network service instantiation RPC DTS handler """
2869 EXEC_NSR_CONF_XPATH
= "I,/nsr:start-network-service"
2870 EXEC_NSR_CONF_O_XPATH
= "O,/nsr:start-network-service"
2871 NETCONF_IP_ADDRESS
= "127.0.0.1"
2873 NETCONF_USER
= "admin"
2874 NETCONF_PW
= "admin"
2876 def __init__(self
, dts
, log
, loop
, nsm
):
2883 self
._ns
_regh
= None
2885 self
._manager
= None
2887 self
._model
= RwYang
.Model
.create_libncx()
2888 self
._model
.load_schema_ypbc(RwNsrYang
.get_schema())
2892 """ Return the NS manager instance """
2896 def wrap_netconf_config_xml(xml
):
2897 xml
= '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml
)
2901 def _connect(self
, timeout_secs
=240):
2903 start_time
= time
.time()
2904 while (time
.time() - start_time
) < timeout_secs
:
2907 self
._log
.debug("Attemping NsmTasklet netconf connection.")
2909 manager
= yield from ncclient
.asyncio_manager
.asyncio_connect(
2911 host
=NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
,
2912 port
=NsrRpcDtsHandler
.NETCONF_PORT
,
2913 username
=NsrRpcDtsHandler
.NETCONF_USER
,
2914 password
=NsrRpcDtsHandler
.NETCONF_PW
,
2916 look_for_keys
=False,
2917 hostkey_verify
=False,
2922 except ncclient
.transport
.errors
.SSHError
as e
:
2923 self
._log
.warning("Netconf connection to launchpad %s failed: %s",
2924 NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
, str(e
))
2926 yield from asyncio
.sleep(5, loop
=self
._loop
)
2928 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
2933 """ Register for NS monitoring read from dts """
2935 def on_ns_config_prepare(xact_info
, action
, ks_path
, msg
):
2936 """ prepare callback from dts start-network-service"""
2937 assert action
== rwdts
.QueryAction
.RPC
2939 rpc_op
= NsrYang
.YangOutput_Nsr_StartNetworkService
.from_dict({
2940 "nsr_id":str(uuid
.uuid4())
2943 if not ('name' in rpc_ip
and 'nsd_ref' in rpc_ip
and 'cloud_account' in rpc_ip
):
2944 self
._log
.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip
))
2947 self
._log
.debug("start-network-service RPC input: {}".format(rpc_ip
))
2950 # Add used value to the pool
2951 self
._log
.debug("RPC output: {}".format(rpc_op
))
2952 nsd_copy
= self
.nsm
.get_nsd(rpc_ip
.nsd_ref
)
2954 if not self
._manager
:
2955 self
._manager
= yield from self
._connect
()
2957 self
._log
.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
2958 rpc_ip
.name
, rpc_ip
.nsd_ref
)
2960 ns_instance_config_dict
= {"id":rpc_op
.nsr_id
, "admin_status":"ENABLED"}
2961 ns_instance_config_copy_dict
= {k
:v
for k
, v
in rpc_ip
.as_dict().items()
2962 if k
in RwNsrYang
.YangData_Nsr_NsInstanceConfig_Nsr().fields
}
2963 ns_instance_config_dict
.update(ns_instance_config_copy_dict
)
2965 ns_instance_config
= RwNsrYang
.YangData_Nsr_NsInstanceConfig_Nsr
.from_dict(ns_instance_config_dict
)
2966 ns_instance_config
.nsd
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
2967 ns_instance_config
.nsd
.from_dict(nsd_copy
.msg
.as_dict())
2969 xml
= ns_instance_config
.to_xml_v2(self
._model
)
2970 netconf_xml
= self
.wrap_netconf_config_xml(xml
)
2972 self
._log
.debug("Sending configure ns-instance-config xml to %s: %s",
2973 netconf_xml
, NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
)
2975 response
= yield from self
._manager
.edit_config(
2979 self
._log
.debug("Received edit config response: %s", str(response
))
2981 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
2982 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
2984 except Exception as e
:
2985 self
._log
.error("Exception processing the "
2986 "start-network-service: {}".format(e
))
2987 self
._log
.exception(e
)
2988 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
2989 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
)
2992 hdl_ns
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_ns_config_prepare
,)
2994 with self
._dts
.group_create() as group
:
2995 self
._ns
_regh
= group
.register(xpath
=NsrRpcDtsHandler
.EXEC_NSR_CONF_XPATH
,
2997 flags
=rwdts
.Flag
.PUBLISHER
,
3001 class NsrDtsHandler(object):
3002 """ The network service DTS handler """
3003 NSR_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr"
3004 SCALE_INSTANCE_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3006 def __init__(self
, dts
, log
, loop
, nsm
):
3012 self
._nsr
_regh
= None
3013 self
._scale
_regh
= None
3017 """ Return the NS manager instance """
3022 """ Register for Nsr create/update/delete/read requests from dts """
3024 def nsr_id_from_keyspec(ks
):
3025 nsr_path_entry
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr
.schema().keyspec_to_entry(ks
)
3026 nsr_id
= nsr_path_entry
.key00
.id
3029 def group_name_from_keyspec(ks
):
3030 group_path_entry
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup
.schema().keyspec_to_entry(ks
)
3031 group_name
= group_path_entry
.key00
.scaling_group_name_ref
3034 def is_instance_in_reg_elements(nsr_id
, group_name
, instance_id
):
3035 """ Return boolean indicating if scaling group instance was already commited previously.
3037 By looking at the existing elements in this registration handle (elements not part
3038 of this current xact), we can tell if the instance was configured previously without
3039 keeping any application state.
3041 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3042 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3043 elem_group_name
= group_name_from_keyspec(keyspec
)
3045 if elem_nsr_id
!= nsr_id
or group_name
!= elem_group_name
:
3048 if instance_cfg
.id == instance_id
:
3053 def get_scale_group_instance_delta(nsr_id
, group_name
, xact
):
3054 delta
= {"added": [], "deleted": []}
3055 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3056 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3057 if elem_nsr_id
!= nsr_id
:
3060 elem_group_name
= group_name_from_keyspec(keyspec
)
3061 if elem_group_name
!= group_name
:
3064 delta
["added"].append(instance_cfg
.id)
3066 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(include_keyspec
=True):
3067 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3068 if elem_nsr_id
!= nsr_id
:
3071 elem_group_name
= group_name_from_keyspec(keyspec
)
3072 if elem_group_name
!= group_name
:
3075 if instance_cfg
.id in delta
["added"]:
3076 delta
["added"].remove(instance_cfg
.id)
3078 delta
["deleted"].append(instance_cfg
.id)
3083 def update_nsr_nsd(nsr_id
, xact
, scratch
):
3086 def get_nsr_vl_delta(nsr_id
, xact
, scratch
):
3087 delta
= {"added": [], "deleted": []}
3088 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3089 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3090 if elem_nsr_id
!= nsr_id
:
3093 if 'vld' in instance_cfg
.nsd
:
3094 for vld
in instance_cfg
.nsd
.vld
:
3095 delta
["added"].append(vld
)
3097 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3098 self
._log
.debug("NSR update: %s", instance_cfg
)
3099 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3100 if elem_nsr_id
!= nsr_id
:
3103 if 'vld' in instance_cfg
.nsd
:
3104 for vld
in instance_cfg
.nsd
.vld
:
3105 if vld
in delta
["added"]:
3106 delta
["added"].remove(vld
)
3108 delta
["deleted"].append(vld
)
3112 vl_delta
= yield from get_nsr_vl_delta(nsr_id
, xact
, scratch
)
3113 self
._log
.debug("Got NSR:%s VL instance delta: %s", nsr_id
, vl_delta
)
3115 for vld
in vl_delta
["added"]:
3116 yield from self
._nsm
.nsr_instantiate_vl(nsr_id
, vld
)
3118 for vld
in vl_delta
["deleted"]:
3119 yield from self
._nsm
.nsr_terminate_vl(nsr_id
, vld
)
3121 def get_add_delete_update_cfgs(dts_member_reg
, xact
, key_name
, scratch
):
3122 # Unfortunately, it is currently difficult to figure out what has exactly
3123 # changed in this xact without Pbdelta support (RIFT-4916)
3124 # As a workaround, we can fetch the pre and post xact elements and
3125 # perform a comparison to figure out adds/deletes/updates
3126 xact_cfgs
= list(dts_member_reg
.get_xact_elements(xact
))
3127 curr_cfgs
= list(dts_member_reg
.elements
)
3129 xact_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in xact_cfgs
}
3130 curr_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in curr_cfgs
}
3133 added_keys
= set(xact_key_map
) - set(curr_key_map
)
3134 added_cfgs
= [xact_key_map
[key
] for key
in added_keys
]
3137 deleted_keys
= set(curr_key_map
) - set(xact_key_map
)
3138 deleted_cfgs
= [curr_key_map
[key
] for key
in deleted_keys
]
3141 updated_keys
= set(curr_key_map
) & set(xact_key_map
)
3142 updated_cfgs
= [xact_key_map
[key
] for key
in updated_keys
3143 if xact_key_map
[key
] != curr_key_map
[key
]]
3145 return added_cfgs
, deleted_cfgs
, updated_cfgs
3147 def on_apply(dts
, acg
, xact
, action
, scratch
):
3148 """Apply the configuration"""
3149 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3150 xact
, action
, scratch
)
3152 def handle_create_nsr(msg
, restart_mode
=False):
3153 # Handle create nsr requests """
3154 # Do some validations
3155 if not msg
.has_field("nsd"):
3156 err
= "NSD not provided"
3157 self
._log
.error(err
)
3158 raise NetworkServiceRecordError(err
)
3160 self
._log
.debug("Creating NetworkServiceRecord %s from nsr config %s",
3161 msg
.id, msg
.as_dict())
3162 nsr
= self
.nsm
.create_nsr(msg
, restart_mode
=restart_mode
)
3165 def handle_delete_nsr(msg
):
3167 def delete_instantiation(ns_id
):
3168 """ Delete instantiation """
3169 with self
._dts
.transaction() as xact
:
3170 yield from self
._nsm
.terminate_ns(ns_id
, xact
)
3172 # Handle delete NSR requests
3173 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3174 # Terminate the NSR instance
3175 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3177 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3178 event_descr
= "Terminate rcvd for NS Id:%s" % msg
.id
3179 nsr
.record_event("terminate-rcvd", event_descr
)
3181 self
._loop
.create_task(delete_instantiation(msg
.id))
3184 def begin_instantiation(nsr
):
3185 # Begin instantiation
3186 self
._log
.info("Beginning NS instantiation: %s", nsr
.id)
3187 yield from self
._nsm
.instantiate_ns(nsr
.id, xact
)
3189 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3190 xact
, action
, scratch
)
3192 if action
== rwdts
.AppconfAction
.INSTALL
and xact
.id is None:
3193 for element
in self
._nsr
_regh
.elements
:
3194 nsr
= handle_create_nsr(element
, restart_mode
=True)
3195 self
._loop
.create_task(begin_instantiation(nsr
))
3198 (added_msgs
, deleted_msgs
, updated_msgs
) = get_add_delete_update_cfgs(self
._nsr
_regh
,
3202 self
._log
.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs
,
3203 deleted_msgs
, updated_msgs
)
3205 for msg
in added_msgs
:
3206 if msg
.id not in self
._nsm
.nsrs
:
3207 self
._log
.info("Create NSR received in on_apply to instantiate NS:%s", msg
.id)
3208 nsr
= handle_create_nsr(msg
)
3209 self
._loop
.create_task(begin_instantiation(nsr
))
3211 for msg
in deleted_msgs
:
3212 self
._log
.info("Delete NSR received in on_apply to terminate NS:%s", msg
.id)
3214 handle_delete_nsr(msg
)
3216 self
._log
.exception("Failed to terminate NS:%s", msg
.id)
3218 for msg
in updated_msgs
:
3219 self
._log
.info("Update NSR received in on_apply: %s", msg
)
3221 self
._nsm
.nsr_update_cfg(msg
.id, msg
)
3224 self
._loop
.create_task(update_nsr_nsd(msg
.id, xact
, scratch
))
3226 for group
in msg
.scaling_group
:
3227 instance_delta
= get_scale_group_instance_delta(msg
.id, group
.scaling_group_name_ref
, xact
)
3228 self
._log
.debug("Got NSR:%s scale group instance delta: %s", msg
.id, instance_delta
)
3230 for instance_id
in instance_delta
["added"]:
3231 self
._nsm
.scale_nsr_out(msg
.id, group
.scaling_group_name_ref
, instance_id
, xact
)
3233 for instance_id
in instance_delta
["deleted"]:
3234 self
._nsm
.scale_nsr_in(msg
.id, group
.scaling_group_name_ref
, instance_id
)
3237 return RwTypes
.RwStatus
.SUCCESS
3240 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3241 """ Prepare calllback from DTS for NSR """
3243 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3244 action
= xact_info
.query_action
3246 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3247 xact
, action
, xact_info
, xpath
, msg
3251 def delete_instantiation(ns_id
):
3252 """ Delete instantiation """
3253 yield from self
._nsm
.terminate_ns(ns_id
, None)
3255 def handle_delete_nsr():
3256 """ Handle delete NSR requests """
3257 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3258 # Terminate the NSR instance
3259 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3261 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3262 event_descr
= "Terminate rcvd for NS Id:%s" % msg
.id
3263 nsr
.record_event("terminate-rcvd", event_descr
)
3265 self
._loop
.create_task(delete_instantiation(msg
.id))
3267 fref
= ProtobufC
.FieldReference
.alloc()
3268 fref
.goto_whole_message(msg
.to_pbcm())
3270 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
, rwdts
.QueryAction
.DELETE
]:
3271 # if this is an NSR create
3272 if action
!= rwdts
.QueryAction
.DELETE
and msg
.id not in self
._nsm
.nsrs
:
3273 # Ensure the Cloud account/datacenter has been specified
3274 if not msg
.has_field("cloud_account") and not msg
.has_field("om_datacenter"):
3275 raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")
3277 # Check if nsd is specified
3278 if not msg
.has_field("nsd"):
3279 raise NsrInstantiationFailed("NSD not specified in NSR")
3282 nsr
= self
._nsm
.nsrs
[msg
.id]
3284 if msg
.has_field("nsd"):
3285 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3286 raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
3287 if 'vld' not in msg
.nsd
or len(msg
.nsd
.vld
) == 0:
3288 raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")
3290 if msg
.has_field("scaling_group"):
3291 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3292 raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
3294 if len(msg
.scaling_group
) > 1:
3295 raise ScalingOperationError("Only a single scaling group can be configured at a time")
3297 for group_msg
in msg
.scaling_group
:
3298 num_new_group_instances
= len(group_msg
.instance
)
3299 if num_new_group_instances
> 1:
3300 raise ScalingOperationError("Only a single scaling instance can be modified at a time")
3302 elif num_new_group_instances
== 1:
3303 scale_group
= nsr
.scaling_groups
[group_msg
.scaling_group_name_ref
]
3304 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
3305 if len(scale_group
.instances
) == scale_group
.max_instance_count
:
3306 raise ScalingOperationError("Max instances for %s reached" % scale_group
)
3308 acg
.handle
.prepare_complete_ok(xact_info
.handle
)
3311 self
._log
.debug("Registering for NSR config using xpath: %s",
3312 NsrDtsHandler
.NSR_XPATH
)
3314 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
3315 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
3316 self
._nsr
_regh
= acg
.register(xpath
=NsrDtsHandler
.NSR_XPATH
,
3317 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
3318 on_prepare
=on_prepare
)
3320 self
._scale
_regh
= acg
.register(
3321 xpath
=NsrDtsHandler
.SCALE_INSTANCE_XPATH
,
3322 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY| rwdts
.Flag
.CACHE
,
3326 class NsrOpDataDtsHandler(object):
3327 """ The network service op data DTS handler """
3328 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
3330 def __init__(self
, dts
, log
, loop
, nsm
):
3339 """ Return the registration handle"""
3344 """ Return the NS manager instance """
3349 """ Register for Nsr op data publisher registration"""
3350 self
._log
.debug("Registering Nsr op data path %s as publisher",
3351 NsrOpDataDtsHandler
.XPATH
)
3353 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
3354 handlers
= rift
.tasklets
.Group
.Handler()
3355 with self
._dts
.group_create(handler
=handlers
) as group
:
3356 self
._regh
= group
.register(xpath
=NsrOpDataDtsHandler
.XPATH
,
3358 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ | rwdts
.Flag
.DATASTORE
)
3361 def create(self
, path
, msg
):
3363 Create an NS record in DTS with the path and message
3365 self
._log
.debug("Creating NSR %s:%s", path
, msg
)
3366 self
.regh
.create_element(path
, msg
)
3367 self
._log
.debug("Created NSR, %s:%s", path
, msg
)
3370 def update(self
, path
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
3372 Update an NS record in DTS with the path and message
3374 self
._log
.debug("Updating NSR, %s:%s regh = %s", path
, msg
, self
.regh
)
3375 self
.regh
.update_element(path
, msg
, flags
)
3376 self
._log
.debug("Updated NSR, %s:%s", path
, msg
)
3379 def delete(self
, path
):
3381 Update an NS record in DTS with the path and message
3383 self
._log
.debug("Deleting NSR path:%s", path
)
3384 self
.regh
.delete_element(path
)
3385 self
._log
.debug("Deleted NSR path:%s", path
)
3388 class VnfrDtsHandler(object):
3389 """ The virtual network service DTS handler """
3390 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
3392 def __init__(self
, dts
, log
, loop
, nsm
):
3402 """ Return registration handle """
3407 """ Return the NS manager instance """
3412 """ Register for vnfr create/update/delete/ advises from dts """
3414 def on_commit(xact_info
):
3415 """ The transaction has been committed """
3416 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
3417 return rwdts
.MemberRspCode
.ACTION_OK
3420 def on_prepare(xact_info
, action
, ks_path
, msg
):
3421 """ prepare callback from dts """
3422 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3424 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
3425 xact_info
, action
, ks_path
, msg
3428 schema
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
3429 path_entry
= schema
.keyspec_to_entry(ks_path
)
3430 if path_entry
.key00
.id not in self
._nsm
._vnfrs
:
3431 self
._log
.error("%s request for non existent record path %s",
3433 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
, xpath
)
3437 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
3438 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
3439 yield from self
._nsm
.update_vnfr(msg
)
3440 elif action
== rwdts
.QueryAction
.DELETE
:
3441 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
3442 self
._nsm
.delete_vnfr(path_entry
.key00
.id)
3444 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
)
3446 self
._log
.debug("Registering for VNFR using xpath: %s",
3447 VnfrDtsHandler
.XPATH
,)
3449 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
3450 on_prepare
=on_prepare
,)
3451 with self
._dts
.group_create() as group
:
3452 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
3454 flags
=(rwdts
.Flag
.SUBSCRIBER
),)
3457 class NsdRefCountDtsHandler(object):
3458 """ The NSD Ref Count DTS handler """
3459 XPATH
= "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count"
3461 def __init__(self
, dts
, log
, loop
, nsm
):
3471 """ Return registration handle """
3476 """ Return the NS manager instance """
3481 """ Register for NSD ref count read from dts """
3484 def on_prepare(xact_info
, action
, ks_path
, msg
):
3485 """ prepare callback from dts """
3486 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3488 if action
== rwdts
.QueryAction
.READ
:
3489 schema
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount
.schema()
3490 path_entry
= schema
.keyspec_to_entry(ks_path
)
3491 nsd_list
= yield from self
._nsm
.get_nsd_refcount(path_entry
.key00
.nsd_id_ref
)
3492 for xpath
, msg
in nsd_list
:
3493 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
3496 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
3498 raise NetworkServiceRecordError("Not supported operation %s" % action
)
3500 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
3501 with self
._dts
.group_create() as group
:
3502 self
._regh
= group
.register(xpath
=NsdRefCountDtsHandler
.XPATH
,
3504 flags
=rwdts
.Flag
.PUBLISHER
,)
3507 class NsManager(object):
3508 """ The Network Service Manager class"""
3509 def __init__(self
, dts
, log
, loop
,
3510 nsr_handler
, vnfr_handler
, vlr_handler
, ro_plugin_selector
,
3511 vnffgmgr
, vnfd_pub_handler
, cloud_account_handler
):
3515 self
._nsr
_handler
= nsr_handler
3516 self
._vnfr
_pub
_handler
= vnfr_handler
3517 self
._vlr
_pub
_handler
= vlr_handler
3518 self
._vnffgmgr
= vnffgmgr
3519 self
._vnfd
_pub
_handler
= vnfd_pub_handler
3520 self
._cloud
_account
_handler
= cloud_account_handler
3522 self
._ro
_plugin
_selector
= ro_plugin_selector
3523 self
._ncclient
= rift
.mano
.ncclient
.NcClient(
3535 self
.cfgmgr_obj
= conman
.ROConfigManager(log
, loop
, dts
, self
)
3537 # TODO: All these handlers should move to tasklet level.
3538 # Passing self is often an indication of bad design
3539 self
._nsd
_dts
_handler
= NsdDtsHandler(dts
, log
, loop
, self
)
3540 self
._vnfd
_dts
_handler
= VnfdDtsHandler(dts
, log
, loop
, self
)
3541 self
._dts
_handlers
= [self
._nsd
_dts
_handler
,
3542 VnfrDtsHandler(dts
, log
, loop
, self
),
3543 NsdRefCountDtsHandler(dts
, log
, loop
, self
),
3544 NsrDtsHandler(dts
, log
, loop
, self
),
3545 ScalingRpcHandler(log
, dts
, loop
, self
.scale_rpc_callback
),
3546 NsrRpcDtsHandler(dts
,log
,loop
,self
),
3547 self
._vnfd
_dts
_handler
,
3568 def nsr_handler(self
):
3569 """" NSR handler """
3570 return self
._nsr
_handler
3574 """" So Obj handler """
3579 """ NSRs in this NSM"""
3584 """ NSDs in this NSM"""
3589 """ VNFDs in this NSM"""
3594 """ VNFRs in this NSM"""
3598 def nsr_pub_handler(self
):
3599 """ NSR publication handler """
3600 return self
._nsr
_handler
3603 def vnfr_pub_handler(self
):
3604 """ VNFR publication handler """
3605 return self
._vnfr
_pub
_handler
3608 def vlr_pub_handler(self
):
3609 """ VLR publication handler """
3610 return self
._vlr
_pub
_handler
3613 def vnfd_pub_handler(self
):
3614 return self
._vnfd
_pub
_handler
3618 """ Register all static DTS handlers """
3619 for dts_handle
in self
._dts
_handlers
:
3620 yield from dts_handle
.register()
3623 def get_ns_by_nsr_id(self
, nsr_id
):
3624 """ get NSR by nsr id """
3625 if nsr_id
not in self
._nsrs
:
3626 raise NetworkServiceRecordError("NSR id %s not found" % nsr_id
)
3628 return self
._nsrs
[nsr_id
]
3630 def scale_nsr_out(self
, nsr_id
, scale_group_name
, instance_id
, config_xact
):
3631 self
.log
.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3636 nsr
= self
._nsrs
[nsr_id
]
3637 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3638 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3640 self
._loop
.create_task(nsr
.create_scale_group_instance(scale_group_name
, instance_id
, config_xact
))
3642 def scale_nsr_in(self
, nsr_id
, scale_group_name
, instance_id
):
3643 self
.log
.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3648 nsr
= self
._nsrs
[nsr_id
]
3649 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3650 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3652 self
._loop
.create_task(nsr
.delete_scale_group_instance(scale_group_name
, instance_id
))
3654 def scale_rpc_callback(self
, xact
, msg
, action
):
3655 """Callback handler for RPC calls
3657 xact : Transaction Handler
3659 action : Scaling Action
3661 ScalingGroupInstance
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance
3662 ScalingGroup
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup
3664 xpath
= ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format(
3666 instance
= ScalingGroupInstance
.from_dict({"id": msg
.instance_id
})
3669 def get_nsr_scaling_group():
3670 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
3672 for result
in results
:
3673 res
= yield from result
3674 nsr_config
= res
.result
3676 for scaling_group
in nsr_config
.scaling_group
:
3677 if scaling_group
.scaling_group_name_ref
== msg
.scaling_group_name_ref
:
3680 scaling_group
= nsr_config
.scaling_group
.add()
3681 scaling_group
.scaling_group_name_ref
= msg
.scaling_group_name_ref
3683 return (nsr_config
, scaling_group
)
3686 def update_config(nsr_config
):
3687 xml
= self
._ncclient
.convert_to_xml(RwNsrYang
, nsr_config
)
3688 xml
= '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml
)
3689 yield from self
._ncclient
.connect()
3690 yield from self
._ncclient
.manager
.edit_config(target
="running", config
=xml
, default_operation
="replace")
3694 nsr_config
, scaling_group
= yield from get_nsr_scaling_group()
3695 scaling_group
.instance
.append(instance
)
3696 yield from update_config(nsr_config
)
3700 nsr_config
, scaling_group
= yield from get_nsr_scaling_group()
3701 scaling_group
.instance
.remove(instance
)
3702 yield from update_config(nsr_config
)
3704 if action
== ScalingRpcHandler
.ACTION
.SCALE_OUT
:
3705 self
._loop
.create_task(scale_out())
3707 self
._loop
.create_task(scale_in())
3709 # Opdata based calls, disabled for now!
3710 # if action == ScalingRpcHandler.ACTION.SCALE_OUT:
3711 # self.scale_nsr_out(
3713 # msg.scaling_group_name_ref,
3717 # self.scale_nsr_in(
3719 # msg.scaling_group_name_ref,
3722 def nsr_update_cfg(self
, nsr_id
, msg
):
3723 nsr
= self
._nsrs
[nsr_id
]
3724 nsr
.nsr_cfg_msg
= msg
3726 def nsr_instantiate_vl(self
, nsr_id
, vld
):
3727 self
.log
.debug("NSR {} create VL {}".format(nsr_id
, vld
))
3728 nsr
= self
._nsrs
[nsr_id
]
3729 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3730 raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")
3732 # Not calling in a separate task as this is called from a separate task
3733 yield from nsr
.create_vl_instance(vld
)
3735 def nsr_terminate_vl(self
, nsr_id
, vld
):
3736 self
.log
.debug("NSR {} delete VL {}".format(nsr_id
, vld
.id))
3737 nsr
= self
._nsrs
[nsr_id
]
3738 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3739 raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")
3741 # Not calling in a separate task as this is called from a separate task
3742 yield from nsr
.delete_vl_instance(vld
)
3744 def create_nsr(self
, nsr_msg
, restart_mode
=False):
3745 """ Create an NSR instance """
3746 if nsr_msg
.id in self
._nsrs
:
3747 msg
= "NSR id %s already exists" % nsr_msg
.id
3748 self
._log
.error(msg
)
3749 raise NetworkServiceRecordError(msg
)
3751 self
._log
.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
3755 nsm_plugin
= self
._ro
_plugin
_selector
.ro_plugin
3756 sdn_account_name
= self
._cloud
_account
_handler
.get_cloud_account_sdn_name(nsr_msg
.cloud_account
)
3758 nsr
= NetworkServiceRecord(self
._dts
,
3765 restart_mode
=restart_mode
3767 self
._nsrs
[nsr_msg
.id] = nsr
3768 nsm_plugin
.create_nsr(nsr_msg
, nsr_msg
.nsd
)
3772 def delete_nsr(self
, nsr_id
):
3774 Delete NSR with the passed nsr id
3776 del self
._nsrs
[nsr_id
]
3779 def instantiate_ns(self
, nsr_id
, config_xact
):
3780 """ Instantiate an NS instance """
3781 self
._log
.debug("Instantiating Network service id %s", nsr_id
)
3782 if nsr_id
not in self
._nsrs
:
3783 err
= "NSR id %s not found " % nsr_id
3784 self
._log
.error(err
)
3785 raise NetworkServiceRecordError(err
)
3787 nsr
= self
._nsrs
[nsr_id
]
3788 yield from nsr
.nsm_plugin
.instantiate_ns(nsr
, config_xact
)
3791 def update_vnfr(self
, vnfr
):
3792 """Create/Update an VNFR """
3794 vnfr_state
= self
._vnfrs
[vnfr
.id].state
3795 self
._log
.debug("Updating VNFR with state %s: vnfr %s", vnfr_state
, vnfr
)
3797 yield from self
._vnfrs
[vnfr
.id].update_state(vnfr
)
3798 nsr
= self
.find_nsr_for_vnfr(vnfr
.id)
3799 yield from nsr
.update_state()
3801 def find_nsr_for_vnfr(self
, vnfr_id
):
3802 """ Find the NSR which )has the passed vnfr id"""
3803 for nsr
in list(self
.nsrs
.values()):
3804 for vnfr
in list(nsr
.vnfrs
.values()):
3805 if vnfr
.id == vnfr_id
:
3809 def delete_vnfr(self
, vnfr_id
):
3810 """ Delete VNFR with the passed id"""
3811 del self
._vnfrs
[vnfr_id
]
3813 def get_nsd_ref(self
, nsd_id
):
3814 """ Get network service descriptor for the passed nsd_id
3816 nsd
= self
.get_nsd(nsd_id
)
3821 def get_nsr_config(self
, nsd_id
):
3822 xpath
= "C,/nsr:ns-instance-config"
3823 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
3825 for result
in results
:
3826 entry
= yield from result
3827 ns_instance_config
= entry
.result
3829 for nsr
in ns_instance_config
.nsr
:
3830 if nsr
.nsd
.id == nsd_id
:
3836 def nsd_unref_by_nsr_id(self
, nsr_id
):
3837 """ Unref the network service descriptor based on NSR id """
3838 self
._log
.debug("NSR Unref called for Nsr Id:%s", nsr_id
)
3839 if nsr_id
in self
._nsrs
:
3840 nsr
= self
._nsrs
[nsr_id
]
3843 nsd
= self
.get_nsd(nsr
.nsd_id
)
3844 self
._log
.debug("Releasing ref on NSD %s held by NSR %s - Curr %d",
3845 nsd
.id, nsr
.id, nsd
.ref_count
)
3847 except NetworkServiceDescriptorError
:
3848 # We store a copy of NSD in NSR and the NSD in nsd-catalog
3853 self
._log
.error("Cannot find NSR with id %s", nsr_id
)
3854 raise NetworkServiceDescriptorUnrefError("No NSR with id" % nsr_id
)
3857 def nsd_unref(self
, nsd_id
):
3858 """ Unref the network service descriptor associated with the id """
3859 nsd
= self
.get_nsd(nsd_id
)
3862 def get_nsd(self
, nsd_id
):
3863 """ Get network service descriptor for the passed nsd_id"""
3864 if nsd_id
not in self
._nsds
:
3865 self
._log
.error("Cannot find NSD id:%s", nsd_id
)
3866 raise NetworkServiceDescriptorError("Cannot find NSD id:%s", nsd_id
)
3868 return self
._nsds
[nsd_id
]
3870 def create_nsd(self
, nsd_msg
):
3871 """ Create a network service descriptor """
3872 self
._log
.debug("Create network service descriptor - %s", nsd_msg
)
3873 if nsd_msg
.id in self
._nsds
:
3874 self
._log
.error("Cannot create NSD %s -NSD ID already exists", nsd_msg
)
3875 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg
.id)
3877 nsd
= NetworkServiceDescriptor(
3884 self
._nsds
[nsd_msg
.id] = nsd
3888 def update_nsd(self
, nsd
):
3889 """ update the Network service descriptor """
3890 self
._log
.debug("Update network service descriptor - %s", nsd
)
3891 if nsd
.id not in self
._nsds
:
3892 self
._log
.debug("No NSD found - creating NSD id = %s", nsd
.id)
3893 self
.create_nsd(nsd
)
3895 self
._log
.debug("Updating NSD id = %s, nsd = %s", nsd
.id, nsd
)
3896 self
._nsds
[nsd
.id].update(nsd
)
3898 def delete_nsd(self
, nsd_id
):
3899 """ Delete the Network service descriptor with the passed id """
3900 self
._log
.debug("Deleting the network service descriptor - %s", nsd_id
)
3901 if nsd_id
not in self
._nsds
:
3902 self
._log
.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id
)
3903 raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id
)
3905 if nsd_id
not in self
._nsds
:
3906 self
._log
.debug("Cannot delete NSD id %s reference exists %s",
3908 self
._nsds
[nsd_id
].ref_count
)
3909 raise NetworkServiceDescriptorRefCountExists(
3910 "Cannot delete :%s, ref_count:%s",
3912 self
._nsds
[nsd_id
].ref_count
)
3914 del self
._nsds
[nsd_id
]
3916 def get_vnfd_config(self
, xact
):
3917 vnfd_dts_reg
= self
._vnfd
_dts
_handler
.regh
3918 for cfg
in vnfd_dts_reg
.get_xact_elements(xact
):
3919 if cfg
.id not in self
._vnfds
:
3920 self
.create_vnfd(cfg
)
3922 def get_vnfd(self
, vnfd_id
, xact
):
3923 """ Get virtual network function descriptor for the passed vnfd_id"""
3924 if vnfd_id
not in self
._vnfds
:
3925 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
3926 self
.get_vnfd_config(xact
)
3928 if vnfd_id
not in self
._vnfds
:
3929 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
3930 raise VnfDescriptorError("Cannot find VNFD id:%s", vnfd_id
)
3932 return self
._vnfds
[vnfd_id
]
3934 def create_vnfd(self
, vnfd
):
3935 """ Create a virtual network function descriptor """
3936 self
._log
.debug("Create virtual network function descriptor - %s", vnfd
)
3937 if vnfd
.id in self
._vnfds
:
3938 self
._log
.error("Cannot create VNFD %s -VNFD ID already exists", vnfd
)
3939 raise VnfDescriptorError("VNFD already exists-%s", vnfd
.id)
3941 self
._vnfds
[vnfd
.id] = vnfd
3942 return self
._vnfds
[vnfd
.id]
3944 def update_vnfd(self
, vnfd
):
3945 """ Update the virtual network function descriptor """
3946 self
._log
.debug("Update virtual network function descriptor- %s", vnfd
)
3948 # Hack to remove duplicates from leaf-lists - to be fixed by RIFT-6511
3949 for ivld
in vnfd
.internal_vld
:
3950 ivld
.internal_connection_point_ref
= list(set(ivld
.internal_connection_point_ref
))
3952 if vnfd
.id not in self
._vnfds
:
3953 self
._log
.debug("No VNFD found - creating VNFD id = %s", vnfd
.id)
3954 self
.create_vnfd(vnfd
)
3956 self
._log
.debug("Updating VNFD id = %s, vnfd = %s", vnfd
.id, vnfd
)
3957 self
._vnfds
[vnfd
.id] = vnfd
3960 def delete_vnfd(self
, vnfd_id
):
3961 """ Delete the virtual network function descriptor with the passed id """
3962 self
._log
.debug("Deleting the virtual network function descriptor - %s", vnfd_id
)
3963 if vnfd_id
not in self
._vnfds
:
3964 self
._log
.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id
)
3965 raise VnfDescriptorError("Cannot find %s", vnfd_id
)
3967 del self
._vnfds
[vnfd_id
]
3969 def nsd_in_use(self
, nsd_id
):
3970 """ Is the NSD with the passed id in use """
3971 self
._log
.debug("Is this NSD in use - msg:%s", nsd_id
)
3972 if nsd_id
in self
._nsds
:
3973 return self
._nsds
[nsd_id
].in_use()
3977 def publish_nsr(self
, xact
, path
, msg
):
3978 """ Publish a NSR """
3979 self
._log
.debug("Publish NSR with path %s, msg %s",
3981 yield from self
.nsr_handler
.update(xact
, path
, msg
)
3984 def unpublish_nsr(self
, xact
, path
):
3985 """ Un Publish an NSR """
3986 self
._log
.debug("Publishing delete NSR with path %s", path
)
3987 yield from self
.nsr_handler
.delete(path
, xact
)
3989 def vnfr_is_ready(self
, vnfr_id
):
3990 """ VNFR with the id is ready """
3991 self
._log
.debug("VNFR id %s ready", vnfr_id
)
3992 if vnfr_id
not in self
._vnfds
:
3993 err
= "Did not find VNFR ID with id %s" % vnfr_id
3994 self
._log
.critical("err")
3995 raise VirtualNetworkFunctionRecordError(err
)
3996 self
._vnfrs
[vnfr_id
].is_ready()
3999 def get_nsd_refcount(self
, nsd_id
):
4000 """ Get the nsd_list from this NSM"""
4002 def nsd_refcount_xpath(nsd_id
):
4003 """ xpath for ref count entry """
4004 return (NsdRefCountDtsHandler
.XPATH
+
4005 "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id
)
4008 if nsd_id
is None or nsd_id
== "":
4009 for nsd
in self
._nsds
.values():
4010 nsd_msg
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
4011 nsd_msg
.nsd_id_ref
= nsd
.id
4012 nsd_msg
.instance_ref_count
= nsd
.ref_count
4013 nsd_list
.append((nsd_refcount_xpath(nsd
.id), nsd_msg
))
4014 elif nsd_id
in self
._nsds
:
4015 nsd_msg
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
4016 nsd_msg
.nsd_id_ref
= self
._nsds
[nsd_id
].id
4017 nsd_msg
.instance_ref_count
= self
._nsds
[nsd_id
].ref_count
4018 nsd_list
.append((nsd_refcount_xpath(nsd_id
), nsd_msg
))
4023 def terminate_ns(self
, nsr_id
, xact
):
4025 Terminate network service for the given NSR Id
4028 # Terminate the instances/networks assocaited with this nw service
4029 self
._log
.debug("Terminating the network service %s", nsr_id
)
4030 yield from self
._nsrs
[nsr_id
].terminate()
4033 yield from self
.nsd_unref_by_nsr_id(nsr_id
)
4035 # Unpublish the NSR record
4036 self
._log
.debug("Unpublishing the network service %s", nsr_id
)
4037 yield from self
._nsrs
[nsr_id
].unpublish(xact
)
4039 # Finaly delete the NS instance from this NS Manager
4040 self
._log
.debug("Deletng the network service %s", nsr_id
)
4041 self
.delete_nsr(nsr_id
)
4044 class NsmRecordsPublisherProxy(object):
4045 """ This class provides a publisher interface that allows plugin objects
4046 to publish NSR/VNFR/VLR"""
4048 def __init__(self
, dts
, log
, loop
, nsr_pub_hdlr
, vnfr_pub_hdlr
, vlr_pub_hdlr
):
4052 self
._nsr
_pub
_hdlr
= nsr_pub_hdlr
4053 self
._vlr
_pub
_hdlr
= vlr_pub_hdlr
4054 self
._vnfr
_pub
_hdlr
= vnfr_pub_hdlr
4057 def publish_nsr(self
, xact
, nsr
):
4058 """ Publish an NSR """
4059 path
= NetworkServiceRecord
.xpath_from_nsr(nsr
)
4060 return (yield from self
._nsr
_pub
_hdlr
.update(xact
, path
, nsr
))
4063 def unpublish_nsr(self
, xact
, nsr
):
4064 """ Unpublish an NSR """
4065 path
= NetworkServiceRecord
.xpath_from_nsr(nsr
)
4066 return (yield from self
._nsr
_pub
_hdlr
.delete(xact
, path
))
4069 def publish_vnfr(self
, xact
, vnfr
):
4070 """ Publish an VNFR """
4071 path
= VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
)
4072 return (yield from self
._vnfr
_pub
_hdlr
.update(xact
, path
, vnfr
))
4075 def unpublish_vnfr(self
, xact
, vnfr
):
4076 """ Unpublish a VNFR """
4077 path
= VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
)
4078 return (yield from self
._vnfr
_pub
_hdlr
.delete(xact
, path
))
4081 def publish_vlr(self
, xact
, vlr
):
4082 """ Publish a VLR """
4083 path
= VirtualLinkRecord
.vlr_xpath(vlr
)
4084 return (yield from self
._vlr
_pub
_hdlr
.update(xact
, path
, vlr
))
4087 def unpublish_vlr(self
, xact
, vlr
):
4088 """ Unpublish a VLR """
4089 path
= VirtualLinkRecord
.vlr_xpath(vlr
)
4090 return (yield from self
._vlr
_pub
_hdlr
.delete(xact
, path
))
4093 class ScalingRpcHandler(mano_dts
.DtsHandler
):
4094 """ The Network service Monitor DTS handler """
4095 SCALE_IN_INPUT_XPATH
= "I,/nsr:exec-scale-in"
4096 SCALE_IN_OUTPUT_XPATH
= "O,/nsr:exec-scale-in"
4098 SCALE_OUT_INPUT_XPATH
= "I,/nsr:exec-scale-out"
4099 SCALE_OUT_OUTPUT_XPATH
= "O,/nsr:exec-scale-out"
4101 ACTION
= Enum('ACTION', 'SCALE_IN SCALE_OUT')
4103 def __init__(self
, log
, dts
, loop
, callback
=None):
4104 super().__init
__(log
, dts
, loop
)
4105 self
.callback
= callback
4106 self
.last_instance_id
= defaultdict(int)
4112 def on_scale_in_prepare(xact_info
, action
, ks_path
, msg
):
4113 assert action
== rwdts
.QueryAction
.RPC
4117 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_IN
)
4119 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleIn
.from_dict({
4120 "instance_id": msg
.instance_id
})
4122 xact_info
.respond_xpath(
4123 rwdts
.XactRspCode
.ACK
,
4124 self
.__class
__.SCALE_IN_OUTPUT_XPATH
,
4127 except Exception as e
:
4128 self
.log
.exception(e
)
4129 xact_info
.respond_xpath(
4130 rwdts
.XactRspCode
.NACK
,
4131 self
.__class
__.SCALE_IN_OUTPUT_XPATH
)
4134 def on_scale_out_prepare(xact_info
, action
, ks_path
, msg
):
4135 assert action
== rwdts
.QueryAction
.RPC
4138 scaling_group
= msg
.scaling_group_name_ref
4139 if not msg
.instance_id
:
4140 last_instance_id
= self
.last_instance_id
[scale_group
]
4141 msg
.instance_id
= last_instance_id
+ 1
4142 self
.last_instance_id
[scale_group
] += 1
4145 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_OUT
)
4147 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleOut
.from_dict({
4148 "instance_id": msg
.instance_id
})
4150 xact_info
.respond_xpath(
4151 rwdts
.XactRspCode
.ACK
,
4152 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
,
4155 except Exception as e
:
4156 self
.log
.exception(e
)
4157 xact_info
.respond_xpath(
4158 rwdts
.XactRspCode
.NACK
,
4159 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
)
4161 scale_in_hdl
= rift
.tasklets
.DTS
.RegistrationHandler(
4162 on_prepare
=on_scale_in_prepare
)
4163 scale_out_hdl
= rift
.tasklets
.DTS
.RegistrationHandler(
4164 on_prepare
=on_scale_out_prepare
)
4166 with self
.dts
.group_create() as group
:
4168 xpath
=self
.__class
__.SCALE_IN_INPUT_XPATH
,
4169 handler
=scale_in_hdl
,
4170 flags
=rwdts
.Flag
.PUBLISHER
)
4172 xpath
=self
.__class
__.SCALE_OUT_INPUT_XPATH
,
4173 handler
=scale_out_hdl
,
4174 flags
=rwdts
.Flag
.PUBLISHER
)
4177 class NsmTasklet(rift
.tasklets
.Tasklet
):
4179 The network service manager tasklet
4181 def __init__(self
, *args
, **kwargs
):
4182 super(NsmTasklet
, self
).__init
__(*args
, **kwargs
)
4183 self
.rwlog
.set_category("rw-mano-log")
4184 self
.rwlog
.set_subcategory("nsm")
4189 self
._ro
_plugin
_selector
= None
4190 self
._vnffgmgr
= None
4192 self
._nsr
_handler
= None
4193 self
._vnfr
_pub
_handler
= None
4194 self
._vlr
_pub
_handler
= None
4195 self
._vnfd
_pub
_handler
= None
4196 self
._scale
_cfg
_handler
= None
4198 self
._records
_publisher
_proxy
= None
4201 """ The task start callback """
4202 super(NsmTasklet
, self
).start()
4203 self
.log
.info("Starting NsmTasklet")
4205 self
.log
.debug("Registering with dts")
4206 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
4207 RwNsmYang
.get_schema(),
4209 self
.on_dts_state_change
)
4211 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
4217 print("Caught Exception in NSM stop:", sys
.exc_info()[0])
4220 def on_instance_started(self
):
4221 """ Task instance started callback """
4222 self
.log
.debug("Got instance started callback")
4226 """ Task init callback """
4227 self
.log
.debug("Got instance started callback")
4229 self
.log
.debug("creating config account handler")
4231 self
._nsr
_pub
_handler
= publisher
.NsrOpDataDtsHandler(self
._dts
, self
.log
, self
.loop
)
4232 yield from self
._nsr
_pub
_handler
.register()
4234 self
._vnfr
_pub
_handler
= publisher
.VnfrPublisherDtsHandler(self
._dts
, self
.log
, self
.loop
)
4235 yield from self
._vnfr
_pub
_handler
.register()
4237 self
._vlr
_pub
_handler
= publisher
.VlrPublisherDtsHandler(self
._dts
, self
.log
, self
.loop
)
4238 yield from self
._vlr
_pub
_handler
.register()
4240 manifest
= self
.tasklet_info
.get_pb_manifest()
4241 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
4242 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
4243 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
4245 self
._vnfd
_pub
_handler
= publisher
.VnfdPublisher(use_ssl
, ssl_cert
, ssl_key
, self
.loop
)
4247 self
._records
_publisher
_proxy
= NsmRecordsPublisherProxy(
4251 self
._nsr
_pub
_handler
,
4252 self
._vnfr
_pub
_handler
,
4253 self
._vlr
_pub
_handler
,
4256 # Register the NSM to receive the nsm plugin
4257 # when cloud account is configured
4258 self
._ro
_plugin
_selector
= cloud
.ROAccountPluginSelector(
4262 self
._records
_publisher
_proxy
,
4264 yield from self
._ro
_plugin
_selector
.register()
4266 self
._cloud
_account
_handler
= cloud
.CloudAccountConfigSubscriber(
4271 yield from self
._cloud
_account
_handler
.register()
4273 self
._vnffgmgr
= rwvnffgmgr
.VnffgMgr(self
._dts
, self
.log
, self
.log_hdl
, self
.loop
)
4274 yield from self
._vnffgmgr
.register()
4276 self
._nsm
= NsManager(
4280 self
._nsr
_pub
_handler
,
4281 self
._vnfr
_pub
_handler
,
4282 self
._vlr
_pub
_handler
,
4283 self
._ro
_plugin
_selector
,
4285 self
._vnfd
_pub
_handler
,
4286 self
._cloud
_account
_handler
4289 yield from self
._nsm
.register()
4293 """ Task run callback """
4297 def on_dts_state_change(self
, state
):
4298 """Take action according to current dts state to transition
4299 application into the corresponding application state
4302 state - current dts state
4305 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
4306 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
4310 rwdts
.State
.INIT
: self
.init
,
4311 rwdts
.State
.RUN
: self
.run
,
4314 # Transition application to next state
4315 handler
= handlers
.get(state
, None)
4316 if handler
is not None:
4317 yield from handler()
4319 # Transition dts to next state
4320 next_state
= switch
.get(state
, None)
4321 if next_state
is not None:
4322 self
.log
.debug("Changing state to %s", next_state
)
4323 self
._dts
.handle
.set_state(next_state
)