Deprecate old config-manager model attributes
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54 import rift.mano.utils.short_name as mano_short_name
55
56
57 class VMResourceError(Exception):
58 """ VM resource Error"""
59 pass
60
61
62 class VnfRecordError(Exception):
63 """ VNF record instatiation failed"""
64 pass
65
66
67 class VduRecordError(Exception):
68 """ VDU record instatiation failed"""
69 pass
70
71
72 class NotImplemented(Exception):
73 """Not implemented """
74 pass
75
76
77 class VnfrRecordExistsError(Exception):
78 """VNFR record already exist with the same VNFR id"""
79 pass
80
81
82 class InternalVirtualLinkRecordError(Exception):
83 """Internal virtual link record error"""
84 pass
85
86
87 class VDUImageNotFound(Exception):
88 """VDU Image not found error"""
89 pass
90
91
92 class VirtualDeploymentUnitRecordError(Exception):
93 """VDU Instantiation failed"""
94 pass
95
96
97 class VMNotReadyError(Exception):
98 """ VM Not yet received from resource manager """
99 pass
100
101
102 class VDURecordNotFound(Exception):
103 """ Could not find a VDU record """
104 pass
105
106
107 class VirtualNetworkFunctionRecordDescNotFound(Exception):
108 """ Cannot find Virtual Network Function Record Descriptor """
109 pass
110
111
112 class VirtualNetworkFunctionDescriptorError(Exception):
113 """ Virtual Network Function Record Descriptor Error """
114 pass
115
116
117 class VirtualNetworkFunctionDescriptorNotFound(Exception):
118 """ Virtual Network Function Record Descriptor Not Found """
119 pass
120
121
122 class VirtualNetworkFunctionRecordNotFound(Exception):
123 """ Virtual Network Function Record Not Found """
124 pass
125
126
127 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
128 """ Virtual Network Funtion Descriptor reference count exists """
129 pass
130
131
132 class VnfrInstantiationFailed(Exception):
133 """ Virtual Network Funtion Instantiation failed"""
134 pass
135
136
137 class VNFMPlacementGroupError(Exception):
138 pass
139
140 class VirtualNetworkFunctionRecordState(enum.Enum):
141 """ VNFR state """
142 INIT = 1
143 VL_INIT_PHASE = 2
144 VM_INIT_PHASE = 3
145 READY = 4
146 TERMINATE = 5
147 VL_TERMINATE_PHASE = 6
148 VDU_TERMINATE_PHASE = 7
149 TERMINATED = 7
150 FAILED = 10
151
152
153 class VDURecordState(enum.Enum):
154 """VDU record state """
155 INIT = 1
156 INSTANTIATING = 2
157 RESOURCE_ALLOC_PENDING = 3
158 READY = 4
159 TERMINATING = 5
160 TERMINATED = 6
161 FAILED = 10
162
163
164 class VcsComponent(object):
165 """ VCS Component within the VNF descriptor """
166 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
167 self._dts = dts
168 self._log = log
169 self._loop = loop
170 self._component = component
171 self._cluster_name = cluster_name
172 self._vcs_handler = vcs_handler
173 self._mangled_name = mangled_name
174
175 @staticmethod
176 def mangle_name(component_name, vnf_name, vnfd_id):
177 """ mangled component name """
178 return vnf_name + ":" + component_name + ":" + vnfd_id
179
180 @property
181 def name(self):
182 """ name of this component"""
183 return self._mangled_name
184
185 @property
186 def path(self):
187 """ The path for this object """
188 return("D,/rw-manifest:manifest" +
189 "/rw-manifest:operational-inventory" +
190 "/rw-manifest:component" +
191 "[rw-manifest:component-name = '{}']").format(self.name)
192
193 @property
194 def instance_xpath(self):
195 """ The path for this object """
196 return("D,/rw-base:vcs" +
197 "/instances" +
198 "/instance" +
199 "[instance-name = '{}']".format(self._cluster_name))
200
201 @property
202 def start_comp_xpath(self):
203 """ start component xpath """
204 return (self.instance_xpath +
205 "/child-n[instance-name = 'START-REQ']")
206
207 def get_start_comp_msg(self, ip_address):
208 """ start this component """
209 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
210 start_msg.instance_name = 'START-REQ'
211 start_msg.component_name = self.name
212 start_msg.admin_command = "START"
213 start_msg.ip_address = ip_address
214
215 return start_msg
216
217 @property
218 def msg(self):
219 """ Returns the message for this vcs component"""
220
221 vcs_comp_dict = self._component.as_dict()
222
223 def mangle_comp_names(comp_dict):
224 """ mangle component name with VNF name, id"""
225 for key, val in comp_dict.items():
226 if isinstance(val, dict):
227 comp_dict[key] = mangle_comp_names(val)
228 elif isinstance(val, list):
229 i = 0
230 for ent in val:
231 if isinstance(ent, dict):
232 val[i] = mangle_comp_names(ent)
233 else:
234 val[i] = ent
235 i += 1
236 elif key == "component_name":
237 comp_dict[key] = VcsComponent.mangle_name(val,
238 self._vnfd_name,
239 self._vnfd_id)
240 return comp_dict
241
242 mangled_dict = mangle_comp_names(vcs_comp_dict)
243 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
244 return msg
245
246 @asyncio.coroutine
247 def publish(self, xact):
248 """ Publishes the VCS component """
249 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
250 self.name, self.path, self.msg)
251 yield from self._vcs_handler.publish(xact, self.path, self.msg)
252
253 @asyncio.coroutine
254 def start(self, xact, parent, ip_addr=None):
255 """ Starts this VCS component """
256 # ATTN RV - replace with block add
257 start_msg = self.get_start_comp_msg(ip_addr)
258 self._log.debug("starting component %s %s",
259 self.start_comp_xpath, start_msg)
260 yield from self._dts.query_create(self.start_comp_xpath,
261 0,
262 start_msg)
263 self._log.debug("started component %s, %s",
264 self.start_comp_xpath, start_msg)
265
266
267 class VirtualDeploymentUnitRecord(object):
268 """ Virtual Deployment Unit Record """
269 def __init__(self,
270 dts,
271 log,
272 loop,
273 vdud,
274 vnfr,
275 nsr_config,
276 mgmt_intf,
277 mgmt_network,
278 cloud_account_name,
279 vnfd_package_store,
280 vdur_id=None,
281 placement_groups=[]):
282 self._dts = dts
283 self._log = log
284 self._loop = loop
285 self._vdud = vdud
286 self._vnfr = vnfr
287 self._nsr_config = nsr_config
288 self._mgmt_intf = mgmt_intf
289 self._cloud_account_name = cloud_account_name
290 self._vnfd_package_store = vnfd_package_store
291 self._mgmt_network = mgmt_network
292
293 self._vdur_id = vdur_id or str(uuid.uuid4())
294 self._int_intf = []
295 self._ext_intf = []
296 self._state = VDURecordState.INIT
297 self._state_failed_reason = None
298 self._request_id = str(uuid.uuid4())
299 self._name = vnfr.name + "__" + vdud.id
300 self._placement_groups = placement_groups
301 self._rm_regh = None
302 self._vm_resp = None
303 self._vdud_cloud_init = None
304 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
305
306 @asyncio.coroutine
307 def vdu_opdata_register(self):
308 yield from self._vdur_console_handler.register()
309
310 def cp_ip_addr(self, cp_name):
311 """ Find ip address by connection point name """
312 if self._vm_resp is not None:
313 for conn_point in self._vm_resp.connection_points:
314 if conn_point.name == cp_name:
315 return conn_point.ip_address
316 return "0.0.0.0"
317
318 def cp_mac_addr(self, cp_name):
319 """ Find mac address by connection point name """
320 if self._vm_resp is not None:
321 for conn_point in self._vm_resp.connection_points:
322 if conn_point.name == cp_name:
323 return conn_point.mac_addr
324 return "00:00:00:00:00:00"
325
326 def cp_id(self, cp_name):
327 """ Find connection point id by connection point name """
328 if self._vm_resp is not None:
329 for conn_point in self._vm_resp.connection_points:
330 if conn_point.name == cp_name:
331 return conn_point.connection_point_id
332 return ''
333
334 @property
335 def vdu_id(self):
336 return self._vdud.id
337
338 @property
339 def vm_resp(self):
340 return self._vm_resp
341
342 @property
343 def name(self):
344 """ Return this VDUR's name """
345 return self._name
346
347 # Truncated name confirming to RFC 1123
348 @property
349 def unique_short_name(self):
350 """ Return this VDUR's unique short name """
351 # Impose these restrictions on Unique name
352 # Max 64
353 # - Max 10 of NSR name (remove all specialcharacters, only numbers and alphabets)
354 # - 6 chars of shortened name
355 # - Max 10 of VDU name (remove all specialcharacters, only numbers and alphabets)
356 #
357 def _restrict_tag(input_str):
358 # Exclude all characters except a-zA-Z0-9
359 outstr = re.sub('[^a-zA-Z0-9]', '', input_str)
360 # Take max of 10 chars
361 return outstr[-10:]
362
363 # Use NSR name for part1
364 part1 = _restrict_tag(self._nsr_config.name)
365 # Get unique short string (6 chars)
366 part2 = mano_short_name.StringShortner(self._name)
367 # Use VDU ID for part3
368 part3 = _restrict_tag(self._vdud.id)
369 shortstr = part1 + "-" + part2.short_string + "-" + part3
370 return shortstr
371
372 @property
373 def cloud_account_name(self):
374 """ Cloud account this VDU should be created in """
375 return self._cloud_account_name
376
377 @property
378 def image_name(self):
379 """ name that should be used to lookup the image on the CMP """
380 if 'image' not in self._vdud:
381 return None
382 return os.path.basename(self._vdud.image)
383
384 @property
385 def image_checksum(self):
386 """ name that should be used to lookup the image on the CMP """
387 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
388
389 @property
390 def management_ip(self):
391 if not self.active:
392 return None
393 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
394
395 @property
396 def vm_management_ip(self):
397 if not self.active:
398 return None
399 return self._vm_resp.management_ip
400
401 @property
402 def operational_status(self):
403 """ Operational status of this VDU"""
404 op_stats_dict = {"INIT": "init",
405 "INSTANTIATING": "vm_init_phase",
406 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
407 "READY": "running",
408 "FAILED": "failed",
409 "TERMINATING": "terminated",
410 "TERMINATED": "terminated",
411 }
412 return op_stats_dict[self._state.name]
413
414 @property
415 def msg(self):
416 """ Process VDU message from resmgr"""
417 vdu_fields = ["vm_flavor",
418 "guest_epa",
419 "vswitch_epa",
420 "hypervisor_epa",
421 "host_epa",
422 "volumes",
423 ]
424 vdu_copy_dict = {k: v for k, v in
425 self._vdud.as_dict().items() if k in vdu_fields}
426 vdur_dict = {"id": self._vdur_id,
427 "vdu_id_ref": self._vdud.id,
428 "operational_status": self.operational_status,
429 "operational_status_details": self._state_failed_reason,
430 "name": self.name,
431 "unique_short_name": self.unique_short_name
432 }
433
434 if self.vm_resp is not None:
435 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
436 "flavor_id": self.vm_resp.flavor_id
437 })
438 if self._vm_resp.has_field('image_id'):
439 vdur_dict.update({ "image_id": self.vm_resp.image_id })
440
441 if self.management_ip is not None:
442 vdur_dict["management_ip"] = self.management_ip
443
444 if self.vm_management_ip is not None:
445 vdur_dict["vm_management_ip"] = self.vm_management_ip
446
447 vdur_dict.update(vdu_copy_dict)
448
449 if self.vm_resp is not None:
450 if self._vm_resp.has_field('volumes'):
451 for opvolume in self._vm_resp.volumes:
452 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
453 if len(vdurvol_data) == 1:
454 vdurvol_data[0]["volume_id"] = opvolume.volume_id
455 if opvolume.has_field('custom_meta_data'):
456 metadata_list = list()
457 for metadata_item in opvolume.custom_meta_data:
458 metadata_list.append(metadata_item.as_dict())
459 vdurvol_data[0]['custom_meta_data'] = metadata_list
460
461 if self._vm_resp.has_field('supplemental_boot_data'):
462 vdur_dict['supplemental_boot_data'] = dict()
463 if self._vm_resp.supplemental_boot_data.has_field('boot_data_drive'):
464 vdur_dict['supplemental_boot_data']['boot_data_drive'] = self._vm_resp.supplemental_boot_data.boot_data_drive
465 if self._vm_resp.supplemental_boot_data.has_field('custom_meta_data'):
466 metadata_list = list()
467 for metadata_item in self._vm_resp.supplemental_boot_data.custom_meta_data:
468 metadata_list.append(metadata_item.as_dict())
469 vdur_dict['supplemental_boot_data']['custom_meta_data'] = metadata_list
470 if self._vm_resp.supplemental_boot_data.has_field('config_file'):
471 file_list = list()
472 for file_item in self._vm_resp.supplemental_boot_data.config_file:
473 file_list.append(file_item.as_dict())
474 vdur_dict['supplemental_boot_data']['config_file'] = file_list
475
476 icp_list = []
477 ii_list = []
478
479 for intf, cp_id, vlr in self._int_intf:
480 cp = self.find_internal_cp_by_cp_id(cp_id)
481
482 icp_list.append({"name": cp.name,
483 "id": cp.id,
484 "type_yang": "VPORT",
485 "ip_address": self.cp_ip_addr(cp.id),
486 "mac_address": self.cp_mac_addr(cp.id)})
487
488 ii_list.append({"name": intf.name,
489 "vdur_internal_connection_point_ref": cp.id,
490 "virtual_interface": {}})
491
492 vdur_dict["internal_connection_point"] = icp_list
493 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
494 vdur_dict["internal_interface"] = ii_list
495
496 ei_list = []
497 for intf, cp, vlr in self._ext_intf:
498 ei_list.append({"name": cp.name,
499 "vnfd_connection_point_ref": cp.name,
500 "virtual_interface": {}})
501 self._vnfr.update_cp(cp.name,
502 self.cp_ip_addr(cp.name),
503 self.cp_mac_addr(cp.name),
504 self.cp_id(cp.name))
505
506 vdur_dict["external_interface"] = ei_list
507
508 placement_groups = []
509 for group in self._placement_groups:
510 placement_groups.append(group.as_dict())
511 vdur_dict['placement_groups_info'] = placement_groups
512
513 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
514
515 @property
516 def resmgr_path(self):
517 """ path for resource-mgr"""
518 return ("D,/rw-resource-mgr:resource-mgmt" +
519 "/vdu-event" +
520 "/vdu-event-data[event-id='{}']".format(self._request_id))
521
522 @property
523 def vm_flavor_msg(self):
524 """ VM flavor message """
525 flavor = self._vdud.vm_flavor.__class__()
526 flavor.copy_from(self._vdud.vm_flavor)
527
528 return flavor
529
530 @property
531 def vdud_cloud_init(self):
532 """ Return the cloud-init contents for the VDU """
533 if self._vdud_cloud_init is None:
534 self._vdud_cloud_init = self.cloud_init()
535
536 return self._vdud_cloud_init
537
538 def cloud_init(self):
539 """ Populate cloud_init with cloud-config script from
540 either the inline contents or from the file provided
541 """
542 if self._vdud.cloud_init is not None:
543 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
544 return self._vdud.cloud_init
545 elif self._vdud.cloud_init_file is not None:
546 # Get cloud-init script contents from the file provided in the cloud_init_file param
547 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
548 filename = self._vdud.cloud_init_file
549 self._vnfd_package_store.refresh()
550 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
551 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
552 try:
553 return cloud_init_extractor.read_script(stored_package, filename)
554 except rift.package.cloud_init.CloudInitExtractionError as e:
555 self.instantiation_failed(str(e))
556 raise VirtualDeploymentUnitRecordError(e)
557 else:
558 self._log.debug("VDU Instantiation: cloud-init script not provided")
559
560 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
561 host_aggregates = []
562 availability_zones = []
563 server_groups = []
564 for group in self._placement_groups:
565 if group.has_field('host_aggregate'):
566 for aggregate in group.host_aggregate:
567 host_aggregates.append(aggregate.as_dict())
568 if group.has_field('availability_zone'):
569 availability_zones.append(group.availability_zone.as_dict())
570 if group.has_field('server_group'):
571 server_groups.append(group.server_group.as_dict())
572
573 if availability_zones:
574 if len(availability_zones) > 1:
575 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
576 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
577 else:
578 vm_create_msg_dict['availability_zone'] = availability_zones[0]
579
580 if server_groups:
581 if len(server_groups) > 1:
582 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
583 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
584 else:
585 vm_create_msg_dict['server_group'] = server_groups[0]
586
587 if host_aggregates:
588 vm_create_msg_dict['host_aggregate'] = host_aggregates
589
590 return
591
592 def process_placement_groups(self, vm_create_msg_dict):
593 """Process the placement_groups and fill resource-mgr request"""
594 if not self._placement_groups:
595 return
596
597 cloud_set = set([group.cloud_type for group in self._placement_groups])
598 assert len(cloud_set) == 1
599 cloud_type = cloud_set.pop()
600
601 if cloud_type == 'openstack':
602 self.process_openstack_placement_group_construct(vm_create_msg_dict)
603
604 else:
605 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
606 return
607
608 def process_custom_bootdata(self, vm_create_msg_dict):
609 """Process the custom boot data"""
610 if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
611 return
612
613 self._vnfd_package_store.refresh()
614 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
615 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
616 for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
617 if 'source' not in file_item or 'dest' not in file_item:
618 continue
619 source = file_item['source']
620 # Find source file in scripts dir of VNFD
621 self._log.debug("Checking for source config file at %s", source)
622 try:
623 source_file_str = cloud_init_extractor.read_script(stored_package, source)
624 except rift.package.cloud_init.CloudInitExtractionError as e:
625 raise VirtualDeploymentUnitRecordError(e)
626 # Update source file location with file contents
627 file_item['source'] = source_file_str
628
629 return
630
631 def resmgr_msg(self, config=None):
632 vdu_fields = ["vm_flavor",
633 "guest_epa",
634 "vswitch_epa",
635 "hypervisor_epa",
636 "host_epa",
637 "volumes",
638 "supplemental_boot_data"]
639
640 self._log.debug("Creating params based on VDUD: %s", self._vdud)
641 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
642
643 vm_create_msg_dict = {
644 "name": self.unique_short_name, # Truncated name confirming to RFC 1123
645 "node_id": self.name, # Rift assigned Id
646 }
647
648 if self.image_name is not None:
649 vm_create_msg_dict["image_name"] = self.image_name
650
651 if self.image_checksum is not None:
652 vm_create_msg_dict["image_checksum"] = self.image_checksum
653
654 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
655 if self._vdud.has_field('mgmt_vpci'):
656 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
657
658 self._log.debug("VDUD: %s", self._vdud)
659 if config is not None:
660 vm_create_msg_dict['vdu_init'] = {'userdata': config}
661
662 if self._mgmt_network:
663 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
664
665 cp_list = []
666 for intf, cp, vlr in self._ext_intf:
667 cp_info = { "name": cp.name,
668 "virtual_link_id": vlr.network_id,
669 "type_yang": intf.virtual_interface.type_yang }
670
671 if cp.has_field('port_security_enabled'):
672 cp_info["port_security_enabled"] = cp.port_security_enabled
673
674 if (intf.virtual_interface.has_field('vpci') and
675 intf.virtual_interface.vpci is not None):
676 cp_info["vpci"] = intf.virtual_interface.vpci
677
678 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
679 cp_info['security_group'] = vlr.ip_profile_params.security_group
680
681 cp_list.append(cp_info)
682
683 for intf, cp, vlr in self._int_intf:
684 if (intf.virtual_interface.has_field('vpci') and
685 intf.virtual_interface.vpci is not None):
686 cp_list.append({"name": cp,
687 "virtual_link_id": vlr.network_id,
688 "type_yang": intf.virtual_interface.type_yang,
689 "vpci": intf.virtual_interface.vpci})
690 else:
691 if cp.has_field('port_security_enabled'):
692 cp_list.append({"name": cp,
693 "virtual_link_id": vlr.network_id,
694 "type_yang": intf.virtual_interface.type_yang,
695 "port_security_enabled": cp.port_security_enabled})
696 else:
697 cp_list.append({"name": cp,
698 "virtual_link_id": vlr.network_id,
699 "type_yang": intf.virtual_interface.type_yang})
700
701
702 vm_create_msg_dict["connection_points"] = cp_list
703 vm_create_msg_dict.update(vdu_copy_dict)
704
705 self.process_placement_groups(vm_create_msg_dict)
706 if 'supplemental_boot_data' in vm_create_msg_dict:
707 self.process_custom_bootdata(vm_create_msg_dict)
708
709 msg = RwResourceMgrYang.VDUEventData()
710 msg.event_id = self._request_id
711 msg.cloud_account = self.cloud_account_name
712 msg.request_info.from_dict(vm_create_msg_dict)
713
714 return msg
715
716 @asyncio.coroutine
717 def terminate(self, xact):
718 """ Delete resource in VIM """
719 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
720 self._log.warning("VDU terminate in not ready state - Ignoring request")
721 return
722
723 self._state = VDURecordState.TERMINATING
724 if self._vm_resp is not None:
725 try:
726 with self._dts.transaction() as new_xact:
727 yield from self.delete_resource(new_xact)
728 except Exception:
729 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
730
731 if self._rm_regh is not None:
732 self._log.debug("Deregistering resource manager registration handle")
733 self._rm_regh.deregister()
734 self._rm_regh = None
735
736 if self._vdur_console_handler is not None:
737 self._log.error("Deregistering vnfr vdur registration handle")
738 self._vdur_console_handler._regh.deregister()
739 self._vdur_console_handler._regh = None
740
741 self._state = VDURecordState.TERMINATED
742
743 def find_internal_cp_by_cp_id(self, cp_id):
744 """ Find the CP corresponding to the connection point id"""
745 cp = None
746
747 self._log.debug("find_internal_cp_by_cp_id(%s) called",
748 cp_id)
749
750 for int_cp in self._vdud.internal_connection_point:
751 self._log.debug("Checking for int cp %s in internal connection points",
752 int_cp.id)
753 if int_cp.id == cp_id:
754 cp = int_cp
755 break
756
757 if cp is None:
758 self._log.debug("Failed to find cp %s in internal connection points",
759 cp_id)
760 msg = "Failed to find cp %s in internal connection points" % cp_id
761 raise VduRecordError(msg)
762
763 # return the VLR associated with the connection point
764 return cp
765
766 @asyncio.coroutine
767 def create_resource(self, xact, vnfr, config=None):
768 """ Request resource from ResourceMgr """
769 def find_cp_by_name(cp_name):
770 """ Find a connection point by name """
771 cp = None
772 self._log.debug("find_cp_by_name(%s) called", cp_name)
773 for ext_cp in vnfr._cprs:
774 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
775 if ext_cp.name == cp_name:
776 cp = ext_cp
777 break
778 if cp is None:
779 self._log.debug("Failed to find cp %s in external connection points",
780 cp_name)
781 return cp
782
783 def find_internal_vlr_by_cp_name(cp_name):
784 """ Find the VLR corresponding to the connection point name"""
785 cp = None
786
787 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
788 cp_name)
789
790 for int_cp in self._vdud.internal_connection_point:
791 self._log.debug("Checking for int cp %s in internal connection points",
792 int_cp.id)
793 if int_cp.id == cp_name:
794 cp = int_cp
795 break
796
797 if cp is None:
798 self._log.debug("Failed to find cp %s in internal connection points",
799 cp_name)
800 msg = "Failed to find cp %s in internal connection points" % cp_name
801 raise VduRecordError(msg)
802
803 # return the VLR associated with the connection point
804 return vnfr.find_vlr_by_cp(cp_name)
805
806 block = xact.block_create()
807
808 self._log.debug("Executing vm request id: %s, action: create",
809 self._request_id)
810
811 # Resolve the networks associated external interfaces
812 for ext_intf in self._vdud.external_interface:
813 self._log.debug("Resolving external interface name [%s], cp[%s]",
814 ext_intf.name, ext_intf.vnfd_connection_point_ref)
815 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
816 if cp is None:
817 self._log.debug("Failed to find connection point - %s",
818 ext_intf.vnfd_connection_point_ref)
819 continue
820 self._log.debug("Connection point name [%s], type[%s]",
821 cp.name, cp.type_yang)
822
823 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
824
825 etuple = (ext_intf, cp, vlr)
826 self._ext_intf.append(etuple)
827
828 self._log.debug("Created external interface tuple : %s", etuple)
829
830 # Resolve the networks associated internal interfaces
831 for intf in self._vdud.internal_interface:
832 cp_id = intf.vdu_internal_connection_point_ref
833 self._log.debug("Resolving internal interface name [%s], cp[%s]",
834 intf.name, cp_id)
835
836 try:
837 vlr = find_internal_vlr_by_cp_name(cp_id)
838 except Exception as e:
839 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
840 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
841 raise VduRecordError(msg)
842
843 ituple = (intf, cp_id, vlr)
844 self._int_intf.append(ituple)
845
846 self._log.debug("Created internal interface tuple : %s", ituple)
847
848 resmgr_path = self.resmgr_path
849 resmgr_msg = self.resmgr_msg(config)
850
851 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
852 block.add_query_create(resmgr_path, resmgr_msg)
853
854 res_iter = yield from block.execute(now=True)
855
856 resp = None
857
858 for i in res_iter:
859 r = yield from i
860 resp = r.result
861
862 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
863 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
864 self._log.debug("Got vm request response: %s", resp.resource_info)
865 return resp.resource_info
866
867 @asyncio.coroutine
868 def delete_resource(self, xact):
869 block = xact.block_create()
870
871 self._log.debug("Executing vm request id: %s, action: delete",
872 self._request_id)
873
874 block.add_query_delete(self.resmgr_path)
875
876 yield from block.execute(flags=0, now=True)
877
878 @asyncio.coroutine
879 def read_resource(self, xact):
880 block = xact.block_create()
881
882 self._log.debug("Executing vm request id: %s, action: delete",
883 self._request_id)
884
885 block.add_query_read(self.resmgr_path)
886
887 res_iter = yield from block.execute(flags=0, now=True)
888 for i in res_iter:
889 r = yield from i
890 resp = r.result
891
892 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
893 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
894 self._log.debug("Got vm request response: %s", resp.resource_info)
895 #self._vm_resp = resp.resource_info
896 return resp.resource_info
897
898
899 @asyncio.coroutine
900 def start_component(self):
901 """ This VDUR is active """
902 self._log.debug("Starting component %s for vdud %s vdur %s",
903 self._vdud.vcs_component_ref,
904 self._vdud,
905 self._vdur_id)
906 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
907 self.vm_resp.management_ip)
908
909 @property
910 def active(self):
911 """ Is this VDU active """
912 return True if self._state is VDURecordState.READY else False
913
914 @asyncio.coroutine
915 def instantiation_failed(self, failed_reason=None):
916 """ VDU instantiation failed """
917 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
918 self._state = VDURecordState.FAILED
919 self._state_failed_reason = failed_reason
920 yield from self._vnfr.instantiation_failed(failed_reason)
921
922 @asyncio.coroutine
923 def vdu_is_active(self):
924 """ This VDU is active"""
925 if self.active:
926 self._log.warning("VDU %s was already marked as active", self._vdur_id)
927 return
928
929 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
930
931 if self._vdud.vcs_component_ref is not None:
932 yield from self.start_component()
933
934 self._state = VDURecordState.READY
935
936 if self._vnfr.all_vdus_active():
937 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
938 yield from self._vnfr.is_ready()
939
940 @asyncio.coroutine
941 def instantiate(self, xact, vnfr, config=None):
942 """ Instantiate this VDU """
943 self._state = VDURecordState.INSTANTIATING
944
945 @asyncio.coroutine
946 def on_prepare(xact_info, query_action, ks_path, msg):
947 """ This VDUR is active """
948 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
949 query_action,
950 ks_path,
951 msg)
952
953 if (query_action == rwdts.QueryAction.UPDATE or
954 query_action == rwdts.QueryAction.CREATE):
955 self._vm_resp = msg
956
957 if msg.resource_state == "active":
958 # Move this VDU to ready state
959 yield from self.vdu_is_active()
960 elif msg.resource_state == "failed":
961 yield from self.instantiation_failed(msg.resource_errors)
962 elif query_action == rwdts.QueryAction.DELETE:
963 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
964 else:
965 raise NotImplementedError(
966 "%s action on VirtualDeployementUnitRecord not supported",
967 query_action)
968
969 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
970
971 try:
972 reg_event = asyncio.Event(loop=self._loop)
973
974 @asyncio.coroutine
975 def on_ready(regh, status):
976 reg_event.set()
977
978 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
979 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
980 flags=rwdts.Flag.SUBSCRIBER,
981 handler=handler)
982 yield from reg_event.wait()
983
984 vm_resp = yield from self.create_resource(xact, vnfr, config)
985 self._vm_resp = vm_resp
986 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
987
988 self._log.debug("Requested VM from resource manager response %s",
989 vm_resp)
990 if vm_resp.resource_state == "active":
991 self._log.debug("Resourcemgr responded wih an active vm resp %s",
992 vm_resp)
993 yield from self.vdu_is_active()
994 self._state = VDURecordState.READY
995 elif (vm_resp.resource_state == "pending" or
996 vm_resp.resource_state == "inactive"):
997 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
998 vm_resp)
999 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
1000 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
1001 # flags=rwdts.Flag.SUBSCRIBER,
1002 # handler=handler)
1003 else:
1004 self._log.debug("Resourcemgr responded wih an error vm resp %s",
1005 vm_resp)
1006 raise VirtualDeploymentUnitRecordError(
1007 "Failed VDUR instantiation %s " % vm_resp)
1008
1009 except Exception as e:
1010 import traceback
1011 traceback.print_exc()
1012 self._log.exception(e)
1013 self._log.error("Instantiation of VDU record failed: %s", str(e))
1014 self._state = VDURecordState.FAILED
1015 yield from self.instantiation_failed(str(e))
1016
1017
1018 class VlRecordState(enum.Enum):
1019 """ VL Record State """
1020 INIT = 101
1021 INSTANTIATION_PENDING = 102
1022 ACTIVE = 103
1023 TERMINATE_PENDING = 104
1024 TERMINATED = 105
1025 FAILED = 106
1026
1027
1028 class InternalVirtualLinkRecord(object):
1029 """ Internal Virtual Link record """
1030 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
1031 self._dts = dts
1032 self._log = log
1033 self._loop = loop
1034 self._ivld_msg = ivld_msg
1035 self._vnfr_name = vnfr_name
1036 self._cloud_account_name = cloud_account_name
1037 self._ip_profile = ip_profile
1038
1039 self._vlr_req = self.create_vlr()
1040 self._vlr = None
1041 self._state = VlRecordState.INIT
1042
1043 @property
1044 def vlr_id(self):
1045 """ Find VLR by id """
1046 return self._vlr_req.id
1047
1048 @property
1049 def name(self):
1050 """ Name of this VL """
1051 if self._ivld_msg.vim_network_name:
1052 return self._ivld_msg.vim_network_name
1053 else:
1054 return self._vnfr_name + "." + self._ivld_msg.name
1055
1056 @property
1057 def network_id(self):
1058 """ Find VLR by id """
1059 return self._vlr.network_id if self._vlr else None
1060
1061 def vlr_path(self):
1062 """ VLR path for this VLR instance"""
1063 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
1064
1065 def create_vlr(self):
1066 """ Create the VLR record which will be instantiated """
1067
1068 vld_fields = ["short_name",
1069 "vendor",
1070 "description",
1071 "version",
1072 "type_yang",
1073 "vim_network_name",
1074 "provider_network"]
1075
1076 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
1077
1078 vlr_dict = {"id": str(uuid.uuid4()),
1079 "name": self.name,
1080 "cloud_account": self._cloud_account_name,
1081 }
1082
1083 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
1084 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
1085
1086 vlr_dict.update(vld_copy_dict)
1087
1088 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
1089 return vlr
1090
1091 @asyncio.coroutine
1092 def instantiate(self, xact, restart_mode=False):
1093 """ Instantiate VL """
1094
1095 @asyncio.coroutine
1096 def instantiate_vlr():
1097 """ Instantiate VLR"""
1098 self._log.debug("Create VL with xpath %s and vlr %s",
1099 self.vlr_path(), self._vlr_req)
1100
1101 with self._dts.transaction(flags=0) as xact:
1102 block = xact.block_create()
1103 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
1104 self._log.debug("Executing VL create path:%s msg:%s",
1105 self.vlr_path(), self._vlr_req)
1106
1107 res_iter = None
1108 try:
1109 res_iter = yield from block.execute()
1110 except Exception:
1111 self._state = VlRecordState.FAILED
1112 self._log.exception("Caught exception while instantial VL")
1113 raise
1114
1115 for ent in res_iter:
1116 res = yield from ent
1117 self._vlr = res.result
1118
1119 if self._vlr.operational_status == 'failed':
1120 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1121 self._state = VlRecordState.FAILED
1122 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1123
1124 self._log.info("Created VL with xpath %s and vlr %s",
1125 self.vlr_path(), self._vlr)
1126
1127 @asyncio.coroutine
1128 def get_vlr():
1129 """ Get the network id """
1130 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1131 vlr = None
1132 for ent in res_iter:
1133 res = yield from ent
1134 vlr = res.result
1135
1136 if vlr is None:
1137 err = "Failed to get VLR for path %s" % self.vlr_path()
1138 self._log.warn(err)
1139 raise InternalVirtualLinkRecordError(err)
1140 return vlr
1141
1142 self._state = VlRecordState.INSTANTIATION_PENDING
1143
1144 if restart_mode:
1145 vl = yield from get_vlr()
1146 if vl is None:
1147 yield from instantiate_vlr()
1148 else:
1149 yield from instantiate_vlr()
1150
1151 self._state = VlRecordState.ACTIVE
1152
1153 def vlr_in_vns(self):
1154 """ Is there a VLR record in VNS """
1155 if (self._state == VlRecordState.ACTIVE or
1156 self._state == VlRecordState.INSTANTIATION_PENDING or
1157 self._state == VlRecordState.FAILED):
1158 return True
1159
1160 return False
1161
1162 @asyncio.coroutine
1163 def terminate(self, xact):
1164 """Terminate this VL """
1165 if not self.vlr_in_vns():
1166 self._log.debug("Ignoring terminate request for id %s in state %s",
1167 self.vlr_id, self._state)
1168 return
1169
1170 self._log.debug("Terminating VL with path %s", self.vlr_path())
1171 self._state = VlRecordState.TERMINATE_PENDING
1172 block = xact.block_create()
1173 block.add_query_delete(self.vlr_path())
1174 yield from block.execute(flags=0, now=True)
1175 self._state = VlRecordState.TERMINATED
1176 self._log.debug("Terminated VL with path %s", self.vlr_path())
1177
1178
1179 class VirtualNetworkFunctionRecord(object):
1180 """ Virtual Network Function Record """
1181 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1182 self._dts = dts
1183 self._log = log
1184 self._loop = loop
1185 self._cluster_name = cluster_name
1186 self._vnfr_msg = vnfr_msg
1187 self._vnfr_id = vnfr_msg.id
1188 self._vnfd_id = vnfr_msg.vnfd.id
1189 self._vnfm = vnfm
1190 self._vcs_handler = vcs_handler
1191 self._vnfr = vnfr_msg
1192 self._mgmt_network = mgmt_network
1193
1194 self._vnfd = vnfr_msg.vnfd
1195 self._state = VirtualNetworkFunctionRecordState.INIT
1196 self._state_failed_reason = None
1197 self._ext_vlrs = {} # The list of external virtual links
1198 self._vlrs = [] # The list of internal virtual links
1199 self._vdus = [] # The list of vdu
1200 self._vlr_by_cp = {}
1201 self._cprs = []
1202 self._inventory = {}
1203 self._create_time = int(time.time())
1204 self._vnf_mon = None
1205 self._config_status = vnfr_msg.config_status
1206 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1207 self._rw_vnfd = None
1208 self._vnfd_ref_count = 0
1209
1210 def _get_vdur_from_vdu_id(self, vdu_id):
1211 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1212 self._log.debug("Searching through vdus: %s", self._vdus)
1213 for vdu in self._vdus:
1214 self._log.debug("vdu_id: %s", vdu.vdu_id)
1215 if vdu.vdu_id == vdu_id:
1216 return vdu
1217
1218 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1219
1220 @property
1221 def operational_status(self):
1222 """ Operational status of this VNFR """
1223 op_status_map = {"INIT": "init",
1224 "VL_INIT_PHASE": "vl_init_phase",
1225 "VM_INIT_PHASE": "vm_init_phase",
1226 "READY": "running",
1227 "TERMINATE": "terminate",
1228 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1229 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1230 "TERMINATED": "terminated",
1231 "FAILED": "failed", }
1232 return op_status_map[self._state.name]
1233
1234 @staticmethod
1235 def vnfd_xpath(vnfd_id):
1236 """ VNFD xpath associated with this VNFR """
1237 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
1238
1239 @property
1240 def vnfd_ref_count(self):
1241 """ Returns the VNFD reference count associated with this VNFR """
1242 return self._vnfd_ref_count
1243
1244 def vnfd_in_use(self):
1245 """ Returns whether vnfd is in use or not """
1246 return True if self._vnfd_ref_count > 0 else False
1247
1248 def vnfd_ref(self):
1249 """ Take a reference on this object """
1250 self._vnfd_ref_count += 1
1251 return self._vnfd_ref_count
1252
1253 def vnfd_unref(self):
1254 """ Release reference on this object """
1255 if self._vnfd_ref_count < 1:
1256 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1257 (self.vnfd.id, self._vnfd_ref_count))
1258 self._log.critical(msg)
1259 raise VnfRecordError(msg)
1260 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1261 self.vnfd.id, self._vnfd_ref_count)
1262 self._vnfd_ref_count -= 1
1263 return self._vnfd_ref_count
1264
1265 @property
1266 def vnfd(self):
1267 """ VNFD for this VNFR """
1268 return self._vnfd
1269
1270 @property
1271 def vnf_name(self):
1272 """ VNFD name associated with this VNFR """
1273 return self.vnfd.name
1274
1275 @property
1276 def name(self):
1277 """ Name of this VNF in the record """
1278 return self._vnfr.name
1279
1280 @property
1281 def cloud_account_name(self):
1282 """ Name of the cloud account this VNFR is instantiated in """
1283 return self._vnfr.cloud_account
1284
1285 @property
1286 def vnfd_id(self):
1287 """ VNFD Id associated with this VNFR """
1288 return self.vnfd.id
1289
1290 @property
1291 def vnfr_id(self):
1292 """ VNFR Id associated with this VNFR """
1293 return self._vnfr_id
1294
1295 @property
1296 def member_vnf_index(self):
1297 """ Member VNF index associated with this VNFR """
1298 return self._vnfr.member_vnf_index_ref
1299
1300 @property
1301 def config_status(self):
1302 """ Config agent status for this VNFR """
1303 return self._config_status
1304
1305 def component_by_name(self, component_name):
1306 """ Find a component by name in the inventory list"""
1307 mangled_name = VcsComponent.mangle_name(component_name,
1308 self.vnf_name,
1309 self.vnfd_id)
1310 return self._inventory[mangled_name]
1311
1312
1313
1314 @asyncio.coroutine
1315 def get_nsr_config(self):
1316 ### Need access to NS instance configuration for runtime resolution.
1317 ### This shall be replaced when deployment flavors are implemented
1318 xpath = "C,/nsr:ns-instance-config"
1319 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1320
1321 for result in results:
1322 entry = yield from result
1323 ns_instance_config = entry.result
1324 for nsr in ns_instance_config.nsr:
1325 if nsr.id == self._vnfr_msg.nsr_id_ref:
1326 return nsr
1327 return None
1328
1329 @asyncio.coroutine
1330 def start_component(self, component_name, ip_addr):
1331 """ Start a component in the VNFR by name """
1332 comp = self.component_by_name(component_name)
1333 yield from comp.start(None, None, ip_addr)
1334
1335 def cp_ip_addr(self, cp_name):
1336 """ Get ip address for connection point """
1337 self._log.debug("cp_ip_addr()")
1338 for cp in self._cprs:
1339 if cp.name == cp_name and cp.ip_address is not None:
1340 return cp.ip_address
1341 return "0.0.0.0"
1342
1343 def mgmt_intf_info(self):
1344 """ Get Management interface info for this VNFR """
1345 mgmt_intf_desc = self.vnfd.mgmt_interface
1346 ip_addr = None
1347 if mgmt_intf_desc.has_field("cp"):
1348 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1349 elif mgmt_intf_desc.has_field("vdu_id"):
1350 try:
1351 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1352 ip_addr = vdur.management_ip
1353 except VDURecordNotFound:
1354 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1355 ip_addr = None
1356 else:
1357 ip_addr = mgmt_intf_desc.ip_address
1358 port = mgmt_intf_desc.port
1359
1360 return ip_addr, port
1361
1362 @property
1363 def msg(self):
1364 """ Message associated with this VNFR """
1365 vnfd_fields = ["short_name", "vendor", "description", "version"]
1366 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1367
1368 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1369 ip_address, port = self.mgmt_intf_info()
1370
1371 if ip_address is not None:
1372 mgmt_intf.ip_address = ip_address
1373 if port is not None:
1374 mgmt_intf.port = port
1375
1376 vnfr_dict = {"id": self._vnfr_id,
1377 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1378 "name": self.name,
1379 "member_vnf_index_ref": self.member_vnf_index,
1380 "operational_status": self.operational_status,
1381 "operational_status_details": self._state_failed_reason,
1382 "cloud_account": self.cloud_account_name,
1383 "config_status": self._config_status
1384 }
1385
1386 vnfr_dict.update(vnfd_copy_dict)
1387
1388 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1389 vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1390
1391 vnfr_msg.create_time = self._create_time
1392 vnfr_msg.uptime = int(time.time()) - self._create_time
1393 vnfr_msg.mgmt_interface = mgmt_intf
1394
1395 # Add all the VLRs to VNFR
1396 for vlr in self._vlrs:
1397 ivlr = vnfr_msg.internal_vlr.add()
1398 ivlr.vlr_ref = vlr.vlr_id
1399
1400 # Add all the VDURs to VDUR
1401 if self._vdus is not None:
1402 for vdu in self._vdus:
1403 vdur = vnfr_msg.vdur.add()
1404 vdur.from_dict(vdu.msg.as_dict())
1405
1406 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1407 vnfr_msg.dashboard_url = self.dashboard_url
1408
1409 for cpr in self._cprs:
1410 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1411 vnfr_msg.connection_point.append(new_cp)
1412
1413 if self._vnf_mon is not None:
1414 for monp in self._vnf_mon.msg:
1415 vnfr_msg.monitoring_param.append(
1416 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1417
1418 if self._vnfr.vnf_configuration is not None:
1419 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1420
1421 for group in self._vnfr_msg.placement_groups_info:
1422 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1423 group_info.from_dict(group.as_dict())
1424 vnfr_msg.placement_groups_info.append(group_info)
1425
1426 return vnfr_msg
1427
1428 @property
1429 def dashboard_url(self):
1430 ip, cfg_port = self.mgmt_intf_info()
1431 protocol = 'http'
1432 http_port = 80
1433 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1434 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1435 protocol = 'https'
1436 http_port = 443
1437 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1438 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1439
1440 url = "{protocol}://{ip_address}:{port}/{path}".format(
1441 protocol=protocol,
1442 ip_address=ip,
1443 port=http_port,
1444 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1445 )
1446
1447 return url
1448
1449 @property
1450 def xpath(self):
1451 """ path for this VNFR """
1452 return("D,/vnfr:vnfr-catalog"
1453 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1454
1455 @asyncio.coroutine
1456 def publish(self, xact):
1457 """ publish this VNFR """
1458 vnfr = self.msg
1459 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1460 self.xpath, self.msg)
1461 vnfr.create_time = self._create_time
1462 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1463 self._log.debug("Published VNFR path = [%s], record = [%s]",
1464 self.xpath, self.msg)
1465
1466 def resolve_vld_ip_profile(self, vnfd_msg, vld):
1467 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
1468 if not vld.has_field('ip_profile_ref'):
1469 return None
1470 profile = [profile for profile in vnfd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
1471 return profile[0] if profile else None
1472
1473 @asyncio.coroutine
1474 def create_vls(self):
1475 """ Publish The VLs associated with this VNF """
1476 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1477 self.vnfd_id)
1478 for ivld_msg in self.vnfd.internal_vld:
1479 self._log.debug("Creating internal vld:"
1480 " %s, int_cp_ref = %s",
1481 ivld_msg, ivld_msg.internal_connection_point
1482 )
1483 vlr = InternalVirtualLinkRecord(dts=self._dts,
1484 log=self._log,
1485 loop=self._loop,
1486 ivld_msg=ivld_msg,
1487 vnfr_name=self.name,
1488 cloud_account_name=self.cloud_account_name,
1489 ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg)
1490 )
1491 self._vlrs.append(vlr)
1492
1493 for int_cp in ivld_msg.internal_connection_point:
1494 if int_cp.id_ref in self._vlr_by_cp:
1495 msg = ("Connection point %s already "
1496 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1497 raise InternalVirtualLinkRecordError(msg)
1498 self._log.debug("Setting vlr %s to internal cp = %s",
1499 vlr, int_cp.id_ref)
1500 self._vlr_by_cp[int_cp.id_ref] = vlr
1501
1502 @asyncio.coroutine
1503 def instantiate_vls(self, xact, restart_mode=False):
1504 """ Instantiate the VLs associated with this VNF """
1505 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1506 self.vnfd_id)
1507
1508 for vlr in self._vlrs:
1509 self._log.debug("Instantiating VLR %s", vlr)
1510 yield from vlr.instantiate(xact, restart_mode)
1511
1512 def find_vlr_by_cp(self, cp_name):
1513 """ Find the VLR associated with the cp name """
1514 return self._vlr_by_cp[cp_name]
1515
1516 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1517 """
1518 Returns the cloud specific construct for placement group
1519 Arguments:
1520 input_group: VNFD PlacementGroup
1521 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1522 """
1523 copy_dict = ['name', 'requirement', 'strategy']
1524 for group_info in nsr_config.vnfd_placement_group_maps:
1525 if group_info.placement_group_ref == input_group.name and \
1526 group_info.vnfd_id_ref == self.vnfd_id:
1527 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1528 group_dict = {k:v for k,v in
1529 group_info.as_dict().items()
1530 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1531 for param in copy_dict:
1532 group_dict.update({param: getattr(input_group, param)})
1533 group.from_dict(group_dict)
1534 return group
1535 return None
1536
1537 @asyncio.coroutine
1538 def get_vdu_placement_groups(self, vdu, nsr_config):
1539 placement_groups = []
1540 ### Step-1: Get VNF level placement groups
1541 for group in self._vnfr_msg.placement_groups_info:
1542 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1543 #group_info.from_dict(group.as_dict())
1544 placement_groups.append(group)
1545
1546 ### Step-2: Get VDU level placement groups
1547 for group in self.vnfd.placement_groups:
1548 for member_vdu in group.member_vdus:
1549 if member_vdu.member_vdu_ref == vdu.id:
1550 group_info = self.resolve_placement_group_cloud_construct(group,
1551 nsr_config)
1552 if group_info is None:
1553 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1554 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1555 else:
1556 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1557 str(group_info),
1558 vdu.name,
1559 self.vnf_name,
1560 self.member_vnf_index)
1561 placement_groups.append(group_info)
1562
1563 return placement_groups
1564
1565 @asyncio.coroutine
1566 def vdu_cloud_init_instantiation(self):
1567 [vdu.vdud_cloud_init for vdu in self._vdus]
1568
1569 @asyncio.coroutine
1570 def create_vdus(self, vnfr, restart_mode=False):
1571 """ Create the VDUs associated with this VNF """
1572
1573 def get_vdur_id(vdud):
1574 """Get the corresponding VDUR's id for the VDUD. This is useful in
1575 case of a restart.
1576
1577 In restart mode we check for exiting VDUR's ID and use them, if
1578 available. This way we don't end up creating duplicate VDURs
1579 """
1580 vdur_id = None
1581
1582 if restart_mode and vdud is not None:
1583 try:
1584 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1585 vdur_id = vdur[0]
1586 except IndexError:
1587 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1588
1589 return vdur_id
1590
1591
1592 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1593
1594 # Get NSR config - Needed for placement groups and to derive VDU short-name
1595 nsr_config = yield from self.get_nsr_config()
1596
1597 for vdu in self._rw_vnfd.vdu:
1598 self._log.debug("Creating vdu: %s", vdu)
1599 vdur_id = get_vdur_id(vdu)
1600
1601
1602 placement_groups = yield from self.get_vdu_placement_groups(vdu, nsr_config)
1603 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s, Existing vdur_id %s",
1604 vdu.name,
1605 self.vnf_name,
1606 self.member_vnf_index,
1607 [ group.name for group in placement_groups],
1608 vdur_id)
1609
1610 vdur = VirtualDeploymentUnitRecord(
1611 dts=self._dts,
1612 log=self._log,
1613 loop=self._loop,
1614 vdud=vdu,
1615 vnfr=vnfr,
1616 nsr_config=nsr_config,
1617 mgmt_intf=self.has_mgmt_interface(vdu),
1618 mgmt_network=self._mgmt_network,
1619 cloud_account_name=self.cloud_account_name,
1620 vnfd_package_store=self._vnfd_package_store,
1621 vdur_id=vdur_id,
1622 placement_groups = placement_groups,
1623 )
1624 yield from vdur.vdu_opdata_register()
1625
1626 self._vdus.append(vdur)
1627
1628 @asyncio.coroutine
1629 def instantiate_vdus(self, xact, vnfr):
1630 """ Instantiate the VDUs associated with this VNF """
1631 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1632
1633 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1634
1635 # Identify any dependencies among the VDUs
1636 dependencies = collections.defaultdict(list)
1637 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1638
1639 for vdu in self._vdus:
1640 if vdu._vdud_cloud_init is not None:
1641 for vdu_id in vdu_id_pattern.findall(vdu._vdud_cloud_init):
1642 if vdu_id != vdu.vdu_id:
1643 # This means that vdu.vdu_id depends upon vdu_id,
1644 # i.e. vdu_id must be instantiated before
1645 # vdu.vdu_id.
1646 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1647
1648 # Define the terminal states of VDU instantiation
1649 terminal = (
1650 VDURecordState.READY,
1651 VDURecordState.TERMINATED,
1652 VDURecordState.FAILED,
1653 )
1654
1655 datastore = VdurDatastore()
1656 processed = set()
1657
1658 @asyncio.coroutine
1659 def instantiate_monitor(vdu):
1660 """Monitor the state of the VDU during instantiation
1661
1662 Arguments:
1663 vdu - a VirtualDeploymentUnitRecord
1664
1665 """
1666 # wait for the VDUR to enter a terminal state
1667 while vdu._state not in terminal:
1668 yield from asyncio.sleep(1, loop=self._loop)
1669 # update the datastore
1670 datastore.update(vdu)
1671
1672 # add the VDU to the set of processed VDUs
1673 processed.add(vdu.vdu_id)
1674
1675 @asyncio.coroutine
1676 def instantiate(vdu):
1677 """Instantiate the specified VDU
1678
1679 Arguments:
1680 vdu - a VirtualDeploymentUnitRecord
1681
1682 Raises:
1683 if the VDU, or any of the VDUs this VDU depends upon, are
1684 terminated or fail to instantiate properly, a
1685 VirtualDeploymentUnitRecordError is raised.
1686
1687 """
1688 for dependency in dependencies[vdu.vdu_id]:
1689 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1690
1691 while dependency.vdu_id not in processed:
1692 yield from asyncio.sleep(1, loop=self._loop)
1693
1694 if not dependency.active:
1695 raise VirtualDeploymentUnitRecordError()
1696
1697 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1698
1699 # Populate the datastore with the current values of the VDU
1700 datastore.add(vdu)
1701
1702 # Substitute any variables contained in the cloud config script
1703 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1704
1705 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1706 if len(parts) > 1:
1707
1708 # Extract the variable names
1709 variables = list()
1710 for variable in parts[1::2]:
1711 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1712
1713 # Iterate of the variables and substitute values from the
1714 # datastore.
1715 for variable in variables:
1716
1717 # Handle a reference to a VDU by ID
1718 if variable.startswith('vdu['):
1719 value = datastore.get(variable)
1720 if value is None:
1721 msg = "Unable to find a substitute for {} in {} cloud-init script"
1722 raise ValueError(msg.format(variable, vdu.vdu_id))
1723
1724 config = config.replace("{{ %s }}" % variable, value)
1725 continue
1726
1727 # Handle a reference to the current VDU
1728 if variable.startswith('vdu'):
1729 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1730 config = config.replace("{{ %s }}" % variable, value)
1731 continue
1732
1733 # Handle unrecognized variables
1734 msg = 'unrecognized cloud-config variable: {}'
1735 raise ValueError(msg.format(variable))
1736
1737 # Instantiate the VDU
1738 with self._dts.transaction() as xact:
1739 self._log.debug("Instantiating vdu: %s", vdu)
1740 yield from vdu.instantiate(xact, vnfr, config=config)
1741 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1742 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1743 self.vnfr_id, vdu)
1744
1745 # First create a set of tasks to monitor the state of the VDUs and
1746 # report when they have entered a terminal state
1747 for vdu in self._vdus:
1748 self._loop.create_task(instantiate_monitor(vdu))
1749
1750 for vdu in self._vdus:
1751 self._loop.create_task(instantiate(vdu))
1752
1753 def has_mgmt_interface(self, vdu):
1754 # ## TODO: Support additional mgmt_interface type options
1755 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1756 return True
1757 return False
1758
1759 def vlr_xpath(self, vlr_id):
1760 """ vlr xpath """
1761 return(
1762 "D,/vlr:vlr-catalog/"
1763 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1764
1765 def ext_vlr_by_id(self, vlr_id):
1766 """ find ext vlr by id """
1767 return self._ext_vlrs[vlr_id]
1768
1769 @asyncio.coroutine
1770 def publish_inventory(self, xact):
1771 """ Publish the inventory associated with this VNF """
1772 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1773
1774 for component in self._rw_vnfd.component:
1775 self._log.debug("Creating inventory component %s", component)
1776 mangled_name = VcsComponent.mangle_name(component.component_name,
1777 self.vnf_name,
1778 self.vnfd_id
1779 )
1780 comp = VcsComponent(dts=self._dts,
1781 log=self._log,
1782 loop=self._loop,
1783 cluster_name=self._cluster_name,
1784 vcs_handler=self._vcs_handler,
1785 component=component,
1786 mangled_name=mangled_name,
1787 )
1788 if comp.name in self._inventory:
1789 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1790 component, self._vnfd_id)
1791 return
1792 self._log.debug("Adding component %s for vnrf %s",
1793 comp.name, self._vnfr_id)
1794 self._inventory[comp.name] = comp
1795 yield from comp.publish(xact)
1796
1797 def all_vdus_active(self):
1798 """ Are all VDUS in this VNFR active? """
1799 for vdu in self._vdus:
1800 if not vdu.active:
1801 return False
1802
1803 self._log.debug("Inside all_vdus_active. Returning True")
1804 return True
1805
1806 @asyncio.coroutine
1807 def instantiation_failed(self, failed_reason=None):
1808 """ VNFR instantiation failed """
1809 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1810 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1811 self._state_failed_reason = failed_reason
1812
1813 # Update the VNFR with the changed status
1814 yield from self.publish(None)
1815
1816 @asyncio.coroutine
1817 def is_ready(self):
1818 """ This VNF is ready"""
1819 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1820
1821 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1822 self.set_state(VirtualNetworkFunctionRecordState.READY)
1823
1824 else:
1825 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1826
1827 # Update the VNFR with the changed status
1828 yield from self.publish(None)
1829
1830 def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
1831 """Updated the connection point with ip address"""
1832 for cp in self._cprs:
1833 if cp.name == cp_name:
1834 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1835 cp_name, cp, ip_address, cp_id)
1836 cp.ip_address = ip_address
1837 cp.mac_address = mac_addr
1838 cp.connection_point_id = cp_id
1839 return
1840
1841 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1842 self._log.debug(err)
1843 raise VirtualDeploymentUnitRecordError(err)
1844
1845 def set_state(self, state):
1846 """ Set state for this VNFR"""
1847 self._state = state
1848
1849 @asyncio.coroutine
1850 def instantiate(self, xact, restart_mode=False):
1851 """ instantiate this VNF """
1852 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1853 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1854
1855 @asyncio.coroutine
1856 def fetch_vlrs():
1857 """ Fetch VLRs """
1858 # Iterate over all the connection points in VNFR and fetch the
1859 # associated VLRs
1860
1861 def cpr_from_cp(cp):
1862 """ Creates a record level connection point from the desciptor cp"""
1863 cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
1864 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1865 cpr_dict = {}
1866 cpr_dict.update(cp_copy_dict)
1867 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1868
1869 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1870 self._vnfr_id, self._vnfr.connection_point)
1871
1872 for cp in self._vnfr.connection_point:
1873 cpr = cpr_from_cp(cp)
1874 self._cprs.append(cpr)
1875 self._log.debug("Adding Connection point record %s ", cp)
1876
1877 vlr_path = self.vlr_xpath(cp.vlr_ref)
1878 self._log.debug("Fetching VLR with path = %s", vlr_path)
1879 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1880 rwdts.XactFlag.MERGE)
1881 for i in res_iter:
1882 r = yield from i
1883 d = r.result
1884 self._ext_vlrs[cp.vlr_ref] = d
1885 cpr.vlr_ref = cp.vlr_ref
1886 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1887
1888 # Increase the VNFD reference count
1889 self.vnfd_ref()
1890
1891 assert self.vnfd
1892
1893 # Fetch External VLRs
1894 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1895 yield from fetch_vlrs()
1896
1897 # Publish inventory
1898 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1899 yield from self.publish_inventory(xact)
1900
1901 # Publish inventory
1902 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1903 yield from self.create_vls()
1904
1905 # publish the VNFR
1906 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1907 yield from self.publish(xact)
1908
1909
1910 # instantiate VLs
1911 self._log.debug("VNFR-ID %s: Instantiate VLs, restart mode %s", self._vnfr_id, restart_mode)
1912 try:
1913 yield from self.instantiate_vls(xact, restart_mode)
1914 except Exception as e:
1915 self._log.exception("VL instantiation failed (%s)", str(e))
1916 yield from self.instantiation_failed(str(e))
1917 return
1918
1919 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1920
1921 # instantiate VDUs
1922 self._log.debug("VNFR-ID %s: Create VDUs, restart mode %s", self._vnfr_id, restart_mode)
1923 yield from self.create_vdus(self, restart_mode)
1924
1925 try:
1926 yield from self.vdu_cloud_init_instantiation()
1927 except Exception as e:
1928 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1929 self._state_failed_reason = str(e)
1930 yield from self.publish(xact)
1931
1932 # publish the VNFR
1933 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1934 yield from self.publish(xact)
1935
1936 # instantiate VDUs
1937 # ToDo: Check if this should be prevented during restart
1938 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1939 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1940
1941 # publish the VNFR
1942 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1943 yield from self.publish(xact)
1944
1945 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1946
1947 # create task updating uptime for this vnfr
1948 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1949 self._loop.create_task(self.vnfr_uptime_update(xact))
1950
1951 @asyncio.coroutine
1952 def terminate(self, xact):
1953 """ Terminate this virtual network function """
1954
1955 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1956
1957 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1958
1959 # stop monitoring
1960 if self._vnf_mon is not None:
1961 self._vnf_mon.stop()
1962 self._vnf_mon.deregister()
1963 self._vnf_mon = None
1964
1965 @asyncio.coroutine
1966 def terminate_vls():
1967 """ Terminate VLs in this VNF """
1968 for vl in self._vlrs:
1969 yield from vl.terminate(xact)
1970
1971 @asyncio.coroutine
1972 def terminate_vdus():
1973 """ Terminate VDUS in this VNF """
1974 for vdu in self._vdus:
1975 yield from vdu.terminate(xact)
1976
1977 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1978 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1979 yield from terminate_vls()
1980
1981 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1982 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1983 yield from terminate_vdus()
1984
1985 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1986 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1987
1988 @asyncio.coroutine
1989 def vnfr_uptime_update(self, xact):
1990 while True:
1991 # Return when vnfr state is FAILED or TERMINATED etc
1992 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1993 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1994 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1995 VirtualNetworkFunctionRecordState.READY]:
1996 return
1997 yield from self.publish(xact)
1998 yield from asyncio.sleep(2, loop=self._loop)
1999
2000
2001
2002 class VnfdDtsHandler(object):
2003 """ DTS handler for VNFD config changes """
2004 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2005
2006 def __init__(self, dts, log, loop, vnfm):
2007 self._dts = dts
2008 self._log = log
2009 self._loop = loop
2010 self._vnfm = vnfm
2011 self._regh = None
2012
2013 @asyncio.coroutine
2014 def regh(self):
2015 """ DTS registration handle """
2016 return self._regh
2017
2018 @asyncio.coroutine
2019 def register(self):
2020 """ Register for VNFD configuration"""
2021
2022 def on_apply(dts, acg, xact, action, scratch):
2023 """Apply the configuration"""
2024 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2025 xact, action, scratch)
2026
2027 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
2028
2029 @asyncio.coroutine
2030 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
2031 """ on prepare callback """
2032 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
2033 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
2034 fref = ProtobufC.FieldReference.alloc()
2035 fref.goto_whole_message(msg.to_pbcm())
2036
2037 # Handle deletes in prepare_callback
2038 if fref.is_field_deleted():
2039 # Delete an VNFD record
2040 self._log.debug("Deleting VNFD with id %s", msg.id)
2041 if self._vnfm.vnfd_in_use(msg.id):
2042 self._log.debug("Cannot delete VNFD in use - %s", msg)
2043 err = "Cannot delete a VNFD in use - %s" % msg
2044 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
2045 # Delete a VNFD record
2046 yield from self._vnfm.delete_vnfd(msg.id)
2047
2048 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2049
2050 self._log.debug(
2051 "Registering for VNFD config using xpath: %s",
2052 VnfdDtsHandler.XPATH,
2053 )
2054 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2055 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2056 self._regh = acg.register(
2057 xpath=VnfdDtsHandler.XPATH,
2058 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2059 on_prepare=on_prepare)
2060
2061
2062 class VcsComponentDtsHandler(object):
2063 """ Vcs Component DTS handler """
2064 XPATH = ("D,/rw-manifest:manifest" +
2065 "/rw-manifest:operational-inventory" +
2066 "/rw-manifest:component")
2067
2068 def __init__(self, dts, log, loop, vnfm):
2069 self._dts = dts
2070 self._log = log
2071 self._loop = loop
2072 self._regh = None
2073 self._vnfm = vnfm
2074
2075 @property
2076 def regh(self):
2077 """ DTS registration handle """
2078 return self._regh
2079
2080 @asyncio.coroutine
2081 def register(self):
2082 """ Registers VCS component dts publisher registration"""
2083 self._log.debug("VCS Comp publisher DTS handler registering path %s",
2084 VcsComponentDtsHandler.XPATH)
2085
2086 hdl = rift.tasklets.DTS.RegistrationHandler()
2087 handlers = rift.tasklets.Group.Handler()
2088 with self._dts.group_create(handler=handlers) as group:
2089 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
2090 handler=hdl,
2091 flags=(rwdts.Flag.PUBLISHER |
2092 rwdts.Flag.NO_PREP_READ |
2093 rwdts.Flag.DATASTORE),)
2094
2095 @asyncio.coroutine
2096 def publish(self, xact, path, msg):
2097 """ Publishes the VCS component """
2098 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
2099 xact, path, msg)
2100 self.regh.create_element(path, msg)
2101 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2102 VcsComponentDtsHandler.XPATH, xact, path, msg)
2103
2104 class VnfrConsoleOperdataDtsHandler(object):
2105 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
2106 @property
2107 def vnfr_vdu_console_xpath(self):
2108 """ path for resource-mgr"""
2109 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
2110
2111 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
2112 self._dts = dts
2113 self._log = log
2114 self._loop = loop
2115 self._regh = None
2116 self._vnfm = vnfm
2117
2118 self._vnfr_id = vnfr_id
2119 self._vdur_id = vdur_id
2120 self._vdu_id = vdu_id
2121
2122 @asyncio.coroutine
2123 def register(self):
2124 """ Register for VNFR VDU Operational Data read from dts """
2125
2126 @asyncio.coroutine
2127 def on_prepare(xact_info, action, ks_path, msg):
2128 """ prepare callback from dts """
2129 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2130 self._log.debug(
2131 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2132 xact_info, action, xpath, msg
2133 )
2134
2135 if action == rwdts.QueryAction.READ:
2136 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
2137 path_entry = schema.keyspec_to_entry(ks_path)
2138 self._log.debug("VDU Opdata path is {}".format(path_entry.key00.id))
2139 try:
2140 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2141 except VnfRecordError as e:
2142 self._log.error("VNFR id %s not found", self._vnfr_id)
2143 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2144 return
2145 try:
2146 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2147 if not vdur._state == VDURecordState.READY:
2148 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2149 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2150 return
2151 with self._dts.transaction() as new_xact:
2152 resp = yield from vdur.read_resource(new_xact)
2153 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2154 vdur_console.id = self._vdur_id
2155 if resp.console_url:
2156 vdur_console.console_url = resp.console_url
2157 else:
2158 vdur_console.console_url = 'none'
2159 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2160 except Exception:
2161 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2162 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2163 vdur_console.id = self._vdur_id
2164 vdur_console.console_url = 'none'
2165
2166 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2167 xpath=self.vnfr_vdu_console_xpath,
2168 msg=vdur_console)
2169 else:
2170 #raise VnfRecordError("Not supported operation %s" % action)
2171 self._log.error("Not supported operation %s" % action)
2172 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2173 return
2174
2175
2176 self._log.debug("Registering for VNFR VDU using xpath: %s",
2177 self.vnfr_vdu_console_xpath)
2178 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2179 with self._dts.group_create() as group:
2180 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2181 handler=hdl,
2182 flags=rwdts.Flag.PUBLISHER,
2183 )
2184
2185
2186 class VnfrDtsHandler(object):
2187 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2188 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2189
2190 def __init__(self, dts, log, loop, vnfm):
2191 self._dts = dts
2192 self._log = log
2193 self._loop = loop
2194 self._vnfm = vnfm
2195
2196 self._regh = None
2197
2198 @property
2199 def regh(self):
2200 """ Return registration handle"""
2201 return self._regh
2202
2203 @property
2204 def vnfm(self):
2205 """ Return VNF manager instance """
2206 return self._vnfm
2207
2208 @asyncio.coroutine
2209 def register(self):
2210 """ Register for vnfr create/update/delete/read requests from dts """
2211 def on_commit(xact_info):
2212 """ The transaction has been committed """
2213 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2214 return rwdts.MemberRspCode.ACTION_OK
2215
2216 def on_abort(*args):
2217 """ Abort callback """
2218 self._log.debug("VNF transaction got aborted")
2219
2220 @asyncio.coroutine
2221 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2222
2223 @asyncio.coroutine
2224 def instantiate_realloc_vnfr(vnfr):
2225 """Re-populate the vnfm after restart
2226
2227 Arguments:
2228 vlink
2229
2230 """
2231
2232 yield from vnfr.instantiate(None, restart_mode=True)
2233
2234 if xact_event == rwdts.MemberEvent.INSTALL:
2235 curr_cfg = self.regh.elements
2236 for cfg in curr_cfg:
2237 vnfr = self.vnfm.create_vnfr(cfg)
2238 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2239
2240 self._log.debug("Got on_event in vnfm")
2241
2242 return rwdts.MemberRspCode.ACTION_OK
2243
2244 @asyncio.coroutine
2245 def on_prepare(xact_info, action, ks_path, msg):
2246 """ prepare callback from dts """
2247 self._log.debug(
2248 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2249 xact_info, action, msg
2250 )
2251
2252 if action == rwdts.QueryAction.CREATE:
2253 if not msg.has_field("vnfd"):
2254 err = "Vnfd not provided"
2255 self._log.error(err)
2256 raise VnfRecordError(err)
2257
2258 vnfr = self.vnfm.create_vnfr(msg)
2259 try:
2260 # RIFT-9105: Unable to add a READ query under an existing transaction
2261 # xact = xact_info.xact
2262 yield from vnfr.instantiate(None)
2263 except Exception as e:
2264 self._log.exception(e)
2265 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2266 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2267 yield from vnfr.publish(None)
2268 elif action == rwdts.QueryAction.DELETE:
2269 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2270 path_entry = schema.keyspec_to_entry(ks_path)
2271 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2272
2273 if vnfr is None:
2274 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2275 raise VirtualNetworkFunctionRecordNotFound(
2276 "VNFR id %s", path_entry.key00.id)
2277
2278 try:
2279 yield from vnfr.terminate(xact_info.xact)
2280 # Unref the VNFD
2281 vnfr.vnfd_unref()
2282 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2283 except Exception as e:
2284 self._log.exception(e)
2285 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2286
2287 elif action == rwdts.QueryAction.UPDATE:
2288 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2289 path_entry = schema.keyspec_to_entry(ks_path)
2290 vnfr = None
2291 try:
2292 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2293 except Exception as e:
2294 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2295 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2296 return
2297
2298 if vnfr is None:
2299 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2300 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2301 return
2302
2303 self._log.debug("VNFR {} update config status {} (current {})".
2304 format(vnfr.name, msg.config_status, vnfr.config_status))
2305 # Update the config status and publish
2306 vnfr._config_status = msg.config_status
2307 yield from vnfr.publish(None)
2308
2309 else:
2310 raise NotImplementedError(
2311 "%s action on VirtualNetworkFunctionRecord not supported",
2312 action)
2313
2314 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2315
2316 self._log.debug("Registering for VNFR using xpath: %s",
2317 VnfrDtsHandler.XPATH,)
2318
2319 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2320 on_prepare=on_prepare,)
2321 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2322 with self._dts.group_create(handler=handlers) as group:
2323 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2324 handler=hdl,
2325 flags=(rwdts.Flag.PUBLISHER |
2326 rwdts.Flag.NO_PREP_READ |
2327 rwdts.Flag.CACHE |
2328 rwdts.Flag.DATASTORE),)
2329
2330 @asyncio.coroutine
2331 def create(self, xact, path, msg):
2332 """
2333 Create a VNFR record in DTS with path and message
2334 """
2335 self._log.debug("Creating VNFR xact = %s, %s:%s",
2336 xact, path, msg)
2337
2338 self.regh.create_element(path, msg)
2339 self._log.debug("Created VNFR xact = %s, %s:%s",
2340 xact, path, msg)
2341
2342 @asyncio.coroutine
2343 def update(self, xact, path, msg):
2344 """
2345 Update a VNFR record in DTS with path and message
2346 """
2347 self._log.debug("Updating VNFR xact = %s, %s:%s",
2348 xact, path, msg)
2349 self.regh.update_element(path, msg)
2350 self._log.debug("Updated VNFR xact = %s, %s:%s",
2351 xact, path, msg)
2352
2353 @asyncio.coroutine
2354 def delete(self, xact, path):
2355 """
2356 Delete a VNFR record in DTS with path and message
2357 """
2358 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2359 self.regh.delete_element(path)
2360 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2361
2362
2363 class VnfdRefCountDtsHandler(object):
2364 """ The VNFD Ref Count DTS handler """
2365 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2366
2367 def __init__(self, dts, log, loop, vnfm):
2368 self._dts = dts
2369 self._log = log
2370 self._loop = loop
2371 self._vnfm = vnfm
2372
2373 self._regh = None
2374
2375 @property
2376 def regh(self):
2377 """ Return registration handle """
2378 return self._regh
2379
2380 @property
2381 def vnfm(self):
2382 """ Return the NS manager instance """
2383 return self._vnfm
2384
2385 @asyncio.coroutine
2386 def register(self):
2387 """ Register for VNFD ref count read from dts """
2388
2389 @asyncio.coroutine
2390 def on_prepare(xact_info, action, ks_path, msg):
2391 """ prepare callback from dts """
2392 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2393 self._log.debug(
2394 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2395 xact_info, action, xpath, msg
2396 )
2397
2398 if action == rwdts.QueryAction.READ:
2399 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2400 path_entry = schema.keyspec_to_entry(ks_path)
2401 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2402 for xpath, msg in vnfd_list:
2403 self._log.debug("Responding to ref count query path:%s, msg:%s",
2404 xpath, msg)
2405 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2406 xpath=xpath,
2407 msg=msg)
2408 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2409 else:
2410 raise VnfRecordError("Not supported operation %s" % action)
2411
2412 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2413 with self._dts.group_create() as group:
2414 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2415 handler=hdl,
2416 flags=rwdts.Flag.PUBLISHER,
2417 )
2418
2419
2420 class VdurDatastore(object):
2421 """
2422 This VdurDatastore is intended to expose select information about a VDUR
2423 such that it can be referenced in a cloud config file. The data that is
2424 exposed does not necessarily follow the structure of the data in the yang
2425 model. This is intentional. The data that are exposed are intended to be
2426 agnostic of the yang model so that changes in the model do not necessarily
2427 require changes to the interface provided to the user. It also means that
2428 the user does not need to be familiar with the RIFT.ware yang models.
2429 """
2430
2431 def __init__(self):
2432 """Create an instance of VdurDatastore"""
2433 self._vdur_data = dict()
2434 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2435
2436 def add(self, vdur):
2437 """Add a new VDUR to the datastore
2438
2439 Arguments:
2440 vdur - a VirtualDeploymentUnitRecord instance
2441
2442 Raises:
2443 A ValueError is raised if the VDUR is (1) None or (2) already in
2444 the datastore.
2445
2446 """
2447 if vdur.vdu_id is None:
2448 raise ValueError('VDURs are required to have an ID')
2449
2450 if vdur.vdu_id in self._vdur_data:
2451 raise ValueError('cannot add a VDUR more than once')
2452
2453 self._vdur_data[vdur.vdu_id] = dict()
2454
2455 def set_if_not_none(key, attr):
2456 if attr is not None:
2457 self._vdur_data[vdur.vdu_id][key] = attr
2458
2459 set_if_not_none('name', vdur._vdud.name)
2460 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2461 # The below can be used for hostname
2462 set_if_not_none('vdur_name', vdur.unique_short_name)
2463
2464 def update(self, vdur):
2465 """Update the VDUR information in the datastore
2466
2467 Arguments:
2468 vdur - a GI representation of a VDUR
2469
2470 Raises:
2471 A ValueError is raised if the VDUR is (1) None or (2) already in
2472 the datastore.
2473
2474 """
2475 if vdur.vdu_id is None:
2476 raise ValueError('VNFDs are required to have an ID')
2477
2478 if vdur.vdu_id not in self._vdur_data:
2479 raise ValueError('VNF is not recognized')
2480
2481 def set_or_delete(key, attr):
2482 if attr is None:
2483 if key in self._vdur_data[vdur.vdu_id]:
2484 del self._vdur_data[vdur.vdu_id][key]
2485
2486 else:
2487 self._vdur_data[vdur.vdu_id][key] = attr
2488
2489 set_or_delete('name', vdur._vdud.name)
2490 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2491 # The below can be used for hostname
2492 set_or_delete('vdur_name', vdur.unique_short_name)
2493
2494 def remove(self, vdur_id):
2495 """Remove all of the data associated with specified VDUR
2496
2497 Arguments:
2498 vdur_id - the identifier of a VNFD in the datastore
2499
2500 Raises:
2501 A ValueError is raised if the VDUR is not contained in the
2502 datastore.
2503
2504 """
2505 if vdur_id not in self._vdur_data:
2506 raise ValueError('VNF is not recognized')
2507
2508 del self._vdur_data[vdur_id]
2509
2510 def get(self, expr):
2511 """Retrieve VDUR information from the datastore
2512
2513 An expression should be of the form,
2514
2515 vdu[<id>].<attr>
2516
2517 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2518 the exposed attribute that the user wishes to retrieve.
2519
2520 If the requested data is not available, None is returned.
2521
2522 Arguments:
2523 expr - a string that specifies the data to return
2524
2525 Raises:
2526 A ValueError is raised if the provided expression cannot be parsed.
2527
2528 Returns:
2529 The requested data or None
2530
2531 """
2532 result = self._pattern.match(expr)
2533 if result is None:
2534 raise ValueError('data expression not recognized ({})'.format(expr))
2535
2536 vdur_id, key = result.groups()
2537
2538 if vdur_id not in self._vdur_data:
2539 return None
2540
2541 return self._vdur_data[vdur_id].get(key, None)
2542
2543
2544 class VnfManager(object):
2545 """ The virtual network function manager class """
2546 def __init__(self, dts, log, loop, cluster_name):
2547 self._dts = dts
2548 self._log = log
2549 self._loop = loop
2550 self._cluster_name = cluster_name
2551
2552 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2553 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2554 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2555 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)
2556
2557 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2558 self._vnfr_handler,
2559 self._vcs_handler,
2560 self._vnfr_ref_handler,
2561 self._nsr_handler]
2562 self._vnfrs = {}
2563 self._vnfds_to_vnfr = {}
2564 self._nsrs = {}
2565
2566 @property
2567 def vnfr_handler(self):
2568 """ VNFR dts handler """
2569 return self._vnfr_handler
2570
2571 @property
2572 def vcs_handler(self):
2573 """ VCS dts handler """
2574 return self._vcs_handler
2575
2576 @asyncio.coroutine
2577 def register(self):
2578 """ Register all static DTS handlers """
2579 for hdl in self._dts_handlers:
2580 yield from hdl.register()
2581
2582 @asyncio.coroutine
2583 def run(self):
2584 """ Run this VNFM instance """
2585 self._log.debug("Run VNFManager - registering static DTS handlers""")
2586 yield from self.register()
2587
2588 def handle_nsr(self, nsr, action):
2589 if action in [rwdts.QueryAction.CREATE]:
2590 self._nsrs[nsr.id] = nsr
2591 elif action == rwdts.QueryAction.DELETE:
2592 if nsr.id in self._nsrs:
2593 del self._nsrs[nsr.id]
2594
2595 def get_linked_mgmt_network(self, vnfr):
2596 """For the given VNFR get the related mgmt network from the NSD, if
2597 available.
2598 """
2599 vnfd_id = vnfr.vnfd.id
2600 nsr_id = vnfr.nsr_id_ref
2601
2602 # for the given related VNFR, get the corresponding NSR-config
2603 nsr_obj = None
2604 try:
2605 nsr_obj = self._nsrs[nsr_id]
2606 except KeyError:
2607 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2608
2609 # for the related NSD check if a VLD exists such that it's a mgmt
2610 # network
2611 for vld in nsr_obj.nsd.vld:
2612 if vld.mgmt_network:
2613 return vld.name
2614
2615 return None
2616
2617 def get_vnfr(self, vnfr_id):
2618 """ get VNFR by vnfr id """
2619
2620 if vnfr_id not in self._vnfrs:
2621 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2622
2623 return self._vnfrs[vnfr_id]
2624
2625 def create_vnfr(self, vnfr):
2626 """ Create a VNFR instance """
2627 if vnfr.id in self._vnfrs:
2628 msg = "Vnfr id %s already exists" % vnfr.id
2629 self._log.error(msg)
2630 raise VnfRecordError(msg)
2631
2632 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2633 vnfr.id,
2634 vnfr.vnfd.id)
2635
2636 mgmt_network = self.get_linked_mgmt_network(vnfr)
2637
2638 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2639 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2640 mgmt_network=mgmt_network
2641 )
2642
2643 #Update ref count
2644 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2645 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2646 else:
2647 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2648
2649 return self._vnfrs[vnfr.id]
2650
2651 @asyncio.coroutine
2652 def delete_vnfr(self, xact, vnfr):
2653 """ Create a VNFR instance """
2654 if vnfr.vnfr_id in self._vnfrs:
2655 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2656 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2657
2658 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2659 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2660 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2661
2662 del self._vnfrs[vnfr.vnfr_id]
2663
2664 @asyncio.coroutine
2665 def fetch_vnfd(self, vnfd_id):
2666 """ Fetch VNFDs based with the vnfd id"""
2667 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2668 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2669 vnfd = None
2670
2671 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2672
2673 for ent in res_iter:
2674 res = yield from ent
2675 vnfd = res.result
2676
2677 if vnfd is None:
2678 err = "Failed to get Vnfd %s" % vnfd_id
2679 self._log.error(err)
2680 raise VnfRecordError(err)
2681
2682 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2683
2684 return vnfd
2685
2686 def vnfd_in_use(self, vnfd_id):
2687 """ Is this VNFD in use """
2688 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2689 if vnfd_id in self._vnfds_to_vnfr:
2690 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2691 return False
2692
2693 @asyncio.coroutine
2694 def publish_vnfr(self, xact, path, msg):
2695 """ Publish a VNFR """
2696 self._log.debug("publish_vnfr called with path %s, msg %s",
2697 path, msg)
2698 yield from self.vnfr_handler.update(xact, path, msg)
2699
2700 @asyncio.coroutine
2701 def delete_vnfd(self, vnfd_id):
2702 """ Delete the Virtual Network Function descriptor with the passed id """
2703 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2704 if vnfd_id in self._vnfds_to_vnfr:
2705 if self._vnfds_to_vnfr[vnfd_id]:
2706 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2707 vnfd_id,
2708 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2709 raise VirtualNetworkFunctionDescriptorRefCountExists(
2710 "Cannot delete :%s, ref_count:%s",
2711 vnfd_id,
2712 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2713
2714 del self._vnfds_to_vnfr[vnfd_id]
2715
2716 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2717 try:
2718 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2719 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2720 if os.path.exists(vnfd_dir):
2721 shutil.rmtree(vnfd_dir, ignore_errors=True)
2722 except Exception as e:
2723 self._log.error("Exception in cleaning up VNFD {}: {}".
2724 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2725 self._log.exception(e)
2726
2727
2728 def vnfd_refcount_xpath(self, vnfd_id):
2729 """ xpath for ref count entry """
2730 return (VnfdRefCountDtsHandler.XPATH +
2731 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2732
2733 @asyncio.coroutine
2734 def get_vnfd_refcount(self, vnfd_id):
2735 """ Get the vnfd_list from this VNFM"""
2736 vnfd_list = []
2737 if vnfd_id is None or vnfd_id == "":
2738 for vnfd in self._vnfds_to_vnfr.keys():
2739 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2740 vnfd_msg.vnfd_id_ref = vnfd
2741 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2742 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2743 elif vnfd_id in self._vnfds_to_vnfr:
2744 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2745 vnfd_msg.vnfd_id_ref = vnfd_id
2746 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2747 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2748
2749 return vnfd_list
2750
2751
2752 class VnfmTasklet(rift.tasklets.Tasklet):
2753 """ VNF Manager tasklet class """
2754 def __init__(self, *args, **kwargs):
2755 super(VnfmTasklet, self).__init__(*args, **kwargs)
2756 self.rwlog.set_category("rw-mano-log")
2757 self.rwlog.set_subcategory("vnfm")
2758
2759 self._dts = None
2760 self._vnfm = None
2761
2762 def start(self):
2763 try:
2764 super(VnfmTasklet, self).start()
2765 self.log.info("Starting VnfmTasklet")
2766
2767 self.log.setLevel(logging.DEBUG)
2768
2769 self.log.debug("Registering with dts")
2770 self._dts = rift.tasklets.DTS(self.tasklet_info,
2771 RwVnfmYang.get_schema(),
2772 self.loop,
2773 self.on_dts_state_change)
2774
2775 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2776 except Exception:
2777 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2778 raise
2779
2780 def on_instance_started(self):
2781 """ Task insance started callback """
2782 self.log.debug("Got instance started callback")
2783
2784 def stop(self):
2785 try:
2786 self._dts.deinit()
2787 except Exception:
2788 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2789 raise
2790
2791 @asyncio.coroutine
2792 def init(self):
2793 """ Task init callback """
2794 try:
2795 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2796 assert vm_parent_name is not None
2797 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2798 yield from self._vnfm.run()
2799 except Exception:
2800 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2801 raise
2802
2803 @asyncio.coroutine
2804 def run(self):
2805 """ Task run callback """
2806 pass
2807
2808 @asyncio.coroutine
2809 def on_dts_state_change(self, state):
2810 """Take action according to current dts state to transition
2811 application into the corresponding application state
2812
2813 Arguments
2814 state - current dts state
2815 """
2816 switch = {
2817 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2818 rwdts.State.CONFIG: rwdts.State.RUN,
2819 }
2820
2821 handlers = {
2822 rwdts.State.INIT: self.init,
2823 rwdts.State.RUN: self.run,
2824 }
2825
2826 # Transition application to next state
2827 handler = handlers.get(state, None)
2828 if handler is not None:
2829 yield from handler()
2830
2831 # Transition dts to next state
2832 next_state = switch.get(state, None)
2833 if next_state is not None:
2834 self._dts.handle.set_state(next_state)