2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwVnfrYang', '1.0')
31 gi
.require_version('RwVnfmYang', '1.0')
32 gi
.require_version('RwVlrYang', '1.0')
33 gi
.require_version('RwManifestYang', '1.0')
34 gi
.require_version('RwBaseYang', '1.0')
35 gi
.require_version('RwResourceMgrYang', '1.0')
37 from gi
.repository
import (
50 import rift
.package
.store
51 import rift
.package
.cloud_init
52 import rift
.mano
.dts
as mano_dts
55 class VMResourceError(Exception):
56 """ VM resource Error"""
60 class VnfRecordError(Exception):
61 """ VNF record instatiation failed"""
65 class VduRecordError(Exception):
66 """ VDU record instatiation failed"""
70 class NotImplemented(Exception):
71 """Not implemented """
75 class VnfrRecordExistsError(Exception):
76 """VNFR record already exist with the same VNFR id"""
80 class InternalVirtualLinkRecordError(Exception):
81 """Internal virtual link record error"""
85 class VDUImageNotFound(Exception):
86 """VDU Image not found error"""
90 class VirtualDeploymentUnitRecordError(Exception):
91 """VDU Instantiation failed"""
95 class VMNotReadyError(Exception):
96 """ VM Not yet received from resource manager """
100 class VDURecordNotFound(Exception):
101 """ Could not find a VDU record """
105 class VirtualNetworkFunctionRecordDescNotFound(Exception):
106 """ Cannot find Virtual Network Function Record Descriptor """
110 class VirtualNetworkFunctionDescriptorError(Exception):
111 """ Virtual Network Function Record Descriptor Error """
115 class VirtualNetworkFunctionDescriptorNotFound(Exception):
116 """ Virtual Network Function Record Descriptor Not Found """
120 class VirtualNetworkFunctionRecordNotFound(Exception):
121 """ Virtual Network Function Record Not Found """
125 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
126 """ Virtual Network Funtion Descriptor reference count exists """
130 class VnfrInstantiationFailed(Exception):
131 """ Virtual Network Funtion Instantiation failed"""
135 class VNFMPlacementGroupError(Exception):
138 class VirtualNetworkFunctionRecordState(enum
.Enum
):
145 VL_TERMINATE_PHASE
= 6
146 VDU_TERMINATE_PHASE
= 7
151 class VDURecordState(enum
.Enum
):
152 """VDU record state """
155 RESOURCE_ALLOC_PENDING
= 3
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
164 def __init__(self
, dts
, log
, loop
, cluster_name
, vcs_handler
, component
, mangled_name
):
168 self
._component
= component
169 self
._cluster
_name
= cluster_name
170 self
._vcs
_handler
= vcs_handler
171 self
._mangled
_name
= mangled_name
174 def mangle_name(component_name
, vnf_name
, vnfd_id
):
175 """ mangled component name """
176 return vnf_name
+ ":" + component_name
+ ":" + vnfd_id
180 """ name of this component"""
181 return self
._mangled
_name
185 """ The path for this object """
186 return("D,/rw-manifest:manifest" +
187 "/rw-manifest:operational-inventory" +
188 "/rw-manifest:component" +
189 "[rw-manifest:component-name = '{}']").format(self
.name
)
192 def instance_xpath(self
):
193 """ The path for this object """
194 return("D,/rw-base:vcs" +
197 "[instance-name = '{}']".format(self
._cluster
_name
))
200 def start_comp_xpath(self
):
201 """ start component xpath """
202 return (self
.instance_xpath
+
203 "/child-n[instance-name = 'START-REQ']")
205 def get_start_comp_msg(self
, ip_address
):
206 """ start this component """
207 start_msg
= RwBaseYang
.VcsInstance_Instance_ChildN()
208 start_msg
.instance_name
= 'START-REQ'
209 start_msg
.component_name
= self
.name
210 start_msg
.admin_command
= "START"
211 start_msg
.ip_address
= ip_address
217 """ Returns the message for this vcs component"""
219 vcs_comp_dict
= self
._component
.as_dict()
221 def mangle_comp_names(comp_dict
):
222 """ mangle component name with VNF name, id"""
223 for key
, val
in comp_dict
.items():
224 if isinstance(val
, dict):
225 comp_dict
[key
] = mangle_comp_names(val
)
226 elif isinstance(val
, list):
229 if isinstance(ent
, dict):
230 val
[i
] = mangle_comp_names(ent
)
234 elif key
== "component_name":
235 comp_dict
[key
] = VcsComponent
.mangle_name(val
,
240 mangled_dict
= mangle_comp_names(vcs_comp_dict
)
241 msg
= RwManifestYang
.OpInventory_Component
.from_dict(mangled_dict
)
245 def publish(self
, xact
):
246 """ Publishes the VCS component """
247 self
._log
.debug("Publishing the VcsComponent %s, path = %s comp = %s",
248 self
.name
, self
.path
, self
.msg
)
249 yield from self
._vcs
_handler
.publish(xact
, self
.path
, self
.msg
)
252 def start(self
, xact
, parent
, ip_addr
=None):
253 """ Starts this VCS component """
254 # ATTN RV - replace with block add
255 start_msg
= self
.get_start_comp_msg(ip_addr
)
256 self
._log
.debug("starting component %s %s",
257 self
.start_comp_xpath
, start_msg
)
258 yield from self
._dts
.query_create(self
.start_comp_xpath
,
261 self
._log
.debug("started component %s, %s",
262 self
.start_comp_xpath
, start_msg
)
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
278 placement_groups
=[]):
284 self
._mgmt
_intf
= mgmt_intf
285 self
._cloud
_account
_name
= cloud_account_name
286 self
._vnfd
_package
_store
= vnfd_package_store
287 self
._mgmt
_network
= mgmt_network
289 self
._vdur
_id
= vdur_id
or str(uuid
.uuid4())
292 self
._state
= VDURecordState
.INIT
293 self
._state
_failed
_reason
= None
294 self
._request
_id
= str(uuid
.uuid4())
295 self
._name
= vnfr
.name
+ "__" + vdud
.id
296 self
._placement
_groups
= placement_groups
299 self
._vdud
_cloud
_init
= None
300 self
._vdur
_console
_handler
= VnfrConsoleOperdataDtsHandler(dts
, log
, loop
, self
._vnfr
._vnfm
, self
._vnfr
.vnfr_id
, self
._vdur
_id
,self
.vdu_id
)
303 def vdu_opdata_register(self
):
304 yield from self
._vdur
_console
_handler
.register()
306 def cp_ip_addr(self
, cp_name
):
307 """ Find ip address by connection point name """
308 if self
._vm
_resp
is not None:
309 for conn_point
in self
._vm
_resp
.connection_points
:
310 if conn_point
.name
== cp_name
:
311 return conn_point
.ip_address
314 def cp_mac_addr(self
, cp_name
):
315 """ Find mac address by connection point name """
316 if self
._vm
_resp
is not None:
317 for conn_point
in self
._vm
_resp
.connection_points
:
318 if conn_point
.name
== cp_name
:
319 return conn_point
.mac_addr
320 return "00:00:00:00:00:00"
322 def cp_id(self
, cp_name
):
323 """ Find connection point id by connection point name """
324 if self
._vm
_resp
is not None:
325 for conn_point
in self
._vm
_resp
.connection_points
:
326 if conn_point
.name
== cp_name
:
327 return conn_point
.connection_point_id
340 """ Return this VDUR's name """
344 def cloud_account_name(self
):
345 """ Cloud account this VDU should be created in """
346 return self
._cloud
_account
_name
349 def image_name(self
):
350 """ name that should be used to lookup the image on the CMP """
351 if 'image' not in self
._vdud
:
353 return os
.path
.basename(self
._vdud
.image
)
356 def image_checksum(self
):
357 """ name that should be used to lookup the image on the CMP """
358 return self
._vdud
.image_checksum
if self
._vdud
.has_field("image_checksum") else None
361 def management_ip(self
):
364 return self
._vm
_resp
.public_ip
if self
._vm
_resp
.has_field('public_ip') else self
._vm
_resp
.management_ip
367 def vm_management_ip(self
):
370 return self
._vm
_resp
.management_ip
373 def operational_status(self
):
374 """ Operational status of this VDU"""
375 op_stats_dict
= {"INIT": "init",
376 "INSTANTIATING": "vm_init_phase",
377 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
380 "TERMINATING": "terminated",
381 "TERMINATED": "terminated",
383 return op_stats_dict
[self
._state
.name
]
388 vdu_fields
= ["vm_flavor",
395 vdu_copy_dict
= {k
: v
for k
, v
in
396 self
._vdud
.as_dict().items() if k
in vdu_fields
}
397 vdur_dict
= {"id": self
._vdur
_id
,
398 "vdu_id_ref": self
._vdud
.id,
399 "operational_status": self
.operational_status
,
400 "operational_status_details": self
._state
_failed
_reason
,
402 if self
.vm_resp
is not None:
403 vdur_dict
.update({"vim_id": self
.vm_resp
.vdu_id
,
404 "flavor_id": self
.vm_resp
.flavor_id
406 if self
._vm
_resp
.has_field('image_id'):
407 vdur_dict
.update({ "image_id": self
.vm_resp
.image_id
})
409 if self
.management_ip
is not None:
410 vdur_dict
["management_ip"] = self
.management_ip
412 if self
.vm_management_ip
is not None:
413 vdur_dict
["vm_management_ip"] = self
.vm_management_ip
415 vdur_dict
.update(vdu_copy_dict
)
417 if self
.vm_resp
is not None:
418 if self
._vm
_resp
.has_field('volumes'):
419 for opvolume
in self
._vm
_resp
.volumes
:
420 vdurvol_data
= [vduvol
for vduvol
in vdur_dict
['volumes'] if vduvol
['name'] == opvolume
.name
]
421 if len(vdurvol_data
) == 1:
422 vdurvol_data
[0]["volume_id"] = opvolume
.volume_id
427 for intf
, cp_id
, vlr
in self
._int
_intf
:
428 cp
= self
.find_internal_cp_by_cp_id(cp_id
)
430 icp_list
.append({"name": cp
.name
,
432 "type_yang": "VPORT",
433 "ip_address": self
.cp_ip_addr(cp
.id),
434 "mac_address": self
.cp_mac_addr(cp
.id)})
436 ii_list
.append({"name": intf
.name
,
437 "vdur_internal_connection_point_ref": cp
.id,
438 "virtual_interface": {}})
440 vdur_dict
["internal_connection_point"] = icp_list
441 self
._log
.debug("internal_connection_point:%s", vdur_dict
["internal_connection_point"])
442 vdur_dict
["internal_interface"] = ii_list
445 for intf
, cp
, vlr
in self
._ext
_intf
:
446 ei_list
.append({"name": cp
,
447 "vnfd_connection_point_ref": cp
,
448 "virtual_interface": {}})
449 self
._vnfr
.update_cp(cp
,
451 self
.cp_mac_addr(cp
),
454 vdur_dict
["external_interface"] = ei_list
456 placement_groups
= []
457 for group
in self
._placement
_groups
:
458 placement_groups
.append(group
.as_dict())
459 vdur_dict
['placement_groups_info'] = placement_groups
461 return RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur
.from_dict(vdur_dict
)
464 def resmgr_path(self
):
465 """ path for resource-mgr"""
466 return ("D,/rw-resource-mgr:resource-mgmt" +
468 "/vdu-event-data[event-id='{}']".format(self
._request
_id
))
471 def vm_flavor_msg(self
):
472 """ VM flavor message """
473 flavor
= self
._vdud
.vm_flavor
.__class
__()
474 flavor
.copy_from(self
._vdud
.vm_flavor
)
479 def vdud_cloud_init(self
):
480 """ Return the cloud-init contents for the VDU """
481 if self
._vdud
_cloud
_init
is None:
482 self
._vdud
_cloud
_init
= self
.cloud_init()
484 return self
._vdud
_cloud
_init
486 def cloud_init(self
):
487 """ Populate cloud_init with cloud-config script from
488 either the inline contents or from the file provided
490 if self
._vdud
.cloud_init
is not None:
491 self
._log
.debug("cloud_init script provided inline %s", self
._vdud
.cloud_init
)
492 return self
._vdud
.cloud_init
493 elif self
._vdud
.cloud_init_file
is not None:
494 # Get cloud-init script contents from the file provided in the cloud_init_file param
495 self
._log
.debug("cloud_init script provided in file %s", self
._vdud
.cloud_init_file
)
496 filename
= self
._vdud
.cloud_init_file
497 self
._vnfd
_package
_store
.refresh()
498 stored_package
= self
._vnfd
_package
_store
.get_package(self
._vnfr
.vnfd_id
)
499 cloud_init_extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
501 return cloud_init_extractor
.read_script(stored_package
, filename
)
502 except rift
.package
.cloud_init
.CloudInitExtractionError
as e
:
503 raise VirtualDeploymentUnitRecordError(e
)
505 self
._log
.debug("VDU Instantiation: cloud-init script not provided")
507 def process_openstack_placement_group_construct(self
, vm_create_msg_dict
):
509 availability_zones
= []
511 for group
in self
._placement
_groups
:
512 if group
.has_field('host_aggregate'):
513 for aggregate
in group
.host_aggregate
:
514 host_aggregates
.append(aggregate
.as_dict())
515 if group
.has_field('availability_zone'):
516 availability_zones
.append(group
.availability_zone
.as_dict())
517 if group
.has_field('server_group'):
518 server_groups
.append(group
.server_group
.as_dict())
520 if availability_zones
:
521 if len(availability_zones
) > 1:
522 self
._log
.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self
.name
, availability_zones
)
523 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self
.name
, availability_zones
))
525 vm_create_msg_dict
['availability_zone'] = availability_zones
[0]
528 if len(server_groups
) > 1:
529 self
._log
.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self
.name
, server_groups
)
530 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self
.name
, server_groups
))
532 vm_create_msg_dict
['server_group'] = server_groups
[0]
535 vm_create_msg_dict
['host_aggregate'] = host_aggregates
539 def process_placement_groups(self
, vm_create_msg_dict
):
540 """Process the placement_groups and fill resource-mgr request"""
541 if not self
._placement
_groups
:
544 cloud_set
= set([group
.cloud_type
for group
in self
._placement
_groups
])
545 assert len(cloud_set
) == 1
546 cloud_type
= cloud_set
.pop()
548 if cloud_type
== 'openstack':
549 self
.process_openstack_placement_group_construct(vm_create_msg_dict
)
552 self
._log
.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type
)
555 def resmgr_msg(self
, config
=None):
556 vdu_fields
= ["vm_flavor",
562 self
._log
.debug("Creating params based on VDUD: %s", self
._vdud
)
563 vdu_copy_dict
= {k
: v
for k
, v
in self
._vdud
.as_dict().items() if k
in vdu_fields
}
565 vm_create_msg_dict
= {
569 if self
.image_name
is not None:
570 vm_create_msg_dict
["image_name"] = self
.image_name
572 if self
.image_checksum
is not None:
573 vm_create_msg_dict
["image_checksum"] = self
.image_checksum
575 vm_create_msg_dict
["allocate_public_address"] = self
._mgmt
_intf
576 if self
._vdud
.has_field('mgmt_vpci'):
577 vm_create_msg_dict
["mgmt_vpci"] = self
._vdud
.mgmt_vpci
579 self
._log
.debug("VDUD: %s", self
._vdud
)
580 if config
is not None:
581 vm_create_msg_dict
['vdu_init'] = {'userdata': config
}
583 if self
._mgmt
_network
:
584 vm_create_msg_dict
['mgmt_network'] = self
._mgmt
_network
587 for intf
, cp
, vlr
in self
._ext
_intf
:
588 cp_info
= {"name": cp
,
589 "virtual_link_id": vlr
.network_id
,
590 "type_yang": intf
.virtual_interface
.type_yang
}
592 if (intf
.virtual_interface
.has_field('vpci') and
593 intf
.virtual_interface
.vpci
is not None):
594 cp_info
["vpci"] = intf
.virtual_interface
.vpci
596 if (vlr
.has_field('ip_profile_params')) and (vlr
.ip_profile_params
.has_field('security_group')):
597 cp_info
['security_group'] = vlr
.ip_profile_params
.security_group
599 cp_list
.append(cp_info
)
601 for intf
, cp
, vlr
in self
._int
_intf
:
602 if (intf
.virtual_interface
.has_field('vpci') and
603 intf
.virtual_interface
.vpci
is not None):
604 cp_list
.append({"name": cp
,
605 "virtual_link_id": vlr
.network_id
,
606 "type_yang": intf
.virtual_interface
.type_yang
,
607 "vpci": intf
.virtual_interface
.vpci
})
609 cp_list
.append({"name": cp
,
610 "virtual_link_id": vlr
.network_id
,
611 "type_yang": intf
.virtual_interface
.type_yang
})
613 vm_create_msg_dict
["connection_points"] = cp_list
614 vm_create_msg_dict
.update(vdu_copy_dict
)
616 self
.process_placement_groups(vm_create_msg_dict
)
618 msg
= RwResourceMgrYang
.VDUEventData()
619 msg
.event_id
= self
._request
_id
620 msg
.cloud_account
= self
.cloud_account_name
621 msg
.request_info
.from_dict(vm_create_msg_dict
)
623 for volume
in self
._vdud
.volumes
:
624 v
= msg
.request_info
.volumes
.add()
625 v
.from_dict(volume
.as_dict())
629 def terminate(self
, xact
):
630 """ Delete resource in VIM """
631 if self
._state
!= VDURecordState
.READY
and self
._state
!= VDURecordState
.FAILED
:
632 self
._log
.warning("VDU terminate in not ready state - Ignoring request")
635 self
._state
= VDURecordState
.TERMINATING
636 if self
._vm
_resp
is not None:
638 with self
._dts
.transaction() as new_xact
:
639 yield from self
.delete_resource(new_xact
)
641 self
._log
.exception("Caught exception while deleting VDU %s", self
.vdu_id
)
643 if self
._rm
_regh
is not None:
644 self
._log
.debug("Deregistering resource manager registration handle")
645 self
._rm
_regh
.deregister()
648 if self
._vdur
_console
_handler
is not None:
649 self
._log
.error("Deregistering vnfr vdur registration handle")
650 self
._vdur
_console
_handler
._regh
.deregister()
651 self
._vdur
_console
_handler
._regh
= None
653 self
._state
= VDURecordState
.TERMINATED
655 def find_internal_cp_by_cp_id(self
, cp_id
):
656 """ Find the CP corresponding to the connection point id"""
659 self
._log
.debug("find_internal_cp_by_cp_id(%s) called",
662 for int_cp
in self
._vdud
.internal_connection_point
:
663 self
._log
.debug("Checking for int cp %s in internal connection points",
665 if int_cp
.id == cp_id
:
670 self
._log
.debug("Failed to find cp %s in internal connection points",
672 msg
= "Failed to find cp %s in internal connection points" % cp_id
673 raise VduRecordError(msg
)
675 # return the VLR associated with the connection point
679 def create_resource(self
, xact
, vnfr
, config
=None):
680 """ Request resource from ResourceMgr """
681 def find_cp_by_name(cp_name
):
682 """ Find a connection point by name """
684 self
._log
.debug("find_cp_by_name(%s) called", cp_name
)
685 for ext_cp
in vnfr
._cprs
:
686 self
._log
.debug("Checking ext cp (%s) called", ext_cp
.name
)
687 if ext_cp
.name
== cp_name
:
691 self
._log
.debug("Failed to find cp %s in external connection points",
695 def find_internal_vlr_by_cp_name(cp_name
):
696 """ Find the VLR corresponding to the connection point name"""
699 self
._log
.debug("find_internal_vlr_by_cp_name(%s) called",
702 for int_cp
in self
._vdud
.internal_connection_point
:
703 self
._log
.debug("Checking for int cp %s in internal connection points",
705 if int_cp
.id == cp_name
:
710 self
._log
.debug("Failed to find cp %s in internal connection points",
712 msg
= "Failed to find cp %s in internal connection points" % cp_name
713 raise VduRecordError(msg
)
715 # return the VLR associated with the connection point
716 return vnfr
.find_vlr_by_cp(cp_name
)
718 block
= xact
.block_create()
720 self
._log
.debug("Executing vm request id: %s, action: create",
723 # Resolve the networks associated external interfaces
724 for ext_intf
in self
._vdud
.external_interface
:
725 self
._log
.debug("Resolving external interface name [%s], cp[%s]",
726 ext_intf
.name
, ext_intf
.vnfd_connection_point_ref
)
727 cp
= find_cp_by_name(ext_intf
.vnfd_connection_point_ref
)
729 self
._log
.debug("Failed to find connection point - %s",
730 ext_intf
.vnfd_connection_point_ref
)
732 self
._log
.debug("Connection point name [%s], type[%s]",
733 cp
.name
, cp
.type_yang
)
735 vlr
= vnfr
.ext_vlr_by_id(cp
.vlr_ref
)
737 etuple
= (ext_intf
, cp
.name
, vlr
)
738 self
._ext
_intf
.append(etuple
)
740 self
._log
.debug("Created external interface tuple : %s", etuple
)
742 # Resolve the networks associated internal interfaces
743 for intf
in self
._vdud
.internal_interface
:
744 cp_id
= intf
.vdu_internal_connection_point_ref
745 self
._log
.debug("Resolving internal interface name [%s], cp[%s]",
749 vlr
= find_internal_vlr_by_cp_name(cp_id
)
750 except Exception as e
:
751 self
._log
.debug("Failed to find cp %s in internal VLR list", cp_id
)
752 msg
= "Failed to find cp %s in internal VLR list, e = %s" % (cp_id
, e
)
753 raise VduRecordError(msg
)
755 ituple
= (intf
, cp_id
, vlr
)
756 self
._int
_intf
.append(ituple
)
758 self
._log
.debug("Created internal interface tuple : %s", ituple
)
760 resmgr_path
= self
.resmgr_path
761 resmgr_msg
= self
.resmgr_msg(config
)
763 self
._log
.debug("Creating new VM request at: %s, params: %s", resmgr_path
, resmgr_msg
)
764 block
.add_query_create(resmgr_path
, resmgr_msg
)
766 res_iter
= yield from block
.execute(now
=True)
774 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
775 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
776 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
777 return resp
.resource_info
780 def delete_resource(self
, xact
):
781 block
= xact
.block_create()
783 self
._log
.debug("Executing vm request id: %s, action: delete",
786 block
.add_query_delete(self
.resmgr_path
)
788 yield from block
.execute(flags
=0, now
=True)
791 def read_resource(self
, xact
):
792 block
= xact
.block_create()
794 self
._log
.debug("Executing vm request id: %s, action: delete",
797 block
.add_query_read(self
.resmgr_path
)
799 res_iter
= yield from block
.execute(flags
=0, now
=True)
804 if resp
is None or not (resp
.has_field('resource_info') and resp
.resource_info
.has_field('resource_state')):
805 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp
)
806 self
._log
.debug("Got vm request response: %s", resp
.resource_info
)
807 #self._vm_resp = resp.resource_info
808 return resp
.resource_info
812 def start_component(self
):
813 """ This VDUR is active """
814 self
._log
.debug("Starting component %s for vdud %s vdur %s",
815 self
._vdud
.vcs_component_ref
,
818 yield from self
._vnfr
.start_component(self
._vdud
.vcs_component_ref
,
819 self
.vm_resp
.management_ip
)
823 """ Is this VDU active """
824 return True if self
._state
is VDURecordState
.READY
else False
827 def instantiation_failed(self
, failed_reason
=None):
828 """ VDU instantiation failed """
829 self
._log
.debug("VDU %s instantiation failed ", self
._vdur
_id
)
830 self
._state
= VDURecordState
.FAILED
831 self
._state
_failed
_reason
= failed_reason
832 yield from self
._vnfr
.instantiation_failed(failed_reason
)
835 def vdu_is_active(self
):
836 """ This VDU is active"""
838 self
._log
.warning("VDU %s was already marked as active", self
._vdur
_id
)
841 self
._log
.debug("VDUR id %s in VNFR %s is active", self
._vdur
_id
, self
._vnfr
.vnfr_id
)
843 if self
._vdud
.vcs_component_ref
is not None:
844 yield from self
.start_component()
846 self
._state
= VDURecordState
.READY
848 if self
._vnfr
.all_vdus_active():
849 self
._log
.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self
._vnfr
)
850 yield from self
._vnfr
.is_ready()
853 def instantiate(self
, xact
, vnfr
, config
=None):
854 """ Instantiate this VDU """
855 self
._state
= VDURecordState
.INSTANTIATING
858 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
859 """ This VDUR is active """
860 self
._log
.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
865 if (query_action
== rwdts
.QueryAction
.UPDATE
or
866 query_action
== rwdts
.QueryAction
.CREATE
):
869 if msg
.resource_state
== "active":
870 # Move this VDU to ready state
871 yield from self
.vdu_is_active()
872 elif msg
.resource_state
== "failed":
873 yield from self
.instantiation_failed(msg
.resource_errors
)
874 elif query_action
== rwdts
.QueryAction
.DELETE
:
875 self
._log
.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
877 raise NotImplementedError(
878 "%s action on VirtualDeployementUnitRecord not supported",
881 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
884 reg_event
= asyncio
.Event(loop
=self
._loop
)
887 def on_ready(regh
, status
):
890 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
891 self
._rm
_regh
= yield from self
._dts
.register(self
.resmgr_path
+ '/resource-info',
892 flags
=rwdts
.Flag
.SUBSCRIBER
,
894 yield from reg_event
.wait()
896 vm_resp
= yield from self
.create_resource(xact
, vnfr
, config
)
897 self
._vm
_resp
= vm_resp
899 self
._state
= VDURecordState
.RESOURCE_ALLOC_PENDING
900 self
._log
.debug("Requested VM from resource manager response %s",
902 if vm_resp
.resource_state
== "active":
903 self
._log
.debug("Resourcemgr responded wih an active vm resp %s",
905 yield from self
.vdu_is_active()
906 self
._state
= VDURecordState
.READY
907 elif (vm_resp
.resource_state
== "pending" or
908 vm_resp
.resource_state
== "inactive"):
909 self
._log
.debug("Resourcemgr responded wih a pending vm resp %s",
911 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
912 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
913 # flags=rwdts.Flag.SUBSCRIBER,
916 self
._log
.debug("Resourcemgr responded wih an error vm resp %s",
918 raise VirtualDeploymentUnitRecordError(
919 "Failed VDUR instantiation %s " % vm_resp
)
921 except Exception as e
:
923 traceback
.print_exc()
924 self
._log
.exception(e
)
925 self
._log
.error("Instantiation of VDU record failed: %s", str(e
))
926 self
._state
= VDURecordState
.FAILED
927 yield from self
.instantiation_failed(str(e
))
930 class VlRecordState(enum
.Enum
):
931 """ VL Record State """
933 INSTANTIATION_PENDING
= 102
935 TERMINATE_PENDING
= 104
940 class InternalVirtualLinkRecord(object):
941 """ Internal Virtual Link record """
942 def __init__(self
, dts
, log
, loop
, ivld_msg
, vnfr_name
, cloud_account_name
):
946 self
._ivld
_msg
= ivld_msg
947 self
._vnfr
_name
= vnfr_name
948 self
._cloud
_account
_name
= cloud_account_name
950 self
._vlr
_req
= self
.create_vlr()
952 self
._state
= VlRecordState
.INIT
956 """ Find VLR by id """
957 return self
._vlr
_req
.id
961 """ Name of this VL """
962 return self
._vnfr
_name
+ "." + self
._ivld
_msg
.name
965 def network_id(self
):
966 """ Find VLR by id """
967 return self
._vlr
.network_id
if self
._vlr
else None
970 """ VLR path for this VLR instance"""
971 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
.vlr_id
)
973 def create_vlr(self
):
974 """ Create the VLR record which will be instantiated """
976 vld_fields
= ["short_name",
983 vld_copy_dict
= {k
: v
for k
, v
in self
._ivld
_msg
.as_dict().items() if k
in vld_fields
}
985 vlr_dict
= {"id": str(uuid
.uuid4()),
987 "cloud_account": self
._cloud
_account
_name
,
989 vlr_dict
.update(vld_copy_dict
)
991 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
995 def instantiate(self
, xact
, restart_mode
=False):
996 """ Instantiate VL """
999 def instantiate_vlr():
1000 """ Instantiate VLR"""
1001 self
._log
.debug("Create VL with xpath %s and vlr %s",
1002 self
.vlr_path(), self
._vlr
_req
)
1004 with self
._dts
.transaction(flags
=0) as xact
:
1005 block
= xact
.block_create()
1006 block
.add_query_create(xpath
=self
.vlr_path(), msg
=self
._vlr
_req
)
1007 self
._log
.debug("Executing VL create path:%s msg:%s",
1008 self
.vlr_path(), self
._vlr
_req
)
1012 res_iter
= yield from block
.execute()
1014 self
._state
= VlRecordState
.FAILED
1015 self
._log
.exception("Caught exception while instantial VL")
1018 for ent
in res_iter
:
1019 res
= yield from ent
1020 self
._vlr
= res
.result
1022 if self
._vlr
.operational_status
== 'failed':
1023 self
._log
.debug("VL creation failed for vlr id %s", self
._vlr
.id)
1024 self
._state
= VlRecordState
.FAILED
1025 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self
._vlr
.id))
1027 self
._log
.info("Created VL with xpath %s and vlr %s",
1028 self
.vlr_path(), self
._vlr
)
1032 """ Get the network id """
1033 res_iter
= yield from self
._dts
.query_read(self
.vlr_path(), rwdts
.XactFlag
.MERGE
)
1035 for ent
in res_iter
:
1036 res
= yield from ent
1040 err
= "Failed to get VLR for path %s" % self
.vlr_path()
1042 raise InternalVirtualLinkRecordError(err
)
1045 self
._state
= VlRecordState
.INSTANTIATION_PENDING
1048 vl
= yield from get_vlr()
1050 yield from instantiate_vlr()
1052 yield from instantiate_vlr()
1054 self
._state
= VlRecordState
.ACTIVE
1056 def vlr_in_vns(self
):
1057 """ Is there a VLR record in VNS """
1058 if (self
._state
== VlRecordState
.ACTIVE
or
1059 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
1060 self
._state
== VlRecordState
.FAILED
):
1066 def terminate(self
, xact
):
1067 """Terminate this VL """
1068 if not self
.vlr_in_vns():
1069 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1070 self
.vlr_id
, self
._state
)
1073 self
._log
.debug("Terminating VL with path %s", self
.vlr_path())
1074 self
._state
= VlRecordState
.TERMINATE_PENDING
1075 block
= xact
.block_create()
1076 block
.add_query_delete(self
.vlr_path())
1077 yield from block
.execute(flags
=0, now
=True)
1078 self
._state
= VlRecordState
.TERMINATED
1079 self
._log
.debug("Terminated VL with path %s", self
.vlr_path())
1082 class VirtualNetworkFunctionRecord(object):
1083 """ Virtual Network Function Record """
1084 def __init__(self
, dts
, log
, loop
, cluster_name
, vnfm
, vcs_handler
, vnfr_msg
, mgmt_network
=None):
1088 self
._cluster
_name
= cluster_name
1089 self
._vnfr
_msg
= vnfr_msg
1090 self
._vnfr
_id
= vnfr_msg
.id
1091 self
._vnfd
_id
= vnfr_msg
.vnfd
.id
1093 self
._vcs
_handler
= vcs_handler
1094 self
._vnfr
= vnfr_msg
1095 self
._mgmt
_network
= mgmt_network
1097 self
._vnfd
= vnfr_msg
.vnfd
1098 self
._state
= VirtualNetworkFunctionRecordState
.INIT
1099 self
._state
_failed
_reason
= None
1100 self
._ext
_vlrs
= {} # The list of external virtual links
1101 self
._vlrs
= [] # The list of internal virtual links
1102 self
._vdus
= [] # The list of vdu
1103 self
._vlr
_by
_cp
= {}
1105 self
._inventory
= {}
1106 self
._create
_time
= int(time
.time())
1107 self
._vnf
_mon
= None
1108 self
._config
_status
= vnfr_msg
.config_status
1109 self
._vnfd
_package
_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
._log
)
1110 self
._rw
_vnfd
= None
1111 self
._vnfd
_ref
_count
= 0
1113 def _get_vdur_from_vdu_id(self
, vdu_id
):
1114 self
._log
.debug("Finding vdur for vdu_id %s", vdu_id
)
1115 self
._log
.debug("Searching through vdus: %s", self
._vdus
)
1116 for vdu
in self
._vdus
:
1117 self
._log
.debug("vdu_id: %s", vdu
.vdu_id
)
1118 if vdu
.vdu_id
== vdu_id
:
1121 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id
)
1124 def operational_status(self
):
1125 """ Operational status of this VNFR """
1126 op_status_map
= {"INIT": "init",
1127 "VL_INIT_PHASE": "vl_init_phase",
1128 "VM_INIT_PHASE": "vm_init_phase",
1130 "TERMINATE": "terminate",
1131 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1132 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1133 "TERMINATED": "terminated",
1134 "FAILED": "failed", }
1135 return op_status_map
[self
._state
.name
]
1138 def vnfd_xpath(vnfd_id
):
1139 """ VNFD xpath associated with this VNFR """
1140 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id
)
1143 def vnfd_ref_count(self
):
1144 """ Returns the VNFD reference count associated with this VNFR """
1145 return self
._vnfd
_ref
_count
1147 def vnfd_in_use(self
):
1148 """ Returns whether vnfd is in use or not """
1149 return True if self
._vnfd
_ref
_count
> 0 else False
1152 """ Take a reference on this object """
1153 self
._vnfd
_ref
_count
+= 1
1154 return self
._vnfd
_ref
_count
1156 def vnfd_unref(self
):
1157 """ Release reference on this object """
1158 if self
._vnfd
_ref
_count
< 1:
1159 msg
= ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1160 (self
.vnfd
.id, self
._vnfd
_ref
_count
))
1161 self
._log
.critical(msg
)
1162 raise VnfRecordError(msg
)
1163 self
._log
.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1164 self
.vnfd
.id, self
._vnfd
_ref
_count
)
1165 self
._vnfd
_ref
_count
-= 1
1166 return self
._vnfd
_ref
_count
1170 """ VNFD for this VNFR """
1175 """ VNFD name associated with this VNFR """
1176 return self
.vnfd
.name
1180 """ Name of this VNF in the record """
1181 return self
._vnfr
.name
1184 def cloud_account_name(self
):
1185 """ Name of the cloud account this VNFR is instantiated in """
1186 return self
._vnfr
.cloud_account
1190 """ VNFD Id associated with this VNFR """
1195 """ VNFR Id associated with this VNFR """
1196 return self
._vnfr
_id
1199 def member_vnf_index(self
):
1200 """ Member VNF index associated with this VNFR """
1201 return self
._vnfr
.member_vnf_index_ref
1204 def config_status(self
):
1205 """ Config agent status for this VNFR """
1206 return self
._config
_status
1208 def component_by_name(self
, component_name
):
1209 """ Find a component by name in the inventory list"""
1210 mangled_name
= VcsComponent
.mangle_name(component_name
,
1213 return self
._inventory
[mangled_name
]
1218 def get_nsr_config(self
):
1219 ### Need access to NS instance configuration for runtime resolution.
1220 ### This shall be replaced when deployment flavors are implemented
1221 xpath
= "C,/nsr:ns-instance-config"
1222 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
1224 for result
in results
:
1225 entry
= yield from result
1226 ns_instance_config
= entry
.result
1227 for nsr
in ns_instance_config
.nsr
:
1228 if nsr
.id == self
._vnfr
_msg
.nsr_id_ref
:
1233 def start_component(self
, component_name
, ip_addr
):
1234 """ Start a component in the VNFR by name """
1235 comp
= self
.component_by_name(component_name
)
1236 yield from comp
.start(None, None, ip_addr
)
1238 def cp_ip_addr(self
, cp_name
):
1239 """ Get ip address for connection point """
1240 self
._log
.debug("cp_ip_addr()")
1241 for cp
in self
._cprs
:
1242 if cp
.name
== cp_name
and cp
.ip_address
is not None:
1243 return cp
.ip_address
1246 def mgmt_intf_info(self
):
1247 """ Get Management interface info for this VNFR """
1248 mgmt_intf_desc
= self
.vnfd
.mgmt_interface
1250 if mgmt_intf_desc
.has_field("cp"):
1251 ip_addr
= self
.cp_ip_addr(mgmt_intf_desc
.cp
)
1252 elif mgmt_intf_desc
.has_field("vdu_id"):
1254 vdur
= self
._get
_vdur
_from
_vdu
_id
(mgmt_intf_desc
.vdu_id
)
1255 ip_addr
= vdur
.management_ip
1256 except VDURecordNotFound
:
1257 self
._log
.debug("Did not find mgmt interface for vnfr id %s", self
._vnfr
_id
)
1260 ip_addr
= mgmt_intf_desc
.ip_address
1261 port
= mgmt_intf_desc
.port
1263 return ip_addr
, port
1267 """ Message associated with this VNFR """
1268 vnfd_fields
= ["short_name", "vendor", "description", "version"]
1269 vnfd_copy_dict
= {k
: v
for k
, v
in self
.vnfd
.as_dict().items() if k
in vnfd_fields
}
1271 mgmt_intf
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1272 ip_address
, port
= self
.mgmt_intf_info()
1274 if ip_address
is not None:
1275 mgmt_intf
.ip_address
= ip_address
1276 if port
is not None:
1277 mgmt_intf
.port
= port
1279 vnfr_dict
= {"id": self
._vnfr
_id
,
1280 "nsr_id_ref": self
._vnfr
_msg
.nsr_id_ref
,
1282 "member_vnf_index_ref": self
.member_vnf_index
,
1283 "operational_status": self
.operational_status
,
1284 "operational_status_details": self
._state
_failed
_reason
,
1285 "cloud_account": self
.cloud_account_name
,
1286 "config_status": self
._config
_status
1289 vnfr_dict
.update(vnfd_copy_dict
)
1291 vnfr_msg
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
1292 vnfr_msg
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
1294 vnfr_msg
.create_time
= self
._create
_time
1295 vnfr_msg
.uptime
= int(time
.time()) - self
._create
_time
1296 vnfr_msg
.mgmt_interface
= mgmt_intf
1298 # Add all the VLRs to VNFR
1299 for vlr
in self
._vlrs
:
1300 ivlr
= vnfr_msg
.internal_vlr
.add()
1301 ivlr
.vlr_ref
= vlr
.vlr_id
1303 # Add all the VDURs to VDUR
1304 if self
._vdus
is not None:
1305 for vdu
in self
._vdus
:
1306 vdur
= vnfr_msg
.vdur
.add()
1307 vdur
.from_dict(vdu
.msg
.as_dict())
1309 if self
.vnfd
.mgmt_interface
.has_field('dashboard_params'):
1310 vnfr_msg
.dashboard_url
= self
.dashboard_url
1312 for cpr
in self
._cprs
:
1313 new_cp
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr
.as_dict())
1314 vnfr_msg
.connection_point
.append(new_cp
)
1316 if self
._vnf
_mon
is not None:
1317 for monp
in self
._vnf
_mon
.msg
:
1318 vnfr_msg
.monitoring_param
.append(
1319 VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(monp
.as_dict()))
1321 if self
._vnfr
.vnf_configuration
is not None:
1322 vnfr_msg
.vnf_configuration
.from_dict(self
._vnfr
.vnf_configuration
.as_dict())
1323 if (ip_address
is not None and
1324 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
is None):
1325 vnfr_msg
.vnf_configuration
.config_access
.mgmt_ip_address
= ip_address
1327 for group
in self
._vnfr
_msg
.placement_groups_info
:
1328 group_info
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1329 group_info
.from_dict(group
.as_dict())
1330 vnfr_msg
.placement_groups_info
.append(group_info
)
1335 def dashboard_url(self
):
1336 ip
, cfg_port
= self
.mgmt_intf_info()
1339 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('https'):
1340 if self
.vnfd
.mgmt_interface
.dashboard_params
.https
is True:
1343 if self
.vnfd
.mgmt_interface
.dashboard_params
.has_field('port'):
1344 http_port
= self
.vnfd
.mgmt_interface
.dashboard_params
.port
1346 url
= "{protocol}://{ip_address}:{port}/{path}".format(
1350 path
=self
.vnfd
.mgmt_interface
.dashboard_params
.path
.lstrip("/"),
1357 """ path for this VNFR """
1358 return("D,/vnfr:vnfr-catalog"
1359 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
))
1362 def publish(self
, xact
):
1363 """ publish this VNFR """
1365 self
._log
.debug("Publishing VNFR path = [%s], record = [%s]",
1366 self
.xpath
, self
.msg
)
1367 vnfr
.create_time
= self
._create
_time
1368 yield from self
._vnfm
.publish_vnfr(xact
, self
.xpath
, self
.msg
)
1369 self
._log
.debug("Published VNFR path = [%s], record = [%s]",
1370 self
.xpath
, self
.msg
)
1373 def create_vls(self
):
1374 """ Publish The VLs associated with this VNF """
1375 self
._log
.debug("Publishing Internal Virtual Links for vnfd id: %s",
1377 for ivld_msg
in self
.vnfd
.internal_vld
:
1378 self
._log
.debug("Creating internal vld:"
1379 " %s, int_cp_ref = %s",
1380 ivld_msg
, ivld_msg
.internal_connection_point
1382 vlr
= InternalVirtualLinkRecord(dts
=self
._dts
,
1386 vnfr_name
=self
.name
,
1387 cloud_account_name
=self
.cloud_account_name
1389 self
._vlrs
.append(vlr
)
1391 for int_cp
in ivld_msg
.internal_connection_point
:
1392 if int_cp
.id_ref
in self
._vlr
_by
_cp
:
1393 msg
= ("Connection point %s already "
1394 " bound %s" % (int_cp
.id_ref
, self
._vlr
_by
_cp
[int_cp
.id_ref
]))
1395 raise InternalVirtualLinkRecordError(msg
)
1396 self
._log
.debug("Setting vlr %s to internal cp = %s",
1398 self
._vlr
_by
_cp
[int_cp
.id_ref
] = vlr
1401 def instantiate_vls(self
, xact
, restart_mode
=False):
1402 """ Instantiate the VLs associated with this VNF """
1403 self
._log
.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1406 for vlr
in self
._vlrs
:
1407 self
._log
.debug("Instantiating VLR %s", vlr
)
1408 yield from vlr
.instantiate(xact
, restart_mode
)
1410 def find_vlr_by_cp(self
, cp_name
):
1411 """ Find the VLR associated with the cp name """
1412 return self
._vlr
_by
_cp
[cp_name
]
1414 def resolve_placement_group_cloud_construct(self
, input_group
, nsr_config
):
1416 Returns the cloud specific construct for placement group
1418 input_group: VNFD PlacementGroup
1419 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1421 copy_dict
= ['name', 'requirement', 'strategy']
1422 for group_info
in nsr_config
.vnfd_placement_group_maps
:
1423 if group_info
.placement_group_ref
== input_group
.name
and \
1424 group_info
.vnfd_id_ref
== self
.vnfd_id
:
1425 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1426 group_dict
= {k
:v
for k
,v
in
1427 group_info
.as_dict().items()
1428 if (k
!= 'placement_group_ref' and k
!='vnfd_id_ref')}
1429 for param
in copy_dict
:
1430 group_dict
.update({param
: getattr(input_group
, param
)})
1431 group
.from_dict(group_dict
)
1436 def get_vdu_placement_groups(self
, vdu
):
1437 placement_groups
= []
1438 ### Step-1: Get VNF level placement groups
1439 for group
in self
._vnfr
_msg
.placement_groups_info
:
1440 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1441 #group_info.from_dict(group.as_dict())
1442 placement_groups
.append(group
)
1444 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1445 nsr_config
= yield from self
.get_nsr_config()
1447 ### Step-3: Get VDU level placement groups
1448 for group
in self
.vnfd
.placement_groups
:
1449 for member_vdu
in group
.member_vdus
:
1450 if member_vdu
.member_vdu_ref
== vdu
.id:
1451 group_info
= self
.resolve_placement_group_cloud_construct(group
,
1453 if group_info
is None:
1454 self
._log
.info("Could not resolve cloud-construct for placement group: %s", group
.name
)
1455 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1457 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1461 self
.member_vnf_index
)
1462 placement_groups
.append(group_info
)
1464 return placement_groups
1467 def create_vdus(self
, vnfr
, restart_mode
=False):
1468 """ Create the VDUs associated with this VNF """
1470 def get_vdur_id(vdud
):
1471 """Get the corresponding VDUR's id for the VDUD. This is useful in
1474 In restart mode we check for exiting VDUR's ID and use them, if
1475 available. This way we don't end up creating duplicate VDURs
1479 if restart_mode
and vdud
is not None:
1481 vdur
= [vdur
.id for vdur
in vnfr
._vnfr
.vdur
if vdur
.vdu_id_ref
== vdud
.id]
1484 self
._log
.error("Unable to find a VDUR for VDUD {}".format(vdud
))
1489 self
._log
.info("Creating VDU's for vnfd id: %s", self
.vnfd_id
)
1490 for vdu
in self
._rw
_vnfd
.vdu
:
1491 self
._log
.debug("Creating vdu: %s", vdu
)
1492 vdur_id
= get_vdur_id(vdu
)
1494 placement_groups
= yield from self
.get_vdu_placement_groups(vdu
)
1495 self
._log
.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1498 self
.member_vnf_index
,
1499 [ group
.name
for group
in placement_groups
])
1501 vdur
= VirtualDeploymentUnitRecord(
1507 mgmt_intf
=self
.has_mgmt_interface(vdu
),
1508 mgmt_network
=self
._mgmt
_network
,
1509 cloud_account_name
=self
.cloud_account_name
,
1510 vnfd_package_store
=self
._vnfd
_package
_store
,
1512 placement_groups
= placement_groups
,
1514 yield from vdur
.vdu_opdata_register()
1516 self
._vdus
.append(vdur
)
1519 def instantiate_vdus(self
, xact
, vnfr
):
1520 """ Instantiate the VDUs associated with this VNF """
1521 self
._log
.debug("Instantiating VDU's for vnfd id %s: %s", self
.vnfd_id
, self
._vdus
)
1523 lookup
= {vdu
.vdu_id
: vdu
for vdu
in self
._vdus
}
1525 # Identify any dependencies among the VDUs
1526 dependencies
= collections
.defaultdict(list)
1527 vdu_id_pattern
= re
.compile(r
"\{\{ vdu\[([^]]+)\]\S* \}\}")
1529 for vdu
in self
._vdus
:
1530 if vdu
.vdud_cloud_init
is not None:
1531 for vdu_id
in vdu_id_pattern
.findall(vdu
.vdud_cloud_init
):
1532 if vdu_id
!= vdu
.vdu_id
:
1533 # This means that vdu.vdu_id depends upon vdu_id,
1534 # i.e. vdu_id must be instantiated before
1536 dependencies
[vdu
.vdu_id
].append(lookup
[vdu_id
])
1538 # Define the terminal states of VDU instantiation
1540 VDURecordState
.READY
,
1541 VDURecordState
.TERMINATED
,
1542 VDURecordState
.FAILED
,
1545 datastore
= VdurDatastore()
1549 def instantiate_monitor(vdu
):
1550 """Monitor the state of the VDU during instantiation
1553 vdu - a VirtualDeploymentUnitRecord
1556 # wait for the VDUR to enter a terminal state
1557 while vdu
._state
not in terminal
:
1558 yield from asyncio
.sleep(1, loop
=self
._loop
)
1560 # update the datastore
1561 datastore
.update(vdu
)
1563 # add the VDU to the set of processed VDUs
1564 processed
.add(vdu
.vdu_id
)
1567 def instantiate(vdu
):
1568 """Instantiate the specified VDU
1571 vdu - a VirtualDeploymentUnitRecord
1574 if the VDU, or any of the VDUs this VDU depends upon, are
1575 terminated or fail to instantiate properly, a
1576 VirtualDeploymentUnitRecordError is raised.
1579 for dependency
in dependencies
[vdu
.vdu_id
]:
1580 self
._log
.debug("{}: waiting for {}".format(vdu
.vdu_id
, dependency
.vdu_id
))
1582 while dependency
.vdu_id
not in processed
:
1583 yield from asyncio
.sleep(1, loop
=self
._loop
)
1585 if not dependency
.active
:
1586 raise VirtualDeploymentUnitRecordError()
1588 self
._log
.debug('instantiating {}'.format(vdu
.vdu_id
))
1590 # Populate the datastore with the current values of the VDU
1593 # Substitute any variables contained in the cloud config script
1594 config
= str(vdu
.vdud_cloud_init
) if vdu
.vdud_cloud_init
is not None else ""
1596 parts
= re
.split("\{\{ ([^\}]+) \}\}", config
)
1599 # Extract the variable names
1601 for variable
in parts
[1::2]:
1602 variables
.append(variable
.lstrip('{{').rstrip('}}').strip())
1604 # Iterate of the variables and substitute values from the
1606 for variable
in variables
:
1608 # Handle a reference to a VDU by ID
1609 if variable
.startswith('vdu['):
1610 value
= datastore
.get(variable
)
1612 msg
= "Unable to find a substitute for {} in {} cloud-init script"
1613 raise ValueError(msg
.format(variable
, vdu
.vdu_id
))
1615 config
= config
.replace("{{ %s }}" % variable
, value
)
1618 # Handle a reference to the current VDU
1619 if variable
.startswith('vdu'):
1620 value
= datastore
.get('vdu[{}]'.format(vdu
.vdu_id
) + variable
[3:])
1621 config
= config
.replace("{{ %s }}" % variable
, value
)
1624 # Handle unrecognized variables
1625 msg
= 'unrecognized cloud-config variable: {}'
1626 raise ValueError(msg
.format(variable
))
1628 # Instantiate the VDU
1629 with self
._dts
.transaction() as xact
:
1630 self
._log
.debug("Instantiating vdu: %s", vdu
)
1631 yield from vdu
.instantiate(xact
, vnfr
, config
=config
)
1632 if self
._state
== VirtualNetworkFunctionRecordState
.FAILED
:
1633 self
._log
.error("Instatiation of VNF %s failed while instantiating vdu %s",
1636 # First create a set of tasks to monitor the state of the VDUs and
1637 # report when they have entered a terminal state
1638 for vdu
in self
._vdus
:
1639 self
._loop
.create_task(instantiate_monitor(vdu
))
1641 for vdu
in self
._vdus
:
1642 self
._loop
.create_task(instantiate(vdu
))
1644 def has_mgmt_interface(self
, vdu
):
1645 # ## TODO: Support additional mgmt_interface type options
1646 if self
.vnfd
.mgmt_interface
.vdu_id
== vdu
.id:
1650 def vlr_xpath(self
, vlr_id
):
1653 "D,/vlr:vlr-catalog/"
1654 "vlr:vlr[vlr:id = '{}']".format(vlr_id
))
1656 def ext_vlr_by_id(self
, vlr_id
):
1657 """ find ext vlr by id """
1658 return self
._ext
_vlrs
[vlr_id
]
1661 def publish_inventory(self
, xact
):
1662 """ Publish the inventory associated with this VNF """
1663 self
._log
.debug("Publishing inventory for VNFR id: %s", self
._vnfr
_id
)
1665 for component
in self
._rw
_vnfd
.component
:
1666 self
._log
.debug("Creating inventory component %s", component
)
1667 mangled_name
= VcsComponent
.mangle_name(component
.component_name
,
1671 comp
= VcsComponent(dts
=self
._dts
,
1674 cluster_name
=self
._cluster
_name
,
1675 vcs_handler
=self
._vcs
_handler
,
1676 component
=component
,
1677 mangled_name
=mangled_name
,
1679 if comp
.name
in self
._inventory
:
1680 self
._log
.debug("Duplicate entries in inventory %s for vnfr %s",
1681 component
, self
._vnfd
_id
)
1683 self
._log
.debug("Adding component %s for vnrf %s",
1684 comp
.name
, self
._vnfr
_id
)
1685 self
._inventory
[comp
.name
] = comp
1686 yield from comp
.publish(xact
)
1688 def all_vdus_active(self
):
1689 """ Are all VDUS in this VNFR active? """
1690 for vdu
in self
._vdus
:
1694 self
._log
.debug("Inside all_vdus_active. Returning True")
1698 def instantiation_failed(self
, failed_reason
=None):
1699 """ VNFR instantiation failed """
1700 self
._log
.debug("VNFR %s instantiation failed ", self
.vnfr_id
)
1701 self
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
1702 self
._state
_failed
_reason
= failed_reason
1704 # Update the VNFR with the changed status
1705 yield from self
.publish(None)
1709 """ This VNF is ready"""
1710 self
._log
.debug("VNFR id %s is ready", self
.vnfr_id
)
1712 if self
._state
!= VirtualNetworkFunctionRecordState
.FAILED
:
1713 self
.set_state(VirtualNetworkFunctionRecordState
.READY
)
1716 self
._log
.debug("VNFR id %s ignoring state change", self
.vnfr_id
)
1718 # Update the VNFR with the changed status
1719 yield from self
.publish(None)
1721 def update_cp(self
, cp_name
, ip_address
, mac_addr
, cp_id
):
1722 """Updated the connection point with ip address"""
1723 for cp
in self
._cprs
:
1724 if cp
.name
== cp_name
:
1725 self
._log
.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1726 cp_name
, cp
, ip_address
, cp_id
)
1727 cp
.ip_address
= ip_address
1728 cp
.mac_address
= mac_addr
1729 cp
.connection_point_id
= cp_id
1732 err
= "No connection point %s found in VNFR id %s" % (cp
.name
, self
._vnfr
_id
)
1733 self
._log
.debug(err
)
1734 raise VirtualDeploymentUnitRecordError(err
)
1736 def set_state(self
, state
):
1737 """ Set state for this VNFR"""
1741 def instantiate(self
, xact
, restart_mode
=False):
1742 """ instantiate this VNF """
1743 self
.set_state(VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
)
1744 self
._rw
_vnfd
= yield from self
._vnfm
.fetch_vnfd(self
._vnfd
_id
)
1749 # Iterate over all the connection points in VNFR and fetch the
1752 def cpr_from_cp(cp
):
1753 """ Creates a record level connection point from the desciptor cp"""
1754 cp_fields
= ["name", "image", "vm-flavor"]
1755 cp_copy_dict
= {k
: v
for k
, v
in cp
.as_dict().items() if k
in cp_fields
}
1757 cpr_dict
.update(cp_copy_dict
)
1758 return VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint
.from_dict(cpr_dict
)
1760 self
._log
.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1761 self
._vnfr
_id
, self
._vnfr
.connection_point
)
1763 for cp
in self
._vnfr
.connection_point
:
1764 cpr
= cpr_from_cp(cp
)
1765 self
._cprs
.append(cpr
)
1766 self
._log
.debug("Adding Connection point record %s ", cp
)
1768 vlr_path
= self
.vlr_xpath(cp
.vlr_ref
)
1769 self
._log
.debug("Fetching VLR with path = %s", vlr_path
)
1770 res_iter
= yield from self
._dts
.query_read(self
.vlr_xpath(cp
.vlr_ref
),
1771 rwdts
.XactFlag
.MERGE
)
1775 self
._ext
_vlrs
[cp
.vlr_ref
] = d
1776 cpr
.vlr_ref
= cp
.vlr_ref
1777 self
._log
.debug("Fetched VLR [%s] with path = [%s]", d
, vlr_path
)
1779 # Increase the VNFD reference count
1784 # Fetch External VLRs
1785 self
._log
.debug("VNFR-ID %s: Fetching vlrs", self
._vnfr
_id
)
1786 yield from fetch_vlrs()
1789 self
._log
.debug("VNFR-ID %s: Publishing Inventory", self
._vnfr
_id
)
1790 yield from self
.publish_inventory(xact
)
1793 self
._log
.debug("VNFR-ID %s: Creating VLs", self
._vnfr
_id
)
1794 yield from self
.create_vls()
1797 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1798 yield from self
.publish(xact
)
1801 self
._log
.debug("VNFR-ID %s: Instantiate VLs", self
._vnfr
_id
)
1803 yield from self
.instantiate_vls(xact
, restart_mode
)
1804 except Exception as e
:
1805 self
._log
.exception("VL instantiation failed (%s)", str(e
))
1806 yield from self
.instantiation_failed(str(e
))
1809 self
.set_state(VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
)
1812 self
._log
.debug("VNFR-ID %s: Create VDUs", self
._vnfr
_id
)
1813 yield from self
.create_vdus(self
, restart_mode
)
1816 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1817 yield from self
.publish(xact
)
1820 # ToDo: Check if this should be prevented during restart
1821 self
._log
.debug("VNFR-ID %s: Instantiate VDUs", self
._vnfr
_id
)
1822 _
= self
._loop
.create_task(self
.instantiate_vdus(xact
, self
))
1825 self
._log
.debug("VNFR-ID %s: Publish VNFR", self
._vnfr
_id
)
1826 yield from self
.publish(xact
)
1828 self
._log
.debug("VNFR-ID %s: Instantiation Done", self
._vnfr
_id
)
1830 # create task updating uptime for this vnfr
1831 self
._log
.debug("VNFR-ID %s: Starting task to update uptime", self
._vnfr
_id
)
1832 self
._loop
.create_task(self
.vnfr_uptime_update(xact
))
1835 def terminate(self
, xact
):
1836 """ Terminate this virtual network function """
1838 self
._log
.debug("Terminatng VNF id %s", self
.vnfr_id
)
1840 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATE
)
1843 if self
._vnf
_mon
is not None:
1844 self
._vnf
_mon
.stop()
1845 self
._vnf
_mon
.deregister()
1846 self
._vnf
_mon
= None
1849 def terminate_vls():
1850 """ Terminate VLs in this VNF """
1851 for vl
in self
._vlrs
:
1852 yield from vl
.terminate(xact
)
1855 def terminate_vdus():
1856 """ Terminate VDUS in this VNF """
1857 for vdu
in self
._vdus
:
1858 yield from vdu
.terminate(xact
)
1860 self
._log
.debug("Terminatng VLs in VNF id %s", self
.vnfr_id
)
1861 self
.set_state(VirtualNetworkFunctionRecordState
.VL_TERMINATE_PHASE
)
1862 yield from terminate_vls()
1864 self
._log
.debug("Terminatng VDUs in VNF id %s", self
.vnfr_id
)
1865 self
.set_state(VirtualNetworkFunctionRecordState
.VDU_TERMINATE_PHASE
)
1866 yield from terminate_vdus()
1868 self
._log
.debug("Terminated VNF id %s", self
.vnfr_id
)
1869 self
.set_state(VirtualNetworkFunctionRecordState
.TERMINATED
)
1872 def vnfr_uptime_update(self
, xact
):
1874 # Return when vnfr state is FAILED or TERMINATED etc
1875 if self
._state
not in [VirtualNetworkFunctionRecordState
.INIT
,
1876 VirtualNetworkFunctionRecordState
.VL_INIT_PHASE
,
1877 VirtualNetworkFunctionRecordState
.VM_INIT_PHASE
,
1878 VirtualNetworkFunctionRecordState
.READY
]:
1880 yield from self
.publish(xact
)
1881 yield from asyncio
.sleep(2, loop
=self
._loop
)
1885 class VnfdDtsHandler(object):
1886 """ DTS handler for VNFD config changes """
1887 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1889 def __init__(self
, dts
, log
, loop
, vnfm
):
1898 """ DTS registration handle """
1903 """ Register for VNFD configuration"""
1905 def on_apply(dts
, acg
, xact
, action
, scratch
):
1906 """Apply the configuration"""
1907 self
._log
.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1908 xact
, action
, scratch
)
1910 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
1913 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
1914 """ on prepare callback """
1915 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1916 ks_path
.to_xpath(RwVnfmYang
.get_schema()), msg
)
1917 fref
= ProtobufC
.FieldReference
.alloc()
1918 fref
.goto_whole_message(msg
.to_pbcm())
1920 # Handle deletes in prepare_callback
1921 if fref
.is_field_deleted():
1922 # Delete an VNFD record
1923 self
._log
.debug("Deleting VNFD with id %s", msg
.id)
1924 if self
._vnfm
.vnfd_in_use(msg
.id):
1925 self
._log
.debug("Cannot delete VNFD in use - %s", msg
)
1926 err
= "Cannot delete a VNFD in use - %s" % msg
1927 raise VirtualNetworkFunctionDescriptorRefCountExists(err
)
1928 # Delete a VNFD record
1929 yield from self
._vnfm
.delete_vnfd(msg
.id)
1931 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1934 "Registering for VNFD config using xpath: %s",
1935 VnfdDtsHandler
.XPATH
,
1937 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
1938 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
1939 self
._regh
= acg
.register(
1940 xpath
=VnfdDtsHandler
.XPATH
,
1941 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1942 on_prepare
=on_prepare
)
1945 class VcsComponentDtsHandler(object):
1946 """ Vcs Component DTS handler """
1947 XPATH
= ("D,/rw-manifest:manifest" +
1948 "/rw-manifest:operational-inventory" +
1949 "/rw-manifest:component")
1951 def __init__(self
, dts
, log
, loop
, vnfm
):
1960 """ DTS registration handle """
1965 """ Registers VCS component dts publisher registration"""
1966 self
._log
.debug("VCS Comp publisher DTS handler registering path %s",
1967 VcsComponentDtsHandler
.XPATH
)
1969 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
1970 handlers
= rift
.tasklets
.Group
.Handler()
1971 with self
._dts
.group_create(handler
=handlers
) as group
:
1972 self
._regh
= group
.register(xpath
=VcsComponentDtsHandler
.XPATH
,
1974 flags
=(rwdts
.Flag
.PUBLISHER |
1975 rwdts
.Flag
.NO_PREP_READ |
1976 rwdts
.Flag
.DATASTORE
),)
1979 def publish(self
, xact
, path
, msg
):
1980 """ Publishes the VCS component """
1981 self
._log
.debug("Publishing the VcsComponent xact = %s, %s:%s",
1983 self
.regh
.create_element(path
, msg
)
1984 self
._log
.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1985 VcsComponentDtsHandler
.XPATH
, xact
, path
, msg
)
1987 class VnfrConsoleOperdataDtsHandler(object):
1988 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1990 def vnfr_vdu_console_xpath(self
):
1991 """ path for resource-mgr"""
1992 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self
._vnfr
_id
,self
._vdur
_id
))
1994 def __init__(self
, dts
, log
, loop
, vnfm
, vnfr_id
, vdur_id
, vdu_id
):
2001 self
._vnfr
_id
= vnfr_id
2002 self
._vdur
_id
= vdur_id
2003 self
._vdu
_id
= vdu_id
2007 """ Register for VNFR VDU Operational Data read from dts """
2010 def on_prepare(xact_info
, action
, ks_path
, msg
):
2011 """ prepare callback from dts """
2012 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2014 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2015 xact_info
, action
, xpath
, msg
2018 if action
== rwdts
.QueryAction
.READ
:
2019 schema
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur
.schema()
2020 path_entry
= schema
.keyspec_to_entry(ks_path
)
2021 self
._log
.debug("VDU Opdata path is {}".format(path_entry
))
2023 vnfr
= self
._vnfm
.get_vnfr(self
._vnfr
_id
)
2024 except VnfRecordError
as e
:
2025 self
._log
.error("VNFR id %s not found", self
._vnfr
_id
)
2026 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2029 vdur
= vnfr
._get
_vdur
_from
_vdu
_id
(self
._vdu
_id
)
2030 if not vdur
._state
== VDURecordState
.READY
:
2031 self
._log
.debug("VDUR state is not READY. current state is {}".format(vdur
._state
))
2032 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2034 with self
._dts
.transaction() as new_xact
:
2035 resp
= yield from vdur
.read_resource(new_xact
)
2036 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2037 vdur_console
.id = self
._vdur
_id
2038 if resp
.console_url
:
2039 vdur_console
.console_url
= resp
.console_url
2041 vdur_console
.console_url
= 'none'
2042 self
._log
.debug("Recevied console URL for vdu {} is {}".format(self
._vdu
_id
,vdur_console
))
2044 self
._log
.exception("Caught exception while reading VDU %s", self
._vdu
_id
)
2045 vdur_console
= RwVnfrYang
.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2046 vdur_console
.id = self
._vdur
_id
2047 vdur_console
.console_url
= 'none'
2049 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
2050 xpath
=self
.vnfr_vdu_console_xpath
,
2053 #raise VnfRecordError("Not supported operation %s" % action)
2054 self
._log
.error("Not supported operation %s" % action
)
2055 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
)
2059 self
._log
.debug("Registering for VNFR VDU using xpath: %s",
2060 self
.vnfr_vdu_console_xpath
)
2061 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2062 with self
._dts
.group_create() as group
:
2063 self
._regh
= group
.register(xpath
=self
.vnfr_vdu_console_xpath
,
2065 flags
=rwdts
.Flag
.PUBLISHER
,
2069 class VnfrDtsHandler(object):
2070 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2071 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2073 def __init__(self
, dts
, log
, loop
, vnfm
):
2083 """ Return registration handle"""
2088 """ Return VNF manager instance """
2093 """ Register for vnfr create/update/delete/read requests from dts """
2094 def on_commit(xact_info
):
2095 """ The transaction has been committed """
2096 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
2097 return rwdts
.MemberRspCode
.ACTION_OK
2099 def on_abort(*args
):
2100 """ Abort callback """
2101 self
._log
.debug("VNF transaction got aborted")
2104 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
2107 def instantiate_realloc_vnfr(vnfr
):
2108 """Re-populate the vnfm after restart
2115 yield from vnfr
.instantiate(None, restart_mode
=True)
2117 if xact_event
== rwdts
.MemberEvent
.INSTALL
:
2118 curr_cfg
= self
.regh
.elements
2119 for cfg
in curr_cfg
:
2120 vnfr
= self
.vnfm
.create_vnfr(cfg
)
2121 self
._loop
.create_task(instantiate_realloc_vnfr(vnfr
))
2123 self
._log
.debug("Got on_event in vnfm")
2125 return rwdts
.MemberRspCode
.ACTION_OK
2128 def on_prepare(xact_info
, action
, ks_path
, msg
):
2129 """ prepare callback from dts """
2131 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2132 xact_info
, action
, msg
2135 if action
== rwdts
.QueryAction
.CREATE
:
2136 if not msg
.has_field("vnfd"):
2137 err
= "Vnfd not provided"
2138 self
._log
.error(err
)
2139 raise VnfRecordError(err
)
2141 vnfr
= self
.vnfm
.create_vnfr(msg
)
2143 # RIFT-9105: Unable to add a READ query under an existing transaction
2144 # xact = xact_info.xact
2145 yield from vnfr
.instantiate(None)
2146 except Exception as e
:
2147 self
._log
.exception(e
)
2148 self
._log
.error("Error while instantiating vnfr:%s", vnfr
.vnfr_id
)
2149 vnfr
.set_state(VirtualNetworkFunctionRecordState
.FAILED
)
2150 yield from vnfr
.publish(None)
2151 elif action
== rwdts
.QueryAction
.DELETE
:
2152 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2153 path_entry
= schema
.keyspec_to_entry(ks_path
)
2154 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2157 self
._log
.debug("VNFR id %s not found for delete", path_entry
.key00
.id)
2158 raise VirtualNetworkFunctionRecordNotFound(
2159 "VNFR id %s", path_entry
.key00
.id)
2162 yield from vnfr
.terminate(xact_info
.xact
)
2165 yield from self
._vnfm
.delete_vnfr(xact_info
.xact
, vnfr
)
2166 except Exception as e
:
2167 self
._log
.exception(e
)
2168 self
._log
.error("Caught exception while deleting vnfr %s", path_entry
.key00
.id)
2170 elif action
== rwdts
.QueryAction
.UPDATE
:
2171 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
2172 path_entry
= schema
.keyspec_to_entry(ks_path
)
2175 vnfr
= self
._vnfm
.get_vnfr(path_entry
.key00
.id)
2176 except Exception as e
:
2177 self
._log
.debug("No vnfr found with id %s", path_entry
.key00
.id)
2178 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2182 self
._log
.debug("VNFR id %s not found for update", path_entry
.key00
.id)
2183 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
2186 self
._log
.debug("VNFR {} update config status {} (current {})".
2187 format(vnfr
.name
, msg
.config_status
, vnfr
.config_status
))
2188 # Update the config status and publish
2189 vnfr
._config
_status
= msg
.config_status
2190 yield from vnfr
.publish(None)
2193 raise NotImplementedError(
2194 "%s action on VirtualNetworkFunctionRecord not supported",
2197 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2199 self
._log
.debug("Registering for VNFR using xpath: %s",
2200 VnfrDtsHandler
.XPATH
,)
2202 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
2203 on_prepare
=on_prepare
,)
2204 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
2205 with self
._dts
.group_create(handler
=handlers
) as group
:
2206 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
2208 flags
=(rwdts
.Flag
.PUBLISHER |
2209 rwdts
.Flag
.NO_PREP_READ |
2211 rwdts
.Flag
.DATASTORE
),)
2214 def create(self
, xact
, path
, msg
):
2216 Create a VNFR record in DTS with path and message
2218 self
._log
.debug("Creating VNFR xact = %s, %s:%s",
2221 self
.regh
.create_element(path
, msg
)
2222 self
._log
.debug("Created VNFR xact = %s, %s:%s",
2226 def update(self
, xact
, path
, msg
):
2228 Update a VNFR record in DTS with path and message
2230 self
._log
.debug("Updating VNFR xact = %s, %s:%s",
2232 self
.regh
.update_element(path
, msg
)
2233 self
._log
.debug("Updated VNFR xact = %s, %s:%s",
2237 def delete(self
, xact
, path
):
2239 Delete a VNFR record in DTS with path and message
2241 self
._log
.debug("Deleting VNFR xact = %s, %s", xact
, path
)
2242 self
.regh
.delete_element(path
)
2243 self
._log
.debug("Deleted VNFR xact = %s, %s", xact
, path
)
2246 class VnfdRefCountDtsHandler(object):
2247 """ The VNFD Ref Count DTS handler """
2248 XPATH
= "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2250 def __init__(self
, dts
, log
, loop
, vnfm
):
2260 """ Return registration handle """
2265 """ Return the NS manager instance """
2270 """ Register for VNFD ref count read from dts """
2273 def on_prepare(xact_info
, action
, ks_path
, msg
):
2274 """ prepare callback from dts """
2275 xpath
= ks_path
.to_xpath(RwVnfrYang
.get_schema())
2277 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2278 xact_info
, action
, xpath
, msg
2281 if action
== rwdts
.QueryAction
.READ
:
2282 schema
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_VnfdRefCount
.schema()
2283 path_entry
= schema
.keyspec_to_entry(ks_path
)
2284 vnfd_list
= yield from self
._vnfm
.get_vnfd_refcount(path_entry
.key00
.vnfd_id_ref
)
2285 for xpath
, msg
in vnfd_list
:
2286 self
._log
.debug("Responding to ref count query path:%s, msg:%s",
2288 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
2291 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2293 raise VnfRecordError("Not supported operation %s" % action
)
2295 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
2296 with self
._dts
.group_create() as group
:
2297 self
._regh
= group
.register(xpath
=VnfdRefCountDtsHandler
.XPATH
,
2299 flags
=rwdts
.Flag
.PUBLISHER
,
2303 class VdurDatastore(object):
2305 This VdurDatastore is intended to expose select information about a VDUR
2306 such that it can be referenced in a cloud config file. The data that is
2307 exposed does not necessarily follow the structure of the data in the yang
2308 model. This is intentional. The data that are exposed are intended to be
2309 agnostic of the yang model so that changes in the model do not necessarily
2310 require changes to the interface provided to the user. It also means that
2311 the user does not need to be familiar with the RIFT.ware yang models.
2315 """Create an instance of VdurDatastore"""
2316 self
._vdur
_data
= dict()
2317 self
._pattern
= re
.compile("vdu\[([^]]+)\]\.(.+)")
2319 def add(self
, vdur
):
2320 """Add a new VDUR to the datastore
2323 vdur - a VirtualDeploymentUnitRecord instance
2326 A ValueError is raised if the VDUR is (1) None or (2) already in
2330 if vdur
.vdu_id
is None:
2331 raise ValueError('VDURs are required to have an ID')
2333 if vdur
.vdu_id
in self
._vdur
_data
:
2334 raise ValueError('cannot add a VDUR more than once')
2336 self
._vdur
_data
[vdur
.vdu_id
] = dict()
2338 def set_if_not_none(key
, attr
):
2339 if attr
is not None:
2340 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2342 set_if_not_none('name', vdur
._vdud
.name
)
2343 set_if_not_none('mgmt.ip', vdur
.vm_management_ip
)
2345 def update(self
, vdur
):
2346 """Update the VDUR information in the datastore
2349 vdur - a GI representation of a VDUR
2352 A ValueError is raised if the VDUR is (1) None or (2) already in
2356 if vdur
.vdu_id
is None:
2357 raise ValueError('VNFDs are required to have an ID')
2359 if vdur
.vdu_id
not in self
._vdur
_data
:
2360 raise ValueError('VNF is not recognized')
2362 def set_or_delete(key
, attr
):
2364 if key
in self
._vdur
_data
[vdur
.vdu_id
]:
2365 del self
._vdur
_data
[vdur
.vdu_id
][key
]
2368 self
._vdur
_data
[vdur
.vdu_id
][key
] = attr
2370 set_or_delete('name', vdur
._vdud
.name
)
2371 set_or_delete('mgmt.ip', vdur
.vm_management_ip
)
2373 def remove(self
, vdur_id
):
2374 """Remove all of the data associated with specified VDUR
2377 vdur_id - the identifier of a VNFD in the datastore
2380 A ValueError is raised if the VDUR is not contained in the
2384 if vdur_id
not in self
._vdur
_data
:
2385 raise ValueError('VNF is not recognized')
2387 del self
._vdur
_data
[vdur_id
]
2389 def get(self
, expr
):
2390 """Retrieve VDUR information from the datastore
2392 An expression should be of the form,
2396 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2397 the exposed attribute that the user wishes to retrieve.
2399 If the requested data is not available, None is returned.
2402 expr - a string that specifies the data to return
2405 A ValueError is raised if the provided expression cannot be parsed.
2408 The requested data or None
2411 result
= self
._pattern
.match(expr
)
2413 raise ValueError('data expression not recognized ({})'.format(expr
))
2415 vdur_id
, key
= result
.groups()
2417 if vdur_id
not in self
._vdur
_data
:
2420 return self
._vdur
_data
[vdur_id
].get(key
, None)
2423 class VnfManager(object):
2424 """ The virtual network function manager class """
2425 def __init__(self
, dts
, log
, loop
, cluster_name
):
2429 self
._cluster
_name
= cluster_name
2431 self
._vcs
_handler
= VcsComponentDtsHandler(dts
, log
, loop
, self
)
2432 self
._vnfr
_handler
= VnfrDtsHandler(dts
, log
, loop
, self
)
2433 self
._vnfr
_ref
_handler
= VnfdRefCountDtsHandler(dts
, log
, loop
, self
)
2434 self
._nsr
_handler
= mano_dts
.NsInstanceConfigSubscriber(log
, dts
, loop
, callback
=self
.handle_nsr
)
2436 self
._dts
_handlers
= [VnfdDtsHandler(dts
, log
, loop
, self
),
2439 self
._vnfr
_ref
_handler
,
2442 self
._vnfds
_to
_vnfr
= {}
2446 def vnfr_handler(self
):
2447 """ VNFR dts handler """
2448 return self
._vnfr
_handler
2451 def vcs_handler(self
):
2452 """ VCS dts handler """
2453 return self
._vcs
_handler
2457 """ Register all static DTS handlers """
2458 for hdl
in self
._dts
_handlers
:
2459 yield from hdl
.register()
2463 """ Run this VNFM instance """
2464 self
._log
.debug("Run VNFManager - registering static DTS handlers""")
2465 yield from self.register()
2467 def handle_nsr(self, nsr, action):
2468 if action in [rwdts.QueryAction.CREATE]:
2469 self._nsrs[nsr.id] = nsr
2470 elif action == rwdts.QueryAction.DELETE:
2471 if nsr.id in self._nsrs:
2472 del self._nsrs[nsr.id]
2474 def get_linked_mgmt_network(self, vnfr):
2475 """For the given VNFR get the related mgmt network from the NSD, if
2478 vnfd_id = vnfr.vnfd.id
2479 nsr_id = vnfr.nsr_id_ref
2481 # for the given related VNFR, get the corresponding NSR-config
2484 nsr_obj = self._nsrs[nsr_id]
2486 raise("Unable to find the NS with the ID
: {}".format(nsr_id))
2488 # for the related NSD check if a VLD exists such that it's a mgmt
2490 for vld in nsr_obj.nsd.vld:
2491 if vld.mgmt_network:
2496 def get_vnfr(self, vnfr_id):
2497 """ get VNFR by vnfr id """
2499 if vnfr_id not in self._vnfrs:
2500 raise VnfRecordError("VNFR
id %s not found
", vnfr_id)
2502 return self._vnfrs[vnfr_id]
2504 def create_vnfr(self, vnfr):
2505 """ Create a VNFR instance """
2506 if vnfr.id in self._vnfrs:
2507 msg = "Vnfr
id %s already exists
" % vnfr.id
2508 self._log.error(msg)
2509 raise VnfRecordError(msg)
2511 self._log.info("Create VirtualNetworkFunctionRecord
%s from vnfd_id
: %s",
2515 mgmt_network = self.get_linked_mgmt_network(vnfr)
2517 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2518 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2519 mgmt_network=mgmt_network
2523 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2524 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2526 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2528 return self._vnfrs[vnfr.id]
2531 def delete_vnfr(self, xact, vnfr):
2532 """ Create a VNFR instance """
2533 if vnfr.vnfr_id in self._vnfrs:
2534 self._log.debug("Deleting VNFR
id %s", vnfr.vnfr_id)
2535 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2537 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2538 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2539 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2541 del self._vnfrs[vnfr.vnfr_id]
2544 def fetch_vnfd(self, vnfd_id):
2545 """ Fetch VNFDs based with the vnfd id"""
2546 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2547 self._log.debug("Fetch vnfd with path
%s", vnfd_path)
2550 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2552 for ent in res_iter:
2553 res = yield from ent
2557 err = "Failed to get Vnfd
%s" % vnfd_id
2558 self._log.error(err)
2559 raise VnfRecordError(err)
2561 self._log.debug("Fetched vnfd
for path
%s, vnfd
- %s", vnfd_path, vnfd)
2565 def vnfd_in_use(self, vnfd_id):
2566 """ Is this VNFD in use """
2567 self._log.debug("Is this VNFD
in use
- msg
:%s", vnfd_id)
2568 if vnfd_id in self._vnfds_to_vnfr:
2569 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2573 def publish_vnfr(self, xact, path, msg):
2574 """ Publish a VNFR """
2575 self._log.debug("publish_vnfr called with path
%s, msg
%s",
2577 yield from self.vnfr_handler.update(xact, path, msg)
2580 def delete_vnfd(self, vnfd_id):
2581 """ Delete the Virtual Network Function descriptor with the passed id """
2582 self._log.debug("Deleting the virtual network function descriptor
- %s", vnfd_id)
2583 if vnfd_id in self._vnfds_to_vnfr:
2584 if self._vnfds_to_vnfr[vnfd_id]:
2585 self._log.debug("Cannot delete VNFD
id %s reference exists
%s",
2587 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2588 raise VirtualNetworkFunctionDescriptorRefCountExists(
2589 "Cannot delete
:%s, ref_count
:%s",
2591 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2593 del self._vnfds_to_vnfr[vnfd_id]
2595 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2597 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2598 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2599 if os.path.exists(vnfd_dir):
2600 shutil.rmtree(vnfd_dir, ignore_errors=True)
2601 except Exception as e:
2602 self._log.error("Exception in cleaning up VNFD
{}: {}".
2603 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2604 self._log.exception(e)
2607 def vnfd_refcount_xpath(self, vnfd_id):
2608 """ xpath for ref count entry """
2609 return (VnfdRefCountDtsHandler.XPATH +
2610 "[rw
-vnfr
:vnfd
-id-ref
= '{}']").format(vnfd_id)
2613 def get_vnfd_refcount(self, vnfd_id):
2614 """ Get the vnfd_list from this VNFM"""
2616 if vnfd_id is None or vnfd_id == "":
2617 for vnfd in self._vnfds_to_vnfr.keys():
2618 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2619 vnfd_msg.vnfd_id_ref = vnfd
2620 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2621 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2622 elif vnfd_id in self._vnfds_to_vnfr:
2623 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2624 vnfd_msg.vnfd_id_ref = vnfd_id
2625 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2626 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2631 class VnfmTasklet(rift.tasklets.Tasklet):
2632 """ VNF Manager tasklet class """
2633 def __init__(self, *args, **kwargs):
2634 super(VnfmTasklet, self).__init__(*args, **kwargs)
2635 self.rwlog.set_category("rw
-mano
-log
")
2636 self.rwlog.set_subcategory("vnfm
")
2643 super(VnfmTasklet, self).start()
2644 self.log.info("Starting VnfmTasklet
")
2646 self.log.setLevel(logging.DEBUG)
2648 self.log.debug("Registering with dts
")
2649 self._dts = rift.tasklets.DTS(self.tasklet_info,
2650 RwVnfmYang.get_schema(),
2652 self.on_dts_state_change)
2654 self.log.debug("Created DTS Api GI Object
: %s", self._dts)
2656 print("Caught
Exception in VNFM start
:", sys.exc_info()[0])
2659 def on_instance_started(self):
2660 """ Task insance started callback """
2661 self.log.debug("Got instance started callback
")
2667 print("Caught
Exception in VNFM stop
:", sys.exc_info()[0])
2672 """ Task init callback """
2674 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2675 assert vm_parent_name is not None
2676 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2677 yield from self._vnfm.run()
2679 print("Caught
Exception in VNFM init
:", sys.exc_info()[0])
2684 """ Task run callback """
2688 def on_dts_state_change(self, state):
2689 """Take action according to current dts state to transition
2690 application into the corresponding application state
2693 state - current dts state
2696 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2697 rwdts.State.CONFIG: rwdts.State.RUN,
2701 rwdts.State.INIT: self.init,
2702 rwdts.State.RUN: self.run,
2705 # Transition application to next state
2706 handler = handlers.get(state, None)
2707 if handler is not None:
2708 yield from handler()
2710 # Transition dts to next state
2711 next_state = switch.get(state, None)
2712 if next_state is not None:
2713 self._dts.handle.set_state(next_state)