14153c9bd53123fe004c56d10bd7723e5a99184b
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54
55
56 class VMResourceError(Exception):
57 """ VM resource Error"""
58 pass
59
60
61 class VnfRecordError(Exception):
62 """ VNF record instatiation failed"""
63 pass
64
65
66 class VduRecordError(Exception):
67 """ VDU record instatiation failed"""
68 pass
69
70
71 class NotImplemented(Exception):
72 """Not implemented """
73 pass
74
75
76 class VnfrRecordExistsError(Exception):
77 """VNFR record already exist with the same VNFR id"""
78 pass
79
80
81 class InternalVirtualLinkRecordError(Exception):
82 """Internal virtual link record error"""
83 pass
84
85
86 class VDUImageNotFound(Exception):
87 """VDU Image not found error"""
88 pass
89
90
91 class VirtualDeploymentUnitRecordError(Exception):
92 """VDU Instantiation failed"""
93 pass
94
95
96 class VMNotReadyError(Exception):
97 """ VM Not yet received from resource manager """
98 pass
99
100
101 class VDURecordNotFound(Exception):
102 """ Could not find a VDU record """
103 pass
104
105
106 class VirtualNetworkFunctionRecordDescNotFound(Exception):
107 """ Cannot find Virtual Network Function Record Descriptor """
108 pass
109
110
111 class VirtualNetworkFunctionDescriptorError(Exception):
112 """ Virtual Network Function Record Descriptor Error """
113 pass
114
115
116 class VirtualNetworkFunctionDescriptorNotFound(Exception):
117 """ Virtual Network Function Record Descriptor Not Found """
118 pass
119
120
121 class VirtualNetworkFunctionRecordNotFound(Exception):
122 """ Virtual Network Function Record Not Found """
123 pass
124
125
126 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
127 """ Virtual Network Funtion Descriptor reference count exists """
128 pass
129
130
131 class VnfrInstantiationFailed(Exception):
132 """ Virtual Network Funtion Instantiation failed"""
133 pass
134
135
136 class VNFMPlacementGroupError(Exception):
137 pass
138
139 class VirtualNetworkFunctionRecordState(enum.Enum):
140 """ VNFR state """
141 INIT = 1
142 VL_INIT_PHASE = 2
143 VM_INIT_PHASE = 3
144 READY = 4
145 TERMINATE = 5
146 VL_TERMINATE_PHASE = 6
147 VDU_TERMINATE_PHASE = 7
148 TERMINATED = 7
149 FAILED = 10
150
151
152 class VDURecordState(enum.Enum):
153 """VDU record state """
154 INIT = 1
155 INSTANTIATING = 2
156 RESOURCE_ALLOC_PENDING = 3
157 READY = 4
158 TERMINATING = 5
159 TERMINATED = 6
160 FAILED = 10
161
162
163 class VcsComponent(object):
164 """ VCS Component within the VNF descriptor """
165 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
166 self._dts = dts
167 self._log = log
168 self._loop = loop
169 self._component = component
170 self._cluster_name = cluster_name
171 self._vcs_handler = vcs_handler
172 self._mangled_name = mangled_name
173
174 @staticmethod
175 def mangle_name(component_name, vnf_name, vnfd_id):
176 """ mangled component name """
177 return vnf_name + ":" + component_name + ":" + vnfd_id
178
179 @property
180 def name(self):
181 """ name of this component"""
182 return self._mangled_name
183
184 @property
185 def path(self):
186 """ The path for this object """
187 return("D,/rw-manifest:manifest" +
188 "/rw-manifest:operational-inventory" +
189 "/rw-manifest:component" +
190 "[rw-manifest:component-name = '{}']").format(self.name)
191
192 @property
193 def instance_xpath(self):
194 """ The path for this object """
195 return("D,/rw-base:vcs" +
196 "/instances" +
197 "/instance" +
198 "[instance-name = '{}']".format(self._cluster_name))
199
200 @property
201 def start_comp_xpath(self):
202 """ start component xpath """
203 return (self.instance_xpath +
204 "/child-n[instance-name = 'START-REQ']")
205
206 def get_start_comp_msg(self, ip_address):
207 """ start this component """
208 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
209 start_msg.instance_name = 'START-REQ'
210 start_msg.component_name = self.name
211 start_msg.admin_command = "START"
212 start_msg.ip_address = ip_address
213
214 return start_msg
215
216 @property
217 def msg(self):
218 """ Returns the message for this vcs component"""
219
220 vcs_comp_dict = self._component.as_dict()
221
222 def mangle_comp_names(comp_dict):
223 """ mangle component name with VNF name, id"""
224 for key, val in comp_dict.items():
225 if isinstance(val, dict):
226 comp_dict[key] = mangle_comp_names(val)
227 elif isinstance(val, list):
228 i = 0
229 for ent in val:
230 if isinstance(ent, dict):
231 val[i] = mangle_comp_names(ent)
232 else:
233 val[i] = ent
234 i += 1
235 elif key == "component_name":
236 comp_dict[key] = VcsComponent.mangle_name(val,
237 self._vnfd_name,
238 self._vnfd_id)
239 return comp_dict
240
241 mangled_dict = mangle_comp_names(vcs_comp_dict)
242 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
243 return msg
244
245 @asyncio.coroutine
246 def publish(self, xact):
247 """ Publishes the VCS component """
248 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
249 self.name, self.path, self.msg)
250 yield from self._vcs_handler.publish(xact, self.path, self.msg)
251
252 @asyncio.coroutine
253 def start(self, xact, parent, ip_addr=None):
254 """ Starts this VCS component """
255 # ATTN RV - replace with block add
256 start_msg = self.get_start_comp_msg(ip_addr)
257 self._log.debug("starting component %s %s",
258 self.start_comp_xpath, start_msg)
259 yield from self._dts.query_create(self.start_comp_xpath,
260 0,
261 start_msg)
262 self._log.debug("started component %s, %s",
263 self.start_comp_xpath, start_msg)
264
265
266 class VirtualDeploymentUnitRecord(object):
267 """ Virtual Deployment Unit Record """
268 def __init__(self,
269 dts,
270 log,
271 loop,
272 vdud,
273 vnfr,
274 mgmt_intf,
275 mgmt_network,
276 cloud_account_name,
277 vnfd_package_store,
278 vdur_id=None,
279 placement_groups=[]):
280 self._dts = dts
281 self._log = log
282 self._loop = loop
283 self._vdud = vdud
284 self._vnfr = vnfr
285 self._mgmt_intf = mgmt_intf
286 self._cloud_account_name = cloud_account_name
287 self._vnfd_package_store = vnfd_package_store
288 self._mgmt_network = mgmt_network
289
290 self._vdur_id = vdur_id or str(uuid.uuid4())
291 self._int_intf = []
292 self._ext_intf = []
293 self._state = VDURecordState.INIT
294 self._state_failed_reason = None
295 self._request_id = str(uuid.uuid4())
296 self._name = vnfr.name + "__" + vdud.id
297 self._placement_groups = placement_groups
298 self._rm_regh = None
299 self._vm_resp = None
300 self._vdud_cloud_init = None
301 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
302
303 @asyncio.coroutine
304 def vdu_opdata_register(self):
305 yield from self._vdur_console_handler.register()
306
307 def cp_ip_addr(self, cp_name):
308 """ Find ip address by connection point name """
309 if self._vm_resp is not None:
310 for conn_point in self._vm_resp.connection_points:
311 if conn_point.name == cp_name:
312 return conn_point.ip_address
313 return "0.0.0.0"
314
315 def cp_mac_addr(self, cp_name):
316 """ Find mac address by connection point name """
317 if self._vm_resp is not None:
318 for conn_point in self._vm_resp.connection_points:
319 if conn_point.name == cp_name:
320 return conn_point.mac_addr
321 return "00:00:00:00:00:00"
322
323 def cp_id(self, cp_name):
324 """ Find connection point id by connection point name """
325 if self._vm_resp is not None:
326 for conn_point in self._vm_resp.connection_points:
327 if conn_point.name == cp_name:
328 return conn_point.connection_point_id
329 return ''
330
331 @property
332 def vdu_id(self):
333 return self._vdud.id
334
335 @property
336 def vm_resp(self):
337 return self._vm_resp
338
339 @property
340 def name(self):
341 """ Return this VDUR's name """
342 return self._name
343
344 @property
345 def cloud_account_name(self):
346 """ Cloud account this VDU should be created in """
347 return self._cloud_account_name
348
349 @property
350 def image_name(self):
351 """ name that should be used to lookup the image on the CMP """
352 if 'image' not in self._vdud:
353 return None
354 return os.path.basename(self._vdud.image)
355
356 @property
357 def image_checksum(self):
358 """ name that should be used to lookup the image on the CMP """
359 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
360
361 @property
362 def management_ip(self):
363 if not self.active:
364 return None
365 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
366
367 @property
368 def vm_management_ip(self):
369 if not self.active:
370 return None
371 return self._vm_resp.management_ip
372
373 @property
374 def operational_status(self):
375 """ Operational status of this VDU"""
376 op_stats_dict = {"INIT": "init",
377 "INSTANTIATING": "vm_init_phase",
378 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
379 "READY": "running",
380 "FAILED": "failed",
381 "TERMINATING": "terminated",
382 "TERMINATED": "terminated",
383 }
384 return op_stats_dict[self._state.name]
385
386 @property
387 def msg(self):
388 """ Process VDU message from resmgr"""
389 vdu_fields = ["vm_flavor",
390 "guest_epa",
391 "vswitch_epa",
392 "hypervisor_epa",
393 "host_epa",
394 "volumes",
395 "name"]
396 vdu_copy_dict = {k: v for k, v in
397 self._vdud.as_dict().items() if k in vdu_fields}
398 vdur_dict = {"id": self._vdur_id,
399 "vdu_id_ref": self._vdud.id,
400 "operational_status": self.operational_status,
401 "operational_status_details": self._state_failed_reason,
402 }
403 if self.vm_resp is not None:
404 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
405 "flavor_id": self.vm_resp.flavor_id
406 })
407 if self._vm_resp.has_field('image_id'):
408 vdur_dict.update({ "image_id": self.vm_resp.image_id })
409
410 if self.management_ip is not None:
411 vdur_dict["management_ip"] = self.management_ip
412
413 if self.vm_management_ip is not None:
414 vdur_dict["vm_management_ip"] = self.vm_management_ip
415
416 vdur_dict.update(vdu_copy_dict)
417
418 if self.vm_resp is not None:
419 if self._vm_resp.has_field('volumes'):
420 for opvolume in self._vm_resp.volumes:
421 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
422 if len(vdurvol_data) == 1:
423 vdurvol_data[0]["volume_id"] = opvolume.volume_id
424 if opvolume.has_field('custom_meta_data'):
425 metadata_list = list()
426 for metadata_item in opvolume.custom_meta_data:
427 metadata_list.append(metadata_item.as_dict())
428 vdurvol_data[0]['custom_meta_data'] = metadata_list
429
430 if self._vm_resp.has_field('supplemental_boot_data'):
431 vdur_dict['supplemental_boot_data'] = dict()
432 if self._vm_resp.supplemental_boot_data.has_field('boot_data_drive'):
433 vdur_dict['supplemental_boot_data']['boot_data_drive'] = self._vm_resp.supplemental_boot_data.boot_data_drive
434 if self._vm_resp.supplemental_boot_data.has_field('custom_meta_data'):
435 metadata_list = list()
436 for metadata_item in self._vm_resp.supplemental_boot_data.custom_meta_data:
437 metadata_list.append(metadata_item.as_dict())
438 vdur_dict['supplemental_boot_data']['custom_meta_data'] = metadata_list
439 if self._vm_resp.supplemental_boot_data.has_field('config_file'):
440 file_list = list()
441 for file_item in self._vm_resp.supplemental_boot_data.config_file:
442 file_list.append(file_item.as_dict())
443 vdur_dict['supplemental_boot_data']['config_file'] = file_list
444
445 icp_list = []
446 ii_list = []
447
448 for intf, cp_id, vlr in self._int_intf:
449 cp = self.find_internal_cp_by_cp_id(cp_id)
450
451 icp_list.append({"name": cp.name,
452 "id": cp.id,
453 "type_yang": "VPORT",
454 "ip_address": self.cp_ip_addr(cp.id),
455 "mac_address": self.cp_mac_addr(cp.id)})
456
457 ii_list.append({"name": intf.name,
458 "vdur_internal_connection_point_ref": cp.id,
459 "virtual_interface": {}})
460
461 vdur_dict["internal_connection_point"] = icp_list
462 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
463 vdur_dict["internal_interface"] = ii_list
464
465 ei_list = []
466 for intf, cp, vlr in self._ext_intf:
467 ei_list.append({"name": cp.name,
468 "vnfd_connection_point_ref": cp.name,
469 "virtual_interface": {}})
470 self._vnfr.update_cp(cp.name,
471 self.cp_ip_addr(cp.name),
472 self.cp_mac_addr(cp.name),
473 self.cp_id(cp.name))
474
475 vdur_dict["external_interface"] = ei_list
476
477 placement_groups = []
478 for group in self._placement_groups:
479 placement_groups.append(group.as_dict())
480 vdur_dict['placement_groups_info'] = placement_groups
481
482 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
483
484 @property
485 def resmgr_path(self):
486 """ path for resource-mgr"""
487 return ("D,/rw-resource-mgr:resource-mgmt" +
488 "/vdu-event" +
489 "/vdu-event-data[event-id='{}']".format(self._request_id))
490
491 @property
492 def vm_flavor_msg(self):
493 """ VM flavor message """
494 flavor = self._vdud.vm_flavor.__class__()
495 flavor.copy_from(self._vdud.vm_flavor)
496
497 return flavor
498
499 @property
500 def vdud_cloud_init(self):
501 """ Return the cloud-init contents for the VDU """
502 if self._vdud_cloud_init is None:
503 self._vdud_cloud_init = self.cloud_init()
504
505 return self._vdud_cloud_init
506
507 def cloud_init(self):
508 """ Populate cloud_init with cloud-config script from
509 either the inline contents or from the file provided
510 """
511 if self._vdud.cloud_init is not None:
512 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
513 return self._vdud.cloud_init
514 elif self._vdud.cloud_init_file is not None:
515 # Get cloud-init script contents from the file provided in the cloud_init_file param
516 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
517 filename = self._vdud.cloud_init_file
518 self._vnfd_package_store.refresh()
519 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
520 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
521 try:
522 return cloud_init_extractor.read_script(stored_package, filename)
523 except rift.package.cloud_init.CloudInitExtractionError as e:
524 self.instantiation_failed(str(e))
525 raise VirtualDeploymentUnitRecordError(e)
526 else:
527 self._log.debug("VDU Instantiation: cloud-init script not provided")
528
529 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
530 host_aggregates = []
531 availability_zones = []
532 server_groups = []
533 for group in self._placement_groups:
534 if group.has_field('host_aggregate'):
535 for aggregate in group.host_aggregate:
536 host_aggregates.append(aggregate.as_dict())
537 if group.has_field('availability_zone'):
538 availability_zones.append(group.availability_zone.as_dict())
539 if group.has_field('server_group'):
540 server_groups.append(group.server_group.as_dict())
541
542 if availability_zones:
543 if len(availability_zones) > 1:
544 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
545 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
546 else:
547 vm_create_msg_dict['availability_zone'] = availability_zones[0]
548
549 if server_groups:
550 if len(server_groups) > 1:
551 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
552 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
553 else:
554 vm_create_msg_dict['server_group'] = server_groups[0]
555
556 if host_aggregates:
557 vm_create_msg_dict['host_aggregate'] = host_aggregates
558
559 return
560
561 def process_placement_groups(self, vm_create_msg_dict):
562 """Process the placement_groups and fill resource-mgr request"""
563 if not self._placement_groups:
564 return
565
566 cloud_set = set([group.cloud_type for group in self._placement_groups])
567 assert len(cloud_set) == 1
568 cloud_type = cloud_set.pop()
569
570 if cloud_type == 'openstack':
571 self.process_openstack_placement_group_construct(vm_create_msg_dict)
572
573 else:
574 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
575 return
576
577 def process_custom_bootdata(self, vm_create_msg_dict):
578 """Process the custom boot data"""
579 if 'config_file' not in vm_create_msg_dict['supplemental_boot_data']:
580 return
581
582 self._vnfd_package_store.refresh()
583 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
584 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
585 for file_item in vm_create_msg_dict['supplemental_boot_data']['config_file']:
586 if 'source' not in file_item or 'dest' not in file_item:
587 continue
588 source = file_item['source']
589 # Find source file in scripts dir of VNFD
590 self._log.debug("Checking for source config file at %s", source)
591 try:
592 source_file_str = cloud_init_extractor.read_script(stored_package, source)
593 except rift.package.cloud_init.CloudInitExtractionError as e:
594 raise VirtualDeploymentUnitRecordError(e)
595 # Update source file location with file contents
596 file_item['source'] = source_file_str
597
598 return
599
600 def resmgr_msg(self, config=None):
601 vdu_fields = ["vm_flavor",
602 "guest_epa",
603 "vswitch_epa",
604 "hypervisor_epa",
605 "host_epa",
606 "volumes",
607 "supplemental_boot_data"]
608
609 self._log.debug("Creating params based on VDUD: %s", self._vdud)
610 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
611
612 vm_create_msg_dict = {
613 "name": self.name,
614 }
615
616 if self.image_name is not None:
617 vm_create_msg_dict["image_name"] = self.image_name
618
619 if self.image_checksum is not None:
620 vm_create_msg_dict["image_checksum"] = self.image_checksum
621
622 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
623 if self._vdud.has_field('mgmt_vpci'):
624 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
625
626 self._log.debug("VDUD: %s", self._vdud)
627 if config is not None:
628 vm_create_msg_dict['vdu_init'] = {'userdata': config}
629
630 if self._mgmt_network:
631 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
632
633 cp_list = []
634 for intf, cp, vlr in self._ext_intf:
635 cp_info = { "name": cp.name,
636 "virtual_link_id": vlr.network_id,
637 "type_yang": intf.virtual_interface.type_yang }
638
639 if cp.has_field('port_security_enabled'):
640 cp_info["port_security_enabled"] = cp.port_security_enabled
641
642 if (intf.virtual_interface.has_field('vpci') and
643 intf.virtual_interface.vpci is not None):
644 cp_info["vpci"] = intf.virtual_interface.vpci
645
646 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
647 cp_info['security_group'] = vlr.ip_profile_params.security_group
648
649 cp_list.append(cp_info)
650
651 for intf, cp, vlr in self._int_intf:
652 if (intf.virtual_interface.has_field('vpci') and
653 intf.virtual_interface.vpci is not None):
654 cp_list.append({"name": cp,
655 "virtual_link_id": vlr.network_id,
656 "type_yang": intf.virtual_interface.type_yang,
657 "vpci": intf.virtual_interface.vpci})
658 else:
659 if cp.has_field('port_security_enabled'):
660 cp_list.append({"name": cp,
661 "virtual_link_id": vlr.network_id,
662 "type_yang": intf.virtual_interface.type_yang,
663 "port_security_enabled": cp.port_security_enabled})
664 else:
665 cp_list.append({"name": cp,
666 "virtual_link_id": vlr.network_id,
667 "type_yang": intf.virtual_interface.type_yang})
668
669
670 vm_create_msg_dict["connection_points"] = cp_list
671 vm_create_msg_dict.update(vdu_copy_dict)
672
673 self.process_placement_groups(vm_create_msg_dict)
674 if 'supplemental_boot_data' in vm_create_msg_dict:
675 self.process_custom_bootdata(vm_create_msg_dict)
676
677 msg = RwResourceMgrYang.VDUEventData()
678 msg.event_id = self._request_id
679 msg.cloud_account = self.cloud_account_name
680 msg.request_info.from_dict(vm_create_msg_dict)
681
682 return msg
683
684 @asyncio.coroutine
685 def terminate(self, xact):
686 """ Delete resource in VIM """
687 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
688 self._log.warning("VDU terminate in not ready state - Ignoring request")
689 return
690
691 self._state = VDURecordState.TERMINATING
692 if self._vm_resp is not None:
693 try:
694 with self._dts.transaction() as new_xact:
695 yield from self.delete_resource(new_xact)
696 except Exception:
697 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
698
699 if self._rm_regh is not None:
700 self._log.debug("Deregistering resource manager registration handle")
701 self._rm_regh.deregister()
702 self._rm_regh = None
703
704 if self._vdur_console_handler is not None:
705 self._log.error("Deregistering vnfr vdur registration handle")
706 self._vdur_console_handler._regh.deregister()
707 self._vdur_console_handler._regh = None
708
709 self._state = VDURecordState.TERMINATED
710
711 def find_internal_cp_by_cp_id(self, cp_id):
712 """ Find the CP corresponding to the connection point id"""
713 cp = None
714
715 self._log.debug("find_internal_cp_by_cp_id(%s) called",
716 cp_id)
717
718 for int_cp in self._vdud.internal_connection_point:
719 self._log.debug("Checking for int cp %s in internal connection points",
720 int_cp.id)
721 if int_cp.id == cp_id:
722 cp = int_cp
723 break
724
725 if cp is None:
726 self._log.debug("Failed to find cp %s in internal connection points",
727 cp_id)
728 msg = "Failed to find cp %s in internal connection points" % cp_id
729 raise VduRecordError(msg)
730
731 # return the VLR associated with the connection point
732 return cp
733
734 @asyncio.coroutine
735 def create_resource(self, xact, vnfr, config=None):
736 """ Request resource from ResourceMgr """
737 def find_cp_by_name(cp_name):
738 """ Find a connection point by name """
739 cp = None
740 self._log.debug("find_cp_by_name(%s) called", cp_name)
741 for ext_cp in vnfr._cprs:
742 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
743 if ext_cp.name == cp_name:
744 cp = ext_cp
745 break
746 if cp is None:
747 self._log.debug("Failed to find cp %s in external connection points",
748 cp_name)
749 return cp
750
751 def find_internal_vlr_by_cp_name(cp_name):
752 """ Find the VLR corresponding to the connection point name"""
753 cp = None
754
755 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
756 cp_name)
757
758 for int_cp in self._vdud.internal_connection_point:
759 self._log.debug("Checking for int cp %s in internal connection points",
760 int_cp.id)
761 if int_cp.id == cp_name:
762 cp = int_cp
763 break
764
765 if cp is None:
766 self._log.debug("Failed to find cp %s in internal connection points",
767 cp_name)
768 msg = "Failed to find cp %s in internal connection points" % cp_name
769 raise VduRecordError(msg)
770
771 # return the VLR associated with the connection point
772 return vnfr.find_vlr_by_cp(cp_name)
773
774 block = xact.block_create()
775
776 self._log.debug("Executing vm request id: %s, action: create",
777 self._request_id)
778
779 # Resolve the networks associated external interfaces
780 for ext_intf in self._vdud.external_interface:
781 self._log.debug("Resolving external interface name [%s], cp[%s]",
782 ext_intf.name, ext_intf.vnfd_connection_point_ref)
783 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
784 if cp is None:
785 self._log.debug("Failed to find connection point - %s",
786 ext_intf.vnfd_connection_point_ref)
787 continue
788 self._log.debug("Connection point name [%s], type[%s]",
789 cp.name, cp.type_yang)
790
791 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
792
793 etuple = (ext_intf, cp, vlr)
794 self._ext_intf.append(etuple)
795
796 self._log.debug("Created external interface tuple : %s", etuple)
797
798 # Resolve the networks associated internal interfaces
799 for intf in self._vdud.internal_interface:
800 cp_id = intf.vdu_internal_connection_point_ref
801 self._log.debug("Resolving internal interface name [%s], cp[%s]",
802 intf.name, cp_id)
803
804 try:
805 vlr = find_internal_vlr_by_cp_name(cp_id)
806 except Exception as e:
807 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
808 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
809 raise VduRecordError(msg)
810
811 ituple = (intf, cp_id, vlr)
812 self._int_intf.append(ituple)
813
814 self._log.debug("Created internal interface tuple : %s", ituple)
815
816 resmgr_path = self.resmgr_path
817 resmgr_msg = self.resmgr_msg(config)
818
819 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
820 block.add_query_create(resmgr_path, resmgr_msg)
821
822 res_iter = yield from block.execute(now=True)
823
824 resp = None
825
826 for i in res_iter:
827 r = yield from i
828 resp = r.result
829
830 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
831 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
832 self._log.debug("Got vm request response: %s", resp.resource_info)
833 return resp.resource_info
834
835 @asyncio.coroutine
836 def delete_resource(self, xact):
837 block = xact.block_create()
838
839 self._log.debug("Executing vm request id: %s, action: delete",
840 self._request_id)
841
842 block.add_query_delete(self.resmgr_path)
843
844 yield from block.execute(flags=0, now=True)
845
846 @asyncio.coroutine
847 def read_resource(self, xact):
848 block = xact.block_create()
849
850 self._log.debug("Executing vm request id: %s, action: delete",
851 self._request_id)
852
853 block.add_query_read(self.resmgr_path)
854
855 res_iter = yield from block.execute(flags=0, now=True)
856 for i in res_iter:
857 r = yield from i
858 resp = r.result
859
860 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
861 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
862 self._log.debug("Got vm request response: %s", resp.resource_info)
863 #self._vm_resp = resp.resource_info
864 return resp.resource_info
865
866
867 @asyncio.coroutine
868 def start_component(self):
869 """ This VDUR is active """
870 self._log.debug("Starting component %s for vdud %s vdur %s",
871 self._vdud.vcs_component_ref,
872 self._vdud,
873 self._vdur_id)
874 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
875 self.vm_resp.management_ip)
876
877 @property
878 def active(self):
879 """ Is this VDU active """
880 return True if self._state is VDURecordState.READY else False
881
882 @asyncio.coroutine
883 def instantiation_failed(self, failed_reason=None):
884 """ VDU instantiation failed """
885 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
886 self._state = VDURecordState.FAILED
887 self._state_failed_reason = failed_reason
888 yield from self._vnfr.instantiation_failed(failed_reason)
889
890 @asyncio.coroutine
891 def vdu_is_active(self):
892 """ This VDU is active"""
893 if self.active:
894 self._log.warning("VDU %s was already marked as active", self._vdur_id)
895 return
896
897 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
898
899 if self._vdud.vcs_component_ref is not None:
900 yield from self.start_component()
901
902 self._state = VDURecordState.READY
903
904 if self._vnfr.all_vdus_active():
905 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
906 yield from self._vnfr.is_ready()
907
908 @asyncio.coroutine
909 def instantiate(self, xact, vnfr, config=None):
910 """ Instantiate this VDU """
911 self._state = VDURecordState.INSTANTIATING
912
913 @asyncio.coroutine
914 def on_prepare(xact_info, query_action, ks_path, msg):
915 """ This VDUR is active """
916 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
917 query_action,
918 ks_path,
919 msg)
920
921 if (query_action == rwdts.QueryAction.UPDATE or
922 query_action == rwdts.QueryAction.CREATE):
923 self._vm_resp = msg
924
925 if msg.resource_state == "active":
926 # Move this VDU to ready state
927 yield from self.vdu_is_active()
928 elif msg.resource_state == "failed":
929 yield from self.instantiation_failed(msg.resource_errors)
930 elif query_action == rwdts.QueryAction.DELETE:
931 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
932 else:
933 raise NotImplementedError(
934 "%s action on VirtualDeployementUnitRecord not supported",
935 query_action)
936
937 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
938
939 try:
940 reg_event = asyncio.Event(loop=self._loop)
941
942 @asyncio.coroutine
943 def on_ready(regh, status):
944 reg_event.set()
945
946 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
947 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
948 flags=rwdts.Flag.SUBSCRIBER,
949 handler=handler)
950 yield from reg_event.wait()
951
952 vm_resp = yield from self.create_resource(xact, vnfr, config)
953 self._vm_resp = vm_resp
954 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
955
956 self._log.debug("Requested VM from resource manager response %s",
957 vm_resp)
958 if vm_resp.resource_state == "active":
959 self._log.debug("Resourcemgr responded wih an active vm resp %s",
960 vm_resp)
961 yield from self.vdu_is_active()
962 self._state = VDURecordState.READY
963 elif (vm_resp.resource_state == "pending" or
964 vm_resp.resource_state == "inactive"):
965 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
966 vm_resp)
967 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
968 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
969 # flags=rwdts.Flag.SUBSCRIBER,
970 # handler=handler)
971 else:
972 self._log.debug("Resourcemgr responded wih an error vm resp %s",
973 vm_resp)
974 raise VirtualDeploymentUnitRecordError(
975 "Failed VDUR instantiation %s " % vm_resp)
976
977 except Exception as e:
978 import traceback
979 traceback.print_exc()
980 self._log.exception(e)
981 self._log.error("Instantiation of VDU record failed: %s", str(e))
982 self._state = VDURecordState.FAILED
983 yield from self.instantiation_failed(str(e))
984
985
986 class VlRecordState(enum.Enum):
987 """ VL Record State """
988 INIT = 101
989 INSTANTIATION_PENDING = 102
990 ACTIVE = 103
991 TERMINATE_PENDING = 104
992 TERMINATED = 105
993 FAILED = 106
994
995
996 class InternalVirtualLinkRecord(object):
997 """ Internal Virtual Link record """
998 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
999 self._dts = dts
1000 self._log = log
1001 self._loop = loop
1002 self._ivld_msg = ivld_msg
1003 self._vnfr_name = vnfr_name
1004 self._cloud_account_name = cloud_account_name
1005 self._ip_profile = ip_profile
1006
1007 self._vlr_req = self.create_vlr()
1008 self._vlr = None
1009 self._state = VlRecordState.INIT
1010
1011 @property
1012 def vlr_id(self):
1013 """ Find VLR by id """
1014 return self._vlr_req.id
1015
1016 @property
1017 def name(self):
1018 """ Name of this VL """
1019 if self._ivld_msg.vim_network_name:
1020 return self._ivld_msg.vim_network_name
1021 else:
1022 return self._vnfr_name + "." + self._ivld_msg.name
1023
1024 @property
1025 def network_id(self):
1026 """ Find VLR by id """
1027 return self._vlr.network_id if self._vlr else None
1028
1029 def vlr_path(self):
1030 """ VLR path for this VLR instance"""
1031 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
1032
1033 def create_vlr(self):
1034 """ Create the VLR record which will be instantiated """
1035
1036 vld_fields = ["short_name",
1037 "vendor",
1038 "description",
1039 "version",
1040 "type_yang",
1041 "vim_network_name",
1042 "provider_network"]
1043
1044 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
1045
1046 vlr_dict = {"id": str(uuid.uuid4()),
1047 "name": self.name,
1048 "cloud_account": self._cloud_account_name,
1049 }
1050
1051 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
1052 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
1053
1054 vlr_dict.update(vld_copy_dict)
1055
1056 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
1057 return vlr
1058
1059 @asyncio.coroutine
1060 def instantiate(self, xact, restart_mode=False):
1061 """ Instantiate VL """
1062
1063 @asyncio.coroutine
1064 def instantiate_vlr():
1065 """ Instantiate VLR"""
1066 self._log.debug("Create VL with xpath %s and vlr %s",
1067 self.vlr_path(), self._vlr_req)
1068
1069 with self._dts.transaction(flags=0) as xact:
1070 block = xact.block_create()
1071 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
1072 self._log.debug("Executing VL create path:%s msg:%s",
1073 self.vlr_path(), self._vlr_req)
1074
1075 res_iter = None
1076 try:
1077 res_iter = yield from block.execute()
1078 except Exception:
1079 self._state = VlRecordState.FAILED
1080 self._log.exception("Caught exception while instantial VL")
1081 raise
1082
1083 for ent in res_iter:
1084 res = yield from ent
1085 self._vlr = res.result
1086
1087 if self._vlr.operational_status == 'failed':
1088 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1089 self._state = VlRecordState.FAILED
1090 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1091
1092 self._log.info("Created VL with xpath %s and vlr %s",
1093 self.vlr_path(), self._vlr)
1094
1095 @asyncio.coroutine
1096 def get_vlr():
1097 """ Get the network id """
1098 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1099 vlr = None
1100 for ent in res_iter:
1101 res = yield from ent
1102 vlr = res.result
1103
1104 if vlr is None:
1105 err = "Failed to get VLR for path %s" % self.vlr_path()
1106 self._log.warn(err)
1107 raise InternalVirtualLinkRecordError(err)
1108 return vlr
1109
1110 self._state = VlRecordState.INSTANTIATION_PENDING
1111
1112 if restart_mode:
1113 vl = yield from get_vlr()
1114 if vl is None:
1115 yield from instantiate_vlr()
1116 else:
1117 yield from instantiate_vlr()
1118
1119 self._state = VlRecordState.ACTIVE
1120
1121 def vlr_in_vns(self):
1122 """ Is there a VLR record in VNS """
1123 if (self._state == VlRecordState.ACTIVE or
1124 self._state == VlRecordState.INSTANTIATION_PENDING or
1125 self._state == VlRecordState.FAILED):
1126 return True
1127
1128 return False
1129
1130 @asyncio.coroutine
1131 def terminate(self, xact):
1132 """Terminate this VL """
1133 if not self.vlr_in_vns():
1134 self._log.debug("Ignoring terminate request for id %s in state %s",
1135 self.vlr_id, self._state)
1136 return
1137
1138 self._log.debug("Terminating VL with path %s", self.vlr_path())
1139 self._state = VlRecordState.TERMINATE_PENDING
1140 block = xact.block_create()
1141 block.add_query_delete(self.vlr_path())
1142 yield from block.execute(flags=0, now=True)
1143 self._state = VlRecordState.TERMINATED
1144 self._log.debug("Terminated VL with path %s", self.vlr_path())
1145
1146
1147 class VirtualNetworkFunctionRecord(object):
1148 """ Virtual Network Function Record """
1149 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1150 self._dts = dts
1151 self._log = log
1152 self._loop = loop
1153 self._cluster_name = cluster_name
1154 self._vnfr_msg = vnfr_msg
1155 self._vnfr_id = vnfr_msg.id
1156 self._vnfd_id = vnfr_msg.vnfd.id
1157 self._vnfm = vnfm
1158 self._vcs_handler = vcs_handler
1159 self._vnfr = vnfr_msg
1160 self._mgmt_network = mgmt_network
1161
1162 self._vnfd = vnfr_msg.vnfd
1163 self._state = VirtualNetworkFunctionRecordState.INIT
1164 self._state_failed_reason = None
1165 self._ext_vlrs = {} # The list of external virtual links
1166 self._vlrs = [] # The list of internal virtual links
1167 self._vdus = [] # The list of vdu
1168 self._vlr_by_cp = {}
1169 self._cprs = []
1170 self._inventory = {}
1171 self._create_time = int(time.time())
1172 self._vnf_mon = None
1173 self._config_status = vnfr_msg.config_status
1174 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1175 self._rw_vnfd = None
1176 self._vnfd_ref_count = 0
1177
1178 def _get_vdur_from_vdu_id(self, vdu_id):
1179 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1180 self._log.debug("Searching through vdus: %s", self._vdus)
1181 for vdu in self._vdus:
1182 self._log.debug("vdu_id: %s", vdu.vdu_id)
1183 if vdu.vdu_id == vdu_id:
1184 return vdu
1185
1186 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1187
1188 @property
1189 def operational_status(self):
1190 """ Operational status of this VNFR """
1191 op_status_map = {"INIT": "init",
1192 "VL_INIT_PHASE": "vl_init_phase",
1193 "VM_INIT_PHASE": "vm_init_phase",
1194 "READY": "running",
1195 "TERMINATE": "terminate",
1196 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1197 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1198 "TERMINATED": "terminated",
1199 "FAILED": "failed", }
1200 return op_status_map[self._state.name]
1201
1202 @staticmethod
1203 def vnfd_xpath(vnfd_id):
1204 """ VNFD xpath associated with this VNFR """
1205 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
1206
1207 @property
1208 def vnfd_ref_count(self):
1209 """ Returns the VNFD reference count associated with this VNFR """
1210 return self._vnfd_ref_count
1211
1212 def vnfd_in_use(self):
1213 """ Returns whether vnfd is in use or not """
1214 return True if self._vnfd_ref_count > 0 else False
1215
1216 def vnfd_ref(self):
1217 """ Take a reference on this object """
1218 self._vnfd_ref_count += 1
1219 return self._vnfd_ref_count
1220
1221 def vnfd_unref(self):
1222 """ Release reference on this object """
1223 if self._vnfd_ref_count < 1:
1224 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1225 (self.vnfd.id, self._vnfd_ref_count))
1226 self._log.critical(msg)
1227 raise VnfRecordError(msg)
1228 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1229 self.vnfd.id, self._vnfd_ref_count)
1230 self._vnfd_ref_count -= 1
1231 return self._vnfd_ref_count
1232
1233 @property
1234 def vnfd(self):
1235 """ VNFD for this VNFR """
1236 return self._vnfd
1237
1238 @property
1239 def vnf_name(self):
1240 """ VNFD name associated with this VNFR """
1241 return self.vnfd.name
1242
1243 @property
1244 def name(self):
1245 """ Name of this VNF in the record """
1246 return self._vnfr.name
1247
1248 @property
1249 def cloud_account_name(self):
1250 """ Name of the cloud account this VNFR is instantiated in """
1251 return self._vnfr.cloud_account
1252
1253 @property
1254 def vnfd_id(self):
1255 """ VNFD Id associated with this VNFR """
1256 return self.vnfd.id
1257
1258 @property
1259 def vnfr_id(self):
1260 """ VNFR Id associated with this VNFR """
1261 return self._vnfr_id
1262
1263 @property
1264 def member_vnf_index(self):
1265 """ Member VNF index associated with this VNFR """
1266 return self._vnfr.member_vnf_index_ref
1267
1268 @property
1269 def config_status(self):
1270 """ Config agent status for this VNFR """
1271 return self._config_status
1272
1273 def component_by_name(self, component_name):
1274 """ Find a component by name in the inventory list"""
1275 mangled_name = VcsComponent.mangle_name(component_name,
1276 self.vnf_name,
1277 self.vnfd_id)
1278 return self._inventory[mangled_name]
1279
1280
1281
1282 @asyncio.coroutine
1283 def get_nsr_config(self):
1284 ### Need access to NS instance configuration for runtime resolution.
1285 ### This shall be replaced when deployment flavors are implemented
1286 xpath = "C,/nsr:ns-instance-config"
1287 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1288
1289 for result in results:
1290 entry = yield from result
1291 ns_instance_config = entry.result
1292 for nsr in ns_instance_config.nsr:
1293 if nsr.id == self._vnfr_msg.nsr_id_ref:
1294 return nsr
1295 return None
1296
1297 @asyncio.coroutine
1298 def start_component(self, component_name, ip_addr):
1299 """ Start a component in the VNFR by name """
1300 comp = self.component_by_name(component_name)
1301 yield from comp.start(None, None, ip_addr)
1302
1303 def cp_ip_addr(self, cp_name):
1304 """ Get ip address for connection point """
1305 self._log.debug("cp_ip_addr()")
1306 for cp in self._cprs:
1307 if cp.name == cp_name and cp.ip_address is not None:
1308 return cp.ip_address
1309 return "0.0.0.0"
1310
1311 def mgmt_intf_info(self):
1312 """ Get Management interface info for this VNFR """
1313 mgmt_intf_desc = self.vnfd.mgmt_interface
1314 ip_addr = None
1315 if mgmt_intf_desc.has_field("cp"):
1316 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1317 elif mgmt_intf_desc.has_field("vdu_id"):
1318 try:
1319 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1320 ip_addr = vdur.management_ip
1321 except VDURecordNotFound:
1322 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1323 ip_addr = None
1324 else:
1325 ip_addr = mgmt_intf_desc.ip_address
1326 port = mgmt_intf_desc.port
1327
1328 return ip_addr, port
1329
1330 @property
1331 def msg(self):
1332 """ Message associated with this VNFR """
1333 vnfd_fields = ["short_name", "vendor", "description", "version"]
1334 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1335
1336 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1337 ip_address, port = self.mgmt_intf_info()
1338
1339 if ip_address is not None:
1340 mgmt_intf.ip_address = ip_address
1341 if port is not None:
1342 mgmt_intf.port = port
1343
1344 vnfr_dict = {"id": self._vnfr_id,
1345 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1346 "name": self.name,
1347 "member_vnf_index_ref": self.member_vnf_index,
1348 "operational_status": self.operational_status,
1349 "operational_status_details": self._state_failed_reason,
1350 "cloud_account": self.cloud_account_name,
1351 "config_status": self._config_status
1352 }
1353
1354 vnfr_dict.update(vnfd_copy_dict)
1355
1356 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1357 vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1358
1359 vnfr_msg.create_time = self._create_time
1360 vnfr_msg.uptime = int(time.time()) - self._create_time
1361 vnfr_msg.mgmt_interface = mgmt_intf
1362
1363 # Add all the VLRs to VNFR
1364 for vlr in self._vlrs:
1365 ivlr = vnfr_msg.internal_vlr.add()
1366 ivlr.vlr_ref = vlr.vlr_id
1367
1368 # Add all the VDURs to VDUR
1369 if self._vdus is not None:
1370 for vdu in self._vdus:
1371 vdur = vnfr_msg.vdur.add()
1372 vdur.from_dict(vdu.msg.as_dict())
1373
1374 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1375 vnfr_msg.dashboard_url = self.dashboard_url
1376
1377 for cpr in self._cprs:
1378 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1379 vnfr_msg.connection_point.append(new_cp)
1380
1381 if self._vnf_mon is not None:
1382 for monp in self._vnf_mon.msg:
1383 vnfr_msg.monitoring_param.append(
1384 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1385
1386 if self._vnfr.vnf_configuration is not None:
1387 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1388 if (ip_address is not None and
1389 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1390 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1391
1392 for group in self._vnfr_msg.placement_groups_info:
1393 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1394 group_info.from_dict(group.as_dict())
1395 vnfr_msg.placement_groups_info.append(group_info)
1396
1397 return vnfr_msg
1398
1399 @property
1400 def dashboard_url(self):
1401 ip, cfg_port = self.mgmt_intf_info()
1402 protocol = 'http'
1403 http_port = 80
1404 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1405 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1406 protocol = 'https'
1407 http_port = 443
1408 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1409 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1410
1411 url = "{protocol}://{ip_address}:{port}/{path}".format(
1412 protocol=protocol,
1413 ip_address=ip,
1414 port=http_port,
1415 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1416 )
1417
1418 return url
1419
1420 @property
1421 def xpath(self):
1422 """ path for this VNFR """
1423 return("D,/vnfr:vnfr-catalog"
1424 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1425
1426 @asyncio.coroutine
1427 def publish(self, xact):
1428 """ publish this VNFR """
1429 vnfr = self.msg
1430 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1431 self.xpath, self.msg)
1432 vnfr.create_time = self._create_time
1433 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1434 self._log.debug("Published VNFR path = [%s], record = [%s]",
1435 self.xpath, self.msg)
1436
1437 def resolve_vld_ip_profile(self, vnfd_msg, vld):
1438 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
1439 if not vld.has_field('ip_profile_ref'):
1440 return None
1441 profile = [profile for profile in vnfd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
1442 return profile[0] if profile else None
1443
1444 @asyncio.coroutine
1445 def create_vls(self):
1446 """ Publish The VLs associated with this VNF """
1447 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1448 self.vnfd_id)
1449 for ivld_msg in self.vnfd.internal_vld:
1450 self._log.debug("Creating internal vld:"
1451 " %s, int_cp_ref = %s",
1452 ivld_msg, ivld_msg.internal_connection_point
1453 )
1454 vlr = InternalVirtualLinkRecord(dts=self._dts,
1455 log=self._log,
1456 loop=self._loop,
1457 ivld_msg=ivld_msg,
1458 vnfr_name=self.name,
1459 cloud_account_name=self.cloud_account_name,
1460 ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg)
1461 )
1462 self._vlrs.append(vlr)
1463
1464 for int_cp in ivld_msg.internal_connection_point:
1465 if int_cp.id_ref in self._vlr_by_cp:
1466 msg = ("Connection point %s already "
1467 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1468 raise InternalVirtualLinkRecordError(msg)
1469 self._log.debug("Setting vlr %s to internal cp = %s",
1470 vlr, int_cp.id_ref)
1471 self._vlr_by_cp[int_cp.id_ref] = vlr
1472
1473 @asyncio.coroutine
1474 def instantiate_vls(self, xact, restart_mode=False):
1475 """ Instantiate the VLs associated with this VNF """
1476 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1477 self.vnfd_id)
1478
1479 for vlr in self._vlrs:
1480 self._log.debug("Instantiating VLR %s", vlr)
1481 yield from vlr.instantiate(xact, restart_mode)
1482
1483 def find_vlr_by_cp(self, cp_name):
1484 """ Find the VLR associated with the cp name """
1485 return self._vlr_by_cp[cp_name]
1486
1487 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1488 """
1489 Returns the cloud specific construct for placement group
1490 Arguments:
1491 input_group: VNFD PlacementGroup
1492 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1493 """
1494 copy_dict = ['name', 'requirement', 'strategy']
1495 for group_info in nsr_config.vnfd_placement_group_maps:
1496 if group_info.placement_group_ref == input_group.name and \
1497 group_info.vnfd_id_ref == self.vnfd_id:
1498 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1499 group_dict = {k:v for k,v in
1500 group_info.as_dict().items()
1501 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1502 for param in copy_dict:
1503 group_dict.update({param: getattr(input_group, param)})
1504 group.from_dict(group_dict)
1505 return group
1506 return None
1507
1508 @asyncio.coroutine
1509 def get_vdu_placement_groups(self, vdu):
1510 placement_groups = []
1511 ### Step-1: Get VNF level placement groups
1512 for group in self._vnfr_msg.placement_groups_info:
1513 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1514 #group_info.from_dict(group.as_dict())
1515 placement_groups.append(group)
1516
1517 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1518 nsr_config = yield from self.get_nsr_config()
1519
1520 ### Step-3: Get VDU level placement groups
1521 for group in self.vnfd.placement_groups:
1522 for member_vdu in group.member_vdus:
1523 if member_vdu.member_vdu_ref == vdu.id:
1524 group_info = self.resolve_placement_group_cloud_construct(group,
1525 nsr_config)
1526 if group_info is None:
1527 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1528 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1529 else:
1530 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1531 str(group_info),
1532 vdu.name,
1533 self.vnf_name,
1534 self.member_vnf_index)
1535 placement_groups.append(group_info)
1536
1537 return placement_groups
1538
1539 @asyncio.coroutine
1540 def vdu_cloud_init_instantiation(self):
1541 [vdu.vdud_cloud_init for vdu in self._vdus]
1542
1543 @asyncio.coroutine
1544 def create_vdus(self, vnfr, restart_mode=False):
1545 """ Create the VDUs associated with this VNF """
1546
1547 def get_vdur_id(vdud):
1548 """Get the corresponding VDUR's id for the VDUD. This is useful in
1549 case of a restart.
1550
1551 In restart mode we check for exiting VDUR's ID and use them, if
1552 available. This way we don't end up creating duplicate VDURs
1553 """
1554 vdur_id = None
1555
1556 if restart_mode and vdud is not None:
1557 try:
1558 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1559 vdur_id = vdur[0]
1560 except IndexError:
1561 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1562
1563 return vdur_id
1564
1565
1566 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1567 for vdu in self._rw_vnfd.vdu:
1568 self._log.debug("Creating vdu: %s", vdu)
1569 vdur_id = get_vdur_id(vdu)
1570
1571 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1572 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1573 vdu.name,
1574 self.vnf_name,
1575 self.member_vnf_index,
1576 [ group.name for group in placement_groups])
1577
1578 vdur = VirtualDeploymentUnitRecord(
1579 dts=self._dts,
1580 log=self._log,
1581 loop=self._loop,
1582 vdud=vdu,
1583 vnfr=vnfr,
1584 mgmt_intf=self.has_mgmt_interface(vdu),
1585 mgmt_network=self._mgmt_network,
1586 cloud_account_name=self.cloud_account_name,
1587 vnfd_package_store=self._vnfd_package_store,
1588 vdur_id=vdur_id,
1589 placement_groups = placement_groups,
1590 )
1591 yield from vdur.vdu_opdata_register()
1592
1593 self._vdus.append(vdur)
1594
1595 @asyncio.coroutine
1596 def instantiate_vdus(self, xact, vnfr):
1597 """ Instantiate the VDUs associated with this VNF """
1598 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1599
1600 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1601
1602 # Identify any dependencies among the VDUs
1603 dependencies = collections.defaultdict(list)
1604 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1605
1606 for vdu in self._vdus:
1607 if vdu._vdud_cloud_init is not None:
1608 for vdu_id in vdu_id_pattern.findall(vdu._vdud_cloud_init):
1609 if vdu_id != vdu.vdu_id:
1610 # This means that vdu.vdu_id depends upon vdu_id,
1611 # i.e. vdu_id must be instantiated before
1612 # vdu.vdu_id.
1613 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1614
1615 # Define the terminal states of VDU instantiation
1616 terminal = (
1617 VDURecordState.READY,
1618 VDURecordState.TERMINATED,
1619 VDURecordState.FAILED,
1620 )
1621
1622 datastore = VdurDatastore()
1623 processed = set()
1624
1625 @asyncio.coroutine
1626 def instantiate_monitor(vdu):
1627 """Monitor the state of the VDU during instantiation
1628
1629 Arguments:
1630 vdu - a VirtualDeploymentUnitRecord
1631
1632 """
1633 # wait for the VDUR to enter a terminal state
1634 while vdu._state not in terminal:
1635 yield from asyncio.sleep(1, loop=self._loop)
1636 # update the datastore
1637 datastore.update(vdu)
1638
1639 # add the VDU to the set of processed VDUs
1640 processed.add(vdu.vdu_id)
1641
1642 @asyncio.coroutine
1643 def instantiate(vdu):
1644 """Instantiate the specified VDU
1645
1646 Arguments:
1647 vdu - a VirtualDeploymentUnitRecord
1648
1649 Raises:
1650 if the VDU, or any of the VDUs this VDU depends upon, are
1651 terminated or fail to instantiate properly, a
1652 VirtualDeploymentUnitRecordError is raised.
1653
1654 """
1655 for dependency in dependencies[vdu.vdu_id]:
1656 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1657
1658 while dependency.vdu_id not in processed:
1659 yield from asyncio.sleep(1, loop=self._loop)
1660
1661 if not dependency.active:
1662 raise VirtualDeploymentUnitRecordError()
1663
1664 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1665
1666 # Populate the datastore with the current values of the VDU
1667 datastore.add(vdu)
1668
1669 # Substitute any variables contained in the cloud config script
1670 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1671
1672 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1673 if len(parts) > 1:
1674
1675 # Extract the variable names
1676 variables = list()
1677 for variable in parts[1::2]:
1678 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1679
1680 # Iterate of the variables and substitute values from the
1681 # datastore.
1682 for variable in variables:
1683
1684 # Handle a reference to a VDU by ID
1685 if variable.startswith('vdu['):
1686 value = datastore.get(variable)
1687 if value is None:
1688 msg = "Unable to find a substitute for {} in {} cloud-init script"
1689 raise ValueError(msg.format(variable, vdu.vdu_id))
1690
1691 config = config.replace("{{ %s }}" % variable, value)
1692 continue
1693
1694 # Handle a reference to the current VDU
1695 if variable.startswith('vdu'):
1696 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1697 config = config.replace("{{ %s }}" % variable, value)
1698 continue
1699
1700 # Handle unrecognized variables
1701 msg = 'unrecognized cloud-config variable: {}'
1702 raise ValueError(msg.format(variable))
1703
1704 # Instantiate the VDU
1705 with self._dts.transaction() as xact:
1706 self._log.debug("Instantiating vdu: %s", vdu)
1707 yield from vdu.instantiate(xact, vnfr, config=config)
1708 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1709 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1710 self.vnfr_id, vdu)
1711
1712 # First create a set of tasks to monitor the state of the VDUs and
1713 # report when they have entered a terminal state
1714 for vdu in self._vdus:
1715 self._loop.create_task(instantiate_monitor(vdu))
1716
1717 for vdu in self._vdus:
1718 self._loop.create_task(instantiate(vdu))
1719
1720 def has_mgmt_interface(self, vdu):
1721 # ## TODO: Support additional mgmt_interface type options
1722 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1723 return True
1724 return False
1725
1726 def vlr_xpath(self, vlr_id):
1727 """ vlr xpath """
1728 return(
1729 "D,/vlr:vlr-catalog/"
1730 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1731
1732 def ext_vlr_by_id(self, vlr_id):
1733 """ find ext vlr by id """
1734 return self._ext_vlrs[vlr_id]
1735
1736 @asyncio.coroutine
1737 def publish_inventory(self, xact):
1738 """ Publish the inventory associated with this VNF """
1739 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1740
1741 for component in self._rw_vnfd.component:
1742 self._log.debug("Creating inventory component %s", component)
1743 mangled_name = VcsComponent.mangle_name(component.component_name,
1744 self.vnf_name,
1745 self.vnfd_id
1746 )
1747 comp = VcsComponent(dts=self._dts,
1748 log=self._log,
1749 loop=self._loop,
1750 cluster_name=self._cluster_name,
1751 vcs_handler=self._vcs_handler,
1752 component=component,
1753 mangled_name=mangled_name,
1754 )
1755 if comp.name in self._inventory:
1756 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1757 component, self._vnfd_id)
1758 return
1759 self._log.debug("Adding component %s for vnrf %s",
1760 comp.name, self._vnfr_id)
1761 self._inventory[comp.name] = comp
1762 yield from comp.publish(xact)
1763
1764 def all_vdus_active(self):
1765 """ Are all VDUS in this VNFR active? """
1766 for vdu in self._vdus:
1767 if not vdu.active:
1768 return False
1769
1770 self._log.debug("Inside all_vdus_active. Returning True")
1771 return True
1772
1773 @asyncio.coroutine
1774 def instantiation_failed(self, failed_reason=None):
1775 """ VNFR instantiation failed """
1776 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1777 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1778 self._state_failed_reason = failed_reason
1779
1780 # Update the VNFR with the changed status
1781 yield from self.publish(None)
1782
1783 @asyncio.coroutine
1784 def is_ready(self):
1785 """ This VNF is ready"""
1786 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1787
1788 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1789 self.set_state(VirtualNetworkFunctionRecordState.READY)
1790
1791 else:
1792 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1793
1794 # Update the VNFR with the changed status
1795 yield from self.publish(None)
1796
1797 def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
1798 """Updated the connection point with ip address"""
1799 for cp in self._cprs:
1800 if cp.name == cp_name:
1801 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1802 cp_name, cp, ip_address, cp_id)
1803 cp.ip_address = ip_address
1804 cp.mac_address = mac_addr
1805 cp.connection_point_id = cp_id
1806 return
1807
1808 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1809 self._log.debug(err)
1810 raise VirtualDeploymentUnitRecordError(err)
1811
1812 def set_state(self, state):
1813 """ Set state for this VNFR"""
1814 self._state = state
1815
1816 @asyncio.coroutine
1817 def instantiate(self, xact, restart_mode=False):
1818 """ instantiate this VNF """
1819 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1820 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1821
1822 @asyncio.coroutine
1823 def fetch_vlrs():
1824 """ Fetch VLRs """
1825 # Iterate over all the connection points in VNFR and fetch the
1826 # associated VLRs
1827
1828 def cpr_from_cp(cp):
1829 """ Creates a record level connection point from the desciptor cp"""
1830 cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
1831 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1832 cpr_dict = {}
1833 cpr_dict.update(cp_copy_dict)
1834 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1835
1836 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1837 self._vnfr_id, self._vnfr.connection_point)
1838
1839 for cp in self._vnfr.connection_point:
1840 cpr = cpr_from_cp(cp)
1841 self._cprs.append(cpr)
1842 self._log.debug("Adding Connection point record %s ", cp)
1843
1844 vlr_path = self.vlr_xpath(cp.vlr_ref)
1845 self._log.debug("Fetching VLR with path = %s", vlr_path)
1846 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1847 rwdts.XactFlag.MERGE)
1848 for i in res_iter:
1849 r = yield from i
1850 d = r.result
1851 self._ext_vlrs[cp.vlr_ref] = d
1852 cpr.vlr_ref = cp.vlr_ref
1853 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1854
1855 # Increase the VNFD reference count
1856 self.vnfd_ref()
1857
1858 assert self.vnfd
1859
1860 # Fetch External VLRs
1861 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1862 yield from fetch_vlrs()
1863
1864 # Publish inventory
1865 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1866 yield from self.publish_inventory(xact)
1867
1868 # Publish inventory
1869 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1870 yield from self.create_vls()
1871
1872 # publish the VNFR
1873 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1874 yield from self.publish(xact)
1875
1876
1877 # instantiate VLs
1878 self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
1879 try:
1880 yield from self.instantiate_vls(xact, restart_mode)
1881 except Exception as e:
1882 self._log.exception("VL instantiation failed (%s)", str(e))
1883 yield from self.instantiation_failed(str(e))
1884 return
1885
1886 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1887
1888 # instantiate VDUs
1889 self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
1890 yield from self.create_vdus(self, restart_mode)
1891
1892 try:
1893 yield from self.vdu_cloud_init_instantiation()
1894 except Exception as e:
1895 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1896 self._state_failed_reason = str(e)
1897 yield from self.publish(xact)
1898
1899 # publish the VNFR
1900 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1901 yield from self.publish(xact)
1902
1903 # instantiate VDUs
1904 # ToDo: Check if this should be prevented during restart
1905 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1906 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1907
1908 # publish the VNFR
1909 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1910 yield from self.publish(xact)
1911
1912 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1913
1914 # create task updating uptime for this vnfr
1915 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1916 self._loop.create_task(self.vnfr_uptime_update(xact))
1917
1918 @asyncio.coroutine
1919 def terminate(self, xact):
1920 """ Terminate this virtual network function """
1921
1922 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1923
1924 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1925
1926 # stop monitoring
1927 if self._vnf_mon is not None:
1928 self._vnf_mon.stop()
1929 self._vnf_mon.deregister()
1930 self._vnf_mon = None
1931
1932 @asyncio.coroutine
1933 def terminate_vls():
1934 """ Terminate VLs in this VNF """
1935 for vl in self._vlrs:
1936 yield from vl.terminate(xact)
1937
1938 @asyncio.coroutine
1939 def terminate_vdus():
1940 """ Terminate VDUS in this VNF """
1941 for vdu in self._vdus:
1942 yield from vdu.terminate(xact)
1943
1944 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1945 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1946 yield from terminate_vls()
1947
1948 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1949 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1950 yield from terminate_vdus()
1951
1952 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1953 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1954
1955 @asyncio.coroutine
1956 def vnfr_uptime_update(self, xact):
1957 while True:
1958 # Return when vnfr state is FAILED or TERMINATED etc
1959 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1960 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1961 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1962 VirtualNetworkFunctionRecordState.READY]:
1963 return
1964 yield from self.publish(xact)
1965 yield from asyncio.sleep(2, loop=self._loop)
1966
1967
1968
1969 class VnfdDtsHandler(object):
1970 """ DTS handler for VNFD config changes """
1971 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1972
1973 def __init__(self, dts, log, loop, vnfm):
1974 self._dts = dts
1975 self._log = log
1976 self._loop = loop
1977 self._vnfm = vnfm
1978 self._regh = None
1979
1980 @asyncio.coroutine
1981 def regh(self):
1982 """ DTS registration handle """
1983 return self._regh
1984
1985 @asyncio.coroutine
1986 def register(self):
1987 """ Register for VNFD configuration"""
1988
1989 def on_apply(dts, acg, xact, action, scratch):
1990 """Apply the configuration"""
1991 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1992 xact, action, scratch)
1993
1994 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
1995
1996 @asyncio.coroutine
1997 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
1998 """ on prepare callback """
1999 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
2000 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
2001 fref = ProtobufC.FieldReference.alloc()
2002 fref.goto_whole_message(msg.to_pbcm())
2003
2004 # Handle deletes in prepare_callback
2005 if fref.is_field_deleted():
2006 # Delete an VNFD record
2007 self._log.debug("Deleting VNFD with id %s", msg.id)
2008 if self._vnfm.vnfd_in_use(msg.id):
2009 self._log.debug("Cannot delete VNFD in use - %s", msg)
2010 err = "Cannot delete a VNFD in use - %s" % msg
2011 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
2012 # Delete a VNFD record
2013 yield from self._vnfm.delete_vnfd(msg.id)
2014
2015 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2016
2017 self._log.debug(
2018 "Registering for VNFD config using xpath: %s",
2019 VnfdDtsHandler.XPATH,
2020 )
2021 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2022 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2023 self._regh = acg.register(
2024 xpath=VnfdDtsHandler.XPATH,
2025 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2026 on_prepare=on_prepare)
2027
2028
2029 class VcsComponentDtsHandler(object):
2030 """ Vcs Component DTS handler """
2031 XPATH = ("D,/rw-manifest:manifest" +
2032 "/rw-manifest:operational-inventory" +
2033 "/rw-manifest:component")
2034
2035 def __init__(self, dts, log, loop, vnfm):
2036 self._dts = dts
2037 self._log = log
2038 self._loop = loop
2039 self._regh = None
2040 self._vnfm = vnfm
2041
2042 @property
2043 def regh(self):
2044 """ DTS registration handle """
2045 return self._regh
2046
2047 @asyncio.coroutine
2048 def register(self):
2049 """ Registers VCS component dts publisher registration"""
2050 self._log.debug("VCS Comp publisher DTS handler registering path %s",
2051 VcsComponentDtsHandler.XPATH)
2052
2053 hdl = rift.tasklets.DTS.RegistrationHandler()
2054 handlers = rift.tasklets.Group.Handler()
2055 with self._dts.group_create(handler=handlers) as group:
2056 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
2057 handler=hdl,
2058 flags=(rwdts.Flag.PUBLISHER |
2059 rwdts.Flag.NO_PREP_READ |
2060 rwdts.Flag.DATASTORE),)
2061
2062 @asyncio.coroutine
2063 def publish(self, xact, path, msg):
2064 """ Publishes the VCS component """
2065 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
2066 xact, path, msg)
2067 self.regh.create_element(path, msg)
2068 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
2069 VcsComponentDtsHandler.XPATH, xact, path, msg)
2070
2071 class VnfrConsoleOperdataDtsHandler(object):
2072 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
2073 @property
2074 def vnfr_vdu_console_xpath(self):
2075 """ path for resource-mgr"""
2076 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
2077
2078 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
2079 self._dts = dts
2080 self._log = log
2081 self._loop = loop
2082 self._regh = None
2083 self._vnfm = vnfm
2084
2085 self._vnfr_id = vnfr_id
2086 self._vdur_id = vdur_id
2087 self._vdu_id = vdu_id
2088
2089 @asyncio.coroutine
2090 def register(self):
2091 """ Register for VNFR VDU Operational Data read from dts """
2092
2093 @asyncio.coroutine
2094 def on_prepare(xact_info, action, ks_path, msg):
2095 """ prepare callback from dts """
2096 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2097 self._log.debug(
2098 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2099 xact_info, action, xpath, msg
2100 )
2101
2102 if action == rwdts.QueryAction.READ:
2103 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
2104 path_entry = schema.keyspec_to_entry(ks_path)
2105 self._log.debug("VDU Opdata path is {}".format(path_entry.key00.id))
2106 try:
2107 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2108 except VnfRecordError as e:
2109 self._log.error("VNFR id %s not found", self._vnfr_id)
2110 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2111 return
2112 try:
2113 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2114 if not vdur._state == VDURecordState.READY:
2115 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2116 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2117 return
2118 with self._dts.transaction() as new_xact:
2119 resp = yield from vdur.read_resource(new_xact)
2120 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2121 vdur_console.id = self._vdur_id
2122 if resp.console_url:
2123 vdur_console.console_url = resp.console_url
2124 else:
2125 vdur_console.console_url = 'none'
2126 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2127 except Exception:
2128 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2129 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2130 vdur_console.id = self._vdur_id
2131 vdur_console.console_url = 'none'
2132
2133 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2134 xpath=self.vnfr_vdu_console_xpath,
2135 msg=vdur_console)
2136 else:
2137 #raise VnfRecordError("Not supported operation %s" % action)
2138 self._log.error("Not supported operation %s" % action)
2139 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2140 return
2141
2142
2143 self._log.debug("Registering for VNFR VDU using xpath: %s",
2144 self.vnfr_vdu_console_xpath)
2145 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2146 with self._dts.group_create() as group:
2147 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2148 handler=hdl,
2149 flags=rwdts.Flag.PUBLISHER,
2150 )
2151
2152
2153 class VnfrDtsHandler(object):
2154 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2155 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2156
2157 def __init__(self, dts, log, loop, vnfm):
2158 self._dts = dts
2159 self._log = log
2160 self._loop = loop
2161 self._vnfm = vnfm
2162
2163 self._regh = None
2164
2165 @property
2166 def regh(self):
2167 """ Return registration handle"""
2168 return self._regh
2169
2170 @property
2171 def vnfm(self):
2172 """ Return VNF manager instance """
2173 return self._vnfm
2174
2175 @asyncio.coroutine
2176 def register(self):
2177 """ Register for vnfr create/update/delete/read requests from dts """
2178 def on_commit(xact_info):
2179 """ The transaction has been committed """
2180 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2181 return rwdts.MemberRspCode.ACTION_OK
2182
2183 def on_abort(*args):
2184 """ Abort callback """
2185 self._log.debug("VNF transaction got aborted")
2186
2187 @asyncio.coroutine
2188 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2189
2190 @asyncio.coroutine
2191 def instantiate_realloc_vnfr(vnfr):
2192 """Re-populate the vnfm after restart
2193
2194 Arguments:
2195 vlink
2196
2197 """
2198
2199 yield from vnfr.instantiate(None, restart_mode=True)
2200
2201 if xact_event == rwdts.MemberEvent.INSTALL:
2202 curr_cfg = self.regh.elements
2203 for cfg in curr_cfg:
2204 vnfr = self.vnfm.create_vnfr(cfg)
2205 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2206
2207 self._log.debug("Got on_event in vnfm")
2208
2209 return rwdts.MemberRspCode.ACTION_OK
2210
2211 @asyncio.coroutine
2212 def on_prepare(xact_info, action, ks_path, msg):
2213 """ prepare callback from dts """
2214 self._log.debug(
2215 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2216 xact_info, action, msg
2217 )
2218
2219 if action == rwdts.QueryAction.CREATE:
2220 if not msg.has_field("vnfd"):
2221 err = "Vnfd not provided"
2222 self._log.error(err)
2223 raise VnfRecordError(err)
2224
2225 vnfr = self.vnfm.create_vnfr(msg)
2226 try:
2227 # RIFT-9105: Unable to add a READ query under an existing transaction
2228 # xact = xact_info.xact
2229 yield from vnfr.instantiate(None)
2230 except Exception as e:
2231 self._log.exception(e)
2232 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2233 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2234 yield from vnfr.publish(None)
2235 elif action == rwdts.QueryAction.DELETE:
2236 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2237 path_entry = schema.keyspec_to_entry(ks_path)
2238 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2239
2240 if vnfr is None:
2241 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2242 raise VirtualNetworkFunctionRecordNotFound(
2243 "VNFR id %s", path_entry.key00.id)
2244
2245 try:
2246 yield from vnfr.terminate(xact_info.xact)
2247 # Unref the VNFD
2248 vnfr.vnfd_unref()
2249 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2250 except Exception as e:
2251 self._log.exception(e)
2252 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2253
2254 elif action == rwdts.QueryAction.UPDATE:
2255 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2256 path_entry = schema.keyspec_to_entry(ks_path)
2257 vnfr = None
2258 try:
2259 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2260 except Exception as e:
2261 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2262 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2263 return
2264
2265 if vnfr is None:
2266 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2267 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2268 return
2269
2270 self._log.debug("VNFR {} update config status {} (current {})".
2271 format(vnfr.name, msg.config_status, vnfr.config_status))
2272 # Update the config status and publish
2273 vnfr._config_status = msg.config_status
2274 yield from vnfr.publish(None)
2275
2276 else:
2277 raise NotImplementedError(
2278 "%s action on VirtualNetworkFunctionRecord not supported",
2279 action)
2280
2281 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2282
2283 self._log.debug("Registering for VNFR using xpath: %s",
2284 VnfrDtsHandler.XPATH,)
2285
2286 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2287 on_prepare=on_prepare,)
2288 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2289 with self._dts.group_create(handler=handlers) as group:
2290 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2291 handler=hdl,
2292 flags=(rwdts.Flag.PUBLISHER |
2293 rwdts.Flag.NO_PREP_READ |
2294 rwdts.Flag.CACHE |
2295 rwdts.Flag.DATASTORE),)
2296
2297 @asyncio.coroutine
2298 def create(self, xact, path, msg):
2299 """
2300 Create a VNFR record in DTS with path and message
2301 """
2302 self._log.debug("Creating VNFR xact = %s, %s:%s",
2303 xact, path, msg)
2304
2305 self.regh.create_element(path, msg)
2306 self._log.debug("Created VNFR xact = %s, %s:%s",
2307 xact, path, msg)
2308
2309 @asyncio.coroutine
2310 def update(self, xact, path, msg):
2311 """
2312 Update a VNFR record in DTS with path and message
2313 """
2314 self._log.debug("Updating VNFR xact = %s, %s:%s",
2315 xact, path, msg)
2316 self.regh.update_element(path, msg)
2317 self._log.debug("Updated VNFR xact = %s, %s:%s",
2318 xact, path, msg)
2319
2320 @asyncio.coroutine
2321 def delete(self, xact, path):
2322 """
2323 Delete a VNFR record in DTS with path and message
2324 """
2325 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2326 self.regh.delete_element(path)
2327 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2328
2329
2330 class VnfdRefCountDtsHandler(object):
2331 """ The VNFD Ref Count DTS handler """
2332 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2333
2334 def __init__(self, dts, log, loop, vnfm):
2335 self._dts = dts
2336 self._log = log
2337 self._loop = loop
2338 self._vnfm = vnfm
2339
2340 self._regh = None
2341
2342 @property
2343 def regh(self):
2344 """ Return registration handle """
2345 return self._regh
2346
2347 @property
2348 def vnfm(self):
2349 """ Return the NS manager instance """
2350 return self._vnfm
2351
2352 @asyncio.coroutine
2353 def register(self):
2354 """ Register for VNFD ref count read from dts """
2355
2356 @asyncio.coroutine
2357 def on_prepare(xact_info, action, ks_path, msg):
2358 """ prepare callback from dts """
2359 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2360 self._log.debug(
2361 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2362 xact_info, action, xpath, msg
2363 )
2364
2365 if action == rwdts.QueryAction.READ:
2366 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2367 path_entry = schema.keyspec_to_entry(ks_path)
2368 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2369 for xpath, msg in vnfd_list:
2370 self._log.debug("Responding to ref count query path:%s, msg:%s",
2371 xpath, msg)
2372 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2373 xpath=xpath,
2374 msg=msg)
2375 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2376 else:
2377 raise VnfRecordError("Not supported operation %s" % action)
2378
2379 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2380 with self._dts.group_create() as group:
2381 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2382 handler=hdl,
2383 flags=rwdts.Flag.PUBLISHER,
2384 )
2385
2386
2387 class VdurDatastore(object):
2388 """
2389 This VdurDatastore is intended to expose select information about a VDUR
2390 such that it can be referenced in a cloud config file. The data that is
2391 exposed does not necessarily follow the structure of the data in the yang
2392 model. This is intentional. The data that are exposed are intended to be
2393 agnostic of the yang model so that changes in the model do not necessarily
2394 require changes to the interface provided to the user. It also means that
2395 the user does not need to be familiar with the RIFT.ware yang models.
2396 """
2397
2398 def __init__(self):
2399 """Create an instance of VdurDatastore"""
2400 self._vdur_data = dict()
2401 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2402
2403 def add(self, vdur):
2404 """Add a new VDUR to the datastore
2405
2406 Arguments:
2407 vdur - a VirtualDeploymentUnitRecord instance
2408
2409 Raises:
2410 A ValueError is raised if the VDUR is (1) None or (2) already in
2411 the datastore.
2412
2413 """
2414 if vdur.vdu_id is None:
2415 raise ValueError('VDURs are required to have an ID')
2416
2417 if vdur.vdu_id in self._vdur_data:
2418 raise ValueError('cannot add a VDUR more than once')
2419
2420 self._vdur_data[vdur.vdu_id] = dict()
2421
2422 def set_if_not_none(key, attr):
2423 if attr is not None:
2424 self._vdur_data[vdur.vdu_id][key] = attr
2425
2426 set_if_not_none('name', vdur._vdud.name)
2427 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2428
2429 def update(self, vdur):
2430 """Update the VDUR information in the datastore
2431
2432 Arguments:
2433 vdur - a GI representation of a VDUR
2434
2435 Raises:
2436 A ValueError is raised if the VDUR is (1) None or (2) already in
2437 the datastore.
2438
2439 """
2440 if vdur.vdu_id is None:
2441 raise ValueError('VNFDs are required to have an ID')
2442
2443 if vdur.vdu_id not in self._vdur_data:
2444 raise ValueError('VNF is not recognized')
2445
2446 def set_or_delete(key, attr):
2447 if attr is None:
2448 if key in self._vdur_data[vdur.vdu_id]:
2449 del self._vdur_data[vdur.vdu_id][key]
2450
2451 else:
2452 self._vdur_data[vdur.vdu_id][key] = attr
2453
2454 set_or_delete('name', vdur._vdud.name)
2455 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2456
2457 def remove(self, vdur_id):
2458 """Remove all of the data associated with specified VDUR
2459
2460 Arguments:
2461 vdur_id - the identifier of a VNFD in the datastore
2462
2463 Raises:
2464 A ValueError is raised if the VDUR is not contained in the
2465 datastore.
2466
2467 """
2468 if vdur_id not in self._vdur_data:
2469 raise ValueError('VNF is not recognized')
2470
2471 del self._vdur_data[vdur_id]
2472
2473 def get(self, expr):
2474 """Retrieve VDUR information from the datastore
2475
2476 An expression should be of the form,
2477
2478 vdu[<id>].<attr>
2479
2480 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2481 the exposed attribute that the user wishes to retrieve.
2482
2483 If the requested data is not available, None is returned.
2484
2485 Arguments:
2486 expr - a string that specifies the data to return
2487
2488 Raises:
2489 A ValueError is raised if the provided expression cannot be parsed.
2490
2491 Returns:
2492 The requested data or None
2493
2494 """
2495 result = self._pattern.match(expr)
2496 if result is None:
2497 raise ValueError('data expression not recognized ({})'.format(expr))
2498
2499 vdur_id, key = result.groups()
2500
2501 if vdur_id not in self._vdur_data:
2502 return None
2503
2504 return self._vdur_data[vdur_id].get(key, None)
2505
2506
2507 class VnfManager(object):
2508 """ The virtual network function manager class """
2509 def __init__(self, dts, log, loop, cluster_name):
2510 self._dts = dts
2511 self._log = log
2512 self._loop = loop
2513 self._cluster_name = cluster_name
2514
2515 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2516 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2517 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2518 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)
2519
2520 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2521 self._vnfr_handler,
2522 self._vcs_handler,
2523 self._vnfr_ref_handler,
2524 self._nsr_handler]
2525 self._vnfrs = {}
2526 self._vnfds_to_vnfr = {}
2527 self._nsrs = {}
2528
2529 @property
2530 def vnfr_handler(self):
2531 """ VNFR dts handler """
2532 return self._vnfr_handler
2533
2534 @property
2535 def vcs_handler(self):
2536 """ VCS dts handler """
2537 return self._vcs_handler
2538
2539 @asyncio.coroutine
2540 def register(self):
2541 """ Register all static DTS handlers """
2542 for hdl in self._dts_handlers:
2543 yield from hdl.register()
2544
2545 @asyncio.coroutine
2546 def run(self):
2547 """ Run this VNFM instance """
2548 self._log.debug("Run VNFManager - registering static DTS handlers""")
2549 yield from self.register()
2550
2551 def handle_nsr(self, nsr, action):
2552 if action in [rwdts.QueryAction.CREATE]:
2553 self._nsrs[nsr.id] = nsr
2554 elif action == rwdts.QueryAction.DELETE:
2555 if nsr.id in self._nsrs:
2556 del self._nsrs[nsr.id]
2557
2558 def get_linked_mgmt_network(self, vnfr):
2559 """For the given VNFR get the related mgmt network from the NSD, if
2560 available.
2561 """
2562 vnfd_id = vnfr.vnfd.id
2563 nsr_id = vnfr.nsr_id_ref
2564
2565 # for the given related VNFR, get the corresponding NSR-config
2566 nsr_obj = None
2567 try:
2568 nsr_obj = self._nsrs[nsr_id]
2569 except KeyError:
2570 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2571
2572 # for the related NSD check if a VLD exists such that it's a mgmt
2573 # network
2574 for vld in nsr_obj.nsd.vld:
2575 if vld.mgmt_network:
2576 return vld.name
2577
2578 return None
2579
2580 def get_vnfr(self, vnfr_id):
2581 """ get VNFR by vnfr id """
2582
2583 if vnfr_id not in self._vnfrs:
2584 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2585
2586 return self._vnfrs[vnfr_id]
2587
2588 def create_vnfr(self, vnfr):
2589 """ Create a VNFR instance """
2590 if vnfr.id in self._vnfrs:
2591 msg = "Vnfr id %s already exists" % vnfr.id
2592 self._log.error(msg)
2593 raise VnfRecordError(msg)
2594
2595 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2596 vnfr.id,
2597 vnfr.vnfd.id)
2598
2599 mgmt_network = self.get_linked_mgmt_network(vnfr)
2600
2601 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2602 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2603 mgmt_network=mgmt_network
2604 )
2605
2606 #Update ref count
2607 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2608 self._vnfds_to_vnfr[vnfr.vnfd.id] += 1
2609 else:
2610 self._vnfds_to_vnfr[vnfr.vnfd.id] = 1
2611
2612 return self._vnfrs[vnfr.id]
2613
2614 @asyncio.coroutine
2615 def delete_vnfr(self, xact, vnfr):
2616 """ Create a VNFR instance """
2617 if vnfr.vnfr_id in self._vnfrs:
2618 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2619 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2620
2621 if vnfr.vnfd.id in self._vnfds_to_vnfr:
2622 if self._vnfds_to_vnfr[vnfr.vnfd.id]:
2623 self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1
2624
2625 del self._vnfrs[vnfr.vnfr_id]
2626
2627 @asyncio.coroutine
2628 def fetch_vnfd(self, vnfd_id):
2629 """ Fetch VNFDs based with the vnfd id"""
2630 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2631 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2632 vnfd = None
2633
2634 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2635
2636 for ent in res_iter:
2637 res = yield from ent
2638 vnfd = res.result
2639
2640 if vnfd is None:
2641 err = "Failed to get Vnfd %s" % vnfd_id
2642 self._log.error(err)
2643 raise VnfRecordError(err)
2644
2645 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2646
2647 return vnfd
2648
2649 def vnfd_in_use(self, vnfd_id):
2650 """ Is this VNFD in use """
2651 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2652 if vnfd_id in self._vnfds_to_vnfr:
2653 return (self._vnfds_to_vnfr[vnfd_id] > 0)
2654 return False
2655
2656 @asyncio.coroutine
2657 def publish_vnfr(self, xact, path, msg):
2658 """ Publish a VNFR """
2659 self._log.debug("publish_vnfr called with path %s, msg %s",
2660 path, msg)
2661 yield from self.vnfr_handler.update(xact, path, msg)
2662
2663 @asyncio.coroutine
2664 def delete_vnfd(self, vnfd_id):
2665 """ Delete the Virtual Network Function descriptor with the passed id """
2666 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2667 if vnfd_id in self._vnfds_to_vnfr:
2668 if self._vnfds_to_vnfr[vnfd_id]:
2669 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2670 vnfd_id,
2671 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2672 raise VirtualNetworkFunctionDescriptorRefCountExists(
2673 "Cannot delete :%s, ref_count:%s",
2674 vnfd_id,
2675 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2676
2677 del self._vnfds_to_vnfr[vnfd_id]
2678
2679 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2680 try:
2681 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2682 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2683 if os.path.exists(vnfd_dir):
2684 shutil.rmtree(vnfd_dir, ignore_errors=True)
2685 except Exception as e:
2686 self._log.error("Exception in cleaning up VNFD {}: {}".
2687 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2688 self._log.exception(e)
2689
2690
2691 def vnfd_refcount_xpath(self, vnfd_id):
2692 """ xpath for ref count entry """
2693 return (VnfdRefCountDtsHandler.XPATH +
2694 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2695
2696 @asyncio.coroutine
2697 def get_vnfd_refcount(self, vnfd_id):
2698 """ Get the vnfd_list from this VNFM"""
2699 vnfd_list = []
2700 if vnfd_id is None or vnfd_id == "":
2701 for vnfd in self._vnfds_to_vnfr.keys():
2702 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2703 vnfd_msg.vnfd_id_ref = vnfd
2704 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
2705 vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
2706 elif vnfd_id in self._vnfds_to_vnfr:
2707 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2708 vnfd_msg.vnfd_id_ref = vnfd_id
2709 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
2710 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2711
2712 return vnfd_list
2713
2714
2715 class VnfmTasklet(rift.tasklets.Tasklet):
2716 """ VNF Manager tasklet class """
2717 def __init__(self, *args, **kwargs):
2718 super(VnfmTasklet, self).__init__(*args, **kwargs)
2719 self.rwlog.set_category("rw-mano-log")
2720 self.rwlog.set_subcategory("vnfm")
2721
2722 self._dts = None
2723 self._vnfm = None
2724
2725 def start(self):
2726 try:
2727 super(VnfmTasklet, self).start()
2728 self.log.info("Starting VnfmTasklet")
2729
2730 self.log.setLevel(logging.DEBUG)
2731
2732 self.log.debug("Registering with dts")
2733 self._dts = rift.tasklets.DTS(self.tasklet_info,
2734 RwVnfmYang.get_schema(),
2735 self.loop,
2736 self.on_dts_state_change)
2737
2738 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2739 except Exception:
2740 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2741 raise
2742
2743 def on_instance_started(self):
2744 """ Task insance started callback """
2745 self.log.debug("Got instance started callback")
2746
2747 def stop(self):
2748 try:
2749 self._dts.deinit()
2750 except Exception:
2751 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2752 raise
2753
2754 @asyncio.coroutine
2755 def init(self):
2756 """ Task init callback """
2757 try:
2758 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2759 assert vm_parent_name is not None
2760 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2761 yield from self._vnfm.run()
2762 except Exception:
2763 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2764 raise
2765
2766 @asyncio.coroutine
2767 def run(self):
2768 """ Task run callback """
2769 pass
2770
2771 @asyncio.coroutine
2772 def on_dts_state_change(self, state):
2773 """Take action according to current dts state to transition
2774 application into the corresponding application state
2775
2776 Arguments
2777 state - current dts state
2778 """
2779 switch = {
2780 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2781 rwdts.State.CONFIG: rwdts.State.RUN,
2782 }
2783
2784 handlers = {
2785 rwdts.State.INIT: self.init,
2786 rwdts.State.RUN: self.run,
2787 }
2788
2789 # Transition application to next state
2790 handler = handlers.get(state, None)
2791 if handler is not None:
2792 yield from handler()
2793
2794 # Transition dts to next state
2795 next_state = switch.get(state, None)
2796 if next_state is not None:
2797 self._dts.handle.set_state(next_state)