2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.mano
.dts
as mano_dts
55 class VMResourceError(Exception):
56 """ VM resource Error"""
60 class VnfRecordError(Exception):
61 """ VNF record instatiation failed"""
65 class VduRecordError(Exception):
66 """ VDU record instatiation failed"""
70 class NotImplemented(Exception):
71 """Not implemented """
75 class VnfrRecordExistsError(Exception):
76 """VNFR record already exist with the same VNFR id"""
80 class InternalVirtualLinkRecordError(Exception):
81 """Internal virtual link record error"""
85 class VDUImageNotFound(Exception):
86 """VDU Image not found error"""
90 class VirtualDeploymentUnitRecordError(Exception):
91 """VDU Instantiation failed"""
95 class VMNotReadyError(Exception):
96 """ VM Not yet received from resource manager """
100 class VDURecordNotFound(Exception):
101 """ Could not find a VDU record """
105 class VirtualNetworkFunctionRecordDescNotFound(Exception):
106 """ Cannot find Virtual Network Function Record Descriptor """
110 class VirtualNetworkFunctionDescriptorError(Exception):
111 """ Virtual Network Function Record Descriptor Error """
115 class VirtualNetworkFunctionDescriptorNotFound(Exception):
116 """ Virtual Network Function Record Descriptor Not Found """
120 class VirtualNetworkFunctionRecordNotFound(Exception):
121 """ Virtual Network Function Record Not Found """
125 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
126 """ Virtual Network Funtion Descriptor reference count exists """
130 class VnfrInstantiationFailed(Exception):
131 """ Virtual Network Funtion Instantiation failed"""
135 class VNFMPlacementGroupError(Exception):
138 class VirtualNetworkFunctionRecordState(enum
.Enum
):
145 VL_TERMINATE_PHASE
= 6
146 VDU_TERMINATE_PHASE
= 7
151 class VDURecordState(enum
.Enum
):
152 """VDU record state """
155 RESOURCE_ALLOC_PENDING
= 3
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
164 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
168 self
._component
= component
169 self
._cluster
_name
= cluster_name
170 self
._vcs
_handler
= vcs_handler
171 self
._mangled
_name
= mangled_name
174 def mangle_name(component_name
, vnf_name
, vnfd_id
):
175 """ mangled component name """
176 return vnf_name
+ ":" + component_name
+ ":" + vnfd_id
180 """ name of this component"""
181 return self
._mangled
_name
185 """ The path for this object """
186 return("D,/rw-manifest:manifest" +
187 "/rw-manifest:operational-inventory" +
188 "/rw-manifest:component" +
189 "[rw-manifest:component-name = '{}']").format(self
.name
)
192 def instance_xpath(self
):
193 """ The path for this object """
194 return("D,/rw-base:vcs" +
197 "[instance-name = '{}']".format(self
._cluster
_name
))
200 def start_comp_xpath(self
):
201 """ start component xpath """
202 return (self
.instance_xpath
+
203 "/child-n[instance-name = 'START-REQ']")
205 def get_start_comp_msg(self
, ip_address
):
206 """ start this component """
207 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
208 start_msg
.instance_name
= 'START-REQ'
209 start_msg
.component_name
= self
.name
210 start_msg
.admin_command
= "START"
211 start_msg
.ip_address
= ip_address
217 """ Returns the message for this vcs component"""
219 vcs_comp_dict
= self
._component
.as_dict()
221 def mangle_comp_names(comp_dict
):
222 """ mangle component name with VNF name, id"""
223 for key
, val
in comp_dict
.items():
224 if isinstance(val
, dict):
225 comp_dict
[key
] = mangle_comp_names(val
)
226 elif isinstance(val
, list):
229 if isinstance(ent
, dict):
230 val
[i
] = mangle_comp_names(ent
)
234 elif key
== "component_name":
235 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
240 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
241 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
245 def publish(self
, xact
):
246 """ Publishes the VCS component """
247 self
._log
.debug("Publishing the VcsComponent %s, path = %s comp = %s",
248 self
.name
, self
.path
, self
.msg
)
249 yield from self
._vcs
_handler
.publish(xact
, self
.path
, self
.msg
)
252 def start(self
, xact
, parent
, ip_addr
=None):
253 """ Starts this VCS component """
254 # ATTN RV - replace with block add
255 start_msg
= self
.get_start_comp_msg(ip_addr
)
256 self
._log
.debug("starting component %s %s",
257 self
.start_comp_xpath
, start_msg
)
258 yield from self
._dts
.query_create(self
.start_comp_xpath
,
261 self
._log
.debug("started component %s, %s",
262 self
.start_comp_xpath
, start_msg
)
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
278 placement_groups
=[]):
284 self
._mgmt
_intf
= mgmt_intf
285 self
._cloud
_account
_name
= cloud_account_name
286 self
._vnfd
_package
_store
= vnfd_package_store
287 self
._mgmt
_network
= mgmt_network
289 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
292 self
._state
= VDURecordState
.INIT
293 self
._state
_failed
_reason
= None
294 self
._request
_id
= str(uuid
.uuid4())
295 self
._name
= vnfr
.name
+ "__" + vdud
.id
296 self
._placement
_groups
= placement_groups
299 self
._vdud
_cloud
_init
= None
300 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
303 def vdu_opdata_register(self
):
304 yield from self
._vdur
_console
_handler
.register()
306 def cp_ip_addr(self
, cp_name
):
307 """ Find ip address by connection point name """
308 if self
._vm
_resp
is not None:
309 for conn_point
in self
._vm
_resp
.connection_points
:
310 if conn_point
.name
== cp_name
:
311 return conn_point
.ip_address
314 def cp_id(self
, cp_name
):
315 """ Find connection point id by connection point name """
316 if self
._vm
_resp
is not None:
317 for conn_point
in self
._vm
_resp
.connection_points
:
318 if conn_point
.name
== cp_name
:
319 return conn_point
.connection_point_id
332 """ Return this VDUR's name """
336 def cloud_account_name(self
):
337 """ Cloud account this VDU should be created in """
338 return self
._cloud
_account
_name
341 def image_name(self
):
342 """ name that should be used to lookup the image on the CMP """
343 return os
.path
.basename(self
._vdud
.image
)
346 def image_checksum(self
):
347 """ name that should be used to lookup the image on the CMP """
348 return self
._vdud
.image_checksum
if self
._vdud
.has_field("image_checksum") else None
351 def management_ip(self
):
354 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
357 def vm_management_ip(self
):
360 return self
._vm
_resp
.management_ip
363 def operational_status(self
):
364 """ Operational status of this VDU"""
365 op_stats_dict
= {"INIT": "init",
366 "INSTANTIATING": "vm_init_phase",
367 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
370 "TERMINATING": "terminated",
371 "TERMINATED": "terminated",
373 return op_stats_dict
[self
._state
.name
]
378 vdu_fields
= ["vm_flavor",
384 vdu_copy_dict
= {k
: v
for k
, v
in
385 self
._vdud
.as_dict().items() if k
in vdu_fields
}
386 vdur_dict
= {"id": self
._vdur
_id
,
387 "vdu_id_ref": self
._vdud
.id,
388 "operational_status": self
.operational_status
,
389 "operational_status_details": self
._state
_failed
_reason
,
391 if self
.vm_resp
is not None:
392 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
393 "flavor_id": self
.vm_resp
.flavor_id
,
394 "image_id": self
.vm_resp
.image_id
,
397 if self
.management_ip
is not None:
398 vdur_dict
["management_ip"] = self
.management_ip
400 if self
.vm_management_ip
is not None:
401 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
403 vdur_dict
.update(vdu_copy_dict
)
408 for intf
, cp_id
, vlr
in self
._int
_intf
:
409 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
411 icp_list
.append({"name": cp
.name
,
413 "type_yang": "VPORT",
414 "ip_address": self
.cp_ip_addr(cp
.id)})
416 ii_list
.append({"name": intf
.name
,
417 "vdur_internal_connection_point_ref": cp
.id,
418 "virtual_interface": {}})
420 vdur_dict
["internal_connection_point"] = icp_list
421 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
422 vdur_dict
["internal_interface"] = ii_list
425 for intf
, cp
, vlr
in self
._ext
_intf
:
426 ei_list
.append({"name": cp
,
427 "vnfd_connection_point_ref": cp
,
428 "virtual_interface": {}})
429 self
._vnfr
.update_cp(cp
, self
.cp_ip_addr(cp
), self
.cp_id(cp
))
431 vdur_dict
["external_interface"] = ei_list
433 placement_groups
= []
434 for group
in self
._placement
_groups
:
435 placement_groups
.append(group
.as_dict())
437 vdur_dict
['placement_groups_info'] = placement_groups
438 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
441 def resmgr_path(self
):
442 """ path for resource-mgr"""
443 return ("D,/rw-resource-mgr:resource-mgmt" +
445 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
448 def vm_flavor_msg(self
):
449 """ VM flavor message """
450 flavor
= self
._vdud
.vm_flavor
.__class
__()
451 flavor
.copy_from(self
._vdud
.vm_flavor
)
456 def vdud_cloud_init(self
):
457 """ Return the cloud-init contents for the VDU """
458 if self
._vdud
_cloud
_init
is None:
459 self
._vdud
_cloud
_init
= self
.cloud_init()
461 return self
._vdud
_cloud
_init
463 def cloud_init(self
):
464 """ Populate cloud_init with cloud-config script from
465 either the inline contents or from the file provided
467 if self
._vdud
.cloud_init
is not None:
468 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
469 return self
._vdud
.cloud_init
470 elif self
._vdud
.cloud_init_file
is not None:
471 # Get cloud-init script contents from the file provided in the cloud_init_file param
472 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
473 filename
= self
._vdud
.cloud_init_file
474 self
._vnfd
_package
_store
.refresh()
475 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
476 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
478 return cloud_init_extractor
.read_script(stored_package
, filename
)
479 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
480 raise VirtualDeploymentUnitRecordError(e
)
482 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
484 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
486 availability_zones
= []
488 for group
in self
._placement
_groups
:
489 if group
.has_field('host_aggregate'):
490 for aggregate
in group
.host_aggregate
:
491 host_aggregates
.append(aggregate
.as_dict())
492 if group
.has_field('availability_zone'):
493 availability_zones
.append(group
.availability_zone
.as_dict())
494 if group
.has_field('server_group'):
495 server_groups
.append(group
.server_group
.as_dict())
497 if availability_zones
:
498 if len(availability_zones
) > 1:
499 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
500 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
502 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
505 if len(server_groups
) > 1:
506 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
507 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
509 vm_create_msg_dict
['server_group'] = server_groups
[0]
512 vm_create_msg_dict
['host_aggregate'] = host_aggregates
516 def process_placement_groups(self
, vm_create_msg_dict
):
517 """Process the placement_groups and fill resource-mgr request"""
518 if not self
._placement
_groups
:
521 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
522 assert len(cloud_set
) == 1
523 cloud_type
= cloud_set
.pop()
525 if cloud_type
== 'openstack':
526 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
529 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
532 def resmgr_msg(self
, config
=None):
533 vdu_fields
= ["vm_flavor",
539 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
540 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
542 vm_create_msg_dict
= {
544 "image_name": self
.image_name
,
547 if self
.image_checksum
is not None:
548 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
550 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
551 if self
._vdud
.has_field('mgmt_vpci'):
552 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
554 self
._log
.debug("VDUD: %s", self
._vdud
)
555 if config
is not None:
556 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
558 if self
._mgmt
_network
:
559 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
562 for intf
, cp
, vlr
in self
._ext
_intf
:
563 cp_info
= {"name": cp
,
564 "virtual_link_id": vlr
.network_id
,
565 "type_yang": intf
.virtual_interface
.type_yang
}
567 if (intf
.virtual_interface
.has_field('vpci') and
568 intf
.virtual_interface
.vpci
is not None):
569 cp_info
["vpci"] = intf
.virtual_interface
.vpci
571 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
572 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
574 cp_list
.append(cp_info
)
576 for intf
, cp
, vlr
in self
._int
_intf
:
577 if (intf
.virtual_interface
.has_field('vpci') and
578 intf
.virtual_interface
.vpci
is not None):
579 cp_list
.append({"name": cp
,
580 "virtual_link_id": vlr
.network_id
,
581 "type_yang": intf
.virtual_interface
.type_yang
,
582 "vpci": intf
.virtual_interface
.vpci
})
584 cp_list
.append({"name": cp
,
585 "virtual_link_id": vlr
.network_id
,
586 "type_yang": intf
.virtual_interface
.type_yang
})
588 vm_create_msg_dict
["connection_points"] = cp_list
589 vm_create_msg_dict
.update(vdu_copy_dict
)
591 self
.process_placement_groups(vm_create_msg_dict
)
593 msg
= RwResourceMgrYang
.VDUEventData()
594 msg
.event_id
= self
._request
_id
595 msg
.cloud_account
= self
.cloud_account_name
596 msg
.request_info
.from_dict(vm_create_msg_dict
)
600 def terminate(self
, xact
):
601 """ Delete resource in VIM """
602 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
603 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
606 self
._state
= VDURecordState
.TERMINATING
607 if self
._vm
_resp
is not None:
609 with self
._dts
.transaction() as new_xact
:
610 yield from self
.delete_resource(new_xact
)
612 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
614 if self
._rm
_regh
is not None:
615 self
._log
.debug("Deregistering resource manager registration handle")
616 self
._rm
_regh
.deregister()
619 if self
._vdur
_console
_handler
is not None:
620 self
._log
.error("Deregistering vnfr vdur registration handle")
621 self
._vdur
_console
_handler
._regh
.deregister()
622 self
._vdur
_console
_handler
._regh
= None
624 self
._state
= VDURecordState
.TERMINATED
626 def find_internal_cp_by_cp_id(self
, cp_id
):
627 """ Find the CP corresponding to the connection point id"""
630 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
633 for int_cp
in self
._vdud
.internal_connection_point
:
634 self
._log
.debug("Checking for int cp %s in internal connection points",
636 if int_cp
.id == cp_id
:
641 self
._log
.debug("Failed to find cp %s in internal connection points",
643 msg
= "Failed to find cp %s in internal connection points" % cp_id
644 raise VduRecordError(msg
)
646 # return the VLR associated with the connection point
650 def create_resource(self
, xact
, vnfr
, config
=None):
651 """ Request resource from ResourceMgr """
652 def find_cp_by_name(cp_name
):
653 """ Find a connection point by name """
655 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
656 for ext_cp
in vnfr
._cprs
:
657 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
658 if ext_cp
.name
== cp_name
:
662 self
._log
.debug("Failed to find cp %s in external connection points",
666 def find_internal_vlr_by_cp_name(cp_name
):
667 """ Find the VLR corresponding to the connection point name"""
670 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
673 for int_cp
in self
._vdud
.internal_connection_point
:
674 self
._log
.debug("Checking for int cp %s in internal connection points",
676 if int_cp
.id == cp_name
:
681 self
._log
.debug("Failed to find cp %s in internal connection points",
683 msg
= "Failed to find cp %s in internal connection points" % cp_name
684 raise VduRecordError(msg
)
686 # return the VLR associated with the connection point
687 return vnfr
.find_vlr_by_cp(cp_name
)
689 block
= xact
.block_create()
691 self
._log
.debug("Executing vm request id: %s, action: create",
694 # Resolve the networks associated external interfaces
695 for ext_intf
in self
._vdud
.external_interface
:
696 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
697 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
698 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
700 self
._log
.debug("Failed to find connection point - %s",
701 ext_intf
.vnfd_connection_point_ref
)
703 self
._log
.debug("Connection point name [%s], type[%s]",
704 cp
.name
, cp
.type_yang
)
706 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
708 etuple
= (ext_intf
, cp
.name
, vlr
)
709 self
._ext
_intf
.append(etuple
)
711 self
._log
.debug("Created external interface tuple : %s", etuple
)
713 # Resolve the networks associated internal interfaces
714 for intf
in self
._vdud
.internal_interface
:
715 cp_id
= intf
.vdu_internal_connection_point_ref
716 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
720 vlr
= find_internal_vlr_by_cp_name(cp_id
)
721 except Exception as e
:
722 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
723 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
724 raise VduRecordError(msg
)
726 ituple
= (intf
, cp_id
, vlr
)
727 self
._int
_intf
.append(ituple
)
729 self
._log
.debug("Created internal interface tuple : %s", ituple
)
731 resmgr_path
= self
.resmgr_path
732 resmgr_msg
= self
.resmgr_msg(config
)
734 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
735 block
.add_query_create(resmgr_path
, resmgr_msg
)
737 res_iter
= yield from block
.execute(now
=True)
745 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
746 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
747 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
748 return resp
.resource_info
751 def delete_resource(self
, xact
):
752 block
= xact
.block_create()
754 self
._log
.debug("Executing vm request id: %s, action: delete",
757 block
.add_query_delete(self
.resmgr_path
)
759 yield from block
.execute(flags
=0, now
=True)
762 def read_resource(self
, xact
):
763 block
= xact
.block_create()
765 self
._log
.debug("Executing vm request id: %s, action: delete",
768 block
.add_query_read(self
.resmgr_path
)
770 res_iter
= yield from block
.execute(flags
=0, now
=True)
775 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
776 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
777 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
778 #self._vm_resp = resp.resource_info
779 return resp
.resource_info
783 def start_component(self
):
784 """ This VDUR is active """
785 self
._log
.debug("Starting component %s for vdud %s vdur %s",
786 self
._vdud
.vcs_component_ref
,
789 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
790 self
.vm_resp
.management_ip
)
794 """ Is this VDU active """
795 return True if self
._state
is VDURecordState
.READY
else False
798 def instantiation_failed(self
, failed_reason
=None):
799 """ VDU instantiation failed """
800 self
._log
.debug("VDU %s instantiation failed ", self
._vdur
_id
)
801 self
._state
= VDURecordState
.FAILED
802 self
._state
_failed
_reason
= failed_reason
803 yield from self
._vnfr
.instantiation_failed(failed_reason
)
806 def vdu_is_active(self
):
807 """ This VDU is active"""
809 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
812 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
814 if self
._vdud
.vcs_component_ref
is not None:
815 yield from self
.start_component()
817 self
._state
= VDURecordState
.READY
819 if self
._vnfr
.all_vdus_active():
820 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
821 yield from self
._vnfr
.is_ready()
824 def instantiate(self
, xact
, vnfr
, config
=None):
825 """ Instantiate this VDU """
826 self
._state
= VDURecordState
.INSTANTIATING
829 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
830 """ This VDUR is active """
831 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
836 if (query_action
== rwdts
.QueryAction
.UPDATE
or
837 query_action
== rwdts
.QueryAction
.CREATE
):
840 if msg
.resource_state
== "active":
841 # Move this VDU to ready state
842 yield from self
.vdu_is_active()
843 elif msg
.resource_state
== "failed":
844 yield from self
.instantiation_failed(msg
.resource_errors
)
845 elif query_action
== rwdts
.QueryAction
.DELETE
:
846 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
848 raise NotImplementedError(
849 "%s action on VirtualDeployementUnitRecord not supported",
852 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
855 reg_event
= asyncio
.Event(loop
=self
._loop
)
858 def on_ready(regh
, status
):
861 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
862 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
863 flags
=rwdts
.Flag
.SUBSCRIBER
,
865 yield from reg_event
.wait()
867 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
868 self
._vm
_resp
= vm_resp
870 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
871 self
._log
.debug("Requested VM from resource manager response %s",
873 if vm_resp
.resource_state
== "active":
874 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
876 yield from self
.vdu_is_active()
877 self
._state
= VDURecordState
.READY
878 elif (vm_resp
.resource_state
== "pending" or
879 vm_resp
.resource_state
== "inactive"):
880 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
882 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
883 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
884 # flags=rwdts.Flag.SUBSCRIBER,
887 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
889 raise VirtualDeploymentUnitRecordError(
890 "Failed VDUR instantiation %s " % vm_resp
)
892 except Exception as e
:
894 traceback
.print_exc()
895 self
._log
.exception(e
)
896 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
897 self
._state
= VDURecordState
.FAILED
898 yield from self
.instantiation_failed(str(e
))
901 class VlRecordState(enum
.Enum
):
902 """ VL Record State """
904 INSTANTIATION_PENDING
= 102
906 TERMINATE_PENDING
= 104
911 class InternalVirtualLinkRecord(object):
912 """ Internal Virtual Link record """
913 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
):
917 self
._ivld
_msg
= ivld_msg
918 self
._vnfr
_name
= vnfr_name
919 self
._cloud
_account
_name
= cloud_account_name
921 self
._vlr
_req
= self
.create_vlr()
923 self
._state
= VlRecordState
.INIT
927 """ Find VLR by id """
928 return self
._vlr
_req
.id
932 """ Name of this VL """
933 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
936 def network_id(self
):
937 """ Find VLR by id """
938 return self
._vlr
.network_id
if self
._vlr
else None
941 """ VLR path for this VLR instance"""
942 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
944 def create_vlr(self
):
945 """ Create the VLR record which will be instantiated """
947 vld_fields
= ["short_name",
954 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
956 vlr_dict
= {"id": str(uuid
.uuid4()),
958 "cloud_account": self
._cloud
_account
_name
,
960 vlr_dict
.update(vld_copy_dict
)
962 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
966 def instantiate(self
, xact
, restart_mode
=False):
967 """ Instantiate VL """
970 def instantiate_vlr():
971 """ Instantiate VLR"""
972 self
._log
.debug("Create VL with xpath %s and vlr %s",
973 self
.vlr_path(), self
._vlr
_req
)
975 with self
._dts
.transaction(flags
=0) as xact
:
976 block
= xact
.block_create()
977 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
978 self
._log
.debug("Executing VL create path:%s msg:%s",
979 self
.vlr_path(), self
._vlr
_req
)
983 res_iter
= yield from block
.execute()
985 self
._state
= VlRecordState
.FAILED
986 self
._log
.exception("Caught exception while instantial VL")
991 self
._vlr
= res
.result
993 if self
._vlr
.operational_status
== 'failed':
994 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
995 self
._state
= VlRecordState
.FAILED
996 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
998 self
._log
.info("Created VL with xpath %s and vlr %s",
999 self
.vlr_path(), self
._vlr
)
1003 """ Get the network id """
1004 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1006 for ent
in res_iter
:
1007 res
= yield from ent
1011 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1013 raise InternalVirtualLinkRecordError(err
)
1016 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1019 vl
= yield from get_vlr()
1021 yield from instantiate_vlr()
1023 yield from instantiate_vlr()
1025 self
._state
= VlRecordState
.ACTIVE
1027 def vlr_in_vns(self
):
1028 """ Is there a VLR record in VNS """
1029 if (self
._state
== VlRecordState
.ACTIVE
or
1030 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1031 self
._state
== VlRecordState
.FAILED
):
1037 def terminate(self
, xact
):
1038 """Terminate this VL """
1039 if not self
.vlr_in_vns():
1040 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1041 self
.vlr_id
, self
._state
)
1044 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1045 self
._state
= VlRecordState
.TERMINATE_PENDING
1046 block
= xact
.block_create()
1047 block
.add_query_delete(self
.vlr_path())
1048 yield from block
.execute(flags
=0, now
=True)
1049 self
._state
= VlRecordState
.TERMINATED
1050 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1053 class VirtualNetworkFunctionRecord(object):
1054 """ Virtual Network Function Record """
1055 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
, mgmt_network
=None):
1059 self
._cluster
_name
= cluster_name
1060 self
._vnfr
_msg
= vnfr_msg
1061 self
._vnfr
_id
= vnfr_msg
.id
1062 self
._vnfd
_id
= vnfr_msg
.vnfd_ref
1064 self
._vcs
_handler
= vcs_handler
1065 self
._vnfr
= vnfr_msg
1066 self
._mgmt
_network
= mgmt_network
1069 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1070 self
._state
_failed
_reason
= None
1071 self
._ext
_vlrs
= {} # The list of external virtual links
1072 self
._vlrs
= [] # The list of internal virtual links
1073 self
._vdus
= [] # The list of vdu
1074 self
._vlr
_by
_cp
= {}
1076 self
._inventory
= {}
1077 self
._create
_time
= int(time
.time())
1078 self
._vnf
_mon
= None
1079 self
._config
_status
= vnfr_msg
.config_status
1080 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1082 def _get_vdur_from_vdu_id(self
, vdu_id
):
1083 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1084 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1085 for vdu
in self
._vdus
:
1086 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1087 if vdu
.vdu_id
== vdu_id
:
1090 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1093 def operational_status(self
):
1094 """ Operational status of this VNFR """
1095 op_status_map
= {"INIT": "init",
1096 "VL_INIT_PHASE": "vl_init_phase",
1097 "VM_INIT_PHASE": "vm_init_phase",
1099 "TERMINATE": "terminate",
1100 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1101 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1102 "TERMINATED": "terminated",
1103 "FAILED": "failed", }
1104 return op_status_map
[self
._state
.name
]
1107 def vnfd_xpath(self
):
1108 """ VNFD xpath associated with this VNFR """
1109 return("C,/vnfd:vnfd-catalog/"
1110 "vnfd:vnfd[vnfd:id = '{}']".format(self
._vnfd
_id
))
1114 """ VNFD for this VNFR """
1119 """ VNFD name associated with this VNFR """
1120 return self
.vnfd
.name
1124 """ Name of this VNF in the record """
1125 return self
._vnfr
.name
1128 def cloud_account_name(self
):
1129 """ Name of the cloud account this VNFR is instantiated in """
1130 return self
._vnfr
.cloud_account
1134 """ VNFD Id associated with this VNFR """
1139 """ VNFR Id associated with this VNFR """
1140 return self
._vnfr
_id
1143 def member_vnf_index(self
):
1144 """ Member VNF index associated with this VNFR """
1145 return self
._vnfr
.member_vnf_index_ref
1148 def config_status(self
):
1149 """ Config agent status for this VNFR """
1150 return self
._config
_status
1152 def component_by_name(self
, component_name
):
1153 """ Find a component by name in the inventory list"""
1154 mangled_name
= VcsComponent
.mangle_name(component_name
,
1157 return self
._inventory
[mangled_name
]
1162 def get_nsr_config(self
):
1163 ### Need access to NS instance configuration for runtime resolution.
1164 ### This shall be replaced when deployment flavors are implemented
1165 xpath
= "C,/nsr:ns-instance-config"
1166 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1168 for result
in results
:
1169 entry
= yield from result
1170 ns_instance_config
= entry
.result
1171 for nsr
in ns_instance_config
.nsr
:
1172 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
1177 def start_component(self
, component_name
, ip_addr
):
1178 """ Start a component in the VNFR by name """
1179 comp
= self
.component_by_name(component_name
)
1180 yield from comp
.start(None, None, ip_addr
)
1182 def cp_ip_addr(self
, cp_name
):
1183 """ Get ip address for connection point """
1184 self
._log
.debug("cp_ip_addr()")
1185 for cp
in self
._cprs
:
1186 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1187 return cp
.ip_address
1190 def mgmt_intf_info(self
):
1191 """ Get Management interface info for this VNFR """
1192 mgmt_intf_desc
= self
.vnfd
.msg
.mgmt_interface
1194 if mgmt_intf_desc
.has_field("cp"):
1195 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1196 elif mgmt_intf_desc
.has_field("vdu_id"):
1198 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1199 ip_addr
= vdur
.management_ip
1200 except VDURecordNotFound
:
1201 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1204 ip_addr
= mgmt_intf_desc
.ip_address
1205 port
= mgmt_intf_desc
.port
1207 return ip_addr
, port
1211 """ Message associated with this VNFR """
1212 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1213 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.msg
.as_dict().items() if k
in vnfd_fields
}
1215 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1216 ip_address
, port
= self
.mgmt_intf_info()
1218 if ip_address
is not None:
1219 mgmt_intf
.ip_address
= ip_address
1220 if port
is not None:
1221 mgmt_intf
.port
= port
1223 vnfr_dict
= {"id": self
._vnfr
_id
,
1224 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1226 "member_vnf_index_ref": self
.member_vnf_index
,
1227 "vnfd_ref": self
.vnfd_id
,
1228 "operational_status": self
.operational_status
,
1229 "operational_status_details": self
._state
_failed
_reason
,
1230 "cloud_account": self
.cloud_account_name
,
1231 "config_status": self
._config
_status
1234 vnfr_dict
.update(vnfd_copy_dict
)
1236 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1237 vnfr_msg
.mgmt_interface
= mgmt_intf
1239 # Add all the VLRs to VNFR
1240 for vlr
in self
._vlrs
:
1241 ivlr
= vnfr_msg
.internal_vlr
.add()
1242 ivlr
.vlr_ref
= vlr
.vlr_id
1244 # Add all the VDURs to VDUR
1245 if self
._vdus
is not None:
1246 for vdu
in self
._vdus
:
1247 vdur
= vnfr_msg
.vdur
.add()
1248 vdur
.from_dict(vdu
.msg
.as_dict())
1250 if self
.vnfd
.msg
.mgmt_interface
.has_field('dashboard_params'):
1251 vnfr_msg
.dashboard_url
= self
.dashboard_url
1253 for cpr
in self
._cprs
:
1254 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1255 vnfr_msg
.connection_point
.append(new_cp
)
1257 if self
._vnf
_mon
is not None:
1258 for monp
in self
._vnf
_mon
.msg
:
1259 vnfr_msg
.monitoring_param
.append(
1260 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1262 if self
._vnfr
.vnf_configuration
is not None:
1263 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1264 if (ip_address
is not None and
1265 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1266 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1268 for group
in self
._vnfr
_msg
.placement_groups_info
:
1269 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1270 group_info
.from_dict(group
.as_dict())
1271 vnfr_msg
.placement_groups_info
.append(group_info
)
1276 def dashboard_url(self
):
1277 ip
, cfg_port
= self
.mgmt_intf_info()
1280 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.has_field('https'):
1281 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.https
is True:
1284 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.has_field('port'):
1285 http_port
= self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.port
1287 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1291 path
=self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1298 """ path for this VNFR """
1299 return("D,/vnfr:vnfr-catalog"
1300 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1303 def publish(self
, xact
):
1304 """ publish this VNFR """
1306 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1307 self
.xpath
, self
.msg
)
1308 vnfr
.create_time
= self
._create
_time
1309 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1310 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1311 self
.xpath
, self
.msg
)
1314 def create_vls(self
):
1315 """ Publish The VLs associated with this VNF """
1316 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1318 for ivld_msg
in self
.vnfd
.msg
.internal_vld
:
1319 self
._log
.debug("Creating internal vld:"
1320 " %s, int_cp_ref = %s",
1321 ivld_msg
, ivld_msg
.internal_connection_point
1323 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1327 vnfr_name
=self
.name
,
1328 cloud_account_name
=self
.cloud_account_name
1330 self
._vlrs
.append(vlr
)
1332 for int_cp
in ivld_msg
.internal_connection_point
:
1333 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1334 msg
= ("Connection point %s already "
1335 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1336 raise InternalVirtualLinkRecordError(msg
)
1337 self
._log
.debug("Setting vlr %s to internal cp = %s",
1339 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1342 def instantiate_vls(self
, xact
, restart_mode
=False):
1343 """ Instantiate the VLs associated with this VNF """
1344 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1347 for vlr
in self
._vlrs
:
1348 self
._log
.debug("Instantiating VLR %s", vlr
)
1349 yield from vlr
.instantiate(xact
, restart_mode
)
1351 def find_vlr_by_cp(self
, cp_name
):
1352 """ Find the VLR associated with the cp name """
1353 return self
._vlr
_by
_cp
[cp_name
]
1355 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1357 Returns the cloud specific construct for placement group
1359 input_group: VNFD PlacementGroup
1360 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1362 copy_dict
= ['name', 'requirement', 'strategy']
1363 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1364 if group_info
.placement_group_ref
== input_group
.name
and \
1365 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1366 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1367 group_dict
= {k
:v
for k
,v
in
1368 group_info
.as_dict().items()
1369 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1370 for param
in copy_dict
:
1371 group_dict
.update({param
: getattr(input_group
, param
)})
1372 group
.from_dict(group_dict
)
1377 def get_vdu_placement_groups(self
, vdu
):
1378 placement_groups
= []
1379 ### Step-1: Get VNF level placement groups
1380 for group
in self
._vnfr
_msg
.placement_groups_info
:
1381 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1382 #group_info.from_dict(group.as_dict())
1383 placement_groups
.append(group
)
1385 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1386 nsr_config
= yield from self
.get_nsr_config()
1388 ### Step-3: Get VDU level placement groups
1389 for group
in self
.vnfd
.msg
.placement_groups
:
1390 for member_vdu
in group
.member_vdus
:
1391 if member_vdu
.member_vdu_ref
== vdu
.id:
1392 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1394 if group_info
is None:
1395 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1396 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1398 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1402 self
.member_vnf_index
)
1403 placement_groups
.append(group_info
)
1405 return placement_groups
1408 def create_vdus(self
, vnfr
, restart_mode
=False):
1409 """ Create the VDUs associated with this VNF """
1411 def get_vdur_id(vdud
):
1412 """Get the corresponding VDUR's id for the VDUD. This is useful in
1415 In restart mode we check for exiting VDUR's ID and use them, if
1416 available. This way we don't end up creating duplicate VDURs
1420 if restart_mode
and vdud
is not None:
1422 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1425 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1430 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1431 for vdu
in self
.vnfd
.msg
.vdu
:
1432 self
._log
.debug("Creating vdu: %s", vdu
)
1433 vdur_id
= get_vdur_id(vdu
)
1435 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1436 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1439 self
.member_vnf_index
,
1440 [ group
.name
for group
in placement_groups
])
1442 vdur
= VirtualDeploymentUnitRecord(
1448 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1449 mgmt_network
=self
._mgmt
_network
,
1450 cloud_account_name
=self
.cloud_account_name
,
1451 vnfd_package_store
=self
._vnfd
_package
_store
,
1453 placement_groups
= placement_groups
,
1455 yield from vdur
.vdu_opdata_register()
1457 self
._vdus
.append(vdur
)
1460 def instantiate_vdus(self
, xact
, vnfr
):
1461 """ Instantiate the VDUs associated with this VNF """
1462 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1464 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1466 # Identify any dependencies among the VDUs
1467 dependencies
= collections
.defaultdict(list)
1468 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1470 for vdu
in self
._vdus
:
1471 if vdu
.vdud_cloud_init
is not None:
1472 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1473 if vdu_id
!= vdu
.vdu_id
:
1474 # This means that vdu.vdu_id depends upon vdu_id,
1475 # i.e. vdu_id must be instantiated before
1477 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1479 # Define the terminal states of VDU instantiation
1481 VDURecordState
.READY
,
1482 VDURecordState
.TERMINATED
,
1483 VDURecordState
.FAILED
,
1486 datastore
= VdurDatastore()
1490 def instantiate_monitor(vdu
):
1491 """Monitor the state of the VDU during instantiation
1494 vdu - a VirtualDeploymentUnitRecord
1497 # wait for the VDUR to enter a terminal state
1498 while vdu
._state
not in terminal
:
1499 yield from asyncio
.sleep(1, loop
=self
._loop
)
1501 # update the datastore
1502 datastore
.update(vdu
)
1504 # add the VDU to the set of processed VDUs
1505 processed
.add(vdu
.vdu_id
)
1508 def instantiate(vdu
):
1509 """Instantiate the specified VDU
1512 vdu - a VirtualDeploymentUnitRecord
1515 if the VDU, or any of the VDUs this VDU depends upon, are
1516 terminated or fail to instantiate properly, a
1517 VirtualDeploymentUnitRecordError is raised.
1520 for dependency
in dependencies
[vdu
.vdu_id
]:
1521 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1523 while dependency
.vdu_id
not in processed
:
1524 yield from asyncio
.sleep(1, loop
=self
._loop
)
1526 if not dependency
.active
:
1527 raise VirtualDeploymentUnitRecordError()
1529 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1531 # Populate the datastore with the current values of the VDU
1534 # Substitute any variables contained in the cloud config script
1535 config
= str(vdu
.vdud_cloud_init
)
1537 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1540 # Extract the variable names
1542 for variable
in parts
[1::2]:
1543 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1545 # Iterate of the variables and substitute values from the
1547 for variable
in variables
:
1549 # Handle a reference to a VDU by ID
1550 if variable
.startswith('vdu['):
1551 value
= datastore
.get(variable
)
1553 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1554 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1556 config
= config
.replace("{{ %s }}" % variable
, value
)
1559 # Handle a reference to the current VDU
1560 if variable
.startswith('vdu'):
1561 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1562 config
= config
.replace("{{ %s }}" % variable
, value
)
1565 # Handle unrecognized variables
1566 msg
= 'unrecognized cloud-config variable: {}'
1567 raise ValueError(msg
.format(variable
))
1569 # Instantiate the VDU
1570 with self
._dts
.transaction() as xact
:
1571 self
._log
.debug("Instantiating vdu: %s", vdu
)
1572 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1573 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1574 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1577 # First create a set of tasks to monitor the state of the VDUs and
1578 # report when they have entered a terminal state
1579 for vdu
in self
._vdus
:
1580 self
._loop
.create_task(instantiate_monitor(vdu
))
1582 for vdu
in self
._vdus
:
1583 self
._loop
.create_task(instantiate(vdu
))
1585 def has_mgmt_interface(self
, vdu
):
1586 # ## TODO: Support additional mgmt_interface type options
1587 if self
.vnfd
.msg
.mgmt_interface
.vdu_id
== vdu
.id:
1591 def vlr_xpath(self
, vlr_id
):
1594 "D,/vlr:vlr-catalog/"
1595 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
1597 def ext_vlr_by_id(self
, vlr_id
):
1598 """ find ext vlr by id """
1599 return self
._ext
_vlrs
[vlr_id
]
1602 def publish_inventory(self
, xact
):
1603 """ Publish the inventory associated with this VNF """
1604 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1606 for component
in self
.vnfd
.msg
.component
:
1607 self
._log
.debug("Creating inventory component %s", component
)
1608 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1612 comp
= VcsComponent(dts
=self
._dts
,
1615 cluster_name
=self
._cluster
_name
,
1616 vcs_handler
=self
._vcs
_handler
,
1617 component
=component
,
1618 mangled_name
=mangled_name
,
1620 if comp
.name
in self
._inventory
:
1621 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1622 component
, self
._vnfd
_id
)
1624 self
._log
.debug("Adding component %s for vnrf %s",
1625 comp
.name
, self
._vnfr
_id
)
1626 self
._inventory
[comp
.name
] = comp
1627 yield from comp
.publish(xact
)
1629 def all_vdus_active(self
):
1630 """ Are all VDUS in this VNFR active? """
1631 for vdu
in self
._vdus
:
1635 self
._log
.debug("Inside all_vdus_active. Returning True")
1639 def instantiation_failed(self
, failed_reason
=None):
1640 """ VNFR instantiation failed """
1641 self
._log
.debug("VNFR %s instantiation failed ", self
.vnfr_id
)
1642 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1643 self
._state
_failed
_reason
= failed_reason
1645 # Update the VNFR with the changed status
1646 yield from self
.publish(None)
1650 """ This VNF is ready"""
1651 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1653 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1654 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1657 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1659 # Update the VNFR with the changed status
1660 yield from self
.publish(None)
1662 def update_cp(self
, cp_name
, ip_address
, cp_id
):
1663 """Updated the connection point with ip address"""
1664 for cp
in self
._cprs
:
1665 if cp
.name
== cp_name
:
1666 self
._log
.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1667 cp_name
, cp
, ip_address
, cp_id
)
1668 cp
.ip_address
= ip_address
1669 cp
.connection_point_id
= cp_id
1672 err
= "No connection point %s found in VNFR id %s" % (cp
.name
, self
._vnfr
_id
)
1673 self
._log
.debug(err
)
1674 raise VirtualDeploymentUnitRecordError(err
)
1676 def set_state(self
, state
):
1677 """ Set state for this VNFR"""
1681 def instantiate(self
, xact
, restart_mode
=False):
1682 """ instantiate this VNF """
1683 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1688 # Iterate over all the connection points in VNFR and fetch the
1691 def cpr_from_cp(cp
):
1692 """ Creates a record level connection point from the desciptor cp"""
1693 cp_fields
= ["name", "image", "vm-flavor"]
1694 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1696 cpr_dict
.update(cp_copy_dict
)
1697 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1699 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1700 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1702 for cp
in self
._vnfr
.connection_point
:
1703 cpr
= cpr_from_cp(cp
)
1704 self
._cprs
.append(cpr
)
1705 self
._log
.debug("Adding Connection point record %s ", cp
)
1707 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1708 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1709 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1710 rwdts
.XactFlag
.MERGE
)
1714 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1715 cpr
.vlr_ref
= cp
.vlr_ref
1716 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1718 # Fetch the VNFD associated with the VNFR
1719 self
._log
.debug("VNFR-ID %s: Fetching vnfds", self
._vnfr
_id
)
1720 self
._vnfd
= yield from self
._vnfm
.get_vnfd_ref(self
._vnfd
_id
)
1721 self
._log
.debug("VNFR-ID %s: Fetched vnfd:%s", self
._vnfr
_id
, self
._vnfd
)
1723 assert self
.vnfd
is not None
1725 # Fetch External VLRs
1726 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1727 yield from fetch_vlrs()
1730 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1731 yield from self
.publish_inventory(xact
)
1734 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1735 yield from self
.create_vls()
1738 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1739 yield from self
.publish(xact
)
1742 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1744 yield from self
.instantiate_vls(xact
, restart_mode
)
1745 except Exception as e
:
1746 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1747 yield from self
.instantiation_failed(str(e
))
1750 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1753 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1754 yield from self
.create_vdus(self
, restart_mode
)
1757 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1758 yield from self
.publish(xact
)
1761 # ToDo: Check if this should be prevented during restart
1762 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1763 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1766 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1767 yield from self
.publish(xact
)
1769 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1772 def terminate(self
, xact
):
1773 """ Terminate this virtual network function """
1775 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
1777 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
1780 if self
._vnf
_mon
is not None:
1781 self
._vnf
_mon
.stop()
1782 self
._vnf
_mon
.deregister()
1783 self
._vnf
_mon
= None
1786 def terminate_vls():
1787 """ Terminate VLs in this VNF """
1788 for vl
in self
._vlrs
:
1789 yield from vl
.terminate(xact
)
1792 def terminate_vdus():
1793 """ Terminate VDUS in this VNF """
1794 for vdu
in self
._vdus
:
1795 yield from vdu
.terminate(xact
)
1797 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
1798 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
1799 yield from terminate_vls()
1801 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
1802 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
1803 yield from terminate_vdus()
1805 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
1806 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
1809 class VnfdDtsHandler(object):
1810 """ DTS handler for VNFD config changes """
1811 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1813 def __init__(self
, dts
, log
, loop
, vnfm
):
1822 """ DTS registration handle """
1827 """ Register for VNFD configuration"""
1829 def on_apply(dts
, acg
, xact
, action
, scratch
):
1830 """Apply the configuration"""
1831 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1832 xact
, action
, scratch
)
1834 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1835 # Create/Update a VNFD record
1836 for cfg
in self
._regh
.get_xact_elements(xact
):
1837 # Only interested in those VNFD cfgs whose ID was received in prepare callback
1838 if cfg
.id in scratch
.get('vnfds', []) or is_recovery
:
1839 self
._vnfm
.update_vnfd(cfg
)
1841 scratch
.pop('vnfds', None)
1844 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1845 """ on prepare callback """
1846 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1847 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1848 fref
= ProtobufC
.FieldReference
.alloc()
1849 fref
.goto_whole_message(msg
.to_pbcm())
1851 # Handle deletes in prepare_callback, but adds/updates in apply_callback
1852 if fref
.is_field_deleted():
1853 # Delete an VNFD record
1854 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1855 if self
._vnfm
.vnfd_in_use(msg
.id):
1856 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1857 err
= "Cannot delete a VNFD in use - %s" % msg
1858 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1859 # Delete a VNFD record
1860 yield from self
._vnfm
.delete_vnfd(msg
.id)
1862 # Handle actual adds/updates in apply_callback,
1863 # just check if VNFD in use in prepare_callback
1864 if self
._vnfm
.vnfd_in_use(msg
.id):
1865 self
._log
.debug("Cannot modify an VNFD in use - %s", msg
)
1866 err
= "Cannot modify an VNFD in use - %s" % msg
1867 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1869 # Add this VNFD to scratch to create/update in apply callback
1870 vnfds
= scratch
.setdefault('vnfds', [])
1871 vnfds
.append(msg
.id)
1873 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1876 "Registering for VNFD config using xpath: %s",
1877 VnfdDtsHandler
.XPATH
,
1879 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
1880 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
1881 self
._regh
= acg
.register(
1882 xpath
=VnfdDtsHandler
.XPATH
,
1883 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1884 on_prepare
=on_prepare
)
1887 class VcsComponentDtsHandler(object):
1888 """ Vcs Component DTS handler """
1889 XPATH
= ("D,/rw-manifest:manifest" +
1890 "/rw-manifest:operational-inventory" +
1891 "/rw-manifest:component")
1893 def __init__(self
, dts
, log
, loop
, vnfm
):
1902 """ DTS registration handle """
1907 """ Registers VCS component dts publisher registration"""
1908 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
1909 VcsComponentDtsHandler
.XPATH
)
1911 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
1912 handlers
= rift
.tasklets
.Group
.Handler()
1913 with self
._dts
.group_create(handler
=handlers
) as group
:
1914 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
1916 flags
=(rwdts
.Flag
.PUBLISHER |
1917 rwdts
.Flag
.NO_PREP_READ |
1918 rwdts
.Flag
.DATASTORE
),)
1921 def publish(self
, xact
, path
, msg
):
1922 """ Publishes the VCS component """
1923 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
1925 self
.regh
.create_element(path
, msg
)
1926 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1927 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
1929 class VnfrConsoleOperdataDtsHandler(object):
1930 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1932 def vnfr_vdu_console_xpath(self
):
1933 """ path for resource-mgr"""
1934 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self
._vnfr
_id
,self
._vdur
_id
))
1936 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
1943 self
._vnfr
_id
= vnfr_id
1944 self
._vdur
_id
= vdur_id
1945 self
._vdu
_id
= vdu_id
1949 """ Register for VNFR VDU Operational Data read from dts """
1952 def on_prepare(xact_info
, action
, ks_path
, msg
):
1953 """ prepare callback from dts """
1954 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
1956 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
1957 xact_info
, action
, xpath
, msg
1960 if action
== rwdts
.QueryAction
.READ
:
1961 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
1962 path_entry
= schema
.keyspec_to_entry(ks_path
)
1963 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
1965 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
1966 except VnfRecordError
as e
:
1967 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
1968 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1971 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
1972 if not vdur
._state
== VDURecordState
.READY
:
1973 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
1974 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1976 with self
._dts
.transaction() as new_xact
:
1977 resp
= yield from vdur
.read_resource(new_xact
)
1978 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
1979 vdur_console
.id = self
._vdur
_id
1980 if resp
.console_url
:
1981 vdur_console
.console_url
= resp
.console_url
1983 vdur_console
.console_url
= 'none'
1984 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
1986 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
1987 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
1988 vdur_console
.id = self
._vdur
_id
1989 vdur_console
.console_url
= 'none'
1991 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
1992 xpath
=self
.vnfr_vdu_console_xpath
,
1995 #raise VnfRecordError("Not supported operation %s" % action)
1996 self
._log
.error("Not supported operation %s" % action
)
1997 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2001 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2002 self
.vnfr_vdu_console_xpath
)
2003 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2004 with self
._dts
.group_create() as group
:
2005 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2007 flags
=rwdts
.Flag
.PUBLISHER
,
2011 class VnfrDtsHandler(object):
2012 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2013 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2015 def __init__(self
, dts
, log
, loop
, vnfm
):
2025 """ Return registration handle"""
2030 """ Return VNF manager instance """
2035 """ Register for vnfr create/update/delete/read requests from dts """
2036 def on_commit(xact_info
):
2037 """ The transaction has been committed """
2038 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2039 return rwdts
.MemberRspCode
.ACTION_OK
2041 def on_abort(*args
):
2042 """ Abort callback """
2043 self
._log
.debug("VNF transaction got aborted")
2046 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2049 def instantiate_realloc_vnfr(vnfr
):
2050 """Re-populate the vnfm after restart
2057 yield from vnfr
.instantiate(None, restart_mode
=True)
2059 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2060 curr_cfg
= self
.regh
.elements
2061 for cfg
in curr_cfg
:
2062 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2063 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2065 self
._log
.debug("Got on_event in vnfm")
2067 return rwdts
.MemberRspCode
.ACTION_OK
2070 def on_prepare(xact_info
, action
, ks_path
, msg
):
2071 """ prepare callback from dts """
2073 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2074 xact_info
, action
, msg
2077 if action
== rwdts
.QueryAction
.CREATE
:
2078 if not msg
.has_field("vnfd_ref"):
2079 err
= "Vnfd reference not provided"
2080 self
._log
.error(err
)
2081 raise VnfRecordError(err
)
2083 vnfr
= self
.vnfm
.create_vnfr(msg
)
2085 # RIFT-9105: Unable to add a READ query under an existing transaction
2086 # xact = xact_info.xact
2087 yield from vnfr
.instantiate(None)
2088 except Exception as e
:
2089 self
._log
.exception(e
)
2090 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2091 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2092 yield from vnfr
.publish(None)
2093 elif action
== rwdts
.QueryAction
.DELETE
:
2094 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2095 path_entry
= schema
.keyspec_to_entry(ks_path
)
2096 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2099 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2100 raise VirtualNetworkFunctionRecordNotFound(
2101 "VNFR id %s", path_entry
.key00
.id)
2104 yield from vnfr
.terminate(xact_info
.xact
)
2107 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2108 except Exception as e
:
2109 self
._log
.exception(e
)
2110 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2112 elif action
== rwdts
.QueryAction
.UPDATE
:
2113 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2114 path_entry
= schema
.keyspec_to_entry(ks_path
)
2117 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2118 except Exception as e
:
2119 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2120 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2124 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2125 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2128 self
._log
.debug("VNFR {} update config status {} (current {})".
2129 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2130 # Update the config status and publish
2131 vnfr
._config
_status
= msg
.config_status
2132 yield from vnfr
.publish(None)
2135 raise NotImplementedError(
2136 "%s action on VirtualNetworkFunctionRecord not supported",
2139 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2141 self
._log
.debug("Registering for VNFR using xpath: %s",
2142 VnfrDtsHandler
.XPATH
,)
2144 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2145 on_prepare
=on_prepare
,)
2146 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2147 with self
._dts
.group_create(handler
=handlers
) as group
:
2148 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2150 flags
=(rwdts
.Flag
.PUBLISHER |
2151 rwdts
.Flag
.NO_PREP_READ |
2153 rwdts
.Flag
.DATASTORE
),)
2156 def create(self
, xact
, path
, msg
):
2158 Create a VNFR record in DTS with path and message
2160 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2163 self
.regh
.create_element(path
, msg
)
2164 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2168 def update(self
, xact
, path
, msg
):
2170 Update a VNFR record in DTS with path and message
2172 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2174 self
.regh
.update_element(path
, msg
)
2175 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2179 def delete(self
, xact
, path
):
2181 Delete a VNFR record in DTS with path and message
2183 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2184 self
.regh
.delete_element(path
)
2185 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2188 class VirtualNetworkFunctionDescriptor(object):
2190 Virtual Network Function descriptor class
2193 def __init__(self
, dts
, log
, loop
, vnfm
, vnfd
):
2203 def ref_count(self
):
2204 """ Returns the reference count associated with
2205 this Virtual Network Function Descriptor"""
2206 return self
._ref
_count
2210 """ Returns vnfd id """
2211 return self
._vnfd
.id
2215 """ Returns vnfd name """
2216 return self
._vnfd
.name
2219 """ Returns whether vnfd is in use or not """
2220 return True if self
._ref
_count
> 0 else False
2223 """ Take a reference on this object """
2224 self
._ref
_count
+= 1
2225 return self
._ref
_count
2228 """ Release reference on this object """
2229 if self
.ref_count
< 1:
2230 msg
= ("Unref on a VNFD object - vnfd id %s, ref_count = %s" %
2231 (self
.id, self
._ref
_count
))
2232 self
._log
.critical(msg
)
2233 raise VnfRecordError(msg
)
2234 self
._log
.debug("Releasing ref on VNFD %s - curr ref_count:%s",
2235 self
.id, self
.ref_count
)
2236 self
._ref
_count
-= 1
2237 return self
._ref
_count
2241 """ Return the message associated with this NetworkServiceDescriptor"""
2245 def path_for_id(vnfd_id
):
2246 """ Return path for the passed vnfd_id"""
2247 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id
)
2250 """ Return the path associated with this NetworkServiceDescriptor"""
2251 return VirtualNetworkFunctionDescriptor
.path_for_id(self
.id)
2253 def update(self
, vnfd
):
2254 """ Update the Virtual Network Function Descriptor """
2256 self
._log
.error("Cannot update descriptor %s in use refcnt=%d",
2257 self
.id, self
.ref_count
)
2259 # The following loop is added to debug RIFT-13284
2260 for vnf_rec
in self
._vnfm
._vnfrs
.values():
2261 if vnf_rec
.vnfd_id
== self
.id:
2262 self
._log
.error("descriptor %s in used by %s:%s",
2263 self
.id, vnf_rec
.vnfr_id
, vnf_rec
.msg
)
2264 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot update descriptor in use %s" % self
.id)
2268 """ Delete the Virtual Network Function Descriptor """
2270 self
._log
.error("Cannot delete descriptor %s in use refcnt=%d",
2273 # The following loop is added to debug RIFT-13284
2274 for vnf_rec
in self
._vnfm
._vnfrs
.values():
2275 if vnf_rec
.vnfd_id
== self
.id:
2276 self
._log
.error("descriptor %s in used by %s:%s",
2277 self
.id, vnf_rec
.vnfr_id
, vnf_rec
.msg
)
2278 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot delete descriptor in use %s" % self
.id)
2279 self
._vnfm
.delete_vnfd(self
.id)
2282 class VnfdRefCountDtsHandler(object):
2283 """ The VNFD Ref Count DTS handler """
2284 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2286 def __init__(self
, dts
, log
, loop
, vnfm
):
2296 """ Return registration handle """
2301 """ Return the NS manager instance """
2306 """ Register for VNFD ref count read from dts """
2309 def on_prepare(xact_info
, action
, ks_path
, msg
):
2310 """ prepare callback from dts """
2311 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2313 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2314 xact_info
, action
, xpath
, msg
2317 if action
== rwdts
.QueryAction
.READ
:
2318 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2319 path_entry
= schema
.keyspec_to_entry(ks_path
)
2320 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2321 for xpath
, msg
in vnfd_list
:
2322 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2324 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2327 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2329 raise VnfRecordError("Not supported operation %s" % action
)
2331 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2332 with self
._dts
.group_create() as group
:
2333 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2335 flags
=rwdts
.Flag
.PUBLISHER
,
2339 class VdurDatastore(object):
2341 This VdurDatastore is intended to expose select information about a VDUR
2342 such that it can be referenced in a cloud config file. The data that is
2343 exposed does not necessarily follow the structure of the data in the yang
2344 model. This is intentional. The data that are exposed are intended to be
2345 agnostic of the yang model so that changes in the model do not necessarily
2346 require changes to the interface provided to the user. It also means that
2347 the user does not need to be familiar with the RIFT.ware yang models.
2351 """Create an instance of VdurDatastore"""
2352 self
._vdur
_data
= dict()
2353 self
._pattern
= re
.compile("vdu\[([^]]+)\]\.(.+)")
2355 def add(self
, vdur
):
2356 """Add a new VDUR to the datastore
2359 vdur - a VirtualDeploymentUnitRecord instance
2362 A ValueError is raised if the VDUR is (1) None or (2) already in
2366 if vdur
.vdu_id
is None:
2367 raise ValueError('VDURs are required to have an ID')
2369 if vdur
.vdu_id
in self
._vdur
_data
:
2370 raise ValueError('cannot add a VDUR more than once')
2372 self
._vdur
_data
[vdur
.vdu_id
] = dict()
2374 def set_if_not_none(key
, attr
):
2375 if attr
is not None:
2376 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2378 set_if_not_none('name', vdur
._vdud
.name
)
2379 set_if_not_none('mgmt.ip', vdur
.vm_management_ip
)
2381 def update(self
, vdur
):
2382 """Update the VDUR information in the datastore
2385 vdur - a GI representation of a VDUR
2388 A ValueError is raised if the VDUR is (1) None or (2) already in
2392 if vdur
.vdu_id
is None:
2393 raise ValueError('VNFDs are required to have an ID')
2395 if vdur
.vdu_id
not in self
._vdur
_data
:
2396 raise ValueError('VNF is not recognized')
2398 def set_or_delete(key
, attr
):
2400 if key
in self
._vdur
_data
[vdur
.vdu_id
]:
2401 del self
._vdur
_data
[vdur
.vdu_id
][key
]
2404 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2406 set_or_delete('name', vdur
._vdud
.name
)
2407 set_or_delete('mgmt.ip', vdur
.vm_management_ip
)
2409 def remove(self
, vdur_id
):
2410 """Remove all of the data associated with specified VDUR
2413 vdur_id - the identifier of a VNFD in the datastore
2416 A ValueError is raised if the VDUR is not contained in the
2420 if vdur_id
not in self
._vdur
_data
:
2421 raise ValueError('VNF is not recognized')
2423 del self
._vdur
_data
[vdur_id
]
2425 def get(self
, expr
):
2426 """Retrieve VDUR information from the datastore
2428 An expression should be of the form,
2432 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2433 the exposed attribute that the user wishes to retrieve.
2435 If the requested data is not available, None is returned.
2438 expr - a string that specifies the data to return
2441 A ValueError is raised if the provided expression cannot be parsed.
2444 The requested data or None
2447 result
= self
._pattern
.match(expr
)
2449 raise ValueError('data expression not recognized ({})'.format(expr
))
2451 vdur_id
, key
= result
.groups()
2453 if vdur_id
not in self
._vdur
_data
:
2456 return self
._vdur
_data
[vdur_id
].get(key
, None)
2459 class VnfManager(object):
2460 """ The virtual network function manager class """
2461 def __init__(self
, dts
, log
, loop
, cluster_name
):
2465 self
._cluster
_name
= cluster_name
2467 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2468 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2469 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2470 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(log
, dts
, loop
, callback
=self
.handle_nsr
)
2472 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2480 def vnfr_handler(self
):
2481 """ VNFR dts handler """
2482 return self
._vnfr
_handler
2485 def vcs_handler(self
):
2486 """ VCS dts handler """
2487 return self
._vcs
_handler
2491 """ Register all static DTS handlers """
2492 for hdl
in self
._dts
_handlers
:
2493 yield from hdl
.register()
2497 """ Run this VNFM instance """
2498 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2499 yield from self.register()
2501 def handle_nsr(self, nsr, action):
2502 if action in [rwdts.QueryAction.CREATE]:
2503 self._nsrs[nsr.id] = nsr
2504 elif action == rwdts.QueryAction.DELETE:
2505 if nsr.id in self._nsrs:
2506 del self._nsrs[nsr.id]
2508 def get_linked_mgmt_network(self, vnfr):
2509 """For the given VNFR get the related mgmt network from the NSD, if
2512 vnfd_id = vnfr.vnfd_ref
2513 nsr_id = vnfr.nsr_id_ref
2515 # for the given related VNFR, get the corresponding NSR-config
2518 nsr_obj = self._nsrs[nsr_id]
2520 raise("Unable to find the NS with the ID
: {}".format(nsr_id))
2522 # for the related NSD check if a VLD exists such that it's a mgmt
2524 for vld in nsr_obj.nsd.vld:
2525 if vld.mgmt_network:
2530 def get_vnfr(self, vnfr_id):
2531 """ get VNFR by vnfr id """
2533 if vnfr_id not in self._vnfrs:
2534 raise VnfRecordError("VNFR
id %s not found
", vnfr_id)
2536 return self._vnfrs[vnfr_id]
2538 def create_vnfr(self, vnfr):
2539 """ Create a VNFR instance """
2540 if vnfr.id in self._vnfrs:
2541 msg = "Vnfr
id %s already exists
" % vnfr.id
2542 self._log.error(msg)
2543 raise VnfRecordError(msg)
2545 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2549 mgmt_network = self.get_linked_mgmt_network(vnfr)
2551 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2552 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2553 mgmt_network=mgmt_network
2555 return self._vnfrs[vnfr.id]
2558 def delete_vnfr(self, xact, vnfr):
2559 """ Create a VNFR instance """
2560 if vnfr.vnfr_id in self._vnfrs:
2561 self._log.debug("Deleting VNFR
id %s", vnfr.vnfr_id)
2562 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2563 del self._vnfrs[vnfr.vnfr_id]
2566 def fetch_vnfd(self, vnfd_id):
2567 """ Fetch VNFDs based with the vnfd id"""
2568 vnfd_path = VirtualNetworkFunctionDescriptor.path_for_id(vnfd_id)
2569 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2572 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2574 for ent in res_iter:
2575 res = yield from ent
2579 err = "Failed to get Vnfd
%s" % vnfd_id
2580 self._log.error(err)
2581 raise VnfRecordError(err)
2583 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2588 def get_vnfd_ref(self, vnfd_id):
2589 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2590 vnfd = yield from self.get_vnfd(vnfd_id)
2595 def get_vnfd(self, vnfd_id):
2596 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2598 if vnfd_id not in self._vnfds:
2599 self._log.error("Cannot find VNFD
id:%s", vnfd_id)
2600 vnfd = yield from self.fetch_vnfd(vnfd_id)
2603 self._log.error("Cannot find VNFD
id:%s", vnfd_id)
2604 raise VirtualNetworkFunctionDescriptorError("Cannot find VNFD
id:%s", vnfd_id)
2606 if vnfd.id != vnfd_id:
2607 self._log.error("Bad Recovery state
{} found
for {}".format(vnfd.id, vnfd_id))
2608 raise VirtualNetworkFunctionDescriptorError("Bad Recovery state
{} found
for {}".format(vnfd.id, vnfd_id))
2610 if vnfd.id not in self._vnfds:
2611 self.create_vnfd(vnfd)
2613 return self._vnfds[vnfd_id]
2615 def vnfd_in_use(self, vnfd_id):
2616 """ Is this VNFD in use """
2617 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2618 if vnfd_id in self._vnfds:
2619 return self._vnfds[vnfd_id].in_use()
2623 def publish_vnfr(self, xact, path, msg):
2624 """ Publish a VNFR """
2625 self._log.debug("publish_vnfr called with path
%s, msg
%s",
2627 yield from self.vnfr_handler.update(xact, path, msg)
2629 def create_vnfd(self, vnfd):
2630 """ Create a virtual network function descriptor """
2631 self._log.debug("Create virtual networkfunction descriptor
- %s", vnfd)
2632 if vnfd.id in self._vnfds:
2633 self._log.error("Cannot create VNFD
%s -VNFD
id already exists
", vnfd)
2634 raise VirtualNetworkFunctionDescriptorError("VNFD already exists
-%s", vnfd.id)
2636 self._vnfds[vnfd.id] = VirtualNetworkFunctionDescriptor(self._dts,
2641 return self._vnfds[vnfd.id]
2643 def update_vnfd(self, vnfd):
2644 """ update the Virtual Network Function descriptor """
2645 self._log.debug("Update virtual network function descriptor
- %s", vnfd)
2647 if vnfd.id not in self._vnfds:
2648 self._log.debug("No VNFD found
- creating VNFD
id = %s", vnfd.id)
2649 self.create_vnfd(vnfd)
2651 self._log.debug("Updating VNFD
id = %s, vnfd
= %s", vnfd.id, vnfd)
2652 self._vnfds[vnfd.id].update(vnfd)
2655 def delete_vnfd(self, vnfd_id):
2656 """ Delete the Virtual Network Function descriptor with the passed id """
2657 self._log.debug("Deleting the virtual network function descriptor
- %s", vnfd_id)
2658 if vnfd_id not in self._vnfds:
2659 self._log.debug("Delete VNFD failed
- cannot find vnfd
-id %s", vnfd_id)
2660 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find
%s", vnfd_id)
2662 if self._vnfds[vnfd_id].in_use():
2663 self._log.debug("Cannot delete VNFD
id %s reference exists
%s",
2665 self._vnfds[vnfd_id].ref_count)
2666 raise VirtualNetworkFunctionDescriptorRefCountExists(
2667 "Cannot delete
:%s, ref_count
:%s",
2669 self._vnfds[vnfd_id].ref_count)
2671 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2673 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2674 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2675 if os.path.exists(vnfd_dir):
2676 shutil.rmtree(vnfd_dir, ignore_errors=True)
2677 except Exception as e:
2678 self._log.error("Exception in cleaning up VNFD
{}: {}".
2679 format(self._vnfds[vnfd_id].name, e))
2680 self._log.exception(e)
2682 del self._vnfds[vnfd_id]
2684 def vnfd_refcount_xpath(self, vnfd_id):
2685 """ xpath for ref count entry """
2686 return (VnfdRefCountDtsHandler.XPATH +
2687 "[rw
-vnfr
:vnfd
-id-ref
= '{}']").format(vnfd_id)
2690 def get_vnfd_refcount(self, vnfd_id):
2691 """ Get the vnfd_list from this VNFM"""
2693 if vnfd_id is None or vnfd_id == "":
2694 for vnfd in self._vnfds.values():
2695 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2696 vnfd_msg.vnfd_id_ref = vnfd.id
2697 vnfd_msg.instance_ref_count = vnfd.ref_count
2698 vnfd_list.append((self.vnfd_refcount_xpath(vnfd.id), vnfd_msg))
2699 elif vnfd_id in self._vnfds:
2700 vnfd_msg.vnfd_id_ref = self._vnfds[vnfd_id].id
2701 vnfd_msg.instance_ref_count = self._vnfds[vnfd_id].ref_count
2702 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2707 class VnfmTasklet(rift.tasklets.Tasklet):
2708 """ VNF Manager tasklet class """
2709 def __init__(self, *args, **kwargs):
2710 super(VnfmTasklet, self).__init__(*args, **kwargs)
2711 self.rwlog.set_category("rw
-mano
-log
")
2712 self.rwlog.set_subcategory("vnfm
")
2719 super(VnfmTasklet, self).start()
2720 self.log.info("Starting VnfmTasklet
")
2722 self.log.setLevel(logging.DEBUG)
2724 self.log.debug("Registering with dts
")
2725 self._dts = rift.tasklets.DTS(self.tasklet_info,
2726 RwVnfmYang.get_schema(),
2728 self.on_dts_state_change)
2730 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2732 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
2735 def on_instance_started(self):
2736 """ Task insance started callback """
2737 self.log.debug("Got instance started callback
")
2743 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2748 """ Task init callback """
2750 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2751 assert vm_parent_name is not None
2752 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2753 yield from self._vnfm.run()
2755 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2760 """ Task run callback """
2764 def on_dts_state_change(self, state):
2765 """Take action according to current dts state to transition
2766 application into the corresponding application state
2769 state - current dts state
2772 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2773 rwdts.State.CONFIG: rwdts.State.RUN,
2777 rwdts.State.INIT: self.init,
2778 rwdts.State.RUN: self.run,
2781 # Transition application to next state
2782 handler = handlers.get(state, None)
2783 if handler is not None:
2784 yield from handler()
2786 # Transition dts to next state
2787 next_state = switch.get(state, None)
2788 if next_state is not None:
2789 self._dts.handle.set_state(next_state)