2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
54 class VMResourceError(Exception):
55 """ VM resource Error"""
59 class VnfRecordError(Exception):
60 """ VNF record instatiation failed"""
64 class VduRecordError(Exception):
65 """ VDU record instatiation failed"""
69 class NotImplemented(Exception):
70 """Not implemented """
74 class VnfrRecordExistsError(Exception):
75 """VNFR record already exist with the same VNFR id"""
79 class InternalVirtualLinkRecordError(Exception):
80 """Internal virtual link record error"""
84 class VDUImageNotFound(Exception):
85 """VDU Image not found error"""
89 class VirtualDeploymentUnitRecordError(Exception):
90 """VDU Instantiation failed"""
94 class VMNotReadyError(Exception):
95 """ VM Not yet received from resource manager """
99 class VDURecordNotFound(Exception):
100 """ Could not find a VDU record """
104 class VirtualNetworkFunctionRecordDescNotFound(Exception):
105 """ Cannot find Virtual Network Function Record Descriptor """
109 class VirtualNetworkFunctionDescriptorError(Exception):
110 """ Virtual Network Function Record Descriptor Error """
114 class VirtualNetworkFunctionDescriptorNotFound(Exception):
115 """ Virtual Network Function Record Descriptor Not Found """
119 class VirtualNetworkFunctionRecordNotFound(Exception):
120 """ Virtual Network Function Record Not Found """
124 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
125 """ Virtual Network Funtion Descriptor reference count exists """
129 class VnfrInstantiationFailed(Exception):
130 """ Virtual Network Funtion Instantiation failed"""
134 class VNFMPlacementGroupError(Exception):
137 class VirtualNetworkFunctionRecordState(enum
.Enum
):
144 VL_TERMINATE_PHASE
= 6
145 VDU_TERMINATE_PHASE
= 7
150 class VDURecordState(enum
.Enum
):
151 """VDU record state """
154 RESOURCE_ALLOC_PENDING
= 3
161 class VcsComponent(object):
162 """ VCS Component within the VNF descriptor """
163 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
167 self
._component
= component
168 self
._cluster
_name
= cluster_name
169 self
._vcs
_handler
= vcs_handler
170 self
._mangled
_name
= mangled_name
173 def mangle_name(component_name
, vnf_name
, vnfd_id
):
174 """ mangled component name """
175 return vnf_name
+ ":" + component_name
+ ":" + vnfd_id
179 """ name of this component"""
180 return self
._mangled
_name
184 """ The path for this object """
185 return("D,/rw-manifest:manifest" +
186 "/rw-manifest:operational-inventory" +
187 "/rw-manifest:component" +
188 "[rw-manifest:component-name = '{}']").format(self
.name
)
191 def instance_xpath(self
):
192 """ The path for this object """
193 return("D,/rw-base:vcs" +
196 "[instance-name = '{}']".format(self
._cluster
_name
))
199 def start_comp_xpath(self
):
200 """ start component xpath """
201 return (self
.instance_xpath
+
202 "/child-n[instance-name = 'START-REQ']")
204 def get_start_comp_msg(self
, ip_address
):
205 """ start this component """
206 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
207 start_msg
.instance_name
= 'START-REQ'
208 start_msg
.component_name
= self
.name
209 start_msg
.admin_command
= "START"
210 start_msg
.ip_address
= ip_address
216 """ Returns the message for this vcs component"""
218 vcs_comp_dict
= self
._component
.as_dict()
220 def mangle_comp_names(comp_dict
):
221 """ mangle component name with VNF name, id"""
222 for key
, val
in comp_dict
.items():
223 if isinstance(val
, dict):
224 comp_dict
[key
] = mangle_comp_names(val
)
225 elif isinstance(val
, list):
228 if isinstance(ent
, dict):
229 val
[i
] = mangle_comp_names(ent
)
233 elif key
== "component_name":
234 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
239 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
240 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
244 def publish(self
, xact
):
245 """ Publishes the VCS component """
246 self
._log
.debug("Publishing the VcsComponent %s, path = %s comp = %s",
247 self
.name
, self
.path
, self
.msg
)
248 yield from self
._vcs
_handler
.publish(xact
, self
.path
, self
.msg
)
251 def start(self
, xact
, parent
, ip_addr
=None):
252 """ Starts this VCS component """
253 # ATTN RV - replace with block add
254 start_msg
= self
.get_start_comp_msg(ip_addr
)
255 self
._log
.debug("starting component %s %s",
256 self
.start_comp_xpath
, start_msg
)
257 yield from self
._dts
.query_create(self
.start_comp_xpath
,
260 self
._log
.debug("started component %s, %s",
261 self
.start_comp_xpath
, start_msg
)
264 class VirtualDeploymentUnitRecord(object):
265 """ Virtual Deployment Unit Record """
276 placement_groups
=[]):
282 self
._mgmt
_intf
= mgmt_intf
283 self
._cloud
_account
_name
= cloud_account_name
284 self
._vnfd
_package
_store
= vnfd_package_store
286 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
289 self
._state
= VDURecordState
.INIT
290 self
._state
_failed
_reason
= None
291 self
._request
_id
= str(uuid
.uuid4())
292 self
._name
= vnfr
.name
+ "__" + vdud
.id
293 self
._placement
_groups
= placement_groups
296 self
._vdud
_cloud
_init
= None
297 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
300 def vdu_opdata_register(self
):
301 yield from self
._vdur
_console
_handler
.register()
303 def cp_ip_addr(self
, cp_name
):
304 """ Find ip address by connection point name """
305 if self
._vm
_resp
is not None:
306 for conn_point
in self
._vm
_resp
.connection_points
:
307 if conn_point
.name
== cp_name
:
308 return conn_point
.ip_address
311 def cp_id(self
, cp_name
):
312 """ Find connection point id by connection point name """
313 if self
._vm
_resp
is not None:
314 for conn_point
in self
._vm
_resp
.connection_points
:
315 if conn_point
.name
== cp_name
:
316 return conn_point
.connection_point_id
329 """ Return this VDUR's name """
333 def cloud_account_name(self
):
334 """ Cloud account this VDU should be created in """
335 return self
._cloud
_account
_name
338 def image_name(self
):
339 """ name that should be used to lookup the image on the CMP """
340 return os
.path
.basename(self
._vdud
.image
)
343 def image_checksum(self
):
344 """ name that should be used to lookup the image on the CMP """
345 return self
._vdud
.image_checksum
if self
._vdud
.has_field("image_checksum") else None
348 def management_ip(self
):
351 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
354 def vm_management_ip(self
):
357 return self
._vm
_resp
.management_ip
360 def operational_status(self
):
361 """ Operational status of this VDU"""
362 op_stats_dict
= {"INIT": "init",
363 "INSTANTIATING": "vm_init_phase",
364 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
367 "TERMINATING": "terminated",
368 "TERMINATED": "terminated",
370 return op_stats_dict
[self
._state
.name
]
375 vdu_fields
= ["vm_flavor",
381 vdu_copy_dict
= {k
: v
for k
, v
in
382 self
._vdud
.as_dict().items() if k
in vdu_fields
}
383 vdur_dict
= {"id": self
._vdur
_id
,
384 "vdu_id_ref": self
._vdud
.id,
385 "operational_status": self
.operational_status
,
386 "operational_status_details": self
._state
_failed
_reason
,
388 if self
.vm_resp
is not None:
389 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
390 "flavor_id": self
.vm_resp
.flavor_id
,
391 "image_id": self
.vm_resp
.image_id
,
394 if self
.management_ip
is not None:
395 vdur_dict
["management_ip"] = self
.management_ip
397 if self
.vm_management_ip
is not None:
398 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
400 vdur_dict
.update(vdu_copy_dict
)
405 for intf
, cp_id
, vlr
in self
._int
_intf
:
406 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
408 icp_list
.append({"name": cp
.name
,
410 "type_yang": "VPORT",
411 "ip_address": self
.cp_ip_addr(cp
.id)})
413 ii_list
.append({"name": intf
.name
,
414 "vdur_internal_connection_point_ref": cp
.id,
415 "virtual_interface": {}})
417 vdur_dict
["internal_connection_point"] = icp_list
418 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
419 vdur_dict
["internal_interface"] = ii_list
422 for intf
, cp
, vlr
in self
._ext
_intf
:
423 ei_list
.append({"name": cp
,
424 "vnfd_connection_point_ref": cp
,
425 "virtual_interface": {}})
426 self
._vnfr
.update_cp(cp
, self
.cp_ip_addr(cp
), self
.cp_id(cp
))
428 vdur_dict
["external_interface"] = ei_list
430 placement_groups
= []
431 for group
in self
._placement
_groups
:
432 placement_groups
.append(group
.as_dict())
434 vdur_dict
['placement_groups_info'] = placement_groups
435 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
438 def resmgr_path(self
):
439 """ path for resource-mgr"""
440 return ("D,/rw-resource-mgr:resource-mgmt" +
442 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
445 def vm_flavor_msg(self
):
446 """ VM flavor message """
447 flavor
= self
._vdud
.vm_flavor
.__class
__()
448 flavor
.copy_from(self
._vdud
.vm_flavor
)
453 def vdud_cloud_init(self
):
454 """ Return the cloud-init contents for the VDU """
455 if self
._vdud
_cloud
_init
is None:
456 self
._vdud
_cloud
_init
= self
.cloud_init()
458 return self
._vdud
_cloud
_init
460 def cloud_init(self
):
461 """ Populate cloud_init with cloud-config script from
462 either the inline contents or from the file provided
464 if self
._vdud
.cloud_init
is not None:
465 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
466 return self
._vdud
.cloud_init
467 elif self
._vdud
.cloud_init_file
is not None:
468 # Get cloud-init script contents from the file provided in the cloud_init_file param
469 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
470 filename
= self
._vdud
.cloud_init_file
471 self
._vnfd
_package
_store
.refresh()
472 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
473 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
475 return cloud_init_extractor
.read_script(stored_package
, filename
)
476 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
477 raise VirtualDeploymentUnitRecordError(e
)
479 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
481 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
483 availability_zones
= []
485 for group
in self
._placement
_groups
:
486 if group
.has_field('host_aggregate'):
487 for aggregate
in group
.host_aggregate
:
488 host_aggregates
.append(aggregate
.as_dict())
489 if group
.has_field('availability_zone'):
490 availability_zones
.append(group
.availability_zone
.as_dict())
491 if group
.has_field('server_group'):
492 server_groups
.append(group
.server_group
.as_dict())
494 if availability_zones
:
495 if len(availability_zones
) > 1:
496 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
497 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
499 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
502 if len(server_groups
) > 1:
503 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
504 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
506 vm_create_msg_dict
['server_group'] = server_groups
[0]
509 vm_create_msg_dict
['host_aggregate'] = host_aggregates
513 def process_placement_groups(self
, vm_create_msg_dict
):
514 """Process the placement_groups and fill resource-mgr request"""
515 if not self
._placement
_groups
:
518 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
519 assert len(cloud_set
) == 1
520 cloud_type
= cloud_set
.pop()
522 if cloud_type
== 'openstack':
523 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
526 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
529 def resmgr_msg(self
, config
=None):
530 vdu_fields
= ["vm_flavor",
536 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
537 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
539 vm_create_msg_dict
= {
541 "image_name": self
.image_name
,
544 if self
.image_checksum
is not None:
545 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
547 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
548 if self
._vdud
.has_field('mgmt_vpci'):
549 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
551 self
._log
.debug("VDUD: %s", self
._vdud
)
552 if config
is not None:
553 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
556 for intf
, cp
, vlr
in self
._ext
_intf
:
557 cp_info
= {"name": cp
,
558 "virtual_link_id": vlr
.network_id
,
559 "type_yang": intf
.virtual_interface
.type_yang
}
561 if (intf
.virtual_interface
.has_field('vpci') and
562 intf
.virtual_interface
.vpci
is not None):
563 cp_info
["vpci"] = intf
.virtual_interface
.vpci
565 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
566 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
568 cp_list
.append(cp_info
)
570 for intf
, cp
, vlr
in self
._int
_intf
:
571 if (intf
.virtual_interface
.has_field('vpci') and
572 intf
.virtual_interface
.vpci
is not None):
573 cp_list
.append({"name": cp
,
574 "virtual_link_id": vlr
.network_id
,
575 "type_yang": intf
.virtual_interface
.type_yang
,
576 "vpci": intf
.virtual_interface
.vpci
})
578 cp_list
.append({"name": cp
,
579 "virtual_link_id": vlr
.network_id
,
580 "type_yang": intf
.virtual_interface
.type_yang
})
582 vm_create_msg_dict
["connection_points"] = cp_list
583 vm_create_msg_dict
.update(vdu_copy_dict
)
585 self
.process_placement_groups(vm_create_msg_dict
)
587 msg
= RwResourceMgrYang
.VDUEventData()
588 msg
.event_id
= self
._request
_id
589 msg
.cloud_account
= self
.cloud_account_name
590 msg
.request_info
.from_dict(vm_create_msg_dict
)
594 def terminate(self
, xact
):
595 """ Delete resource in VIM """
596 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
597 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
600 self
._state
= VDURecordState
.TERMINATING
601 if self
._vm
_resp
is not None:
603 with self
._dts
.transaction() as new_xact
:
604 yield from self
.delete_resource(new_xact
)
606 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
608 if self
._rm
_regh
is not None:
609 self
._log
.debug("Deregistering resource manager registration handle")
610 self
._rm
_regh
.deregister()
613 if self
._vdur
_console
_handler
is not None:
614 self
._log
.error("Deregistering vnfr vdur registration handle")
615 self
._vdur
_console
_handler
._regh
.deregister()
616 self
._vdur
_console
_handler
._regh
= None
618 self
._state
= VDURecordState
.TERMINATED
620 def find_internal_cp_by_cp_id(self
, cp_id
):
621 """ Find the CP corresponding to the connection point id"""
624 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
627 for int_cp
in self
._vdud
.internal_connection_point
:
628 self
._log
.debug("Checking for int cp %s in internal connection points",
630 if int_cp
.id == cp_id
:
635 self
._log
.debug("Failed to find cp %s in internal connection points",
637 msg
= "Failed to find cp %s in internal connection points" % cp_id
638 raise VduRecordError(msg
)
640 # return the VLR associated with the connection point
644 def create_resource(self
, xact
, vnfr
, config
=None):
645 """ Request resource from ResourceMgr """
646 def find_cp_by_name(cp_name
):
647 """ Find a connection point by name """
649 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
650 for ext_cp
in vnfr
._cprs
:
651 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
652 if ext_cp
.name
== cp_name
:
656 self
._log
.debug("Failed to find cp %s in external connection points",
660 def find_internal_vlr_by_cp_name(cp_name
):
661 """ Find the VLR corresponding to the connection point name"""
664 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
667 for int_cp
in self
._vdud
.internal_connection_point
:
668 self
._log
.debug("Checking for int cp %s in internal connection points",
670 if int_cp
.id == cp_name
:
675 self
._log
.debug("Failed to find cp %s in internal connection points",
677 msg
= "Failed to find cp %s in internal connection points" % cp_name
678 raise VduRecordError(msg
)
680 # return the VLR associated with the connection point
681 return vnfr
.find_vlr_by_cp(cp_name
)
683 block
= xact
.block_create()
685 self
._log
.debug("Executing vm request id: %s, action: create",
688 # Resolve the networks associated external interfaces
689 for ext_intf
in self
._vdud
.external_interface
:
690 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
691 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
692 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
694 self
._log
.debug("Failed to find connection point - %s",
695 ext_intf
.vnfd_connection_point_ref
)
697 self
._log
.debug("Connection point name [%s], type[%s]",
698 cp
.name
, cp
.type_yang
)
700 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
702 etuple
= (ext_intf
, cp
.name
, vlr
)
703 self
._ext
_intf
.append(etuple
)
705 self
._log
.debug("Created external interface tuple : %s", etuple
)
707 # Resolve the networks associated internal interfaces
708 for intf
in self
._vdud
.internal_interface
:
709 cp_id
= intf
.vdu_internal_connection_point_ref
710 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
714 vlr
= find_internal_vlr_by_cp_name(cp_id
)
715 except Exception as e
:
716 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
717 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
718 raise VduRecordError(msg
)
720 ituple
= (intf
, cp_id
, vlr
)
721 self
._int
_intf
.append(ituple
)
723 self
._log
.debug("Created internal interface tuple : %s", ituple
)
725 resmgr_path
= self
.resmgr_path
726 resmgr_msg
= self
.resmgr_msg(config
)
728 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
729 block
.add_query_create(resmgr_path
, resmgr_msg
)
731 res_iter
= yield from block
.execute(now
=True)
739 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
740 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
741 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
742 return resp
.resource_info
745 def delete_resource(self
, xact
):
746 block
= xact
.block_create()
748 self
._log
.debug("Executing vm request id: %s, action: delete",
751 block
.add_query_delete(self
.resmgr_path
)
753 yield from block
.execute(flags
=0, now
=True)
756 def read_resource(self
, xact
):
757 block
= xact
.block_create()
759 self
._log
.debug("Executing vm request id: %s, action: delete",
762 block
.add_query_read(self
.resmgr_path
)
764 res_iter
= yield from block
.execute(flags
=0, now
=True)
769 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
770 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
771 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
772 #self._vm_resp = resp.resource_info
773 return resp
.resource_info
777 def start_component(self
):
778 """ This VDUR is active """
779 self
._log
.debug("Starting component %s for vdud %s vdur %s",
780 self
._vdud
.vcs_component_ref
,
783 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
784 self
.vm_resp
.management_ip
)
788 """ Is this VDU active """
789 return True if self
._state
is VDURecordState
.READY
else False
792 def instantiation_failed(self
, failed_reason
=None):
793 """ VDU instantiation failed """
794 self
._log
.debug("VDU %s instantiation failed ", self
._vdur
_id
)
795 self
._state
= VDURecordState
.FAILED
796 self
._state
_failed
_reason
= failed_reason
797 yield from self
._vnfr
.instantiation_failed(failed_reason
)
800 def vdu_is_active(self
):
801 """ This VDU is active"""
803 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
806 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
808 if self
._vdud
.vcs_component_ref
is not None:
809 yield from self
.start_component()
811 self
._state
= VDURecordState
.READY
813 if self
._vnfr
.all_vdus_active():
814 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
815 yield from self
._vnfr
.is_ready()
818 def instantiate(self
, xact
, vnfr
, config
=None):
819 """ Instantiate this VDU """
820 self
._state
= VDURecordState
.INSTANTIATING
823 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
824 """ This VDUR is active """
825 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
830 if (query_action
== rwdts
.QueryAction
.UPDATE
or
831 query_action
== rwdts
.QueryAction
.CREATE
):
834 if msg
.resource_state
== "active":
835 # Move this VDU to ready state
836 yield from self
.vdu_is_active()
837 elif msg
.resource_state
== "failed":
838 yield from self
.instantiation_failed(msg
.resource_errors
)
839 elif query_action
== rwdts
.QueryAction
.DELETE
:
840 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
842 raise NotImplementedError(
843 "%s action on VirtualDeployementUnitRecord not supported",
846 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
849 reg_event
= asyncio
.Event(loop
=self
._loop
)
852 def on_ready(regh
, status
):
855 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
856 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
857 flags
=rwdts
.Flag
.SUBSCRIBER
,
859 yield from reg_event
.wait()
861 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
862 self
._vm
_resp
= vm_resp
864 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
865 self
._log
.debug("Requested VM from resource manager response %s",
867 if vm_resp
.resource_state
== "active":
868 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
870 yield from self
.vdu_is_active()
871 self
._state
= VDURecordState
.READY
872 elif (vm_resp
.resource_state
== "pending" or
873 vm_resp
.resource_state
== "inactive"):
874 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
876 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
877 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
878 # flags=rwdts.Flag.SUBSCRIBER,
881 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
883 raise VirtualDeploymentUnitRecordError(
884 "Failed VDUR instantiation %s " % vm_resp
)
886 except Exception as e
:
888 traceback
.print_exc()
889 self
._log
.exception(e
)
890 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
891 self
._state
= VDURecordState
.FAILED
892 yield from self
.instantiation_failed(str(e
))
895 class VlRecordState(enum
.Enum
):
896 """ VL Record State """
898 INSTANTIATION_PENDING
= 102
900 TERMINATE_PENDING
= 104
905 class InternalVirtualLinkRecord(object):
906 """ Internal Virtual Link record """
907 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
):
911 self
._ivld
_msg
= ivld_msg
912 self
._vnfr
_name
= vnfr_name
913 self
._cloud
_account
_name
= cloud_account_name
915 self
._vlr
_req
= self
.create_vlr()
917 self
._state
= VlRecordState
.INIT
921 """ Find VLR by id """
922 return self
._vlr
_req
.id
926 """ Name of this VL """
927 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
930 def network_id(self
):
931 """ Find VLR by id """
932 return self
._vlr
.network_id
if self
._vlr
else None
935 """ VLR path for this VLR instance"""
936 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
938 def create_vlr(self
):
939 """ Create the VLR record which will be instantiated """
941 vld_fields
= ["short_name",
948 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
950 vlr_dict
= {"id": str(uuid
.uuid4()),
952 "cloud_account": self
._cloud
_account
_name
,
954 vlr_dict
.update(vld_copy_dict
)
956 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
960 def instantiate(self
, xact
, restart_mode
=False):
961 """ Instantiate VL """
964 def instantiate_vlr():
965 """ Instantiate VLR"""
966 self
._log
.debug("Create VL with xpath %s and vlr %s",
967 self
.vlr_path(), self
._vlr
_req
)
969 with self
._dts
.transaction(flags
=0) as xact
:
970 block
= xact
.block_create()
971 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
972 self
._log
.debug("Executing VL create path:%s msg:%s",
973 self
.vlr_path(), self
._vlr
_req
)
977 res_iter
= yield from block
.execute()
979 self
._state
= VlRecordState
.FAILED
980 self
._log
.exception("Caught exception while instantial VL")
985 self
._vlr
= res
.result
987 if self
._vlr
.operational_status
== 'failed':
988 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
989 self
._state
= VlRecordState
.FAILED
990 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
992 self
._log
.info("Created VL with xpath %s and vlr %s",
993 self
.vlr_path(), self
._vlr
)
997 """ Get the network id """
998 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1000 for ent
in res_iter
:
1001 res
= yield from ent
1005 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1007 raise InternalVirtualLinkRecordError(err
)
1010 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1013 vl
= yield from get_vlr()
1015 yield from instantiate_vlr()
1017 yield from instantiate_vlr()
1019 self
._state
= VlRecordState
.ACTIVE
1021 def vlr_in_vns(self
):
1022 """ Is there a VLR record in VNS """
1023 if (self
._state
== VlRecordState
.ACTIVE
or
1024 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1025 self
._state
== VlRecordState
.FAILED
):
1031 def terminate(self
, xact
):
1032 """Terminate this VL """
1033 if not self
.vlr_in_vns():
1034 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1035 self
.vlr_id
, self
._state
)
1038 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1039 self
._state
= VlRecordState
.TERMINATE_PENDING
1040 block
= xact
.block_create()
1041 block
.add_query_delete(self
.vlr_path())
1042 yield from block
.execute(flags
=0, now
=True)
1043 self
._state
= VlRecordState
.TERMINATED
1044 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1047 class VirtualNetworkFunctionRecord(object):
1048 """ Virtual Network Function Record """
1049 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
):
1053 self
._cluster
_name
= cluster_name
1054 self
._vnfr
_msg
= vnfr_msg
1055 self
._vnfr
_id
= vnfr_msg
.id
1056 self
._vnfd
_id
= vnfr_msg
.vnfd_ref
1058 self
._vcs
_handler
= vcs_handler
1059 self
._vnfr
= vnfr_msg
1062 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1063 self
._state
_failed
_reason
= None
1064 self
._ext
_vlrs
= {} # The list of external virtual links
1065 self
._vlrs
= [] # The list of internal virtual links
1066 self
._vdus
= [] # The list of vdu
1067 self
._vlr
_by
_cp
= {}
1069 self
._inventory
= {}
1070 self
._create
_time
= int(time
.time())
1071 self
._vnf
_mon
= None
1072 self
._config
_status
= vnfr_msg
.config_status
1073 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1075 def _get_vdur_from_vdu_id(self
, vdu_id
):
1076 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1077 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1078 for vdu
in self
._vdus
:
1079 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1080 if vdu
.vdu_id
== vdu_id
:
1083 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1086 def operational_status(self
):
1087 """ Operational status of this VNFR """
1088 op_status_map
= {"INIT": "init",
1089 "VL_INIT_PHASE": "vl_init_phase",
1090 "VM_INIT_PHASE": "vm_init_phase",
1092 "TERMINATE": "terminate",
1093 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1094 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1095 "TERMINATED": "terminated",
1096 "FAILED": "failed", }
1097 return op_status_map
[self
._state
.name
]
1100 def vnfd_xpath(self
):
1101 """ VNFD xpath associated with this VNFR """
1102 return("C,/vnfd:vnfd-catalog/"
1103 "vnfd:vnfd[vnfd:id = '{}']".format(self
._vnfd
_id
))
1107 """ VNFD for this VNFR """
1112 """ VNFD name associated with this VNFR """
1113 return self
.vnfd
.name
1117 """ Name of this VNF in the record """
1118 return self
._vnfr
.name
1121 def cloud_account_name(self
):
1122 """ Name of the cloud account this VNFR is instantiated in """
1123 return self
._vnfr
.cloud_account
1127 """ VNFD Id associated with this VNFR """
1132 """ VNFR Id associated with this VNFR """
1133 return self
._vnfr
_id
1136 def member_vnf_index(self
):
1137 """ Member VNF index associated with this VNFR """
1138 return self
._vnfr
.member_vnf_index_ref
1141 def config_status(self
):
1142 """ Config agent status for this VNFR """
1143 return self
._config
_status
1145 def component_by_name(self
, component_name
):
1146 """ Find a component by name in the inventory list"""
1147 mangled_name
= VcsComponent
.mangle_name(component_name
,
1150 return self
._inventory
[mangled_name
]
1155 def get_nsr_config(self
):
1156 ### Need access to NS instance configuration for runtime resolution.
1157 ### This shall be replaced when deployment flavors are implemented
1158 xpath
= "C,/nsr:ns-instance-config"
1159 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1161 for result
in results
:
1162 entry
= yield from result
1163 ns_instance_config
= entry
.result
1164 for nsr
in ns_instance_config
.nsr
:
1165 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
1170 def start_component(self
, component_name
, ip_addr
):
1171 """ Start a component in the VNFR by name """
1172 comp
= self
.component_by_name(component_name
)
1173 yield from comp
.start(None, None, ip_addr
)
1175 def cp_ip_addr(self
, cp_name
):
1176 """ Get ip address for connection point """
1177 self
._log
.debug("cp_ip_addr()")
1178 for cp
in self
._cprs
:
1179 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1180 return cp
.ip_address
1183 def mgmt_intf_info(self
):
1184 """ Get Management interface info for this VNFR """
1185 mgmt_intf_desc
= self
.vnfd
.msg
.mgmt_interface
1187 if mgmt_intf_desc
.has_field("cp"):
1188 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1189 elif mgmt_intf_desc
.has_field("vdu_id"):
1191 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1192 ip_addr
= vdur
.management_ip
1193 except VDURecordNotFound
:
1194 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1197 ip_addr
= mgmt_intf_desc
.ip_address
1198 port
= mgmt_intf_desc
.port
1200 return ip_addr
, port
1204 """ Message associated with this VNFR """
1205 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1206 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.msg
.as_dict().items() if k
in vnfd_fields
}
1208 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1209 ip_address
, port
= self
.mgmt_intf_info()
1211 if ip_address
is not None:
1212 mgmt_intf
.ip_address
= ip_address
1213 if port
is not None:
1214 mgmt_intf
.port
= port
1216 vnfr_dict
= {"id": self
._vnfr
_id
,
1217 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1219 "member_vnf_index_ref": self
.member_vnf_index
,
1220 "vnfd_ref": self
.vnfd_id
,
1221 "operational_status": self
.operational_status
,
1222 "operational_status_details": self
._state
_failed
_reason
,
1223 "cloud_account": self
.cloud_account_name
,
1224 "config_status": self
._config
_status
1227 vnfr_dict
.update(vnfd_copy_dict
)
1229 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1230 vnfr_msg
.mgmt_interface
= mgmt_intf
1232 # Add all the VLRs to VNFR
1233 for vlr
in self
._vlrs
:
1234 ivlr
= vnfr_msg
.internal_vlr
.add()
1235 ivlr
.vlr_ref
= vlr
.vlr_id
1237 # Add all the VDURs to VDUR
1238 if self
._vdus
is not None:
1239 for vdu
in self
._vdus
:
1240 vdur
= vnfr_msg
.vdur
.add()
1241 vdur
.from_dict(vdu
.msg
.as_dict())
1243 if self
.vnfd
.msg
.mgmt_interface
.has_field('dashboard_params'):
1244 vnfr_msg
.dashboard_url
= self
.dashboard_url
1246 for cpr
in self
._cprs
:
1247 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1248 vnfr_msg
.connection_point
.append(new_cp
)
1250 if self
._vnf
_mon
is not None:
1251 for monp
in self
._vnf
_mon
.msg
:
1252 vnfr_msg
.monitoring_param
.append(
1253 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1255 if self
._vnfr
.vnf_configuration
is not None:
1256 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1257 if (ip_address
is not None and
1258 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1259 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1261 for group
in self
._vnfr
_msg
.placement_groups_info
:
1262 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1263 group_info
.from_dict(group
.as_dict())
1264 vnfr_msg
.placement_groups_info
.append(group_info
)
1269 def dashboard_url(self
):
1270 ip
, cfg_port
= self
.mgmt_intf_info()
1273 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.has_field('https'):
1274 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.https
is True:
1277 if self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.has_field('port'):
1278 http_port
= self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.port
1280 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1284 path
=self
.vnfd
.msg
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1291 """ path for this VNFR """
1292 return("D,/vnfr:vnfr-catalog"
1293 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1296 def publish(self
, xact
):
1297 """ publish this VNFR """
1299 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1300 self
.xpath
, self
.msg
)
1301 vnfr
.create_time
= self
._create
_time
1302 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1303 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1304 self
.xpath
, self
.msg
)
1307 def create_vls(self
):
1308 """ Publish The VLs associated with this VNF """
1309 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1311 for ivld_msg
in self
.vnfd
.msg
.internal_vld
:
1312 self
._log
.debug("Creating internal vld:"
1313 " %s, int_cp_ref = %s",
1314 ivld_msg
, ivld_msg
.internal_connection_point_ref
1316 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1320 vnfr_name
=self
.name
,
1321 cloud_account_name
=self
.cloud_account_name
1323 self
._vlrs
.append(vlr
)
1325 for int_cp
in ivld_msg
.internal_connection_point_ref
:
1326 if int_cp
in self
._vlr
_by
_cp
:
1327 msg
= ("Connection point %s already "
1328 " bound %s" % (int_cp
, self
._vlr
_by
_cp
[int_cp
]))
1329 raise InternalVirtualLinkRecordError(msg
)
1330 self
._log
.debug("Setting vlr %s to internal cp = %s",
1332 self
._vlr
_by
_cp
[int_cp
] = vlr
1335 def instantiate_vls(self
, xact
, restart_mode
=False):
1336 """ Instantiate the VLs associated with this VNF """
1337 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1340 for vlr
in self
._vlrs
:
1341 self
._log
.debug("Instantiating VLR %s", vlr
)
1342 yield from vlr
.instantiate(xact
, restart_mode
)
1344 def find_vlr_by_cp(self
, cp_name
):
1345 """ Find the VLR associated with the cp name """
1346 return self
._vlr
_by
_cp
[cp_name
]
1348 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1350 Returns the cloud specific construct for placement group
1352 input_group: VNFD PlacementGroup
1353 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1355 copy_dict
= ['name', 'requirement', 'strategy']
1356 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1357 if group_info
.placement_group_ref
== input_group
.name
and \
1358 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1359 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1360 group_dict
= {k
:v
for k
,v
in
1361 group_info
.as_dict().items()
1362 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1363 for param
in copy_dict
:
1364 group_dict
.update({param
: getattr(input_group
, param
)})
1365 group
.from_dict(group_dict
)
1370 def get_vdu_placement_groups(self
, vdu
):
1371 placement_groups
= []
1372 ### Step-1: Get VNF level placement groups
1373 for group
in self
._vnfr
_msg
.placement_groups_info
:
1374 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1375 #group_info.from_dict(group.as_dict())
1376 placement_groups
.append(group
)
1378 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1379 nsr_config
= yield from self
.get_nsr_config()
1381 ### Step-3: Get VDU level placement groups
1382 for group
in self
.vnfd
.msg
.placement_groups
:
1383 for member_vdu
in group
.member_vdus
:
1384 if member_vdu
.member_vdu_ref
== vdu
.id:
1385 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1387 if group_info
is None:
1388 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1389 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1391 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1395 self
.member_vnf_index
)
1396 placement_groups
.append(group_info
)
1398 return placement_groups
1401 def create_vdus(self
, vnfr
, restart_mode
=False):
1402 """ Create the VDUs associated with this VNF """
1404 def get_vdur_id(vdud
):
1405 """Get the corresponding VDUR's id for the VDUD. This is useful in
1408 In restart mode we check for exiting VDUR's ID and use them, if
1409 available. This way we don't end up creating duplicate VDURs
1413 if restart_mode
and vdud
is not None:
1415 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1418 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1423 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1424 for vdu
in self
.vnfd
.msg
.vdu
:
1425 self
._log
.debug("Creating vdu: %s", vdu
)
1426 vdur_id
= get_vdur_id(vdu
)
1428 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1429 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1432 self
.member_vnf_index
,
1433 [ group
.name
for group
in placement_groups
])
1435 vdur
= VirtualDeploymentUnitRecord(
1441 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1442 cloud_account_name
=self
.cloud_account_name
,
1443 vnfd_package_store
=self
._vnfd
_package
_store
,
1445 placement_groups
= placement_groups
,
1447 yield from vdur
.vdu_opdata_register()
1449 self
._vdus
.append(vdur
)
1452 def instantiate_vdus(self
, xact
, vnfr
):
1453 """ Instantiate the VDUs associated with this VNF """
1454 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1456 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1458 # Identify any dependencies among the VDUs
1459 dependencies
= collections
.defaultdict(list)
1460 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1462 for vdu
in self
._vdus
:
1463 if vdu
.vdud_cloud_init
is not None:
1464 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1465 if vdu_id
!= vdu
.vdu_id
:
1466 # This means that vdu.vdu_id depends upon vdu_id,
1467 # i.e. vdu_id must be instantiated before
1469 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1471 # Define the terminal states of VDU instantiation
1473 VDURecordState
.READY
,
1474 VDURecordState
.TERMINATED
,
1475 VDURecordState
.FAILED
,
1478 datastore
= VdurDatastore()
1482 def instantiate_monitor(vdu
):
1483 """Monitor the state of the VDU during instantiation
1486 vdu - a VirtualDeploymentUnitRecord
1489 # wait for the VDUR to enter a terminal state
1490 while vdu
._state
not in terminal
:
1491 yield from asyncio
.sleep(1, loop
=self
._loop
)
1493 # update the datastore
1494 datastore
.update(vdu
)
1496 # add the VDU to the set of processed VDUs
1497 processed
.add(vdu
.vdu_id
)
1500 def instantiate(vdu
):
1501 """Instantiate the specified VDU
1504 vdu - a VirtualDeploymentUnitRecord
1507 if the VDU, or any of the VDUs this VDU depends upon, are
1508 terminated or fail to instantiate properly, a
1509 VirtualDeploymentUnitRecordError is raised.
1512 for dependency
in dependencies
[vdu
.vdu_id
]:
1513 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1515 while dependency
.vdu_id
not in processed
:
1516 yield from asyncio
.sleep(1, loop
=self
._loop
)
1518 if not dependency
.active
:
1519 raise VirtualDeploymentUnitRecordError()
1521 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1523 # Populate the datastore with the current values of the VDU
1526 # Substitute any variables contained in the cloud config script
1527 config
= str(vdu
.vdud_cloud_init
)
1529 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1532 # Extract the variable names
1534 for variable
in parts
[1::2]:
1535 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1537 # Iterate of the variables and substitute values from the
1539 for variable
in variables
:
1541 # Handle a reference to a VDU by ID
1542 if variable
.startswith('vdu['):
1543 value
= datastore
.get(variable
)
1545 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1546 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1548 config
= config
.replace("{{ %s }}" % variable
, value
)
1551 # Handle a reference to the current VDU
1552 if variable
.startswith('vdu'):
1553 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1554 config
= config
.replace("{{ %s }}" % variable
, value
)
1557 # Handle unrecognized variables
1558 msg
= 'unrecognized cloud-config variable: {}'
1559 raise ValueError(msg
.format(variable
))
1561 # Instantiate the VDU
1562 with self
._dts
.transaction() as xact
:
1563 self
._log
.debug("Instantiating vdu: %s", vdu
)
1564 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1565 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1566 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1569 # First create a set of tasks to monitor the state of the VDUs and
1570 # report when they have entered a terminal state
1571 for vdu
in self
._vdus
:
1572 self
._loop
.create_task(instantiate_monitor(vdu
))
1574 for vdu
in self
._vdus
:
1575 self
._loop
.create_task(instantiate(vdu
))
1577 def has_mgmt_interface(self
, vdu
):
1578 # ## TODO: Support additional mgmt_interface type options
1579 if self
.vnfd
.msg
.mgmt_interface
.vdu_id
== vdu
.id:
1583 def vlr_xpath(self
, vlr_id
):
1586 "D,/vlr:vlr-catalog/"
1587 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
1589 def ext_vlr_by_id(self
, vlr_id
):
1590 """ find ext vlr by id """
1591 return self
._ext
_vlrs
[vlr_id
]
1594 def publish_inventory(self
, xact
):
1595 """ Publish the inventory associated with this VNF """
1596 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1598 for component
in self
.vnfd
.msg
.component
:
1599 self
._log
.debug("Creating inventory component %s", component
)
1600 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1604 comp
= VcsComponent(dts
=self
._dts
,
1607 cluster_name
=self
._cluster
_name
,
1608 vcs_handler
=self
._vcs
_handler
,
1609 component
=component
,
1610 mangled_name
=mangled_name
,
1612 if comp
.name
in self
._inventory
:
1613 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1614 component
, self
._vnfd
_id
)
1616 self
._log
.debug("Adding component %s for vnrf %s",
1617 comp
.name
, self
._vnfr
_id
)
1618 self
._inventory
[comp
.name
] = comp
1619 yield from comp
.publish(xact
)
1621 def all_vdus_active(self
):
1622 """ Are all VDUS in this VNFR active? """
1623 for vdu
in self
._vdus
:
1627 self
._log
.debug("Inside all_vdus_active. Returning True")
1631 def instantiation_failed(self
, failed_reason
=None):
1632 """ VNFR instantiation failed """
1633 self
._log
.debug("VNFR %s instantiation failed ", self
.vnfr_id
)
1634 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1635 self
._state
_failed
_reason
= failed_reason
1637 # Update the VNFR with the changed status
1638 yield from self
.publish(None)
1642 """ This VNF is ready"""
1643 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1645 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1646 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1649 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1651 # Update the VNFR with the changed status
1652 yield from self
.publish(None)
1654 def update_cp(self
, cp_name
, ip_address
, cp_id
):
1655 """Updated the connection point with ip address"""
1656 for cp
in self
._cprs
:
1657 if cp
.name
== cp_name
:
1658 self
._log
.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1659 cp_name
, cp
, ip_address
, cp_id
)
1660 cp
.ip_address
= ip_address
1661 cp
.connection_point_id
= cp_id
1664 err
= "No connection point %s found in VNFR id %s" % (cp
.name
, self
._vnfr
_id
)
1665 self
._log
.debug(err
)
1666 raise VirtualDeploymentUnitRecordError(err
)
1668 def set_state(self
, state
):
1669 """ Set state for this VNFR"""
1673 def instantiate(self
, xact
, restart_mode
=False):
1674 """ instantiate this VNF """
1675 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1680 # Iterate over all the connection points in VNFR and fetch the
1683 def cpr_from_cp(cp
):
1684 """ Creates a record level connection point from the desciptor cp"""
1685 cp_fields
= ["name", "image", "vm-flavor"]
1686 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1688 cpr_dict
.update(cp_copy_dict
)
1689 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1691 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1692 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1694 for cp
in self
._vnfr
.connection_point
:
1695 cpr
= cpr_from_cp(cp
)
1696 self
._cprs
.append(cpr
)
1697 self
._log
.debug("Adding Connection point record %s ", cp
)
1699 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1700 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1701 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1702 rwdts
.XactFlag
.MERGE
)
1706 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1707 cpr
.vlr_ref
= cp
.vlr_ref
1708 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1710 # Fetch the VNFD associated with the VNFR
1711 self
._log
.debug("VNFR-ID %s: Fetching vnfds", self
._vnfr
_id
)
1712 self
._vnfd
= yield from self
._vnfm
.get_vnfd_ref(self
._vnfd
_id
)
1713 self
._log
.debug("VNFR-ID %s: Fetched vnfd:%s", self
._vnfr
_id
, self
._vnfd
)
1715 assert self
.vnfd
is not None
1717 # Fetch External VLRs
1718 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1719 yield from fetch_vlrs()
1722 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1723 yield from self
.publish_inventory(xact
)
1726 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1727 yield from self
.create_vls()
1730 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1731 yield from self
.publish(xact
)
1734 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1736 yield from self
.instantiate_vls(xact
, restart_mode
)
1737 except Exception as e
:
1738 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1739 yield from self
.instantiation_failed(str(e
))
1742 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1745 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1746 yield from self
.create_vdus(self
, restart_mode
)
1749 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1750 yield from self
.publish(xact
)
1753 # ToDo: Check if this should be prevented during restart
1754 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1755 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1758 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1759 yield from self
.publish(xact
)
1761 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1764 def terminate(self
, xact
):
1765 """ Terminate this virtual network function """
1767 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
1769 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
1772 if self
._vnf
_mon
is not None:
1773 self
._vnf
_mon
.stop()
1774 self
._vnf
_mon
.deregister()
1775 self
._vnf
_mon
= None
1778 def terminate_vls():
1779 """ Terminate VLs in this VNF """
1780 for vl
in self
._vlrs
:
1781 yield from vl
.terminate(xact
)
1784 def terminate_vdus():
1785 """ Terminate VDUS in this VNF """
1786 for vdu
in self
._vdus
:
1787 yield from vdu
.terminate(xact
)
1789 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
1790 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
1791 yield from terminate_vls()
1793 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
1794 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
1795 yield from terminate_vdus()
1797 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
1798 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
1801 class VnfdDtsHandler(object):
1802 """ DTS handler for VNFD config changes """
1803 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1805 def __init__(self
, dts
, log
, loop
, vnfm
):
1814 """ DTS registration handle """
1819 """ Register for VNFD configuration"""
1821 def on_apply(dts
, acg
, xact
, action
, scratch
):
1822 """Apply the configuration"""
1823 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1824 xact
, action
, scratch
)
1826 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1827 # Create/Update a VNFD record
1828 for cfg
in self
._regh
.get_xact_elements(xact
):
1829 # Only interested in those VNFD cfgs whose ID was received in prepare callback
1830 if cfg
.id in scratch
.get('vnfds', []) or is_recovery
:
1831 self
._vnfm
.update_vnfd(cfg
)
1833 scratch
.pop('vnfds', None)
1836 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1837 """ on prepare callback """
1838 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1839 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1840 fref
= ProtobufC
.FieldReference
.alloc()
1841 fref
.goto_whole_message(msg
.to_pbcm())
1843 # Handle deletes in prepare_callback, but adds/updates in apply_callback
1844 if fref
.is_field_deleted():
1845 # Delete an VNFD record
1846 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1847 if self
._vnfm
.vnfd_in_use(msg
.id):
1848 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1849 err
= "Cannot delete a VNFD in use - %s" % msg
1850 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1851 # Delete a VNFD record
1852 yield from self
._vnfm
.delete_vnfd(msg
.id)
1854 # Handle actual adds/updates in apply_callback,
1855 # just check if VNFD in use in prepare_callback
1856 if self
._vnfm
.vnfd_in_use(msg
.id):
1857 self
._log
.debug("Cannot modify an VNFD in use - %s", msg
)
1858 err
= "Cannot modify an VNFD in use - %s" % msg
1859 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1861 # Add this VNFD to scratch to create/update in apply callback
1862 vnfds
= scratch
.setdefault('vnfds', [])
1863 vnfds
.append(msg
.id)
1865 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1868 "Registering for VNFD config using xpath: %s",
1869 VnfdDtsHandler
.XPATH
,
1871 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
1872 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
1873 self
._regh
= acg
.register(
1874 xpath
=VnfdDtsHandler
.XPATH
,
1875 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1876 on_prepare
=on_prepare
)
1879 class VcsComponentDtsHandler(object):
1880 """ Vcs Component DTS handler """
1881 XPATH
= ("D,/rw-manifest:manifest" +
1882 "/rw-manifest:operational-inventory" +
1883 "/rw-manifest:component")
1885 def __init__(self
, dts
, log
, loop
, vnfm
):
1894 """ DTS registration handle """
1899 """ Registers VCS component dts publisher registration"""
1900 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
1901 VcsComponentDtsHandler
.XPATH
)
1903 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
1904 handlers
= rift
.tasklets
.Group
.Handler()
1905 with self
._dts
.group_create(handler
=handlers
) as group
:
1906 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
1908 flags
=(rwdts
.Flag
.PUBLISHER |
1909 rwdts
.Flag
.NO_PREP_READ |
1910 rwdts
.Flag
.DATASTORE
),)
1913 def publish(self
, xact
, path
, msg
):
1914 """ Publishes the VCS component """
1915 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
1917 self
.regh
.create_element(path
, msg
)
1918 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1919 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
1921 class VnfrConsoleOperdataDtsHandler(object):
1922 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1924 def vnfr_vdu_console_xpath(self
):
1925 """ path for resource-mgr"""
1926 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self
._vnfr
_id
,self
._vdur
_id
))
1928 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
1935 self
._vnfr
_id
= vnfr_id
1936 self
._vdur
_id
= vdur_id
1937 self
._vdu
_id
= vdu_id
1941 """ Register for VNFR VDU Operational Data read from dts """
1944 def on_prepare(xact_info
, action
, ks_path
, msg
):
1945 """ prepare callback from dts """
1946 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
1948 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
1949 xact_info
, action
, xpath
, msg
1952 if action
== rwdts
.QueryAction
.READ
:
1953 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
1954 path_entry
= schema
.keyspec_to_entry(ks_path
)
1955 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
1957 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
1958 except VnfRecordError
as e
:
1959 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
1960 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1963 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
1964 if not vdur
._state
== VDURecordState
.READY
:
1965 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
1966 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1968 with self
._dts
.transaction() as new_xact
:
1969 resp
= yield from vdur
.read_resource(new_xact
)
1970 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
1971 vdur_console
.id = self
._vdur
_id
1972 if resp
.console_url
:
1973 vdur_console
.console_url
= resp
.console_url
1975 vdur_console
.console_url
= 'none'
1976 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
1978 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
1979 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
1980 vdur_console
.id = self
._vdur
_id
1981 vdur_console
.console_url
= 'none'
1983 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
1984 xpath
=self
.vnfr_vdu_console_xpath
,
1987 #raise VnfRecordError("Not supported operation %s" % action)
1988 self
._log
.error("Not supported operation %s" % action
)
1989 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
1993 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
1994 self
.vnfr_vdu_console_xpath
)
1995 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
1996 with self
._dts
.group_create() as group
:
1997 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
1999 flags
=rwdts
.Flag
.PUBLISHER
,
2003 class VnfrDtsHandler(object):
2004 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2005 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2007 def __init__(self
, dts
, log
, loop
, vnfm
):
2017 """ Return registration handle"""
2022 """ Return VNF manager instance """
2027 """ Register for vnfr create/update/delete/read requests from dts """
2028 def on_commit(xact_info
):
2029 """ The transaction has been committed """
2030 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2031 return rwdts
.MemberRspCode
.ACTION_OK
2033 def on_abort(*args
):
2034 """ Abort callback """
2035 self
._log
.debug("VNF transaction got aborted")
2038 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2041 def instantiate_realloc_vnfr(vnfr
):
2042 """Re-populate the vnfm after restart
2049 yield from vnfr
.instantiate(None, restart_mode
=True)
2051 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2052 curr_cfg
= self
.regh
.elements
2053 for cfg
in curr_cfg
:
2054 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2055 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2057 self
._log
.debug("Got on_event in vnfm")
2059 return rwdts
.MemberRspCode
.ACTION_OK
2062 def on_prepare(xact_info
, action
, ks_path
, msg
):
2063 """ prepare callback from dts """
2065 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2066 xact_info
, action
, msg
2069 if action
== rwdts
.QueryAction
.CREATE
:
2070 if not msg
.has_field("vnfd_ref"):
2071 err
= "Vnfd reference not provided"
2072 self
._log
.error(err
)
2073 raise VnfRecordError(err
)
2075 vnfr
= self
.vnfm
.create_vnfr(msg
)
2077 # RIFT-9105: Unable to add a READ query under an existing transaction
2078 # xact = xact_info.xact
2079 yield from vnfr
.instantiate(None)
2080 except Exception as e
:
2081 self
._log
.exception(e
)
2082 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2083 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2084 yield from vnfr
.publish(None)
2085 elif action
== rwdts
.QueryAction
.DELETE
:
2086 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2087 path_entry
= schema
.keyspec_to_entry(ks_path
)
2088 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2091 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2092 raise VirtualNetworkFunctionRecordNotFound(
2093 "VNFR id %s", path_entry
.key00
.id)
2096 yield from vnfr
.terminate(xact_info
.xact
)
2099 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2100 except Exception as e
:
2101 self
._log
.exception(e
)
2102 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2104 elif action
== rwdts
.QueryAction
.UPDATE
:
2105 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2106 path_entry
= schema
.keyspec_to_entry(ks_path
)
2109 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2110 except Exception as e
:
2111 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2112 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2116 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2117 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2120 self
._log
.debug("VNFR {} update config status {} (current {})".
2121 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2122 # Update the config status and publish
2123 vnfr
._config
_status
= msg
.config_status
2124 yield from vnfr
.publish(None)
2127 raise NotImplementedError(
2128 "%s action on VirtualNetworkFunctionRecord not supported",
2131 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2133 self
._log
.debug("Registering for VNFR using xpath: %s",
2134 VnfrDtsHandler
.XPATH
,)
2136 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2137 on_prepare
=on_prepare
,)
2138 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2139 with self
._dts
.group_create(handler
=handlers
) as group
:
2140 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2142 flags
=(rwdts
.Flag
.PUBLISHER |
2143 rwdts
.Flag
.NO_PREP_READ |
2145 rwdts
.Flag
.DATASTORE
),)
2148 def create(self
, xact
, path
, msg
):
2150 Create a VNFR record in DTS with path and message
2152 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2155 self
.regh
.create_element(path
, msg
)
2156 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2160 def update(self
, xact
, path
, msg
):
2162 Update a VNFR record in DTS with path and message
2164 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2166 self
.regh
.update_element(path
, msg
)
2167 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2171 def delete(self
, xact
, path
):
2173 Delete a VNFR record in DTS with path and message
2175 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2176 self
.regh
.delete_element(path
)
2177 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2180 class VirtualNetworkFunctionDescriptor(object):
2182 Virtual Network Function descriptor class
2185 def __init__(self
, dts
, log
, loop
, vnfm
, vnfd
):
2195 def ref_count(self
):
2196 """ Returns the reference count associated with
2197 this Virtual Network Function Descriptor"""
2198 return self
._ref
_count
2202 """ Returns vnfd id """
2203 return self
._vnfd
.id
2207 """ Returns vnfd name """
2208 return self
._vnfd
.name
2211 """ Returns whether vnfd is in use or not """
2212 return True if self
._ref
_count
> 0 else False
2215 """ Take a reference on this object """
2216 self
._ref
_count
+= 1
2217 return self
._ref
_count
2220 """ Release reference on this object """
2221 if self
.ref_count
< 1:
2222 msg
= ("Unref on a VNFD object - vnfd id %s, ref_count = %s" %
2223 (self
.id, self
._ref
_count
))
2224 self
._log
.critical(msg
)
2225 raise VnfRecordError(msg
)
2226 self
._log
.debug("Releasing ref on VNFD %s - curr ref_count:%s",
2227 self
.id, self
.ref_count
)
2228 self
._ref
_count
-= 1
2229 return self
._ref
_count
2233 """ Return the message associated with this NetworkServiceDescriptor"""
2237 def path_for_id(vnfd_id
):
2238 """ Return path for the passed vnfd_id"""
2239 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id
)
2242 """ Return the path associated with this NetworkServiceDescriptor"""
2243 return VirtualNetworkFunctionDescriptor
.path_for_id(self
.id)
2245 def update(self
, vnfd
):
2246 """ Update the Virtual Network Function Descriptor """
2248 self
._log
.error("Cannot update descriptor %s in use refcnt=%d",
2249 self
.id, self
.ref_count
)
2251 # The following loop is added to debug RIFT-13284
2252 for vnf_rec
in self
._vnfm
._vnfrs
.values():
2253 if vnf_rec
.vnfd_id
== self
.id:
2254 self
._log
.error("descriptor %s in used by %s:%s",
2255 self
.id, vnf_rec
.vnfr_id
, vnf_rec
.msg
)
2256 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot update descriptor in use %s" % self
.id)
2260 """ Delete the Virtual Network Function Descriptor """
2262 self
._log
.error("Cannot delete descriptor %s in use refcnt=%d",
2265 # The following loop is added to debug RIFT-13284
2266 for vnf_rec
in self
._vnfm
._vnfrs
.values():
2267 if vnf_rec
.vnfd_id
== self
.id:
2268 self
._log
.error("descriptor %s in used by %s:%s",
2269 self
.id, vnf_rec
.vnfr_id
, vnf_rec
.msg
)
2270 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot delete descriptor in use %s" % self
.id)
2271 self
._vnfm
.delete_vnfd(self
.id)
2274 class VnfdRefCountDtsHandler(object):
2275 """ The VNFD Ref Count DTS handler """
2276 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2278 def __init__(self
, dts
, log
, loop
, vnfm
):
2288 """ Return registration handle """
2293 """ Return the NS manager instance """
2298 """ Register for VNFD ref count read from dts """
2301 def on_prepare(xact_info
, action
, ks_path
, msg
):
2302 """ prepare callback from dts """
2303 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2305 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2306 xact_info
, action
, xpath
, msg
2309 if action
== rwdts
.QueryAction
.READ
:
2310 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2311 path_entry
= schema
.keyspec_to_entry(ks_path
)
2312 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2313 for xpath
, msg
in vnfd_list
:
2314 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2316 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2319 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2321 raise VnfRecordError("Not supported operation %s" % action
)
2323 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2324 with self
._dts
.group_create() as group
:
2325 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2327 flags
=rwdts
.Flag
.PUBLISHER
,
2331 class VdurDatastore(object):
2333 This VdurDatastore is intended to expose select information about a VDUR
2334 such that it can be referenced in a cloud config file. The data that is
2335 exposed does not necessarily follow the structure of the data in the yang
2336 model. This is intentional. The data that are exposed are intended to be
2337 agnostic of the yang model so that changes in the model do not necessarily
2338 require changes to the interface provided to the user. It also means that
2339 the user does not need to be familiar with the RIFT.ware yang models.
2343 """Create an instance of VdurDatastore"""
2344 self
._vdur
_data
= dict()
2345 self
._pattern
= re
.compile("vdu\[([^]]+)\]\.(.+)")
2347 def add(self
, vdur
):
2348 """Add a new VDUR to the datastore
2351 vdur - a VirtualDeploymentUnitRecord instance
2354 A ValueError is raised if the VDUR is (1) None or (2) already in
2358 if vdur
.vdu_id
is None:
2359 raise ValueError('VDURs are required to have an ID')
2361 if vdur
.vdu_id
in self
._vdur
_data
:
2362 raise ValueError('cannot add a VDUR more than once')
2364 self
._vdur
_data
[vdur
.vdu_id
] = dict()
2366 def set_if_not_none(key
, attr
):
2367 if attr
is not None:
2368 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2370 set_if_not_none('name', vdur
._vdud
.name
)
2371 set_if_not_none('mgmt.ip', vdur
.vm_management_ip
)
2373 def update(self
, vdur
):
2374 """Update the VDUR information in the datastore
2377 vdur - a GI representation of a VDUR
2380 A ValueError is raised if the VDUR is (1) None or (2) already in
2384 if vdur
.vdu_id
is None:
2385 raise ValueError('VNFDs are required to have an ID')
2387 if vdur
.vdu_id
not in self
._vdur
_data
:
2388 raise ValueError('VNF is not recognized')
2390 def set_or_delete(key
, attr
):
2392 if key
in self
._vdur
_data
[vdur
.vdu_id
]:
2393 del self
._vdur
_data
[vdur
.vdu_id
][key
]
2396 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2398 set_or_delete('name', vdur
._vdud
.name
)
2399 set_or_delete('mgmt.ip', vdur
.vm_management_ip
)
2401 def remove(self
, vdur_id
):
2402 """Remove all of the data associated with specified VDUR
2405 vdur_id - the identifier of a VNFD in the datastore
2408 A ValueError is raised if the VDUR is not contained in the
2412 if vdur_id
not in self
._vdur
_data
:
2413 raise ValueError('VNF is not recognized')
2415 del self
._vdur
_data
[vdur_id
]
2417 def get(self
, expr
):
2418 """Retrieve VDUR information from the datastore
2420 An expression should be of the form,
2424 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2425 the exposed attribute that the user wishes to retrieve.
2427 If the requested data is not available, None is returned.
2430 expr - a string that specifies the data to return
2433 A ValueError is raised if the provided expression cannot be parsed.
2436 The requested data or None
2439 result
= self
._pattern
.match(expr
)
2441 raise ValueError('data expression not recognized ({})'.format(expr
))
2443 vdur_id
, key
= result
.groups()
2445 if vdur_id
not in self
._vdur
_data
:
2448 return self
._vdur
_data
[vdur_id
].get(key
, None)
2451 class VnfManager(object):
2452 """ The virtual network function manager class """
2453 def __init__(self
, dts
, log
, loop
, cluster_name
):
2457 self
._cluster
_name
= cluster_name
2459 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2460 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2462 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2465 VnfdRefCountDtsHandler(dts
, log
, loop
, self
)]
2470 def vnfr_handler(self
):
2471 """ VNFR dts handler """
2472 return self
._vnfr
_handler
2475 def vcs_handler(self
):
2476 """ VCS dts handler """
2477 return self
._vcs
_handler
2481 """ Register all static DTS handlers """
2482 for hdl
in self
._dts
_handlers
:
2483 yield from hdl
.register()
2487 """ Run this VNFM instance """
2488 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2489 yield from self.register()
2491 def get_vnfr(self, vnfr_id):
2492 """ get VNFR by vnfr id """
2494 if vnfr_id not in self._vnfrs:
2495 raise VnfRecordError("VNFR
id %s not found
", vnfr_id)
2497 return self._vnfrs[vnfr_id]
2499 def create_vnfr(self, vnfr):
2500 """ Create a VNFR instance """
2501 if vnfr.id in self._vnfrs:
2502 msg = "Vnfr
id %s already exists
" % vnfr.id
2503 self._log.error(msg)
2504 raise VnfRecordError(msg)
2506 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2510 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2511 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr
2513 return self._vnfrs[vnfr.id]
2516 def delete_vnfr(self, xact, vnfr):
2517 """ Create a VNFR instance """
2518 if vnfr.vnfr_id in self._vnfrs:
2519 self._log.debug("Deleting VNFR
id %s", vnfr.vnfr_id)
2520 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2521 del self._vnfrs[vnfr.vnfr_id]
2524 def fetch_vnfd(self, vnfd_id):
2525 """ Fetch VNFDs based with the vnfd id"""
2526 vnfd_path = VirtualNetworkFunctionDescriptor.path_for_id(vnfd_id)
2527 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2530 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2532 for ent in res_iter:
2533 res = yield from ent
2537 err = "Failed to get Vnfd
%s" % vnfd_id
2538 self._log.error(err)
2539 raise VnfRecordError(err)
2541 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2546 def get_vnfd_ref(self, vnfd_id):
2547 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2548 vnfd = yield from self.get_vnfd(vnfd_id)
2553 def get_vnfd(self, vnfd_id):
2554 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2556 if vnfd_id not in self._vnfds:
2557 self._log.error("Cannot find VNFD
id:%s", vnfd_id)
2558 vnfd = yield from self.fetch_vnfd(vnfd_id)
2561 self._log.error("Cannot find VNFD
id:%s", vnfd_id)
2562 raise VirtualNetworkFunctionDescriptorError("Cannot find VNFD
id:%s", vnfd_id)
2564 if vnfd.id != vnfd_id:
2565 self._log.error("Bad Recovery state
{} found
for {}".format(vnfd.id, vnfd_id))
2566 raise VirtualNetworkFunctionDescriptorError("Bad Recovery state
{} found
for {}".format(vnfd.id, vnfd_id))
2568 if vnfd.id not in self._vnfds:
2569 self.create_vnfd(vnfd)
2571 return self._vnfds[vnfd_id]
2573 def vnfd_in_use(self, vnfd_id):
2574 """ Is this VNFD in use """
2575 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2576 if vnfd_id in self._vnfds:
2577 return self._vnfds[vnfd_id].in_use()
2581 def publish_vnfr(self, xact, path, msg):
2582 """ Publish a VNFR """
2583 self._log.debug("publish_vnfr called with path
%s, msg
%s",
2585 yield from self.vnfr_handler.update(xact, path, msg)
2587 def create_vnfd(self, vnfd):
2588 """ Create a virtual network function descriptor """
2589 self._log.debug("Create virtual networkfunction descriptor
- %s", vnfd)
2590 if vnfd.id in self._vnfds:
2591 self._log.error("Cannot create VNFD
%s -VNFD
id already exists
", vnfd)
2592 raise VirtualNetworkFunctionDescriptorError("VNFD already exists
-%s", vnfd.id)
2594 self._vnfds[vnfd.id] = VirtualNetworkFunctionDescriptor(self._dts,
2599 return self._vnfds[vnfd.id]
2601 def update_vnfd(self, vnfd):
2602 """ update the Virtual Network Function descriptor """
2603 self._log.debug("Update virtual network function descriptor
- %s", vnfd)
2605 # Hack to remove duplicates from leaf-lists - to be fixed by RIFT-6511
2606 for ivld in vnfd.internal_vld:
2607 ivld.internal_connection_point_ref = list(set(ivld.internal_connection_point_ref))
2609 if vnfd.id not in self._vnfds:
2610 self._log.debug("No VNFD found
- creating VNFD
id = %s", vnfd.id)
2611 self.create_vnfd(vnfd)
2613 self._log.debug("Updating VNFD
id = %s, vnfd
= %s", vnfd.id, vnfd)
2614 self._vnfds[vnfd.id].update(vnfd)
2617 def delete_vnfd(self, vnfd_id):
2618 """ Delete the Virtual Network Function descriptor with the passed id """
2619 self._log.debug("Deleting the virtual network function descriptor
- %s", vnfd_id)
2620 if vnfd_id not in self._vnfds:
2621 self._log.debug("Delete VNFD failed
- cannot find vnfd
-id %s", vnfd_id)
2622 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find
%s", vnfd_id)
2624 if self._vnfds[vnfd_id].in_use():
2625 self._log.debug("Cannot delete VNFD
id %s reference exists
%s",
2627 self._vnfds[vnfd_id].ref_count)
2628 raise VirtualNetworkFunctionDescriptorRefCountExists(
2629 "Cannot delete
:%s, ref_count
:%s",
2631 self._vnfds[vnfd_id].ref_count)
2633 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2635 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2636 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2637 if os.path.exists(vnfd_dir):
2638 shutil.rmtree(vnfd_dir, ignore_errors=True)
2639 except Exception as e:
2640 self._log.error("Exception in cleaning up VNFD
{}: {}".
2641 format(self._vnfds[vnfd_id].name, e))
2642 self._log.exception(e)
2644 del self._vnfds[vnfd_id]
2646 def vnfd_refcount_xpath(self, vnfd_id):
2647 """ xpath for ref count entry """
2648 return (VnfdRefCountDtsHandler.XPATH +
2649 "[rw
-vnfr
:vnfd
-id-ref
= '{}']").format(vnfd_id)
2652 def get_vnfd_refcount(self, vnfd_id):
2653 """ Get the vnfd_list from this VNFM"""
2655 if vnfd_id is None or vnfd_id == "":
2656 for vnfd in self._vnfds.values():
2657 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2658 vnfd_msg.vnfd_id_ref = vnfd.id
2659 vnfd_msg.instance_ref_count = vnfd.ref_count
2660 vnfd_list.append((self.vnfd_refcount_xpath(vnfd.id), vnfd_msg))
2661 elif vnfd_id in self._vnfds:
2662 vnfd_msg.vnfd_id_ref = self._vnfds[vnfd_id].id
2663 vnfd_msg.instance_ref_count = self._vnfds[vnfd_id].ref_count
2664 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2669 class VnfmTasklet(rift.tasklets.Tasklet):
2670 """ VNF Manager tasklet class """
2671 def __init__(self, *args, **kwargs):
2672 super(VnfmTasklet, self).__init__(*args, **kwargs)
2673 self.rwlog.set_category("rw
-mano
-log
")
2674 self.rwlog.set_subcategory("vnfm
")
2681 super(VnfmTasklet, self).start()
2682 self.log.info("Starting VnfmTasklet
")
2684 self.log.setLevel(logging.DEBUG)
2686 self.log.debug("Registering with dts
")
2687 self._dts = rift.tasklets.DTS(self.tasklet_info,
2688 RwVnfmYang.get_schema(),
2690 self.on_dts_state_change)
2692 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2694 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
2697 def on_instance_started(self):
2698 """ Task insance started callback """
2699 self.log.debug("Got instance started callback
")
2705 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2710 """ Task init callback """
2712 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2713 assert vm_parent_name is not None
2714 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2715 yield from self._vnfm.run()
2717 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2722 """ Task run callback """
2726 def on_dts_state_change(self, state):
2727 """Take action according to current dts state to transition
2728 application into the corresponding application state
2731 state - current dts state
2734 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2735 rwdts.State.CONFIG: rwdts.State.RUN,
2739 rwdts.State.INIT: self.init,
2740 rwdts.State.RUN: self.run,
2743 # Transition application to next state
2744 handler = handlers.get(state, None)
2745 if handler is not None:
2746 yield from handler()
2748 # Transition dts to next state
2749 next_state = switch.get(state, None)
2750 if next_state is not None:
2751 self._dts.handle.set_state(next_state)