Bug 138
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54
55
class VMResourceError(Exception):
    """Raised when no usable VM resource response is received."""
59
60
class VnfRecordError(Exception):
    """VNF record instantiation failed."""
64
65
class VduRecordError(Exception):
    """VDU record instantiation failed (e.g. a connection point cannot be resolved)."""
69
70
# NOTE(review): this class shadows the builtin ``NotImplemented`` singleton for
# the whole module; the name is kept only for backward compatibility.
class NotImplemented(Exception):
    """Requested operation is not implemented."""
74
75
class VnfrRecordExistsError(Exception):
    """A VNFR record with the same VNFR id already exists."""
79
80
class InternalVirtualLinkRecordError(Exception):
    """Error related to an internal virtual link record."""
84
85
class VDUImageNotFound(Exception):
    """The image for a VDU could not be found."""
89
90
class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""
94
95
class VMNotReadyError(Exception):
    """The VM has not yet been received from the resource manager."""
99
100
class VDURecordNotFound(Exception):
    """A VDU record could not be found."""
104
105
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """The Virtual Network Function Record Descriptor cannot be found."""
109
110
class VirtualNetworkFunctionDescriptorError(Exception):
    """Error related to a Virtual Network Function Descriptor."""
114
115
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """The Virtual Network Function Descriptor was not found."""
119
120
class VirtualNetworkFunctionRecordNotFound(Exception):
    """The Virtual Network Function Record was not found."""
124
125
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A reference count still exists on the Virtual Network Function Descriptor."""
129
130
class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""
134
135
class VNFMPlacementGroupError(Exception):
    """Raised when placement groups request conflicting constructs for a VDU."""
138
class VirtualNetworkFunctionRecordState(enum.Enum):
    """ VNFR state

    Every member must have a distinct value: in an ``enum.Enum`` a repeated
    value silently turns the later name into an alias of the earlier one.
    """
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    # Previously 7 as well, which made TERMINATED an alias of
    # VDU_TERMINATE_PHASE (same object, name reported as VDU_TERMINATE_PHASE).
    TERMINATED = 8
    FAILED = 10
150
151
class VDURecordState(enum.Enum):
    """Lifecycle states of a VDU record."""
    # Creation path
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    READY = 4
    # Teardown path
    TERMINATING = 5
    TERMINATED = 6
    # Error state
    FAILED = 10
161
162
163 class VcsComponent(object):
164 """ VCS Component within the VNF descriptor """
def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
    """Remember the handles and descriptor data for one VCS component.

    Arguments:
        dts          - DTS handle used to issue queries
        log          - logger
        loop         - event loop
        cluster_name - instance name used when building the vcs instance xpath
        vcs_handler  - object whose publish() is used to publish the component
        component    - component descriptor message
        mangled_name - unique component name (see mangle_name)
    """
    # Infrastructure handles
    self._dts, self._log, self._loop = dts, log, loop
    # Descriptor data
    self._component = component
    self._mangled_name = mangled_name
    self._cluster_name = cluster_name
    self._vcs_handler = vcs_handler
173
174 @staticmethod
175 def mangle_name(component_name, vnf_name, vnfd_id):
176 """ mangled component name """
177 return vnf_name + ":" + component_name + ":" + vnfd_id
178
@property
def name(self):
    """Mangled (unique) name of this component."""
    return self._mangled_name
183
@property
def path(self):
    """Xpath of this component in the manifest operational inventory."""
    template = ("D,/rw-manifest:manifest"
                "/rw-manifest:operational-inventory"
                "/rw-manifest:component"
                "[rw-manifest:component-name = '{}']")
    return template.format(self.name)
191
@property
def instance_xpath(self):
    """Xpath of the VCS instance identified by the cluster name."""
    key = "[instance-name = '{}']".format(self._cluster_name)
    return "D,/rw-base:vcs" "/instances" "/instance" + key
199
@property
def start_comp_xpath(self):
    """Xpath of the START-REQ child entry under this VCS instance."""
    start_req_suffix = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + start_req_suffix
205
def get_start_comp_msg(self, ip_address):
    """Build the START-REQ child message used to start this component.

    Arguments:
        ip_address - value stored in the message's ip_address field

    Returns:
        A RwBaseYang.VcsInstance_Instance_ChildN message.
    """
    request = RwBaseYang.VcsInstance_Instance_ChildN()
    field_values = (
        ("instance_name", 'START-REQ'),
        ("component_name", self.name),
        ("admin_command", "START"),
        ("ip_address", ip_address),
    )
    # Fields are assigned in the same order as they are listed above.
    for field, value in field_values:
        setattr(request, field, value)

    return request
215
@property
def msg(self):
    """Return the operational-inventory message for this VCS component.

    The component descriptor is converted to a dict, every
    "component_name" value found at any nesting depth is replaced by its
    mangled form, and the result is turned into a
    RwManifestYang.OpInventory_Component message.
    """
    # NOTE(review): self._vnfd_name and self._vnfd_id are read below but are
    # not assigned by the __init__ of this class - confirm where they are set.

    def mangle_comp_names(node):
        """ Mangle component names in place (recursively); returns node """
        for key, val in node.items():
            if isinstance(val, dict):
                node[key] = mangle_comp_names(val)
            elif isinstance(val, list):
                # Only dict entries need rewriting; other entries stay as is.
                for idx, entry in enumerate(val):
                    if isinstance(entry, dict):
                        val[idx] = mangle_comp_names(entry)
            elif key == "component_name":
                node[key] = VcsComponent.mangle_name(val,
                                                     self._vnfd_name,
                                                     self._vnfd_id)
        return node

    component_dict = mangle_comp_names(self._component.as_dict())
    return RwManifestYang.OpInventory_Component.from_dict(component_dict)
244
@asyncio.coroutine
def publish(self, xact):
    """Publish this VCS component through the VCS handler.

    Arguments:
        xact - transaction passed through to the handler's publish()
    """
    self._log.debug(
        "Publishing the VcsComponent %s, path = %s comp = %s",
        self.name,
        self.path,
        self.msg,
    )
    handler = self._vcs_handler
    yield from handler.publish(xact, self.path, self.msg)
251
@asyncio.coroutine
def start(self, xact, parent, ip_addr=None):
    """Start this VCS component by creating its START-REQ entry.

    Arguments:
        xact    - unused by this method
        parent  - unused by this method
        ip_addr - ip address placed in the start message
    """
    # ATTN RV - replace with block add
    request = self.get_start_comp_msg(ip_addr)
    xpath = self.start_comp_xpath
    self._log.debug("starting component %s %s", xpath, request)
    yield from self._dts.query_create(xpath, 0, request)
    self._log.debug("started component %s, %s", xpath, request)
264
265
266 class VirtualDeploymentUnitRecord(object):
267 """ Virtual Deployment Unit Record """
def __init__(self,
             dts,
             log,
             loop,
             vdud,
             vnfr,
             mgmt_intf,
             mgmt_network,
             cloud_account_name,
             vnfd_package_store,
             vdur_id=None,
             placement_groups=None):
    """Create a VDU record for one VDU descriptor of a VNFR.

    Arguments:
        dts                - DTS handle
        log                - logger
        loop               - event loop
        vdud               - VDU descriptor message
        vnfr               - owning VNF record
        mgmt_intf          - sent as "allocate_public_address" in the
                             resource-manager request
        mgmt_network       - management network name (optional)
        cloud_account_name - cloud account the VDU is created in
        vnfd_package_store - package store used to read cloud-init and
                             custom config files
        vdur_id            - id for this record; a new uuid4 when omitted
        placement_groups   - placement groups for this VDU (default: none)
    """
    self._dts = dts
    self._log = log
    self._loop = loop
    self._vdud = vdud
    self._vnfr = vnfr
    self._mgmt_intf = mgmt_intf
    self._cloud_account_name = cloud_account_name
    self._vnfd_package_store = vnfd_package_store
    self._mgmt_network = mgmt_network

    self._vdur_id = vdur_id or str(uuid.uuid4())
    self._int_intf = []
    self._ext_intf = []
    self._state = VDURecordState.INIT
    self._state_failed_reason = None
    self._request_id = str(uuid.uuid4())
    self._name = vnfr.name + "__" + vdud.id
    # A fresh list per instance: a literal [] default would be shared by
    # every record created without explicit placement groups.
    self._placement_groups = placement_groups if placement_groups is not None else []
    self._rm_regh = None
    self._vm_resp = None
    self._vdud_cloud_init = None
    self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
        dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id,
        self._vdur_id, self.vdu_id)
302
@asyncio.coroutine
def vdu_opdata_register(self):
    """Register the console operational-data handler of this VDUR."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
306
def cp_ip_addr(self, cp_name):
    """Return the ip address of the named connection point.

    Falls back to "0.0.0.0" when there is no VM response yet or no
    connection point with that name.
    """
    unknown = "0.0.0.0"
    response = self._vm_resp
    if response is None:
        return unknown
    matches = (point.ip_address
               for point in response.connection_points
               if point.name == cp_name)
    return next(matches, unknown)
314
def cp_mac_addr(self, cp_name):
    """Return the mac address of the named connection point.

    Falls back to "00:00:00:00:00:00" when there is no VM response yet or
    no connection point with that name.
    """
    unknown = "00:00:00:00:00:00"
    response = self._vm_resp
    if response is None:
        return unknown
    matches = (point.mac_addr
               for point in response.connection_points
               if point.name == cp_name)
    return next(matches, unknown)
322
def cp_id(self, cp_name):
    """Return the connection point id of the named connection point.

    Falls back to '' when there is no VM response yet or no connection
    point with that name.
    """
    response = self._vm_resp
    if response is None:
        return ''
    matches = (point.connection_point_id
               for point in response.connection_points
               if point.name == cp_name)
    return next(matches, '')
330
@property
def vdu_id(self):
    """Id of the VDU descriptor backing this record."""
    descriptor = self._vdud
    return descriptor.id
334
@property
def vm_resp(self):
    """Last VM response stored for this VDU (None until one is received)."""
    return self._vm_resp
338
@property
def name(self):
    """Name of this VDUR ("<vnfr name>__<vdud id>", fixed at construction)."""
    return self._name
343
@property
def cloud_account_name(self):
    """Name of the cloud account in which this VDU is created."""
    return self._cloud_account_name
348
@property
def image_name(self):
    """Image name used to look up the image on the CMP.

    Returns the base name of the descriptor's image path, or None when the
    descriptor carries no image.
    """
    if 'image' in self._vdud:
        return os.path.basename(self._vdud.image)
    return None
355
@property
def image_checksum(self):
    """Checksum of the VDU image, or None when the descriptor has none."""
    if self._vdud.has_field("image_checksum"):
        return self._vdud.image_checksum
    return None
360
@property
def management_ip(self):
    """Management address of an active VDU.

    Returns None while the VDU is not active; otherwise the public ip of
    the VM response when present, else its management ip.
    """
    if not self.active:
        return None
    response = self._vm_resp
    if response.has_field('public_ip'):
        return response.public_ip
    return response.management_ip
366
@property
def vm_management_ip(self):
    """Management ip reported in the VM response; None while not active."""
    if self.active:
        return self._vm_resp.management_ip
    return None
372
@property
def operational_status(self):
    """Operational-status string matching the current VDU record state."""
    # Keyed by state member name; TERMINATING and TERMINATED both report
    # "terminated".
    status_by_state = {
        "INIT": "init",
        "INSTANTIATING": "vm_init_phase",
        "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
        "READY": "running",
        "FAILED": "failed",
        "TERMINATING": "terminated",
        "TERMINATED": "terminated",
    }
    state_name = self._state.name
    return status_by_state[state_name]
385
@property
def msg(self):
    """ Process VDU message from resmgr

    Builds the VDUR message from the VDU descriptor, the current state and
    (when available) the VM response.

    Side effect: calls self._vnfr.update_cp() for every external interface.

    Returns:
        A RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur message.
    """
    # Descriptor fields copied as-is into the VDUR
    vdu_fields = ["vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa",
                  "volumes",
                  "name"]
    vdu_copy_dict = {k: v for k, v in
                     self._vdud.as_dict().items() if k in vdu_fields}
    vdur_dict = {"id": self._vdur_id,
                 "vdu_id_ref": self._vdud.id,
                 "operational_status": self.operational_status,
                 "operational_status_details": self._state_failed_reason,
                 }
    # Identifiers are only known once a VM response has been stored
    if self.vm_resp is not None:
        vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
                          "flavor_id": self.vm_resp.flavor_id
                          })
        if self._vm_resp.has_field('image_id'):
            vdur_dict.update({ "image_id": self.vm_resp.image_id })

    # Both ip properties return None while the VDU is not active
    if self.management_ip is not None:
        vdur_dict["management_ip"] = self.management_ip

    if self.vm_management_ip is not None:
        vdur_dict["vm_management_ip"] = self.vm_management_ip

    vdur_dict.update(vdu_copy_dict)

    if self.vm_resp is not None:
        if self._vm_resp.has_field('volumes'):
            # Annotate descriptor volumes with data from the VM response,
            # matching by volume name.
            # NOTE(review): vdur_dict['volumes'] raises KeyError if the
            # descriptor dict has no "volumes" entry - confirm that the VM
            # response can only carry volumes declared in the descriptor.
            for opvolume in self._vm_resp.volumes:
                vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
                # Only an unambiguous (single) match is updated
                if len(vdurvol_data) == 1:
                    vdurvol_data[0]["volume_id"] = opvolume.volume_id
                    if opvolume.has_field('custom_meta_data'):
                        metadata_list = list()
                        for metadata_item in opvolume.custom_meta_data:
                            metadata_list.append(metadata_item.as_dict())
                        if 'guest_params' not in vdurvol_data[0]:
                            vdurvol_data[0]['guest_params'] = dict()
                        vdurvol_data[0]['guest_params']['custom_meta_data'] = metadata_list

        # Custom boot data is taken from the VM response only
        if self._vm_resp.has_field('custom_boot_data'):
            vdur_dict['custom_boot_data'] = dict()
            if self._vm_resp.custom_boot_data.has_field('custom_drive'):
                vdur_dict['custom_boot_data']['custom_drive'] = self._vm_resp.custom_boot_data.custom_drive
            if self._vm_resp.custom_boot_data.has_field('custom_meta_data'):
                metadata_list = list()
                for metadata_item in self._vm_resp.custom_boot_data.custom_meta_data:
                    metadata_list.append(metadata_item.as_dict())
                vdur_dict['custom_boot_data']['custom_meta_data'] = metadata_list
            if self._vm_resp.custom_boot_data.has_field('custom_config_files'):
                file_list = list()
                for file_item in self._vm_resp.custom_boot_data.custom_config_files:
                    file_list.append(file_item.as_dict())
                vdur_dict['custom_boot_data']['custom_config_files'] = file_list

    icp_list = []
    ii_list = []

    # Internal interfaces are stored as (interface, cp id, vlr) tuples
    for intf, cp_id, vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)

        icp_list.append({"name": cp.name,
                         "id": cp.id,
                         "type_yang": "VPORT",
                         "ip_address": self.cp_ip_addr(cp.id),
                         "mac_address": self.cp_mac_addr(cp.id)})

        ii_list.append({"name": intf.name,
                        "vdur_internal_connection_point_ref": cp.id,
                        "virtual_interface": {}})

    vdur_dict["internal_connection_point"] = icp_list
    self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
    vdur_dict["internal_interface"] = ii_list

    # External interfaces are stored as (interface, cp record, vlr) tuples
    ei_list = []
    for intf, cp, vlr in self._ext_intf:
        ei_list.append({"name": cp.name,
                        "vnfd_connection_point_ref": cp.name,
                        "virtual_interface": {}})
        # Push the addresses/id found in the VM response to the VNFR
        self._vnfr.update_cp(cp.name,
                             self.cp_ip_addr(cp.name),
                             self.cp_mac_addr(cp.name),
                             self.cp_id(cp.name))

    vdur_dict["external_interface"] = ei_list

    placement_groups = []
    for group in self._placement_groups:
        placement_groups.append(group.as_dict())
    vdur_dict['placement_groups_info'] = placement_groups

    return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
485
@property
def resmgr_path(self):
    """Resource-manager xpath of this VDU's event data (keyed by request id)."""
    event_key = "/vdu-event-data[event-id='{}']".format(self._request_id)
    return "D,/rw-resource-mgr:resource-mgmt" "/vdu-event" + event_key
492
@property
def vm_flavor_msg(self):
    """Return a copy of the descriptor's VM flavor message."""
    # New instance of the same message class, filled via copy_from().
    flavor_copy = self._vdud.vm_flavor.__class__()
    source_flavor = self._vdud.vm_flavor
    flavor_copy.copy_from(source_flavor)
    return flavor_copy
500
@property
def vdud_cloud_init(self):
    """Cloud-init contents for the VDU, computed lazily via cloud_init().

    The value is cached once it is not None; a None result is recomputed
    on the next access.
    """
    cached = self._vdud_cloud_init
    if cached is None:
        cached = self._vdud_cloud_init = self.cloud_init()
    return cached
508
def cloud_init(self):
    """Return the cloud-config script for this VDU.

    The inline script of the descriptor wins; otherwise the script is read
    from the file named by cloud_init_file inside the stored VNFD package.
    Returns None when neither is provided.

    Raises:
        VirtualDeploymentUnitRecordError - the script file cannot be
        extracted from the package
    """
    if self._vdud.cloud_init is not None:
        self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
        return self._vdud.cloud_init

    if self._vdud.cloud_init_file is None:
        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None

    # Read the script from the file referenced by cloud_init_file
    self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
    script_name = self._vdud.cloud_init_file
    self._vnfd_package_store.refresh()
    package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
    try:
        return extractor.read_script(package, script_name)
    except rift.package.cloud_init.CloudInitExtractionError as e:
        raise VirtualDeploymentUnitRecordError(e)
529
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Fill the OpenStack placement constructs into the VM create request.

    Collects host aggregates, availability zones and server groups from all
    placement groups and stores them in vm_create_msg_dict under
    'host_aggregate', 'availability_zone' and 'server_group'.

    Arguments:
        vm_create_msg_dict - request dict, updated in place

    Raises:
        VNFMPlacementGroupError - more than one availability zone or more
        than one server group is requested for this VDU
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []
    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(aggregate.as_dict()
                                   for aggregate in group.host_aggregate)
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    # A VDU can be placed in at most one availability zone
    if len(availability_zones) > 1:
        self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s",
                        self.name, availability_zones)
        # The message now carries the requested zones (the placeholder for
        # them was missing before, so the value was silently dropped).
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple availability zones. Requested Zones: {}".format(
                self.name, availability_zones))
    if availability_zones:
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    # ... and in at most one server group
    if len(server_groups) > 1:
        self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s",
                        self.name, server_groups)
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple Server Groups. Requested Groups: {}".format(
                self.name, server_groups))
    if server_groups:
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates
561
def process_placement_groups(self, vm_create_msg_dict):
    """Apply the placement groups to the resource-manager request.

    Does nothing without placement groups. All groups must share a single
    cloud type; only 'openstack' is processed, any other type is ignored
    with an info log.
    """
    groups = self._placement_groups
    if not groups:
        return

    cloud_types = {group.cloud_type for group in groups}
    assert len(cloud_types) == 1
    (cloud_type,) = cloud_types

    if cloud_type != 'openstack':
        self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
        return

    self.process_openstack_placement_group_construct(vm_create_msg_dict)
577
def process_custom_bootdata(self, vm_create_msg_dict):
    """Inline the contents of custom config files into the create request.

    For every entry of custom_boot_data/custom_config_files that has both
    'source' and 'dest', the 'source' value (a file in the VNFD package) is
    replaced by the contents of that file. Entries missing either key are
    left untouched.

    Arguments:
        vm_create_msg_dict - request dict containing 'custom_boot_data',
                             updated in place

    Raises:
        VirtualDeploymentUnitRecordError - a source file cannot be
        extracted from the package
    """
    if 'custom_config_files' not in vm_create_msg_dict['custom_boot_data']:
        return

    # Refresh once, and before the package is looked up (same order as
    # cloud_init()), instead of once per file after the lookup.
    self._vnfd_package_store.refresh()
    stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    script_extractor = rift.package.script.PackageScriptExtractor(self._log)
    for custom_file_item in vm_create_msg_dict['custom_boot_data']['custom_config_files']:
        if 'source' not in custom_file_item or 'dest' not in custom_file_item:
            continue
        source = custom_file_item['source']
        # Find source file in scripts dir of VNFD
        self._log.debug("Checking for source config file at %s", source)
        try:
            source_file_str = script_extractor.read_script(stored_package, source)
        except rift.package.script.ScriptExtractionError as e:
            raise VirtualDeploymentUnitRecordError(e)
        # Update source file location with file contents
        custom_file_item['source'] = source_file_str
600
def resmgr_msg(self, config=None):
    """Build the resource-manager request used to create this VDU.

    Arguments:
        config - optional userdata, sent as vdu_init/userdata

    Returns:
        A RwResourceMgrYang.VDUEventData message whose request_info is
        filled from the descriptor, the resolved interfaces, the placement
        groups and the custom boot data.
    """
    # Descriptor fields copied as-is into the request
    vdu_fields = ["vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa",
                  "volumes",
                  "custom_boot_data"]

    self._log.debug("Creating params based on VDUD: %s", self._vdud)
    vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}

    vm_create_msg_dict = {
        "name": self.name,
    }

    if self.image_name is not None:
        vm_create_msg_dict["image_name"] = self.image_name

    if self.image_checksum is not None:
        vm_create_msg_dict["image_checksum"] = self.image_checksum

    vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
    if self._vdud.has_field('mgmt_vpci'):
        vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci

    self._log.debug("VDUD: %s", self._vdud)
    if config is not None:
        vm_create_msg_dict['vdu_init'] = {'userdata': config}

    if self._mgmt_network:
        vm_create_msg_dict['mgmt_network'] = self._mgmt_network

    cp_list = []
    # External interfaces: (interface, connection point record, vlr)
    for intf, cp, vlr in self._ext_intf:
        cp_info = {"name": cp.name,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang,
                   "port_security_enabled": cp.port_security_enabled}

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_info["vpci"] = intf.virtual_interface.vpci

        if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
            cp_info['security_group'] = vlr.ip_profile_params.security_group

        cp_list.append(cp_info)

    # Internal interfaces: (interface, connection point ID, vlr). The
    # second element is the id string, not a connection point object.
    for intf, cp_id, vlr in self._int_intf:
        cp_info = {"name": cp_id,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang}
        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_info["vpci"] = intf.virtual_interface.vpci
        else:
            # Look up the descriptor's connection point to read the flag;
            # reading it from the id string raised AttributeError.
            int_cp = self.find_internal_cp_by_cp_id(cp_id)
            cp_info["port_security_enabled"] = int_cp.port_security_enabled
        cp_list.append(cp_info)

    vm_create_msg_dict["connection_points"] = cp_list
    vm_create_msg_dict.update(vdu_copy_dict)

    self.process_placement_groups(vm_create_msg_dict)
    if 'custom_boot_data' in vm_create_msg_dict:
        self.process_custom_bootdata(vm_create_msg_dict)

    msg = RwResourceMgrYang.VDUEventData()
    msg.event_id = self._request_id
    msg.cloud_account = self.cloud_account_name
    msg.request_info.from_dict(vm_create_msg_dict)

    return msg
676
@asyncio.coroutine
def terminate(self, xact):
    """ Delete resource in VIM

    Only acts on a VDU in READY or FAILED state; any other state is
    ignored with a warning. Deletes the VM resource (best effort: failures
    are logged, not raised), drops the resource-manager and console
    registrations and moves the record to TERMINATED.

    Arguments:
        xact - unused; the delete runs in its own transaction
    """
    if self._state not in (VDURecordState.READY, VDURecordState.FAILED):
        self._log.warning("VDU terminate in not ready state - Ignoring request")
        return

    self._state = VDURecordState.TERMINATING
    if self._vm_resp is not None:
        try:
            with self._dts.transaction() as new_xact:
                yield from self.delete_resource(new_xact)
        except Exception:
            # Deliberate best effort: termination continues regardless.
            self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

    if self._rm_regh is not None:
        self._log.debug("Deregistering resource manager registration handle")
        self._rm_regh.deregister()
        self._rm_regh = None

    console_handler = self._vdur_console_handler
    if console_handler is not None:
        # The handle may be unset (never registered), so check it before
        # calling deregister(). This is routine cleanup: log at debug.
        console_regh = getattr(console_handler, "_regh", None)
        if console_regh is not None:
            self._log.debug("Deregistering vnfr vdur registration handle")
            console_regh.deregister()
        console_handler._regh = None

    self._state = VDURecordState.TERMINATED
703
def find_internal_cp_by_cp_id(self, cp_id):
    """Return the descriptor's internal connection point with the given id.

    Raises:
        VduRecordError - no internal connection point has that id
    """
    self._log.debug("find_internal_cp_by_cp_id(%s) called",
                    cp_id)

    for candidate in self._vdud.internal_connection_point:
        self._log.debug("Checking for int cp %s in internal connection points",
                        candidate.id)
        if candidate.id == cp_id:
            # First match wins; this is the connection point itself.
            return candidate

    self._log.debug("Failed to find cp %s in internal connection points",
                    cp_id)
    raise VduRecordError("Failed to find cp %s in internal connection points" % cp_id)
726
@asyncio.coroutine
def create_resource(self, xact, vnfr, config=None):
    """ Request resource from ResourceMgr

    Resolves the external and internal interfaces of the descriptor into
    (interface, connection point, vlr) tuples, stores them on the record
    and issues the create query built by resmgr_msg().

    Arguments:
        xact   - transaction used to create the query block
        vnfr   - VNF record used to resolve connection points and VLRs
        config - optional userdata passed to resmgr_msg()

    Returns:
        The resource_info of the resource-manager response.

    Raises:
        VduRecordError  - an internal connection point cannot be resolved
        VMResourceError - no response, or a response without resource
                          state, was received
    """
    def find_cp_by_name(cp_name):
        """ Find an external connection point by name (None if absent) """
        cp = None
        self._log.debug("find_cp_by_name(%s) called", cp_name)
        for ext_cp in vnfr._cprs:
            self._log.debug("Checking ext cp (%s) called", ext_cp.name)
            if ext_cp.name == cp_name:
                cp = ext_cp
                break
        if cp is None:
            self._log.debug("Failed to find cp %s in external connection points",
                            cp_name)
        return cp

    def find_internal_vlr_by_cp_name(cp_name):
        """ Find the VLR corresponding to the connection point name"""
        cp = None

        self._log.debug("find_internal_vlr_by_cp_name(%s) called",
                        cp_name)

        # The connection point must be declared by the descriptor
        for int_cp in self._vdud.internal_connection_point:
            self._log.debug("Checking for int cp %s in internal connection points",
                            int_cp.id)
            if int_cp.id == cp_name:
                cp = int_cp
                break

        if cp is None:
            self._log.debug("Failed to find cp %s in internal connection points",
                            cp_name)
            msg = "Failed to find cp %s in internal connection points" % cp_name
            raise VduRecordError(msg)

        # return the VLR associated with the connection point
        return vnfr.find_vlr_by_cp(cp_name)

    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: create",
                    self._request_id)

    # Resolve the networks associated external interfaces
    for ext_intf in self._vdud.external_interface:
        self._log.debug("Resolving external interface name [%s], cp[%s]",
                        ext_intf.name, ext_intf.vnfd_connection_point_ref)
        cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
        if cp is None:
            # Unresolvable external interfaces are skipped, not fatal
            self._log.debug("Failed to find connection point - %s",
                            ext_intf.vnfd_connection_point_ref)
            continue
        self._log.debug("Connection point name [%s], type[%s]",
                        cp.name, cp.type_yang)

        vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

        etuple = (ext_intf, cp, vlr)
        self._ext_intf.append(etuple)

        self._log.debug("Created external interface tuple : %s", etuple)

    # Resolve the networks associated internal interfaces
    for intf in self._vdud.internal_interface:
        cp_id = intf.vdu_internal_connection_point_ref
        self._log.debug("Resolving internal interface name [%s], cp[%s]",
                        intf.name, cp_id)

        try:
            vlr = find_internal_vlr_by_cp_name(cp_id)
        except Exception as e:
            self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
            msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
            # Keep the original failure as the cause of the new error
            raise VduRecordError(msg) from e

        # Note: the second element is the connection point id, not object
        ituple = (intf, cp_id, vlr)
        self._int_intf.append(ituple)

        self._log.debug("Created internal interface tuple : %s", ituple)

    resmgr_path = self.resmgr_path
    resmgr_msg = self.resmgr_msg(config)

    self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
    block.add_query_create(resmgr_path, resmgr_msg)

    res_iter = yield from block.execute(now=True)

    resp = None

    # The last result of the iterator is the one that is used
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        # Exceptions do not interpolate logging-style arguments, so the
        # message is formatted explicitly.
        raise VMResourceError(
            "Did not get a vm resource response (resp: {})".format(resp))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
827
@asyncio.coroutine
def delete_resource(self, xact):
    """Issue the delete query for this VDU's resource-manager entry.

    Arguments:
        xact - transaction used to create the query block
    """
    query_block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: delete",
                    self._request_id)

    target_path = self.resmgr_path
    query_block.add_query_delete(target_path)

    yield from query_block.execute(flags=0, now=True)
838
@asyncio.coroutine
def read_resource(self, xact):
    """Read this VDU's resource information from the resource manager.

    Arguments:
        xact - transaction used to create the query block

    Returns:
        The resource_info of the resource-manager response.

    Raises:
        VMResourceError - no response, or a response without resource
                          state, was received
    """
    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: read",
                    self._request_id)

    block.add_query_read(self.resmgr_path)

    res_iter = yield from block.execute(flags=0, now=True)

    # Initialised so that an empty result set is reported as a
    # VMResourceError instead of an unbound-variable error.
    resp = None
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        raise VMResourceError(
            "Did not get a vm resource response (resp: {})".format(resp))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
858
859
@asyncio.coroutine
def start_component(self):
    """Ask the VNFR to start the VCS component referenced by this VDU.

    The component is started with the management ip of the VM response.
    """
    self._log.debug("Starting component %s for vdud %s vdur %s",
                    self._vdud.vcs_component_ref,
                    self._vdud,
                    self._vdur_id)
    component_ref = self._vdud.vcs_component_ref
    mgmt_ip = self.vm_resp.management_ip
    yield from self._vnfr.start_component(component_ref, mgmt_ip)
869
@property
def active(self):
    """True when this VDU record is in the READY state."""
    return self._state is VDURecordState.READY
874
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as FAILED and propagate the failure to the VNFR.

    Arguments:
        failed_reason - stored on the record and passed on to the VNFR
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    yield from self._vnfr.instantiation_failed(failed_reason)
882
@asyncio.coroutine
def vdu_is_active(self):
    """Move this VDU to READY and notify the VNFR when all VDUs are active.

    A VDU that is already active only produces a warning. The referenced
    VCS component, if any, is started before the state changes.
    """
    vdur_id = self._vdur_id
    if self.active:
        self._log.warning("VDU %s was already marked as active", vdur_id)
        return

    vnfr = self._vnfr
    self._log.debug("VDUR id %s in VNFR %s is active", vdur_id, vnfr.vnfr_id)

    needs_component = self._vdud.vcs_component_ref is not None
    if needs_component:
        yield from self.start_component()

    self._state = VDURecordState.READY

    if not self._vnfr.all_vdus_active():
        return
    self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
    yield from self._vnfr.is_ready()
900
@asyncio.coroutine
def instantiate(self, xact, vnfr, config=None):
    """ Instantiate this VDU

    Subscribes to the resource-info of this VDU's resource-manager entry,
    requests the VM and reacts to the immediate response: "active" marks
    the VDU ready, "pending"/"inactive" leaves completion to the
    subscription, anything else fails the instantiation. Any exception is
    logged and reported through instantiation_failed() rather than raised.

    Arguments:
        xact   - transaction used for the create request
        vnfr   - VNF record used to resolve connection points and VLRs
        config - optional userdata for the VM
    """
    self._state = VDURecordState.INSTANTIATING

    @asyncio.coroutine
    def on_prepare(xact_info, query_action, ks_path, msg):
        """ Handle a resource-info update from the resource manager """
        self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                        query_action,
                        ks_path,
                        msg)

        if (query_action == rwdts.QueryAction.UPDATE or
                query_action == rwdts.QueryAction.CREATE):
            self._vm_resp = msg

            if msg.resource_state == "active":
                # Move this VDU to ready state
                yield from self.vdu_is_active()
            elif msg.resource_state == "failed":
                yield from self.instantiation_failed(msg.resource_errors)
        elif query_action == rwdts.QueryAction.DELETE:
            self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
        else:
            # Exceptions do not interpolate logging-style arguments, so
            # the message is formatted explicitly.
            raise NotImplementedError(
                "{} action on VirtualDeployementUnitRecord not supported".format(
                    query_action))

        xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    try:
        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            reg_event.set()

        # Subscribe before requesting the VM so that no update is missed
        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
        self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                      flags=rwdts.Flag.SUBSCRIBER,
                                                      handler=handler)
        yield from reg_event.wait()

        vm_resp = yield from self.create_resource(xact, vnfr, config)
        self._vm_resp = vm_resp

        self._state = VDURecordState.RESOURCE_ALLOC_PENDING
        self._log.debug("Requested VM from resource manager response %s",
                        vm_resp)
        if vm_resp.resource_state == "active":
            self._log.debug("Resourcemgr responded wih an active vm resp %s",
                            vm_resp)
            yield from self.vdu_is_active()
            self._state = VDURecordState.READY
        elif (vm_resp.resource_state == "pending" or
                vm_resp.resource_state == "inactive"):
            # Completion is reported later through on_prepare
            self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                            vm_resp)
        else:
            self._log.debug("Resourcemgr responded wih an error vm resp %s",
                            vm_resp)
            raise VirtualDeploymentUnitRecordError(
                "Failed VDUR instantiation %s " % vm_resp)

    except Exception as e:
        # log.exception() already records the traceback, so it is not
        # printed to stderr a second time.
        self._log.exception(e)
        self._log.error("Instantiation of VDU record failed: %s", str(e))
        self._state = VDURecordState.FAILED
        yield from self.instantiation_failed(str(e))
977
978
class VlRecordState(enum.Enum):
    """Lifecycle states of a virtual link record."""
    # Creation path
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Teardown path
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Error state
    FAILED = 106
987
988
989 class InternalVirtualLinkRecord(object):
990 """ Internal Virtual Link record """
def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
    """ Keep the handles and descriptor, then build the VLR request

    Arguments:
        dts - DTS handle used for queries and transactions
        log - logger
        loop - event loop handle (stored, not used directly by this class)
        ivld_msg - internal VLD message this record is created from
        vnfr_name - name of the owning VNFR, used as the name prefix
        cloud_account_name - copied into the VLR request
    """
    # Handles supplied by the caller
    self._dts, self._log, self._loop = dts, log, loop

    # Descriptor data; create_vlr() below reads all three
    self._ivld_msg = ivld_msg
    self._vnfr_name = vnfr_name
    self._cloud_account_name = cloud_account_name

    # Nothing has been instantiated yet
    self._state = VlRecordState.INIT
    self._vlr = None
    self._vlr_req = self.create_vlr()
1002
@property
def vlr_id(self):
    """ Id carried by the VLR request message of this VL """
    request = self._vlr_req
    return request.id
1007
@property
def name(self):
    """ VL name: the owning VNFR name and the VLD name joined by a dot """
    return ".".join((self._vnfr_name, self._ivld_msg.name))
1012
@property
def network_id(self):
    """ Network id of the stored VLR, or None while no VLR is stored """
    if self._vlr:
        return self._vlr.network_id
    return None
1017
def vlr_path(self):
    """ Keyed xpath of this VL's entry in the VLR catalog """
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(self.vlr_id)
1021
def create_vlr(self):
    """ Build the VLR request message that instantiate() will publish

    The request gets a fresh uuid4 id, this VL's name and cloud account,
    plus a fixed set of fields copied from the internal VLD when present.
    """
    copied_fields = ("short_name",
                     "vendor",
                     "description",
                     "version",
                     "type_yang",
                     "provider_network")

    descriptor = self._ivld_msg.as_dict()

    request = {"id": str(uuid.uuid4()),
               "name": self.name,
               "cloud_account": self._cloud_account_name,
               }
    # Descriptor values are applied last, as in a dict.update()
    for field, value in descriptor.items():
        if field in copied_fields:
            request[field] = value

    return RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(request)
1042
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ Instantiate VL

    Arguments:
        xact - caller's transaction; not used here, the VLR create runs in
               a transaction of its own
        restart_mode - when True, first try to read back an already
                       published VLR and only create one if none is found

    Raises:
        VnfrInstantiationFailed if the create returns no VLR or a VLR whose
        operational status is 'failed'.  Exceptions raised while executing
        the create query are logged and re-raised.
    """

    @asyncio.coroutine
    def instantiate_vlr():
        """ Instantiate VLR"""
        self._log.debug("Create VL with xpath %s and vlr %s",
                        self.vlr_path(), self._vlr_req)

        # Named vlr_xact so the caller's xact argument is not shadowed
        with self._dts.transaction(flags=0) as vlr_xact:
            block = vlr_xact.block_create()
            block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.vlr_path(), self._vlr_req)

            try:
                res_iter = yield from block.execute()
            except Exception:
                self._state = VlRecordState.FAILED
                self._log.exception("Caught exception while instantial VL")
                raise

            for ent in res_iter:
                res = yield from ent
                self._vlr = res.result

        # An empty response would otherwise surface as an AttributeError on
        # None in the status check below
        if self._vlr is None:
            self._log.debug("VL creation returned no VLR for path %s", self.vlr_path())
            self._state = VlRecordState.FAILED
            raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self.vlr_id))

        if self._vlr.operational_status == 'failed':
            self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
            self._state = VlRecordState.FAILED
            raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

        self._log.info("Created VL with xpath %s and vlr %s",
                       self.vlr_path(), self._vlr)

    @asyncio.coroutine
    def get_vlr():
        """ Read back the published VLR; returns None when there is none """
        res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
        vlr = None
        for ent in res_iter:
            res = yield from ent
            vlr = res.result

        if vlr is None:
            # Reported, not raised: the restart path below falls back to
            # creating the VLR, which a raise here made unreachable
            self._log.warn("Failed to get VLR for path %s" % self.vlr_path())
        return vlr

    self._state = VlRecordState.INSTANTIATION_PENDING

    if restart_mode:
        vl = yield from get_vlr()
        if vl is None:
            yield from instantiate_vlr()
        else:
            # Keep the recovered record so network_id works after a restart
            self._vlr = vl
    else:
        yield from instantiate_vlr()

    self._state = VlRecordState.ACTIVE
1104
def vlr_in_vns(self):
    """ True when this VL is in a state where a VLR may exist in VNS

    That is ACTIVE, INSTANTIATION_PENDING or FAILED; any other state
    gives False.
    """
    states_with_vlr = (VlRecordState.ACTIVE,
                       VlRecordState.INSTANTIATION_PENDING,
                       VlRecordState.FAILED)
    return any(self._state == candidate for candidate in states_with_vlr)
1113
@asyncio.coroutine
def terminate(self, xact):
    """ Delete this VL's VLR using the caller's transaction

    A request is ignored (with a debug log) when vlr_in_vns() is False.
    """
    if not self.vlr_in_vns():
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
        return

    path = self.vlr_path()
    self._log.debug("Terminating VL with path %s", path)
    self._state = VlRecordState.TERMINATE_PENDING

    delete_block = xact.block_create()
    delete_block.add_query_delete(path)
    yield from delete_block.execute(flags=0, now=True)

    self._state = VlRecordState.TERMINATED
    self._log.debug("Terminated VL with path %s", path)
1129
1130
1131 class VirtualNetworkFunctionRecord(object):
1132 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    """ Set up the in-memory state for one VNFR

    Arguments:
        dts, log, loop - DTS handle, logger and event loop
        cluster_name - passed on to the inventory components
        vnfm - owning manager; used to fetch VNFDs and publish this VNFR
        vcs_handler - passed on to the inventory components
        vnfr_msg - VNFR message this record is created from
        mgmt_network - passed on to every VDU record (optional)
    """
    # Handles supplied by the caller
    self._dts, self._log, self._loop = dts, log, loop
    self._cluster_name = cluster_name
    self._vnfm = vnfm
    self._vcs_handler = vcs_handler
    self._mgmt_network = mgmt_network

    # The incoming message is kept under two names; both are read by
    # other methods of this class
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd_id = vnfr_msg.vnfd.id
    self._vnfd = vnfr_msg.vnfd
    self._config_status = vnfr_msg.config_status

    # Lifecycle state
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())

    # Child records and lookup tables, filled in during instantiate()
    self._ext_vlrs = {}    # external VLRs keyed by vlr id
    self._vlrs = []        # internal virtual link records
    self._vdus = []        # VDU records
    self._vlr_by_cp = {}   # internal connection point id -> VL record
    self._cprs = []        # connection point records
    self._inventory = {}   # inventory components keyed by component name
    self._vnf_mon = None

    # VNFD related state
    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0
1161
def _get_vdur_from_vdu_id(self, vdu_id):
    """ Find the VDU record whose vdu_id matches

    Arguments:
        vdu_id - id to look for among this VNFR's VDU records

    Returns:
        the first record in self._vdus with that vdu_id

    Raises:
        VDURecordNotFound if no record has that id
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Interpolate here: exceptions do not format logging-style arguments,
    # so passing vdu_id separately left a literal "%s" in the message
    raise VDURecordNotFound("Could not find vdu record from id: %s" % vdu_id)
1171
@property
def operational_status(self):
    """ Published operational-status string for the current state

    Looked up by the name of self._state; a state name missing from the
    table raises KeyError.
    """
    status_by_state = dict(
        INIT="init",
        VL_INIT_PHASE="vl_init_phase",
        VM_INIT_PHASE="vm_init_phase",
        READY="running",
        TERMINATE="terminate",
        VL_TERMINATE_PHASE="vl_terminate_phase",
        VDU_TERMINATE_PHASE="vm_terminate_phase",
        TERMINATED="terminated",
        FAILED="failed",
    )
    state_name = self._state.name
    return status_by_state[state_name]
1185
@staticmethod
def vnfd_xpath(vnfd_id):
    """ Keyed xpath of the VNFD catalog entry with the given id """
    template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return template.format(vnfd_id)
1190
@property
def vnfd_ref_count(self):
    """ Current number of references this VNFR holds on its VNFD """
    count = self._vnfd_ref_count
    return count
1195
def vnfd_in_use(self):
    """ True while at least one VNFD reference is held """
    return self._vnfd_ref_count > 0
1199
def vnfd_ref(self):
    """ Take one more VNFD reference and return the new count """
    self._vnfd_ref_count = self._vnfd_ref_count + 1
    return self._vnfd_ref_count
1204
def vnfd_unref(self):
    """ Drop one VNFD reference and return the remaining count

    Raises:
        VnfRecordError (after a critical log) when no reference is held
    """
    held = self._vnfd_ref_count
    if held >= 1:
        self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                        self.vnfd.id, held)
        self._vnfd_ref_count = held - 1
        return self._vnfd_ref_count

    msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
           (self.vnfd.id, held))
    self._log.critical(msg)
    raise VnfRecordError(msg)
1216
@property
def vnfd(self):
    """ The VNFD taken from the VNFR message at construction time """
    descriptor = self._vnfd
    return descriptor
1221
@property
def vnf_name(self):
    """ Name field of this VNFR's VNFD """
    descriptor = self.vnfd
    return descriptor.name
1226
@property
def name(self):
    """ Name field of the VNFR message """
    record = self._vnfr
    return record.name
1231
@property
def cloud_account_name(self):
    """ Cloud account named in the VNFR message """
    record = self._vnfr
    return record.cloud_account
1236
@property
def vnfd_id(self):
    """ Id field of this VNFR's VNFD """
    descriptor = self.vnfd
    return descriptor.id
1241
@property
def vnfr_id(self):
    """ Id of this VNFR, captured from the VNFR message at construction """
    record_id = self._vnfr_id
    return record_id
1246
@property
def member_vnf_index(self):
    """ member_vnf_index_ref field of the VNFR message """
    record = self._vnfr
    return record.member_vnf_index_ref
1251
@property
def config_status(self):
    """ Config status captured from the VNFR message at construction """
    status = self._config_status
    return status
1256
def component_by_name(self, component_name):
    """ Return the inventory component for a component name

    The inventory is keyed by the mangled name built from the component
    name, the VNF name and the VNFD id; an unknown key raises KeyError.
    """
    inventory_key = VcsComponent.mangle_name(
        component_name, self.vnf_name, self.vnfd_id)
    return self._inventory[inventory_key]
1263
1264
1265
@asyncio.coroutine
def get_nsr_config(self):
    """ Read the NS instance config and return the entry for this VNFR's NSR

    Returns the first nsr whose id equals the VNFR message's nsr_id_ref,
    or None when no entry matches.
    """
    ### Need access to NS instance configuration for runtime resolution.
    ### This shall be replaced when deployment flavors are implemented
    responses = yield from self._dts.query_read("C,/nsr:ns-instance-config",
                                                rwdts.XactFlag.MERGE)

    for pending in responses:
        response = yield from pending
        for candidate in response.result.nsr:
            if candidate.id != self._vnfr_msg.nsr_id_ref:
                continue
            return candidate
    return None
1280
@asyncio.coroutine
def start_component(self, component_name, ip_addr):
    """ Look up an inventory component by name and start it with ip_addr """
    component = self.component_by_name(component_name)
    # The first two start() arguments are passed as None, ip_addr as third
    yield from component.start(None, None, ip_addr)
1286
def cp_ip_addr(self, cp_name):
    """ IP address of the named connection point

    Returns the address of the first connection point record with that
    name and a non-None address, or "0.0.0.0" when there is none.
    """
    self._log.debug("cp_ip_addr()")
    known_addresses = (record.ip_address
                       for record in self._cprs
                       if record.name == cp_name and record.ip_address is not None)
    return next(known_addresses, "0.0.0.0")
1294
def mgmt_intf_info(self):
    """ Management address and port for this VNFR

    The address comes from, in order of preference: the connection point
    named by the descriptor's "cp" field, the management ip of the VDU
    named by its "vdu_id" field (None if that VDU record is not found),
    or the descriptor's own ip_address.

    Returns:
        (ip_addr, port) - port is always the descriptor's port field
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        address = self.cp_ip_addr(descriptor.cp)
    elif not descriptor.has_field("vdu_id"):
        address = descriptor.ip_address
    else:
        try:
            address = self._get_vdur_from_vdu_id(descriptor.vdu_id).management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            address = None

    return address, descriptor.port
1313
@property
def msg(self):
    """ Message associated with this VNFR

    Builds a fresh VNFR message on every access from the current state of
    this record: identity and status fields, selected VNFD fields, the
    management interface, internal VLR references, VDURs, connection
    points, monitoring params, VNF configuration and placement groups.
    """
    # Only these descriptor fields are copied onto the record itself
    vnfd_fields = ["short_name", "vendor", "description", "version"]
    vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}

    mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
    ip_address, port = self.mgmt_intf_info()

    # Leave the fields unset rather than assigning None
    if ip_address is not None:
        mgmt_intf.ip_address = ip_address
    if port is not None:
        mgmt_intf.port = port

    vnfr_dict = {"id": self._vnfr_id,
                 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
                 "name": self.name,
                 "member_vnf_index_ref": self.member_vnf_index,
                 "operational_status": self.operational_status,
                 "operational_status_details": self._state_failed_reason,
                 "cloud_account": self.cloud_account_name,
                 "config_status": self._config_status
                 }

    vnfr_dict.update(vnfd_copy_dict)

    vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
    vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())

    # uptime is measured from the time this record object was created
    vnfr_msg.create_time = self._create_time
    vnfr_msg.uptime = int(time.time()) - self._create_time
    vnfr_msg.mgmt_interface = mgmt_intf

    # Add all the VLRs to VNFR
    for vlr in self._vlrs:
        ivlr = vnfr_msg.internal_vlr.add()
        ivlr.vlr_ref = vlr.vlr_id

    # Add all the VDURs to VNFR
    if self._vdus is not None:
        for vdu in self._vdus:
            vdur = vnfr_msg.vdur.add()
            vdur.from_dict(vdu.msg.as_dict())

    # The dashboard URL is only published when the descriptor configures one
    if self.vnfd.mgmt_interface.has_field('dashboard_params'):
        vnfr_msg.dashboard_url = self.dashboard_url

    for cpr in self._cprs:
        new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
        vnfr_msg.connection_point.append(new_cp)

    if self._vnf_mon is not None:
        for monp in self._vnf_mon.msg:
            vnfr_msg.monitoring_param.append(
                VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))

    if self._vnfr.vnf_configuration is not None:
        vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
        # Fill in the config access address from the management address
        # only when the configuration does not already carry one
        if (ip_address is not None and
                vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
            vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address

    for group in self._vnfr_msg.placement_groups_info:
        group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        group_info.from_dict(group.as_dict())
        vnfr_msg.placement_groups_info.append(group_info)

    return vnfr_msg
1382
@property
def dashboard_url(self):
    """ Dashboard URL built from the descriptor's dashboard params

    Scheme and default port are https/443 when the "https" field is set
    and is True, otherwise http/80; an explicit "port" field overrides
    the default port.  The host is the management address.
    """
    ip, _cfg_port = self.mgmt_intf_info()
    params = self.vnfd.mgmt_interface.dashboard_params

    use_https = params.has_field('https') and params.https is True
    protocol, http_port = ('https', 443) if use_https else ('http', 80)

    if params.has_field('port'):
        http_port = params.port

    return "{protocol}://{ip_address}:{port}/{path}".format(
        protocol=protocol,
        ip_address=ip,
        port=http_port,
        path=params.path.lstrip("/"),
    )
1403
@property
def xpath(self):
    """ Keyed xpath of this VNFR in the VNFR catalog """
    template = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']"
    return template.format(self.vnfr_id)
1409
@asyncio.coroutine
def publish(self, xact):
    """ publish this VNFR

    Arguments:
        xact - transaction handed through to the manager's publish_vnfr();
               some callers in this class pass None
    """
    # self.msg assembles a complete new message on every access, so build
    # it once and use the same object for the logs and the publish.  The
    # msg property already stamps create_time, so no extra assignment is
    # needed here.
    vnfr = self.msg
    path = self.xpath

    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
1420
@asyncio.coroutine
def create_vls(self):
    """ Create a VL record for every internal VLD of the VNFD

    Each record is appended to self._vlrs and registered in
    self._vlr_by_cp under every internal connection point it lists.

    Raises:
        InternalVirtualLinkRecordError if a connection point is already
        bound to a VL record
    """
    self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)
    for ivld in self.vnfd.internal_vld:
        self._log.debug("Creating internal vld:"
                        " %s, int_cp_ref = %s",
                        ivld, ivld.internal_connection_point
                        )
        link_record = InternalVirtualLinkRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            ivld_msg=ivld,
            vnfr_name=self.name,
            cloud_account_name=self.cloud_account_name,
        )
        self._vlrs.append(link_record)

        for conn_point in ivld.internal_connection_point:
            cp_ref = conn_point.id_ref
            if cp_ref in self._vlr_by_cp:
                msg = ("Connection point %s already "
                       " bound %s" % (cp_ref, self._vlr_by_cp[cp_ref]))
                raise InternalVirtualLinkRecordError(msg)
            self._log.debug("Setting vlr %s to internal cp = %s",
                            link_record, cp_ref)
            self._vlr_by_cp[cp_ref] = link_record
1448
@asyncio.coroutine
def instantiate_vls(self, xact, restart_mode=False):
    """ Instantiate every internal VL record, one after another

    Arguments:
        xact - transaction passed to each record's instantiate()
        restart_mode - passed through unchanged
    """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for link_record in self._vlrs:
        self._log.debug("Instantiating VLR %s", link_record)
        yield from link_record.instantiate(xact, restart_mode)
1458
def find_vlr_by_cp(self, cp_name):
    """ VL record bound to the connection point; KeyError if none is """
    bindings = self._vlr_by_cp
    return bindings[cp_name]
1462
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group
    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config;
                    may be None when no NSR config was found
    Returns:
        A VDUR PlacementGroupsInfo built from the first map entry that
        references input_group for this VNFD, or None if nsr_config is
        None or no entry matches
    """
    # get_nsr_config() returns None when the NSR is not in the config;
    # treat that as "nothing to resolve" instead of failing on the
    # attribute access below
    if nsr_config is None:
        return None

    # Attributes taken from the descriptor group, overriding the map entry
    copied_attrs = ('name', 'requirement', 'strategy')
    # Map entry keys that only identify the entry and are not copied
    reference_keys = ('placement_group_ref', 'vnfd_id_ref')

    for group_info in nsr_config.vnfd_placement_group_maps:
        if (group_info.placement_group_ref != input_group.name or
                group_info.vnfd_id_ref != self.vnfd_id):
            continue

        group_dict = {k: v for k, v in group_info.as_dict().items()
                      if k not in reference_keys}
        for param in copied_attrs:
            group_dict[param] = getattr(input_group, param)

        group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
        group.from_dict(group_dict)
        return group
    return None
1483
@asyncio.coroutine
def get_vdu_placement_groups(self, vdu):
    """ Collect the placement groups that apply to one VDU

    The result starts with every VNF level group from the VNFR message,
    followed by each VNFD placement group that lists this VDU as a member
    and could be resolved against the NSR config.  Unresolved groups are
    logged and left out.
    """
    ### Step-1: Get VNF level placement groups
    placement_groups = list(self._vnfr_msg.placement_groups_info)

    ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
    nsr_config = yield from self.get_nsr_config()

    ### Step-3: Get VDU level placement groups
    for group in self.vnfd.placement_groups:
        for member in group.member_vdus:
            if member.member_vdu_ref != vdu.id:
                continue

            resolved = self.resolve_placement_group_cloud_construct(group,
                                                                    nsr_config)
            if resolved is None:
                self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
                continue

            self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                           str(resolved),
                           vdu.name,
                           self.vnf_name,
                           self.member_vnf_index)
            placement_groups.append(resolved)

    return placement_groups
1514
@asyncio.coroutine
def create_vdus(self, vnfr, restart_mode=False):
    """ Create a VDU record for every VDU of the fetched VNFD

    Arguments:
        vnfr - the VNFR handed to each VDU record (instantiate() passes
               this object itself)
        restart_mode - when True, ids of VDURs already present in the
                       VNFR message are reused instead of creating
                       duplicate VDURs
    """

    def get_vdur_id(vdud):
        """ Id of the existing VDUR for this VDUD, or None

        Only looked up in restart mode; a missing VDUR is logged as an
        error and None is returned.
        """
        if not restart_mode or vdud is None:
            return None

        try:
            return [existing.id
                    for existing in vnfr._vnfr.vdur
                    if existing.vdu_id_ref == vdud.id][0]
        except IndexError:
            self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
            return None

    self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
    for vdud in self._rw_vnfd.vdu:
        self._log.debug("Creating vdu: %s", vdud)
        existing_id = get_vdur_id(vdud)

        groups = yield from self.get_vdu_placement_groups(vdud)
        self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
                       vdud.name,
                       self.vnf_name,
                       self.member_vnf_index,
                       [group.name for group in groups])

        record = VirtualDeploymentUnitRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            vdud=vdud,
            vnfr=vnfr,
            mgmt_intf=self.has_mgmt_interface(vdud),
            mgmt_network=self._mgmt_network,
            cloud_account_name=self.cloud_account_name,
            vnfd_package_store=self._vnfd_package_store,
            vdur_id=existing_id,
            placement_groups=groups,
        )
        yield from record.vdu_opdata_register()

        self._vdus.append(record)
1566
@asyncio.coroutine
def instantiate_vdus(self, xact, vnfr):
    """ Instantiate the VDUs associated with this VNF

    A VDU whose cloud-init script references another VDU through a
    "{{ vdu[<id>]... }}" variable is instantiated only after that VDU has
    reached a terminal state.  Two tasks are scheduled per VDU (a state
    monitor and the instantiation itself); this coroutine returns once
    they are scheduled, it does not wait for them.

    Arguments:
        xact - not used; each VDU is instantiated in its own transaction
        vnfr - passed to every VDU's instantiate()
    """
    self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

    lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

    # Identify any dependencies among the VDUs
    dependencies = collections.defaultdict(list)
    vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")

    for vdu in self._vdus:
        if vdu.vdud_cloud_init is not None:
            for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
                if vdu_id != vdu.vdu_id:
                    # This means that vdu.vdu_id depends upon vdu_id,
                    # i.e. vdu_id must be instantiated before
                    # vdu.vdu_id.
                    dependencies[vdu.vdu_id].append(lookup[vdu_id])

    # Define the terminal states of VDU instantiation
    terminal = (
        VDURecordState.READY,
        VDURecordState.TERMINATED,
        VDURecordState.FAILED,
    )

    datastore = VdurDatastore()
    processed = set()

    unresolved_msg = "Unable to find a substitute for {} in {} cloud-init script"

    @asyncio.coroutine
    def instantiate_monitor(vdu):
        """Monitor the state of the VDU during instantiation

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        """
        # wait for the VDUR to enter a terminal state
        while vdu._state not in terminal:
            yield from asyncio.sleep(1, loop=self._loop)

        # update the datastore
        datastore.update(vdu)

        # add the VDU to the set of processed VDUs
        processed.add(vdu.vdu_id)

    @asyncio.coroutine
    def instantiate(vdu):
        """Instantiate the specified VDU

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        Raises:
            VirtualDeploymentUnitRecordError if a VDU this VDU depends
            upon is not active once it has been processed.
            ValueError if a cloud-init variable is not recognized or has
            no value in the datastore.

        """
        for dependency in dependencies[vdu.vdu_id]:
            self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

            while dependency.vdu_id not in processed:
                yield from asyncio.sleep(1, loop=self._loop)

            if not dependency.active:
                raise VirtualDeploymentUnitRecordError(
                    "VDU {} depends on VDU {} which is not active".format(
                        vdu.vdu_id, dependency.vdu_id))

        self._log.debug('instantiating {}'.format(vdu.vdu_id))

        # Populate the datastore with the current values of the VDU
        datastore.add(vdu)

        # Substitute any variables contained in the cloud config script
        config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""

        # Raw string: "\{" is not a valid escape in a normal string literal
        parts = re.split(r"\{\{ ([^\}]+) \}\}", config)
        if len(parts) > 1:

            # Extract the variable names (odd entries are the captured groups)
            variables = list()
            for variable in parts[1::2]:
                variables.append(variable.lstrip('{{').rstrip('}}').strip())

            # Iterate of the variables and substitute values from the
            # datastore.
            for variable in variables:

                # Handle a reference to a VDU by ID
                if variable.startswith('vdu['):
                    value = datastore.get(variable)
                    if value is None:
                        raise ValueError(unresolved_msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle a reference to the current VDU
                if variable.startswith('vdu'):
                    value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
                    # Same check as the by-ID branch; str.replace would
                    # otherwise fail with a TypeError on None
                    if value is None:
                        raise ValueError(unresolved_msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle unrecognized variables
                msg = 'unrecognized cloud-config variable: {}'
                raise ValueError(msg.format(variable))

        # Instantiate the VDU
        with self._dts.transaction() as xact:
            self._log.debug("Instantiating vdu: %s", vdu)
            yield from vdu.instantiate(xact, vnfr, config=config)
            if self._state == VirtualNetworkFunctionRecordState.FAILED:
                self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                self.vnfr_id, vdu)

    # NOTE(review): the tasks below are not awaited and their handles are
    # dropped, so an exception raised inside instantiate() is not seen by
    # any caller here - confirm whether it should mark the VNFR as failed.

    # First create a set of tasks to monitor the state of the VDUs and
    # report when they have entered a terminal state
    for vdu in self._vdus:
        self._loop.create_task(instantiate_monitor(vdu))

    for vdu in self._vdus:
        self._loop.create_task(instantiate(vdu))
1692
def has_mgmt_interface(self, vdu):
    """ True when the descriptor's management interface names this VDU """
    # ## TODO: Support additional mgmt_interface type options
    named_vdu_id = self.vnfd.mgmt_interface.vdu_id
    return True if named_vdu_id == vdu.id else False
1698
def vlr_xpath(self, vlr_id):
    """ Keyed xpath of a VLR catalog entry """
    template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return template.format(vlr_id)
1704
def ext_vlr_by_id(self, vlr_id):
    """ External VLR stored under the given id; KeyError if unknown """
    external_vlrs = self._ext_vlrs
    return external_vlrs[vlr_id]
1708
@asyncio.coroutine
def publish_inventory(self, xact):
    """ Create and publish an inventory component per VNFD component

    Each new component is stored in self._inventory under its name and
    then published.  The first component whose name is already in the
    inventory ends the whole run: it and all remaining components are
    left unpublished.
    """
    self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)

    for descriptor in self._rw_vnfd.component:
        self._log.debug("Creating inventory component %s", descriptor)
        inventory_name = VcsComponent.mangle_name(descriptor.component_name,
                                                  self.vnf_name,
                                                  self.vnfd_id
                                                  )
        entry = VcsComponent(dts=self._dts,
                             log=self._log,
                             loop=self._loop,
                             cluster_name=self._cluster_name,
                             vcs_handler=self._vcs_handler,
                             component=descriptor,
                             mangled_name=inventory_name,
                             )
        if entry.name in self._inventory:
            self._log.debug("Duplicate entries in inventory %s for vnfr %s",
                            descriptor, self._vnfd_id)
            return

        self._log.debug("Adding component %s for vnrf %s",
                        entry.name, self._vnfr_id)
        self._inventory[entry.name] = entry
        yield from entry.publish(xact)
1736
def all_vdus_active(self):
    """ True when every VDU record reports active (also when there are none) """
    if any(not record.active for record in self._vdus):
        return False

    self._log.debug("Inside all_vdus_active. Returning True")
    return True
1745
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as failed and publish the change

    Arguments:
        failed_reason - text stored as the failure reason and published
                        as the operational status details
    """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    self.set_state(VirtualNetworkFunctionRecordState.FAILED)
    self._state_failed_reason = failed_reason

    # Update the VNFR with the changed status; no transaction is supplied
    yield from self.publish(None)
1755
@asyncio.coroutine
def is_ready(self):
    """ Move this VNFR to READY, unless it has already failed, and publish """
    self._log.debug("VNFR id %s is ready", self.vnfr_id)

    already_failed = self._state == VirtualNetworkFunctionRecordState.FAILED
    if already_failed:
        # A failed VNFR stays failed
        self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
    else:
        self.set_state(VirtualNetworkFunctionRecordState.READY)

    # Update the VNFR with the changed status
    yield from self.publish(None)
1769
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """ Update the named connection point record with its runtime details

    Arguments:
        cp_name - name of the connection point to update
        ip_address - stored as the record's ip_address
        mac_addr - stored as the record's mac_address
        cp_id - stored as the record's connection_point_id

    Raises:
        VirtualDeploymentUnitRecordError if this VNFR has no connection
        point record with that name
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the name that was asked for.  The loop variable is unbound
    # when there are no records and otherwise holds the last, unrelated one.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1784
def set_state(self, state):
    """ Record the new lifecycle state of this VNFR """
    new_state = state
    self._state = new_state
1788
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ instantiate this VNF

    Steps, in order: fetch the VNFD, take a VNFD reference, fetch the
    external VLRs, publish the inventory, create the internal VL records,
    publish, instantiate the VLs, create the VDU records, publish, schedule
    VDU instantiation as a background task, publish again and start the
    periodic uptime update task.  A failure while instantiating the VLs
    marks the VNFR as failed and stops here.

    Arguments:
        xact - transaction passed to the publish and instantiate steps
        restart_mode - passed to VL instantiation and VDU creation
    """
    self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
    self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)

    @asyncio.coroutine
    def fetch_vlrs():
        """ Fetch VLRs """
        # Iterate over all the connection points in VNFR and fetch the
        # associated VLRs

        def cpr_from_cp(cp):
            """ Creates a record level connection point from the desciptor cp"""
            # Only the listed keys are carried over from the descriptor cp
            cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
            cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
            cpr_dict = {}
            cpr_dict.update(cp_copy_dict)
            return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)

        self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                        self._vnfr_id, self._vnfr.connection_point)

        for cp in self._vnfr.connection_point:
            cpr = cpr_from_cp(cp)
            self._cprs.append(cpr)
            self._log.debug("Adding Connection point record %s ", cp)

            vlr_path = self.vlr_xpath(cp.vlr_ref)
            self._log.debug("Fetching VLR with path = %s", vlr_path)
            res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
                                                       rwdts.XactFlag.MERGE)
            # The record's vlr_ref is only set when the read returns a VLR
            for i in res_iter:
                r = yield from i
                d = r.result
                self._ext_vlrs[cp.vlr_ref] = d
                cpr.vlr_ref = cp.vlr_ref
                self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)

    # Increase the VNFD reference count
    self.vnfd_ref()

    assert self.vnfd

    # Fetch External VLRs
    self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
    yield from fetch_vlrs()

    # Publish inventory
    self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
    yield from self.publish_inventory(xact)

    # Create the internal VL records
    self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
    yield from self.create_vls()

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VLs
    self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
    try:
        yield from self.instantiate_vls(xact, restart_mode)
    except Exception as e:
        self._log.exception("VL instantiation failed (%s)", str(e))
        yield from self.instantiation_failed(str(e))
        return

    self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

    # create the VDU records
    self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
    yield from self.create_vdus(self, restart_mode)

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VDUs
    # ToDo: Check if this should be prevented during restart
    self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
    # Scheduled as a task: this coroutine does not wait for it to finish
    _ = self._loop.create_task(self.instantiate_vdus(xact, self))

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)

    # create task updating uptime for this vnfr
    self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
    self._loop.create_task(self.vnfr_uptime_update(xact))
1882
@asyncio.coroutine
def terminate(self, xact):
    """ Tear down this VNF: stop monitoring, then the VLs, then the VDUs

    The state moves through TERMINATE, VL_TERMINATE_PHASE and
    VDU_TERMINATE_PHASE and ends at TERMINATED.

    Arguments:
        xact - transaction passed to every VL and VDU terminate()
    """
    states = VirtualNetworkFunctionRecordState

    self._log.debug("Terminatng VNF id %s", self.vnfr_id)

    self.set_state(states.TERMINATE)

    # stop monitoring
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    # Internal virtual links go first
    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(states.VL_TERMINATE_PHASE)
    for link_record in self._vlrs:
        yield from link_record.terminate(xact)

    # Then the VDUs
    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(states.VDU_TERMINATE_PHASE)
    for vdu_record in self._vdus:
        yield from vdu_record.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(states.TERMINATED)
1919
@asyncio.coroutine
def vnfr_uptime_update(self, xact):
    """ Re-publish this VNFR every two seconds while it is live

    The loop ends as soon as the state is anything other than INIT,
    VL_INIT_PHASE, VM_INIT_PHASE or READY (for example FAILED or
    TERMINATED).
    """
    live_states = [VirtualNetworkFunctionRecordState.INIT,
                   VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
                   VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
                   VirtualNetworkFunctionRecordState.READY]

    while self._state in live_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1931
1932
1933
class VnfdDtsHandler(object):
    """DTS handler that subscribes to VNFD configuration changes"""
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        # Filled in by register()
        self._regh = None

    # NOTE(review): this accessor is a coroutine, whereas the sibling
    # handlers in this file expose regh as a property - confirm the intent.
    @asyncio.coroutine
    def regh(self):
        """ DTS registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """Subscribe to the VNFD catalog through an appconf group"""

        def on_apply(dts, acg, xact, action, scratch):
            """Apply callback: only logs the transaction"""
            self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # Computed but not acted upon in this handler
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """Prepare callback: handles VNFD deletes, then ACKs"""
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
                            ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
            field_ref = ProtobufC.FieldReference.alloc()
            field_ref.goto_whole_message(msg.to_pbcm())

            # Deletes are the only change acted on at prepare time
            if field_ref.is_field_deleted():
                self._log.debug("Deleting VNFD with id %s", msg.id)
                if self._vnfm.vnfd_in_use(msg.id):
                    # Refuse to drop a descriptor that still has VNFRs
                    self._log.debug("Cannot delete VNFD in use - %s", msg)
                    reason = "Cannot delete a VNFD in use - %s" % msg
                    raise VirtualNetworkFunctionDescriptorRefCountExists(reason)
                yield from self._vnfm.delete_vnfd(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
            )
        group_handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=group_handler) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
1992
1993
class VcsComponentDtsHandler(object):
    """DTS publisher for VCS component entries of the manifest inventory"""
    XPATH = ("D,/rw-manifest:manifest" +
             "/rw-manifest:operational-inventory" +
             "/rw-manifest:component")

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        # Filled in by register()
        self._regh = None

    @property
    def regh(self):
        """DTS registration handle (None until register() has run)"""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """Register as publisher for the VCS component path"""
        self._log.debug("VCS Comp publisher DTS handler registering path %s",
                        VcsComponentDtsHandler.XPATH)

        publisher_flags = (rwdts.Flag.PUBLISHER |
                           rwdts.Flag.NO_PREP_READ |
                           rwdts.Flag.DATASTORE)
        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=publisher_flags,)

    @asyncio.coroutine
    def publish(self, xact, path, msg):
        """Create the VCS component element at path.

        Arguments:
            xact - transaction; only logged here
            path - xpath of the element to create
            msg  - element content
        """
        self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                        VcsComponentDtsHandler.XPATH, xact, path, msg)
2035
class VnfrConsoleOperdataDtsHandler(object):
    """Publishes the console URL of one VDUR under the vnfr-console tree
    and answers DTS reads for it"""

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        # Filled in by register()
        self._regh = None

        self._vnfr_id = vnfr_id
        self._vdur_id = vdur_id
        self._vdu_id = vdu_id

    @property
    def vnfr_vdu_console_xpath(self):
        """xpath of this handler's VDUR console entry"""
        return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))

    @asyncio.coroutine
    def register(self):
        """Register as publisher for this VDUR's console operational data"""

        def console_record(url):
            """Build the console message for this VDUR with the given URL"""
            record = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
            record.id = self._vdur_id
            record.console_url = url
            return record

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: answers READ with the console URL"""
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
                )

            if action == rwdts.QueryAction.READ:
                schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                self._log.debug("VDU Opdata path is {}".format(path_entry))

                # An unknown VNFR gets a bare ACK
                try:
                    vnfr = self._vnfm.get_vnfr(self._vnfr_id)
                except VnfRecordError as e:
                    self._log.error("VNFR id %s not found", self._vnfr_id)
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return

                try:
                    vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)
                    if not vdur._state == VDURecordState.READY:
                        # A VDUR that is not READY also gets a bare ACK
                        self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                        xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                        return
                    with self._dts.transaction() as new_xact:
                        resp = yield from vdur.read_resource(new_xact)
                        # 'none' stands in for an empty console URL
                        vdur_console = console_record(
                            resp.console_url if resp.console_url else 'none')
                        self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
                except Exception:
                    # Any failure while reading still answers, with 'none'
                    self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                    vdur_console = console_record('none')

                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                        xpath=self.vnfr_vdu_console_xpath,
                                        msg=vdur_console)
            else:
                # Non-READ actions are logged and acknowledged, not rejected
                self._log.error("Not supported operation %s" % action)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        self.vnfr_vdu_console_xpath)
        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
                                        handler=reg_handler,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
2116
2117
class VnfrDtsHandler(object):
    """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        # Filled in by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts """
        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        def on_abort(*args):
            """ Abort callback """
            self._log.debug("VNF transaction got aborted")

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback; on INSTALL re-creates the VNFRs
            currently held by the registration """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - the re-created VNFR to instantiate in
                           restart mode

                """

                yield from vnfr.instantiate(None, restart_mode=True)

            if xact_event == rwdts.MemberEvent.INSTALL:
                curr_cfg = self.regh.elements
                for cfg in curr_cfg:
                    vnfr = self.vnfm.create_vnfr(cfg)
                    # Instantiate in the background, one task per record
                    self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            self._log.debug("Got on_event in vnfm")

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            CREATE instantiates a new VNFR, DELETE terminates and removes
            one, UPDATE copies the config status and republishes. Any other
            action raises NotImplementedError.
            """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
                )

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd"):
                    err = "Vnfd not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)

                vnfr = self.vnfm.create_vnfr(msg)
                try:
                    # RIFT-9105: Unable to add a READ query under an existing transaction
                    # xact = xact_info.xact
                    yield from vnfr.instantiate(None)
                except Exception as e:
                    # A failed instantiation is recorded on the VNFR and
                    # published; the request itself is still ACKed below
                    self._log.exception(e)
                    self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                    vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                    yield from vnfr.publish(None)
            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = self._vnfm.get_vnfr(path_entry.key00.id)

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
                    # Format the message instead of passing a (fmt, arg)
                    # tuple to the exception constructor
                    raise VirtualNetworkFunctionRecordNotFound(
                        "VNFR id %s" % path_entry.key00.id)

                try:
                    yield from vnfr.terminate(xact_info.xact)
                    # Unref the VNFD
                    vnfr.vnfd_unref()
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    # Best effort: failures are logged, the delete is ACKed
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
                except Exception as e:
                    self._log.debug("No vnfr found with id %s", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                self._log.debug("VNFR {} update config status {} (current {})".
                                format(vnfr.name, msg.config_status, vnfr.config_status))
                # Update the config status and publish
                vnfr._config_status = msg.config_status
                yield from vnfr.publish(None)

            else:
                # Format the message instead of passing a (fmt, arg) tuple
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported"
                    % action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at path from DTS
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2293
2294
class VnfdRefCountDtsHandler(object):
    """DTS publisher answering reads of the per-VNFD reference counts"""
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm

        # Filled in by register()
        self._regh = None

    @property
    def regh(self):
        """Registration handle (None until register() has run)"""
        return self._regh

    @property
    def vnfm(self):
        """The VNF manager instance this handler reads counts from"""
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """Register as publisher for VNFD ref count reads"""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: streams ref count entries for READ and
            raises VnfRecordError for any other action"""
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
                )

            if action == rwdts.QueryAction.READ:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
                wanted_id = schema.keyspec_to_entry(ks_path).key00.vnfd_id_ref
                entries = yield from self._vnfm.get_vnfd_refcount(wanted_id)
                # One MORE response per entry, then a closing ACK
                for ref_xpath, ref_msg in entries:
                    self._log.debug("Responding to ref count query path:%s, msg:%s",
                                    ref_xpath, ref_msg)
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                            xpath=ref_xpath,
                                            msg=ref_msg)
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
            else:
                raise VnfRecordError("Not supported operation %s" % action)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
2350
2351
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        # Maps vdu_id -> {exposed attribute name -> value}
        self._vdur_data = dict()
        # Raw string: '\[', '\]' and '\.' are regex escapes, not string escapes
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Only attributes that are not None are stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        entry = self._vdur_data[vdur.vdu_id] = dict()

        def set_if_not_none(key, attr):
            if attr is not None:
                entry[key] = attr

        set_if_not_none('name', vdur._vdud.name)
        set_if_not_none('mgmt.ip', vdur.vm_management_ip)

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes that are now None are removed from the stored data.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            not yet in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]

        def set_or_delete(key, attr):
            if attr is None:
                # Drop the key if present; absent keys are fine
                entry.pop(key, None)
            else:
                entry[key] = attr

        set_or_delete('name', vdur._vdud.name)
        set_or_delete('mgmt.ip', vdur.vm_management_ip)

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key, None)
2470
2471
class VnfManager(object):
    """ The virtual network function manager class """
    def __init__(self, dts, log, loop, cluster_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)

        # Handlers registered, in this order, by register()
        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vnfr_handler,
                              self._vcs_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler]
        # vnfr id -> VirtualNetworkFunctionRecord
        self._vnfrs = {}
        # vnfd id -> number of VNFRs created from that descriptor
        self._vnfds_to_vnfr = {}
        # nsr id -> NSR config message delivered through handle_nsr()
        self._nsrs = {}

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ Track NSR configs: remember them on CREATE, forget on DELETE.

        Other actions are ignored.
        """
        if action in [rwdts.QueryAction.CREATE]:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            if nsr.id in self._nsrs:
                del self._nsrs[nsr.id]

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The name of the first VLD flagged as mgmt network in the NSD of
            the NSR referenced by the VNFR, or None when there is none.

        Raises:
            VnfRecordError if the referenced NSR is not known.
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError:
            # A bare string cannot be raised; use the module's record error
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError if no VNFR with that id is known.
        """

        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance and bump the ref count of its VNFD

        Raises:
            VnfRecordError if a VNFR with the same id already exists, or
            (from get_linked_mgmt_network) if its NSR is not known.
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )

        # Update ref count
        self._vnfds_to_vnfr[vnfr.vnfd.id] = self._vnfds_to_vnfr.get(vnfr.vnfd.id, 0) + 1

        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance and drop one reference on its VNFD

        Does nothing when the VNFR is not known to this manager.
        """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)

            # Never let the count drop below zero
            if self._vnfds_to_vnfr.get(vnfr.vnfd.id):
                self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1

            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch VNFDs based with the vnfd id

        Raises:
            VnfRecordError if the read returns no descriptor.
        """
        vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)

        # If several results come back the last one wins
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use, i.e. referenced by at least one VNFR """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        return self._vnfds_to_vnfr.get(vnfd_id, 0) > 0

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorRefCountExists if VNFRs still
            reference the descriptor.
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            # The stored value is the plain integer reference count
            ref_count = self._vnfds_to_vnfr[vnfd_id]
            if ref_count:
                self._log.debug("Cannot delete VNFD id %s reference exists %s",
                                vnfd_id,
                                ref_count)
                raise VirtualNetworkFunctionDescriptorRefCountExists(
                    "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

            del self._vnfds_to_vnfr[vnfd_id]

        # Remove any files uploaded with VNFD and stored under
        # $RIFT_ARTIFACTS/launchpad/libs/<id>
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            # The ref count entry is already gone at this point, so report
            # the descriptor by its id
            self._log.error("Exception in cleaning up VNFD {}: {}".
                            format(vnfd_id, e))
            self._log.exception(e)

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return (VnfdRefCountDtsHandler.XPATH +
                "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd_list from this VNFM

        Returns:
            A list of (xpath, ref count message) tuples: one per known VNFD
            when vnfd_id is None or empty, one for vnfd_id when it is known,
            otherwise an empty list.
        """
        def refcount_entry(ref_id):
            vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = ref_id
            vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[ref_id]
            return (self.vnfd_refcount_xpath(ref_id), vnfd_msg)

        if vnfd_id is None or vnfd_id == "":
            return [refcount_entry(ref_id) for ref_id in self._vnfds_to_vnfr]

        if vnfd_id in self._vnfds_to_vnfr:
            return [refcount_entry(vnfd_id)]

        return []
2678
2679
class VnfmTasklet(rift.tasklets.Tasklet):
    """Tasklet hosting the VNF manager"""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        # Both are created later: _dts in start(), _vnfm in init()
        self._dts = None
        self._vnfm = None

    def start(self):
        """Start the tasklet and create its DTS API object"""
        try:
            super().start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            schema = RwVnfmYang.get_schema()
            self._dts = rift.tasklets.DTS(self.tasklet_info,
                                          schema,
                                          self.loop,
                                          self.on_dts_state_change)

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            print("Caught Exception in VNFM start:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """Task instance started callback"""
        self.log.debug("Got instance started callback")

    def stop(self):
        """Stop the tasklet by de-initialising DTS"""
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNFM stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """Task init callback: builds the VnfManager and runs it"""
        try:
            vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
            assert vm_parent_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
            yield from self._vnfm.run()
        except Exception:
            print("Caught Exception in VNFM init:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def run(self):
        """Task run callback; nothing to do"""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """React to a DTS state change.

        Runs the application handler for the state (if any) and then moves
        DTS on to the follow-up state (if any).

        Arguments
            state - current dts state
        """
        # DTS state -> state DTS is moved to once the handler has run
        dts_transitions = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # DTS state -> application coroutine to run on entering it
        app_handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        app_handler = app_handlers.get(state)
        if app_handler is not None:
            yield from app_handler()

        # Transition dts to next state
        following = dts_transitions.get(state)
        if following is not None:
            self._dts.handle.set_state(following)