2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
20 import ncclient
.asyncio_manager
32 from collections
import deque
33 from collections
import defaultdict
37 gi
.require_version('RwYang', '1.0')
38 gi
.require_version('RwNsdYang', '1.0')
39 gi
.require_version('RwDts', '1.0')
40 gi
.require_version('RwNsmYang', '1.0')
41 gi
.require_version('RwNsrYang', '1.0')
42 gi
.require_version('RwTypes', '1.0')
43 gi
.require_version('RwVlrYang', '1.0')
44 gi
.require_version('RwVnfrYang', '1.0')
45 from gi
.repository
import (
61 import rift
.mano
.ncclient
62 import rift
.mano
.config_data
.config
63 import rift
.mano
.dts
as mano_dts
65 from . import rwnsm_conman
as conman
67 from . import publisher
69 from . import config_value_pool
70 from . import rwvnffgmgr
71 from . import scale_group
74 class NetworkServiceRecordState(Enum
):
75 """ Network Service Record State """
79 VNFFG_INIT_PHASE
= 104
85 VL_TERMINATE_PHASE
= 111
86 VNF_TERMINATE_PHASE
= 112
87 VNFFG_TERMINATE_PHASE
= 113
class NetworkServiceRecordError(Exception):
    """
    Error raised for a failure concerning a Network Service Record (NSR).
    """
class NetworkServiceDescriptorError(Exception):
    """
    Error raised for a failure concerning a Network Service Descriptor (NSD).
    """
class VirtualNetworkFunctionRecordError(Exception):
    """
    Error raised for a failure concerning a Virtual Network Function Record.
    """
class NetworkServiceDescriptorNotFound(Exception):
    """
    The requested Network Service Descriptor could not be found.
    """
class NetworkServiceDescriptorRefCountExists(Exception):
    """
    A Network Service Descriptor still has a non-zero reference count.

    This class was previously declared under the name
    ``NetworkServiceDescriptorNotFound``, which silently rebound the
    "cannot find descriptor" exception defined just before it. Its docstring
    ("reference count exists") describes a different condition, so it gets
    its own name here. ``NetworkServiceDescriptorNotFound`` stays available
    through the earlier definition, so existing raise/except sites keep
    working.
    """
class NsrInstantiationFailed(Exception):
    """
    A network service could not be instantiated.
    """
class VnfInstantiationFailed(Exception):
    """
    A virtual network function could not be instantiated.
    """
class VnffgInstantiationFailed(Exception):
    """
    A VNFFG could not be instantiated.
    """
class VnfDescriptorError(Exception):
    """
    Error concerning a VNF descriptor.

    NOTE(review): the raise sites are outside this view; the wording is
    derived from the class name -- confirm against callers.
    """
class ScalingOperationError(Exception):
    """
    Error raised for a failed scaling operation.

    NOTE(review): wording derived from the class name; raise sites are
    outside this view.
    """
class ScaleGroupMissingError(Exception):
    """
    Error raised when a scale group is missing.

    NOTE(review): wording derived from the class name; raise sites are
    outside this view.
    """
class PlacementGroupError(Exception):
    """
    Error raised for a placement-group problem.

    NOTE(review): wording derived from the class name; raise sites are
    outside this view.
    """
class NsrNsdUpdateError(Exception):
    """
    Base error for a failed update of an NSR's NSD.

    ``NsrVlUpdateError`` derives from this class. NOTE(review): wording
    derived from the class name; raise sites are outside this view.
    """
class NsrVlUpdateError(NsrNsdUpdateError):
    """
    Specialisation of ``NsrNsdUpdateError`` for virtual-link updates.

    NOTE(review): wording derived from the class name; raise sites are
    outside this view.
    """
158 class VlRecordState(Enum
):
159 """ VL Record State """
161 INSTANTIATION_PENDING
= 102
163 TERMINATE_PENDING
= 104
168 class VnffgRecordState(Enum
):
169 """ VNFFG Record State """
171 INSTANTIATION_PENDING
= 102
173 TERMINATE_PENDING
= 104
178 class VnffgRecord(object):
179 """ Vnffg Records class"""
182 def __init__(self
, dts
, log
, loop
, vnffgmgr
, nsr
, nsr_name
, vnffgd_msg
, sdn_account_name
):
187 self
._vnffgmgr
= vnffgmgr
189 self
._nsr
_name
= nsr_name
190 self
._vnffgd
_msg
= vnffgd_msg
191 if sdn_account_name
is None:
192 self
._sdn
_account
_name
= ''
194 self
._sdn
_account
_name
= sdn_account_name
196 self
._vnffgr
_id
= str(uuid
.uuid4())
197 self
._vnffgr
_rsp
_id
= list()
198 self
._vnffgr
_state
= VnffgRecordState
.INIT
203 return self
._vnffgr
_id
207 """ state of this VNF """
208 return self
._vnffgr
_state
210 def fetch_vnffgr(self
):
212 Get VNFFGR message to be published
215 if self
._vnffgr
_state
== VnffgRecordState
.INIT
:
216 vnffgr_dict
= {"id": self
._vnffgr
_id
,
217 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
218 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
219 "sdn_account": self
._sdn
_account
_name
,
220 "operational_status": 'init',
222 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
223 elif self
._vnffgr
_state
== VnffgRecordState
.TERMINATED
:
224 vnffgr_dict
= {"id": self
._vnffgr
_id
,
225 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
226 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
227 "sdn_account": self
._sdn
_account
_name
,
228 "operational_status": 'terminated',
230 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
233 vnffgr
= self
._vnffgmgr
.fetch_vnffgr(self
._vnffgr
_id
)
235 self
._log
.exception("Fetching VNFFGR for VNFFG with id %s failed", self
._vnffgr
_id
)
236 self
._vnffgr
_state
= VnffgRecordState
.FAILED
237 vnffgr_dict
= {"id": self
._vnffgr
_id
,
238 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
239 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
240 "sdn_account": self
._sdn
_account
_name
,
241 "operational_status": 'failed',
243 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
248 def vnffgr_create_msg(self
):
249 """ Virtual Link Record message for Creating VLR in VNS """
250 vnffgr_dict
= {"id": self
._vnffgr
_id
,
251 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
252 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
253 "sdn_account": self
._sdn
_account
_name
,
255 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
256 for rsp
in self
._vnffgd
_msg
.rsp
:
257 vnffgr_rsp
= vnffgr
.rsp
.add()
258 vnffgr_rsp
.id = str(uuid
.uuid4())
259 vnffgr_rsp
.name
= self
._nsr
.name
+ '.' + rsp
.name
260 self
._vnffgr
_rsp
_id
.append(vnffgr_rsp
.id)
261 vnffgr_rsp
.vnffgd_rsp_id_ref
= rsp
.id
262 vnffgr_rsp
.vnffgd_rsp_name_ref
= rsp
.name
263 for rsp_cp_ref
in rsp
.vnfd_connection_point_ref
:
264 vnfd
= [vnfr
.vnfd
for vnfr
in self
._nsr
.vnfrs
.values() if vnfr
.vnfd
.id == rsp_cp_ref
.vnfd_id_ref
]
265 self
._log
.debug("VNFD message during VNFFG instantiation is %s",vnfd
)
266 if len(vnfd
) > 0 and vnfd
[0].has_field('service_function_type'):
267 self
._log
.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref
.vnfd_id_ref
, vnfd
[0].service_function_type
)
269 self
._log
.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref
.vnfd_id_ref
)
272 vnfr_cp_ref
= vnffgr_rsp
.vnfr_connection_point_ref
.add()
273 vnfr_cp_ref
.member_vnf_index_ref
= rsp_cp_ref
.member_vnf_index_ref
274 vnfr_cp_ref
.hop_number
= rsp_cp_ref
.order
275 vnfr_cp_ref
.vnfd_id_ref
=rsp_cp_ref
.vnfd_id_ref
276 vnfr_cp_ref
.service_function_type
= vnfd
[0].service_function_type
277 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
278 if (nsr_vnfr
.vnfd
.id == vnfr_cp_ref
.vnfd_id_ref
and
279 nsr_vnfr
.member_vnf_index
== vnfr_cp_ref
.member_vnf_index_ref
):
280 vnfr_cp_ref
.vnfr_id_ref
= nsr_vnfr
.id
281 vnfr_cp_ref
.vnfr_name_ref
= nsr_vnfr
.name
282 vnfr_cp_ref
.vnfr_connection_point_ref
= rsp_cp_ref
.vnfd_connection_point_ref
284 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
285 self
._log
.debug(" Received VNFR is %s", vnfr
)
286 while vnfr
.operational_status
!= 'running':
287 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
288 if vnfr
.operational_status
== 'failed':
289 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
290 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
291 yield from asyncio
.sleep(2, loop
=self
._loop
)
292 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
293 self
._log
.debug("Received VNFR is %s", vnfr
)
295 vnfr_cp_ref
.connection_point_params
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
296 for cp
in vnfr
.connection_point
:
297 if cp
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
298 vnfr_cp_ref
.connection_point_params
.port_id
= cp
.connection_point_id
299 vnfr_cp_ref
.connection_point_params
.name
= self
._nsr
.name
+ '.' + cp
.name
300 for vdu
in vnfr
.vdur
:
301 for ext_intf
in vdu
.external_interface
:
302 if ext_intf
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
303 vnfr_cp_ref
.connection_point_params
.vm_id
= vdu
.vim_id
304 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
305 vnfr_cp_ref
.connection_point_params
.vm_id
)
308 vnfr_cp_ref
.connection_point_params
.address
= cp
.ip_address
309 vnfr_cp_ref
.connection_point_params
.port
= VnffgRecord
.SFF_DP_PORT
311 for vnffgd_classifier
in self
._vnffgd
_msg
.classifier
:
312 _rsp
= [rsp
for rsp
in vnffgr
.rsp
if rsp
.vnffgd_rsp_id_ref
== vnffgd_classifier
.rsp_id_ref
]
314 rsp_id_ref
= _rsp
[0].id
315 rsp_name
= _rsp
[0].name
317 self
._log
.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier
.rsp_id_ref
,vnffgd_classifier
.id)
319 vnffgr_classifier
= vnffgr
.classifier
.add()
320 vnffgr_classifier
.id = vnffgd_classifier
.id
321 vnffgr_classifier
.name
= self
._nsr
.name
+ '.' + vnffgd_classifier
.name
322 _rsp
[0].classifier_name
= vnffgr_classifier
.name
323 vnffgr_classifier
.rsp_id_ref
= rsp_id_ref
324 vnffgr_classifier
.rsp_name
= rsp_name
325 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
326 if (nsr_vnfr
.vnfd
.id == vnffgd_classifier
.vnfd_id_ref
and
327 nsr_vnfr
.member_vnf_index
== vnffgd_classifier
.member_vnf_index_ref
):
328 vnffgr_classifier
.vnfr_id_ref
= nsr_vnfr
.id
329 vnffgr_classifier
.vnfr_name_ref
= nsr_vnfr
.name
330 vnffgr_classifier
.vnfr_connection_point_ref
= vnffgd_classifier
.vnfd_connection_point_ref
332 if nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER':
333 vnffgr_classifier
.sff_name
= nsr_vnfr
.name
335 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
336 self
._log
.debug(" Received VNFR is %s", vnfr
)
337 while vnfr
.operational_status
!= 'running':
338 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
339 if vnfr
.operational_status
== 'failed':
340 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
341 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
342 yield from asyncio
.sleep(2, loop
=self
._loop
)
343 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
344 self
._log
.debug("Received VNFR is %s", vnfr
)
346 for cp
in vnfr
.connection_point
:
347 if cp
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
348 vnffgr_classifier
.port_id
= cp
.connection_point_id
349 vnffgr_classifier
.ip_address
= cp
.ip_address
350 for vdu
in vnfr
.vdur
:
351 for ext_intf
in vdu
.external_interface
:
352 if ext_intf
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
353 vnffgr_classifier
.vm_id
= vdu
.vim_id
354 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
355 vnfr_cp_ref
.connection_point_params
.vm_id
)
358 self
._log
.info("VNFFGR msg to be sent is %s", vnffgr
)
362 def vnffgr_nsr_sff_list(self
):
363 """ SFF List for VNFR """
365 sf_list
= [nsr_vnfr
.name
for nsr_vnfr
in self
._nsr
.vnfrs
.values() if nsr_vnfr
.vnfd
.service_function_chain
== 'SF']
367 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
368 if (nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER' or nsr_vnfr
.vnfd
.service_function_chain
== 'SFF'):
369 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
370 self
._log
.debug(" Received VNFR is %s", vnfr
)
371 while vnfr
.operational_status
!= 'running':
372 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
373 if vnfr
.operational_status
== 'failed':
374 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
375 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
376 yield from asyncio
.sleep(2, loop
=self
._loop
)
377 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
378 self
._log
.debug("Received VNFR is %s", vnfr
)
380 sff
= RwsdnalYang
.VNFFGSff()
381 sff_list
[nsr_vnfr
.vnfd
.id] = sff
382 sff
.name
= nsr_vnfr
.name
383 sff
.function_type
= nsr_vnfr
.vnfd
.service_function_chain
385 sff
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
386 sff
.mgmt_port
= VnffgRecord
.SFF_MGMT_PORT
387 for cp
in vnfr
.connection_point
:
388 sff_dp
= sff
.dp_endpoints
.add()
389 sff_dp
.name
= self
._nsr
.name
+ '.' + cp
.name
390 sff_dp
.address
= cp
.ip_address
391 sff_dp
.port
= VnffgRecord
.SFF_DP_PORT
392 if nsr_vnfr
.vnfd
.service_function_chain
== 'SFF':
393 for sf_name
in sf_list
:
394 _sf
= sff
.vnfr_list
.add()
395 _sf
.vnfr_name
= sf_name
400 def instantiate(self
):
401 """ Instantiate this VNFFG """
403 self
._log
.info("Instaniating VNFFGR with vnffgd %s",
407 vnffgr_request
= yield from self
.vnffgr_create_msg()
408 vnffg_sff_list
= yield from self
.vnffgr_nsr_sff_list()
411 vnffgr
= self
._vnffgmgr
.create_vnffgr(vnffgr_request
,self
._vnffgd
_msg
.classifier
,vnffg_sff_list
)
412 except Exception as e
:
413 self
._log
.exception("VNFFG instantiation failed: %s", str(e
))
414 self
._vnffgr
_state
= VnffgRecordState
.FAILED
415 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self
.id, vnffgr_request
.id))
417 self
._vnffgr
_state
= VnffgRecordState
.INSTANTIATION_PENDING
419 self
._log
.info("Instantiated VNFFGR :%s", vnffgr
)
420 self
._vnffgr
_state
= VnffgRecordState
.ACTIVE
422 self
._log
.info("Invoking update_state to update NSR state for NSR ID: %s", self
._nsr
.id)
423 yield from self
._nsr
.update_state()
425 def vnffgr_in_vnffgrm(self
):
426 """ Is there a VNFR record in VNFM """
427 if (self
._vnffgr
_state
== VnffgRecordState
.ACTIVE
or
428 self
._vnffgr
_state
== VnffgRecordState
.INSTANTIATION_PENDING
or
429 self
._vnffgr
_state
== VnffgRecordState
.FAILED
):
436 """ Terminate this VNFFGR """
437 if not self
.vnffgr_in_vnffgrm():
438 self
._log
.error("Ignoring terminate request for id %s in state %s",
439 self
.id, self
._vnffgr
_state
)
442 self
._log
.info("Terminating VNFFGR id:%s", self
.id)
443 self
._vnffgr
_state
= VnffgRecordState
.TERMINATE_PENDING
445 self
._vnffgmgr
.terminate_vnffgr(self
._vnffgr
_id
)
447 self
._vnffgr
_state
= VnffgRecordState
.TERMINATED
448 self
._log
.debug("Terminated VNFFGR id:%s", self
.id)
451 class VirtualLinkRecord(object):
452 """ Virtual Link Records class"""
453 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
456 def create_record(dts
, log
, loop
, nsr_name
, vld_msg
, cloud_account_name
, om_datacenter
, ip_profile
, nsr_id
, restart_mode
=False):
457 """Creates a new VLR object based on the given data.
459 If restart mode is enabled, then we look for existing records in the
460 DTS and create a VLR records using the exiting data(ID)
465 vlr_obj
= VirtualLinkRecord(
478 res_iter
= yield from dts
.query_read(
479 "D,/vlr:vlr-catalog/vlr:vlr",
480 rwdts
.XactFlag
.MERGE
)
483 response
= yield from fut
484 vlr
= response
.result
486 # Check if the record is already present, if so use the ID of
487 # the existing record. Since the name of the record is uniquely
488 # formed we can use it as a search key!
489 if vlr
.name
== vlr_obj
.name
:
490 vlr_obj
.reset_id(vlr
.id)
495 def __init__(self
, dts
, log
, loop
, nsr_name
, vld_msg
, cloud_account_name
, om_datacenter
, ip_profile
, nsr_id
):
499 self
._nsr
_name
= nsr_name
500 self
._vld
_msg
= vld_msg
501 self
._cloud
_account
_name
= cloud_account_name
502 self
._om
_datacenter
_name
= om_datacenter
503 self
._assigned
_subnet
= None
504 self
._nsr
_id
= nsr_id
505 self
._ip
_profile
= ip_profile
506 self
._vlr
_id
= str(uuid
.uuid4())
507 self
._state
= VlRecordState
.INIT
508 self
._prev
_state
= None
509 self
._create
_time
= int(time
.time())
513 """ path for this object """
514 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
._vlr
_id
)
523 """ Get NSR name for this VL """
528 """ Virtual Link Desciptor """
def assigned_subnet(self):
    """
    Return the subnet assigned to this VL, as stored in ``_assigned_subnet``.
    """
    subnet = self._assigned_subnet
    return subnet
539 Get the name for this VLR.
540 VLR name is "nsr name:VLD name"
542 if self
.vld_msg
.vim_network_name
:
543 return self
.vld_msg
.vim_network_name
544 elif self
.vld_msg
.name
== "multisite":
545 # This is a temporary hack to identify manually provisioned inter-site network
546 return self
.vld_msg
.name
548 return self
._nsr
_name
+ "." + self
.vld_msg
.name
def cloud_account_name(self):
    """
    Return the name of the cloud account this VLR should be created in.
    """
    account = self._cloud_account_name
    return account
def om_datacenter_name(self):
    """
    Return the name of the datacenter this VLR should be created in.
    """
    datacenter = self._om_datacenter_name
    return datacenter
562 """ Get the VLR path from VLR """
563 return (VirtualLinkRecord
.XPATH
+ "[vlr:id = '{}']").format(vlr
.id)
571 def state(self
, value
):
572 """ VLR set state """
def prev_state(self):
    """
    Return the previous state recorded for this VLR (``_prev_state``).
    """
    previous = self._prev_state
    return previous
def prev_state(self, value):
    """
    Store ``value`` as the previous state of this VLR (``_prev_state``).
    """
    setattr(self, "_prev_state", value)
587 """ Virtual Link Record message for Creating VLR in VNS """
588 vld_fields
= ["short_name",
596 vld_copy_dict
= {k
: v
for k
, v
in self
.vld_msg
.as_dict().items()
599 vlr_dict
= {"id": self
._vlr
_id
,
600 "nsr_id_ref": self
._nsr
_id
,
601 "vld_ref": self
.vld_msg
.id,
603 "create_time": self
._create
_time
,
604 "cloud_account": self
.cloud_account_name
,
605 "om_datacenter": self
.om_datacenter_name
,
608 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
609 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
611 vlr_dict
.update(vld_copy_dict
)
612 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
def reset_id(self, vlr_id):
    """
    Replace the identifier of this VLR with ``vlr_id``.

    ``create_record`` calls this when a record with the same name is
    already present, so the existing record's id is reused.
    """
    setattr(self, "_vlr_id", vlr_id)
618 def create_nsr_vlr_msg(self
, vnfrs
):
619 """ The VLR message"""
620 nsr_vlr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
621 nsr_vlr
.vlr_ref
= self
._vlr
_id
622 nsr_vlr
.assigned_subnet
= self
.assigned_subnet
623 nsr_vlr
.cloud_account
= self
.cloud_account_name
624 nsr_vlr
.om_datacenter
= self
.om_datacenter_name
626 for conn
in self
.vld_msg
.vnfd_connection_point_ref
:
628 if (vnfr
.vnfd
.id == conn
.vnfd_id_ref
and
629 vnfr
.member_vnf_index
== conn
.member_vnf_index_ref
and
630 self
.cloud_account_name
== vnfr
.cloud_account_name
and
631 self
.om_datacenter_name
== vnfr
.om_datacenter_name
):
632 cp_entry
= nsr_vlr
.vnfr_connection_point_ref
.add()
633 cp_entry
.vnfr_id
= vnfr
.id
634 cp_entry
.connection_point
= conn
.vnfd_connection_point_ref
639 def instantiate(self
):
640 """ Instantiate this VL """
641 self
._log
.debug("Instaniating VLR key %s, vld %s",
642 self
.xpath
, self
._vld
_msg
)
644 self
._state
= VlRecordState
.INSTANTIATION_PENDING
645 self
._log
.debug("Executing VL create path:%s msg:%s",
646 self
.xpath
, self
.vlr_msg
)
648 with self
._dts
.transaction(flags
=0) as xact
:
649 block
= xact
.block_create()
650 block
.add_query_create(self
.xpath
, self
.vlr_msg
)
651 self
._log
.debug("Executing VL create path:%s msg:%s",
652 self
.xpath
, self
.vlr_msg
)
653 res_iter
= yield from block
.execute(now
=True)
659 self
._state
= VlRecordState
.FAILED
660 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self
.id)
662 if vlr
.operational_status
== 'failed':
663 self
._log
.debug("NS Id:%s VL creation failed for vlr id %s", self
.id, vlr
.id)
664 self
._state
= VlRecordState
.FAILED
665 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr
.id, vlr
.operational_status_details
))
667 self
._log
.info("Instantiated VL with xpath %s and vlr:%s",
669 self
._state
= VlRecordState
.ACTIVE
670 self
._assigned
_subnet
= vlr
.assigned_subnet
672 def vlr_in_vns(self
):
673 """ Is there a VLR record in VNS """
674 if (self
._state
== VlRecordState
.ACTIVE
or
675 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
676 self
._state
== VlRecordState
.TERMINATE_PENDING
or
677 self
._state
== VlRecordState
.FAILED
):
684 """ Terminate this VL """
685 if not self
.vlr_in_vns():
686 self
._log
.debug("Ignoring terminate request for id %s in state %s",
687 self
.id, self
._state
)
690 self
._log
.debug("Terminating VL id:%s", self
.id)
691 self
._state
= VlRecordState
.TERMINATE_PENDING
693 with self
._dts
.transaction(flags
=0) as xact
:
694 block
= xact
.block_create()
695 block
.add_query_delete(self
.xpath
)
696 yield from block
.execute(flags
=0, now
=True)
698 self
._state
= VlRecordState
.TERMINATED
699 self
._log
.debug("Terminated VL id:%s", self
.id)
702 class VnfRecordState(Enum
):
703 """ Vnf Record State """
705 INSTANTIATION_PENDING
= 102
707 TERMINATE_PENDING
= 104
712 class VirtualNetworkFunctionRecord(object):
713 """ Virtual Network Function Record class"""
714 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
718 def create_record(dts
, log
, loop
, vnfd
, const_vnfd_msg
, nsd_id
, nsr_name
,
719 cloud_account_name
, om_datacenter_name
, nsr_id
, group_name
, group_instance_id
,
720 placement_groups
, restart_mode
=False):
721 """Creates a new VNFR object based on the given data.
723 If restart mode is enabled, then we look for existing records in the
724 DTS and create a VNFR records using the exiting data(ID)
727 VirtualNetworkFunctionRecord
729 vnfr_obj
= VirtualNetworkFunctionRecord(
743 restart_mode
=restart_mode
)
746 res_iter
= yield from dts
.query_read(
747 "D,/vnfr:vnfr-catalog/vnfr:vnfr",
748 rwdts
.XactFlag
.MERGE
)
751 response
= yield from fut
752 vnfr
= response
.result
754 if vnfr
.name
== vnfr_obj
.name
:
755 vnfr_obj
.reset_id(vnfr
.id)
772 group_instance_id
=None,
773 placement_groups
= [],
774 restart_mode
= False):
779 self
._const
_vnfd
_msg
= const_vnfd_msg
780 self
._nsd
_id
= nsd_id
781 self
._nsr
_name
= nsr_name
782 self
._nsr
_id
= nsr_id
783 self
._cloud
_account
_name
= cloud_account_name
784 self
._om
_datacenter
_name
= om_datacenter_name
785 self
._group
_name
= group_name
786 self
._group
_instance
_id
= group_instance_id
787 self
._placement
_groups
= placement_groups
788 self
._config
_status
= NsrYang
.ConfigStates
.INIT
789 self
._create
_time
= int(time
.time())
791 self
._prev
_state
= VnfRecordState
.INIT
792 self
._state
= VnfRecordState
.INIT
793 self
._state
_failed
_reason
= None
795 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
798 self
._vnfr
_id
= str(uuid
.uuid4())
800 self
._vnfr
_msg
= self
.create_vnfr_msg()
801 self
._log
.debug("Set VNFR {} config type to {}".
802 format(self
.name
, self
.config_type
))
803 self
.restart_mode
= restart_mode
806 if group_name
is None and group_instance_id
is not None:
807 raise ValueError("Group instance id must not be provided with an empty group name")
817 return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self
.id)
822 return self
._vnfr
_msg
825 def const_vnfr_msg(self
):
827 return RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(vnfr_id
=self
.id,cloud_account
=self
.cloud_account_name
,om_datacenter
=self
._om
_datacenter
_name
)
def cloud_account_name(self):
    """
    Return the name of the cloud account this VNF should be created in.
    """
    account = self._cloud_account_name
    return account
def om_datacenter_name(self):
    """
    Return the name of the datacenter this VNF should be created in.
    """
    datacenter = self._om_datacenter_name
    return datacenter
847 """ Is this VNF actve """
848 return True if self
._state
== VnfRecordState
.ACTIVE
else False
852 """ state of this VNF """
def state_failed_reason(self):
    """
    Return the error message recorded when this VNF entered the failed
    state (``_state_failed_reason``; ``None`` until one is stored).
    """
    reason = self._state_failed_reason
    return reason
def member_vnf_index(self):
    """
    Return the member VNF index, read from the constituent VNFD message
    (``_const_vnfd_msg.member_vnf_index``).
    """
    constituent = self._const_vnfd_msg
    return constituent.member_vnf_index
868 return self
._nsr
_name
872 """ Name of this VNFR """
873 if self
._name
is not None:
876 name_tags
= [self
._nsr
_name
]
878 if self
._group
_name
is not None:
879 name_tags
.append(self
._group
_name
)
881 if self
._group
_instance
_id
is not None:
882 name_tags
.append(str(self
._group
_instance
_id
))
884 name_tags
.extend([self
.vnfd
.name
, str(self
.member_vnf_index
)])
886 self
._name
= "__".join(name_tags
)
891 def vnfr_xpath(vnfr
):
892 """ Get the VNFR path from VNFR """
893 return (VirtualNetworkFunctionRecord
.XPATH
+ "[vnfr:id = '{}']").format(vnfr
.id)
896 def config_type(self
):
897 cfg_types
= ['netconf', 'juju', 'script']
898 for method
in cfg_types
:
899 if self
._vnfd
.vnf_configuration
.has_field(method
):
904 def config_status(self
):
905 """Return the config status as YANG ENUM string"""
906 self
._log
.debug("Map VNFR {} config status {} ({})".
907 format(self
.name
, self
._config
_status
, self
.config_type
))
908 if self
.config_type
== 'none':
909 return 'config_not_needed'
910 elif self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
912 elif self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
917 def set_state(self
, state
):
918 """ set the state of this object """
919 self
._prev
_state
= self
._state
def reset_id(self, vnfr_id):
    """
    Replace the identifier of this VNFR and rebuild its cached message.

    The id is stored first, then ``_vnfr_msg`` is regenerated through
    ``create_vnfr_msg()``, so that call runs with the new id already set.
    """
    self._vnfr_id = vnfr_id
    rebuilt_msg = self.create_vnfr_msg()
    self._vnfr_msg = rebuilt_msg
927 self
.config_store
.merge_vnfd_config(
930 self
.member_vnf_index
,
933 def create_vnfr_msg(self
):
934 """ VNFR message for this VNFR """
942 vnfd_copy_dict
= {k
: v
for k
, v
in self
._vnfd
.as_dict().items() if k
in vnfd_fields
}
945 "nsr_id_ref": self
._nsr
_id
,
947 "cloud_account": self
._cloud
_account
_name
,
948 "om_datacenter": self
._om
_datacenter
_name
,
949 "config_status": self
.config_status
951 vnfr_dict
.update(vnfd_copy_dict
)
953 vnfr
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
954 vnfr
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict(),
955 ignore_missing_keys
=True)
956 vnfr
.member_vnf_index_ref
= self
.member_vnf_index
957 vnfr
.vnf_configuration
.from_dict(self
._vnfd
.vnf_configuration
.as_dict())
959 if self
._vnfd
.mgmt_interface
.has_field("port"):
960 vnfr
.mgmt_interface
.port
= self
._vnfd
.mgmt_interface
.port
962 for group_info
in self
._placement
_groups
:
963 group
= vnfr
.placement_groups_info
.add()
964 group
.from_dict(group_info
.as_dict())
966 # UI expects the monitoring param field to exist
967 vnfr
.monitoring_param
= []
969 self
._log
.debug("Get vnfr_msg for VNFR {} : {}".format(self
.name
, vnfr
))
973 def update_vnfm(self
):
974 self
._log
.debug("Send an update to VNFM for VNFR {} with {}".
975 format(self
.name
, self
.vnfr_msg
))
976 yield from self
._dts
.query_update(
978 rwdts
.XactFlag
.TRACE
,
def get_config_status(self):
    """
    Return the stored config status value (``_config_status``) unchanged,
    i.e. the YANG enum rather than its string form.
    """
    current = self._config_status
    return current
987 def set_config_status(self
, status
):
989 def status_to_string(status
):
991 NsrYang
.ConfigStates
.INIT
: 'init',
992 NsrYang
.ConfigStates
.CONFIGURING
: 'configuring',
993 NsrYang
.ConfigStates
.CONFIG_NOT_NEEDED
: 'config_not_needed',
994 NsrYang
.ConfigStates
.CONFIGURED
: 'configured',
995 NsrYang
.ConfigStates
.FAILED
: 'failed',
998 return status_dc
[status
]
1000 self
._log
.debug("Update VNFR {} from {} ({}) to {}".
1001 format(self
.name
, self
._config
_status
,
1002 self
.config_type
, status
))
1003 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1004 self
._log
.error("Updating already configured VNFR {}".
1008 if self
._config
_status
!= status
:
1010 self
._config
_status
= status
1011 # I don't think this is used. Original implementor can check.
1012 # Caused Exception, so corrected it by status_to_string
1013 # But not sure whats the use of this variable?
1014 self
.vnfr_msg
.config_status
= status_to_string(status
)
1015 except Exception as e
:
1016 self
._log
.error("Exception=%s", str(e
))
1019 self
._log
.debug("Updated VNFR {} status to {}".format(self
.name
, status
))
1021 if self
._config
_status
!= NsrYang
.ConfigStates
.INIT
:
1023 # Publish only after VNFM has the VNFR created
1024 yield from self
.update_vnfm()
1025 except Exception as e
:
1026 self
._log
.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1027 format(status
, self
.name
, e
))
1028 self
._log
.exception(e
)
1030 def is_configured(self
):
1031 if self
.config_type
== 'none':
1034 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1040 def instantiate(self
, nsr
):
1041 """ Instantiate this VNFR"""
1043 self
._log
.debug("Instaniating VNFR key %s, vnfd %s",
1044 self
.xpath
, self
._vnfd
)
1046 self
._log
.debug("Create VNF with xpath %s and vnfr %s",
1047 self
.xpath
, self
.vnfr_msg
)
1049 self
.set_state(VnfRecordState
.INSTANTIATION_PENDING
)
1051 def find_vlr_for_cp(conn
):
1052 """ Find VLR for the given connection point """
1053 for vlr
in nsr
.vlrs
:
1054 for vnfd_cp
in vlr
.vld_msg
.vnfd_connection_point_ref
:
1055 if (vnfd_cp
.vnfd_id_ref
== self
._vnfd
.id and
1056 vnfd_cp
.vnfd_connection_point_ref
== conn
.name
and
1057 vnfd_cp
.member_vnf_index_ref
== self
.member_vnf_index
and
1058 vlr
.cloud_account_name
== self
.cloud_account_name
):
1059 self
._log
.debug("Found VLR for cp_name:%s and vnf-index:%d",
1060 conn
.name
, self
.member_vnf_index
)
1064 # For every connection point in the VNFD fill in the identifier
1065 for conn_p
in self
._vnfd
.connection_point
:
1066 cpr
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
1067 cpr
.name
= conn_p
.name
1068 cpr
.type_yang
= conn_p
.type_yang
1069 if conn_p
.has_field('port_security_enabled'):
1070 cpr
.port_security_enabled
= conn_p
.port_security_enabled
1072 vlr_ref
= find_vlr_for_cp(conn_p
)
1074 msg
= "Failed to find VLR for cp = %s" % conn_p
.name
1075 self
._log
.debug("%s", msg
)
1076 # raise VirtualNetworkFunctionRecordError(msg)
1079 cpr
.vlr_ref
= vlr_ref
.id
1080 self
.vnfr_msg
.connection_point
.append(cpr
)
1081 self
._log
.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1082 cpr
, self
.vnfr_msg
.id, self
.vnfr_msg
.vnfd
.id)
1084 if not self
.restart_mode
:
1085 yield from self
._dts
.query_create(self
.xpath
,
1089 yield from self
._dts
.query_update(self
.xpath
,
1093 self
._log
.info("Created VNF with xpath %s and vnfr %s",
1094 self
.xpath
, self
.vnfr_msg
)
1096 self
._log
.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
1097 self
.xpath
, self
._vnfd
, self
.vnfr_msg
)
def update_state(self, vnfr_msg):
    """
    React to the operational status carried by ``vnfr_msg``.

    * ``"running"``: delegate to ``is_active()``, but only if the cached
      ``self.vnfr_msg`` is not already reporting ``"running"``.
    * ``"failed"``: delegate to ``instantiation_failed()``, passing
      ``vnfr_msg.operational_status_details`` as the failure reason.
    * any other status: nothing happens.

    The body delegates with ``yield from``, so this is a generator
    function.
    """
    reported = vnfr_msg.operational_status

    if reported == "failed":
        yield from self.instantiation_failed(
            failed_reason=vnfr_msg.operational_status_details)
        return

    if reported != "running":
        return

    # Only act on the first transition into "running".
    if self.vnfr_msg.operational_status == "running":
        return

    yield from self.is_active()
def is_active(self):
    """
    Mark this VNFR as active: log the transition, then move the record to
    ``VnfRecordState.ACTIVE`` through ``set_state``.
    """
    vnfr_id = self._vnfr_id
    self._log.debug("VNFR %s is active", vnfr_id)

    active = VnfRecordState.ACTIVE
    self.set_state(active)
def instantiation_failed(self, failed_reason=None):
    """
    Mark this VNFR as failed.

    Logs an error, moves the record to ``VnfRecordState.FAILED`` through
    ``set_state``, and then stores ``failed_reason`` (which may be
    ``None``) in ``_state_failed_reason``.
    """
    vnfr_id = self._vnfr_id
    self._log.error("VNFR %s instantiation failed", vnfr_id)

    failed = VnfRecordState.FAILED
    self.set_state(failed)

    # Recorded after the state change, matching the original ordering.
    self._state_failed_reason = failed_reason
1121 def vnfr_in_vnfm(self
):
1122 """ Is there a VNFR record in VNFM """
1123 if (self
._state
== VnfRecordState
.ACTIVE
or
1124 self
._state
== VnfRecordState
.INSTANTIATION_PENDING
or
1125 self
._state
== VnfRecordState
.FAILED
):
1131 def terminate(self
):
1132 """ Terminate this VNF """
1133 if not self
.vnfr_in_vnfm():
1134 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1135 self
.id, self
._state
)
1138 self
._log
.debug("Terminating VNF id:%s", self
.id)
1139 self
.set_state(VnfRecordState
.TERMINATE_PENDING
)
1140 with self
._dts
.transaction(flags
=0) as xact
:
1141 block
= xact
.block_create()
1142 block
.add_query_delete(self
.xpath
)
1143 yield from block
.execute(flags
=0)
1144 self
.set_state(VnfRecordState
.TERMINATED
)
1145 self
._log
.debug("Terminated VNF id:%s", self
.id)
1148 class NetworkServiceStatus(object):
1149 """ A class representing the Network service's status """
1150 MAX_EVENTS_RECORDED
= 10
1151 """ Network service Status class"""
1152 def __init__(self
, dts
, log
, loop
):
1157 self
._state
= NetworkServiceRecordState
.INIT
1158 self
._events
= deque([])
1161 def create_notification(self
, evt
, evt_desc
, evt_details
):
1162 xp
= "N,/rw-nsr:nsm-notification"
1163 notif
= RwNsrYang
.YangNotif_RwNsr_NsmNotification()
1165 notif
.description
= evt_desc
1166 notif
.details
= evt_details
if evt_details
is not None else None
1168 yield from self
._dts
.query_create(xp
, rwdts
.XactFlag
.ADVISE
, notif
)
1169 self
._log
.info("Notification called by creating dts query: %s", notif
)
1171 def record_event(self
, evt
, evt_desc
, evt_details
):
1172 """ Record an event """
1173 self
._log
.debug("Recording event - evt %s, evt_descr %s len = %s",
1174 evt
, evt_desc
, len(self
._events
))
1175 if len(self
._events
) >= NetworkServiceStatus
.MAX_EVENTS_RECORDED
:
1176 self
._events
.popleft()
1177 self
._events
.append((int(time
.time()), evt
, evt_desc
,
1178 evt_details
if evt_details
is not None else None))
1180 self
._loop
.create_task(self
.create_notification(evt
,evt_desc
,evt_details
))
1182 def set_state(self
, state
):
1183 """ set the state of this status object """
1187 """ Return the state as a yang enum string """
1188 state_to_str_map
= {"INIT": "init",
1189 "VL_INIT_PHASE": "vl_init_phase",
1190 "VNF_INIT_PHASE": "vnf_init_phase",
1191 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1192 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1193 "RUNNING": "running",
1194 "SCALING_OUT": "scaling_out",
1195 "SCALING_IN": "scaling_in",
1196 "TERMINATE_RCVD": "terminate_rcvd",
1197 "TERMINATE": "terminate",
1198 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1199 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1200 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1201 "TERMINATED": "terminated",
1203 "VL_INSTANTIATE": "vl_instantiate",
1204 "VL_TERMINATE": "vl_terminate",
1206 return state_to_str_map
[self
._state
.name
]
1210 """ State of this status object """
1215 """ Network Service Record as a message"""
1218 for entry
in self
._events
:
1219 event
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
1222 event
.timestamp
, event
.event
, event
.description
, event
.details
= entry
1223 event_list
.append(event
)
1227 class NetworkServiceRecord(object):
1228 """ Network service record """
1229 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
1231 def __init__(self
, dts
, log
, loop
, nsm
, nsm_plugin
, nsr_cfg_msg
, sdn_account_name
, key_pairs
, restart_mode
=False,
1237 self
._nsr
_cfg
_msg
= nsr_cfg_msg
1238 self
._nsm
_plugin
= nsm_plugin
1239 self
._sdn
_account
_name
= sdn_account_name
1240 self
._vlr
_handler
= vlr_handler
1243 self
._nsr
_msg
= None
1244 self
._nsr
_regh
= None
1245 self
._key
_pairs
= key_pairs
1250 self
._param
_pools
= {}
1251 self
._scaling
_groups
= {}
1252 self
._create
_time
= int(time
.time())
1253 self
._op
_status
= NetworkServiceStatus(dts
, log
, loop
)
1254 self
._config
_status
= NsrYang
.ConfigStates
.CONFIGURING
1255 self
._config
_status
_details
= None
1257 self
.restart_mode
= restart_mode
1258 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
1259 self
._debug
_running
= False
1260 self
._is
_active
= False
1261 self
._vl
_phase
_completed
= False
1262 self
._vnf
_phase
_completed
= False
1264 # Initalise the state to init
1265 # The NSR moves through the following transitions
1266 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1267 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1268 # 3. VNFS_READY - READY when the NSR is published
1270 self
.set_state(NetworkServiceRecordState
.INIT
)
1272 self
.substitute_input_parameters
= InputParameterSubstitution(self
._log
)
1275 def nsm_plugin(self
):
1277 return self
._nsm
_plugin
def set_state(self, state):
    """ Move this NSR to the given state """
    self._log.debug("Setting state to %s", state)

    # The phase flags are driven by the state being left, not the new one:
    # leaving an init phase (for the next phase or for a failure) marks
    # that phase as completed.
    leaving = self.state
    if leaving == NetworkServiceRecordState.VL_INIT_PHASE:
        self._vl_phase_completed = True
    elif leaving == NetworkServiceRecordState.VNF_INIT_PHASE:
        self._vnf_phase_completed = True

    self._op_status.set_state(state)
    self._nsm_plugin.set_state(self.id, state)
1295 """ Get id for this NSR"""
1296 return self
._nsr
_cfg
_msg
.id
1300 """ Name of this network service record """
1301 return self
._nsr
_cfg
_msg
.name
1304 def cloud_account_name(self
):
1305 return self
._nsr
_cfg
_msg
.cloud_account
1308 def om_datacenter_name(self
):
1309 if self
._nsr
_cfg
_msg
.has_field('om_datacenter'):
1310 return self
._nsr
_cfg
_msg
.om_datacenter
1315 """State of this NetworkServiceRecord"""
1316 return self
._op
_status
.state
1320 """ Is this NSR active ?"""
1321 return True if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
else False
1325 """ VLRs associated with this NSR"""
1330 """ VNFRs associated with this NSR"""
1335 """ VNFFGRs associated with this NSR"""
1336 return self
._vnffgrs
1339 def scaling_groups(self
):
1340 """ Scaling groups associated with this NSR """
1341 return self
._scaling
_groups
1344 def param_pools(self
):
1345 """ Parameter value pools associated with this NSR"""
1346 return self
._param
_pools
1349 def nsr_cfg_msg(self
):
1350 return self
._nsr
_cfg
_msg
1353 def nsr_cfg_msg(self
, msg
):
1354 self
._nsr
_cfg
_msg
= msg
1358 """ NSD Protobuf for this NSR """
1359 if self
._nsd
is not None:
1361 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
1366 """ NSD ID for this NSR """
1367 return self
.nsd_msg
.id
1371 ''' Get a new job id for config primitive'''
1376 def config_status(self
):
1377 """ Config status for NSR """
1378 return self
._config
_status
def resolve_placement_group_cloud_construct(self, input_group):
    """
    Returns the cloud specific construct for placement group

    The first entry of the NSR config's nsd_placement_group_maps that
    references input_group (by name) is used; None is returned when no
    entry references it.
    """
    inherited_fields = ('name', 'requirement', 'strategy')

    for mapping in self._nsr_cfg_msg.nsd_placement_group_maps:
        if mapping.placement_group_ref != input_group.name:
            continue

        group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()

        # Everything from the mapping except the back reference ...
        construct = {}
        for key, value in mapping.as_dict().items():
            if key != 'placement_group_ref':
                construct[key] = value
        # ... overridden by the fields taken from the input group itself
        for field in inherited_fields:
            construct[field] = getattr(input_group, field)

        group.from_dict(construct)
        return group

    return None
1399 return "NSR(name={}, nsd_id={}, cloud_account={})".format(
1400 self
.name
, self
.nsd_id
, self
.cloud_account_name
1403 def _get_vnfd(self
, vnfd_id
, config_xact
):
1404 """ Fetch vnfd msg for the passed vnfd id """
1405 return self
._nsm
.get_vnfd(vnfd_id
, config_xact
)
1407 def _get_vnfd_cloud_account(self
, vnfd_member_index
):
1408 """ Fetch Cloud Account for the passed vnfd id """
1409 if self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1410 vim_accounts
= [(vnf
.cloud_account
,vnf
.om_datacenter
) for vnf
in self
._nsr
_cfg
_msg
.vnf_cloud_account_map \
1411 if vnfd_member_index
== vnf
.member_vnf_index_ref
]
1412 if vim_accounts
and vim_accounts
[0]:
1413 return vim_accounts
[0]
1414 return (self
.cloud_account_name
,self
.om_datacenter_name
)
1416 def _get_constituent_vnfd_msg(self
, vnf_index
):
1417 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1418 if const_vnfd
.member_vnf_index
== vnf_index
:
1421 raise ValueError("Constituent VNF index %s not found" % vnf_index
)
def record_event(self, evt, evt_desc, evt_details=None, state=None):
    """ Record an event and, when a state is supplied, switch to it """
    self._op_status.record_event(evt, evt_desc, evt_details)
    if state is None:
        return
    self.set_state(state)
1429 def scaling_trigger_str(self
, trigger
):
1430 SCALING_TRIGGER_STRS
= {
1431 NsdYang
.ScalingTrigger
.PRE_SCALE_IN
: 'pre-scale-in',
1432 NsdYang
.ScalingTrigger
.POST_SCALE_IN
: 'post-scale-in',
1433 NsdYang
.ScalingTrigger
.PRE_SCALE_OUT
: 'pre-scale-out',
1434 NsdYang
.ScalingTrigger
.POST_SCALE_OUT
: 'post-scale-out',
1437 return SCALING_TRIGGER_STRS
[trigger
]
1438 except Exception as e
:
1439 self
._log
.error("Scaling trigger mapping error for {} : {}".
1441 self
._log
.exception(e
)
1442 return "Unknown trigger"
1445 def instantiate_vls(self
):
1447 This function instantiates VLs for every VL in this Network Service
1449 self
._log
.debug("Instantiating %d VLs in NSD id %s", len(self
._vlrs
),
1451 for vlr
in self
._vlrs
:
1452 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1453 vlr
.state
= VlRecordState
.ACTIVE
def create(self, config_xact):
    """ Create this network service

    Runs, in order: create_vls(), create_vnfs(), create_vnffgs(),
    create_scaling_groups() and create_param_pools(). Only the first two
    are waited on with ``yield from``; the rest are plain calls.

    Arguments:
        config_xact - handed through to create_vnfs()
    """
    # Create virtual links for all the external vnf
    # connection points in this NS
    yield from self.create_vls()

    # Create VNFs in this network service
    yield from self.create_vnfs(config_xact)

    # Create VNFFG for network service
    self.create_vnffgs()

    # Create Scaling Groups for each scaling group in NSD
    self.create_scaling_groups()

    # Create Parameter Pools
    self.create_param_pools()
1476 def apply_scale_group_config_script(self
, script
, group
, scale_instance
, trigger
, vnfrs
=None):
1477 """ Apply config based on script for scale group """
1480 def add_vnfrs_data(vnfrs_list
):
1481 """ Add as a dict each of the VNFRs data """
1483 for vnfr
in vnfrs_list
:
1484 self
._log
.debug("Add VNFR {} data".format(vnfr
))
1486 vnfr_data
['name'] = vnfr
.name
1487 if trigger
in [NsdYang
.ScalingTrigger
.PRE_SCALE_IN
, NsdYang
.ScalingTrigger
.POST_SCALE_OUT
]:
1488 # Get VNF management and other IPs, etc
1489 opdata
= yield from self
.fetch_vnfr(vnfr
.xpath
)
1490 self
._log
.debug("VNFR {} op data: {}".format(vnfr
.name
, opdata
))
1492 vnfr_data
['rw_mgmt_ip'] = opdata
.mgmt_interface
.ip_address
1493 vnfr_data
['rw_mgmt_port'] = opdata
.mgmt_interface
.port
1494 except Exception as e
:
1495 self
._log
.error("Unable to get management IP for vnfr {}:{}".
1496 format(vnfr
.name
, e
))
1499 vnfr_data
['connection_points'] = []
1500 for cp
in opdata
.connection_point
:
1502 con_pt
['name'] = cp
.name
1503 con_pt
['ip_address'] = cp
.ip_address
1504 vnfr_data
['connection_points'].append(con_pt
)
1505 except Exception as e
:
1506 self
._log
.error("Exception getting connections points for VNFR {}: {}".
1507 format(vnfr
.name
, e
))
1509 vnfrs_data
.append(vnfr_data
)
1510 self
._log
.debug("VNFRs data: {}".format(vnfrs_data
))
1514 def add_nsr_data(nsr
):
1516 nsr_data
['name'] = nsr
.name
1519 if script
is None or len(script
) == 0:
1520 self
._log
.error("Script not provided for scale group config: {}".format(group
.name
))
1523 if script
[0] == '/':
1526 path
= os
.path
.join(os
.environ
['RIFT_INSTALL'], "usr/bin", script
)
1527 if not os
.path
.exists(path
):
1528 self
._log
.error("Config faled for scale group {}: Script does not exist at {}".
1529 format(group
.name
, path
))
1532 # Build a YAML file with all parameters for the script to execute
1533 # The data consists of 5 sections
1535 # 2. Scale group config
1536 # 3. VNFRs in the scale group
1537 # 4. VNFRs outside scale group
1540 data
['trigger'] = group
.trigger_map(trigger
)
1541 data
['config'] = group
.group_msg
.as_dict()
1544 data
["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs
)
1546 data
["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance
.vnfrs
)
1548 data
["vnfrs_others"] = yield from add_vnfrs_data(self
.vnfrs
.values())
1549 data
["nsr"] = add_nsr_data(self
)
1552 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
1553 tmp_file
.write(yaml
.dump(data
, default_flow_style
=True)
1556 self
._log
.debug("Creating a temp file: {} with input data: {}".
1557 format(tmp_file
.name
, data
))
1559 cmd
= "{} {}".format(path
, tmp_file
.name
)
1560 self
._log
.debug("Running the CMD: {}".format(cmd
))
1561 proc
= yield from asyncio
.create_subprocess_shell(cmd
, loop
=self
._loop
)
1562 rc
= yield from proc
.wait()
1564 self
._log
.error("The script {} for scale group {} config returned: {}".
1565 format(script
, group
.name
, rc
))
1573 def apply_scaling_group_config(self
, trigger
, group
, scale_instance
, vnfrs
=None):
1574 """ Apply the config for the scaling group based on trigger """
1575 if group
is None or scale_instance
is None:
1579 def update_config_status(success
=True, err_msg
=None):
1580 self
._log
.debug("Update %s config status to %r : %s",
1581 scale_instance
, success
, err_msg
)
1582 if (scale_instance
.config_status
== "failed"):
1583 # Do not update the config status if it is already in failed state
1586 if scale_instance
.config_status
== "configured":
1587 # Update only to failed state an already configured scale instance
1589 scale_instance
.config_status
= "failed"
1590 scale_instance
.config_err_msg
= err_msg
1591 yield from self
.update_state()
1593 # We are in configuring state
1594 # Only after post scale out mark instance as configured
1595 if trigger
== NsdYang
.ScalingTrigger
.POST_SCALE_OUT
:
1597 scale_instance
.config_status
= "configured"
1599 scale_instance
.config_status
= "failed"
1600 scale_instance
.config_err_msg
= err_msg
1601 yield from self
.update_state()
1603 config
= group
.trigger_config(trigger
)
1607 self
._log
.debug("Scaling group {} config: {}".format(group
.name
, config
))
1608 if config
.has_field("ns_config_primitive_name_ref"):
1609 config_name
= config
.ns_config_primitive_name_ref
1610 nsd_msg
= self
.nsd_msg
1611 config_primitive
= None
1612 for ns_cfg_prim
in nsd_msg
.service_primitive
:
1613 if ns_cfg_prim
.name
== config_name
:
1614 config_primitive
= ns_cfg_prim
1617 if config_primitive
is None:
1618 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name
, self
.name
))
1620 self
._log
.debug("Scaling group {} config primitive: {}".format(group
.name
, config_primitive
))
1621 if config_primitive
.has_field("user_defined_script"):
1622 rc
= yield from self
.apply_scale_group_config_script(config_primitive
.user_defined_script
,
1623 group
, scale_instance
, trigger
, vnfrs
)
1626 err_msg
= "Failed config for trigger {} using config script '{}'". \
1627 format(self
.scaling_trigger_str(trigger
),
1628 config_primitive
.user_defined_script
)
1629 yield from update_config_status(success
=rc
, err_msg
=err_msg
)
1632 err_msg
= "Failed config for trigger {} as config script is not specified". \
1633 format(self
.scaling_trigger_str(trigger
))
1634 yield from update_config_status(success
=False, err_msg
=err_msg
)
1635 raise NotImplementedError("Only script based config support for scale group for now: {}".
1638 err_msg
= "Failed config for trigger {} as config primitive is not specified".\
1639 format(self
.scaling_trigger_str(trigger
))
1640 yield from update_config_status(success
=False, err_msg
=err_msg
)
1641 self
._log
.error("Config primitive not specified for config action in scale group %s" %
1645 def create_scaling_groups(self
):
1646 """ This function creates a NSScalingGroup for every scaling
1647 group defined in he NSD"""
1649 for scaling_group_msg
in self
.nsd_msg
.scaling_group_descriptor
:
1650 self
._log
.debug("Found scaling_group %s in nsr id %s",
1651 scaling_group_msg
.name
, self
.id)
1653 group_record
= scale_group
.ScalingGroup(
1658 self
._scaling
_groups
[group_record
.name
] = group_record
1661 def create_scale_group_instance(self
, group_name
, index
, config_xact
, is_default
=False):
1662 group
= self
._scaling
_groups
[group_name
]
1663 scale_instance
= group
.create_instance(index
, is_default
)
1667 self
._log
.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1668 len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
)
1671 for vnf_index
, count
in group
.vnf_index_count_map
.items():
1672 const_vnfd_msg
= self
._get
_constituent
_vnfd
_msg
(vnf_index
)
1673 vnfd_msg
= self
._get
_vnfd
(const_vnfd_msg
.vnfd_id_ref
, config_xact
)
1675 cloud_account_name
, om_datacenter_name
= self
._get
_vnfd
_cloud
_account
(const_vnfd_msg
.member_vnf_index
)
1676 if cloud_account_name
is None:
1677 cloud_account_name
= self
.cloud_account_name
1678 for _
in range(count
):
1679 vnfr
= yield from self
.create_vnf_record(vnfd_msg
, const_vnfd_msg
, cloud_account_name
, om_datacenter_name
, group_name
, index
)
1680 scale_instance
.add_vnfr(vnfr
)
1685 def instantiate_instance():
1686 self
._log
.debug("Creating %s VNFRS", scale_instance
)
1687 vnfrs
= yield from create_vnfs()
1688 yield from self
.publish()
1690 self
._log
.debug("Instantiating %s VNFRS for %s", len(vnfrs
), scale_instance
)
1691 scale_instance
.operational_status
= "vnf_init_phase"
1692 yield from self
.update_state()
1695 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.PRE_SCALE_OUT
,
1696 group
, scale_instance
, vnfrs
)
1698 self
._log
.error("Pre scale out config for scale group {} ({}) failed".
1699 format(group
.name
, index
))
1700 scale_instance
.operational_status
= "failed"
1702 yield from self
.instantiate_vnfs(vnfrs
, scaleout
=True)
1705 except Exception as e
:
1706 self
._log
.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1707 format(group
.name
, e
))
1708 self
._log
.exception(e
)
1709 scale_instance
.operational_status
= "failed"
1711 yield from self
.update_state()
1713 yield from instantiate_instance()
1716 def delete_scale_group_instance(self
, group_name
, index
):
1717 group
= self
._scaling
_groups
[group_name
]
1718 scale_instance
= group
.get_instance(index
)
1719 if scale_instance
.is_default
:
1720 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1722 scale_instance
.operational_status
= "terminate"
1723 yield from self
.update_state()
1726 def terminate_instance():
1727 self
._log
.debug("Terminating %s VNFRS" % scale_instance
)
1728 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.PRE_SCALE_IN
,
1729 group
, scale_instance
)
1731 self
._log
.error("Pre scale in config for scale group {} ({}) failed".
1732 format(group
.name
, index
))
1734 # Going ahead with terminate, even if there is an error in pre-scale-in config
1735 # as this could be result of scale out failure and we need to cleanup this group
1736 yield from self
.terminate_vnfrs(scale_instance
.vnfrs
, scalein
=True)
1737 group
.delete_instance(index
)
1739 scale_instance
.operational_status
= "vnf_terminate_phase"
1740 yield from self
.update_state()
1742 yield from terminate_instance()
1745 def _update_scale_group_instances_status(self
):
1747 def post_scale_out_task(group
, instance
):
1748 # Apply post scale out config once all VNFRs are active
1749 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.POST_SCALE_OUT
,
1751 instance
.operational_status
= "running"
1753 self
._log
.debug("Scale out for group {} and instance {} succeeded".
1754 format(group
.name
, instance
.instance_id
))
1756 self
._log
.error("Post scale out config for scale group {} ({}) failed".
1757 format(group
.name
, instance
.instance_id
))
1759 yield from self
.update_state()
1761 group_instances
= {group
: group
.instances
for group
in self
._scaling
_groups
.values()}
1762 for group
, instances
in group_instances
.items():
1763 self
._log
.debug("Updating %s instance status", group
)
1764 for instance
in instances
:
1765 instance_vnf_state_list
= [vnfr
.state
for vnfr
in instance
.vnfrs
]
1766 self
._log
.debug("Got vnfr instance states: %s", instance_vnf_state_list
)
1767 if instance
.operational_status
== "vnf_init_phase":
1768 if all([state
== VnfRecordState
.ACTIVE
for state
in instance_vnf_state_list
]):
1769 instance
.operational_status
= "running"
1771 # Create a task for post scale out to allow us to sleep before attempting
1772 # to configure newly created VM's
1773 self
._loop
.create_task(post_scale_out_task(group
, instance
))
1775 elif any([state
== VnfRecordState
.FAILED
for state
in instance_vnf_state_list
]):
1776 self
._log
.debug("Scale out for group {} and instance {} failed".
1777 format(group
.name
, instance
.instance_id
))
1778 instance
.operational_status
= "failed"
1780 elif instance
.operational_status
== "vnf_terminate_phase":
1781 if all([state
== VnfRecordState
.TERMINATED
for state
in instance_vnf_state_list
]):
1782 instance
.operational_status
= "terminated"
1783 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.POST_SCALE_IN
,
1786 self
._log
.debug("Scale in for group {} and instance {} succeeded".
1787 format(group
.name
, instance
.instance_id
))
1789 self
._log
.error("Post scale in config for scale group {} ({}) failed".
1790 format(group
.name
, instance
.instance_id
))
1792 def create_vnffgs(self
):
1793 """ This function creates VNFFGs for every VNFFG in the NSD
1794 associated with this NSR"""
1796 for vnffgd
in self
.nsd_msg
.vnffgd
:
1797 self
._log
.debug("Found vnffgd %s in nsr id %s", vnffgd
, self
.id)
1798 vnffgr
= VnffgRecord(self
._dts
,
1801 self
._nsm
._vnffgmgr
,
1805 self
._sdn
_account
_name
1807 self
._vnffgrs
[vnffgr
.id] = vnffgr
def resolve_vld_ip_profile(self, nsd_msg, vld):
    """ Find the NSD ip profile referenced by the given VLD

    Returns the first profile in nsd_msg.ip_profiles whose name equals the
    VLD's ip_profile_ref, or None when the VLD has no such field set or no
    profile carries that name.
    """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if vld.has_field('ip_profile_ref'):
        for candidate in nsd_msg.ip_profiles:
            if candidate.name == vld.ip_profile_ref:
                return candidate
    return None
1817 def _create_vls(self
, vld
, cloud_account
,om_datacenter
):
1818 """Create a VLR in the cloud account specified using the given VLD
1822 cloud_account : Cloud account name
1827 vlr
= yield from VirtualLinkRecord
.create_record(
1835 self
.resolve_vld_ip_profile(self
.nsd_msg
, vld
),
1837 restart_mode
=self
.restart_mode
)
1841 def _extract_cloud_accounts_for_vl(self
, vld
):
1843 Extracts the list of cloud accounts from the NS Config obj
1846 1. Cloud accounts based connection point (vnf_cloud_account_map)
1848 vld : VLD yang object
1853 cloud_account_list
= []
1855 if self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1856 # Handle case where cloud_account is None
1858 for vnf
in self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1859 if vnf
.cloud_account
is not None or vnf
.om_datacenter
is not None:
1860 vnf_cloud_map
[vnf
.member_vnf_index_ref
] = (vnf
.cloud_account
,vnf
.om_datacenter
)
1862 for vnfc
in vld
.vnfd_connection_point_ref
:
1863 cloud_account
= vnf_cloud_map
.get(
1864 vnfc
.member_vnf_index_ref
,
1865 (self
.cloud_account_name
,self
.om_datacenter_name
))
1867 cloud_account_list
.append(cloud_account
)
1869 if self
._nsr
_cfg
_msg
.vl_cloud_account_map
:
1870 for vld_map
in self
._nsr
_cfg
_msg
.vl_cloud_account_map
:
1871 if vld_map
.vld_id_ref
== vld
.id:
1872 for cloud_account
in vld_map
.cloud_accounts
:
1873 cloud_account_list
.extend((cloud_account
,None))
1874 for om_datacenter
in vld_map
.om_datacenters
:
1875 cloud_account_list
.extend((None,om_datacenter
))
1877 # If no config has been provided then fall-back to the default
1879 if not cloud_account_list
:
1880 cloud_account_list
= [(self
.cloud_account_name
,self
.om_datacenter_name
)]
1882 self
._log
.debug("VL {} cloud accounts: {}".
1883 format(vld
.name
, cloud_account_list
))
1884 return set(cloud_account_list
)
def create_vls(self):
    """ This function creates VLs for every VLD in the NSD
    associated with this NSR

    For each VLD, one VLR is created per (cloud account, datacenter) pair
    returned by _extract_cloud_accounts_for_vl() and appended to
    self._vlrs.
    """
    for vld in self.nsd_msg.vld:

        self._log.debug("Found vld %s in nsr id %s", vld, self.id)
        cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
        for cloud_account,om_datacenter in cloud_account_list:
            vlr = yield from self._create_vls(vld, cloud_account,om_datacenter)
            self._vlrs.append(vlr)
def create_vl_instance(self, vld):
    """ Create (if needed) and instantiate the VLR for the given VLD

    An existing VLR for the VLD is reused only when it is TERMINATED;
    otherwise new VLRs are created for the VLD's cloud accounts. The VLR is
    then instantiated through the NSM plugin and its state is set to
    ACTIVE, or to FAILED when instantiation raises.

    Raises:
        NsrVlUpdateError if a non-terminated VLR already exists for the VLD
    """
    self._log.debug("Create VL for {}: {}".format(self.id, vld.as_dict()))
    # Check if the VL is already present
    vlr = None
    for vl in self._vlrs:
        if vl.vld_msg.id == vld.id:
            self._log.debug("The VLD %s already in NSR %s as VLR %s with status %s",
                            vld.id, self.id, vl.id, vl.state)
            vlr = vl
            if vlr.state != VlRecordState.TERMINATED:
                # Build a real string: a trailing ", (args)" would make
                # err_msg a tuple and the exception text unreadable.
                err_msg = "VLR for VL %s in NSR %s already instantiated" % \
                          (vld.id, self.id)
                self._log.error(err_msg)
                raise NsrVlUpdateError(err_msg)
            break

    if vlr is None:
        cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
        for account,om_datacenter in cloud_account_list:
            vlr = yield from self._create_vls(vld, account,om_datacenter)
            self._vlrs.append(vlr)

    vlr.state = VlRecordState.INSTANTIATION_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.instantiate_vl(self, vlr)
        vlr.state = VlRecordState.ACTIVE

    except Exception as e:
        # Best effort: the failure is recorded on the VLR instead of
        # being propagated to the caller.
        err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
                  format(self.id, vld.id, e)
        self._log.error(err_msg)
        self._log.exception(e)
        vlr.state = VlRecordState.FAILED

    yield from self.update_state()
1939 def delete_vl_instance(self
, vld
):
1940 for vlr
in self
._vlrs
:
1941 if vlr
.vld_msg
.id == vld
.id:
1942 self
._log
.debug("Found VLR %s for VLD %s in NSR %s",
1943 vlr
.id, vld
.id, self
.id)
1944 vlr
.state
= VlRecordState
.TERMINATE_PENDING
1945 yield from self
.update_state()
1948 yield from self
.nsm_plugin
.terminate_vl(vlr
)
1949 vlr
.state
= VlRecordState
.TERMINATED
1950 self
._vlrs
.remove(vlr
)
1952 except Exception as e
:
1953 err_msg
= "Error terminating VL for NSR {} and VLD {}: {}". \
1954 format(self
.id, vld
.id, e
)
1955 self
._log
.error(err_msg
)
1956 self
._log
.exception(e
)
1957 vlr
.state
= VlRecordState
.FAILED
1959 yield from self
.update_state()
1963 def create_vnfs(self
, config_xact
):
1965 This function creates VNFs for every VNF in the NSD
1966 associated with this NSR
1968 self
._log
.debug("Creating %u VNFs associated with this NS id %s",
1969 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
1971 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1972 if not const_vnfd
.start_by_default
:
1973 self
._log
.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
1974 const_vnfd
.member_vnf_index
)
1977 vnfd_msg
= self
._get
_vnfd
(const_vnfd
.vnfd_id_ref
, config_xact
)
1978 cloud_account_name
,om_datacenter_name
= self
._get
_vnfd
_cloud
_account
(const_vnfd
.member_vnf_index
)
1979 if cloud_account_name
is None:
1980 cloud_account_name
= self
.cloud_account_name
1981 yield from self
.create_vnf_record(vnfd_msg
, const_vnfd
, cloud_account_name
, om_datacenter_name
)
1984 def get_placement_groups(self
, vnfd_msg
, const_vnfd
):
1985 placement_groups
= []
1986 for group
in self
.nsd_msg
.placement_groups
:
1987 for member_vnfd
in group
.member_vnfd
:
1988 if (member_vnfd
.vnfd_id_ref
== vnfd_msg
.id) and \
1989 (member_vnfd
.member_vnf_index_ref
== const_vnfd
.member_vnf_index
):
1990 group_info
= self
.resolve_placement_group_cloud_construct(group
)
1991 if group_info
is None:
1992 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1993 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1995 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
1998 const_vnfd
.member_vnf_index
)
1999 placement_groups
.append(group_info
)
2000 return placement_groups
2003 def create_vnf_record(self
, vnfd_msg
, const_vnfd
, cloud_account_name
, om_datacenter_name
, group_name
=None, group_instance_id
=None):
2004 # Fetch the VNFD associated with this VNF
2005 placement_groups
= self
.get_placement_groups(vnfd_msg
, const_vnfd
)
2006 self
._log
.info("Cloud Account for VNF %d is %s",const_vnfd
.member_vnf_index
,cloud_account_name
)
2007 self
._log
.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
2009 const_vnfd
.member_vnf_index
,
2010 [ group
.name
for group
in placement_groups
])
2011 vnfr
= yield from VirtualNetworkFunctionRecord
.create_record(self
._dts
,
2024 restart_mode
=self
.restart_mode
,
2026 if vnfr
.id in self
._vnfrs
:
2027 err
= "VNF with VNFR id %s already in vnf list" % (vnfr
.id,)
2028 raise NetworkServiceRecordError(err
)
2030 self
._vnfrs
[vnfr
.id] = vnfr
2031 self
._nsm
.vnfrs
[vnfr
.id] = vnfr
2033 yield from vnfr
.set_config_status(NsrYang
.ConfigStates
.INIT
)
2035 self
._log
.debug("Added VNFR %s to NSM VNFR list with id %s",
def create_param_pools(self):
    """ Create a parameter value pool for every parameter pool in the NSD

    Each pool is stored in self._param_pools under the pool's name.

    Raises:
        NetworkServiceRecordError if a pool's end value is lower than its
        start value
    """
    for param_pool in self.nsd_msg.parameter_pool:
        self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)

        start_value = param_pool.range.start_value
        end_value = param_pool.range.end_value
        if end_value < start_value:
            # One formatting style throughout, so that the pool name
            # actually shows up in the message.
            raise NetworkServiceRecordError(
                "Parameter pool {} has invalid range (start: {}, end: {})".format(
                    param_pool.name, start_value, end_value
                )
            )

        self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
            self._log,
            param_pool.name,
            range(start_value, end_value)
        )
2061 def fetch_vnfr(self
, vnfr_path
):
2062 """ Fetch VNFR record """
2064 self
._log
.debug("Fetching VNFR with key %s while instantiating %s",
2066 res_iter
= yield from self
._dts
.query_read(vnfr_path
, rwdts
.XactFlag
.MERGE
)
2068 for ent
in res_iter
:
2069 res
= yield from ent
2075 def instantiate_vnfs(self
, vnfrs
, scaleout
=False):
2077 This function instantiates VNFs for every VNF in this Network Service
2079 self
._log
.debug("Instantiating %u VNFs in NS %s", len(vnfrs
), self
.id)
2081 self
._log
.debug("Instantiating VNF: %s in NS %s", vnf
, self
.id)
2082 yield from self
.nsm_plugin
.instantiate_vnf(self
, vnf
,scaleout
)
2085 def instantiate_vnffgs(self
):
2087 This function instantiates VNFFGs for every VNFFG in this Network Service
2089 self
._log
.debug("Instantiating %u VNFFGs in NS %s",
2090 len(self
.nsd_msg
.vnffgd
), self
.id)
2091 for _
, vnfr
in self
.vnfrs
.items():
2092 while vnfr
.state
in [VnfRecordState
.INSTANTIATION_PENDING
, VnfRecordState
.INIT
]:
2093 self
._log
.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr
.name
,vnfr
.state
)
2094 yield from asyncio
.sleep(2, loop
=self
._loop
)
2095 if vnfr
.state
== VnfRecordState
.ACTIVE
:
2096 self
._log
.debug("Received vnfr state for vnfr %s is %s ",vnfr
.name
,vnfr
.state
)
2099 self
._log
.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr
.name
,vnfr
.state
)
2100 self
._vnffgr
_state
= VnffgRecordState
.FAILED
2103 self
._log
.info("Waiting for 90 seconds for VMs to come up")
2104 yield from asyncio
.sleep(90, loop
=self
._loop
)
2105 self
._log
.info("Starting VNFFG orchestration")
2106 for vnffg
in self
._vnffgrs
.values():
2107 self
._log
.debug("Instantiating VNFFG: %s in NS %s", vnffg
, self
.id)
2108 yield from vnffg
.instantiate()
2111 def instantiate_scaling_instances(self
, config_xact
):
2112 """ Instantiate any default scaling instances in this Network Service """
2113 for group
in self
._scaling
_groups
.values():
2114 for i
in range(group
.min_instance_count
):
2115 self
._log
.debug("Instantiating %s default scaling instance %s", group
, i
)
2116 yield from self
.create_scale_group_instance(
2117 group
.name
, i
, config_xact
, is_default
=True
2120 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2121 if group_msg
.scaling_group_name_ref
!= group
.name
:
2124 for instance
in group_msg
.instance
:
2125 self
._log
.debug("Reloading %s scaling instance %s", group_msg
, instance
.id)
2126 yield from self
.create_scale_group_instance(
2127 group
.name
, instance
.id, config_xact
, is_default
=False
def has_scaling_instances(self):
    """ Return boolean indicating if the network service has scaling instances

    True when any scaling group has a positive min_instance_count, or when
    the NSR config lists at least one instance for any scaling group.
    """
    groups = self._scaling_groups.values()
    if any(group.min_instance_count > 0 for group in groups):
        return True

    configured = self._nsr_cfg_msg.scaling_group
    return any(len(group_msg.instance) > 0 for group_msg in configured)
2144 """ This function publishes this NSR """
2145 self
._nsr
_msg
= self
.create_msg()
2147 self
._log
.debug("Publishing the NSR with xpath %s and nsr %s",
2151 if self
._debug
_running
:
2152 self
._log
.debug("Publishing NSR in RUNNING state!")
2155 with self
._dts
.transaction() as xact
:
2156 yield from self
._nsm
.nsr_handler
.update(xact
, self
.nsr_xpath
, self
._nsr
_msg
)
2157 if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
:
2158 self
._debug
_running
= True
def unpublish(self, xact):
    """Delete this NSR's published record through the NSR handler.

    Arguments:
        xact - the transaction handed to the handler's delete call
    """
    self._log.debug("Unpublishing Network service id %s", self.id)
    nsr_handler = self._nsm.nsr_handler
    yield from nsr_handler.delete(xact, self.nsr_xpath)
def nsr_xpath(self):
    """Build the operational-data xpath keyed by this NSR's id."""
    template = (
        "D,/nsr:ns-instance-opdata"
        "/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
    )
    return template.format(self.id)
def xpath_from_nsr(nsr):
    """Build the op-data xpath for the given NSR.

    The class-level XPATH prefix is extended with a key predicate on the
    NSR's id.
    """
    key_predicate = "[nsr:ns-instance-config-ref = '{}']"
    full_template = NetworkServiceRecord.XPATH + key_predicate
    return full_template.format(nsr.id)
def nsd_xpath(self):
    """Build the NSD catalog config xpath keyed by this record's nsd_id."""
    template = "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
    return template.format(self.nsd_id)
def instantiate(self, config_xact):
    """Instantiate this NetworkServiceRecord.

    The NSD carried in the NSR config message is validated, merged with any
    config / initial config primitive values, has its input parameters
    substituted, and the record is created and published. The remaining
    work is scheduled on the event loop as a separate task:

    * Instantiate every VL in the NSD.
    * Instantiate every VNF in the NSD.
    * Instantiate VNFFGs and scaling-group instances, when present.
    * Give the NSM plugin a chance to deploy the network service.
    * Publish the NSR details to DTS.

    If that task finishes with an exception, instantiation_failed() is
    scheduled so the failure is recorded and published.

    Arguments:
        config_xact: The configuration transaction which initiated the
                     instantiation

    Raises:
        NetworkServiceRecordError if the NSR config carries no NSD
    """
    self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)

    # Move the state to INITIALIZING
    self.set_state(NetworkServiceRecordState.INIT)

    event_descr = "Instantiation Request Received NSR Id:%s" % self.id
    self.record_event("instantiating", event_descr)

    self._nsd = self._nsr_cfg_msg.nsd

    # Validate the NSD before anything dereferences it; previously this
    # check ran only after the NSD had already been merged and logged.
    if self._nsd is None:
        msg = "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
        self._log.error(msg, self.nsd_id, self.id)
        raise NetworkServiceRecordError(msg % (self.nsd_id, self.id))

    # Merge any config and initial config primitive values
    self.config_store.merge_nsd_config(self.nsd_msg)
    self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))

    event_descr = "Fetched NSD with descriptor id %s" % self.nsd_id
    self.record_event("nsd-fetched", event_descr)

    self._log.debug("Got nsd result %s", self._nsd)

    # Substitute any input parameters
    self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)

    yield from self.create(config_xact)

    # Publish the NSR to DTS
    yield from self.publish()

    @asyncio.coroutine
    def do_instantiate():
        """Run the VL, VNF, VNFFG and scaling-group instantiation phases."""
        self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
                        self.id, self.nsd_id)

        # instantiate the VLs
        event_descr = ("Instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("begin-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)

        yield from self.instantiate_vls()

        # Publish the NSR to DTS
        yield from self.publish()

        event_descr = ("Finished instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("end-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)

        self._log.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # instantiate the VNFs
        event_descr = ("Instantiating %s VNFS for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))

        self.record_event("begin-vnf-instantiation", event_descr)

        yield from self.instantiate_vnfs(self._vnfrs.values())

        self._log.debug(" Finished instantiating %d VNFs for NSR id %s",
                        len(self.nsd_msg.constituent_vnfd), self.id)

        event_descr = ("Finished instantiating %s VNFs for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))
        self.record_event("end-vnf-instantiation", event_descr)

        if len(self.vnffgrs) > 0:
            #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
            event_descr = ("Instantiating %s VNFFGS for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))

            self.record_event("begin-vnffg-instantiation", event_descr)

            yield from self.instantiate_vnffgs()

            event_descr = ("Finished instantiating %s VNFFGDs for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))
            self.record_event("end-vnffg-instantiation", event_descr)

        if self.has_scaling_instances():
            event_descr = ("Instantiating %s Scaling Groups for NSR id %s" %
                           (len(self._scaling_groups), self.id))

            self.record_event("begin-scaling-group-instantiation", event_descr)
            yield from self.instantiate_scaling_instances(config_xact)

            # The end event gets its own description, as the other phases
            # do, instead of repeating the "begin" text.
            event_descr = ("Finished instantiating %s Scaling Groups for NSR id %s" %
                           (len(self._scaling_groups), self.id))
            self.record_event("end-scaling-group-instantiation", event_descr)

        # Give the plugin a chance to deploy the network service now that all
        # virtual links and vnfs are instantiated
        yield from self.nsm_plugin.deploy(self._nsr_msg)

        self._log.debug("Publishing NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # Publish the NSR to DTS
        yield from self.publish()

        self._log.debug("Published NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

    def on_instantiate_done(fut):
        # If do_instantiate failed, publish the NSR with the failed result
        e = fut.exception()
        if e is None:
            return

        import sys
        import traceback
        print(traceback.format_exception(None, e, e.__traceback__), file=sys.stderr, flush=True)
        self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(e))
        self._loop.create_task(self.instantiation_failed(failed_reason=str(e)))

    instantiate_task = self._loop.create_task(do_instantiate())
    instantiate_task.add_done_callback(on_instantiate_done)
def set_config_status(self, status, status_details=None):
    """Record a new config status for this NSR and publish the change.

    Nothing happens when the status is unchanged. A transition to FAILED
    additionally records a "config-failed" event carrying the details.

    Arguments:
        status - the new config status value
        status_details - optional detail stored alongside the status
    """
    if self.config_status == status:
        return

    self._log.debug("Updating NSR {} status for {} to {}".
                    format(self.name, self.config_status, status))
    self._config_status = status
    self._config_status_details = status_details

    failed = self._config_status == NsrYang.ConfigStates.FAILED
    if failed:
        self.record_event("config-failed", "NS configuration failed",
                          evt_details=self._config_status_details)

    yield from self.publish()
2348 def is_active(self
):
2349 """ This NS is active """
2350 self
.set_state(NetworkServiceRecordState
.RUNNING
)
2354 # Publish the NSR to DTS
2355 self
._log
.debug("Network service %s is active ", self
.id)
2356 self
._is
_active
= True
2358 event_descr
= "NSR in running state for NSR id %s" % self
.id
2359 self
.record_event("ns-running", event_descr
)
2361 yield from self
.publish()
def instantiation_failed(self, failed_reason=None):
    """Mark this NS as failed, record the event and publish the NSR.

    Arguments:
        failed_reason - optional text stored as the event details
    """
    self._log.error("Network service id:%s, name:%s instantiation failed",
                    self.id, self.name)
    self.set_state(NetworkServiceRecordState.FAILED)

    description = "Instantiation of NS %s failed" % self.id
    self.record_event("ns-failed", description, evt_details=failed_reason)

    # Publish the NSR to DTS
    yield from self.publish()
2377 def terminate_vnfrs(self
, vnfrs
, scalein
=False):
2378 """ Terminate VNFRS in this network service """
2379 self
._log
.debug("Terminating VNFs in network service %s", self
.id)
2381 self
._log
.debug("Terminating VNFs in network service %s %s", vnfr
.id, self
.id)
2383 yield from self
.nsm_plugin
.terminate_vnf(self
, vnfr
, scalein
=True)
def terminate(self):
    """Terminate this NetworkServiceRecord phase by phase.

    Teardown order is: VNFFGRs, VNFRs, VLRs, then the NSM plugin's
    terminate_ns hook. Before each phase the record state is advanced and
    an operational event is recorded; the record ends in TERMINATED.
    """
    self._log.debug("Terminating network service id %s", self.id)

    # Overall state: TERMINATE
    self.set_state(NetworkServiceRecordState.TERMINATE)
    descr = "Terminate being processed for NS Id:%s" % self.id
    self.record_event("terminate", descr)

    # Phase: VNFFG_TERMINATE_PHASE
    self._log.debug("Terminating VNFFGs in NS ID: %s", self.id)
    self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
    descr = "Terminating VNFFGS in NS Id:%s" % self.id
    self.record_event("terminating-vnffgss", descr)
    self._log.debug("Terminating VNFFGRs in network service %s", self.id)
    for vnffgr in self.vnffgrs.values():
        yield from vnffgr.terminate()

    # Phase: VNF_TERMINATE_PHASE
    self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
    descr = "Terminating VNFS in NS Id:%s" % self.id
    self.record_event("terminating-vnfs", descr)
    yield from self.terminate_vnfrs(self.vnfrs.values())

    # Phase: VL_TERMINATE_PHASE
    self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
    descr = "Terminating VLs in NS Id:%s" % self.id
    self.record_event("terminating-vls", descr)
    self._log.debug("Terminating VLs in network service %s", self.id)
    for vlr in self.vlrs:
        yield from self.nsm_plugin.terminate_vl(vlr)
        vlr.state = VlRecordState.TERMINATED

    yield from self.nsm_plugin.terminate_ns(self)

    # Final state: TERMINATED
    self.set_state(NetworkServiceRecordState.TERMINATED)
    descr = "Terminated NS Id:%s" % self.id
    self.record_event("terminated", descr)
2433 """"Enable a NetworkServiceRecord."""
2437 """"Disable a NetworkServiceRecord."""
2440 def map_config_status(self
):
2441 self
._log
.debug("Config status for ns {} is {}".
2442 format(self
.name
, self
._config
_status
))
2443 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURING
:
2444 return 'configuring'
2445 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
def vl_phase_completed(self):
    """Report the stored flag saying whether the VL phase has completed."""
    completed = self._vl_phase_completed
    return completed
def vnf_phase_completed(self):
    """Report the stored flag saying whether the VNF phase has completed."""
    completed = self._vnf_phase_completed
    return completed
2457 def create_msg(self
):
2458 """ The network serice record as a message """
2459 nsr_dict
= {"ns_instance_config_ref": self
.id}
2460 nsr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr
.from_dict(nsr_dict
)
2461 #nsr.cloud_account = self.cloud_account_name
2462 nsr
.sdn_account
= self
._sdn
_account
_name
2463 nsr
.name_ref
= self
.name
2464 nsr
.nsd_ref
= self
.nsd_id
2465 nsr
.nsd_name_ref
= self
.nsd_msg
.name
2466 nsr
.operational_events
= self
._op
_status
.msg
2467 nsr
.operational_status
= self
._op
_status
.yang_str()
2468 nsr
.config_status
= self
.map_config_status()
2469 nsr
.config_status_details
= self
._config
_status
_details
2470 nsr
.create_time
= self
._create
_time
2471 nsr
.uptime
= int(time
.time()) - self
._create
_time
2473 for cfg_prim
in self
.nsd_msg
.service_primitive
:
2474 cfg_prim
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive
.from_dict(
2476 nsr
.service_primitive
.append(cfg_prim
)
2478 for init_cfg
in self
.nsd_msg
.initial_config_primitive
:
2479 prim
= NsrYang
.NsrInitialConfigPrimitive
.from_dict(
2481 nsr
.initial_config_primitive
.append(prim
)
2483 if self
.vl_phase_completed():
2484 for vlr
in self
.vlrs
:
2485 nsr
.vlr
.append(vlr
.create_nsr_vlr_msg(self
.vnfrs
.values()))
2487 if self
.vnf_phase_completed():
2488 for vnfr_id
in self
.vnfrs
:
2489 nsr
.constituent_vnfr_ref
.append(self
.vnfrs
[vnfr_id
].const_vnfr_msg
)
2490 for vnffgr
in self
.vnffgrs
.values():
2491 nsr
.vnffgr
.append(vnffgr
.fetch_vnffgr())
2492 for scaling_group
in self
._scaling
_groups
.values():
2493 nsr
.scaling_group_record
.append(scaling_group
.create_record_msg())
def all_vnfs_active(self):
    """Tell whether every VNFR of this NS reports active.

    Returns:
        True when each VNFR's `active` attribute is exactly True (also when
        there are no VNFRs at all); False otherwise.
    """
    return all(record.active is True for record in self.vnfrs.values())
2505 def update_state(self
):
2506 """ Re-evaluate this NS's state """
2507 curr_state
= self
._op
_status
.state
2509 if curr_state
== NetworkServiceRecordState
.TERMINATED
:
2510 self
._log
.debug("NS (%s) in terminated state, not updating state", self
.id)
2513 new_state
= NetworkServiceRecordState
.RUNNING
2514 self
._log
.info("Received update_state for nsr: %s, curr-state: %s",
2515 self
.id, curr_state
)
2517 # Check all the VNFRs are present
2518 for _
, vnfr
in self
.vnfrs
.items():
2519 if vnfr
.state
in [VnfRecordState
.ACTIVE
, VnfRecordState
.TERMINATED
]:
2521 elif vnfr
.state
== VnfRecordState
.FAILED
:
2522 if vnfr
._prev
_state
!= vnfr
.state
:
2523 event_descr
= "Instantiation of VNF %s failed" % vnfr
.id
2524 event_error_details
= vnfr
.state_failed_reason
2525 self
.record_event("vnf-failed", event_descr
, evt_details
=event_error_details
)
2526 vnfr
.set_state(VnfRecordState
.FAILED
)
2528 self
._log
.info("VNF state did not change, curr=%s, prev=%s",
2529 vnfr
.state
, vnfr
._prev
_state
)
2530 new_state
= NetworkServiceRecordState
.FAILED
2533 self
._log
.info("VNF %s in NSR %s is still not active; current state is: %s",
2534 vnfr
.id, self
.id, vnfr
.state
)
2535 new_state
= curr_state
2537 # If new state is RUNNING; check all VLs
2538 if new_state
== NetworkServiceRecordState
.RUNNING
:
2539 for vl
in self
.vlrs
:
2541 if vl
.state
in [VlRecordState
.ACTIVE
, VlRecordState
.TERMINATED
]:
2543 elif vl
.state
== VlRecordState
.FAILED
:
2544 if vl
.prev_state
!= vl
.state
:
2545 event_descr
= "Instantiation of VL %s failed" % vl
.id
2546 event_error_details
= vl
.state_failed_reason
2547 self
.record_event("vl-failed", event_descr
, evt_details
=event_error_details
)
2548 vl
.prev_state
= vl
.state
2550 self
._log
.debug("VL %s already in failed state")
2552 if vl
.state
in [VlRecordState
.INSTANTIATION_PENDING
, VlRecordState
.INIT
]:
2553 new_state
= NetworkServiceRecordState
.VL_INSTANTIATE
2556 if vl
.state
in [VlRecordState
.TERMINATE_PENDING
]:
2557 new_state
= NetworkServiceRecordState
.VL_TERMINATE
2560 # If new state is RUNNING; check VNFFGRs are also active
2561 if new_state
== NetworkServiceRecordState
.RUNNING
:
2562 for _
, vnffgr
in self
.vnffgrs
.items():
2563 self
._log
.info("Checking vnffgr state for nsr %s is: %s",
2564 self
.id, vnffgr
.state
)
2565 if vnffgr
.state
== VnffgRecordState
.ACTIVE
:
2567 elif vnffgr
.state
== VnffgRecordState
.FAILED
:
2568 event_descr
= "Instantiation of VNFFGR %s failed" % vnffgr
.id
2569 self
.record_event("vnffg-failed", event_descr
)
2570 new_state
= NetworkServiceRecordState
.FAILED
2573 self
._log
.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
2574 vnffgr
.id, self
.id, vnffgr
.state
)
2575 new_state
= curr_state
2577 # Update all the scaling group instance operational status to
2578 # reflect the state of all VNFR within that instance
2579 yield from self
._update
_scale
_group
_instances
_status
()
2581 for _
, group
in self
._scaling
_groups
.items():
2582 if group
.state
== scale_group
.ScaleGroupState
.SCALING_OUT
:
2583 new_state
= NetworkServiceRecordState
.SCALING_OUT
2585 elif group
.state
== scale_group
.ScaleGroupState
.SCALING_IN
:
2586 new_state
= NetworkServiceRecordState
.SCALING_IN
2589 if new_state
!= curr_state
:
2590 self
._log
.debug("Changing state of Network service %s from %s to %s",
2591 self
.id, curr_state
, new_state
)
2592 if new_state
== NetworkServiceRecordState
.RUNNING
:
2593 yield from self
.is_active()
2594 elif new_state
== NetworkServiceRecordState
.FAILED
:
2595 # If the NS is already active and we entered scaling_in, scaling_out,
2596 # do not mark the NS as failing if scaling operation failed.
2597 if curr_state
in [NetworkServiceRecordState
.SCALING_OUT
,
2598 NetworkServiceRecordState
.SCALING_IN
] and self
._is
_active
:
2599 new_state
= NetworkServiceRecordState
.RUNNING
2600 self
.set_state(new_state
)
2602 yield from self
.instantiation_failed()
2604 self
.set_state(new_state
)
2606 yield from self
.publish()
2609 class InputParameterSubstitution(object):
2611 This class is responsible for substituting input parameters into an NSD.
2614 def __init__(self
, log
):
2615 """Create an instance of InputParameterSubstitution
2618 log - a logger for this object to use
2623 def __call__(self
, nsd
, nsr_config
):
2624 """Substitutes input parameters from the NSR config into the NSD
2626 This call modifies the provided NSD with the input parameters that are
2627 contained in the NSR config.
2630 nsd - a GI NSD object
2631 nsr_config - a GI NSR config object
2634 if nsd
is None or nsr_config
is None:
2637 # Create a lookup of the xpath elements that this descriptor allows
2639 optional_input_parameters
= set()
2640 for input_parameter
in nsd
.input_parameter_xpath
:
2641 optional_input_parameters
.add(input_parameter
.xpath
)
2643 # Apply the input parameters to the descriptor
2644 if nsr_config
.input_parameter
:
2645 for param
in nsr_config
.input_parameter
:
2646 if param
.xpath
not in optional_input_parameters
:
2647 msg
= "tried to set an invalid input parameter ({})"
2648 self
.log
.error(msg
.format(param
.xpath
))
2652 "input-parameter:{} = {}".format(
2659 xpath
.setxattr(nsd
, param
.xpath
, param
.value
)
2661 except Exception as e
:
2662 self
.log
.exception(e
)
2665 class NetworkServiceDescriptor(object):
2667 Network service descriptor class
2670 def __init__(self
, dts
, log
, loop
, nsd
, nsm
):
2680 """ Returns nsd id """
2685 """ Returns name of nsd """
2686 return self
._nsd
.name
2690 """ Return the message associated with this NetworkServiceDescriptor"""
def path_for_id(nsd_id):
    """Return the NSD catalog config xpath for the passed nsd_id.

    Arguments:
        nsd_id - id of the NSD used as the key predicate value

    Returns:
        the xpath string, with the key predicate properly closed
    """
    # The predicate's closing bracket was previously missing, which
    # produced a malformed xpath.
    return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']".format(nsd_id)
2699 """ Return the message associated with this NetworkServiceDescriptor"""
2700 return NetworkServiceDescriptor
.path_for_id(self
.id)
2702 def update(self
, nsd
):
2703 """ Update the NSD descriptor """
2707 class NsdDtsHandler(object):
2708 """ The network service descriptor DTS handler """
2709 XPATH
= "C,/nsd:nsd-catalog/nsd:nsd"
2711 def __init__(self
, dts
, log
, loop
, nsm
):
2721 """ Return registration handle """
2726 """ Register for Nsd create/update/delete/read requests from dts """
def on_apply(dts, acg, xact, action, scratch):
    """Apply callback for NSD config: create/update the affected NSDs.

    An NSD element from this transaction is forwarded to the NSM when its
    id was collected in scratch['nsds'] by the prepare callback, or
    unconditionally when is_recovery holds (xact.xact is None and the
    action is INSTALL).

    Returns:
        RwTypes.RwStatus.SUCCESS
    """
    is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
    self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
                    xact, action)

    # Create/Update an NSD record
    for nsd_cfg in self._regh.get_xact_elements(xact):
        # Skip NSD cfgs whose ID was not received in the prepare callback
        if not (nsd_cfg.id in scratch.get('nsds', []) or is_recovery):
            continue
        self._nsm.update_nsd(nsd_cfg)

    scratch.pop('nsds', None)

    return RwTypes.RwStatus.SUCCESS
def delete_nsd_libs(nsd_id):
    """Remove any files uploaded with the NSD.

    The files are looked up under $RIFT_ARTIFACTS/launchpad/libs/<nsd_id>.
    Cleanup is best-effort: any failure (including RIFT_ARTIFACTS not being
    set) is logged and swallowed.

    Arguments:
        nsd_id - id of the NSD whose library directory is removed
    """
    try:
        rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
        nsd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', nsd_id)

        if os.path.exists(nsd_dir):
            shutil.rmtree(nsd_dir, ignore_errors=True)
    except Exception as e:
        self._log.error("Exception in cleaning up NSD libs {}: {}".
                        format(nsd_id, e))
        # Was misspelled "excpetion", which raised AttributeError from
        # inside this handler instead of logging the traceback.
        self._log.exception(e)
def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
    """Prepare callback from DTS for NSD config.

    A deleted NSD is handled right here (its uploaded libs are removed and
    the NSM is told to delete it). Otherwise the NSD id is queued in
    scratch['nsds'] so the apply callback creates/updates it. The
    transaction is always ACKed.
    """
    self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
                   msg.id, msg)

    field_ref = ProtobufC.FieldReference.alloc()
    field_ref.goto_whole_message(msg.to_pbcm())

    if not field_ref.is_field_deleted():
        # Add this NSD to scratch to create/update in apply callback
        scratch.setdefault('nsds', []).append(msg.id)
    else:
        # Delete an NSD record
        self._log.debug("Deleting NSD with id %s", msg.id)
        yield from delete_nsd_libs(msg.id)
        self._nsm.delete_nsd(msg.id)

    xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2781 "Registering for NSD config using xpath: %s",
2782 NsdDtsHandler
.XPATH
,
2785 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2786 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2787 # Need a list in scratch to store NSDs to create/update later
2788 # acg._scratch['nsds'] = list()
2789 self
._regh
= acg
.register(
2790 xpath
=NsdDtsHandler
.XPATH
,
2791 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
2792 on_prepare
=on_prepare
)
2795 class VnfdDtsHandler(object):
2796 """ DTS handler for VNFD config changes """
2797 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2799 def __init__(self
, dts
, log
, loop
, nsm
):
2808 """ DTS registration handle """
2813 """ Register for VNFD configuration"""
def on_apply(dts, acg, xact, action, scratch):
    """Apply callback for VNFD config.

    VNFDs whose ids were queued in scratch['vnfds'] by the prepare callback
    are created/updated; VNFDs whose ids were queued in
    scratch['deleted_vnfds'] are deleted. Both scratch lists are cleared
    afterwards.
    """
    self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                    xact, action, scratch)

    # Create/Update a VNFD record
    for vnfd_cfg in self._regh.get_xact_elements(xact):
        # Skip VNFD cfgs whose ID was not received in the prepare callback
        if vnfd_cfg.id not in scratch.get('vnfds', []):
            continue
        self._nsm.update_vnfd(vnfd_cfg)

    # Delete the VNFDs that prepare flagged as deleted
    for vnfd_cfg in self._regh.elements:
        if vnfd_cfg.id not in scratch.get('deleted_vnfds', []):
            continue
        yield from self._nsm.delete_vnfd(vnfd_cfg.id)

    for consumed_key in ('vnfds', 'deleted_vnfds'):
        scratch.pop(consumed_key, None)
def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
    """Prepare callback for VNFD config.

    Only bookkeeping happens here: the VNFD id is queued in scratch under
    'deleted_vnfds' when the message is a delete, otherwise under 'vnfds',
    for the apply callback to act on. The transaction is always ACKed.
    """
    self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
                    ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg)

    field_ref = ProtobufC.FieldReference.alloc()
    field_ref.goto_whole_message(msg.to_pbcm())

    # Handle deletes in prepare_callback, but adds/updates in apply_callback
    if field_ref.is_field_deleted():
        self._log.debug("Adding msg to deleted field")
        bucket = 'deleted_vnfds'
    else:
        # Add this VNFD to scratch to create/update in apply callback
        bucket = 'vnfds'
    scratch.setdefault(bucket, []).append(msg.id)

    xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2856 "Registering for VNFD config using xpath: %s",
2857 VnfdDtsHandler
.XPATH
,
2859 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2860 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2861 # Need a list in scratch to store VNFDs to create/update later
2862 # acg._scratch['vnfds'] = list()
2863 # acg._scratch['deleted_vnfds'] = list()
2864 self
._regh
= acg
.register(
2865 xpath
=VnfdDtsHandler
.XPATH
,
2866 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2867 on_prepare
=on_prepare
)
2869 class NsrRpcDtsHandler(object):
2870 """ The network service instantiation RPC DTS handler """
2871 EXEC_NSR_CONF_XPATH
= "I,/nsr:start-network-service"
2872 EXEC_NSR_CONF_O_XPATH
= "O,/nsr:start-network-service"
2873 NETCONF_IP_ADDRESS
= "127.0.0.1"
2875 RESTCONF_PORT
= 8888
2876 NETCONF_USER
= "admin"
2877 NETCONF_PW
= "admin"
2878 REST_BASE_V2_URL
= 'https://{}:{}/v2/api/'.format("127.0.0.1",8888)
2880 def __init__(self
, dts
, log
, loop
, nsm
):
2887 self
._ns
_regh
= None
2889 self
._manager
= None
2890 self
._nsr
_config
_url
= NsrRpcDtsHandler
.REST_BASE_V2_URL
+ 'config/ns-instance-config'
2892 self
._model
= RwYang
.Model
.create_libncx()
2893 self
._model
.load_schema_ypbc(RwNsrYang
.get_schema())
2897 """ Return the NS manager instance """
def wrap_netconf_config_xml(xml):
    """Wrap an XML fragment in a netconf <config> element.

    Arguments:
        xml - the XML text to embed

    Returns:
        the fragment enclosed in <config xmlns:xc=...> ... </config>
    """
    envelope = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'
    return envelope.format(xml)
2906 def _connect(self
, timeout_secs
=240):
2908 start_time
= time
.time()
2909 while (time
.time() - start_time
) < timeout_secs
:
2912 self
._log
.debug("Attemping NsmTasklet netconf connection.")
2914 manager
= yield from ncclient
.asyncio_manager
.asyncio_connect(
2916 host
=NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
,
2917 port
=NsrRpcDtsHandler
.NETCONF_PORT
,
2918 username
=NsrRpcDtsHandler
.NETCONF_USER
,
2919 password
=NsrRpcDtsHandler
.NETCONF_PW
,
2921 look_for_keys
=False,
2922 hostkey_verify
=False,
2927 except ncclient
.transport
.errors
.SSHError
as e
:
2928 self
._log
.warning("Netconf connection to launchpad %s failed: %s",
2929 NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
, str(e
))
2931 yield from asyncio
.sleep(5, loop
=self
._loop
)
2933 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
def _apply_ns_instance_config(self,payload_dict):
    """POST the ns-instance-config payload to the REST config URL.

    Arguments:
        payload_dict - request body, sent as-is as the POST data

    Returns:
        the response object returned by requests.post
    """
    #self._log.debug("At apply NS instance config with payload %s",payload_dict)
    yang_json = 'application/vnd.yang.data+json'
    headers = {'accept': yang_json, 'content-type': yang_json}
    # NOTE(review): credentials are hard-coded and TLS verification is off
    return requests.post(
        self._nsr_config_url,
        headers=headers,
        auth=('admin', 'admin'),
        data=payload_dict,
        verify=False,
    )
2944 """ Register for NS monitoring read from dts """
2946 def on_ns_config_prepare(xact_info
, action
, ks_path
, msg
):
2947 """ prepare callback from dts start-network-service"""
2948 assert action
== rwdts
.QueryAction
.RPC
2950 rpc_op
= NsrYang
.YangOutput_Nsr_StartNetworkService
.from_dict({
2951 "nsr_id":str(uuid
.uuid4())
2954 if not ('name' in rpc_ip
and 'nsd_ref' in rpc_ip
and ('cloud_account' in rpc_ip
or 'om_datacenter' in rpc_ip
)):
2955 self
._log
.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip
))
2958 self
._log
.debug("start-network-service RPC input: {}".format(rpc_ip
))
2961 # Add used value to the pool
2962 self
._log
.debug("RPC output: {}".format(rpc_op
))
2964 nsd_copy
= self
.nsm
.get_nsd(rpc_ip
.nsd_ref
)
2966 #if not self._manager:
2967 # self._manager = yield from self._connect()
2969 self
._log
.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
2970 rpc_ip
.name
, rpc_ip
.nsd_ref
)
2972 ns_instance_config_dict
= {"id":rpc_op
.nsr_id
, "admin_status":"ENABLED"}
2973 ns_instance_config_copy_dict
= {k
:v
for k
, v
in rpc_ip
.as_dict().items()
2974 if k
in RwNsrYang
.YangData_Nsr_NsInstanceConfig_Nsr().fields
}
2975 ns_instance_config_dict
.update(ns_instance_config_copy_dict
)
2977 ns_instance_config
= RwNsrYang
.YangData_Nsr_NsInstanceConfig_Nsr
.from_dict(ns_instance_config_dict
)
2978 ns_instance_config
.nsd
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
2979 ns_instance_config
.nsd
.from_dict(nsd_copy
.msg
.as_dict())
2981 payload_dict
= ns_instance_config
.to_json(self
._model
)
2982 #xml = ns_instance_config.to_xml_v2(self._model)
2983 #netconf_xml = self.wrap_netconf_config_xml(xml)
2985 #self._log.debug("Sending configure ns-instance-config xml to %s: %s",
2986 # netconf_xml, NsrRpcDtsHandler.NETCONF_IP_ADDRESS)
2987 self
._log
.debug("Sending configure ns-instance-config json to %s: %s",
2988 self
._nsr
_config
_url
,ns_instance_config
)
2990 #response = yield from self._manager.edit_config(
2992 # config=netconf_xml,
2994 response
= yield from self
._loop
.run_in_executor(
2996 self
._apply
_ns
_instance
_config
,
2999 response
.raise_for_status()
3000 self
._log
.debug("Received edit config response: %s", response
.json())
3002 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
3003 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3005 except Exception as e
:
3006 self
._log
.error("Exception processing the "
3007 "start-network-service: {}".format(e
))
3008 self
._log
.exception(e
)
3009 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
3010 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
)
3013 hdl_ns
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_ns_config_prepare
,)
3015 with self
._dts
.group_create() as group
:
3016 self
._ns
_regh
= group
.register(xpath
=NsrRpcDtsHandler
.EXEC_NSR_CONF_XPATH
,
3018 flags
=rwdts
.Flag
.PUBLISHER
,
3022 class NsrDtsHandler(object):
3023 """ The network service DTS handler """
3024 NSR_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr"
3025 SCALE_INSTANCE_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3026 KEY_PAIR_XPATH
= "C,/nsr:key-pair"
3028 def __init__(self
, dts
, log
, loop
, nsm
):
3034 self
._nsr
_regh
= None
3035 self
._scale
_regh
= None
3036 self
._key
_pair
_regh
= None
3040 """ Return the NS manager instance """
3045 """ Register for Nsr create/update/delete/read requests from dts """
def nsr_id_from_keyspec(ks):
    """Extract the NSR id from a keyspec of an ns-instance-config NSR."""
    nsr_schema = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema()
    path_entry = nsr_schema.keyspec_to_entry(ks)
    return path_entry.key00.id
def group_name_from_keyspec(ks):
    """Extract the scaling group name reference from a scaling-group keyspec."""
    group_schema = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema()
    path_entry = group_schema.keyspec_to_entry(ks)
    return path_entry.key00.scaling_group_name_ref
def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
    """Tell whether a scaling group instance was already committed earlier.

    The elements already held by the registration handle (i.e. not part of
    the current xact) are scanned, so no extra application state is needed
    to know whether the instance was configured before.

    NOTE(review): the scan reads self._nsr_regh, whereas the scale-group
    delta helper reads self._scale_regh - confirm this is intended.

    Returns:
        True when an element under the given NSR and group has the given
        instance id, False otherwise.
    """
    for cfg, ks in self._nsr_regh.get_xact_elements(include_keyspec=True):
        owner_nsr_id = nsr_id_from_keyspec(ks)
        owner_group = group_name_from_keyspec(ks)

        same_scope = owner_nsr_id == nsr_id and group_name == owner_group
        if same_scope and cfg.id == instance_id:
            return True

    return False
def get_scale_group_instance_delta(nsr_id, group_name, xact):
    """Work out which scaling instances a transaction adds or deletes.

    Instance ids seen in the xact elements for the given NSR and group are
    candidates for "added". Each id found in the handle's elements fetched
    without the xact is then either dropped from "added" (present in both)
    or listed as "deleted" (absent from the xact).

    Returns:
        dict with the keys "added" and "deleted", each a list of ids
    """
    def ids_in_scope(elements):
        # Lazily yield ids of the elements under the requested NSR and group
        for cfg, ks in elements:
            if nsr_id_from_keyspec(ks) != nsr_id:
                continue
            if group_name_from_keyspec(ks) != group_name:
                continue
            yield cfg.id

    added = list(ids_in_scope(
        self._scale_regh.get_xact_elements(xact, include_keyspec=True)))
    deleted = []

    for instance_id in ids_in_scope(
            self._scale_regh.get_xact_elements(include_keyspec=True)):
        if instance_id in added:
            added.remove(instance_id)
        else:
            deleted.append(instance_id)

    return {"added": added, "deleted": deleted}
def update_nsr_nsd(nsr_id, xact, scratch):
    """Apply the VLD changes a transaction makes to an NSR's embedded NSD.

    VLDs that appear only in the xact elements are instantiated; VLDs that
    appear only in the elements fetched without the xact are terminated.
    """

    @asyncio.coroutine
    def get_nsr_vl_delta(nsr_id, xact, scratch):
        """Collect the VLDs added and deleted for nsr_id in this xact."""
        added, deleted = [], []

        for cfg, ks in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
            if nsr_id_from_keyspec(ks) != nsr_id:
                continue

            if 'vld' in cfg.nsd:
                for vld in cfg.nsd.vld:
                    added.append(vld)

        for cfg, ks in self._nsr_regh.get_xact_elements(include_keyspec=True):
            self._log.debug("NSR update: %s", cfg)
            if nsr_id_from_keyspec(ks) != nsr_id:
                continue

            if 'vld' in cfg.nsd:
                for vld in cfg.nsd.vld:
                    if vld in added:
                        # Present on both sides: not a change
                        added.remove(vld)
                    else:
                        deleted.append(vld)

        return {"added": added, "deleted": deleted}

    vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
    self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)

    for new_vld in vl_delta["added"]:
        yield from self._nsm.nsr_instantiate_vl(nsr_id, new_vld)

    for old_vld in vl_delta["deleted"]:
        yield from self._nsm.nsr_terminate_vl(nsr_id, old_vld)
def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch):
    """Classify a transaction's configs as added, deleted or updated.

    Without Pbdelta support (RIFT-4916) the change set is not directly
    available, so the xact elements and the registration's current elements
    are fetched and compared by the attribute named key_name.

    Arguments:
        dts_member_reg - registration providing get_xact_elements() and
                         the elements attribute
        xact - the transaction whose elements are inspected
        key_name - name of the attribute that identifies a config
        scratch - unused here

    Returns:
        tuple (added_cfgs, deleted_cfgs, updated_cfgs); updated entries are
        the xact versions of configs that differ from the current ones
    """
    after = {}
    for cfg in list(dts_member_reg.get_xact_elements(xact)):
        after[getattr(cfg, key_name)] = cfg

    before = {}
    for cfg in list(dts_member_reg.elements):
        before[getattr(cfg, key_name)] = cfg

    # Only in the xact: adds
    added_cfgs = []
    for key in set(after) - set(before):
        added_cfgs.append(after[key])

    # Only in the current elements: deletes
    deleted_cfgs = []
    for key in set(before) - set(after):
        deleted_cfgs.append(before[key])

    # On both sides but different: updates
    updated_cfgs = []
    for key in set(before) & set(after):
        if after[key] != before[key]:
            updated_cfgs.append(after[key])

    return added_cfgs, deleted_cfgs, updated_cfgs
3170 def get_nsr_key_pairs(dts_member_reg
, xact
):
3172 for instance_cfg
, keyspec
in dts_member_reg
.get_xact_elements(xact
, include_keyspec
=True):
3173 self
._log
.debug("Key pair received is {} KS: {}".format(instance_cfg
, keyspec
))
3174 xpath
= keyspec
.to_xpath(RwNsrYang
.get_schema())
3175 key_pairs
[instance_cfg
.name
] = instance_cfg
3178 def on_apply(dts
, acg
, xact
, action
, scratch
):
3179 """Apply the configuration"""
3180 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3181 xact
, action
, scratch
)
3183 def handle_create_nsr(msg
, key_pairs
=None, restart_mode
=False):
3184 # Handle create NSR requests
3185 # Do some validations
3186 if not msg
.has_field("nsd"):
3187 err
= "NSD not provided"
3188 self
._log
.error(err
)
3189 raise NetworkServiceRecordError(err
)
3191 self
._log
.debug("Creating NetworkServiceRecord %s from nsr config %s",
3192 msg
.id, msg
.as_dict())
3193 nsr
= self
.nsm
.create_nsr(msg
, key_pairs
=key_pairs
, restart_mode
=restart_mode
)
3196 def handle_delete_nsr(msg
):
3198 def delete_instantiation(ns_id
):
3199 """ Delete instantiation """
3200 with self
._dts
.transaction() as xact
:
3201 yield from self
._nsm
.terminate_ns(ns_id
, xact
)
3203 # Handle delete NSR requests
3204 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3205 # Terminate the NSR instance
3206 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3208 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3209 event_descr
= "Terminate rcvd for NS Id:%s" % msg
.id
3210 nsr
.record_event("terminate-rcvd", event_descr
)
3212 self
._loop
.create_task(delete_instantiation(msg
.id))
3215 def begin_instantiation(nsr
):
3216 # Begin instantiation
3217 self
._log
.info("Beginning NS instantiation: %s", nsr
.id)
3218 yield from self
._nsm
.instantiate_ns(nsr
.id, xact
)
3220 def on_instantiate_done(fut
):
3221 # If the do_instantiate fails, then publish NSR with failed result
3225 print(traceback
.format_exception(None, e
, e
.__traceback
__), file=sys
.stderr
, flush
=True)
3226 self
._log
.error("NSR instantiation failed for NSR id %s: %s", msg
.id, str(e
))
3227 failed_nsr
= self
._nsm
.nsrs
[msg
.id]
3228 self
._loop
.create_task(failed_nsr
.instantiation_failed(failed_reason
=str(e
)))
3231 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3232 xact
, action
, scratch
)
3234 if action
== rwdts
.AppconfAction
.INSTALL
and xact
.id is None:
3236 for element
in self
._key
_pair
_regh
.elements
:
3237 key_pairs
.append(element
)
3238 for element
in self
._nsr
_regh
.elements
:
3239 nsr
= handle_create_nsr(element
, key_pairs
, restart_mode
=True)
3240 instantiate_task
= self
._loop
.create_task(begin_instantiation(nsr
))
3241 instantiate_task
.add_done_callback(on_instantiate_done
)
3244 (added_msgs
, deleted_msgs
, updated_msgs
) = get_add_delete_update_cfgs(self
._nsr
_regh
,
3248 self
._log
.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs
,
3249 deleted_msgs
, updated_msgs
)
3251 for msg
in added_msgs
:
3252 if msg
.id not in self
._nsm
.nsrs
:
3253 self
._log
.info("Create NSR received in on_apply to instantiate NS:%s", msg
.id)
3254 key_pairs
= get_nsr_key_pairs(self
._key
_pair
_regh
, xact
)
3255 nsr
= handle_create_nsr(msg
,key_pairs
)
3256 instantiate_task
= self
._loop
.create_task(begin_instantiation(nsr
))
3257 instantiate_task
.add_done_callback(on_instantiate_done
)
3259 for msg
in deleted_msgs
:
3260 self
._log
.info("Delete NSR received in on_apply to terminate NS:%s", msg
.id)
3262 handle_delete_nsr(msg
)
3264 self
._log
.exception("Failed to terminate NS:%s", msg
.id)
3266 for msg
in updated_msgs
:
3267 self
._log
.info("Update NSR received in on_apply: %s", msg
)
3269 self
._nsm
.nsr_update_cfg(msg
.id, msg
)
3272 self
._loop
.create_task(update_nsr_nsd(msg
.id, xact
, scratch
))
3274 for group
in msg
.scaling_group
:
3275 instance_delta
= get_scale_group_instance_delta(msg
.id, group
.scaling_group_name_ref
, xact
)
3276 self
._log
.debug("Got NSR:%s scale group instance delta: %s", msg
.id, instance_delta
)
3278 for instance_id
in instance_delta
["added"]:
3279 self
._nsm
.scale_nsr_out(msg
.id, group
.scaling_group_name_ref
, instance_id
, xact
)
3281 for instance_id
in instance_delta
["deleted"]:
3282 self
._nsm
.scale_nsr_in(msg
.id, group
.scaling_group_name_ref
, instance_id
)
3285 return RwTypes
.RwStatus
.SUCCESS
3288 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3289 """ Prepare callback from DTS for NSR """
3291 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3292 action
= xact_info
.query_action
3294 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3295 xact
, action
, xact_info
, xpath
, msg
3299 def delete_instantiation(ns_id
):
3300 """ Delete instantiation """
3301 yield from self
._nsm
.terminate_ns(ns_id
, None)
3303 def handle_delete_nsr():
3304 """ Handle delete NSR requests """
3305 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3306 # Terminate the NSR instance
3307 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3309 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3310 event_descr
= "Terminate rcvd for NS Id:%s" % msg
.id
3311 nsr
.record_event("terminate-rcvd", event_descr
)
3313 self
._loop
.create_task(delete_instantiation(msg
.id))
3315 fref
= ProtobufC
.FieldReference
.alloc()
3316 fref
.goto_whole_message(msg
.to_pbcm())
3318 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
, rwdts
.QueryAction
.DELETE
]:
3319 # if this is an NSR create
3320 if action
!= rwdts
.QueryAction
.DELETE
and msg
.id not in self
._nsm
.nsrs
:
3321 # Ensure the Cloud account/datacenter has been specified
3322 if not msg
.has_field("cloud_account") and not msg
.has_field("om_datacenter"):
3323 raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")
3325 # Check if nsd is specified
3326 if not msg
.has_field("nsd"):
3327 raise NsrInstantiationFailed("NSD not specified in NSR")
3330 nsr
= self
._nsm
.nsrs
[msg
.id]
3332 if msg
.has_field("nsd"):
3333 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3334 raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
3335 if 'vld' not in msg
.nsd
or len(msg
.nsd
.vld
) == 0:
3336 raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")
3338 if msg
.has_field("scaling_group"):
3339 self
._log
.debug("ScaleMsg %s", msg
)
3340 self
._log
.debug("NSSCALINGSTATE %s", nsr
.state
)
3341 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3342 raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
3344 if len(msg
.scaling_group
) > 1:
3345 raise ScalingOperationError("Only a single scaling group can be configured at a time")
3347 for group_msg
in msg
.scaling_group
:
3348 num_new_group_instances
= len(group_msg
.instance
)
3349 if num_new_group_instances
> 1:
3350 raise ScalingOperationError("Only a single scaling instance can be modified at a time")
3352 elif num_new_group_instances
== 1:
3353 scale_group
= nsr
.scaling_groups
[group_msg
.scaling_group_name_ref
]
3354 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
3355 if len(scale_group
.instances
) == scale_group
.max_instance_count
:
3356 raise ScalingOperationError("Max instances for %s reached" % scale_group
)
3358 acg
.handle
.prepare_complete_ok(xact_info
.handle
)
3361 self
._log
.debug("Registering for NSR config using xpath: %s",
3362 NsrDtsHandler
.NSR_XPATH
)
3364 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
3365 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
3366 self
._nsr
_regh
= acg
.register(xpath
=NsrDtsHandler
.NSR_XPATH
,
3367 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
3368 on_prepare
=on_prepare
)
3370 self
._scale
_regh
= acg
.register(
3371 xpath
=NsrDtsHandler
.SCALE_INSTANCE_XPATH
,
3372 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY| rwdts
.Flag
.CACHE
,
3375 self
._key
_pair
_regh
= acg
.register(
3376 xpath
=NsrDtsHandler
.KEY_PAIR_XPATH
,
3377 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
3381 class NsrOpDataDtsHandler(object):
3382 """ The network service op data DTS handler """
3383 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
3385 def __init__(self
, dts
, log
, loop
, nsm
):
3394 """ Return the registration handle"""
3399 """ Return the NS manager instance """
3404 """ Register for Nsr op data publisher registration"""
3405 self
._log
.debug("Registering Nsr op data path %s as publisher",
3406 NsrOpDataDtsHandler
.XPATH
)
3408 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
3409 handlers
= rift
.tasklets
.Group
.Handler()
3410 with self
._dts
.group_create(handler
=handlers
) as group
:
3411 self
._regh
= group
.register(xpath
=NsrOpDataDtsHandler
.XPATH
,
3413 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ | rwdts
.Flag
.DATASTORE
)
3416 def create(self
, path
, msg
):
3418 Create an NS record in DTS with the path and message
3420 self
._log
.debug("Creating NSR %s:%s", path
, msg
)
3421 self
.regh
.create_element(path
, msg
)
3422 self
._log
.debug("Created NSR, %s:%s", path
, msg
)
3425 def update(self
, path
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
3427 Update an NS record in DTS with the path and message
3429 self
._log
.debug("Updating NSR, %s:%s regh = %s", path
, msg
, self
.regh
)
3430 self
.regh
.update_element(path
, msg
, flags
)
3431 self
._log
.debug("Updated NSR, %s:%s", path
, msg
)
3434 def delete(self
, path
):
3436 Delete the NS record at the given path from DTS
3438 self
._log
.debug("Deleting NSR path:%s", path
)
3439 self
.regh
.delete_element(path
)
3440 self
._log
.debug("Deleted NSR path:%s", path
)
3443 class VnfrDtsHandler(object):
3444 """ The virtual network service DTS handler """
3445 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
3447 def __init__(self
, dts
, log
, loop
, nsm
):
3457 """ Return registration handle """
3462 """ Return the NS manager instance """
3467 """ Register for vnfr create/update/delete/ advises from dts """
3469 def on_commit(xact_info
):
3470 """ The transaction has been committed """
3471 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
3472 return rwdts
.MemberRspCode
.ACTION_OK
3475 def on_prepare(xact_info
, action
, ks_path
, msg
):
3476 """ prepare callback from dts """
3477 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3479 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
3480 xact_info
, action
, ks_path
, msg
3483 schema
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
3484 path_entry
= schema
.keyspec_to_entry(ks_path
)
3485 if path_entry
.key00
.id not in self
._nsm
._vnfrs
:
3486 self
._log
.error("%s request for non existent record path %s",
3488 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
, xpath
)
3492 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
3493 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
3494 yield from self
._nsm
.update_vnfr(msg
)
3495 elif action
== rwdts
.QueryAction
.DELETE
:
3496 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
3497 self
._nsm
.delete_vnfr(path_entry
.key00
.id)
3499 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
)
3501 self
._log
.debug("Registering for VNFR using xpath: %s",
3502 VnfrDtsHandler
.XPATH
,)
3504 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
3505 on_prepare
=on_prepare
,)
3506 with self
._dts
.group_create() as group
:
3507 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
3509 flags
=(rwdts
.Flag
.SUBSCRIBER
),)
3512 class NsManager(object):
3513 """ The Network Service Manager class"""
3514 def __init__(self
, dts
, log
, loop
,
3515 nsr_handler
, vnfr_handler
, vlr_handler
, ro_plugin_selector
,
3516 vnffgmgr
, vnfd_pub_handler
, cloud_account_handler
):
3520 self
._nsr
_handler
= nsr_handler
3521 self
._vnfr
_pub
_handler
= vnfr_handler
3522 self
._vlr
_pub
_handler
= vlr_handler
3523 self
._vnffgmgr
= vnffgmgr
3524 self
._vnfd
_pub
_handler
= vnfd_pub_handler
3525 self
._cloud
_account
_handler
= cloud_account_handler
3527 self
._ro
_plugin
_selector
= ro_plugin_selector
3529 # Initialize the set of variables for implementing Scaling RPC using REST.
3530 self
._headers
= {"content-type":"application/json", "accept":"application/json"}
3531 # This will break once RBAC is added to the rift code and the admin user's password is changed or the admin user itself is removed.
3532 self
._user
= 'admin'
3533 self
._password
= 'admin'
3534 self
._ip
= 'localhost'
3536 self
._conf
_url
= "https://{ip}:{port}/api/config". \
3545 self
.cfgmgr_obj
= conman
.ROConfigManager(log
, loop
, dts
, self
)
3547 # TODO: All these handlers should move to tasklet level.
3548 # Passing self is often an indication of bad design
3549 self
._nsd
_dts
_handler
= NsdDtsHandler(dts
, log
, loop
, self
)
3550 self
._vnfd
_dts
_handler
= VnfdDtsHandler(dts
, log
, loop
, self
)
3551 self
._dts
_handlers
= [self
._nsd
_dts
_handler
,
3552 VnfrDtsHandler(dts
, log
, loop
, self
),
3553 NsrDtsHandler(dts
, log
, loop
, self
),
3554 ScalingRpcHandler(log
, dts
, loop
, self
.scale_rpc_callback
),
3555 NsrRpcDtsHandler(dts
,log
,loop
,self
),
3556 self
._vnfd
_dts
_handler
,
3577 def nsr_handler(self
):
3578 """" NSR handler """
3579 return self
._nsr
_handler
3583 """" So Obj handler """
3588 """ NSRs in this NSM"""
3593 """ NSDs in this NSM"""
3598 """ VNFDs in this NSM"""
3603 """ VNFRs in this NSM"""
3607 def nsr_pub_handler(self
):
3608 """ NSR publication handler """
3609 return self
._nsr
_handler
3612 def vnfr_pub_handler(self
):
3613 """ VNFR publication handler """
3614 return self
._vnfr
_pub
_handler
3617 def vlr_pub_handler(self
):
3618 """ VLR publication handler """
3619 return self
._vlr
_pub
_handler
3622 def vnfd_pub_handler(self
):
3623 return self
._vnfd
_pub
_handler
3627 """ Register all static DTS handlers """
3628 for dts_handle
in self
._dts
_handlers
:
3629 yield from dts_handle
.register()
def get_ns_by_nsr_id(self, nsr_id):
    """ Look up the NSR registered under nsr_id

    Raises NetworkServiceRecordError when the id is not known to this
    manager.
    """
    if nsr_id in self._nsrs:
        return self._nsrs[nsr_id]

    raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
3639 def scale_nsr_out(self
, nsr_id
, scale_group_name
, instance_id
, config_xact
):
3640 self
.log
.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3645 nsr
= self
._nsrs
[nsr_id
]
3646 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3647 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3649 self
._loop
.create_task(nsr
.create_scale_group_instance(scale_group_name
, instance_id
, config_xact
))
3651 def scale_nsr_in(self
, nsr_id
, scale_group_name
, instance_id
):
3652 self
.log
.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3657 nsr
= self
._nsrs
[nsr_id
]
3658 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3659 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3661 self
._loop
.create_task(nsr
.delete_scale_group_instance(scale_group_name
, instance_id
))
3663 def scale_rpc_callback(self
, xact
, msg
, action
):
3664 """Callback handler for RPC calls
3666 xact : Transaction Handler
3668 action : Scaling Action
3670 def get_scaling_group_information():
3671 scaling_group_url
= "{url}/ns-instance-config/nsr/{nsr_id}".format(url
=self
._conf
_url
, nsr_id
=msg
.nsr_id_ref
)
3672 output
= requests
.get(scaling_group_url
, headers
=self
._headers
, auth
=(self
._user
, self
._password
), verify
=False)
3673 if output
.text
== None or len(output
.text
) == 0:
3674 self
.log
.error("nsr id %s information not present", self
._nsr
_id
)
3676 scaling_group_info
= json
.loads(output
.text
)
3677 return scaling_group_info
3679 def config_scaling_group_information(scaling_group_info
):
3680 data_str
= json
.dumps(scaling_group_info
)
3681 self
.log
.debug("scaling group Info %s", data_str
)
3683 scale_out_url
= "{url}/ns-instance-config/nsr/{nsr_id}".format(url
=self
._conf
_url
, nsr_id
=msg
.nsr_id_ref
)
3684 response
= requests
.put(scale_out_url
, data
=data_str
, verify
=False, auth
=(self
._user
, self
._password
), headers
=self
._headers
)
3685 response
.raise_for_status()
3688 scaling_group_info
= get_scaling_group_information()
3689 if scaling_group_info
is None:
3692 scaling_group_present
= False
3693 if "scaling-group" in scaling_group_info
["nsr:nsr"]:
3694 scaling_group_array
= scaling_group_info
["nsr:nsr"]["scaling-group"]
3695 for scaling_group
in scaling_group_array
:
3696 if scaling_group
["scaling-group-name-ref"] == msg
.scaling_group_name_ref
:
3697 scaling_group_present
= True
3698 if 'instance' not in scaling_group
:
3699 scaling_group
['instance'] = []
3700 for instance
in scaling_group
['instance']:
3701 if instance
["id"] == int(msg
.instance_id
):
3702 self
.log
.error("scaling group with instance id %s exists for scale out", msg
.instance_id
)
3704 scaling_group
["instance"].append({"id": int(msg
.instance_id
)})
3706 if not scaling_group_present
:
3707 scaling_group_info
["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg
.scaling_group_name_ref
, "instance": [{"id": msg
.instance_id
}]}]
3709 config_scaling_group_information(scaling_group_info
)
3713 scaling_group_info
= get_scaling_group_information()
3714 if scaling_group_info
is None:
3717 scaling_group_array
= scaling_group_info
["nsr:nsr"]["scaling-group"]
3718 scaling_group_present
= False
3719 instance_id_present
= False
3720 for scaling_group
in scaling_group_array
:
3721 if scaling_group
["scaling-group-name-ref"] == msg
.scaling_group_name_ref
:
3722 scaling_group_present
= True
3723 if 'instance' in scaling_group
:
3724 instance_array
= scaling_group
["instance"];
3725 for index
in range(len(instance_array
)):
3726 if instance_array
[index
]["id"] == int(msg
.instance_id
):
3727 instance_array
.pop(index
)
3728 instance_id_present
= True
3731 if not scaling_group_present
:
3732 self
.log
.error("Scaling group %s doesnot exists for scale in", msg
.scaling_group_name_ref
)
3735 if not instance_id_present
:
3736 self
.log
.error("Instance id %s doesnot exists for scale in", msg
.instance_id
)
3739 config_scaling_group_information(scaling_group_info
)
3742 if action
== ScalingRpcHandler
.ACTION
.SCALE_OUT
:
3743 self
._loop
.run_in_executor(None, scale_out
)
3745 self
._loop
.run_in_executor(None, scale_in
)
def nsr_update_cfg(self, nsr_id, msg):
    """ Record msg as the latest config message of the NSR with nsr_id """
    self._nsrs[nsr_id].nsr_cfg_msg = msg
def nsr_instantiate_vl(self, nsr_id, vld):
    """ Create a VL instance for vld on the NSR with nsr_id

    Raises NsrVlUpdateError unless the NSR is in the RUNNING state.
    """
    self.log.debug("NSR {} create VL {}".format(nsr_id, vld))

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")

    # Driven inline rather than as a new task: the caller already runs
    # this from a task of its own
    yield from record.create_vl_instance(vld)
def nsr_terminate_vl(self, nsr_id, vld):
    """ Delete the VL instance for vld from the NSR with nsr_id

    Raises NsrVlUpdateError unless the NSR is in the RUNNING state.
    """
    self.log.debug("NSR {} delete VL {}".format(nsr_id, vld.id))

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")

    # Driven inline rather than as a new task: the caller already runs
    # this from a task of its own
    yield from record.delete_vl_instance(vld)
3769 def create_nsr(self
, nsr_msg
, key_pairs
=None,restart_mode
=False):
3770 """ Create an NSR instance """
3771 self
._log
.debug("NSRMSG %s", nsr_msg
)
3772 if nsr_msg
.id in self
._nsrs
:
3773 msg
= "NSR id %s already exists" % nsr_msg
.id
3774 self
._log
.error(msg
)
3775 raise NetworkServiceRecordError(msg
)
3777 self
._log
.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
3781 nsm_plugin
= self
._ro
_plugin
_selector
.ro_plugin
3782 sdn_account_name
= self
._cloud
_account
_handler
.get_cloud_account_sdn_name(nsr_msg
.cloud_account
)
3784 nsr
= NetworkServiceRecord(self
._dts
,
3792 restart_mode
=restart_mode
,
3793 vlr_handler
=self
._ro
_plugin
_selector
._records
_publisher
._vlr
_pub
_hdlr
3795 self
._nsrs
[nsr_msg
.id] = nsr
3796 nsm_plugin
.create_nsr(nsr_msg
, nsr_msg
.nsd
, key_pairs
)
3800 def delete_nsr(self
, nsr_id
):
3802 Delete NSR with the passed nsr id
3804 del self
._nsrs
[nsr_id
]
def instantiate_ns(self, nsr_id, config_xact):
    """ Instantiate the NS whose record is registered under nsr_id

    The work is delegated to the record's nsm_plugin. Raises
    NetworkServiceRecordError when no record with that id exists.
    """
    self._log.debug("Instantiating Network service id %s", nsr_id)

    if nsr_id in self._nsrs:
        record = self._nsrs[nsr_id]
        yield from record.nsm_plugin.instantiate_ns(record, config_xact)
        return

    err = "NSR id %s not found " % nsr_id
    self._log.error(err)
    raise NetworkServiceRecordError(err)
def update_vnfr(self, vnfr):
    """ Apply a VNFR create/update and refresh the owning NSR's state """
    record = self._vnfrs[vnfr.id]
    current_state = record.state
    self._log.debug("Updating VNFR with state %s: vnfr %s", current_state, vnfr)

    # First push the new message into the stored VNFR ...
    yield from record.update_state(vnfr)

    # ... then let the NSR that owns it recompute its own state
    owner_nsr = self.find_nsr_for_vnfr(vnfr.id)
    yield from owner_nsr.update_state()
3829 def find_nsr_for_vnfr(self
, vnfr_id
):
3830 """ Find the NSR which has the passed vnfr id """
3831 for nsr
in list(self
.nsrs
.values()):
3832 for vnfr
in list(nsr
.vnfrs
.values()):
3833 if vnfr
.id == vnfr_id
:
def delete_vnfr(self, vnfr_id):
    """ Drop the VNFR stored under vnfr_id (KeyError if it is unknown) """
    self._vnfrs.pop(vnfr_id)
3842 def get_nsr_config(self
, nsd_id
):
3843 xpath
= "C,/nsr:ns-instance-config"
3844 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
3846 for result
in results
:
3847 entry
= yield from result
3848 ns_instance_config
= entry
.result
3850 for nsr
in ns_instance_config
.nsr
:
3851 if nsr
.nsd
.id == nsd_id
:
def get_nsd(self, nsd_id):
    """ Get network service descriptor for the passed nsd_id

    Raises:
        NetworkServiceDescriptorError - no NSD with this id is known
    """
    if nsd_id not in self._nsds:
        self._log.error("Cannot find NSD id:%s", nsd_id)
        # Exceptions do not interpolate %-style arguments the way the
        # logger does, so the message has to be formatted explicitly
        raise NetworkServiceDescriptorError("Cannot find NSD id:%s" % (nsd_id,))

    return self._nsds[nsd_id]
3864 def create_nsd(self
, nsd_msg
):
3865 """ Create a network service descriptor """
3866 self
._log
.debug("Create network service descriptor - %s", nsd_msg
)
3867 if nsd_msg
.id in self
._nsds
:
3868 self
._log
.error("Cannot create NSD %s -NSD ID already exists", nsd_msg
)
3869 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg
.id)
3871 nsd
= NetworkServiceDescriptor(
3878 self
._nsds
[nsd_msg
.id] = nsd
3882 def update_nsd(self
, nsd
):
3883 """ update the Network service descriptor """
3884 self
._log
.debug("Update network service descriptor - %s", nsd
)
3885 if nsd
.id not in self
._nsds
:
3886 self
._log
.debug("No NSD found - creating NSD id = %s", nsd
.id)
3887 self
.create_nsd(nsd
)
3889 self
._log
.debug("Updating NSD id = %s, nsd = %s", nsd
.id, nsd
)
3890 self
._nsds
[nsd
.id].update(nsd
)
def delete_nsd(self, nsd_id):
    """ Delete the Network service descriptor with the passed id

    Raises:
        NetworkServiceDescriptorNotFound - no NSD with this id is known
    """
    self._log.debug("Deleting the network service descriptor - %s", nsd_id)
    if nsd_id not in self._nsds:
        self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
        # Exceptions do not interpolate %-style arguments the way the
        # logger does, so the message has to be formatted explicitly
        raise NetworkServiceDescriptorNotFound("Cannot find %s" % (nsd_id,))
    del self._nsds[nsd_id]
def get_vnfd_config(self, xact):
    """ Create every VNFD pending in xact that this manager lacks

    The pending elements are read from the VNFD DTS handler's
    registration handle; ids already present in self._vnfds are skipped.
    """
    registration = self._vnfd_dts_handler.regh
    for pending in registration.get_xact_elements(xact):
        if pending.id in self._vnfds:
            continue
        self.create_vnfd(pending)
def get_vnfd(self, vnfd_id, xact):
    """ Get virtual network function descriptor for the passed vnfd_id

    When the id is not known yet, the VNFDs pending in xact are loaded
    through get_vnfd_config() before giving up.

    Raises:
        VnfDescriptorError - the VNFD is still unknown after the reload
    """
    if vnfd_id not in self._vnfds:
        self._log.error("Cannot find VNFD id:%s", vnfd_id)
        # The descriptor may still be pending in this transaction
        self.get_vnfd_config(xact)

        if vnfd_id not in self._vnfds:
            self._log.error("Cannot find VNFD id:%s", vnfd_id)
            # Exceptions do not interpolate %-style arguments the way the
            # logger does, so the message has to be formatted explicitly
            raise VnfDescriptorError("Cannot find VNFD id:%s" % (vnfd_id,))

    return self._vnfds[vnfd_id]
def create_vnfd(self, vnfd):
    """ Create a virtual network function descriptor

    Stores vnfd under its id and returns the stored descriptor.

    Raises:
        VnfDescriptorError - a VNFD with the same id already exists
    """
    self._log.debug("Create virtual network function descriptor - %s", vnfd)
    if vnfd.id in self._vnfds:
        self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
        # Exceptions do not interpolate %-style arguments the way the
        # logger does, so the message has to be formatted explicitly
        raise VnfDescriptorError("VNFD already exists-%s" % (vnfd.id,))

    self._vnfds[vnfd.id] = vnfd
    return self._vnfds[vnfd.id]
3928 def update_vnfd(self
, vnfd
):
3929 """ Update the virtual network function descriptor """
3930 self
._log
.debug("Update virtual network function descriptor- %s", vnfd
)
3933 if vnfd
.id not in self
._vnfds
:
3934 self
._log
.debug("No VNFD found - creating VNFD id = %s", vnfd
.id)
3935 self
.create_vnfd(vnfd
)
3937 self
._log
.debug("Updating VNFD id = %s, vnfd = %s", vnfd
.id, vnfd
)
3938 self
._vnfds
[vnfd
.id] = vnfd
def delete_vnfd(self, vnfd_id):
    """ Delete the virtual network function descriptor with the passed id

    Raises:
        VnfDescriptorError - no VNFD with this id is known
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id not in self._vnfds:
        self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
        # Exceptions do not interpolate %-style arguments the way the
        # logger does, so the message has to be formatted explicitly
        raise VnfDescriptorError("Cannot find %s" % (vnfd_id,))

    del self._vnfds[vnfd_id]
3951 def publish_nsr(self
, xact
, path
, msg
):
3952 """ Publish a NSR """
3953 self
._log
.debug("Publish NSR with path %s, msg %s",
3955 yield from self
.nsr_handler
.update(xact
, path
, msg
)
def unpublish_nsr(self, xact, path):
    """ Un Publish an NSR

    Arguments:
        xact - transaction the delete is issued under
        path - xpath of the NSR record to remove
    """
    self._log.debug("Publishing delete NSR with path %s", path)
    # Pass the transaction first, matching publish_nsr()'s
    # update(xact, path, msg) call on the same handler
    yield from self.nsr_handler.delete(xact, path)
def vnfr_is_ready(self, vnfr_id):
    """ VNFR with the id is ready

    Raises:
        VirtualNetworkFunctionRecordError - no VNFR with this id is known
    """
    self._log.debug("VNFR id %s ready", vnfr_id)
    # Check the VNFR table, which is the one indexed below; the previous
    # check looked in the VNFD table
    if vnfr_id not in self._vnfrs:
        err = "Did not find VNFR ID with id %s" % vnfr_id
        # Log the message itself, not the literal string "err"
        self._log.critical(err)
        raise VirtualNetworkFunctionRecordError(err)
    self._vnfrs[vnfr_id].is_ready()
3974 def terminate_ns(self
, nsr_id
, xact
):
3976 Terminate network service for the given NSR Id
3979 # Terminate the instances/networks associated with this network service
3980 self
._log
.debug("Terminating the network service %s", nsr_id
)
3982 yield from self
._nsrs
[nsr_id
].terminate()
3983 except Exception as e
:
3984 self
.log
.exception("Failed to terminate NSR[id=%s]", nsr_id
)
3986 # Unpublish the NSR record
3987 self
._log
.debug("Unpublishing the network service %s", nsr_id
)
3988 yield from self
._nsrs
[nsr_id
].unpublish(xact
)
3990 # Finally delete the NS instance from this NS Manager
3991 self
._log
.debug("Deletng the network service %s", nsr_id
)
3992 self
.delete_nsr(nsr_id
)
3995 class NsmRecordsPublisherProxy(object):
3996 """ This class provides a publisher interface that allows plugin objects
3997 to publish NSR/VNFR/VLR"""
3999 def __init__(self
, dts
, log
, loop
, nsr_pub_hdlr
, vnfr_pub_hdlr
, vlr_pub_hdlr
):
4003 self
._nsr
_pub
_hdlr
= nsr_pub_hdlr
4004 self
._vlr
_pub
_hdlr
= vlr_pub_hdlr
4005 self
._vnfr
_pub
_hdlr
= vnfr_pub_hdlr
def publish_nsr(self, xact, nsr):
    """ Publish an NSR through the NSR publisher handler """
    xpath = NetworkServiceRecord.xpath_from_nsr(nsr)
    outcome = yield from self._nsr_pub_hdlr.update(xact, xpath, nsr)
    return outcome
def unpublish_nsr(self, xact, nsr):
    """ Unpublish an NSR through the NSR publisher handler """
    xpath = NetworkServiceRecord.xpath_from_nsr(nsr)
    outcome = yield from self._nsr_pub_hdlr.delete(xact, xpath)
    return outcome
def publish_vnfr(self, xact, vnfr):
    """ Publish a VNFR through the VNFR publisher handler """
    xpath = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
    outcome = yield from self._vnfr_pub_hdlr.update(xact, xpath, vnfr)
    return outcome
def unpublish_vnfr(self, xact, vnfr):
    """ Unpublish a VNFR through the VNFR publisher handler """
    xpath = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
    outcome = yield from self._vnfr_pub_hdlr.delete(xact, xpath)
    return outcome
def publish_vlr(self, xact, vlr):
    """ Publish a VLR through the VLR publisher handler """
    xpath = VirtualLinkRecord.vlr_xpath(vlr)
    outcome = yield from self._vlr_pub_hdlr.update(xact, xpath, vlr)
    return outcome
def unpublish_vlr(self, xact, vlr):
    """ Unpublish a VLR through the VLR publisher handler """
    xpath = VirtualLinkRecord.vlr_xpath(vlr)
    outcome = yield from self._vlr_pub_hdlr.delete(xact, xpath)
    return outcome
4044 class ScalingRpcHandler(mano_dts
.DtsHandler
):
4045 """ The Network service Monitor DTS handler """
4046 SCALE_IN_INPUT_XPATH
= "I,/nsr:exec-scale-in"
4047 SCALE_IN_OUTPUT_XPATH
= "O,/nsr:exec-scale-in"
4049 SCALE_OUT_INPUT_XPATH
= "I,/nsr:exec-scale-out"
4050 SCALE_OUT_OUTPUT_XPATH
= "O,/nsr:exec-scale-out"
4052 ACTION
= Enum('ACTION', 'SCALE_IN SCALE_OUT')
4054 def __init__(self
, log
, dts
, loop
, callback
=None):
4055 super().__init
__(log
, dts
, loop
)
4056 self
.callback
= callback
4057 self
.last_instance_id
= defaultdict(int)
4063 def on_scale_in_prepare(xact_info
, action
, ks_path
, msg
):
4064 assert action
== rwdts
.QueryAction
.RPC
4067 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleIn
.from_dict({
4068 "instance_id": msg
.instance_id
})
4070 xact_info
.respond_xpath(
4071 rwdts
.XactRspCode
.ACK
,
4072 self
.__class
__.SCALE_IN_OUTPUT_XPATH
,
4076 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_IN
)
4077 except Exception as e
:
4078 self
.log
.exception(e
)
4079 xact_info
.respond_xpath(
4080 rwdts
.XactRspCode
.NACK
,
4081 self
.__class
__.SCALE_IN_OUTPUT_XPATH
)
4084 def on_scale_out_prepare(xact_info
, action
, ks_path
, msg
):
4085 assert action
== rwdts
.QueryAction
.RPC
4088 scaling_group
= msg
.scaling_group_name_ref
4089 if not msg
.instance_id
:
4090 last_instance_id
= self
.last_instance_id
[scale_group
]
4091 msg
.instance_id
= last_instance_id
+ 1
4092 self
.last_instance_id
[scale_group
] += 1
4094 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleOut
.from_dict({
4095 "instance_id": msg
.instance_id
})
4097 xact_info
.respond_xpath(
4098 rwdts
.XactRspCode
.ACK
,
4099 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
,
4103 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_OUT
)
4104 except Exception as e
:
4105 self
.log
.exception(e
)
4106 xact_info
.respond_xpath(
4107 rwdts
.XactRspCode
.NACK
,
4108 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
)
4110 scale_in_hdl
= rift
.tasklets
.DTS
.RegistrationHandler(
4111 on_prepare
=on_scale_in_prepare
)
4112 scale_out_hdl
= rift
.tasklets
.DTS
.RegistrationHandler(
4113 on_prepare
=on_scale_out_prepare
)
4115 with self
.dts
.group_create() as group
:
4117 xpath
=self
.__class
__.SCALE_IN_INPUT_XPATH
,
4118 handler
=scale_in_hdl
,
4119 flags
=rwdts
.Flag
.PUBLISHER
)
4121 xpath
=self
.__class
__.SCALE_OUT_INPUT_XPATH
,
4122 handler
=scale_out_hdl
,
4123 flags
=rwdts
.Flag
.PUBLISHER
)
4126 class NsmTasklet(rift
.tasklets
.Tasklet
):
4128 The network service manager tasklet
4130 def __init__(self
, *args
, **kwargs
):
4131 super(NsmTasklet
, self
).__init
__(*args
, **kwargs
)
4132 self
.rwlog
.set_category("rw-mano-log")
4133 self
.rwlog
.set_subcategory("nsm")
4138 self
._ro
_plugin
_selector
= None
4139 self
._vnffgmgr
= None
4141 self
._nsr
_handler
= None
4142 self
._vnfr
_pub
_handler
= None
4143 self
._vlr
_pub
_handler
= None
4144 self
._vnfd
_pub
_handler
= None
4145 self
._scale
_cfg
_handler
= None
4147 self
._records
_publisher
_proxy
= None
4150 """ The task start callback """
4151 super(NsmTasklet
, self
).start()
4152 self
.log
.info("Starting NsmTasklet")
4154 self
.log
.debug("Registering with dts")
4155 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
4156 RwNsmYang
.get_schema(),
4158 self
.on_dts_state_change
)
4160 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
4166 print("Caught Exception in NSM stop:", sys
.exc_info()[0])
def on_instance_started(self):
    """Task instance started callback: emit a debug log entry only."""
    logger = self.log
    logger.debug("Got instance started callback")
4175 """ Task init callback """
4176 self
.log
.debug("Got instance started callback")
4178 self
.log
.debug("creating config account handler")
4180 self
._nsr
_pub
_handler
= publisher
.NsrOpDataDtsHandler(self
._dts
, self
.log
, self
.loop
)
4181 yield from self
._nsr
_pub
_handler
.register()
4183 self
._vnfr
_pub
_handler
= publisher
.VnfrPublisherDtsHandler(self
._dts
, self
.log
, self
.loop
)
4184 yield from self
._vnfr
_pub
_handler
.register()
4186 self
._vlr
_pub
_handler
= publisher
.VlrPublisherDtsHandler(self
._dts
, self
.log
, self
.loop
)
4187 yield from self
._vlr
_pub
_handler
.register()
4189 manifest
= self
.tasklet_info
.get_pb_manifest()
4190 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
4191 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
4192 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
4194 self
._vnfd
_pub
_handler
= publisher
.VnfdPublisher(use_ssl
, ssl_cert
, ssl_key
, self
.loop
)
4196 self
._records
_publisher
_proxy
= NsmRecordsPublisherProxy(
4200 self
._nsr
_pub
_handler
,
4201 self
._vnfr
_pub
_handler
,
4202 self
._vlr
_pub
_handler
,
4205 # Register the NSM to receive the nsm plugin
4206 # when cloud account is configured
4207 self
._ro
_plugin
_selector
= cloud
.ROAccountPluginSelector(
4211 self
._records
_publisher
_proxy
,
4213 yield from self
._ro
_plugin
_selector
.register()
4215 self
._cloud
_account
_handler
= cloud
.CloudAccountConfigSubscriber(
4220 yield from self
._cloud
_account
_handler
.register()
4222 self
._vnffgmgr
= rwvnffgmgr
.VnffgMgr(self
._dts
, self
.log
, self
.log_hdl
, self
.loop
)
4223 yield from self
._vnffgmgr
.register()
4225 self
._nsm
= NsManager(
4229 self
._nsr
_pub
_handler
,
4230 self
._vnfr
_pub
_handler
,
4231 self
._vlr
_pub
_handler
,
4232 self
._ro
_plugin
_selector
,
4234 self
._vnfd
_pub
_handler
,
4235 self
._cloud
_account
_handler
4238 yield from self
._nsm
.register()
4242 """ Task run callback """
def on_dts_state_change(self, state):
    """React to a DTS state notification.

    Runs the application callback registered for ``state`` (if there is
    one) and then asks DTS to move to the follow-on state (if there is
    one).  States that appear in neither table are ignored.

    Arguments
        state - current dts state
    """
    dts_state = rwdts.State

    # Current DTS state -> state DTS is told to enter next.
    next_dts_state = {
        dts_state.INIT: dts_state.REGN_COMPLETE,
        dts_state.CONFIG: dts_state.RUN,
    }

    # Current DTS state -> application callback to drive.
    app_callbacks = {
        dts_state.INIT: self.init,
        dts_state.RUN: self.run,
    }

    # Transition application to next state
    callback = app_callbacks.get(state)
    if callback is not None:
        yield from callback()

    # Transition dts to next state
    target = next_dts_state.get(state)
    if target is None:
        return

    self.log.debug("Changing state to %s", target)
    self._dts.handle.set_state(target)