Merge "Bug 140"
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54
55
56 class VMResourceError(Exception):
57 """ VM resource Error"""
58 pass
59
60
61 class VnfRecordError(Exception):
62 """ VNF record instatiation failed"""
63 pass
64
65
66 class VduRecordError(Exception):
67 """ VDU record instatiation failed"""
68 pass
69
70
71 class NotImplemented(Exception):
72 """Not implemented """
73 pass
74
75
76 class VnfrRecordExistsError(Exception):
77 """VNFR record already exist with the same VNFR id"""
78 pass
79
80
81 class InternalVirtualLinkRecordError(Exception):
82 """Internal virtual link record error"""
83 pass
84
85
86 class VDUImageNotFound(Exception):
87 """VDU Image not found error"""
88 pass
89
90
91 class VirtualDeploymentUnitRecordError(Exception):
92 """VDU Instantiation failed"""
93 pass
94
95
96 class VMNotReadyError(Exception):
97 """ VM Not yet received from resource manager """
98 pass
99
100
101 class VDURecordNotFound(Exception):
102 """ Could not find a VDU record """
103 pass
104
105
106 class VirtualNetworkFunctionRecordDescNotFound(Exception):
107 """ Cannot find Virtual Network Function Record Descriptor """
108 pass
109
110
111 class VirtualNetworkFunctionDescriptorError(Exception):
112 """ Virtual Network Function Record Descriptor Error """
113 pass
114
115
116 class VirtualNetworkFunctionDescriptorNotFound(Exception):
117 """ Virtual Network Function Record Descriptor Not Found """
118 pass
119
120
121 class VirtualNetworkFunctionRecordNotFound(Exception):
122 """ Virtual Network Function Record Not Found """
123 pass
124
125
126 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
127 """ Virtual Network Funtion Descriptor reference count exists """
128 pass
129
130
131 class VnfrInstantiationFailed(Exception):
132 """ Virtual Network Funtion Instantiation failed"""
133 pass
134
135
136 class VNFMPlacementGroupError(Exception):
137 pass
138
139 class VirtualNetworkFunctionRecordState(enum.Enum):
140 """ VNFR state """
141 INIT = 1
142 VL_INIT_PHASE = 2
143 VM_INIT_PHASE = 3
144 READY = 4
145 TERMINATE = 5
146 VL_TERMINATE_PHASE = 6
147 VDU_TERMINATE_PHASE = 7
148 TERMINATED = 7
149 FAILED = 10
150
151
152 class VDURecordState(enum.Enum):
153 """VDU record state """
154 INIT = 1
155 INSTANTIATING = 2
156 RESOURCE_ALLOC_PENDING = 3
157 READY = 4
158 TERMINATING = 5
159 TERMINATED = 6
160 FAILED = 10
161
162
163 class VcsComponent(object):
164 """ VCS Component within the VNF descriptor """
165 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
166 self._dts = dts
167 self._log = log
168 self._loop = loop
169 self._component = component
170 self._cluster_name = cluster_name
171 self._vcs_handler = vcs_handler
172 self._mangled_name = mangled_name
173
174 @staticmethod
175 def mangle_name(component_name, vnf_name, vnfd_id):
176 """ mangled component name """
177 return vnf_name + ":" + component_name + ":" + vnfd_id
178
179 @property
180 def name(self):
181 """ name of this component"""
182 return self._mangled_name
183
184 @property
185 def path(self):
186 """ The path for this object """
187 return("D,/rw-manifest:manifest" +
188 "/rw-manifest:operational-inventory" +
189 "/rw-manifest:component" +
190 "[rw-manifest:component-name = '{}']").format(self.name)
191
192 @property
193 def instance_xpath(self):
194 """ The path for this object """
195 return("D,/rw-base:vcs" +
196 "/instances" +
197 "/instance" +
198 "[instance-name = '{}']".format(self._cluster_name))
199
200 @property
201 def start_comp_xpath(self):
202 """ start component xpath """
203 return (self.instance_xpath +
204 "/child-n[instance-name = 'START-REQ']")
205
206 def get_start_comp_msg(self, ip_address):
207 """ start this component """
208 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
209 start_msg.instance_name = 'START-REQ'
210 start_msg.component_name = self.name
211 start_msg.admin_command = "START"
212 start_msg.ip_address = ip_address
213
214 return start_msg
215
216 @property
217 def msg(self):
218 """ Returns the message for this vcs component"""
219
220 vcs_comp_dict = self._component.as_dict()
221
222 def mangle_comp_names(comp_dict):
223 """ mangle component name with VNF name, id"""
224 for key, val in comp_dict.items():
225 if isinstance(val, dict):
226 comp_dict[key] = mangle_comp_names(val)
227 elif isinstance(val, list):
228 i = 0
229 for ent in val:
230 if isinstance(ent, dict):
231 val[i] = mangle_comp_names(ent)
232 else:
233 val[i] = ent
234 i += 1
235 elif key == "component_name":
236 comp_dict[key] = VcsComponent.mangle_name(val,
237 self._vnfd_name,
238 self._vnfd_id)
239 return comp_dict
240
241 mangled_dict = mangle_comp_names(vcs_comp_dict)
242 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
243 return msg
244
245 @asyncio.coroutine
246 def publish(self, xact):
247 """ Publishes the VCS component """
248 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
249 self.name, self.path, self.msg)
250 yield from self._vcs_handler.publish(xact, self.path, self.msg)
251
252 @asyncio.coroutine
253 def start(self, xact, parent, ip_addr=None):
254 """ Starts this VCS component """
255 # ATTN RV - replace with block add
256 start_msg = self.get_start_comp_msg(ip_addr)
257 self._log.debug("starting component %s %s",
258 self.start_comp_xpath, start_msg)
259 yield from self._dts.query_create(self.start_comp_xpath,
260 0,
261 start_msg)
262 self._log.debug("started component %s, %s",
263 self.start_comp_xpath, start_msg)
264
265
266 class VirtualDeploymentUnitRecord(object):
267 """ Virtual Deployment Unit Record """
268 def __init__(self,
269 dts,
270 log,
271 loop,
272 vdud,
273 vnfr,
274 mgmt_intf,
275 mgmt_network,
276 cloud_account_name,
277 vnfd_package_store,
278 vdur_id=None,
279 placement_groups=[]):
280 self._dts = dts
281 self._log = log
282 self._loop = loop
283 self._vdud = vdud
284 self._vnfr = vnfr
285 self._mgmt_intf = mgmt_intf
286 self._cloud_account_name = cloud_account_name
287 self._vnfd_package_store = vnfd_package_store
288 self._mgmt_network = mgmt_network
289
290 self._vdur_id = vdur_id or str(uuid.uuid4())
291 self._int_intf = []
292 self._ext_intf = []
293 self._state = VDURecordState.INIT
294 self._state_failed_reason = None
295 self._request_id = str(uuid.uuid4())
296 self._name = vnfr.name + "__" + vdud.id
297 self._placement_groups = placement_groups
298 self._rm_regh = None
299 self._vm_resp = None
300 self._vdud_cloud_init = None
301 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
302
303 @asyncio.coroutine
304 def vdu_opdata_register(self):
305 yield from self._vdur_console_handler.register()
306
307 def cp_ip_addr(self, cp_name):
308 """ Find ip address by connection point name """
309 if self._vm_resp is not None:
310 for conn_point in self._vm_resp.connection_points:
311 if conn_point.name == cp_name:
312 return conn_point.ip_address
313 return "0.0.0.0"
314
315 def cp_mac_addr(self, cp_name):
316 """ Find mac address by connection point name """
317 if self._vm_resp is not None:
318 for conn_point in self._vm_resp.connection_points:
319 if conn_point.name == cp_name:
320 return conn_point.mac_addr
321 return "00:00:00:00:00:00"
322
323 def cp_id(self, cp_name):
324 """ Find connection point id by connection point name """
325 if self._vm_resp is not None:
326 for conn_point in self._vm_resp.connection_points:
327 if conn_point.name == cp_name:
328 return conn_point.connection_point_id
329 return ''
330
331 @property
332 def vdu_id(self):
333 return self._vdud.id
334
335 @property
336 def vm_resp(self):
337 return self._vm_resp
338
339 @property
340 def name(self):
341 """ Return this VDUR's name """
342 return self._name
343
344 @property
345 def cloud_account_name(self):
346 """ Cloud account this VDU should be created in """
347 return self._cloud_account_name
348
349 @property
350 def image_name(self):
351 """ name that should be used to lookup the image on the CMP """
352 if 'image' not in self._vdud:
353 return None
354 return os.path.basename(self._vdud.image)
355
356 @property
357 def image_checksum(self):
358 """ name that should be used to lookup the image on the CMP """
359 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
360
361 @property
362 def management_ip(self):
363 if not self.active:
364 return None
365 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
366
367 @property
368 def vm_management_ip(self):
369 if not self.active:
370 return None
371 return self._vm_resp.management_ip
372
373 @property
374 def operational_status(self):
375 """ Operational status of this VDU"""
376 op_stats_dict = {"INIT": "init",
377 "INSTANTIATING": "vm_init_phase",
378 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
379 "READY": "running",
380 "FAILED": "failed",
381 "TERMINATING": "terminated",
382 "TERMINATED": "terminated",
383 }
384 return op_stats_dict[self._state.name]
385
386 @property
387 def msg(self):
388 """ Process VDU message from resmgr"""
389 vdu_fields = ["vm_flavor",
390 "guest_epa",
391 "vswitch_epa",
392 "hypervisor_epa",
393 "host_epa",
394 "volumes",
395 "name"]
396 vdu_copy_dict = {k: v for k, v in
397 self._vdud.as_dict().items() if k in vdu_fields}
398 vdur_dict = {"id": self._vdur_id,
399 "vdu_id_ref": self._vdud.id,
400 "operational_status": self.operational_status,
401 "operational_status_details": self._state_failed_reason,
402 }
403 if self.vm_resp is not None:
404 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
405 "flavor_id": self.vm_resp.flavor_id
406 })
407 if self._vm_resp.has_field('image_id'):
408 vdur_dict.update({ "image_id": self.vm_resp.image_id })
409
410 if self.management_ip is not None:
411 vdur_dict["management_ip"] = self.management_ip
412
413 if self.vm_management_ip is not None:
414 vdur_dict["vm_management_ip"] = self.vm_management_ip
415
416 vdur_dict.update(vdu_copy_dict)
417
418 if self.vm_resp is not None:
419 if self._vm_resp.has_field('volumes'):
420 for opvolume in self._vm_resp.volumes:
421 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
422 if len(vdurvol_data) == 1:
423 vdurvol_data[0]["volume_id"] = opvolume.volume_id
424 if opvolume.has_field('custom_meta_data'):
425 metadata_list = list()
426 for metadata_item in opvolume.custom_meta_data:
427 metadata_list.append(metadata_item.as_dict())
428 vdurvol_data[0]['custom_meta_data'] = metadata_list
429
430 if self._vm_resp.has_field('supplemental_boot_data'):
431 vdur_dict['supplemental_boot_data'] = dict()
432 if self._vm_resp.supplemental_boot_data.has_field('boot_data_drive'):
433 vdur_dict['supplemental_boot_data']['boot_data_drive'] = self._vm_resp.supplemental_boot_data.boot_data_drive
434 if self._vm_resp.supplemental_boot_data.has_field('custom_meta_data'):
435 metadata_list = list()
436 for metadata_item in self._vm_resp.supplemental_boot_data.custom_meta_data:
437 metadata_list.append(metadata_item.as_dict())
438 vdur_dict['supplemental_boot_data']['custom_meta_data'] = metadata_list
439 if self._vm_resp.supplemental_boot_data.has_field('config_file'):
440 file_list = list()
441 for file_item in self._vm_resp.supplemental_boot_data.config_file:
442 file_list.append(file_item.as_dict())
443 vdur_dict['supplemental_boot_data']['config_file'] = file_list
444
445 icp_list = []
446 ii_list = []
447
448 for intf, cp_id, vlr in self._int_intf:
449 cp = self.find_internal_cp_by_cp_id(cp_id)
450
451 icp_list.append({"name": cp.name,
452 "id": cp.id,
453 "type_yang": "VPORT",
454 "ip_address": self.cp_ip_addr(cp.id),
455 "mac_address": self.cp_mac_addr(cp.id)})
456
457 ii_list.append({"name": intf.name,
458 "vdur_internal_connection_point_ref": cp.id,
459 "virtual_interface": {}})
460
461 vdur_dict["internal_connection_point"] = icp_list
462 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
463 vdur_dict["internal_interface"] = ii_list
464
465 ei_list = []
466 for intf, cp, vlr in self._ext_intf:
467 ei_list.append({"name": cp.name,
468 "vnfd_connection_point_ref": cp.name,
469 "virtual_interface": {}})
470 self._vnfr.update_cp(cp.name,
471 self.cp_ip_addr(cp.name),
472 self.cp_mac_addr(cp.name),
473 self.cp_id(cp.name))
474
475 vdur_dict["external_interface"] = ei_list
476
477 placement_groups = []
478 for group in self._placement_groups:
479 placement_groups.append(group.as_dict())
480 vdur_dict['placement_groups_info'] = placement_groups
481
482 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
483
484 @property
485 def resmgr_path(self):
486 """ path for resource-mgr"""
487 return ("D,/rw-resource-mgr:resource-mgmt" +
488 "/vdu-event" +
489 "/vdu-event-data[event-id='{}']".format(self._request_id))
490
491 @property
492 def vm_flavor_msg(self):
493 """ VM flavor message """
494 flavor = self._vdud.vm_flavor.__class__()
495 flavor.copy_from(self._vdud.vm_flavor)
496
497 return flavor
498
499 @property
500 def vdud_cloud_init(self):
501 """ Return the cloud-init contents for the VDU """
502 if self._vdud_cloud_init is None:
503 self._vdud_cloud_init = self.cloud_init()
504
505 return self._vdud_cloud_init
506
507 def cloud_init(self):
508 """ Populate cloud_init with cloud-config script from
509 either the inline contents or from the file provided
510 """
511 if self._vdud.cloud_init is not None:
512 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
513 return self._vdud.cloud_init
514 elif self._vdud.cloud_init_file is not None:
515 # Get cloud-init script contents from the file provided in the cloud_init_file param
516 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
517 filename = self._vdud.cloud_init_file
518 self._vnfd_package_store.refresh()
519 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
520 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
521 try:
522 return cloud_init_extractor.read_script(stored_package, filename)
523 except rift.package.cloud_init.CloudInitExtractionError as e:
524 raise VirtualDeploymentUnitRecordError(e)
525 else:
526 self._log.debug("VDU Instantiation: cloud-init script not provided")
527
528 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
529 host_aggregates = []
530 availability_zones = []
531 server_groups = []
532 for group in self._placement_groups:
533 if group.has_field('host_aggregate'):
534 for aggregate in group.host_aggregate:
535 host_aggregates.append(aggregate.as_dict())
536 if group.has_field('availability_zone'):
537 availability_zones.append(group.availability_zone.as_dict())
538 if group.has_field('server_group'):
539 server_groups.append(group.server_group.as_dict())
540
541 if availability_zones:
542 if len(availability_zones) > 1:
543 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
544 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
545 else:
546 vm_create_msg_dict['availability_zone'] = availability_zones[0]
547
548 if server_groups:
549 if len(server_groups) > 1:
550 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
551 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
552 else:
553 vm_create_msg_dict['server_group'] = server_groups[0]
554
555 if host_aggregates:
556 vm_create_msg_dict['host_aggregate'] = host_aggregates
557
558 return
559
560 def process_placement_groups(self, vm_create_msg_dict):
561 """Process the placement_groups and fill resource-mgr request"""
562 if not self._placement_groups:
563 return
564
565 cloud_set = set([group.cloud_type for group in self._placement_groups])
566 assert len(cloud_set) == 1
567 cloud_type = cloud_set.pop()
568
569 if cloud_type == 'openstack':
570 self.process_openstack_placement_group_construct(vm_create_msg_dict)
571
572 else:
573 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
574 return
575
576 def process_custom_bootdata(self, vm_create_msg_dict):
577 """Process the custom boot data"""
578 if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
579 return
580
581 self._vnfd_package_store.refresh()
582 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
583 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
584 for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
585 if 'source' not in file_item or 'dest' not in file_item:
586 continue
587 source = file_item['source']
588 # Find source file in scripts dir of VNFD
589 self._log.debug("Checking for source config file at %s", source)
590 try:
591 source_file_str = cloud_init_extractor.read_script(stored_package, source)
592 except rift.package.cloud_init.CloudInitExtractionError as e:
593 raise VirtualDeploymentUnitRecordError(e)
594 # Update source file location with file contents
595 file_item['source'] = source_file_str
596
597 return
598
599 def resmgr_msg(self, config=None):
600 vdu_fields = ["vm_flavor",
601 "guest_epa",
602 "vswitch_epa",
603 "hypervisor_epa",
604 "host_epa",
605 "volumes",
606 "supplemental_boot_data"]
607
608 self._log.debug("Creating params based on VDUD: %s", self._vdud)
609 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
610
611 vm_create_msg_dict = {
612 "name": self.name,
613 }
614
615 if self.image_name is not None:
616 vm_create_msg_dict["image_name"] = self.image_name
617
618 if self.image_checksum is not None:
619 vm_create_msg_dict["image_checksum"] = self.image_checksum
620
621 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
622 if self._vdud.has_field('mgmt_vpci'):
623 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
624
625 self._log.debug("VDUD: %s", self._vdud)
626 if config is not None:
627 vm_create_msg_dict['vdu_init'] = {'userdata': config}
628
629 if self._mgmt_network:
630 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
631
632 cp_list = []
633 for intf, cp, vlr in self._ext_intf:
634 cp_info = {"name": cp.name,
635 "virtual_link_id": vlr.network_id,
636 "type_yang": intf.virtual_interface.type_yang,
637 "port_security_enabled": cp.port_security_enabled}
638
639 if (intf.virtual_interface.has_field('vpci') and
640 intf.virtual_interface.vpci is not None):
641 cp_info["vpci"] = intf.virtual_interface.vpci
642
643 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
644 cp_info['security_group'] = vlr.ip_profile_params.security_group
645
646 cp_list.append(cp_info)
647
648 for intf, cp, vlr in self._int_intf:
649 if (intf.virtual_interface.has_field('vpci') and
650 intf.virtual_interface.vpci is not None):
651 cp_list.append({"name": cp,
652 "virtual_link_id": vlr.network_id,
653 "type_yang": intf.virtual_interface.type_yang,
654 "vpci": intf.virtual_interface.vpci})
655 else:
656 cp_list.append({"name": cp,
657 "virtual_link_id": vlr.network_id,
658 "type_yang": intf.virtual_interface.type_yang,
659 "port_security_enabled": cp.port_security_enabled})
660
661 vm_create_msg_dict["connection_points"] = cp_list
662 vm_create_msg_dict.update(vdu_copy_dict)
663
664 self.process_placement_groups(vm_create_msg_dict)
665 if 'supplemental_boot_data' in vm_create_msg_dict:
666 self.process_custom_bootdata(vm_create_msg_dict)
667
668 msg = RwResourceMgrYang.VDUEventData()
669 msg.event_id = self._request_id
670 msg.cloud_account = self.cloud_account_name
671 msg.request_info.from_dict(vm_create_msg_dict)
672
673 return msg
674
675 @asyncio.coroutine
676 def terminate(self, xact):
677 """ Delete resource in VIM """
678 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
679 self._log.warning("VDU terminate in not ready state - Ignoring request")
680 return
681
682 self._state = VDURecordState.TERMINATING
683 if self._vm_resp is not None:
684 try:
685 with self._dts.transaction() as new_xact:
686 yield from self.delete_resource(new_xact)
687 except Exception:
688 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
689
690 if self._rm_regh is not None:
691 self._log.debug("Deregistering resource manager registration handle")
692 self._rm_regh.deregister()
693 self._rm_regh = None
694
695 if self._vdur_console_handler is not None:
696 self._log.error("Deregistering vnfr vdur registration handle")
697 self._vdur_console_handler._regh.deregister()
698 self._vdur_console_handler._regh = None
699
700 self._state = VDURecordState.TERMINATED
701
702 def find_internal_cp_by_cp_id(self, cp_id):
703 """ Find the CP corresponding to the connection point id"""
704 cp = None
705
706 self._log.debug("find_internal_cp_by_cp_id(%s) called",
707 cp_id)
708
709 for int_cp in self._vdud.internal_connection_point:
710 self._log.debug("Checking for int cp %s in internal connection points",
711 int_cp.id)
712 if int_cp.id == cp_id:
713 cp = int_cp
714 break
715
716 if cp is None:
717 self._log.debug("Failed to find cp %s in internal connection points",
718 cp_id)
719 msg = "Failed to find cp %s in internal connection points" % cp_id
720 raise VduRecordError(msg)
721
722 # return the VLR associated with the connection point
723 return cp
724
725 @asyncio.coroutine
726 def create_resource(self, xact, vnfr, config=None):
727 """ Request resource from ResourceMgr """
728 def find_cp_by_name(cp_name):
729 """ Find a connection point by name """
730 cp = None
731 self._log.debug("find_cp_by_name(%s) called", cp_name)
732 for ext_cp in vnfr._cprs:
733 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
734 if ext_cp.name == cp_name:
735 cp = ext_cp
736 break
737 if cp is None:
738 self._log.debug("Failed to find cp %s in external connection points",
739 cp_name)
740 return cp
741
742 def find_internal_vlr_by_cp_name(cp_name):
743 """ Find the VLR corresponding to the connection point name"""
744 cp = None
745
746 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
747 cp_name)
748
749 for int_cp in self._vdud.internal_connection_point:
750 self._log.debug("Checking for int cp %s in internal connection points",
751 int_cp.id)
752 if int_cp.id == cp_name:
753 cp = int_cp
754 break
755
756 if cp is None:
757 self._log.debug("Failed to find cp %s in internal connection points",
758 cp_name)
759 msg = "Failed to find cp %s in internal connection points" % cp_name
760 raise VduRecordError(msg)
761
762 # return the VLR associated with the connection point
763 return vnfr.find_vlr_by_cp(cp_name)
764
765 block = xact.block_create()
766
767 self._log.debug("Executing vm request id: %s, action: create",
768 self._request_id)
769
770 # Resolve the networks associated external interfaces
771 for ext_intf in self._vdud.external_interface:
772 self._log.debug("Resolving external interface name [%s], cp[%s]",
773 ext_intf.name, ext_intf.vnfd_connection_point_ref)
774 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
775 if cp is None:
776 self._log.debug("Failed to find connection point - %s",
777 ext_intf.vnfd_connection_point_ref)
778 continue
779 self._log.debug("Connection point name [%s], type[%s]",
780 cp.name, cp.type_yang)
781
782 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
783
784 etuple = (ext_intf, cp, vlr)
785 self._ext_intf.append(etuple)
786
787 self._log.debug("Created external interface tuple : %s", etuple)
788
789 # Resolve the networks associated internal interfaces
790 for intf in self._vdud.internal_interface:
791 cp_id = intf.vdu_internal_connection_point_ref
792 self._log.debug("Resolving internal interface name [%s], cp[%s]",
793 intf.name, cp_id)
794
795 try:
796 vlr = find_internal_vlr_by_cp_name(cp_id)
797 except Exception as e:
798 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
799 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
800 raise VduRecordError(msg)
801
802 ituple = (intf, cp_id, vlr)
803 self._int_intf.append(ituple)
804
805 self._log.debug("Created internal interface tuple : %s", ituple)
806
807 resmgr_path = self.resmgr_path
808 resmgr_msg = self.resmgr_msg(config)
809
810 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
811 block.add_query_create(resmgr_path, resmgr_msg)
812
813 res_iter = yield from block.execute(now=True)
814
815 resp = None
816
817 for i in res_iter:
818 r = yield from i
819 resp = r.result
820
821 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
822 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
823 self._log.debug("Got vm request response: %s", resp.resource_info)
824 return resp.resource_info
825
826 @asyncio.coroutine
827 def delete_resource(self, xact):
828 block = xact.block_create()
829
830 self._log.debug("Executing vm request id: %s, action: delete",
831 self._request_id)
832
833 block.add_query_delete(self.resmgr_path)
834
835 yield from block.execute(flags=0, now=True)
836
837 @asyncio.coroutine
838 def read_resource(self, xact):
839 block = xact.block_create()
840
841 self._log.debug("Executing vm request id: %s, action: delete",
842 self._request_id)
843
844 block.add_query_read(self.resmgr_path)
845
846 res_iter = yield from block.execute(flags=0, now=True)
847 for i in res_iter:
848 r = yield from i
849 resp = r.result
850
851 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
852 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
853 self._log.debug("Got vm request response: %s", resp.resource_info)
854 #self._vm_resp = resp.resource_info
855 return resp.resource_info
856
857
858 @asyncio.coroutine
859 def start_component(self):
860 """ This VDUR is active """
861 self._log.debug("Starting component %s for vdud %s vdur %s",
862 self._vdud.vcs_component_ref,
863 self._vdud,
864 self._vdur_id)
865 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
866 self.vm_resp.management_ip)
867
868 @property
869 def active(self):
870 """ Is this VDU active """
871 return True if self._state is VDURecordState.READY else False
872
873 @asyncio.coroutine
874 def instantiation_failed(self, failed_reason=None):
875 """ VDU instantiation failed """
876 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
877 self._state = VDURecordState.FAILED
878 self._state_failed_reason = failed_reason
879 yield from self._vnfr.instantiation_failed(failed_reason)
880
881 @asyncio.coroutine
882 def vdu_is_active(self):
883 """ This VDU is active"""
884 if self.active:
885 self._log.warning("VDU %s was already marked as active", self._vdur_id)
886 return
887
888 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
889
890 if self._vdud.vcs_component_ref is not None:
891 yield from self.start_component()
892
893 self._state = VDURecordState.READY
894
895 if self._vnfr.all_vdus_active():
896 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
897 yield from self._vnfr.is_ready()
898
899 @asyncio.coroutine
900 def instantiate(self, xact, vnfr, config=None):
901 """ Instantiate this VDU """
902 self._state = VDURecordState.INSTANTIATING
903
904 @asyncio.coroutine
905 def on_prepare(xact_info, query_action, ks_path, msg):
906 """ This VDUR is active """
907 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
908 query_action,
909 ks_path,
910 msg)
911
912 if (query_action == rwdts.QueryAction.UPDATE or
913 query_action == rwdts.QueryAction.CREATE):
914 self._vm_resp = msg
915
916 if msg.resource_state == "active":
917 # Move this VDU to ready state
918 yield from self.vdu_is_active()
919 elif msg.resource_state == "failed":
920 yield from self.instantiation_failed(msg.resource_errors)
921 elif query_action == rwdts.QueryAction.DELETE:
922 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
923 else:
924 raise NotImplementedError(
925 "%s action on VirtualDeployementUnitRecord not supported",
926 query_action)
927
928 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
929
930 try:
931 reg_event = asyncio.Event(loop=self._loop)
932
933 @asyncio.coroutine
934 def on_ready(regh, status):
935 reg_event.set()
936
937 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
938 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
939 flags=rwdts.Flag.SUBSCRIBER,
940 handler=handler)
941 yield from reg_event.wait()
942
943 vm_resp = yield from self.create_resource(xact, vnfr, config)
944 self._vm_resp = vm_resp
945
946 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
947 self._log.debug("Requested VM from resource manager response %s",
948 vm_resp)
949 if vm_resp.resource_state == "active":
950 self._log.debug("Resourcemgr responded wih an active vm resp %s",
951 vm_resp)
952 yield from self.vdu_is_active()
953 self._state = VDURecordState.READY
954 elif (vm_resp.resource_state == "pending" or
955 vm_resp.resource_state == "inactive"):
956 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
957 vm_resp)
958 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
959 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
960 # flags=rwdts.Flag.SUBSCRIBER,
961 # handler=handler)
962 else:
963 self._log.debug("Resourcemgr responded wih an error vm resp %s",
964 vm_resp)
965 raise VirtualDeploymentUnitRecordError(
966 "Failed VDUR instantiation %s " % vm_resp)
967
968 except Exception as e:
969 import traceback
970 traceback.print_exc()
971 self._log.exception(e)
972 self._log.error("Instantiation of VDU record failed: %s", str(e))
973 self._state = VDURecordState.FAILED
974 yield from self.instantiation_failed(str(e))
975
976
977 class VlRecordState(enum.Enum):
978 """ VL Record State """
979 INIT = 101
980 INSTANTIATION_PENDING = 102
981 ACTIVE = 103
982 TERMINATE_PENDING = 104
983 TERMINATED = 105
984 FAILED = 106
985
986
987 class InternalVirtualLinkRecord(object):
988 """ Internal Virtual Link record """
989 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
990 self._dts = dts
991 self._log = log
992 self._loop = loop
993 self._ivld_msg = ivld_msg
994 self._vnfr_name = vnfr_name
995 self._cloud_account_name = cloud_account_name
996 self._ip_profile = ip_profile
997
998 self._vlr_req = self.create_vlr()
999 self._vlr = None
1000 self._state = VlRecordState.INIT
1001
1002 @property
1003 def vlr_id(self):
1004 """ Find VLR by id """
1005 return self._vlr_req.id
1006
1007 @property
1008 def name(self):
1009 """ Name of this VL """
1010 if self._ivld_msg.vim_network_name:
1011 return self._ivld_msg.vim_network_name
1012 else:
1013 return self._vnfr_name + "." + self._ivld_msg.name
1014
1015 @property
1016 def network_id(self):
1017 """ Find VLR by id """
1018 return self._vlr.network_id if self._vlr else None
1019
1020 def vlr_path(self):
1021 """ VLR path for this VLR instance"""
1022 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
1023
1024 def create_vlr(self):
1025 """ Create the VLR record which will be instantiated """
1026
1027 vld_fields = ["short_name",
1028 "vendor",
1029 "description",
1030 "version",
1031 "type_yang",
1032 "vim_network_name",
1033 "provider_network"]
1034
1035 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
1036
1037 vlr_dict = {"id": str(uuid.uuid4()),
1038 "name": self.name,
1039 "cloud_account": self._cloud_account_name,
1040 }
1041
1042 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
1043 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
1044
1045 vlr_dict.update(vld_copy_dict)
1046
1047 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
1048 return vlr
1049
1050 @asyncio.coroutine
1051 def instantiate(self, xact, restart_mode=False):
1052 """ Instantiate VL """
1053
1054 @asyncio.coroutine
1055 def instantiate_vlr():
1056 """ Instantiate VLR"""
1057 self._log.debug("Create VL with xpath %s and vlr %s",
1058 self.vlr_path(), self._vlr_req)
1059
1060 with self._dts.transaction(flags=0) as xact:
1061 block = xact.block_create()
1062 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
1063 self._log.debug("Executing VL create path:%s msg:%s",
1064 self.vlr_path(), self._vlr_req)
1065
1066 res_iter = None
1067 try:
1068 res_iter = yield from block.execute()
1069 except Exception:
1070 self._state = VlRecordState.FAILED
1071 self._log.exception("Caught exception while instantial VL")
1072 raise
1073
1074 for ent in res_iter:
1075 res = yield from ent
1076 self._vlr = res.result
1077
1078 if self._vlr.operational_status == 'failed':
1079 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1080 self._state = VlRecordState.FAILED
1081 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1082
1083 self._log.info("Created VL with xpath %s and vlr %s",
1084 self.vlr_path(), self._vlr)
1085
1086 @asyncio.coroutine
1087 def get_vlr():
1088 """ Get the network id """
1089 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1090 vlr = None
1091 for ent in res_iter:
1092 res = yield from ent
1093 vlr = res.result
1094
1095 if vlr is None:
1096 err = "Failed to get VLR for path %s" % self.vlr_path()
1097 self._log.warn(err)
1098 raise InternalVirtualLinkRecordError(err)
1099 return vlr
1100
1101 self._state = VlRecordState.INSTANTIATION_PENDING
1102
1103 if restart_mode:
1104 vl = yield from get_vlr()
1105 if vl is None:
1106 yield from instantiate_vlr()
1107 else:
1108 yield from instantiate_vlr()
1109
1110 self._state = VlRecordState.ACTIVE
1111
1112 def vlr_in_vns(self):
1113 """ Is there a VLR record in VNS """
1114 if (self._state == VlRecordState.ACTIVE or
1115 self._state == VlRecordState.INSTANTIATION_PENDING or
1116 self._state == VlRecordState.FAILED):
1117 return True
1118
1119 return False
1120
1121 @asyncio.coroutine
1122 def terminate(self, xact):
1123 """Terminate this VL """
1124 if not self.vlr_in_vns():
1125 self._log.debug("Ignoring terminate request for id %s in state %s",
1126 self.vlr_id, self._state)
1127 return
1128
1129 self._log.debug("Terminating VL with path %s", self.vlr_path())
1130 self._state = VlRecordState.TERMINATE_PENDING
1131 block = xact.block_create()
1132 block.add_query_delete(self.vlr_path())
1133 yield from block.execute(flags=0, now=True)
1134 self._state = VlRecordState.TERMINATED
1135 self._log.debug("Terminated VL with path %s", self.vlr_path())
1136
1137
1138 class VirtualNetworkFunctionRecord(object):
1139 """ Virtual Network Function Record """
1140 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1141 self._dts = dts
1142 self._log = log
1143 self._loop = loop
1144 self._cluster_name = cluster_name
1145 self._vnfr_msg = vnfr_msg
1146 self._vnfr_id = vnfr_msg.id
1147 self._vnfd_id = vnfr_msg.vnfd.id
1148 self._vnfm = vnfm
1149 self._vcs_handler = vcs_handler
1150 self._vnfr = vnfr_msg
1151 self._mgmt_network = mgmt_network
1152
1153 self._vnfd = vnfr_msg.vnfd
1154 self._state = VirtualNetworkFunctionRecordState.INIT
1155 self._state_failed_reason = None
1156 self._ext_vlrs = {} # The list of external virtual links
1157 self._vlrs = [] # The list of internal virtual links
1158 self._vdus = [] # The list of vdu
1159 self._vlr_by_cp = {}
1160 self._cprs = []
1161 self._inventory = {}
1162 self._create_time = int(time.time())
1163 self._vnf_mon = None
1164 self._config_status = vnfr_msg.config_status
1165 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1166 self._rw_vnfd = None
1167 self._vnfd_ref_count = 0
1168
1169 def _get_vdur_from_vdu_id(self, vdu_id):
1170 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1171 self._log.debug("Searching through vdus: %s", self._vdus)
1172 for vdu in self._vdus:
1173 self._log.debug("vdu_id: %s", vdu.vdu_id)
1174 if vdu.vdu_id == vdu_id:
1175 return vdu
1176
1177 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1178
1179 @property
1180 def operational_status(self):
1181 """ Operational status of this VNFR """
1182 op_status_map = {"INIT": "init",
1183 "VL_INIT_PHASE": "vl_init_phase",
1184 "VM_INIT_PHASE": "vm_init_phase",
1185 "READY": "running",
1186 "TERMINATE": "terminate",
1187 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1188 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1189 "TERMINATED": "terminated",
1190 "FAILED": "failed", }
1191 return op_status_map[self._state.name]
1192
1193 @staticmethod
1194 def vnfd_xpath(vnfd_id):
1195 """ VNFD xpath associated with this VNFR """
1196 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
1197
1198 @property
1199 def vnfd_ref_count(self):
1200 """ Returns the VNFD reference count associated with this VNFR """
1201 return self._vnfd_ref_count
1202
1203 def vnfd_in_use(self):
1204 """ Returns whether vnfd is in use or not """
1205 return True if self._vnfd_ref_count > 0 else False
1206
1207 def vnfd_ref(self):
1208 """ Take a reference on this object """
1209 self._vnfd_ref_count += 1
1210 return self._vnfd_ref_count
1211
1212 def vnfd_unref(self):
1213 """ Release reference on this object """
1214 if self._vnfd_ref_count < 1:
1215 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1216 (self.vnfd.id, self._vnfd_ref_count))
1217 self._log.critical(msg)
1218 raise VnfRecordError(msg)
1219 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1220 self.vnfd.id, self._vnfd_ref_count)
1221 self._vnfd_ref_count -= 1
1222 return self._vnfd_ref_count
1223
1224 @property
1225 def vnfd(self):
1226 """ VNFD for this VNFR """
1227 return self._vnfd
1228
1229 @property
1230 def vnf_name(self):
1231 """ VNFD name associated with this VNFR """
1232 return self.vnfd.name
1233
1234 @property
1235 def name(self):
1236 """ Name of this VNF in the record """
1237 return self._vnfr.name
1238
1239 @property
1240 def cloud_account_name(self):
1241 """ Name of the cloud account this VNFR is instantiated in """
1242 return self._vnfr.cloud_account
1243
1244 @property
1245 def vnfd_id(self):
1246 """ VNFD Id associated with this VNFR """
1247 return self.vnfd.id
1248
1249 @property
1250 def vnfr_id(self):
1251 """ VNFR Id associated with this VNFR """
1252 return self._vnfr_id
1253
1254 @property
1255 def member_vnf_index(self):
1256 """ Member VNF index associated with this VNFR """
1257 return self._vnfr.member_vnf_index_ref
1258
1259 @property
1260 def config_status(self):
1261 """ Config agent status for this VNFR """
1262 return self._config_status
1263
1264 def component_by_name(self, component_name):
1265 """ Find a component by name in the inventory list"""
1266 mangled_name = VcsComponent.mangle_name(component_name,
1267 self.vnf_name,
1268 self.vnfd_id)
1269 return self._inventory[mangled_name]
1270
1271
1272
1273 @asyncio.coroutine
1274 def get_nsr_config(self):
1275 ### Need access to NS instance configuration for runtime resolution.
1276 ### This shall be replaced when deployment flavors are implemented
1277 xpath = "C,/nsr:ns-instance-config"
1278 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1279
1280 for result in results:
1281 entry = yield from result
1282 ns_instance_config = entry.result
1283 for nsr in ns_instance_config.nsr:
1284 if nsr.id == self._vnfr_msg.nsr_id_ref:
1285 return nsr
1286 return None
1287
1288 @asyncio.coroutine
1289 def start_component(self, component_name, ip_addr):
1290 """ Start a component in the VNFR by name """
1291 comp = self.component_by_name(component_name)
1292 yield from comp.start(None, None, ip_addr)
1293
1294 def cp_ip_addr(self, cp_name):
1295 """ Get ip address for connection point """
1296 self._log.debug("cp_ip_addr()")
1297 for cp in self._cprs:
1298 if cp.name == cp_name and cp.ip_address is not None:
1299 return cp.ip_address
1300 return "0.0.0.0"
1301
1302 def mgmt_intf_info(self):
1303 """ Get Management interface info for this VNFR """
1304 mgmt_intf_desc = self.vnfd.mgmt_interface
1305 ip_addr = None
1306 if mgmt_intf_desc.has_field("cp"):
1307 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1308 elif mgmt_intf_desc.has_field("vdu_id"):
1309 try:
1310 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1311 ip_addr = vdur.management_ip
1312 except VDURecordNotFound:
1313 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1314 ip_addr = None
1315 else:
1316 ip_addr = mgmt_intf_desc.ip_address
1317 port = mgmt_intf_desc.port
1318
1319 return ip_addr, port
1320
1321 @property
1322 def msg(self):
1323 """ Message associated with this VNFR """
1324 vnfd_fields = ["short_name", "vendor", "description", "version"]
1325 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1326
1327 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1328 ip_address, port = self.mgmt_intf_info()
1329
1330 if ip_address is not None:
1331 mgmt_intf.ip_address = ip_address
1332 if port is not None:
1333 mgmt_intf.port = port
1334
1335 vnfr_dict = {"id": self._vnfr_id,
1336 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1337 "name": self.name,
1338 "member_vnf_index_ref": self.member_vnf_index,
1339 "operational_status": self.operational_status,
1340 "operational_status_details": self._state_failed_reason,
1341 "cloud_account": self.cloud_account_name,
1342 "config_status": self._config_status
1343 }
1344
1345 vnfr_dict.update(vnfd_copy_dict)
1346
1347 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1348 vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1349
1350 vnfr_msg.create_time = self._create_time
1351 vnfr_msg.uptime = int(time.time()) - self._create_time
1352 vnfr_msg.mgmt_interface = mgmt_intf
1353
1354 # Add all the VLRs to VNFR
1355 for vlr in self._vlrs:
1356 ivlr = vnfr_msg.internal_vlr.add()
1357 ivlr.vlr_ref = vlr.vlr_id
1358
1359 # Add all the VDURs to VDUR
1360 if self._vdus is not None:
1361 for vdu in self._vdus:
1362 vdur = vnfr_msg.vdur.add()
1363 vdur.from_dict(vdu.msg.as_dict())
1364
1365 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1366 vnfr_msg.dashboard_url = self.dashboard_url
1367
1368 for cpr in self._cprs:
1369 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1370 vnfr_msg.connection_point.append(new_cp)
1371
1372 if self._vnf_mon is not None:
1373 for monp in self._vnf_mon.msg:
1374 vnfr_msg.monitoring_param.append(
1375 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1376
1377 if self._vnfr.vnf_configuration is not None:
1378 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1379 if (ip_address is not None and
1380 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1381 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1382
1383 for group in self._vnfr_msg.placement_groups_info:
1384 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1385 group_info.from_dict(group.as_dict())
1386 vnfr_msg.placement_groups_info.append(group_info)
1387
1388 return vnfr_msg
1389
1390 @property
1391 def dashboard_url(self):
1392 ip, cfg_port = self.mgmt_intf_info()
1393 protocol = 'http'
1394 http_port = 80
1395 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1396 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1397 protocol = 'https'
1398 http_port = 443
1399 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1400 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1401
1402 url = "{protocol}://{ip_address}:{port}/{path}".format(
1403 protocol=protocol,
1404 ip_address=ip,
1405 port=http_port,
1406 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1407 )
1408
1409 return url
1410
1411 @property
1412 def xpath(self):
1413 """ path for this VNFR """
1414 return("D,/vnfr:vnfr-catalog"
1415 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1416
1417 @asyncio.coroutine
1418 def publish(self, xact):
1419 """ publish this VNFR """
1420 vnfr = self.msg
1421 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1422 self.xpath, self.msg)
1423 vnfr.create_time = self._create_time
1424 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1425 self._log.debug("Published VNFR path = [%s], record = [%s]",
1426 self.xpath, self.msg)
1427
1428 def resolve_vld_ip_profile(self, vnfd_msg, vld):
1429 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
1430 if not vld.has_field('ip_profile_ref'):
1431 return None
1432 profile = [profile for profile in vnfd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
1433 return profile[0] if profile else None
1434
1435 @asyncio.coroutine
1436 def create_vls(self):
1437 """ Publish The VLs associated with this VNF """
1438 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1439 self.vnfd_id)
1440 for ivld_msg in self.vnfd.internal_vld:
1441 self._log.debug("Creating internal vld:"
1442 " %s, int_cp_ref = %s",
1443 ivld_msg, ivld_msg.internal_connection_point
1444 )
1445 vlr = InternalVirtualLinkRecord(dts=self._dts,
1446 log=self._log,
1447 loop=self._loop,
1448 ivld_msg=ivld_msg,
1449 vnfr_name=self.name,
1450 cloud_account_name=self.cloud_account_name,
1451 ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg)
1452 )
1453 self._vlrs.append(vlr)
1454
1455 for int_cp in ivld_msg.internal_connection_point:
1456 if int_cp.id_ref in self._vlr_by_cp:
1457 msg = ("Connection point %s already "
1458 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1459 raise InternalVirtualLinkRecordError(msg)
1460 self._log.debug("Setting vlr %s to internal cp = %s",
1461 vlr, int_cp.id_ref)
1462 self._vlr_by_cp[int_cp.id_ref] = vlr
1463
1464 @asyncio.coroutine
1465 def instantiate_vls(self, xact, restart_mode=False):
1466 """ Instantiate the VLs associated with this VNF """
1467 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1468 self.vnfd_id)
1469
1470 for vlr in self._vlrs:
1471 self._log.debug("Instantiating VLR %s", vlr)
1472 yield from vlr.instantiate(xact, restart_mode)
1473
1474 def find_vlr_by_cp(self, cp_name):
1475 """ Find the VLR associated with the cp name """
1476 return self._vlr_by_cp[cp_name]
1477
1478 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1479 """
1480 Returns the cloud specific construct for placement group
1481 Arguments:
1482 input_group: VNFD PlacementGroup
1483 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1484 """
1485 copy_dict = ['name', 'requirement', 'strategy']
1486 for group_info in nsr_config.vnfd_placement_group_maps:
1487 if group_info.placement_group_ref == input_group.name and \
1488 group_info.vnfd_id_ref == self.vnfd_id:
1489 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1490 group_dict = {k:v for k,v in
1491 group_info.as_dict().items()
1492 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1493 for param in copy_dict:
1494 group_dict.update({param: getattr(input_group, param)})
1495 group.from_dict(group_dict)
1496 return group
1497 return None
1498
1499 @asyncio.coroutine
1500 def get_vdu_placement_groups(self, vdu):
1501 placement_groups = []
1502 ### Step-1: Get VNF level placement groups
1503 for group in self._vnfr_msg.placement_groups_info:
1504 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1505 #group_info.from_dict(group.as_dict())
1506 placement_groups.append(group)
1507
1508 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1509 nsr_config = yield from self.get_nsr_config()
1510
1511 ### Step-3: Get VDU level placement groups
1512 for group in self.vnfd.placement_groups:
1513 for member_vdu in group.member_vdus:
1514 if member_vdu.member_vdu_ref == vdu.id:
1515 group_info = self.resolve_placement_group_cloud_construct(group,
1516 nsr_config)
1517 if group_info is None:
1518 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1519 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1520 else:
1521 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1522 str(group_info),
1523 vdu.name,
1524 self.vnf_name,
1525 self.member_vnf_index)
1526 placement_groups.append(group_info)
1527
1528 return placement_groups
1529
1530 @asyncio.coroutine
1531 def create_vdus(self, vnfr, restart_mode=False):
1532 """ Create the VDUs associated with this VNF """
1533
1534 def get_vdur_id(vdud):
1535 """Get the corresponding VDUR's id for the VDUD. This is useful in
1536 case of a restart.
1537
1538 In restart mode we check for exiting VDUR's ID and use them, if
1539 available. This way we don't end up creating duplicate VDURs
1540 """
1541 vdur_id = None
1542
1543 if restart_mode and vdud is not None:
1544 try:
1545 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1546 vdur_id = vdur[0]
1547 except IndexError:
1548 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1549
1550 return vdur_id
1551
1552
1553 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1554 for vdu in self._rw_vnfd.vdu:
1555 self._log.debug("Creating vdu: %s", vdu)
1556 vdur_id = get_vdur_id(vdu)
1557
1558 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1559 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1560 vdu.name,
1561 self.vnf_name,
1562 self.member_vnf_index,
1563 [ group.name for group in placement_groups])
1564
1565 vdur = VirtualDeploymentUnitRecord(
1566 dts=self._dts,
1567 log=self._log,
1568 loop=self._loop,
1569 vdud=vdu,
1570 vnfr=vnfr,
1571 mgmt_intf=self.has_mgmt_interface(vdu),
1572 mgmt_network=self._mgmt_network,
1573 cloud_account_name=self.cloud_account_name,
1574 vnfd_package_store=self._vnfd_package_store,
1575 vdur_id=vdur_id,
1576 placement_groups = placement_groups,
1577 )
1578 yield from vdur.vdu_opdata_register()
1579
1580 self._vdus.append(vdur)
1581
1582 @asyncio.coroutine
1583 def instantiate_vdus(self, xact, vnfr):
1584 """ Instantiate the VDUs associated with this VNF """
1585 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1586
1587 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1588
1589 # Identify any dependencies among the VDUs
1590 dependencies = collections.defaultdict(list)
1591 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1592
1593 for vdu in self._vdus:
1594 if vdu.vdud_cloud_init is not None:
1595 for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
1596 if vdu_id != vdu.vdu_id:
1597 # This means that vdu.vdu_id depends upon vdu_id,
1598 # i.e. vdu_id must be instantiated before
1599 # vdu.vdu_id.
1600 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1601
1602 # Define the terminal states of VDU instantiation
1603 terminal = (
1604 VDURecordState.READY,
1605 VDURecordState.TERMINATED,
1606 VDURecordState.FAILED,
1607 )
1608
1609 datastore = VdurDatastore()
1610 processed = set()
1611
1612 @asyncio.coroutine
1613 def instantiate_monitor(vdu):
1614 """Monitor the state of the VDU during instantiation
1615
1616 Arguments:
1617 vdu - a VirtualDeploymentUnitRecord
1618
1619 """
1620 # wait for the VDUR to enter a terminal state
1621 while vdu._state not in terminal:
1622 yield from asyncio.sleep(1, loop=self._loop)
1623
1624 # update the datastore
1625 datastore.update(vdu)
1626
1627 # add the VDU to the set of processed VDUs
1628 processed.add(vdu.vdu_id)
1629
1630 @asyncio.coroutine
1631 def instantiate(vdu):
1632 """Instantiate the specified VDU
1633
1634 Arguments:
1635 vdu - a VirtualDeploymentUnitRecord
1636
1637 Raises:
1638 if the VDU, or any of the VDUs this VDU depends upon, are
1639 terminated or fail to instantiate properly, a
1640 VirtualDeploymentUnitRecordError is raised.
1641
1642 """
1643 for dependency in dependencies[vdu.vdu_id]:
1644 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1645
1646 while dependency.vdu_id not in processed:
1647 yield from asyncio.sleep(1, loop=self._loop)
1648
1649 if not dependency.active:
1650 raise VirtualDeploymentUnitRecordError()
1651
1652 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1653
1654 # Populate the datastore with the current values of the VDU
1655 datastore.add(vdu)
1656
1657 # Substitute any variables contained in the cloud config script
1658 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1659
1660 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1661 if len(parts) > 1:
1662
1663 # Extract the variable names
1664 variables = list()
1665 for variable in parts[1::2]:
1666 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1667
1668 # Iterate of the variables and substitute values from the
1669 # datastore.
1670 for variable in variables:
1671
1672 # Handle a reference to a VDU by ID
1673 if variable.startswith('vdu['):
1674 value = datastore.get(variable)
1675 if value is None:
1676 msg = "Unable to find a substitute for {} in {} cloud-init script"
1677 raise ValueError(msg.format(variable, vdu.vdu_id))
1678
1679 config = config.replace("{{ %s }}" % variable, value)
1680 continue
1681
1682 # Handle a reference to the current VDU
1683 if variable.startswith('vdu'):
1684 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1685 config = config.replace("{{ %s }}" % variable, value)
1686 continue
1687
1688 # Handle unrecognized variables
1689 msg = 'unrecognized cloud-config variable: {}'
1690 raise ValueError(msg.format(variable))
1691
1692 # Instantiate the VDU
1693 with self._dts.transaction() as xact:
1694 self._log.debug("Instantiating vdu: %s", vdu)
1695 yield from vdu.instantiate(xact, vnfr, config=config)
1696 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1697 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1698 self.vnfr_id, vdu)
1699
1700 # First create a set of tasks to monitor the state of the VDUs and
1701 # report when they have entered a terminal state
1702 for vdu in self._vdus:
1703 self._loop.create_task(instantiate_monitor(vdu))
1704
1705 for vdu in self._vdus:
1706 self._loop.create_task(instantiate(vdu))
1707
1708 def has_mgmt_interface(self, vdu):
1709 # ## TODO: Support additional mgmt_interface type options
1710 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1711 return True
1712 return False
1713
1714 def vlr_xpath(self, vlr_id):
1715 """ vlr xpath """
1716 return(
1717 "D,/vlr:vlr-catalog/"
1718 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1719
1720 def ext_vlr_by_id(self, vlr_id):
1721 """ find ext vlr by id """
1722 return self._ext_vlrs[vlr_id]
1723
1724 @asyncio.coroutine
1725 def publish_inventory(self, xact):
1726 """ Publish the inventory associated with this VNF """
1727 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1728
1729 for component in self._rw_vnfd.component:
1730 self._log.debug("Creating inventory component %s", component)
1731 mangled_name = VcsComponent.mangle_name(component.component_name,
1732 self.vnf_name,
1733 self.vnfd_id
1734 )
1735 comp = VcsComponent(dts=self._dts,
1736 log=self._log,
1737 loop=self._loop,
1738 cluster_name=self._cluster_name,
1739 vcs_handler=self._vcs_handler,
1740 component=component,
1741 mangled_name=mangled_name,
1742 )
1743 if comp.name in self._inventory:
1744 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1745 component, self._vnfd_id)
1746 return
1747 self._log.debug("Adding component %s for vnrf %s",
1748 comp.name, self._vnfr_id)
1749 self._inventory[comp.name] = comp
1750 yield from comp.publish(xact)
1751
1752 def all_vdus_active(self):
1753 """ Are all VDUS in this VNFR active? """
1754 for vdu in self._vdus:
1755 if not vdu.active:
1756 return False
1757
1758 self._log.debug("Inside all_vdus_active. Returning True")
1759 return True
1760
1761 @asyncio.coroutine
1762 def instantiation_failed(self, failed_reason=None):
1763 """ VNFR instantiation failed """
1764 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1765 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1766 self._state_failed_reason = failed_reason
1767
1768 # Update the VNFR with the changed status
1769 yield from self.publish(None)
1770
1771 @asyncio.coroutine
1772 def is_ready(self):
1773 """ This VNF is ready"""
1774 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1775
1776 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1777 self.set_state(VirtualNetworkFunctionRecordState.READY)
1778
1779 else:
1780 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1781
1782 # Update the VNFR with the changed status
1783 yield from self.publish(None)
1784
1785 def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
1786 """Updated the connection point with ip address"""
1787 for cp in self._cprs:
1788 if cp.name == cp_name:
1789 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1790 cp_name, cp, ip_address, cp_id)
1791 cp.ip_address = ip_address
1792 cp.mac_address = mac_addr
1793 cp.connection_point_id = cp_id
1794 return
1795
1796 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1797 self._log.debug(err)
1798 raise VirtualDeploymentUnitRecordError(err)
1799
1800 def set_state(self, state):
1801 """ Set state for this VNFR"""
1802 self._state = state
1803
1804 @asyncio.coroutine
1805 def instantiate(self, xact, restart_mode=False):
1806 """ instantiate this VNF """
1807 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1808 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1809
1810 @asyncio.coroutine
1811 def fetch_vlrs():
1812 """ Fetch VLRs """
1813 # Iterate over all the connection points in VNFR and fetch the
1814 # associated VLRs
1815
1816 def cpr_from_cp(cp):
1817 """ Creates a record level connection point from the desciptor cp"""
1818 cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
1819 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1820 cpr_dict = {}
1821 cpr_dict.update(cp_copy_dict)
1822 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1823
1824 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1825 self._vnfr_id, self._vnfr.connection_point)
1826
1827 for cp in self._vnfr.connection_point:
1828 cpr = cpr_from_cp(cp)
1829 self._cprs.append(cpr)
1830 self._log.debug("Adding Connection point record %s ", cp)
1831
1832 vlr_path = self.vlr_xpath(cp.vlr_ref)
1833 self._log.debug("Fetching VLR with path = %s", vlr_path)
1834 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1835 rwdts.XactFlag.MERGE)
1836 for i in res_iter:
1837 r = yield from i
1838 d = r.result
1839 self._ext_vlrs[cp.vlr_ref] = d
1840 cpr.vlr_ref = cp.vlr_ref
1841 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1842
1843 # Increase the VNFD reference count
1844 self.vnfd_ref()
1845
1846 assert self.vnfd
1847
1848 # Fetch External VLRs
1849 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1850 yield from fetch_vlrs()
1851
1852 # Publish inventory
1853 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1854 yield from self.publish_inventory(xact)
1855
1856 # Publish inventory
1857 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1858 yield from self.create_vls()
1859
1860 # publish the VNFR
1861 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1862 yield from self.publish(xact)
1863
1864 # instantiate VLs
1865 self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
1866 try:
1867 yield from self.instantiate_vls(xact, restart_mode)
1868 except Exception as e:
1869 self._log.exception("VL instantiation failed (%s)", str(e))
1870 yield from self.instantiation_failed(str(e))
1871 return
1872
1873 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1874
1875 # instantiate VDUs
1876 self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
1877 yield from self.create_vdus(self, restart_mode)
1878
1879 # publish the VNFR
1880 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1881 yield from self.publish(xact)
1882
1883 # instantiate VDUs
1884 # ToDo: Check if this should be prevented during restart
1885 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1886 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1887
1888 # publish the VNFR
1889 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1890 yield from self.publish(xact)
1891
1892 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1893
1894 # create task updating uptime for this vnfr
1895 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1896 self._loop.create_task(self.vnfr_uptime_update(xact))
1897
1898 @asyncio.coroutine
1899 def terminate(self, xact):
1900 """ Terminate this virtual network function """
1901
1902 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1903
1904 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1905
1906 # stop monitoring
1907 if self._vnf_mon is not None:
1908 self._vnf_mon.stop()
1909 self._vnf_mon.deregister()
1910 self._vnf_mon = None
1911
1912 @asyncio.coroutine
1913 def terminate_vls():
1914 """ Terminate VLs in this VNF """
1915 for vl in self._vlrs:
1916 yield from vl.terminate(xact)
1917
1918 @asyncio.coroutine
1919 def terminate_vdus():
1920 """ Terminate VDUS in this VNF """
1921 for vdu in self._vdus:
1922 yield from vdu.terminate(xact)
1923
1924 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1925 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1926 yield from terminate_vls()
1927
1928 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1929 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1930 yield from terminate_vdus()
1931
1932 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1933 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1934
1935 @asyncio.coroutine
1936 def vnfr_uptime_update(self, xact):
1937 while True:
1938 # Return when vnfr state is FAILED or TERMINATED etc
1939 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1940 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1941 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1942 VirtualNetworkFunctionRecordState.READY]:
1943 return
1944 yield from self.publish(xact)
1945 yield from asyncio.sleep(2, loop=self._loop)
1946
1947
1948
1949 class VnfdDtsHandler(object):
1950 """ DTS handler for VNFD config changes """
1951 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1952
1953 def __init__(self, dts, log, loop, vnfm):
1954 self._dts = dts
1955 self._log = log
1956 self._loop = loop
1957 self._vnfm = vnfm
1958 self._regh = None
1959
1960 @asyncio.coroutine
1961 def regh(self):
1962 """ DTS registration handle """
1963 return self._regh
1964
1965 @asyncio.coroutine
1966 def register(self):
1967 """ Register for VNFD configuration"""
1968
1969 def on_apply(dts, acg, xact, action, scratch):
1970 """Apply the configuration"""
1971 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1972 xact, action, scratch)
1973
1974 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
1975
1976 @asyncio.coroutine
1977 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
1978 """ on prepare callback """
1979 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1980 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
1981 fref = ProtobufC.FieldReference.alloc()
1982 fref.goto_whole_message(msg.to_pbcm())
1983
1984 # Handle deletes in prepare_callback
1985 if fref.is_field_deleted():
1986 # Delete an VNFD record
1987 self._log.debug("Deleting VNFD with id %s", msg.id)
1988 if self._vnfm.vnfd_in_use(msg.id):
1989 self._log.debug("Cannot delete VNFD in use - %s", msg)
1990 err = "Cannot delete a VNFD in use - %s" % msg
1991 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1992 # Delete a VNFD record
1993 yield from self._vnfm.delete_vnfd(msg.id)
1994
1995 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
1996
1997 self._log.debug(
1998 "Registering for VNFD config using xpath: %s",
1999 VnfdDtsHandler.XPATH,
2000 )
2001 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2002 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2003 self._regh = acg.register(
2004 xpath=VnfdDtsHandler.XPATH,
2005 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2006 on_prepare=on_prepare)
2007
2008
2009 class VcsComponentDtsHandler(object):
2010 """ Vcs Component DTS handler """
2011 XPATH = ("D,/rw-manifest:manifest" +
2012 "/rw-manifest:operational-inventory" +
2013 "/rw-manifest:component")
2014
2015 def __init__(self, dts, log, loop, vnfm):
2016 self._dts = dts
2017 self._log = log
2018 self._loop = loop
2019 self._regh = None
2020 self._vnfm = vnfm
2021
2022 @property
2023 def regh(self):
2024 """ DTS registration handle """
2025 return self._regh
2026
2027 @asyncio.coroutine
2028 def register(self):
2029 """ Registers VCS component dts publisher registration"""
2030 self._log.debug("VCS Comp publisher DTS handler registering path %s",
2031 VcsComponentDtsHandler.XPATH)
2032
2033 hdl = rift.tasklets.DTS.RegistrationHandler()
2034 handlers = rift.tasklets.Group.Handler()
2035 with self._dts.group_create(handler=handlers) as group:
2036 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
2037 handler=hdl,
2038 flags=(rwdts.Flag.PUBLISHER |
2039 rwdts.Flag.NO_PREP_READ |
2040 rwdts.Flag.DATASTORE),)
2041
2042 @asyncio.coroutine
2043 def publish(self, xact, path, msg):
2044 """ Publishes the VCS component """
2045 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
2046 xact, path, msg)
2047 self.regh.create_element(path, msg)
2048 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2049 VcsComponentDtsHandler.XPATH, xact, path, msg)
2050
2051 class VnfrConsoleOperdataDtsHandler(object):
2052 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
2053 @property
2054 def vnfr_vdu_console_xpath(self):
2055 """ path for resource-mgr"""
2056 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
2057
2058 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
2059 self._dts = dts
2060 self._log = log
2061 self._loop = loop
2062 self._regh = None
2063 self._vnfm = vnfm
2064
2065 self._vnfr_id = vnfr_id
2066 self._vdur_id = vdur_id
2067 self._vdu_id = vdu_id
2068
2069 @asyncio.coroutine
2070 def register(self):
2071 """ Register for VNFR VDU Operational Data read from dts """
2072
2073 @asyncio.coroutine
2074 def on_prepare(xact_info, action, ks_path, msg):
2075 """ prepare callback from dts """
2076 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2077 self._log.debug(
2078 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2079 xact_info, action, xpath, msg
2080 )
2081
2082 if action == rwdts.QueryAction.READ:
2083 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
2084 path_entry = schema.keyspec_to_entry(ks_path)
2085 self._log.debug("VDU Opdata path is {}".format(path_entry))
2086 try:
2087 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2088 except VnfRecordError as e:
2089 self._log.error("VNFR id %s not found", self._vnfr_id)
2090 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2091 return
2092 try:
2093 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2094 if not vdur._state == VDURecordState.READY:
2095 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2096 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2097 return
2098 with self._dts.transaction() as new_xact:
2099 resp = yield from vdur.read_resource(new_xact)
2100 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2101 vdur_console.id = self._vdur_id
2102 if resp.console_url:
2103 vdur_console.console_url = resp.console_url
2104 else:
2105 vdur_console.console_url = 'none'
2106 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2107 except Exception:
2108 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2109 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2110 vdur_console.id = self._vdur_id
2111 vdur_console.console_url = 'none'
2112
2113 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2114 xpath=self.vnfr_vdu_console_xpath,
2115 msg=vdur_console)
2116 else:
2117 #raise VnfRecordError("Not supported operation %s" % action)
2118 self._log.error("Not supported operation %s" % action)
2119 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2120 return
2121
2122
2123 self._log.debug("Registering for VNFR VDU using xpath: %s",
2124 self.vnfr_vdu_console_xpath)
2125 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2126 with self._dts.group_create() as group:
2127 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2128 handler=hdl,
2129 flags=rwdts.Flag.PUBLISHER,
2130 )
2131
2132
2133 class VnfrDtsHandler(object):
2134 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2135 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2136
2137 def __init__(self, dts, log, loop, vnfm):
2138 self._dts = dts
2139 self._log = log
2140 self._loop = loop
2141 self._vnfm = vnfm
2142
2143 self._regh = None
2144
2145 @property
2146 def regh(self):
2147 """ Return registration handle"""
2148 return self._regh
2149
2150 @property
2151 def vnfm(self):
2152 """ Return VNF manager instance """
2153 return self._vnfm
2154
2155 @asyncio.coroutine
2156 def register(self):
2157 """ Register for vnfr create/update/delete/read requests from dts """
2158 def on_commit(xact_info):
2159 """ The transaction has been committed """
2160 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2161 return rwdts.MemberRspCode.ACTION_OK
2162
2163 def on_abort(*args):
2164 """ Abort callback """
2165 self._log.debug("VNF transaction got aborted")
2166
2167 @asyncio.coroutine
2168 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2169
2170 @asyncio.coroutine
2171 def instantiate_realloc_vnfr(vnfr):
2172 """Re-populate the vnfm after restart
2173
2174 Arguments:
2175 vlink
2176
2177 """
2178
2179 yield from vnfr.instantiate(None, restart_mode=True)
2180
2181 if xact_event == rwdts.MemberEvent.INSTALL:
2182 curr_cfg = self.regh.elements
2183 for cfg in curr_cfg:
2184 vnfr = self.vnfm.create_vnfr(cfg)
2185 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2186
2187 self._log.debug("Got on_event in vnfm")
2188
2189 return rwdts.MemberRspCode.ACTION_OK
2190
2191 @asyncio.coroutine
2192 def on_prepare(xact_info, action, ks_path, msg):
2193 """ prepare callback from dts """
2194 self._log.debug(
2195 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2196 xact_info, action, msg
2197 )
2198
2199 if action == rwdts.QueryAction.CREATE:
2200 if not msg.has_field("vnfd"):
2201 err = "Vnfd not provided"
2202 self._log.error(err)
2203 raise VnfRecordError(err)
2204
2205 vnfr = self.vnfm.create_vnfr(msg)
2206 try:
2207 # RIFT-9105: Unable to add a READ query under an existing transaction
2208 # xact = xact_info.xact
2209 yield from vnfr.instantiate(None)
2210 except Exception as e:
2211 self._log.exception(e)
2212 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2213 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2214 yield from vnfr.publish(None)
2215 elif action == rwdts.QueryAction.DELETE:
2216 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2217 path_entry = schema.keyspec_to_entry(ks_path)
2218 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2219
2220 if vnfr is None:
2221 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2222 raise VirtualNetworkFunctionRecordNotFound(
2223 "VNFR id %s", path_entry.key00.id)
2224
2225 try:
2226 yield from vnfr.terminate(xact_info.xact)
2227 # Unref the VNFD
2228 vnfr.vnfd_unref()
2229 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2230 except Exception as e:
2231 self._log.exception(e)
2232 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2233
2234 elif action == rwdts.QueryAction.UPDATE:
2235 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2236 path_entry = schema.keyspec_to_entry(ks_path)
2237 vnfr = None
2238 try:
2239 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2240 except Exception as e:
2241 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2242 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2243 return
2244
2245 if vnfr is None:
2246 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2247 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2248 return
2249
2250 self._log.debug("VNFR {} update config status {} (current {})".
2251 format(vnfr.name, msg.config_status, vnfr.config_status))
2252 # Update the config status and publish
2253 vnfr._config_status = msg.config_status
2254 yield from vnfr.publish(None)
2255
2256 else:
2257 raise NotImplementedError(
2258 "%s action on VirtualNetworkFunctionRecord not supported",
2259 action)
2260
2261 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2262
2263 self._log.debug("Registering for VNFR using xpath: %s",
2264 VnfrDtsHandler.XPATH,)
2265
2266 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2267 on_prepare=on_prepare,)
2268 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2269 with self._dts.group_create(handler=handlers) as group:
2270 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2271 handler=hdl,
2272 flags=(rwdts.Flag.PUBLISHER |
2273 rwdts.Flag.NO_PREP_READ |
2274 rwdts.Flag.CACHE |
2275 rwdts.Flag.DATASTORE),)
2276
2277 @asyncio.coroutine
2278 def create(self, xact, path, msg):
2279 """
2280 Create a VNFR record in DTS with path and message
2281 """
2282 self._log.debug("Creating VNFR xact = %s, %s:%s",
2283 xact, path, msg)
2284
2285 self.regh.create_element(path, msg)
2286 self._log.debug("Created VNFR xact = %s, %s:%s",
2287 xact, path, msg)
2288
2289 @asyncio.coroutine
2290 def update(self, xact, path, msg):
2291 """
2292 Update a VNFR record in DTS with path and message
2293 """
2294 self._log.debug("Updating VNFR xact = %s, %s:%s",
2295 xact, path, msg)
2296 self.regh.update_element(path, msg)
2297 self._log.debug("Updated VNFR xact = %s, %s:%s",
2298 xact, path, msg)
2299
2300 @asyncio.coroutine
2301 def delete(self, xact, path):
2302 """
2303 Delete a VNFR record in DTS with path and message
2304 """
2305 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2306 self.regh.delete_element(path)
2307 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2308
2309
2310 class VnfdRefCountDtsHandler(object):
2311 """ The VNFD Ref Count DTS handler """
2312 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2313
2314 def __init__(self, dts, log, loop, vnfm):
2315 self._dts = dts
2316 self._log = log
2317 self._loop = loop
2318 self._vnfm = vnfm
2319
2320 self._regh = None
2321
2322 @property
2323 def regh(self):
2324 """ Return registration handle """
2325 return self._regh
2326
2327 @property
2328 def vnfm(self):
2329 """ Return the NS manager instance """
2330 return self._vnfm
2331
2332 @asyncio.coroutine
2333 def register(self):
2334 """ Register for VNFD ref count read from dts """
2335
2336 @asyncio.coroutine
2337 def on_prepare(xact_info, action, ks_path, msg):
2338 """ prepare callback from dts """
2339 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2340 self._log.debug(
2341 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2342 xact_info, action, xpath, msg
2343 )
2344
2345 if action == rwdts.QueryAction.READ:
2346 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2347 path_entry = schema.keyspec_to_entry(ks_path)
2348 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2349 for xpath, msg in vnfd_list:
2350 self._log.debug("Responding to ref count query path:%s, msg:%s",
2351 xpath, msg)
2352 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2353 xpath=xpath,
2354 msg=msg)
2355 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2356 else:
2357 raise VnfRecordError("Not supported operation %s" % action)
2358
2359 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2360 with self._dts.group_create() as group:
2361 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2362 handler=hdl,
2363 flags=rwdts.Flag.PUBLISHER,
2364 )
2365
2366
2367 class VdurDatastore(object):
2368 """
2369 This VdurDatastore is intended to expose select information about a VDUR
2370 such that it can be referenced in a cloud config file. The data that is
2371 exposed does not necessarily follow the structure of the data in the yang
2372 model. This is intentional. The data that are exposed are intended to be
2373 agnostic of the yang model so that changes in the model do not necessarily
2374 require changes to the interface provided to the user. It also means that
2375 the user does not need to be familiar with the RIFT.ware yang models.
2376 """
2377
2378 def __init__(self):
2379 """Create an instance of VdurDatastore"""
2380 self._vdur_data = dict()
2381 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2382
2383 def add(self, vdur):
2384 """Add a new VDUR to the datastore
2385
2386 Arguments:
2387 vdur - a VirtualDeploymentUnitRecord instance
2388
2389 Raises:
2390 A ValueError is raised if the VDUR is (1) None or (2) already in
2391 the datastore.
2392
2393 """
2394 if vdur.vdu_id is None:
2395 raise ValueError('VDURs are required to have an ID')
2396
2397 if vdur.vdu_id in self._vdur_data:
2398 raise ValueError('cannot add a VDUR more than once')
2399
2400 self._vdur_data[vdur.vdu_id] = dict()
2401
2402 def set_if_not_none(key, attr):
2403 if attr is not None:
2404 self._vdur_data[vdur.vdu_id][key] = attr
2405
2406 set_if_not_none('name', vdur._vdud.name)
2407 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2408
2409 def update(self, vdur):
2410 """Update the VDUR information in the datastore
2411
2412 Arguments:
2413 vdur - a GI representation of a VDUR
2414
2415 Raises:
2416 A ValueError is raised if the VDUR is (1) None or (2) already in
2417 the datastore.
2418
2419 """
2420 if vdur.vdu_id is None:
2421 raise ValueError('VNFDs are required to have an ID')
2422
2423 if vdur.vdu_id not in self._vdur_data:
2424 raise ValueError('VNF is not recognized')
2425
2426 def set_or_delete(key, attr):
2427 if attr is None:
2428 if key in self._vdur_data[vdur.vdu_id]:
2429 del self._vdur_data[vdur.vdu_id][key]
2430
2431 else:
2432 self._vdur_data[vdur.vdu_id][key] = attr
2433
2434 set_or_delete('name', vdur._vdud.name)
2435 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2436
2437 def remove(self, vdur_id):
2438 """Remove all of the data associated with specified VDUR
2439
2440 Arguments:
2441 vdur_id - the identifier of a VNFD in the datastore
2442
2443 Raises:
2444 A ValueError is raised if the VDUR is not contained in the
2445 datastore.
2446
2447 """
2448 if vdur_id not in self._vdur_data:
2449 raise ValueError('VNF is not recognized')
2450
2451 del self._vdur_data[vdur_id]
2452
2453 def get(self, expr):
2454 """Retrieve VDUR information from the datastore
2455
2456 An expression should be of the form,
2457
2458 vdu[<id>].<attr>
2459
2460 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2461 the exposed attribute that the user wishes to retrieve.
2462
2463 If the requested data is not available, None is returned.
2464
2465 Arguments:
2466 expr - a string that specifies the data to return
2467
2468 Raises:
2469 A ValueError is raised if the provided expression cannot be parsed.
2470
2471 Returns:
2472 The requested data or None
2473
2474 """
2475 result = self._pattern.match(expr)
2476 if result is None:
2477 raise ValueError('data expression not recognized ({})'.format(expr))
2478
2479 vdur_id, key = result.groups()
2480
2481 if vdur_id not in self._vdur_data:
2482 return None
2483
2484 return self._vdur_data[vdur_id].get(key, None)
2485
2486
2487 class VnfManager(object):
2488 """ The virtual network function manager class """
2489 def __init__(self, dts, log, loop, cluster_name):
2490 self._dts = dts
2491 self._log = log
2492 self._loop = loop
2493 self._cluster_name = cluster_name
2494
2495 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2496 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2497 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2498 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)
2499
2500 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2501 self._vnfr_handler,
2502 self._vcs_handler,
2503 self._vnfr_ref_handler,
2504 self._nsr_handler]
2505 self._vnfrs = {}
2506 self._vnfds_to_vnfr = {}
2507 self._nsrs = {}
2508
2509 @property
2510 def vnfr_handler(self):
2511 """ VNFR dts handler """
2512 return self._vnfr_handler
2513
2514 @property
2515 def vcs_handler(self):
2516 """ VCS dts handler """
2517 return self._vcs_handler
2518
2519 @asyncio.coroutine
2520 def register(self):
2521 """ Register all static DTS handlers """
2522 for hdl in self._dts_handlers:
2523 yield from hdl.register()
2524
2525 @asyncio.coroutine
2526 def run(self):
2527 """ Run this VNFM instance """
2528 self._log.debug("Run VNFManager - registering static DTS handlers""")
2529 yield from self.register()
2530
2531 def handle_nsr(self, nsr, action):
2532 if action in [rwdts.QueryAction.CREATE]:
2533 self._nsrs[nsr.id] = nsr
2534 elif action == rwdts.QueryAction.DELETE:
2535 if nsr.id in self._nsrs:
2536 del self._nsrs[nsr.id]
2537
2538 def get_linked_mgmt_network(self, vnfr):
2539 """For the given VNFR get the related mgmt network from the NSD, if
2540 available.
2541 """
2542 vnfd_id = vnfr.vnfd.id
2543 nsr_id = vnfr.nsr_id_ref
2544
2545 # for the given related VNFR, get the corresponding NSR-config
2546 nsr_obj = None
2547 try:
2548 nsr_obj = self._nsrs[nsr_id]
2549 except KeyError:
2550 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2551
2552 # for the related NSD check if a VLD exists such that it's a mgmt
2553 # network
2554 for vld in nsr_obj.nsd.vld:
2555 if vld.mgmt_network:
2556 return vld.name
2557
2558 return None
2559
2560 def get_vnfr(self, vnfr_id):
2561 """ get VNFR by vnfr id """
2562
2563 if vnfr_id not in self._vnfrs:
2564 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2565
2566 return self._vnfrs[vnfr_id]
2567
2568 def create_vnfr(self, vnfr):
2569 """ Create a VNFR instance """
2570 if vnfr.id in self._vnfrs:
2571 msg = "Vnfr id %s already exists" % vnfr.id
2572 self._log.error(msg)
2573 raise VnfRecordError(msg)
2574
2575 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2576 vnfr.id,
2577 vnfr.vnfd.id)
2578
2579 mgmt_network = self.get_linked_mgmt_network(vnfr)
2580
2581 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2582 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2583 mgmt_network=mgmt_network
2584 )
2585
2586 #Update ref count
2587 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2588 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2589 else:
2590 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2591
2592 return self._vnfrs[vnfr.id]
2593
2594 @asyncio.coroutine
2595 def delete_vnfr(self, xact, vnfr):
2596 """ Create a VNFR instance """
2597 if vnfr.vnfr_id in self._vnfrs:
2598 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2599 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2600
2601 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2602 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2603 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2604
2605 del self._vnfrs[vnfr.vnfr_id]
2606
2607 @asyncio.coroutine
2608 def fetch_vnfd(self, vnfd_id):
2609 """ Fetch VNFDs based with the vnfd id"""
2610 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2611 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2612 vnfd = None
2613
2614 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2615
2616 for ent in res_iter:
2617 res = yield from ent
2618 vnfd = res.result
2619
2620 if vnfd is None:
2621 err = "Failed to get Vnfd %s" % vnfd_id
2622 self._log.error(err)
2623 raise VnfRecordError(err)
2624
2625 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2626
2627 return vnfd
2628
2629 def vnfd_in_use(self, vnfd_id):
2630 """ Is this VNFD in use """
2631 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2632 if vnfd_id in self._vnfds_to_vnfr:
2633 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2634 return False
2635
2636 @asyncio.coroutine
2637 def publish_vnfr(self, xact, path, msg):
2638 """ Publish a VNFR """
2639 self._log.debug("publish_vnfr called with path %s, msg %s",
2640 path, msg)
2641 yield from self.vnfr_handler.update(xact, path, msg)
2642
2643 @asyncio.coroutine
2644 def delete_vnfd(self, vnfd_id):
2645 """ Delete the Virtual Network Function descriptor with the passed id """
2646 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2647 if vnfd_id in self._vnfds_to_vnfr:
2648 if self._vnfds_to_vnfr[vnfd_id]:
2649 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2650 vnfd_id,
2651 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2652 raise VirtualNetworkFunctionDescriptorRefCountExists(
2653 "Cannot delete :%s, ref_count:%s",
2654 vnfd_id,
2655 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2656
2657 del self._vnfds_to_vnfr[vnfd_id]
2658
2659 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2660 try:
2661 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2662 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2663 if os.path.exists(vnfd_dir):
2664 shutil.rmtree(vnfd_dir, ignore_errors=True)
2665 except Exception as e:
2666 self._log.error("Exception in cleaning up VNFD {}: {}".
2667 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2668 self._log.exception(e)
2669
2670
2671 def vnfd_refcount_xpath(self, vnfd_id):
2672 """ xpath for ref count entry """
2673 return (VnfdRefCountDtsHandler.XPATH +
2674 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2675
2676 @asyncio.coroutine
2677 def get_vnfd_refcount(self, vnfd_id):
2678 """ Get the vnfd_list from this VNFM"""
2679 vnfd_list = []
2680 if vnfd_id is None or vnfd_id == "":
2681 for vnfd in self._vnfds_to_vnfr.keys():
2682 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2683 vnfd_msg.vnfd_id_ref = vnfd
2684 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2685 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2686 elif vnfd_id in self._vnfds_to_vnfr:
2687 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2688 vnfd_msg.vnfd_id_ref = vnfd_id
2689 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2690 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2691
2692 return vnfd_list
2693
2694
2695 class VnfmTasklet(rift.tasklets.Tasklet):
2696 """ VNF Manager tasklet class """
2697 def __init__(self, *args, **kwargs):
2698 super(VnfmTasklet, self).__init__(*args, **kwargs)
2699 self.rwlog.set_category("rw-mano-log")
2700 self.rwlog.set_subcategory("vnfm")
2701
2702 self._dts = None
2703 self._vnfm = None
2704
2705 def start(self):
2706 try:
2707 super(VnfmTasklet, self).start()
2708 self.log.info("Starting VnfmTasklet")
2709
2710 self.log.setLevel(logging.DEBUG)
2711
2712 self.log.debug("Registering with dts")
2713 self._dts = rift.tasklets.DTS(self.tasklet_info,
2714 RwVnfmYang.get_schema(),
2715 self.loop,
2716 self.on_dts_state_change)
2717
2718 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2719 except Exception:
2720 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2721 raise
2722
2723 def on_instance_started(self):
2724 """ Task insance started callback """
2725 self.log.debug("Got instance started callback")
2726
2727 def stop(self):
2728 try:
2729 self._dts.deinit()
2730 except Exception:
2731 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2732 raise
2733
2734 @asyncio.coroutine
2735 def init(self):
2736 """ Task init callback """
2737 try:
2738 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2739 assert vm_parent_name is not None
2740 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2741 yield from self._vnfm.run()
2742 except Exception:
2743 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2744 raise
2745
2746 @asyncio.coroutine
2747 def run(self):
2748 """ Task run callback """
2749 pass
2750
2751 @asyncio.coroutine
2752 def on_dts_state_change(self, state):
2753 """Take action according to current dts state to transition
2754 application into the corresponding application state
2755
2756 Arguments
2757 state - current dts state
2758 """
2759 switch = {
2760 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2761 rwdts.State.CONFIG: rwdts.State.RUN,
2762 }
2763
2764 handlers = {
2765 rwdts.State.INIT: self.init,
2766 rwdts.State.RUN: self.run,
2767 }
2768
2769 # Transition application to next state
2770 handler = handlers.get(state, None)
2771 if handler is not None:
2772 yield from handler()
2773
2774 # Transition dts to next state
2775 next_state = switch.get(state, None)
2776 if next_state is not None:
2777 self._dts.handle.set_state(next_state)