Bug 149
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54
55
56 class VMResourceError(Exception):
57 """ VM resource Error"""
58 pass
59
60
61 class VnfRecordError(Exception):
62 """ VNF record instatiation failed"""
63 pass
64
65
66 class VduRecordError(Exception):
67 """ VDU record instatiation failed"""
68 pass
69
70
71 class NotImplemented(Exception):
72 """Not implemented """
73 pass
74
75
76 class VnfrRecordExistsError(Exception):
77 """VNFR record already exist with the same VNFR id"""
78 pass
79
80
81 class InternalVirtualLinkRecordError(Exception):
82 """Internal virtual link record error"""
83 pass
84
85
86 class VDUImageNotFound(Exception):
87 """VDU Image not found error"""
88 pass
89
90
91 class VirtualDeploymentUnitRecordError(Exception):
92 """VDU Instantiation failed"""
93 pass
94
95
96 class VMNotReadyError(Exception):
97 """ VM Not yet received from resource manager """
98 pass
99
100
101 class VDURecordNotFound(Exception):
102 """ Could not find a VDU record """
103 pass
104
105
106 class VirtualNetworkFunctionRecordDescNotFound(Exception):
107 """ Cannot find Virtual Network Function Record Descriptor """
108 pass
109
110
111 class VirtualNetworkFunctionDescriptorError(Exception):
112 """ Virtual Network Function Record Descriptor Error """
113 pass
114
115
116 class VirtualNetworkFunctionDescriptorNotFound(Exception):
117 """ Virtual Network Function Record Descriptor Not Found """
118 pass
119
120
121 class VirtualNetworkFunctionRecordNotFound(Exception):
122 """ Virtual Network Function Record Not Found """
123 pass
124
125
126 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
127 """ Virtual Network Funtion Descriptor reference count exists """
128 pass
129
130
131 class VnfrInstantiationFailed(Exception):
132 """ Virtual Network Funtion Instantiation failed"""
133 pass
134
135
136 class VNFMPlacementGroupError(Exception):
137 pass
138
139 class VirtualNetworkFunctionRecordState(enum.Enum):
140 """ VNFR state """
141 INIT = 1
142 VL_INIT_PHASE = 2
143 VM_INIT_PHASE = 3
144 READY = 4
145 TERMINATE = 5
146 VL_TERMINATE_PHASE = 6
147 VDU_TERMINATE_PHASE = 7
148 TERMINATED = 7
149 FAILED = 10
150
151
152 class VDURecordState(enum.Enum):
153 """VDU record state """
154 INIT = 1
155 INSTANTIATING = 2
156 RESOURCE_ALLOC_PENDING = 3
157 READY = 4
158 TERMINATING = 5
159 TERMINATED = 6
160 FAILED = 10
161
162
163 class VcsComponent(object):
164 """ VCS Component within the VNF descriptor """
165 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
166 self._dts = dts
167 self._log = log
168 self._loop = loop
169 self._component = component
170 self._cluster_name = cluster_name
171 self._vcs_handler = vcs_handler
172 self._mangled_name = mangled_name
173
174 @staticmethod
175 def mangle_name(component_name, vnf_name, vnfd_id):
176 """ mangled component name """
177 return vnf_name + ":" + component_name + ":" + vnfd_id
178
179 @property
180 def name(self):
181 """ name of this component"""
182 return self._mangled_name
183
184 @property
185 def path(self):
186 """ The path for this object """
187 return("D,/rw-manifest:manifest" +
188 "/rw-manifest:operational-inventory" +
189 "/rw-manifest:component" +
190 "[rw-manifest:component-name = '{}']").format(self.name)
191
192 @property
193 def instance_xpath(self):
194 """ The path for this object """
195 return("D,/rw-base:vcs" +
196 "/instances" +
197 "/instance" +
198 "[instance-name = '{}']".format(self._cluster_name))
199
200 @property
201 def start_comp_xpath(self):
202 """ start component xpath """
203 return (self.instance_xpath +
204 "/child-n[instance-name = 'START-REQ']")
205
206 def get_start_comp_msg(self, ip_address):
207 """ start this component """
208 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
209 start_msg.instance_name = 'START-REQ'
210 start_msg.component_name = self.name
211 start_msg.admin_command = "START"
212 start_msg.ip_address = ip_address
213
214 return start_msg
215
216 @property
217 def msg(self):
218 """ Returns the message for this vcs component"""
219
220 vcs_comp_dict = self._component.as_dict()
221
222 def mangle_comp_names(comp_dict):
223 """ mangle component name with VNF name, id"""
224 for key, val in comp_dict.items():
225 if isinstance(val, dict):
226 comp_dict[key] = mangle_comp_names(val)
227 elif isinstance(val, list):
228 i = 0
229 for ent in val:
230 if isinstance(ent, dict):
231 val[i] = mangle_comp_names(ent)
232 else:
233 val[i] = ent
234 i += 1
235 elif key == "component_name":
236 comp_dict[key] = VcsComponent.mangle_name(val,
237 self._vnfd_name,
238 self._vnfd_id)
239 return comp_dict
240
241 mangled_dict = mangle_comp_names(vcs_comp_dict)
242 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
243 return msg
244
245 @asyncio.coroutine
246 def publish(self, xact):
247 """ Publishes the VCS component """
248 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
249 self.name, self.path, self.msg)
250 yield from self._vcs_handler.publish(xact, self.path, self.msg)
251
252 @asyncio.coroutine
253 def start(self, xact, parent, ip_addr=None):
254 """ Starts this VCS component """
255 # ATTN RV - replace with block add
256 start_msg = self.get_start_comp_msg(ip_addr)
257 self._log.debug("starting component %s %s",
258 self.start_comp_xpath, start_msg)
259 yield from self._dts.query_create(self.start_comp_xpath,
260 0,
261 start_msg)
262 self._log.debug("started component %s, %s",
263 self.start_comp_xpath, start_msg)
264
265
266 class VirtualDeploymentUnitRecord(object):
267 """ Virtual Deployment Unit Record """
268 def __init__(self,
269 dts,
270 log,
271 loop,
272 vdud,
273 vnfr,
274 mgmt_intf,
275 mgmt_network,
276 cloud_account_name,
277 vnfd_package_store,
278 vdur_id=None,
279 placement_groups=[]):
280 self._dts = dts
281 self._log = log
282 self._loop = loop
283 self._vdud = vdud
284 self._vnfr = vnfr
285 self._mgmt_intf = mgmt_intf
286 self._cloud_account_name = cloud_account_name
287 self._vnfd_package_store = vnfd_package_store
288 self._mgmt_network = mgmt_network
289
290 self._vdur_id = vdur_id or str(uuid.uuid4())
291 self._int_intf = []
292 self._ext_intf = []
293 self._state = VDURecordState.INIT
294 self._state_failed_reason = None
295 self._request_id = str(uuid.uuid4())
296 self._name = vnfr.name + "__" + vdud.id
297 self._placement_groups = placement_groups
298 self._rm_regh = None
299 self._vm_resp = None
300 self._vdud_cloud_init = None
301 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
302
303 @asyncio.coroutine
304 def vdu_opdata_register(self):
305 yield from self._vdur_console_handler.register()
306
307 def cp_ip_addr(self, cp_name):
308 """ Find ip address by connection point name """
309 if self._vm_resp is not None:
310 for conn_point in self._vm_resp.connection_points:
311 if conn_point.name == cp_name:
312 return conn_point.ip_address
313 return "0.0.0.0"
314
315 def cp_mac_addr(self, cp_name):
316 """ Find mac address by connection point name """
317 if self._vm_resp is not None:
318 for conn_point in self._vm_resp.connection_points:
319 if conn_point.name == cp_name:
320 return conn_point.mac_addr
321 return "00:00:00:00:00:00"
322
323 def cp_id(self, cp_name):
324 """ Find connection point id by connection point name """
325 if self._vm_resp is not None:
326 for conn_point in self._vm_resp.connection_points:
327 if conn_point.name == cp_name:
328 return conn_point.connection_point_id
329 return ''
330
331 @property
332 def vdu_id(self):
333 return self._vdud.id
334
335 @property
336 def vm_resp(self):
337 return self._vm_resp
338
339 @property
340 def name(self):
341 """ Return this VDUR's name """
342 return self._name
343
344 @property
345 def cloud_account_name(self):
346 """ Cloud account this VDU should be created in """
347 return self._cloud_account_name
348
349 @property
350 def image_name(self):
351 """ name that should be used to lookup the image on the CMP """
352 if 'image' not in self._vdud:
353 return None
354 return os.path.basename(self._vdud.image)
355
356 @property
357 def image_checksum(self):
358 """ name that should be used to lookup the image on the CMP """
359 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
360
361 @property
362 def management_ip(self):
363 if not self.active:
364 return None
365 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
366
367 @property
368 def vm_management_ip(self):
369 if not self.active:
370 return None
371 return self._vm_resp.management_ip
372
373 @property
374 def operational_status(self):
375 """ Operational status of this VDU"""
376 op_stats_dict = {"INIT": "init",
377 "INSTANTIATING": "vm_init_phase",
378 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
379 "READY": "running",
380 "FAILED": "failed",
381 "TERMINATING": "terminated",
382 "TERMINATED": "terminated",
383 }
384 return op_stats_dict[self._state.name]
385
386 @property
387 def msg(self):
388 """ Process VDU message from resmgr"""
389 vdu_fields = ["vm_flavor",
390 "guest_epa",
391 "vswitch_epa",
392 "hypervisor_epa",
393 "host_epa",
394 "volumes",
395 "name"]
396 vdu_copy_dict = {k: v for k, v in
397 self._vdud.as_dict().items() if k in vdu_fields}
398 vdur_dict = {"id": self._vdur_id,
399 "vdu_id_ref": self._vdud.id,
400 "operational_status": self.operational_status,
401 "operational_status_details": self._state_failed_reason,
402 }
403 if self.vm_resp is not None:
404 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
405 "flavor_id": self.vm_resp.flavor_id
406 })
407 if self._vm_resp.has_field('image_id'):
408 vdur_dict.update({ "image_id": self.vm_resp.image_id })
409
410 if self.management_ip is not None:
411 vdur_dict["management_ip"] = self.management_ip
412
413 if self.vm_management_ip is not None:
414 vdur_dict["vm_management_ip"] = self.vm_management_ip
415
416 vdur_dict.update(vdu_copy_dict)
417
418 if self.vm_resp is not None:
419 if self._vm_resp.has_field('volumes'):
420 for opvolume in self._vm_resp.volumes:
421 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
422 if len(vdurvol_data) == 1:
423 vdurvol_data[0]["volume_id"] = opvolume.volume_id
424 if opvolume.has_field('custom_meta_data'):
425 metadata_list = list()
426 for metadata_item in opvolume.custom_meta_data:
427 metadata_list.append(metadata_item.as_dict())
428 vdurvol_data[0]['custom_meta_data'] = metadata_list
429
430 if self._vm_resp.has_field('supplemental_boot_data'):
431 vdur_dict['supplemental_boot_data'] = dict()
432 if self._vm_resp.supplemental_boot_data.has_field('boot_data_drive'):
433 vdur_dict['supplemental_boot_data']['boot_data_drive'] = self._vm_resp.supplemental_boot_data.boot_data_drive
434 if self._vm_resp.supplemental_boot_data.has_field('custom_meta_data'):
435 metadata_list = list()
436 for metadata_item in self._vm_resp.supplemental_boot_data.custom_meta_data:
437 metadata_list.append(metadata_item.as_dict())
438 vdur_dict['supplemental_boot_data']['custom_meta_data'] = metadata_list
439 if self._vm_resp.supplemental_boot_data.has_field('config_file'):
440 file_list = list()
441 for file_item in self._vm_resp.supplemental_boot_data.config_file:
442 file_list.append(file_item.as_dict())
443 vdur_dict['supplemental_boot_data']['config_file'] = file_list
444
445 icp_list = []
446 ii_list = []
447
448 for intf, cp_id, vlr in self._int_intf:
449 cp = self.find_internal_cp_by_cp_id(cp_id)
450
451 icp_list.append({"name": cp.name,
452 "id": cp.id,
453 "type_yang": "VPORT",
454 "ip_address": self.cp_ip_addr(cp.id),
455 "mac_address": self.cp_mac_addr(cp.id)})
456
457 ii_list.append({"name": intf.name,
458 "vdur_internal_connection_point_ref": cp.id,
459 "virtual_interface": {}})
460
461 vdur_dict["internal_connection_point"] = icp_list
462 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
463 vdur_dict["internal_interface"] = ii_list
464
465 ei_list = []
466 for intf, cp, vlr in self._ext_intf:
467 ei_list.append({"name": cp.name,
468 "vnfd_connection_point_ref": cp.name,
469 "virtual_interface": {}})
470 self._vnfr.update_cp(cp.name,
471 self.cp_ip_addr(cp.name),
472 self.cp_mac_addr(cp.name),
473 self.cp_id(cp.name))
474
475 vdur_dict["external_interface"] = ei_list
476
477 placement_groups = []
478 for group in self._placement_groups:
479 placement_groups.append(group.as_dict())
480 vdur_dict['placement_groups_info'] = placement_groups
481
482 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
483
484 @property
485 def resmgr_path(self):
486 """ path for resource-mgr"""
487 return ("D,/rw-resource-mgr:resource-mgmt" +
488 "/vdu-event" +
489 "/vdu-event-data[event-id='{}']".format(self._request_id))
490
491 @property
492 def vm_flavor_msg(self):
493 """ VM flavor message """
494 flavor = self._vdud.vm_flavor.__class__()
495 flavor.copy_from(self._vdud.vm_flavor)
496
497 return flavor
498
499 @property
500 def vdud_cloud_init(self):
501 """ Return the cloud-init contents for the VDU """
502 if self._vdud_cloud_init is None:
503 self._vdud_cloud_init = self.cloud_init()
504
505 return self._vdud_cloud_init
506
507 def cloud_init(self):
508 """ Populate cloud_init with cloud-config script from
509 either the inline contents or from the file provided
510 """
511 if self._vdud.cloud_init is not None:
512 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
513 return self._vdud.cloud_init
514 elif self._vdud.cloud_init_file is not None:
515 # Get cloud-init script contents from the file provided in the cloud_init_file param
516 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
517 filename = self._vdud.cloud_init_file
518 self._vnfd_package_store.refresh()
519 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
520 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
521 try:
522 return cloud_init_extractor.read_script(stored_package, filename)
523 except rift.package.cloud_init.CloudInitExtractionError as e:
524 self.instantiation_failed(str(e))
525 raise VirtualDeploymentUnitRecordError(e)
526 else:
527 self._log.debug("VDU Instantiation: cloud-init script not provided")
528
529 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
530 host_aggregates = []
531 availability_zones = []
532 server_groups = []
533 for group in self._placement_groups:
534 if group.has_field('host_aggregate'):
535 for aggregate in group.host_aggregate:
536 host_aggregates.append(aggregate.as_dict())
537 if group.has_field('availability_zone'):
538 availability_zones.append(group.availability_zone.as_dict())
539 if group.has_field('server_group'):
540 server_groups.append(group.server_group.as_dict())
541
542 if availability_zones:
543 if len(availability_zones) > 1:
544 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
545 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
546 else:
547 vm_create_msg_dict['availability_zone'] = availability_zones[0]
548
549 if server_groups:
550 if len(server_groups) > 1:
551 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
552 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
553 else:
554 vm_create_msg_dict['server_group'] = server_groups[0]
555
556 if host_aggregates:
557 vm_create_msg_dict['host_aggregate'] = host_aggregates
558
559 return
560
561 def process_placement_groups(self, vm_create_msg_dict):
562 """Process the placement_groups and fill resource-mgr request"""
563 if not self._placement_groups:
564 return
565
566 cloud_set = set([group.cloud_type for group in self._placement_groups])
567 assert len(cloud_set) == 1
568 cloud_type = cloud_set.pop()
569
570 if cloud_type == 'openstack':
571 self.process_openstack_placement_group_construct(vm_create_msg_dict)
572
573 else:
574 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
575 return
576
577 def process_custom_bootdata(self, vm_create_msg_dict):
578 """Process the custom boot data"""
579 if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
580 return
581
582 self._vnfd_package_store.refresh()
583 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
584 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
585 for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
586 if 'source' not in file_item or 'dest' not in file_item:
587 continue
588 source = file_item['source']
589 # Find source file in scripts dir of VNFD
590 self._log.debug("Checking for source config file at %s", source)
591 try:
592 source_file_str = cloud_init_extractor.read_script(stored_package, source)
593 except rift.package.cloud_init.CloudInitExtractionError as e:
594 raise VirtualDeploymentUnitRecordError(e)
595 # Update source file location with file contents
596 file_item['source'] = source_file_str
597
598 return
599
600 def resmgr_msg(self, config=None):
601 vdu_fields = ["vm_flavor",
602 "guest_epa",
603 "vswitch_epa",
604 "hypervisor_epa",
605 "host_epa",
606 "volumes",
607 "supplemental_boot_data"]
608
609 self._log.debug("Creating params based on VDUD: %s", self._vdud)
610 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
611
612 vm_create_msg_dict = {
613 "name": self.name,
614 }
615
616 if self.image_name is not None:
617 vm_create_msg_dict["image_name"] = self.image_name
618
619 if self.image_checksum is not None:
620 vm_create_msg_dict["image_checksum"] = self.image_checksum
621
622 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
623 if self._vdud.has_field('mgmt_vpci'):
624 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
625
626 self._log.debug("VDUD: %s", self._vdud)
627 if config is not None:
628 vm_create_msg_dict['vdu_init'] = {'userdata': config}
629
630 if self._mgmt_network:
631 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
632
633 cp_list = []
634 for intf, cp, vlr in self._ext_intf:
635 cp_info = {"name": cp.name,
636 "virtual_link_id": vlr.network_id,
637 "type_yang": intf.virtual_interface.type_yang,
638 "port_security_enabled": cp.port_security_enabled}
639
640 if (intf.virtual_interface.has_field('vpci') and
641 intf.virtual_interface.vpci is not None):
642 cp_info["vpci"] = intf.virtual_interface.vpci
643
644 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
645 cp_info['security_group'] = vlr.ip_profile_params.security_group
646
647 cp_list.append(cp_info)
648
649 for intf, cp, vlr in self._int_intf:
650 if (intf.virtual_interface.has_field('vpci') and
651 intf.virtual_interface.vpci is not None):
652 cp_list.append({"name": cp,
653 "virtual_link_id": vlr.network_id,
654 "type_yang": intf.virtual_interface.type_yang,
655 "vpci": intf.virtual_interface.vpci})
656 else:
657 cp_list.append({"name": cp,
658 "virtual_link_id": vlr.network_id,
659 "type_yang": intf.virtual_interface.type_yang,
660 "port_security_enabled": cp.port_security_enabled})
661
662 vm_create_msg_dict["connection_points"] = cp_list
663 vm_create_msg_dict.update(vdu_copy_dict)
664
665 self.process_placement_groups(vm_create_msg_dict)
666 if 'supplemental_boot_data' in vm_create_msg_dict:
667 self.process_custom_bootdata(vm_create_msg_dict)
668
669 msg = RwResourceMgrYang.VDUEventData()
670 msg.event_id = self._request_id
671 msg.cloud_account = self.cloud_account_name
672 msg.request_info.from_dict(vm_create_msg_dict)
673
674 return msg
675
676 @asyncio.coroutine
677 def terminate(self, xact):
678 """ Delete resource in VIM """
679 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
680 self._log.warning("VDU terminate in not ready state - Ignoring request")
681 return
682
683 self._state = VDURecordState.TERMINATING
684 if self._vm_resp is not None:
685 try:
686 with self._dts.transaction() as new_xact:
687 yield from self.delete_resource(new_xact)
688 except Exception:
689 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
690
691 if self._rm_regh is not None:
692 self._log.debug("Deregistering resource manager registration handle")
693 self._rm_regh.deregister()
694 self._rm_regh = None
695
696 if self._vdur_console_handler is not None:
697 self._log.error("Deregistering vnfr vdur registration handle")
698 self._vdur_console_handler._regh.deregister()
699 self._vdur_console_handler._regh = None
700
701 self._state = VDURecordState.TERMINATED
702
703 def find_internal_cp_by_cp_id(self, cp_id):
704 """ Find the CP corresponding to the connection point id"""
705 cp = None
706
707 self._log.debug("find_internal_cp_by_cp_id(%s) called",
708 cp_id)
709
710 for int_cp in self._vdud.internal_connection_point:
711 self._log.debug("Checking for int cp %s in internal connection points",
712 int_cp.id)
713 if int_cp.id == cp_id:
714 cp = int_cp
715 break
716
717 if cp is None:
718 self._log.debug("Failed to find cp %s in internal connection points",
719 cp_id)
720 msg = "Failed to find cp %s in internal connection points" % cp_id
721 raise VduRecordError(msg)
722
723 # return the VLR associated with the connection point
724 return cp
725
726 @asyncio.coroutine
727 def create_resource(self, xact, vnfr, config=None):
728 """ Request resource from ResourceMgr """
729 def find_cp_by_name(cp_name):
730 """ Find a connection point by name """
731 cp = None
732 self._log.debug("find_cp_by_name(%s) called", cp_name)
733 for ext_cp in vnfr._cprs:
734 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
735 if ext_cp.name == cp_name:
736 cp = ext_cp
737 break
738 if cp is None:
739 self._log.debug("Failed to find cp %s in external connection points",
740 cp_name)
741 return cp
742
743 def find_internal_vlr_by_cp_name(cp_name):
744 """ Find the VLR corresponding to the connection point name"""
745 cp = None
746
747 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
748 cp_name)
749
750 for int_cp in self._vdud.internal_connection_point:
751 self._log.debug("Checking for int cp %s in internal connection points",
752 int_cp.id)
753 if int_cp.id == cp_name:
754 cp = int_cp
755 break
756
757 if cp is None:
758 self._log.debug("Failed to find cp %s in internal connection points",
759 cp_name)
760 msg = "Failed to find cp %s in internal connection points" % cp_name
761 raise VduRecordError(msg)
762
763 # return the VLR associated with the connection point
764 return vnfr.find_vlr_by_cp(cp_name)
765
766 block = xact.block_create()
767
768 self._log.debug("Executing vm request id: %s, action: create",
769 self._request_id)
770
771 # Resolve the networks associated external interfaces
772 for ext_intf in self._vdud.external_interface:
773 self._log.debug("Resolving external interface name [%s], cp[%s]",
774 ext_intf.name, ext_intf.vnfd_connection_point_ref)
775 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
776 if cp is None:
777 self._log.debug("Failed to find connection point - %s",
778 ext_intf.vnfd_connection_point_ref)
779 continue
780 self._log.debug("Connection point name [%s], type[%s]",
781 cp.name, cp.type_yang)
782
783 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
784
785 etuple = (ext_intf, cp, vlr)
786 self._ext_intf.append(etuple)
787
788 self._log.debug("Created external interface tuple : %s", etuple)
789
790 # Resolve the networks associated internal interfaces
791 for intf in self._vdud.internal_interface:
792 cp_id = intf.vdu_internal_connection_point_ref
793 self._log.debug("Resolving internal interface name [%s], cp[%s]",
794 intf.name, cp_id)
795
796 try:
797 vlr = find_internal_vlr_by_cp_name(cp_id)
798 except Exception as e:
799 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
800 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
801 raise VduRecordError(msg)
802
803 ituple = (intf, cp_id, vlr)
804 self._int_intf.append(ituple)
805
806 self._log.debug("Created internal interface tuple : %s", ituple)
807
808 resmgr_path = self.resmgr_path
809 resmgr_msg = self.resmgr_msg(config)
810
811 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
812 block.add_query_create(resmgr_path, resmgr_msg)
813
814 res_iter = yield from block.execute(now=True)
815
816 resp = None
817
818 for i in res_iter:
819 r = yield from i
820 resp = r.result
821
822 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
823 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
824 self._log.debug("Got vm request response: %s", resp.resource_info)
825 return resp.resource_info
826
827 @asyncio.coroutine
828 def delete_resource(self, xact):
829 block = xact.block_create()
830
831 self._log.debug("Executing vm request id: %s, action: delete",
832 self._request_id)
833
834 block.add_query_delete(self.resmgr_path)
835
836 yield from block.execute(flags=0, now=True)
837
838 @asyncio.coroutine
839 def read_resource(self, xact):
840 block = xact.block_create()
841
842 self._log.debug("Executing vm request id: %s, action: delete",
843 self._request_id)
844
845 block.add_query_read(self.resmgr_path)
846
847 res_iter = yield from block.execute(flags=0, now=True)
848 for i in res_iter:
849 r = yield from i
850 resp = r.result
851
852 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
853 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
854 self._log.debug("Got vm request response: %s", resp.resource_info)
855 #self._vm_resp = resp.resource_info
856 return resp.resource_info
857
858
859 @asyncio.coroutine
860 def start_component(self):
861 """ This VDUR is active """
862 self._log.debug("Starting component %s for vdud %s vdur %s",
863 self._vdud.vcs_component_ref,
864 self._vdud,
865 self._vdur_id)
866 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
867 self.vm_resp.management_ip)
868
869 @property
870 def active(self):
871 """ Is this VDU active """
872 return True if self._state is VDURecordState.READY else False
873
874 @asyncio.coroutine
875 def instantiation_failed(self, failed_reason=None):
876 """ VDU instantiation failed """
877 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
878 self._state = VDURecordState.FAILED
879 self._state_failed_reason = failed_reason
880 yield from self._vnfr.instantiation_failed(failed_reason)
881
882 @asyncio.coroutine
883 def vdu_is_active(self):
884 """ This VDU is active"""
885 if self.active:
886 self._log.warning("VDU %s was already marked as active", self._vdur_id)
887 return
888
889 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
890
891 if self._vdud.vcs_component_ref is not None:
892 yield from self.start_component()
893
894 self._state = VDURecordState.READY
895
896 if self._vnfr.all_vdus_active():
897 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
898 yield from self._vnfr.is_ready()
899
900 @asyncio.coroutine
901 def instantiate(self, xact, vnfr, config=None):
902 """ Instantiate this VDU """
903 self._state = VDURecordState.INSTANTIATING
904
905 @asyncio.coroutine
906 def on_prepare(xact_info, query_action, ks_path, msg):
907 """ This VDUR is active """
908 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
909 query_action,
910 ks_path,
911 msg)
912
913 if (query_action == rwdts.QueryAction.UPDATE or
914 query_action == rwdts.QueryAction.CREATE):
915 self._vm_resp = msg
916
917 if msg.resource_state == "active":
918 # Move this VDU to ready state
919 yield from self.vdu_is_active()
920 elif msg.resource_state == "failed":
921 yield from self.instantiation_failed(msg.resource_errors)
922 elif query_action == rwdts.QueryAction.DELETE:
923 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
924 else:
925 raise NotImplementedError(
926 "%s action on VirtualDeployementUnitRecord not supported",
927 query_action)
928
929 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
930
931 try:
932 reg_event = asyncio.Event(loop=self._loop)
933
934 @asyncio.coroutine
935 def on_ready(regh, status):
936 reg_event.set()
937
938 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
939 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
940 flags=rwdts.Flag.SUBSCRIBER,
941 handler=handler)
942 yield from reg_event.wait()
943
944 vm_resp = yield from self.create_resource(xact, vnfr, config)
945 self._vm_resp = vm_resp
946 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
947
948 self._log.debug("Requested VM from resource manager response %s",
949 vm_resp)
950 if vm_resp.resource_state == "active":
951 self._log.debug("Resourcemgr responded wih an active vm resp %s",
952 vm_resp)
953 yield from self.vdu_is_active()
954 self._state = VDURecordState.READY
955 elif (vm_resp.resource_state == "pending" or
956 vm_resp.resource_state == "inactive"):
957 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
958 vm_resp)
959 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
960 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
961 # flags=rwdts.Flag.SUBSCRIBER,
962 # handler=handler)
963 else:
964 self._log.debug("Resourcemgr responded wih an error vm resp %s",
965 vm_resp)
966 raise VirtualDeploymentUnitRecordError(
967 "Failed VDUR instantiation %s " % vm_resp)
968
969 except Exception as e:
970 import traceback
971 traceback.print_exc()
972 self._log.exception(e)
973 self._log.error("Instantiation of VDU record failed: %s", str(e))
974 self._state = VDURecordState.FAILED
975 yield from self.instantiation_failed(str(e))
976
977
978 class VlRecordState(enum.Enum):
979 """ VL Record State """
980 INIT = 101
981 INSTANTIATION_PENDING = 102
982 ACTIVE = 103
983 TERMINATE_PENDING = 104
984 TERMINATED = 105
985 FAILED = 106
986
987
988 class InternalVirtualLinkRecord(object):
989 """ Internal Virtual Link record """
990 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
991 self._dts = dts
992 self._log = log
993 self._loop = loop
994 self._ivld_msg = ivld_msg
995 self._vnfr_name = vnfr_name
996 self._cloud_account_name = cloud_account_name
997 self._ip_profile = ip_profile
998
999 self._vlr_req = self.create_vlr()
1000 self._vlr = None
1001 self._state = VlRecordState.INIT
1002
1003 @property
1004 def vlr_id(self):
1005 """ Find VLR by id """
1006 return self._vlr_req.id
1007
1008 @property
1009 def name(self):
1010 """ Name of this VL """
1011 if self._ivld_msg.vim_network_name:
1012 return self._ivld_msg.vim_network_name
1013 else:
1014 return self._vnfr_name + "." + self._ivld_msg.name
1015
1016 @property
1017 def network_id(self):
1018 """ Find VLR by id """
1019 return self._vlr.network_id if self._vlr else None
1020
1021 def vlr_path(self):
1022 """ VLR path for this VLR instance"""
1023 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
1024
1025 def create_vlr(self):
1026 """ Create the VLR record which will be instantiated """
1027
1028 vld_fields = ["short_name",
1029 "vendor",
1030 "description",
1031 "version",
1032 "type_yang",
1033 "vim_network_name",
1034 "provider_network"]
1035
1036 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
1037
1038 vlr_dict = {"id": str(uuid.uuid4()),
1039 "name": self.name,
1040 "cloud_account": self._cloud_account_name,
1041 }
1042
1043 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
1044 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
1045
1046 vlr_dict.update(vld_copy_dict)
1047
1048 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
1049 return vlr
1050
1051 @asyncio.coroutine
1052 def instantiate(self, xact, restart_mode=False):
1053 """ Instantiate VL """
1054
1055 @asyncio.coroutine
1056 def instantiate_vlr():
1057 """ Instantiate VLR"""
1058 self._log.debug("Create VL with xpath %s and vlr %s",
1059 self.vlr_path(), self._vlr_req)
1060
1061 with self._dts.transaction(flags=0) as xact:
1062 block = xact.block_create()
1063 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
1064 self._log.debug("Executing VL create path:%s msg:%s",
1065 self.vlr_path(), self._vlr_req)
1066
1067 res_iter = None
1068 try:
1069 res_iter = yield from block.execute()
1070 except Exception:
1071 self._state = VlRecordState.FAILED
1072 self._log.exception("Caught exception while instantial VL")
1073 raise
1074
1075 for ent in res_iter:
1076 res = yield from ent
1077 self._vlr = res.result
1078
1079 if self._vlr.operational_status == 'failed':
1080 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1081 self._state = VlRecordState.FAILED
1082 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1083
1084 self._log.info("Created VL with xpath %s and vlr %s",
1085 self.vlr_path(), self._vlr)
1086
1087 @asyncio.coroutine
1088 def get_vlr():
1089 """ Get the network id """
1090 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1091 vlr = None
1092 for ent in res_iter:
1093 res = yield from ent
1094 vlr = res.result
1095
1096 if vlr is None:
1097 err = "Failed to get VLR for path %s" % self.vlr_path()
1098 self._log.warn(err)
1099 raise InternalVirtualLinkRecordError(err)
1100 return vlr
1101
1102 self._state = VlRecordState.INSTANTIATION_PENDING
1103
1104 if restart_mode:
1105 vl = yield from get_vlr()
1106 if vl is None:
1107 yield from instantiate_vlr()
1108 else:
1109 yield from instantiate_vlr()
1110
1111 self._state = VlRecordState.ACTIVE
1112
1113 def vlr_in_vns(self):
1114 """ Is there a VLR record in VNS """
1115 if (self._state == VlRecordState.ACTIVE or
1116 self._state == VlRecordState.INSTANTIATION_PENDING or
1117 self._state == VlRecordState.FAILED):
1118 return True
1119
1120 return False
1121
1122 @asyncio.coroutine
1123 def terminate(self, xact):
1124 """Terminate this VL """
1125 if not self.vlr_in_vns():
1126 self._log.debug("Ignoring terminate request for id %s in state %s",
1127 self.vlr_id, self._state)
1128 return
1129
1130 self._log.debug("Terminating VL with path %s", self.vlr_path())
1131 self._state = VlRecordState.TERMINATE_PENDING
1132 block = xact.block_create()
1133 block.add_query_delete(self.vlr_path())
1134 yield from block.execute(flags=0, now=True)
1135 self._state = VlRecordState.TERMINATED
1136 self._log.debug("Terminated VL with path %s", self.vlr_path())
1137
1138
1139 class VirtualNetworkFunctionRecord(object):
1140 """ Virtual Network Function Record """
1141 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1142 self._dts = dts
1143 self._log = log
1144 self._loop = loop
1145 self._cluster_name = cluster_name
1146 self._vnfr_msg = vnfr_msg
1147 self._vnfr_id = vnfr_msg.id
1148 self._vnfd_id = vnfr_msg.vnfd.id
1149 self._vnfm = vnfm
1150 self._vcs_handler = vcs_handler
1151 self._vnfr = vnfr_msg
1152 self._mgmt_network = mgmt_network
1153
1154 self._vnfd = vnfr_msg.vnfd
1155 self._state = VirtualNetworkFunctionRecordState.INIT
1156 self._state_failed_reason = None
1157 self._ext_vlrs = {} # The list of external virtual links
1158 self._vlrs = [] # The list of internal virtual links
1159 self._vdus = [] # The list of vdu
1160 self._vlr_by_cp = {}
1161 self._cprs = []
1162 self._inventory = {}
1163 self._create_time = int(time.time())
1164 self._vnf_mon = None
1165 self._config_status = vnfr_msg.config_status
1166 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1167 self._rw_vnfd = None
1168 self._vnfd_ref_count = 0
1169
1170 def _get_vdur_from_vdu_id(self, vdu_id):
1171 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1172 self._log.debug("Searching through vdus: %s", self._vdus)
1173 for vdu in self._vdus:
1174 self._log.debug("vdu_id: %s", vdu.vdu_id)
1175 if vdu.vdu_id == vdu_id:
1176 return vdu
1177
1178 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1179
1180 @property
1181 def operational_status(self):
1182 """ Operational status of this VNFR """
1183 op_status_map = {"INIT": "init",
1184 "VL_INIT_PHASE": "vl_init_phase",
1185 "VM_INIT_PHASE": "vm_init_phase",
1186 "READY": "running",
1187 "TERMINATE": "terminate",
1188 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1189 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1190 "TERMINATED": "terminated",
1191 "FAILED": "failed", }
1192 return op_status_map[self._state.name]
1193
1194 @staticmethod
1195 def vnfd_xpath(vnfd_id):
1196 """ VNFD xpath associated with this VNFR """
1197 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
1198
1199 @property
1200 def vnfd_ref_count(self):
1201 """ Returns the VNFD reference count associated with this VNFR """
1202 return self._vnfd_ref_count
1203
1204 def vnfd_in_use(self):
1205 """ Returns whether vnfd is in use or not """
1206 return True if self._vnfd_ref_count > 0 else False
1207
1208 def vnfd_ref(self):
1209 """ Take a reference on this object """
1210 self._vnfd_ref_count += 1
1211 return self._vnfd_ref_count
1212
1213 def vnfd_unref(self):
1214 """ Release reference on this object """
1215 if self._vnfd_ref_count < 1:
1216 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1217 (self.vnfd.id, self._vnfd_ref_count))
1218 self._log.critical(msg)
1219 raise VnfRecordError(msg)
1220 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1221 self.vnfd.id, self._vnfd_ref_count)
1222 self._vnfd_ref_count -= 1
1223 return self._vnfd_ref_count
1224
1225 @property
1226 def vnfd(self):
1227 """ VNFD for this VNFR """
1228 return self._vnfd
1229
1230 @property
1231 def vnf_name(self):
1232 """ VNFD name associated with this VNFR """
1233 return self.vnfd.name
1234
1235 @property
1236 def name(self):
1237 """ Name of this VNF in the record """
1238 return self._vnfr.name
1239
1240 @property
1241 def cloud_account_name(self):
1242 """ Name of the cloud account this VNFR is instantiated in """
1243 return self._vnfr.cloud_account
1244
1245 @property
1246 def vnfd_id(self):
1247 """ VNFD Id associated with this VNFR """
1248 return self.vnfd.id
1249
1250 @property
1251 def vnfr_id(self):
1252 """ VNFR Id associated with this VNFR """
1253 return self._vnfr_id
1254
1255 @property
1256 def member_vnf_index(self):
1257 """ Member VNF index associated with this VNFR """
1258 return self._vnfr.member_vnf_index_ref
1259
1260 @property
1261 def config_status(self):
1262 """ Config agent status for this VNFR """
1263 return self._config_status
1264
1265 def component_by_name(self, component_name):
1266 """ Find a component by name in the inventory list"""
1267 mangled_name = VcsComponent.mangle_name(component_name,
1268 self.vnf_name,
1269 self.vnfd_id)
1270 return self._inventory[mangled_name]
1271
1272
1273
1274 @asyncio.coroutine
1275 def get_nsr_config(self):
1276 ### Need access to NS instance configuration for runtime resolution.
1277 ### This shall be replaced when deployment flavors are implemented
1278 xpath = "C,/nsr:ns-instance-config"
1279 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1280
1281 for result in results:
1282 entry = yield from result
1283 ns_instance_config = entry.result
1284 for nsr in ns_instance_config.nsr:
1285 if nsr.id == self._vnfr_msg.nsr_id_ref:
1286 return nsr
1287 return None
1288
1289 @asyncio.coroutine
1290 def start_component(self, component_name, ip_addr):
1291 """ Start a component in the VNFR by name """
1292 comp = self.component_by_name(component_name)
1293 yield from comp.start(None, None, ip_addr)
1294
1295 def cp_ip_addr(self, cp_name):
1296 """ Get ip address for connection point """
1297 self._log.debug("cp_ip_addr()")
1298 for cp in self._cprs:
1299 if cp.name == cp_name and cp.ip_address is not None:
1300 return cp.ip_address
1301 return "0.0.0.0"
1302
1303 def mgmt_intf_info(self):
1304 """ Get Management interface info for this VNFR """
1305 mgmt_intf_desc = self.vnfd.mgmt_interface
1306 ip_addr = None
1307 if mgmt_intf_desc.has_field("cp"):
1308 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1309 elif mgmt_intf_desc.has_field("vdu_id"):
1310 try:
1311 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1312 ip_addr = vdur.management_ip
1313 except VDURecordNotFound:
1314 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1315 ip_addr = None
1316 else:
1317 ip_addr = mgmt_intf_desc.ip_address
1318 port = mgmt_intf_desc.port
1319
1320 return ip_addr, port
1321
1322 @property
1323 def msg(self):
1324 """ Message associated with this VNFR """
1325 vnfd_fields = ["short_name", "vendor", "description", "version"]
1326 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1327
1328 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1329 ip_address, port = self.mgmt_intf_info()
1330
1331 if ip_address is not None:
1332 mgmt_intf.ip_address = ip_address
1333 if port is not None:
1334 mgmt_intf.port = port
1335
1336 vnfr_dict = {"id": self._vnfr_id,
1337 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1338 "name": self.name,
1339 "member_vnf_index_ref": self.member_vnf_index,
1340 "operational_status": self.operational_status,
1341 "operational_status_details": self._state_failed_reason,
1342 "cloud_account": self.cloud_account_name,
1343 "config_status": self._config_status
1344 }
1345
1346 vnfr_dict.update(vnfd_copy_dict)
1347
1348 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1349 vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1350
1351 vnfr_msg.create_time = self._create_time
1352 vnfr_msg.uptime = int(time.time()) - self._create_time
1353 vnfr_msg.mgmt_interface = mgmt_intf
1354
1355 # Add all the VLRs to VNFR
1356 for vlr in self._vlrs:
1357 ivlr = vnfr_msg.internal_vlr.add()
1358 ivlr.vlr_ref = vlr.vlr_id
1359
1360 # Add all the VDURs to VDUR
1361 if self._vdus is not None:
1362 for vdu in self._vdus:
1363 vdur = vnfr_msg.vdur.add()
1364 vdur.from_dict(vdu.msg.as_dict())
1365
1366 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1367 vnfr_msg.dashboard_url = self.dashboard_url
1368
1369 for cpr in self._cprs:
1370 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1371 vnfr_msg.connection_point.append(new_cp)
1372
1373 if self._vnf_mon is not None:
1374 for monp in self._vnf_mon.msg:
1375 vnfr_msg.monitoring_param.append(
1376 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1377
1378 if self._vnfr.vnf_configuration is not None:
1379 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1380 if (ip_address is not None and
1381 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1382 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1383
1384 for group in self._vnfr_msg.placement_groups_info:
1385 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1386 group_info.from_dict(group.as_dict())
1387 vnfr_msg.placement_groups_info.append(group_info)
1388
1389 return vnfr_msg
1390
1391 @property
1392 def dashboard_url(self):
1393 ip, cfg_port = self.mgmt_intf_info()
1394 protocol = 'http'
1395 http_port = 80
1396 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1397 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1398 protocol = 'https'
1399 http_port = 443
1400 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1401 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1402
1403 url = "{protocol}://{ip_address}:{port}/{path}".format(
1404 protocol=protocol,
1405 ip_address=ip,
1406 port=http_port,
1407 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1408 )
1409
1410 return url
1411
1412 @property
1413 def xpath(self):
1414 """ path for this VNFR """
1415 return("D,/vnfr:vnfr-catalog"
1416 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1417
1418 @asyncio.coroutine
1419 def publish(self, xact):
1420 """ publish this VNFR """
1421 vnfr = self.msg
1422 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1423 self.xpath, self.msg)
1424 vnfr.create_time = self._create_time
1425 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1426 self._log.debug("Published VNFR path = [%s], record = [%s]",
1427 self.xpath, self.msg)
1428
1429 def resolve_vld_ip_profile(self, vnfd_msg, vld):
1430 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
1431 if not vld.has_field('ip_profile_ref'):
1432 return None
1433 profile = [profile for profile in vnfd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
1434 return profile[0] if profile else None
1435
1436 @asyncio.coroutine
1437 def create_vls(self):
1438 """ Publish The VLs associated with this VNF """
1439 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1440 self.vnfd_id)
1441 for ivld_msg in self.vnfd.internal_vld:
1442 self._log.debug("Creating internal vld:"
1443 " %s, int_cp_ref = %s",
1444 ivld_msg, ivld_msg.internal_connection_point
1445 )
1446 vlr = InternalVirtualLinkRecord(dts=self._dts,
1447 log=self._log,
1448 loop=self._loop,
1449 ivld_msg=ivld_msg,
1450 vnfr_name=self.name,
1451 cloud_account_name=self.cloud_account_name,
1452 ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg)
1453 )
1454 self._vlrs.append(vlr)
1455
1456 for int_cp in ivld_msg.internal_connection_point:
1457 if int_cp.id_ref in self._vlr_by_cp:
1458 msg = ("Connection point %s already "
1459 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1460 raise InternalVirtualLinkRecordError(msg)
1461 self._log.debug("Setting vlr %s to internal cp = %s",
1462 vlr, int_cp.id_ref)
1463 self._vlr_by_cp[int_cp.id_ref] = vlr
1464
1465 @asyncio.coroutine
1466 def instantiate_vls(self, xact, restart_mode=False):
1467 """ Instantiate the VLs associated with this VNF """
1468 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1469 self.vnfd_id)
1470
1471 for vlr in self._vlrs:
1472 self._log.debug("Instantiating VLR %s", vlr)
1473 yield from vlr.instantiate(xact, restart_mode)
1474
1475 def find_vlr_by_cp(self, cp_name):
1476 """ Find the VLR associated with the cp name """
1477 return self._vlr_by_cp[cp_name]
1478
1479 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1480 """
1481 Returns the cloud specific construct for placement group
1482 Arguments:
1483 input_group: VNFD PlacementGroup
1484 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1485 """
1486 copy_dict = ['name', 'requirement', 'strategy']
1487 for group_info in nsr_config.vnfd_placement_group_maps:
1488 if group_info.placement_group_ref == input_group.name and \
1489 group_info.vnfd_id_ref == self.vnfd_id:
1490 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1491 group_dict = {k:v for k,v in
1492 group_info.as_dict().items()
1493 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1494 for param in copy_dict:
1495 group_dict.update({param: getattr(input_group, param)})
1496 group.from_dict(group_dict)
1497 return group
1498 return None
1499
1500 @asyncio.coroutine
1501 def get_vdu_placement_groups(self, vdu):
1502 placement_groups = []
1503 ### Step-1: Get VNF level placement groups
1504 for group in self._vnfr_msg.placement_groups_info:
1505 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1506 #group_info.from_dict(group.as_dict())
1507 placement_groups.append(group)
1508
1509 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1510 nsr_config = yield from self.get_nsr_config()
1511
1512 ### Step-3: Get VDU level placement groups
1513 for group in self.vnfd.placement_groups:
1514 for member_vdu in group.member_vdus:
1515 if member_vdu.member_vdu_ref == vdu.id:
1516 group_info = self.resolve_placement_group_cloud_construct(group,
1517 nsr_config)
1518 if group_info is None:
1519 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1520 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1521 else:
1522 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1523 str(group_info),
1524 vdu.name,
1525 self.vnf_name,
1526 self.member_vnf_index)
1527 placement_groups.append(group_info)
1528
1529 return placement_groups
1530
1531 @asyncio.coroutine
1532 def vdu_cloud_init_instantiation(self):
1533 [vdu.vdud_cloud_init for vdu in self._vdus]
1534
1535 @asyncio.coroutine
1536 def create_vdus(self, vnfr, restart_mode=False):
1537 """ Create the VDUs associated with this VNF """
1538
1539 def get_vdur_id(vdud):
1540 """Get the corresponding VDUR's id for the VDUD. This is useful in
1541 case of a restart.
1542
1543 In restart mode we check for exiting VDUR's ID and use them, if
1544 available. This way we don't end up creating duplicate VDURs
1545 """
1546 vdur_id = None
1547
1548 if restart_mode and vdud is not None:
1549 try:
1550 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1551 vdur_id = vdur[0]
1552 except IndexError:
1553 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1554
1555 return vdur_id
1556
1557
1558 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1559 for vdu in self._rw_vnfd.vdu:
1560 self._log.debug("Creating vdu: %s", vdu)
1561 vdur_id = get_vdur_id(vdu)
1562
1563 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1564 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1565 vdu.name,
1566 self.vnf_name,
1567 self.member_vnf_index,
1568 [ group.name for group in placement_groups])
1569
1570 vdur = VirtualDeploymentUnitRecord(
1571 dts=self._dts,
1572 log=self._log,
1573 loop=self._loop,
1574 vdud=vdu,
1575 vnfr=vnfr,
1576 mgmt_intf=self.has_mgmt_interface(vdu),
1577 mgmt_network=self._mgmt_network,
1578 cloud_account_name=self.cloud_account_name,
1579 vnfd_package_store=self._vnfd_package_store,
1580 vdur_id=vdur_id,
1581 placement_groups = placement_groups,
1582 )
1583 yield from vdur.vdu_opdata_register()
1584
1585 self._vdus.append(vdur)
1586
1587 @asyncio.coroutine
1588 def instantiate_vdus(self, xact, vnfr):
1589 """ Instantiate the VDUs associated with this VNF """
1590 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1591
1592 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1593
1594 # Identify any dependencies among the VDUs
1595 dependencies = collections.defaultdict(list)
1596 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1597
1598 for vdu in self._vdus:
1599 if vdu._vdud_cloud_init is not None:
1600 for vdu_id in vdu_id_pattern.findall(vdu._vdud_cloud_init):
1601 if vdu_id != vdu.vdu_id:
1602 # This means that vdu.vdu_id depends upon vdu_id,
1603 # i.e. vdu_id must be instantiated before
1604 # vdu.vdu_id.
1605 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1606
1607 # Define the terminal states of VDU instantiation
1608 terminal = (
1609 VDURecordState.READY,
1610 VDURecordState.TERMINATED,
1611 VDURecordState.FAILED,
1612 )
1613
1614 datastore = VdurDatastore()
1615 processed = set()
1616
1617 @asyncio.coroutine
1618 def instantiate_monitor(vdu):
1619 """Monitor the state of the VDU during instantiation
1620
1621 Arguments:
1622 vdu - a VirtualDeploymentUnitRecord
1623
1624 """
1625 # wait for the VDUR to enter a terminal state
1626 while vdu._state not in terminal:
1627 yield from asyncio.sleep(1, loop=self._loop)
1628 # update the datastore
1629 datastore.update(vdu)
1630
1631 # add the VDU to the set of processed VDUs
1632 processed.add(vdu.vdu_id)
1633
1634 @asyncio.coroutine
1635 def instantiate(vdu):
1636 """Instantiate the specified VDU
1637
1638 Arguments:
1639 vdu - a VirtualDeploymentUnitRecord
1640
1641 Raises:
1642 if the VDU, or any of the VDUs this VDU depends upon, are
1643 terminated or fail to instantiate properly, a
1644 VirtualDeploymentUnitRecordError is raised.
1645
1646 """
1647 for dependency in dependencies[vdu.vdu_id]:
1648 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1649
1650 while dependency.vdu_id not in processed:
1651 yield from asyncio.sleep(1, loop=self._loop)
1652
1653 if not dependency.active:
1654 raise VirtualDeploymentUnitRecordError()
1655
1656 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1657
1658 # Populate the datastore with the current values of the VDU
1659 datastore.add(vdu)
1660
1661 # Substitute any variables contained in the cloud config script
1662 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1663
1664 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1665 if len(parts) > 1:
1666
1667 # Extract the variable names
1668 variables = list()
1669 for variable in parts[1::2]:
1670 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1671
1672 # Iterate of the variables and substitute values from the
1673 # datastore.
1674 for variable in variables:
1675
1676 # Handle a reference to a VDU by ID
1677 if variable.startswith('vdu['):
1678 value = datastore.get(variable)
1679 if value is None:
1680 msg = "Unable to find a substitute for {} in {} cloud-init script"
1681 raise ValueError(msg.format(variable, vdu.vdu_id))
1682
1683 config = config.replace("{{ %s }}" % variable, value)
1684 continue
1685
1686 # Handle a reference to the current VDU
1687 if variable.startswith('vdu'):
1688 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1689 config = config.replace("{{ %s }}" % variable, value)
1690 continue
1691
1692 # Handle unrecognized variables
1693 msg = 'unrecognized cloud-config variable: {}'
1694 raise ValueError(msg.format(variable))
1695
1696 # Instantiate the VDU
1697 with self._dts.transaction() as xact:
1698 self._log.debug("Instantiating vdu: %s", vdu)
1699 yield from vdu.instantiate(xact, vnfr, config=config)
1700 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1701 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1702 self.vnfr_id, vdu)
1703
1704 # First create a set of tasks to monitor the state of the VDUs and
1705 # report when they have entered a terminal state
1706 for vdu in self._vdus:
1707 self._loop.create_task(instantiate_monitor(vdu))
1708
1709 for vdu in self._vdus:
1710 self._loop.create_task(instantiate(vdu))
1711
1712 def has_mgmt_interface(self, vdu):
1713 # ## TODO: Support additional mgmt_interface type options
1714 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1715 return True
1716 return False
1717
1718 def vlr_xpath(self, vlr_id):
1719 """ vlr xpath """
1720 return(
1721 "D,/vlr:vlr-catalog/"
1722 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1723
1724 def ext_vlr_by_id(self, vlr_id):
1725 """ find ext vlr by id """
1726 return self._ext_vlrs[vlr_id]
1727
1728 @asyncio.coroutine
1729 def publish_inventory(self, xact):
1730 """ Publish the inventory associated with this VNF """
1731 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1732
1733 for component in self._rw_vnfd.component:
1734 self._log.debug("Creating inventory component %s", component)
1735 mangled_name = VcsComponent.mangle_name(component.component_name,
1736 self.vnf_name,
1737 self.vnfd_id
1738 )
1739 comp = VcsComponent(dts=self._dts,
1740 log=self._log,
1741 loop=self._loop,
1742 cluster_name=self._cluster_name,
1743 vcs_handler=self._vcs_handler,
1744 component=component,
1745 mangled_name=mangled_name,
1746 )
1747 if comp.name in self._inventory:
1748 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1749 component, self._vnfd_id)
1750 return
1751 self._log.debug("Adding component %s for vnrf %s",
1752 comp.name, self._vnfr_id)
1753 self._inventory[comp.name] = comp
1754 yield from comp.publish(xact)
1755
1756 def all_vdus_active(self):
1757 """ Are all VDUS in this VNFR active? """
1758 for vdu in self._vdus:
1759 if not vdu.active:
1760 return False
1761
1762 self._log.debug("Inside all_vdus_active. Returning True")
1763 return True
1764
1765 @asyncio.coroutine
1766 def instantiation_failed(self, failed_reason=None):
1767 """ VNFR instantiation failed """
1768 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1769 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1770 self._state_failed_reason = failed_reason
1771
1772 # Update the VNFR with the changed status
1773 yield from self.publish(None)
1774
1775 @asyncio.coroutine
1776 def is_ready(self):
1777 """ This VNF is ready"""
1778 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1779
1780 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1781 self.set_state(VirtualNetworkFunctionRecordState.READY)
1782
1783 else:
1784 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1785
1786 # Update the VNFR with the changed status
1787 yield from self.publish(None)
1788
1789 def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
1790 """Updated the connection point with ip address"""
1791 for cp in self._cprs:
1792 if cp.name == cp_name:
1793 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1794 cp_name, cp, ip_address, cp_id)
1795 cp.ip_address = ip_address
1796 cp.mac_address = mac_addr
1797 cp.connection_point_id = cp_id
1798 return
1799
1800 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1801 self._log.debug(err)
1802 raise VirtualDeploymentUnitRecordError(err)
1803
1804 def set_state(self, state):
1805 """ Set state for this VNFR"""
1806 self._state = state
1807
1808 @asyncio.coroutine
1809 def instantiate(self, xact, restart_mode=False):
1810 """ instantiate this VNF """
1811 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1812 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1813
1814 @asyncio.coroutine
1815 def fetch_vlrs():
1816 """ Fetch VLRs """
1817 # Iterate over all the connection points in VNFR and fetch the
1818 # associated VLRs
1819
1820 def cpr_from_cp(cp):
1821 """ Creates a record level connection point from the desciptor cp"""
1822 cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
1823 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1824 cpr_dict = {}
1825 cpr_dict.update(cp_copy_dict)
1826 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1827
1828 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1829 self._vnfr_id, self._vnfr.connection_point)
1830
1831 for cp in self._vnfr.connection_point:
1832 cpr = cpr_from_cp(cp)
1833 self._cprs.append(cpr)
1834 self._log.debug("Adding Connection point record %s ", cp)
1835
1836 vlr_path = self.vlr_xpath(cp.vlr_ref)
1837 self._log.debug("Fetching VLR with path = %s", vlr_path)
1838 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1839 rwdts.XactFlag.MERGE)
1840 for i in res_iter:
1841 r = yield from i
1842 d = r.result
1843 self._ext_vlrs[cp.vlr_ref] = d
1844 cpr.vlr_ref = cp.vlr_ref
1845 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1846
1847 # Increase the VNFD reference count
1848 self.vnfd_ref()
1849
1850 assert self.vnfd
1851
1852 # Fetch External VLRs
1853 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1854 yield from fetch_vlrs()
1855
1856 # Publish inventory
1857 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1858 yield from self.publish_inventory(xact)
1859
1860 # Publish inventory
1861 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1862 yield from self.create_vls()
1863
1864 # publish the VNFR
1865 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1866 yield from self.publish(xact)
1867
1868
1869 # instantiate VLs
1870 self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
1871 try:
1872 yield from self.instantiate_vls(xact, restart_mode)
1873 except Exception as e:
1874 self._log.exception("VL instantiation failed (%s)", str(e))
1875 yield from self.instantiation_failed(str(e))
1876 return
1877
1878 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1879
1880 # instantiate VDUs
1881 self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
1882 yield from self.create_vdus(self, restart_mode)
1883
1884 try:
1885 yield from self.vdu_cloud_init_instantiation()
1886 except Exception as e:
1887 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1888 self._state_failed_reason = str(e)
1889 yield from self.publish(xact)
1890
1891 # publish the VNFR
1892 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1893 yield from self.publish(xact)
1894
1895 # instantiate VDUs
1896 # ToDo: Check if this should be prevented during restart
1897 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1898 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1899
1900 # publish the VNFR
1901 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1902 yield from self.publish(xact)
1903
1904 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1905
1906 # create task updating uptime for this vnfr
1907 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1908 self._loop.create_task(self.vnfr_uptime_update(xact))
1909
1910 @asyncio.coroutine
1911 def terminate(self, xact):
1912 """ Terminate this virtual network function """
1913
1914 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1915
1916 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1917
1918 # stop monitoring
1919 if self._vnf_mon is not None:
1920 self._vnf_mon.stop()
1921 self._vnf_mon.deregister()
1922 self._vnf_mon = None
1923
1924 @asyncio.coroutine
1925 def terminate_vls():
1926 """ Terminate VLs in this VNF """
1927 for vl in self._vlrs:
1928 yield from vl.terminate(xact)
1929
1930 @asyncio.coroutine
1931 def terminate_vdus():
1932 """ Terminate VDUS in this VNF """
1933 for vdu in self._vdus:
1934 yield from vdu.terminate(xact)
1935
1936 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1937 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1938 yield from terminate_vls()
1939
1940 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1941 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1942 yield from terminate_vdus()
1943
1944 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1945 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1946
1947 @asyncio.coroutine
1948 def vnfr_uptime_update(self, xact):
1949 while True:
1950 # Return when vnfr state is FAILED or TERMINATED etc
1951 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1952 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1953 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1954 VirtualNetworkFunctionRecordState.READY]:
1955 return
1956 yield from self.publish(xact)
1957 yield from asyncio.sleep(2, loop=self._loop)
1958
1959
1960
1961 class VnfdDtsHandler(object):
1962 """ DTS handler for VNFD config changes """
1963 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1964
1965 def __init__(self, dts, log, loop, vnfm):
1966 self._dts = dts
1967 self._log = log
1968 self._loop = loop
1969 self._vnfm = vnfm
1970 self._regh = None
1971
1972 @asyncio.coroutine
1973 def regh(self):
1974 """ DTS registration handle """
1975 return self._regh
1976
1977 @asyncio.coroutine
1978 def register(self):
1979 """ Register for VNFD configuration"""
1980
1981 def on_apply(dts, acg, xact, action, scratch):
1982 """Apply the configuration"""
1983 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1984 xact, action, scratch)
1985
1986 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
1987
1988 @asyncio.coroutine
1989 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
1990 """ on prepare callback """
1991 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1992 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
1993 fref = ProtobufC.FieldReference.alloc()
1994 fref.goto_whole_message(msg.to_pbcm())
1995
1996 # Handle deletes in prepare_callback
1997 if fref.is_field_deleted():
1998 # Delete an VNFD record
1999 self._log.debug("Deleting VNFD with id %s", msg.id)
2000 if self._vnfm.vnfd_in_use(msg.id):
2001 self._log.debug("Cannot delete VNFD in use - %s", msg)
2002 err = "Cannot delete a VNFD in use - %s" % msg
2003 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
2004 # Delete a VNFD record
2005 yield from self._vnfm.delete_vnfd(msg.id)
2006
2007 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2008
2009 self._log.debug(
2010 "Registering for VNFD config using xpath: %s",
2011 VnfdDtsHandler.XPATH,
2012 )
2013 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2014 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2015 self._regh = acg.register(
2016 xpath=VnfdDtsHandler.XPATH,
2017 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2018 on_prepare=on_prepare)
2019
2020
2021 class VcsComponentDtsHandler(object):
2022 """ Vcs Component DTS handler """
2023 XPATH = ("D,/rw-manifest:manifest" +
2024 "/rw-manifest:operational-inventory" +
2025 "/rw-manifest:component")
2026
2027 def __init__(self, dts, log, loop, vnfm):
2028 self._dts = dts
2029 self._log = log
2030 self._loop = loop
2031 self._regh = None
2032 self._vnfm = vnfm
2033
2034 @property
2035 def regh(self):
2036 """ DTS registration handle """
2037 return self._regh
2038
2039 @asyncio.coroutine
2040 def register(self):
2041 """ Registers VCS component dts publisher registration"""
2042 self._log.debug("VCS Comp publisher DTS handler registering path %s",
2043 VcsComponentDtsHandler.XPATH)
2044
2045 hdl = rift.tasklets.DTS.RegistrationHandler()
2046 handlers = rift.tasklets.Group.Handler()
2047 with self._dts.group_create(handler=handlers) as group:
2048 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
2049 handler=hdl,
2050 flags=(rwdts.Flag.PUBLISHER |
2051 rwdts.Flag.NO_PREP_READ |
2052 rwdts.Flag.DATASTORE),)
2053
2054 @asyncio.coroutine
2055 def publish(self, xact, path, msg):
2056 """ Publishes the VCS component """
2057 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
2058 xact, path, msg)
2059 self.regh.create_element(path, msg)
2060 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2061 VcsComponentDtsHandler.XPATH, xact, path, msg)
2062
2063 class VnfrConsoleOperdataDtsHandler(object):
2064 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
2065 @property
2066 def vnfr_vdu_console_xpath(self):
2067 """ path for resource-mgr"""
2068 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
2069
2070 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
2071 self._dts = dts
2072 self._log = log
2073 self._loop = loop
2074 self._regh = None
2075 self._vnfm = vnfm
2076
2077 self._vnfr_id = vnfr_id
2078 self._vdur_id = vdur_id
2079 self._vdu_id = vdu_id
2080
2081 @asyncio.coroutine
2082 def register(self):
2083 """ Register for VNFR VDU Operational Data read from dts """
2084
2085 @asyncio.coroutine
2086 def on_prepare(xact_info, action, ks_path, msg):
2087 """ prepare callback from dts """
2088 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2089 self._log.debug(
2090 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2091 xact_info, action, xpath, msg
2092 )
2093
2094 if action == rwdts.QueryAction.READ:
2095 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
2096 path_entry = schema.keyspec_to_entry(ks_path)
2097 self._log.debug("VDU Opdata path is {}".format(path_entry))
2098 try:
2099 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2100 except VnfRecordError as e:
2101 self._log.error("VNFR id %s not found", self._vnfr_id)
2102 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2103 return
2104 try:
2105 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2106 if not vdur._state == VDURecordState.READY:
2107 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2108 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2109 return
2110 with self._dts.transaction() as new_xact:
2111 resp = yield from vdur.read_resource(new_xact)
2112 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2113 vdur_console.id = self._vdur_id
2114 if resp.console_url:
2115 vdur_console.console_url = resp.console_url
2116 else:
2117 vdur_console.console_url = 'none'
2118 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2119 except Exception:
2120 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2121 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2122 vdur_console.id = self._vdur_id
2123 vdur_console.console_url = 'none'
2124
2125 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2126 xpath=self.vnfr_vdu_console_xpath,
2127 msg=vdur_console)
2128 else:
2129 #raise VnfRecordError("Not supported operation %s" % action)
2130 self._log.error("Not supported operation %s" % action)
2131 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2132 return
2133
2134
2135 self._log.debug("Registering for VNFR VDU using xpath: %s",
2136 self.vnfr_vdu_console_xpath)
2137 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2138 with self._dts.group_create() as group:
2139 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2140 handler=hdl,
2141 flags=rwdts.Flag.PUBLISHER,
2142 )
2143
2144
2145 class VnfrDtsHandler(object):
2146 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2147 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2148
2149 def __init__(self, dts, log, loop, vnfm):
2150 self._dts = dts
2151 self._log = log
2152 self._loop = loop
2153 self._vnfm = vnfm
2154
2155 self._regh = None
2156
2157 @property
2158 def regh(self):
2159 """ Return registration handle"""
2160 return self._regh
2161
2162 @property
2163 def vnfm(self):
2164 """ Return VNF manager instance """
2165 return self._vnfm
2166
2167 @asyncio.coroutine
2168 def register(self):
2169 """ Register for vnfr create/update/delete/read requests from dts """
2170 def on_commit(xact_info):
2171 """ The transaction has been committed """
2172 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2173 return rwdts.MemberRspCode.ACTION_OK
2174
2175 def on_abort(*args):
2176 """ Abort callback """
2177 self._log.debug("VNF transaction got aborted")
2178
2179 @asyncio.coroutine
2180 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2181
2182 @asyncio.coroutine
2183 def instantiate_realloc_vnfr(vnfr):
2184 """Re-populate the vnfm after restart
2185
2186 Arguments:
2187 vlink
2188
2189 """
2190
2191 yield from vnfr.instantiate(None, restart_mode=True)
2192
2193 if xact_event == rwdts.MemberEvent.INSTALL:
2194 curr_cfg = self.regh.elements
2195 for cfg in curr_cfg:
2196 vnfr = self.vnfm.create_vnfr(cfg)
2197 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2198
2199 self._log.debug("Got on_event in vnfm")
2200
2201 return rwdts.MemberRspCode.ACTION_OK
2202
2203 @asyncio.coroutine
2204 def on_prepare(xact_info, action, ks_path, msg):
2205 """ prepare callback from dts """
2206 self._log.debug(
2207 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2208 xact_info, action, msg
2209 )
2210
2211 if action == rwdts.QueryAction.CREATE:
2212 if not msg.has_field("vnfd"):
2213 err = "Vnfd not provided"
2214 self._log.error(err)
2215 raise VnfRecordError(err)
2216
2217 vnfr = self.vnfm.create_vnfr(msg)
2218 try:
2219 # RIFT-9105: Unable to add a READ query under an existing transaction
2220 # xact = xact_info.xact
2221 yield from vnfr.instantiate(None)
2222 except Exception as e:
2223 self._log.exception(e)
2224 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2225 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2226 yield from vnfr.publish(None)
2227 elif action == rwdts.QueryAction.DELETE:
2228 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2229 path_entry = schema.keyspec_to_entry(ks_path)
2230 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2231
2232 if vnfr is None:
2233 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2234 raise VirtualNetworkFunctionRecordNotFound(
2235 "VNFR id %s", path_entry.key00.id)
2236
2237 try:
2238 yield from vnfr.terminate(xact_info.xact)
2239 # Unref the VNFD
2240 vnfr.vnfd_unref()
2241 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2242 except Exception as e:
2243 self._log.exception(e)
2244 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2245
2246 elif action == rwdts.QueryAction.UPDATE:
2247 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2248 path_entry = schema.keyspec_to_entry(ks_path)
2249 vnfr = None
2250 try:
2251 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2252 except Exception as e:
2253 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2254 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2255 return
2256
2257 if vnfr is None:
2258 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2259 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2260 return
2261
2262 self._log.debug("VNFR {} update config status {} (current {})".
2263 format(vnfr.name, msg.config_status, vnfr.config_status))
2264 # Update the config status and publish
2265 vnfr._config_status = msg.config_status
2266 yield from vnfr.publish(None)
2267
2268 else:
2269 raise NotImplementedError(
2270 "%s action on VirtualNetworkFunctionRecord not supported",
2271 action)
2272
2273 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2274
2275 self._log.debug("Registering for VNFR using xpath: %s",
2276 VnfrDtsHandler.XPATH,)
2277
2278 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2279 on_prepare=on_prepare,)
2280 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2281 with self._dts.group_create(handler=handlers) as group:
2282 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2283 handler=hdl,
2284 flags=(rwdts.Flag.PUBLISHER |
2285 rwdts.Flag.NO_PREP_READ |
2286 rwdts.Flag.CACHE |
2287 rwdts.Flag.DATASTORE),)
2288
2289 @asyncio.coroutine
2290 def create(self, xact, path, msg):
2291 """
2292 Create a VNFR record in DTS with path and message
2293 """
2294 self._log.debug("Creating VNFR xact = %s, %s:%s",
2295 xact, path, msg)
2296
2297 self.regh.create_element(path, msg)
2298 self._log.debug("Created VNFR xact = %s, %s:%s",
2299 xact, path, msg)
2300
2301 @asyncio.coroutine
2302 def update(self, xact, path, msg):
2303 """
2304 Update a VNFR record in DTS with path and message
2305 """
2306 self._log.debug("Updating VNFR xact = %s, %s:%s",
2307 xact, path, msg)
2308 self.regh.update_element(path, msg)
2309 self._log.debug("Updated VNFR xact = %s, %s:%s",
2310 xact, path, msg)
2311
2312 @asyncio.coroutine
2313 def delete(self, xact, path):
2314 """
2315 Delete a VNFR record in DTS with path and message
2316 """
2317 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2318 self.regh.delete_element(path)
2319 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2320
2321
2322 class VnfdRefCountDtsHandler(object):
2323 """ The VNFD Ref Count DTS handler """
2324 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2325
2326 def __init__(self, dts, log, loop, vnfm):
2327 self._dts = dts
2328 self._log = log
2329 self._loop = loop
2330 self._vnfm = vnfm
2331
2332 self._regh = None
2333
2334 @property
2335 def regh(self):
2336 """ Return registration handle """
2337 return self._regh
2338
2339 @property
2340 def vnfm(self):
2341 """ Return the NS manager instance """
2342 return self._vnfm
2343
2344 @asyncio.coroutine
2345 def register(self):
2346 """ Register for VNFD ref count read from dts """
2347
2348 @asyncio.coroutine
2349 def on_prepare(xact_info, action, ks_path, msg):
2350 """ prepare callback from dts """
2351 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2352 self._log.debug(
2353 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2354 xact_info, action, xpath, msg
2355 )
2356
2357 if action == rwdts.QueryAction.READ:
2358 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2359 path_entry = schema.keyspec_to_entry(ks_path)
2360 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2361 for xpath, msg in vnfd_list:
2362 self._log.debug("Responding to ref count query path:%s, msg:%s",
2363 xpath, msg)
2364 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2365 xpath=xpath,
2366 msg=msg)
2367 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2368 else:
2369 raise VnfRecordError("Not supported operation %s" % action)
2370
2371 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2372 with self._dts.group_create() as group:
2373 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2374 handler=hdl,
2375 flags=rwdts.Flag.PUBLISHER,
2376 )
2377
2378
2379 class VdurDatastore(object):
2380 """
2381 This VdurDatastore is intended to expose select information about a VDUR
2382 such that it can be referenced in a cloud config file. The data that is
2383 exposed does not necessarily follow the structure of the data in the yang
2384 model. This is intentional. The data that are exposed are intended to be
2385 agnostic of the yang model so that changes in the model do not necessarily
2386 require changes to the interface provided to the user. It also means that
2387 the user does not need to be familiar with the RIFT.ware yang models.
2388 """
2389
2390 def __init__(self):
2391 """Create an instance of VdurDatastore"""
2392 self._vdur_data = dict()
2393 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2394
2395 def add(self, vdur):
2396 """Add a new VDUR to the datastore
2397
2398 Arguments:
2399 vdur - a VirtualDeploymentUnitRecord instance
2400
2401 Raises:
2402 A ValueError is raised if the VDUR is (1) None or (2) already in
2403 the datastore.
2404
2405 """
2406 if vdur.vdu_id is None:
2407 raise ValueError('VDURs are required to have an ID')
2408
2409 if vdur.vdu_id in self._vdur_data:
2410 raise ValueError('cannot add a VDUR more than once')
2411
2412 self._vdur_data[vdur.vdu_id] = dict()
2413
2414 def set_if_not_none(key, attr):
2415 if attr is not None:
2416 self._vdur_data[vdur.vdu_id][key] = attr
2417
2418 set_if_not_none('name', vdur._vdud.name)
2419 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2420
2421 def update(self, vdur):
2422 """Update the VDUR information in the datastore
2423
2424 Arguments:
2425 vdur - a GI representation of a VDUR
2426
2427 Raises:
2428 A ValueError is raised if the VDUR is (1) None or (2) already in
2429 the datastore.
2430
2431 """
2432 if vdur.vdu_id is None:
2433 raise ValueError('VNFDs are required to have an ID')
2434
2435 if vdur.vdu_id not in self._vdur_data:
2436 raise ValueError('VNF is not recognized')
2437
2438 def set_or_delete(key, attr):
2439 if attr is None:
2440 if key in self._vdur_data[vdur.vdu_id]:
2441 del self._vdur_data[vdur.vdu_id][key]
2442
2443 else:
2444 self._vdur_data[vdur.vdu_id][key] = attr
2445
2446 set_or_delete('name', vdur._vdud.name)
2447 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2448
2449 def remove(self, vdur_id):
2450 """Remove all of the data associated with specified VDUR
2451
2452 Arguments:
2453 vdur_id - the identifier of a VNFD in the datastore
2454
2455 Raises:
2456 A ValueError is raised if the VDUR is not contained in the
2457 datastore.
2458
2459 """
2460 if vdur_id not in self._vdur_data:
2461 raise ValueError('VNF is not recognized')
2462
2463 del self._vdur_data[vdur_id]
2464
2465 def get(self, expr):
2466 """Retrieve VDUR information from the datastore
2467
2468 An expression should be of the form,
2469
2470 vdu[<id>].<attr>
2471
2472 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2473 the exposed attribute that the user wishes to retrieve.
2474
2475 If the requested data is not available, None is returned.
2476
2477 Arguments:
2478 expr - a string that specifies the data to return
2479
2480 Raises:
2481 A ValueError is raised if the provided expression cannot be parsed.
2482
2483 Returns:
2484 The requested data or None
2485
2486 """
2487 result = self._pattern.match(expr)
2488 if result is None:
2489 raise ValueError('data expression not recognized ({})'.format(expr))
2490
2491 vdur_id, key = result.groups()
2492
2493 if vdur_id not in self._vdur_data:
2494 return None
2495
2496 return self._vdur_data[vdur_id].get(key, None)
2497
2498
2499 class VnfManager(object):
2500 """ The virtual network function manager class """
2501 def __init__(self, dts, log, loop, cluster_name):
2502 self._dts = dts
2503 self._log = log
2504 self._loop = loop
2505 self._cluster_name = cluster_name
2506
2507 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2508 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2509 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2510 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)
2511
2512 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2513 self._vnfr_handler,
2514 self._vcs_handler,
2515 self._vnfr_ref_handler,
2516 self._nsr_handler]
2517 self._vnfrs = {}
2518 self._vnfds_to_vnfr = {}
2519 self._nsrs = {}
2520
2521 @property
2522 def vnfr_handler(self):
2523 """ VNFR dts handler """
2524 return self._vnfr_handler
2525
2526 @property
2527 def vcs_handler(self):
2528 """ VCS dts handler """
2529 return self._vcs_handler
2530
2531 @asyncio.coroutine
2532 def register(self):
2533 """ Register all static DTS handlers """
2534 for hdl in self._dts_handlers:
2535 yield from hdl.register()
2536
2537 @asyncio.coroutine
2538 def run(self):
2539 """ Run this VNFM instance """
2540 self._log.debug("Run VNFManager - registering static DTS handlers""")
2541 yield from self.register()
2542
2543 def handle_nsr(self, nsr, action):
2544 if action in [rwdts.QueryAction.CREATE]:
2545 self._nsrs[nsr.id] = nsr
2546 elif action == rwdts.QueryAction.DELETE:
2547 if nsr.id in self._nsrs:
2548 del self._nsrs[nsr.id]
2549
2550 def get_linked_mgmt_network(self, vnfr):
2551 """For the given VNFR get the related mgmt network from the NSD, if
2552 available.
2553 """
2554 vnfd_id = vnfr.vnfd.id
2555 nsr_id = vnfr.nsr_id_ref
2556
2557 # for the given related VNFR, get the corresponding NSR-config
2558 nsr_obj = None
2559 try:
2560 nsr_obj = self._nsrs[nsr_id]
2561 except KeyError:
2562 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2563
2564 # for the related NSD check if a VLD exists such that it's a mgmt
2565 # network
2566 for vld in nsr_obj.nsd.vld:
2567 if vld.mgmt_network:
2568 return vld.name
2569
2570 return None
2571
2572 def get_vnfr(self, vnfr_id):
2573 """ get VNFR by vnfr id """
2574
2575 if vnfr_id not in self._vnfrs:
2576 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2577
2578 return self._vnfrs[vnfr_id]
2579
2580 def create_vnfr(self, vnfr):
2581 """ Create a VNFR instance """
2582 if vnfr.id in self._vnfrs:
2583 msg = "Vnfr id %s already exists" % vnfr.id
2584 self._log.error(msg)
2585 raise VnfRecordError(msg)
2586
2587 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2588 vnfr.id,
2589 vnfr.vnfd.id)
2590
2591 mgmt_network = self.get_linked_mgmt_network(vnfr)
2592
2593 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2594 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2595 mgmt_network=mgmt_network
2596 )
2597
2598 #Update ref count
2599 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2600 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2601 else:
2602 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2603
2604 return self._vnfrs[vnfr.id]
2605
2606 @asyncio.coroutine
2607 def delete_vnfr(self, xact, vnfr):
2608 """ Create a VNFR instance """
2609 if vnfr.vnfr_id in self._vnfrs:
2610 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2611 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2612
2613 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2614 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2615 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2616
2617 del self._vnfrs[vnfr.vnfr_id]
2618
2619 @asyncio.coroutine
2620 def fetch_vnfd(self, vnfd_id):
2621 """ Fetch VNFDs based with the vnfd id"""
2622 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2623 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2624 vnfd = None
2625
2626 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2627
2628 for ent in res_iter:
2629 res = yield from ent
2630 vnfd = res.result
2631
2632 if vnfd is None:
2633 err = "Failed to get Vnfd %s" % vnfd_id
2634 self._log.error(err)
2635 raise VnfRecordError(err)
2636
2637 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2638
2639 return vnfd
2640
2641 def vnfd_in_use(self, vnfd_id):
2642 """ Is this VNFD in use """
2643 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2644 if vnfd_id in self._vnfds_to_vnfr:
2645 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2646 return False
2647
2648 @asyncio.coroutine
2649 def publish_vnfr(self, xact, path, msg):
2650 """ Publish a VNFR """
2651 self._log.debug("publish_vnfr called with path %s, msg %s",
2652 path, msg)
2653 yield from self.vnfr_handler.update(xact, path, msg)
2654
2655 @asyncio.coroutine
2656 def delete_vnfd(self, vnfd_id):
2657 """ Delete the Virtual Network Function descriptor with the passed id """
2658 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2659 if vnfd_id in self._vnfds_to_vnfr:
2660 if self._vnfds_to_vnfr[vnfd_id]:
2661 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2662 vnfd_id,
2663 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2664 raise VirtualNetworkFunctionDescriptorRefCountExists(
2665 "Cannot delete :%s, ref_count:%s",
2666 vnfd_id,
2667 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2668
2669 del self._vnfds_to_vnfr[vnfd_id]
2670
2671 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2672 try:
2673 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2674 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2675 if os.path.exists(vnfd_dir):
2676 shutil.rmtree(vnfd_dir, ignore_errors=True)
2677 except Exception as e:
2678 self._log.error("Exception in cleaning up VNFD {}: {}".
2679 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2680 self._log.exception(e)
2681
2682
2683 def vnfd_refcount_xpath(self, vnfd_id):
2684 """ xpath for ref count entry """
2685 return (VnfdRefCountDtsHandler.XPATH +
2686 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2687
2688 @asyncio.coroutine
2689 def get_vnfd_refcount(self, vnfd_id):
2690 """ Get the vnfd_list from this VNFM"""
2691 vnfd_list = []
2692 if vnfd_id is None or vnfd_id == "":
2693 for vnfd in self._vnfds_to_vnfr.keys():
2694 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2695 vnfd_msg.vnfd_id_ref = vnfd
2696 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2697 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2698 elif vnfd_id in self._vnfds_to_vnfr:
2699 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2700 vnfd_msg.vnfd_id_ref = vnfd_id
2701 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2702 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2703
2704 return vnfd_list
2705
2706
2707 class VnfmTasklet(rift.tasklets.Tasklet):
2708 """ VNF Manager tasklet class """
2709 def __init__(self, *args, **kwargs):
2710 super(VnfmTasklet, self).__init__(*args, **kwargs)
2711 self.rwlog.set_category("rw-mano-log")
2712 self.rwlog.set_subcategory("vnfm")
2713
2714 self._dts = None
2715 self._vnfm = None
2716
2717 def start(self):
2718 try:
2719 super(VnfmTasklet, self).start()
2720 self.log.info("Starting VnfmTasklet")
2721
2722 self.log.setLevel(logging.DEBUG)
2723
2724 self.log.debug("Registering with dts")
2725 self._dts = rift.tasklets.DTS(self.tasklet_info,
2726 RwVnfmYang.get_schema(),
2727 self.loop,
2728 self.on_dts_state_change)
2729
2730 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2731 except Exception:
2732 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2733 raise
2734
2735 def on_instance_started(self):
2736 """ Task insance started callback """
2737 self.log.debug("Got instance started callback")
2738
2739 def stop(self):
2740 try:
2741 self._dts.deinit()
2742 except Exception:
2743 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2744 raise
2745
2746 @asyncio.coroutine
2747 def init(self):
2748 """ Task init callback """
2749 try:
2750 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2751 assert vm_parent_name is not None
2752 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2753 yield from self._vnfm.run()
2754 except Exception:
2755 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2756 raise
2757
2758 @asyncio.coroutine
2759 def run(self):
2760 """ Task run callback """
2761 pass
2762
2763 @asyncio.coroutine
2764 def on_dts_state_change(self, state):
2765 """Take action according to current dts state to transition
2766 application into the corresponding application state
2767
2768 Arguments
2769 state - current dts state
2770 """
2771 switch = {
2772 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2773 rwdts.State.CONFIG: rwdts.State.RUN,
2774 }
2775
2776 handlers = {
2777 rwdts.State.INIT: self.init,
2778 rwdts.State.RUN: self.run,
2779 }
2780
2781 # Transition application to next state
2782 handler = handlers.get(state, None)
2783 if handler is not None:
2784 yield from handler()
2785
2786 # Transition dts to next state
2787 next_state = switch.get(state, None)
2788 if next_state is not None:
2789 self._dts.handle.set_state(next_state)