Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.package.script
53 import rift.mano.dts as mano_dts
54 from rift.mano.utils.project import (
55 ManoProject,
56 ProjectHandler,
57 )
58 import rift.mano.utils.short_name as mano_short_name
59
60
class VMResourceError(Exception):
    """Raised when no usable VM resource response is obtained."""
64
65
class VnfRecordError(Exception):
    """VNF record instantiation failed."""
69
70
class VduRecordError(Exception):
    """VDU record instantiation failed."""
74
75
# NOTE(review): this class name shadows the builtin ``NotImplemented``
# singleton for the rest of the module; the name is kept unchanged so that
# existing references to it keep working.
class NotImplemented(Exception):
    """Raised for functionality that is not implemented."""
79
80
class VnfrRecordExistsError(Exception):
    """A VNFR record with the same VNFR id already exists."""
84
85
class InternalVirtualLinkRecordError(Exception):
    """Error related to an internal virtual link record."""
89
90
class VDUImageNotFound(Exception):
    """The image for a VDU could not be found."""
94
95
class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""
99
100
class VMNotReadyError(Exception):
    """The VM has not yet been received from the resource manager."""
104
105
class VDURecordNotFound(Exception):
    """A VDU record could not be found."""
109
110
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """The Virtual Network Function Record descriptor cannot be found."""
114
115
class VirtualNetworkFunctionDescriptorError(Exception):
    """Error related to a Virtual Network Function descriptor."""
119
120
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """The Virtual Network Function descriptor was not found."""
124
125
class VirtualNetworkFunctionRecordNotFound(Exception):
    """The Virtual Network Function Record was not found."""
129
130
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A reference count still exists on the Virtual Network Function descriptor."""
134
135
class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""
139
140
class VNFMPlacementGroupError(Exception):
    """Raised when placement group constraints cannot be satisfied."""
143
@enum.unique
class VirtualNetworkFunctionRecordState(enum.Enum):
    """ VNFR state """
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    # TERMINATED used to share the value 7 with VDU_TERMINATE_PHASE, which
    # made it an Enum *alias*: both names resolved to the same member, so the
    # two states were indistinguishable and ``.name`` never reported
    # "TERMINATED".  It now has its own value; @enum.unique guards against a
    # similar duplicate being reintroduced.
    TERMINATED = 8
    FAILED = 10
155
156
@enum.unique
class VDURecordState(enum.Enum):
    """Lifecycle states of a VDU record."""
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    READY = 4
    TERMINATING = 5
    TERMINATED = 6
    FAILED = 10
166
167
class VcsComponent(object):
    """ VCS Component within the VNF descriptor """

    def __init__(self, dts, log, loop, cluster_name,
                 vcs_handler, component, mangled_name,
                 vnfd_name=None, vnfd_id=None):
        """Store the handles and names used to publish and start the component.

        Args:
            dts: DTS handle used to issue the start query.
            log: logger.
            loop: event loop (stored only).
            cluster_name: VCS instance name used to build ``instance_xpath``.
            vcs_handler: object providing the ``publish(xact, path, msg)``
                coroutine.
            component: component message; must provide ``as_dict()``.
            mangled_name: already-mangled component name (see ``mangle_name``).
            vnfd_name: optional VNF name used by ``msg`` to mangle nested
                component names.
            vnfd_id: optional VNFD id used by ``msg`` for the same purpose.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._component = component
        self._cluster_name = cluster_name
        self._vcs_handler = vcs_handler
        self._mangled_name = mangled_name

        # ``msg`` needs the VNF name and VNFD id, which were previously never
        # stored (AttributeError on first use).  When the caller does not
        # supply them, recover them from the mangled name.
        # NOTE(review): this assumes mangled_name was built by mangle_name()
        # and that neither the VNF name nor the VNFD id contains ':' at the
        # split points -- confirm against callers.
        derived_name, derived_id = self._split_mangled_name(mangled_name)
        self._vnfd_name = derived_name if vnfd_name is None else vnfd_name
        self._vnfd_id = derived_id if vnfd_id is None else vnfd_id

    @staticmethod
    def mangle_name(component_name, vnf_name, vnfd_id):
        """ mangled component name """
        return vnf_name + ":" + component_name + ":" + vnfd_id

    @staticmethod
    def _split_mangled_name(mangled_name):
        """Return (vnf_name, vnfd_id) taken from the ends of a mangled name."""
        if not isinstance(mangled_name, str):
            return "", ""
        vnf_name, _, remainder = mangled_name.partition(":")
        vnfd_id = remainder.rpartition(":")[2]
        return vnf_name, vnfd_id

    @property
    def name(self):
        """ name of this component"""
        return self._mangled_name

    @property
    def path(self):
        """ The manifest operational-inventory path for this component """
        return ("D,/rw-manifest:manifest" +
                "/rw-manifest:operational-inventory" +
                "/rw-manifest:component" +
                "[rw-manifest:component-name = '{}']").format(self.name)

    @property
    def instance_xpath(self):
        """ The VCS instance path for the cluster this component runs in """
        return ("D,/rw-base:vcs" +
                "/instances" +
                "/instance" +
                "[instance-name = '{}']".format(self._cluster_name))

    @property
    def start_comp_xpath(self):
        """ start component xpath """
        return (self.instance_xpath +
                "/child-n[instance-name = 'START-REQ']")

    def get_start_comp_msg(self, ip_address):
        """ Build the START request message for this component """
        start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
        start_msg.instance_name = 'START-REQ'
        start_msg.component_name = self.name
        start_msg.admin_command = "START"
        start_msg.ip_address = ip_address

        return start_msg

    @property
    def msg(self):
        """ Returns the message for this vcs component"""

        def mangle_comp_names(comp_dict):
            """ mangle component name with VNF name, id (in place, recursive)"""
            # Only existing keys are reassigned, so mutating while iterating
            # items() does not change the dict size.
            for key, val in comp_dict.items():
                if isinstance(val, dict):
                    comp_dict[key] = mangle_comp_names(val)
                elif isinstance(val, list):
                    for idx, ent in enumerate(val):
                        if isinstance(ent, dict):
                            val[idx] = mangle_comp_names(ent)
                elif key == "component_name":
                    comp_dict[key] = VcsComponent.mangle_name(val,
                                                              self._vnfd_name,
                                                              self._vnfd_id)
            return comp_dict

        mangled_dict = mangle_comp_names(self._component.as_dict())
        return RwManifestYang.OpInventory_Component.from_dict(mangled_dict)

    @asyncio.coroutine
    def publish(self, xact):
        """ Publishes the VCS component """
        self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                        self.name, self.path, self.msg)
        yield from self._vcs_handler.publish(xact, self.path, self.msg)

    @asyncio.coroutine
    def start(self, xact, parent, ip_addr=None):
        """ Starts this VCS component

        ``xact`` and ``parent`` are accepted for interface compatibility but
        are not used; the start request is issued as a plain DTS create query.
        """
        # ATTN RV - replace with block add
        start_msg = self.get_start_comp_msg(ip_addr)
        self._log.debug("starting component %s %s",
                        self.start_comp_xpath, start_msg)
        yield from self._dts.query_create(self.start_comp_xpath,
                                          0,
                                          start_msg)
        self._log.debug("started component %s, %s",
                        self.start_comp_xpath, start_msg)
270
271
272 class VirtualDeploymentUnitRecord(object):
273 """ Virtual Deployment Unit Record """
def __init__(self,
             dts,
             log,
             loop,
             project,
             vdud,
             vnfr,
             nsr_config,
             mgmt_intf,
             mgmt_network,
             cloud_account_name,
             vnfd_package_store,
             vdur_id=None,
             placement_groups=None):
    """Create a VDU record in the INIT state.

    Args:
        dts: DTS handle.
        log: logger.
        loop: event loop.
        project: project object used to scope xpaths (``add_project``).
        vdud: VDU descriptor this record is built from.
        vnfr: owning VNF record.
        nsr_config: NSR config; its ``name`` is used in the short name.
        mgmt_intf: value sent as ``allocate_public_address`` in the VM request.
        mgmt_network: management network name, sent when truthy.
        cloud_account_name: cloud account the VDU is created in.
        vnfd_package_store: package store used to read cloud-init/config files.
        vdur_id: optional record id; a random UUID is generated when omitted.
        placement_groups: optional list of placement group messages.
    """
    self._dts = dts
    self._log = log
    self._loop = loop
    self._project = project
    self._vdud = vdud
    self._vnfr = vnfr
    self._nsr_config = nsr_config
    self._mgmt_intf = mgmt_intf
    self._cloud_account_name = cloud_account_name
    self._vnfd_package_store = vnfd_package_store
    self._mgmt_network = mgmt_network

    self._vdur_id = vdur_id or str(uuid.uuid4())
    self._int_intf = []
    self._ext_intf = []
    self._state = VDURecordState.INIT
    self._state_failed_reason = None
    self._request_id = str(uuid.uuid4())
    self._name = vnfr.name + "__" + vdud.id
    # A ``[]`` default would be one list object shared by every record
    # created without placement groups; build a fresh list per instance.
    self._placement_groups = [] if placement_groups is None else placement_groups
    self._rm_regh = None
    self._vm_resp = None
    self._vdud_cloud_init = None
    self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
        dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id, self.vdu_id)
313
@asyncio.coroutine
def vdu_opdata_register(self):
    """Register the console operational-data handler of this VDUR."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
317
def cp_ip_addr(self, cp_name):
    """Return the IP address of the named connection point.

    Falls back to "0.0.0.0" when no VM response is stored yet or when no
    connection point in the response carries that name.
    """
    fallback = "0.0.0.0"
    vm_resp = self._vm_resp
    if vm_resp is None:
        return fallback
    matches = (point.ip_address
               for point in vm_resp.connection_points
               if point.name == cp_name)
    return next(matches, fallback)
325
def cp_mac_addr(self, cp_name):
    """Return the MAC address of the named connection point.

    Falls back to "00:00:00:00:00:00" when no VM response is stored yet or
    when no connection point in the response carries that name.
    """
    fallback = "00:00:00:00:00:00"
    vm_resp = self._vm_resp
    if vm_resp is None:
        return fallback
    matches = (point.mac_addr
               for point in vm_resp.connection_points
               if point.name == cp_name)
    return next(matches, fallback)
333
def cp_id(self, cp_name):
    """Return the connection point id of the named connection point.

    Falls back to an empty string when no VM response is stored yet or when
    no connection point in the response carries that name.
    """
    fallback = ''
    vm_resp = self._vm_resp
    if vm_resp is None:
        return fallback
    matches = (point.connection_point_id
               for point in vm_resp.connection_points
               if point.name == cp_name)
    return next(matches, fallback)
341
@property
def vdu_id(self):
    """Id of the VDU descriptor this record was created from."""
    descriptor = self._vdud
    return descriptor.id
345
@property
def vm_resp(self):
    """Last VM response stored for this VDU, or None if there is none yet."""
    response = self._vm_resp
    return response
349
@property
def name(self):
    """Name of this VDUR (VNFR name and VDU id joined by "__")."""
    vdur_name = self._name
    return vdur_name
354
# Truncated name conforming to RFC 1123
@property
def unique_short_name(self):
    """Return this VDUR's unique short name.

    The name has three dash-separated parts:
      - the last 10 alphanumeric characters of the NSR name,
      - the short string produced by StringShortner for the VDUR name,
      - the last 10 alphanumeric characters of the VDU id.
    """
    def _alnum_tail(text):
        # Drop everything except a-zA-Z0-9, then keep at most the trailing
        # 10 characters.
        return re.sub('[^a-zA-Z0-9]', '', text)[-10:]

    nsr_tag = _alnum_tail(self._nsr_config.name)
    shortener = mano_short_name.StringShortner(self._name)
    vdu_tag = _alnum_tail(self._vdud.id)
    return nsr_tag + "-" + shortener.short_string + "-" + vdu_tag
379
@property
def cloud_account_name(self):
    """Name of the cloud account this VDU should be created in."""
    account = self._cloud_account_name
    return account
384
@property
def image_name(self):
    """Base name of the descriptor's image, used to look it up on the CMP.

    Returns None when the descriptor carries no ``image``.
    """
    descriptor = self._vdud
    if 'image' in descriptor:
        return os.path.basename(descriptor.image)
    return None
391
@property
def image_checksum(self):
    """Image checksum from the descriptor, or None when the field is unset."""
    descriptor = self._vdud
    if descriptor.has_field("image_checksum"):
        return descriptor.image_checksum
    return None
396
@property
def management_ip(self):
    """Management address of an active VDU, preferring the public IP.

    Returns None while the VDU is not active.
    """
    if not self.active:
        return None
    vm_resp = self._vm_resp
    if vm_resp.has_field('public_ip'):
        return vm_resp.public_ip
    return vm_resp.management_ip
402
@property
def vm_management_ip(self):
    """Management IP reported in the VM response, or None while not active."""
    if self.active:
        return self._vm_resp.management_ip
    return None
408
@property
def operational_status(self):
    """Operational status string for the current VDU record state.

    The lookup is keyed by the state's name; TERMINATING and TERMINATED both
    report "terminated".
    """
    status_by_state = dict(
        INIT="init",
        INSTANTIATING="vm_init_phase",
        RESOURCE_ALLOC_PENDING="vm_alloc_pending",
        READY="running",
        FAILED="failed",
        TERMINATING="terminated",
        TERMINATED="terminated",
    )
    state_name = self._state.name
    return status_by_state[state_name]
421
@property
def msg(self):
    """Build the VDUR message describing this VDU.

    Combines selected descriptor fields with whatever the stored VM response
    provides (VIM/flavor/image ids, management addresses, volume ids and
    metadata, supplemental boot data) plus the internal/external interface
    lists and placement groups.

    Side effect: every external connection point is pushed to the owning
    VNFR through ``update_cp`` with its current IP, MAC and id.

    Raises:
        VduRecordError: if an internal interface refers to a connection
            point id missing from the descriptor.
    """
    vdu_fields = ("vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa",
                  "volumes",
                  )
    vdu_copy_dict = {k: v for k, v in
                     self._vdud.as_dict().items() if k in vdu_fields}
    vdur_dict = {"id": self._vdur_id,
                 "vdu_id_ref": self._vdud.id,
                 "operational_status": self.operational_status,
                 "operational_status_details": self._state_failed_reason,
                 "name": self.name,
                 "unique_short_name": self.unique_short_name
                 }

    if self.vm_resp is not None:
        vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
                          "flavor_id": self.vm_resp.flavor_id
                          })
        if self._vm_resp.has_field('image_id'):
            vdur_dict["image_id"] = self.vm_resp.image_id

    if self.management_ip is not None:
        vdur_dict["management_ip"] = self.management_ip

    if self.vm_management_ip is not None:
        vdur_dict["vm_management_ip"] = self.vm_management_ip

    vdur_dict.update(vdu_copy_dict)

    if self.vm_resp is not None:
        if self._vm_resp.has_field('volumes'):
            # The descriptor may define no volumes at all, in which case
            # vdur_dict has no 'volumes' key; treat that as "nothing to
            # annotate" instead of raising KeyError.
            descriptor_volumes = vdur_dict.get('volumes', [])
            for opvolume in self._vm_resp.volumes:
                vdurvol_data = [vduvol for vduvol in descriptor_volumes
                                if vduvol['name'] == opvolume.name]
                # Only annotate when the name identifies exactly one volume.
                if len(vdurvol_data) == 1:
                    vdurvol_data[0]["volume_id"] = opvolume.volume_id
                    if opvolume.has_field('custom_meta_data'):
                        vdurvol_data[0]['custom_meta_data'] = [
                            metadata_item.as_dict()
                            for metadata_item in opvolume.custom_meta_data]

        if self._vm_resp.has_field('supplemental_boot_data'):
            boot_data = self._vm_resp.supplemental_boot_data
            vdur_dict['supplemental_boot_data'] = dict()
            if boot_data.has_field('boot_data_drive'):
                vdur_dict['supplemental_boot_data']['boot_data_drive'] = boot_data.boot_data_drive
            if boot_data.has_field('custom_meta_data'):
                vdur_dict['supplemental_boot_data']['custom_meta_data'] = [
                    metadata_item.as_dict()
                    for metadata_item in boot_data.custom_meta_data]
            if boot_data.has_field('config_file'):
                vdur_dict['supplemental_boot_data']['config_file'] = [
                    file_item.as_dict()
                    for file_item in boot_data.config_file]

    icp_list = []
    ii_list = []

    for intf, cp_id, vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)

        # Internal connection points are looked up in the VM response by
        # their id (that is the name they are requested under).
        icp_list.append({"name": cp.name,
                         "id": cp.id,
                         "type_yang": "VPORT",
                         "ip_address": self.cp_ip_addr(cp.id),
                         "mac_address": self.cp_mac_addr(cp.id)})

        ii_list.append({"name": intf.name,
                        "vdur_internal_connection_point_ref": cp.id,
                        "virtual_interface": {}})

    vdur_dict["internal_connection_point"] = icp_list
    self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
    vdur_dict["internal_interface"] = ii_list

    ei_list = []
    for intf, cp, vlr in self._ext_intf:
        ei_list.append({"name": cp.name,
                        "vnfd_connection_point_ref": cp.name,
                        "virtual_interface": {}})
        self._vnfr.update_cp(cp.name,
                             self.cp_ip_addr(cp.name),
                             self.cp_mac_addr(cp.name),
                             self.cp_id(cp.name))

    vdur_dict["external_interface"] = ei_list

    vdur_dict['placement_groups_info'] = [group.as_dict()
                                          for group in self._placement_groups]

    return RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
522
@property
def resmgr_path(self):
    """Project-scoped resource-manager xpath of this VDU's request event."""
    event_entry = "/vdu-event-data[event-id='{}']".format(self._request_id)
    unscoped = ("D,/rw-resource-mgr:resource-mgmt" +
                "/vdu-event" +
                event_entry)
    return self._project.add_project(unscoped)
530
@property
def vm_flavor_msg(self):
    """Return a fresh copy of the descriptor's VM flavor message."""
    source_flavor = self._vdud.vm_flavor
    flavor_copy = source_flavor.__class__()
    flavor_copy.copy_from(source_flavor)
    return flavor_copy
538
@property
def vdud_cloud_init(self):
    """Cloud-init contents for the VDU.

    The value is obtained from ``cloud_init()`` and stored once it is not
    None; while it is None every access calls ``cloud_init()`` again.
    """
    cached = self._vdud_cloud_init
    if cached is None:
        cached = self.cloud_init()
        self._vdud_cloud_init = cached
    return cached
546
def cloud_init(self):
    """Return the cloud-init script for this VDU.

    The inline ``cloud_init`` of the descriptor wins; otherwise the file
    named by ``cloud_init_file`` is read from the stored VNFD package.

    Returns:
        The script contents, or None when the descriptor provides neither.

    Raises:
        VirtualDeploymentUnitRecordError: if the script file cannot be
            extracted from the package.
    """
    vdud = self._vdud
    if vdud.cloud_init is not None:
        self._log.debug("cloud_init script provided inline %s", vdud.cloud_init)
        return vdud.cloud_init

    if vdud.cloud_init_file is None:
        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None

    # Get cloud-init script contents from the file provided in the cloud_init_file param
    self._log.debug("cloud_init script provided in file %s", vdud.cloud_init_file)
    filename = vdud.cloud_init_file
    self._vnfd_package_store.refresh()
    stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
    try:
        return cloud_init_extractor.read_script(stored_package, filename)
    except rift.package.cloud_init.CloudInitExtractionError as e:
        # instantiation_failed() is a coroutine; merely calling it from this
        # synchronous method creates a generator that never runs, so the
        # failure was never recorded.  Schedule it on the record's loop.
        self._loop.create_task(self.instantiation_failed(str(e)))
        raise VirtualDeploymentUnitRecordError(e) from e
568
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Translate OpenStack placement groups into VM request fields.

    Collects host aggregates, availability zones and server groups from the
    record's placement groups and writes ``availability_zone``,
    ``server_group`` and ``host_aggregate`` into ``vm_create_msg_dict`` (each
    only when present).

    Raises:
        VNFMPlacementGroupError: if more than one availability zone or more
            than one server group is requested.
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []
    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(aggregate.as_dict()
                                   for aggregate in group.host_aggregate)
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    if len(availability_zones) > 1:
        self._log.error("Can not launch VDU: %s in multiple availability zones. " +
                        "Requested Zones: %s", self.name, availability_zones)
        # The whole message is one literal before .format() is applied;
        # previously .format() bound only to the second half, so the "{}"
        # placeholder was emitted verbatim and the zones were dropped.
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple availability zones. "
            "Requested Zones: {}".format(self.name, availability_zones))
    if availability_zones:
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    if len(server_groups) > 1:
        self._log.error("Can not launch VDU: %s in multiple Server Group. " +
                        "Requested Groups: %s", self.name, server_groups)
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple Server Groups. "
            "Requested Groups: {}".format(self.name, server_groups))
    if server_groups:
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates

    return
604
def process_placement_groups(self, vm_create_msg_dict):
    """Fill the resource-mgr request from the record's placement groups.

    Does nothing without placement groups.  All groups must share a single
    cloud type; only 'openstack' is translated, any other type is logged
    and ignored.
    """
    groups = self._placement_groups
    if not groups:
        return

    cloud_types = {group.cloud_type for group in groups}
    assert len(cloud_types) == 1
    cloud_type = cloud_types.pop()

    if cloud_type != 'openstack':
        self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
        return

    self.process_openstack_placement_group_construct(vm_create_msg_dict)
620
def process_custom_bootdata(self, vm_create_msg_dict):
    """Inline the contents of supplemental boot-data config files.

    For every ``config_file`` entry that has both ``source`` and ``dest``,
    the ``source`` (a file name inside the VNFD package) is replaced in
    place by that file's contents.

    Raises:
        VirtualDeploymentUnitRecordError: if a source file cannot be read
            from the package.
    """
    boot_data = vm_create_msg_dict['supplemental_boot_data']
    if 'config_file' not in boot_data:
        return

    self._vnfd_package_store.refresh()
    stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
    for entry in vm_create_msg_dict['supplemental_boot_data']['config_file']:
        if not ('source' in entry and 'dest' in entry):
            continue
        source_name = entry['source']
        # Find source file in scripts dir of VNFD
        self._log.debug("Checking for source config file at %s", source_name)
        try:
            contents = extractor.read_script(stored_package, source_name)
        except rift.package.cloud_init.CloudInitExtractionError as e:
            raise VirtualDeploymentUnitRecordError(e)
        # Update source file location with file contents
        entry['source'] = contents

    return
643
def resmgr_msg(self, config=None):
    """Build the VM create request sent to the resource manager.

    Args:
        config: optional user data; sent as ``vdu_init.userdata`` when given.

    Returns:
        A ``RwResourceMgrYang.VDUEventData`` carrying this record's request
        id, cloud account and the request info assembled here.

    Raises:
        VNFMPlacementGroupError: from placement group processing.
        VirtualDeploymentUnitRecordError: if a boot-data config file cannot
            be read.
        VduRecordError: if an internal connection point id is unknown to
            the descriptor.
    """
    vdu_fields = ("vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa",
                  "volumes",
                  "supplemental_boot_data")

    self._log.debug("Creating params based on VDUD: %s", self._vdud)
    vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}

    vm_create_msg_dict = {
        "name": self.unique_short_name,  # Truncated name conforming to RFC 1123
        "node_id": self.name,            # Rift assigned Id
    }

    if self.image_name is not None:
        vm_create_msg_dict["image_name"] = self.image_name

    if self.image_checksum is not None:
        vm_create_msg_dict["image_checksum"] = self.image_checksum

    vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
    if self._vdud.has_field('mgmt_vpci'):
        vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci

    self._log.debug("VDUD: %s", self._vdud)
    if config is not None:
        vm_create_msg_dict['vdu_init'] = {'userdata': config}

    if self._mgmt_network:
        vm_create_msg_dict['mgmt_network'] = self._mgmt_network

    cp_list = []
    for intf, cp, vlr in self._ext_intf:
        cp_info = {"name": cp.name,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang}

        if cp.has_field('port_security_enabled'):
            cp_info["port_security_enabled"] = cp.port_security_enabled

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_info["vpci"] = intf.virtual_interface.vpci

        if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
            cp_info['security_group'] = vlr.ip_profile_params.security_group

        cp_list.append(cp_info)

    # Internal interface tuples hold the connection point *id*, not the
    # connection point message, so the descriptor entry has to be looked up
    # before its fields can be queried.
    for intf, cp_id, vlr in self._int_intf:
        cp_info = {"name": cp_id,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang}

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_info["vpci"] = intf.virtual_interface.vpci
        else:
            int_cp = self.find_internal_cp_by_cp_id(cp_id)
            if int_cp.has_field('port_security_enabled'):
                cp_info["port_security_enabled"] = int_cp.port_security_enabled

        cp_list.append(cp_info)

    vm_create_msg_dict["connection_points"] = cp_list
    vm_create_msg_dict.update(vdu_copy_dict)

    self.process_placement_groups(vm_create_msg_dict)
    if 'supplemental_boot_data' in vm_create_msg_dict:
        self.process_custom_bootdata(vm_create_msg_dict)

    msg = RwResourceMgrYang.VDUEventData()
    msg.event_id = self._request_id
    msg.cloud_account = self.cloud_account_name
    msg.request_info.from_dict(vm_create_msg_dict)

    return msg
728
@asyncio.coroutine
def terminate(self, xact):
    """Delete the resource in the VIM and drop this record's registrations.

    Only acts on records in the READY or FAILED state; otherwise the request
    is logged and ignored.  A failure while deleting the resource is logged
    and does not stop the remaining cleanup.
    """
    terminable_states = (VDURecordState.READY, VDURecordState.FAILED)
    if self._state not in terminable_states:
        self._log.warning("VDU terminate in not ready state - Ignoring request")
        return

    self._state = VDURecordState.TERMINATING
    if self._vm_resp is not None:
        try:
            # The delete runs in its own transaction, not the caller's.
            with self._dts.transaction() as new_xact:
                yield from self.delete_resource(new_xact)
        except Exception:
            self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

    rm_regh = self._rm_regh
    if rm_regh is not None:
        self._log.debug("Deregistering resource manager registration handle")
        rm_regh.deregister()
        self._rm_regh = None

    console_handler = self._vdur_console_handler
    if console_handler is not None:
        self._log.debug("Deregistering vnfr vdur registration handle")
        console_handler._regh.deregister()
        console_handler._regh = None

    self._state = VDURecordState.TERMINATED
755
def find_internal_cp_by_cp_id(self, cp_id):
    """Return the descriptor's internal connection point with the given id.

    Raises:
        VduRecordError: if the descriptor has no internal connection point
            with that id.
    """
    self._log.debug("find_internal_cp_by_cp_id(%s) called",
                    cp_id)

    for candidate in self._vdud.internal_connection_point:
        self._log.debug("Checking for int cp %s in internal connection points",
                        candidate.id)
        if candidate.id == cp_id:
            # First match wins.
            return candidate

    self._log.debug("Failed to find cp %s in internal connection points",
                    cp_id)
    raise VduRecordError("Failed to find cp %s in internal connection points" % cp_id)
778
@asyncio.coroutine
def create_resource(self, xact, vnfr, config=None):
    """Request the VM for this VDU from the resource manager.

    Resolves the descriptor's external and internal interfaces into
    (interface, connection point, VLR) tuples, stores them on the record,
    then issues the create query built by ``resmgr_msg``.

    Args:
        xact: transaction used to create the query block.
        vnfr: VNF record used to resolve connection points and VLRs.
        config: optional user data forwarded to ``resmgr_msg``.

    Returns:
        The ``resource_info`` of the resource manager's response.

    Raises:
        VduRecordError: if an internal interface cannot be resolved.
        VMResourceError: if no response with a resource state is received.
    """
    def find_cp_by_name(cp_name):
        """ Find a connection point by name, or None if there is none """
        self._log.debug("find_cp_by_name(%s) called", cp_name)
        for ext_cp in vnfr._cprs:
            self._log.debug("Checking ext cp (%s) called", ext_cp.name)
            if ext_cp.name == cp_name:
                return ext_cp
        self._log.debug("Failed to find cp %s in external connection points",
                        cp_name)
        return None

    def find_internal_vlr_by_cp_name(cp_name):
        """ Find the VLR corresponding to the connection point name"""
        self._log.debug("find_internal_vlr_by_cp_name(%s) called",
                        cp_name)
        # Validates that the descriptor knows this connection point;
        # raises VduRecordError otherwise.
        self.find_internal_cp_by_cp_id(cp_name)
        # return the VLR associated with the connection point
        return vnfr.find_vlr_by_cp(cp_name)

    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: create",
                    self._request_id)

    # Resolve the networks associated external interfaces
    for ext_intf in self._vdud.external_interface:
        self._log.debug("Resolving external interface name [%s], cp[%s]",
                        ext_intf.name, ext_intf.vnfd_connection_point_ref)
        cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
        if cp is None:
            # Unresolvable external interfaces are skipped, not fatal.
            self._log.debug("Failed to find connection point - %s",
                            ext_intf.vnfd_connection_point_ref)
            continue
        self._log.debug("Connection point name [%s], type[%s]",
                        cp.name, cp.type_yang)

        vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

        etuple = (ext_intf, cp, vlr)
        self._ext_intf.append(etuple)

        self._log.debug("Created external interface tuple : %s", etuple)

    # Resolve the networks associated internal interfaces
    for intf in self._vdud.internal_interface:
        cp_id = intf.vdu_internal_connection_point_ref
        self._log.debug("Resolving internal interface name [%s], cp[%s]",
                        intf.name, cp_id)

        try:
            vlr = find_internal_vlr_by_cp_name(cp_id)
        except Exception as e:
            self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
            msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
            raise VduRecordError(msg) from e

        # Note: the tuple carries the connection point *id*.
        ituple = (intf, cp_id, vlr)
        self._int_intf.append(ituple)

        self._log.debug("Created internal interface tuple : %s", ituple)

    resmgr_path = self.resmgr_path
    resmgr_msg = self.resmgr_msg(config)

    self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
    block.add_query_create(resmgr_path, resmgr_msg)

    res_iter = yield from block.execute(now=True)

    resp = None

    # Keep the last result of the query.
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        # Exceptions do not %-interpolate their arguments; format explicitly.
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
879
@asyncio.coroutine
def delete_resource(self, xact):
    """Issue the delete query for this VDU's resource-manager request."""
    delete_block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: delete",
                    self._request_id)

    request_path = self.resmgr_path
    delete_block.add_query_delete(request_path)

    yield from delete_block.execute(flags=0, now=True)
890
@asyncio.coroutine
def read_resource(self, xact):
    """Read this VDU's resource info back from the resource manager.

    Returns:
        The ``resource_info`` of the last result returned by the read query.

    Raises:
        VMResourceError: if the query yields no result, or the result has
            no resource state.
    """
    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: read",
                    self._request_id)

    block.add_query_read(self.resmgr_path)

    res_iter = yield from block.execute(flags=0, now=True)

    # Initialised so that an empty result set reaches the explicit check
    # below instead of failing with UnboundLocalError.
    resp = None
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        # Exceptions do not %-interpolate their arguments; format explicitly.
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
910
911
@asyncio.coroutine
def start_component(self):
    """Ask the VNFR to start this VDU's VCS component.

    The component is started with the management IP from the VM response.
    """
    component_ref = self._vdud.vcs_component_ref
    self._log.debug("Starting component %s for vdud %s vdur %s",
                    component_ref,
                    self._vdud,
                    self._vdur_id)
    yield from self._vnfr.start_component(component_ref,
                                          self.vm_resp.management_ip)
921
@property
def active(self):
    """True when this VDU record is in the READY state."""
    return self._state is VDURecordState.READY
926
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as FAILED and report the failure to the owning VNFR.

    Args:
        failed_reason: optional description stored on the record and
            forwarded to the VNFR.
    """
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    owner = self._vnfr
    yield from owner.instantiation_failed(failed_reason)
934
@asyncio.coroutine
def vdu_is_active(self):
    """Move this VDU to READY and notify the VNFR once all VDUs are active.

    Does nothing except log a warning when the VDU is already active.  If
    the descriptor references a VCS component, it is started before the
    state changes.
    """
    vdur_id = self._vdur_id
    if self.active:
        self._log.warning("VDU %s was already marked as active", vdur_id)
        return

    owner = self._vnfr
    self._log.debug("VDUR id %s in VNFR %s is active", vdur_id, owner.vnfr_id)

    needs_component = self._vdud.vcs_component_ref is not None
    if needs_component:
        yield from self.start_component()

    self._state = VDURecordState.READY

    if not self._vnfr.all_vdus_active():
        return

    self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
    yield from self._vnfr.is_ready()
952
@asyncio.coroutine
def instantiate(self, xact, vnfr, config=None):
    """Instantiate this VDU.

    Subscribes to this request's ``resource-info`` so later state changes
    from the resource manager are received, waits for the registration to
    be ready, then requests the VM.  Any failure is logged and reported
    through ``instantiation_failed``; it is not re-raised.

    Args:
        xact: transaction used for the create request.
        vnfr: VNF record used to resolve connection points and VLRs.
        config: optional user data for the VM.
    """
    self._state = VDURecordState.INSTANTIATING

    @asyncio.coroutine
    def on_prepare(xact_info, query_action, ks_path, msg):
        """ Track resource-info updates published by the resource manager """
        self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                        query_action,
                        ks_path,
                        msg)

        if (query_action == rwdts.QueryAction.UPDATE or
                query_action == rwdts.QueryAction.CREATE):
            self._vm_resp = msg

            if msg.resource_state == "active":
                # Move this VDU to ready state
                yield from self.vdu_is_active()
            elif msg.resource_state == "failed":
                yield from self.instantiation_failed(msg.resource_errors)
        elif query_action == rwdts.QueryAction.DELETE:
            self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
        else:
            # Exceptions do not %-interpolate their arguments; format
            # explicitly so the offending action appears in the message.
            raise NotImplementedError(
                "%s action on VirtualDeploymentUnitRecord not supported" %
                (query_action,))

        xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    try:
        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            reg_event.set()

        # Register before issuing the request so no update is missed.
        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
        self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                      flags=rwdts.Flag.SUBSCRIBER,
                                                      handler=handler)
        yield from reg_event.wait()

        vm_resp = yield from self.create_resource(xact, vnfr, config)
        self._vm_resp = vm_resp
        self._state = VDURecordState.RESOURCE_ALLOC_PENDING

        self._log.debug("Requested VM from resource manager response %s",
                        vm_resp)
        if vm_resp.resource_state == "active":
            self._log.debug("Resourcemgr responded wih an active vm resp %s",
                            vm_resp)
            yield from self.vdu_is_active()
            self._state = VDURecordState.READY
        elif (vm_resp.resource_state == "pending" or
                vm_resp.resource_state == "inactive"):
            # Nothing more to do here: the subscription registered above
            # delivers the eventual "active"/"failed" update to on_prepare.
            self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                            vm_resp)
        else:
            self._log.debug("Resourcemgr responded wih an error vm resp %s",
                            vm_resp)
            raise VirtualDeploymentUnitRecordError(
                "Failed VDUR instantiation %s " % vm_resp)

    except Exception as e:
        # log.exception records the traceback; a separate
        # traceback.print_exc() to stderr is not needed.
        self._log.exception("Instantiation of VDU record failed: %s", str(e))
        self._state = VDURecordState.FAILED
        yield from self.instantiation_failed(str(e))
1029
1030
class VlRecordState(enum.Enum):
    """Lifecycle states of a virtual link record."""
    # Record created, nothing requested yet
    INIT = 101
    # Create (or restart lookup) of the VLR is in progress
    INSTANTIATION_PENDING = 102
    # VLR has been instantiated
    ACTIVE = 103
    # Delete of the VLR has been requested
    TERMINATE_PENDING = 104
    # VLR has been deleted
    TERMINATED = 105
    # VLR creation failed
    FAILED = 106
1039
1040
class InternalVirtualLinkRecord(object):
    """Record for a virtual link that is internal to a VNF.

    The VLR create request is built from the internal VLD at construction
    time; ``instantiate`` and ``terminate`` then create and delete that VLR
    through DTS.
    """
    def __init__(self, dts, log, loop, project,
                 ivld_msg, vnfr_name, cloud_account_name, ip_profile=None):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        self._ivld_msg = ivld_msg
        self._vnfr_name = vnfr_name
        self._cloud_account_name = cloud_account_name
        self._ip_profile = ip_profile

        # create_vlr() reads the attributes assigned above, so it runs last.
        self._vlr_req = self.create_vlr()
        self._vlr = None
        self._state = VlRecordState.INIT

    @property
    def vlr_id(self):
        """ Id of the VLR request built for this VL """
        return self._vlr_req.id

    @property
    def name(self):
        """ Name of this VL

        The VIM network name when the VLD provides one, otherwise
        "<vnfr name>.<vld name>".
        """
        if self._ivld_msg.vim_network_name:
            return self._ivld_msg.vim_network_name
        return self._vnfr_name + "." + self._ivld_msg.name

    @property
    def network_id(self):
        """ Network id of the instantiated VLR, or None if not yet known """
        return self._vlr.network_id if self._vlr else None

    def vlr_path(self):
        """ VLR path for this VLR instance"""
        return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".
                                         format(self.vlr_id))

    def create_vlr(self):
        """ Create the VLR record which will be instantiated """

        vld_fields = ["short_name",
                      "vendor",
                      "description",
                      "version",
                      "type_yang",
                      "vim_network_name",
                      "provider_network"]

        vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}

        vlr_dict = {"id": str(uuid.uuid4()),
                    "name": self.name,
                    "cloud_account": self._cloud_account_name,
                    }

        if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
            vlr_dict['ip_profile_params'] = self._ip_profile.ip_profile_params.as_dict()

        # Fields copied from the VLD override the defaults set above.
        vlr_dict.update(vld_copy_dict)

        vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict)
        return vlr

    @asyncio.coroutine
    def instantiate(self, xact, restart_mode=False):
        """ Instantiate VL

        Arguments:
            xact         - unused; the create runs in its own DTS transaction
            restart_mode - when True, reuse an already published VLR if one
                           can be read back, and create one only if none is
                           found

        Raises:
            VnfrInstantiationFailed if the created VLR reports a 'failed'
            operational status; any exception from the create query is
            re-raised after the state is set to FAILED.
        """

        @asyncio.coroutine
        def instantiate_vlr():
            """ Instantiate VLR"""
            self._log.debug("Create VL with xpath %s and vlr %s",
                            self.vlr_path(), self._vlr_req)

            # Separate name so the caller's xact argument is not shadowed.
            with self._dts.transaction(flags=0) as create_xact:
                block = create_xact.block_create()
                block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
                self._log.debug("Executing VL create path:%s msg:%s",
                                self.vlr_path(), self._vlr_req)

                try:
                    res_iter = yield from block.execute()
                except Exception:
                    self._state = VlRecordState.FAILED
                    self._log.exception("Caught exception while instantial VL")
                    raise

                for ent in res_iter:
                    res = yield from ent
                    self._vlr = res.result

            if self._vlr.operational_status == 'failed':
                self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
                self._state = VlRecordState.FAILED
                raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

            self._log.info("Created VL with xpath %s and vlr %s",
                           self.vlr_path(), self._vlr)

        @asyncio.coroutine
        def get_vlr():
            """ Read back the published VLR; returns None when there is none """
            res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
            vlr = None
            for ent in res_iter:
                res = yield from ent
                vlr = res.result

            if vlr is None:
                # Not an error: the caller creates the VLR in this case.
                self._log.warning("Failed to get VLR for path %s" % self.vlr_path())
            return vlr

        self._state = VlRecordState.INSTANTIATION_PENDING

        if restart_mode:
            vl = yield from get_vlr()
            if vl is None:
                yield from instantiate_vlr()
            else:
                # Keep the record that was read back so network_id is available.
                self._vlr = vl
        else:
            yield from instantiate_vlr()

        self._state = VlRecordState.ACTIVE

    def vlr_in_vns(self):
        """ Is there a VLR record in VNS """
        return self._state in (VlRecordState.ACTIVE,
                               VlRecordState.INSTANTIATION_PENDING,
                               VlRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self, xact):
        """Terminate this VL

        Issues a delete for the VLR path on the given transaction. Does
        nothing when the record is in a state where no VLR was requested.
        """
        if not self.vlr_in_vns():
            self._log.debug("Ignoring terminate request for id %s in state %s",
                            self.vlr_id, self._state)
            return

        self._log.debug("Terminating VL with path %s", self.vlr_path())
        self._state = VlRecordState.TERMINATE_PENDING
        block = xact.block_create()
        block.add_query_delete(self.vlr_path())
        yield from block.execute(flags=0, now=True)
        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL with path %s", self.vlr_path())
1193
1194
1195 class VirtualNetworkFunctionRecord(object):
1196 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    # Infrastructure handles
    self._dts = dts
    self._log = log
    self._loop = loop

    # Owning VNFM and the context taken from it
    self._vnfm = vnfm
    self._project = vnfm._project
    self._cluster_name = cluster_name
    self._vcs_handler = vcs_handler
    self._mgmt_network = mgmt_network

    # The incoming VNFR message and the identifiers/descriptor it carries
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd = vnfr_msg.vnfd
    self._vnfd_id = vnfr_msg.vnfd.id
    self._config_status = vnfr_msg.config_status

    # Lifecycle state
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())

    # Child records and lookup tables, filled in during instantiation
    self._ext_vlrs = {}  # The list of external virtual links
    self._vlrs = []  # The list of internal virtual links
    self._vdus = []  # The list of vdu
    self._vlr_by_cp = {}
    self._cprs = []
    self._inventory = {}
    self._vnf_mon = None

    # Descriptor package store, fetched descriptor and its reference count
    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0
1226
1227 def _get_vdur_from_vdu_id(self, vdu_id):
1228 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1229 self._log.debug("Searching through vdus: %s", self._vdus)
1230 for vdu in self._vdus:
1231 self._log.debug("vdu_id: %s", vdu.vdu_id)
1232 if vdu.vdu_id == vdu_id:
1233 return vdu
1234
1235 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1236
@property
def operational_status(self):
    """Operational-status string published for the current VNFR state.

    Looked up by the name of the state enum member; a state name with no
    mapping raises KeyError.
    """
    status_by_state = dict(
        INIT="init",
        VL_INIT_PHASE="vl_init_phase",
        VM_INIT_PHASE="vm_init_phase",
        READY="running",
        TERMINATE="terminate",
        VL_TERMINATE_PHASE="vl_terminate_phase",
        VDU_TERMINATE_PHASE="vm_terminate_phase",
        TERMINATED="terminated",
        FAILED="failed",
    )
    state_name = self._state.name
    return status_by_state[state_name]
1250
@staticmethod
def vnfd_xpath(vnfd_id):
    """Build the config xpath of the VNFD catalog entry with the given id."""
    template = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd[project-vnfd:id = '{}']"
    return template.format(vnfd_id)
1256
@property
def vnfd_ref_count(self):
    """Current number of references held on the VNFD by this VNFR."""
    return self._vnfd_ref_count
1261
def vnfd_in_use(self):
    """Return True while at least one reference is held on the VNFD."""
    return self._vnfd_ref_count > 0
1265
def vnfd_ref(self):
    """Take one more reference on the VNFD and return the new count."""
    updated = self._vnfd_ref_count + 1
    self._vnfd_ref_count = updated
    return updated
1270
def vnfd_unref(self):
    """Drop one reference on the VNFD and return the remaining count.

    Raises:
        VnfRecordError if no reference is currently held.
    """
    current = self._vnfd_ref_count
    if current < 1:
        msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
               (self.vnfd.id, current))
        self._log.critical(msg)
        raise VnfRecordError(msg)

    self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                    self.vnfd.id, current)
    self._vnfd_ref_count = current - 1
    return self._vnfd_ref_count
1282
@property
def vnfd(self):
    """The VNF descriptor carried in this VNFR's message."""
    return self._vnfd
1287
@property
def vnf_name(self):
    """Name taken from the VNFD of this VNFR."""
    descriptor = self.vnfd
    return descriptor.name
1292
@property
def name(self):
    """Name of this VNF instance as given in the VNFR message."""
    return self._vnfr.name
1297
@property
def cloud_account_name(self):
    """Cloud account named in the VNFR message."""
    return self._vnfr.cloud_account
1302
@property
def vnfd_id(self):
    """Id of the VNFD this VNFR was created from."""
    descriptor = self.vnfd
    return descriptor.id
1307
@property
def vnfr_id(self):
    """Id of this VNFR, taken from the VNFR message at construction."""
    return self._vnfr_id
1312
@property
def member_vnf_index(self):
    """The member-vnf-index-ref value from the VNFR message."""
    return self._vnfr.member_vnf_index_ref
1317
@property
def config_status(self):
    """Config status recorded for this VNFR."""
    return self._config_status
1322
def component_by_name(self, component_name):
    """Return the inventory component registered under the given name.

    The inventory is keyed by the mangled name built from the component
    name, the VNF name and the VNFD id; an unknown name raises KeyError.
    """
    inventory_key = VcsComponent.mangle_name(component_name,
                                             self.vnf_name,
                                             self.vnfd_id)
    return self._inventory[inventory_key]
1329
1330
1331
@asyncio.coroutine
def get_nsr_config(self):
    """Read the NS instance config and return the NSR this VNFR belongs to.

    Returns the NSR entry whose id equals the ``nsr_id_ref`` of the VNFR
    message, or None when no such entry is found.
    """
    # Need access to NS instance configuration for runtime resolution.
    # This shall be replaced when deployment flavors are implemented.
    config_path = self._project.add_project("C,/nsr:ns-instance-config")
    responses = yield from self._dts.query_read(config_path, rwdts.XactFlag.MERGE)

    for pending in responses:
        reply = yield from pending
        for candidate in reply.result.nsr:
            if candidate.id == self._vnfr_msg.nsr_id_ref:
                return candidate
    return None
1346
@asyncio.coroutine
def start_component(self, component_name, ip_addr):
    """Look up the named inventory component and start it on ``ip_addr``."""
    component = self.component_by_name(component_name)
    yield from component.start(None, None, ip_addr)
1352
def cp_ip_addr(self, cp_name):
    """Return the IP address of the named connection point.

    Falls back to "0.0.0.0" when no connection point record with that name
    has an address.
    """
    self._log.debug("cp_ip_addr()")
    addresses = (
        cpr.ip_address
        for cpr in self._cprs
        if cpr.name == cp_name and cpr.ip_address is not None
    )
    return next(addresses, "0.0.0.0")
1360
def mgmt_intf_info(self):
    """Return ``(ip_address, port)`` of this VNFR's management interface.

    The address comes from, in order of preference: the connection point
    named by the descriptor, the management IP of the VDU named by the
    descriptor (None if that VDU record is not found), or the static
    address in the descriptor. The port always comes from the descriptor.
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        address = self.cp_ip_addr(descriptor.cp)
    elif descriptor.has_field("vdu_id"):
        try:
            address = self._get_vdur_from_vdu_id(descriptor.vdu_id).management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            address = None
    else:
        address = descriptor.ip_address

    return address, descriptor.port
1379
@property
def msg(self):
    """Build the VNFR catalog message for this record's current state.

    A fresh message is assembled on every access from the stored VNFR
    message, the VNFD, and the internal VLR, VDU, connection point and
    monitoring param records held by this object.
    """
    copied_fields = ("short_name", "vendor", "description", "version")
    descriptor_values = {
        key: value
        for key, value in self.vnfd.as_dict().items()
        if key in copied_fields
    }

    mgmt_intf = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MgmtInterface()
    ip_address, port = self.mgmt_intf_info()

    # Only set what is known; unset fields are left out of the message.
    if ip_address is not None:
        mgmt_intf.ip_address = ip_address
    if port is not None:
        mgmt_intf.port = port

    record_values = {
        "id": self._vnfr_id,
        "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
        "name": self.name,
        "member_vnf_index_ref": self.member_vnf_index,
        "operational_status": self.operational_status,
        "operational_status_details": self._state_failed_reason,
        "cloud_account": self.cloud_account_name,
        "config_status": self._config_status,
    }
    # Values copied from the descriptor win over the ones set above.
    record_values.update(descriptor_values)

    vnfr_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(record_values)
    vnfr_msg.vnfd = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())

    vnfr_msg.create_time = self._create_time
    vnfr_msg.uptime = int(time.time()) - self._create_time
    vnfr_msg.mgmt_interface = mgmt_intf

    # Reference every internal VLR from the VNFR
    for internal_vl in self._vlrs:
        vlr_entry = vnfr_msg.internal_vlr.add()
        vlr_entry.vlr_ref = internal_vl.vlr_id

    # Copy every VDU record into the VNFR
    if self._vdus is not None:
        for vdu_record in self._vdus:
            vdur_entry = vnfr_msg.vdur.add()
            vdur_entry.from_dict(vdu_record.msg.as_dict())

    if self.vnfd.mgmt_interface.has_field('dashboard_params'):
        vnfr_msg.dashboard_url = self.dashboard_url

    for cpr in self._cprs:
        cp_entry = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
        vnfr_msg.connection_point.append(cp_entry)

    if self._vnf_mon is not None:
        for mon_param in self._vnf_mon.msg:
            vnfr_msg.monitoring_param.append(
                VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict(mon_param.as_dict()))

    if self._vnfr.vnf_configuration is not None:
        vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
        # Fill in the management address for config access when it is
        # known here but missing from the configuration.
        address_missing = vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None
        if ip_address is not None and address_missing:
            vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address

    for placement_group in self._vnfr_msg.placement_groups_info:
        group_entry = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        group_entry.from_dict(placement_group.as_dict())
        vnfr_msg.placement_groups_info.append(group_entry)

    return vnfr_msg
1448
@property
def dashboard_url(self):
    """URL of the VNF dashboard built from the VNFD dashboard params.

    Scheme and default port are http/80, or https/443 when the ``https``
    param is set to True; an explicit ``port`` param overrides the default.
    The host is the management interface address.
    """
    host, _cfg_port = self.mgmt_intf_info()
    params = self.vnfd.mgmt_interface.dashboard_params

    secure = params.has_field('https') and params.https is True
    if secure:
        scheme, web_port = 'https', 443
    else:
        scheme, web_port = 'http', 80

    if params.has_field('port'):
        web_port = params.port

    return "{protocol}://{ip_address}:{port}/{path}".format(
        protocol=scheme,
        ip_address=host,
        port=web_port,
        path=params.path.lstrip("/"),
    )
1469
@property
def xpath(self):
    """Project-qualified data xpath of this VNFR in the VNFR catalog."""
    catalog_path = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id)
    return self._project.add_project(catalog_path)
1475
@asyncio.coroutine
def publish(self, xact):
    """Publish this VNFR's current message through the VNFM.

    Arguments:
        xact - transaction handed to ``publish_vnfr``; callers in this
               class also pass None
    """
    # The msg property assembles a new message on every access, so build it
    # once and reuse the same object for logging and publishing. It already
    # carries create_time.
    vnfr = self.msg
    path = self.xpath
    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
1486
def resolve_vld_ip_profile(self, vnfd_msg, vld):
    """Return the VNFD IP profile referenced by the VLD, or None.

    None is returned when the VLD has no ``ip_profile_ref`` or when no
    profile in the VNFD carries the referenced name.
    """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if not vld.has_field('ip_profile_ref'):
        return None

    for candidate in vnfd_msg.ip_profiles:
        if candidate.name == vld.ip_profile_ref:
            return candidate
    return None
1493
@asyncio.coroutine
def create_vls(self):
    """Create the internal virtual link records for this VNF.

    One InternalVirtualLinkRecord is built per internal VLD of the VNFD and
    each internal connection point of that VLD is mapped to the record.

    Raises:
        InternalVirtualLinkRecordError if a connection point is referenced
        by more than one internal VLD.
    """
    self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)
    for ivld_msg in self.vnfd.internal_vld:
        self._log.debug("Creating internal vld:"
                        " %s, int_cp_ref = %s",
                        ivld_msg, ivld_msg.internal_connection_point
                        )
        # project is a required constructor argument of the record; it
        # uses it to build the project-qualified VLR path.
        vlr = InternalVirtualLinkRecord(dts=self._dts,
                                        log=self._log,
                                        loop=self._loop,
                                        project=self._project,
                                        ivld_msg=ivld_msg,
                                        vnfr_name=self.name,
                                        cloud_account_name=self.cloud_account_name,
                                        ip_profile=self.resolve_vld_ip_profile(self.vnfd, ivld_msg)
                                        )
        self._vlrs.append(vlr)

        for int_cp in ivld_msg.internal_connection_point:
            if int_cp.id_ref in self._vlr_by_cp:
                msg = ("Connection point %s already "
                       " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
                raise InternalVirtualLinkRecordError(msg)
            self._log.debug("Setting vlr %s to internal cp = %s",
                            vlr, int_cp.id_ref)
            self._vlr_by_cp[int_cp.id_ref] = vlr
1522
@asyncio.coroutine
def instantiate_vls(self, xact, restart_mode=False):
    """Instantiate, one after another, every internal VL record of this VNF."""
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for internal_vl in self._vlrs:
        self._log.debug("Instantiating VLR %s", internal_vl)
        yield from internal_vl.instantiate(xact, restart_mode)
1532
def find_vlr_by_cp(self, cp_name):
    """Return the internal VL record bound to the named connection point.

    An unbound connection point name raises KeyError.
    """
    bound_vlr = self._vlr_by_cp[cp_name]
    return bound_vlr
1536
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group, or None when
    the NSR config holds no mapping for this group and VNFD.

    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config;
                    may be None when no NSR config was found
    """
    if nsr_config is None:
        # Without an NSR config there is nothing to resolve against.
        return None

    copy_dict = ['name', 'requirement', 'strategy']
    for group_info in nsr_config.vnfd_placement_group_maps:
        if group_info.placement_group_ref == input_group.name and \
           group_info.vnfd_id_ref == self.vnfd_id:
            group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
            # Start from the mapping, minus the two reference keys.
            group_dict = {k: v for k, v in
                          group_info.as_dict().items()
                          if (k != 'placement_group_ref' and k != 'vnfd_id_ref')}
            # name/requirement/strategy always come from the VNFD group.
            for param in copy_dict:
                group_dict[param] = getattr(input_group, param)
            group.from_dict(group_dict)
            return group
    return None
1557
@asyncio.coroutine
def get_vdu_placement_groups(self, vdu, nsr_config):
    """Collect the placement groups that apply to the given VDU.

    The result starts with every VNF-level group from the VNFR message,
    followed by the resolved cloud construct of each VNFD placement group
    that lists this VDU as a member. Groups that cannot be resolved are
    logged and left out.
    """
    # Step-1: Get VNF level placement groups
    placement_groups = list(self._vnfr_msg.placement_groups_info)

    # Step-2: Get VDU level placement groups
    for group in self.vnfd.placement_groups:
        for member_vdu in group.member_vdus:
            if member_vdu.member_vdu_ref != vdu.id:
                continue

            resolved = self.resolve_placement_group_cloud_construct(group,
                                                                    nsr_config)
            if resolved is None:
                self._log.info("Could not resolve cloud-construct for placement group: %s",
                               group.name)
                continue

            self._log.info("Successfully resolved cloud construct for "
                           "placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                           str(resolved),
                           vdu.name,
                           self.vnf_name,
                           self.member_vnf_index)
            placement_groups.append(resolved)

    return placement_groups
1586
@asyncio.coroutine
def vdu_cloud_init_instantiation(self):
    """Read ``vdud_cloud_init`` on every VDU record.

    The values are discarded; any exception raised by the attribute access
    propagates to the caller.
    """
    for vdu_record in self._vdus:
        vdu_record.vdud_cloud_init
1590
@asyncio.coroutine
def create_vdus(self, vnfr, restart_mode=False):
    """Create a VDU record for every VDU in the fetched VNFD.

    Arguments:
        vnfr         - the VNFR handed to each VDU record
        restart_mode - when True, reuse the ids of VDURs already present in
                       the VNFR message so no duplicates are created
    """

    def get_vdur_id(vdud):
        """Return the id of the existing VDUR for this VDUD, if any.

        Only consulted in restart mode; returns None otherwise, or when no
        VDUR references the VDUD.
        """
        if not restart_mode or vdud is None:
            return None

        existing_ids = [rec.id for rec in vnfr._vnfr.vdur if rec.vdu_id_ref == vdud.id]
        if not existing_ids:
            self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
            return None
        return existing_ids[0]

    self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)

    # Get NSR config - Needed for placement groups and to derive VDU short-name
    nsr_config = yield from self.get_nsr_config()

    for vdud in self._rw_vnfd.vdu:
        self._log.debug("Creating vdu: %s", vdud)
        existing_vdur_id = get_vdur_id(vdud)

        placement_groups = yield from self.get_vdu_placement_groups(vdud, nsr_config)
        group_names = [group.name for group in placement_groups]
        self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s, Existing vdur_id %s",
                       vdud.name,
                       self.vnf_name,
                       self.member_vnf_index,
                       group_names,
                       existing_vdur_id)

        vdu_record = VirtualDeploymentUnitRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            project=self._project,
            vdud=vdud,
            vnfr=vnfr,
            nsr_config=nsr_config,
            mgmt_intf=self.has_mgmt_interface(vdud),
            mgmt_network=self._mgmt_network,
            cloud_account_name=self.cloud_account_name,
            vnfd_package_store=self._vnfd_package_store,
            vdur_id=existing_vdur_id,
            placement_groups=placement_groups,
        )
        yield from vdu_record.vdu_opdata_register()

        self._vdus.append(vdu_record)
1650
@asyncio.coroutine
def instantiate_vdus(self, xact, vnfr):
    """ Instantiate the VDUs associated with this VNF

    A VDU whose cloud-init script references another VDU (``{{ vdu[id]... }}``)
    is instantiated only after that VDU has reached a terminal state. Each
    VDU gets a monitor task and an instantiate task on the event loop; this
    coroutine returns once the tasks are scheduled.

    Arguments:
        xact - unused; every VDU is instantiated in its own transaction
        vnfr - the VNFR handed to each VDU's instantiate call
    """
    self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

    lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

    # Identify any dependencies among the VDUs
    dependencies = collections.defaultdict(list)
    vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
    # Matches any "{{ variable }}" placeholder in a cloud-init script
    variable_pattern = re.compile(r"\{\{ ([^\}]+) \}\}")

    for vdu in self._vdus:
        if vdu._vdud_cloud_init is not None:
            for vdu_id in vdu_id_pattern.findall(vdu._vdud_cloud_init):
                if vdu_id != vdu.vdu_id:
                    # This means that vdu.vdu_id depends upon vdu_id,
                    # i.e. vdu_id must be instantiated before
                    # vdu.vdu_id.
                    dependencies[vdu.vdu_id].append(lookup[vdu_id])

    # Define the terminal states of VDU instantiation
    terminal = (
        VDURecordState.READY,
        VDURecordState.TERMINATED,
        VDURecordState.FAILED,
    )

    datastore = VdurDatastore()
    processed = set()

    @asyncio.coroutine
    def instantiate_monitor(vdu):
        """Monitor the state of the VDU during instantiation

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        """
        # wait for the VDUR to enter a terminal state
        while vdu._state not in terminal:
            yield from asyncio.sleep(1, loop=self._loop)
        # update the datastore
        datastore.update(vdu)

        # add the VDU to the set of processed VDUs
        processed.add(vdu.vdu_id)

    @asyncio.coroutine
    def instantiate(vdu):
        """Instantiate the specified VDU

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        Raises:
            if the VDU, or any of the VDUs this VDU depends upon, are
            terminated or fail to instantiate properly, a
            VirtualDeploymentUnitRecordError is raised. A ValueError is
            raised for a cloud-init variable that is unrecognized or has
            no value in the datastore.

        """
        for dependency in dependencies[vdu.vdu_id]:
            self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

            while dependency.vdu_id not in processed:
                yield from asyncio.sleep(1, loop=self._loop)

            if not dependency.active:
                raise VirtualDeploymentUnitRecordError(
                    "VDU {} depends on VDU {} which is not active".format(
                        vdu.vdu_id, dependency.vdu_id))

        self._log.debug('instantiating {}'.format(vdu.vdu_id))

        # Populate the datastore with the current values of the VDU
        datastore.add(vdu)

        # Substitute any variables contained in the cloud config script
        config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""

        parts = variable_pattern.split(config)
        if len(parts) > 1:

            # Extract the variable names (the odd entries are the captures)
            variables = list()
            for variable in parts[1::2]:
                variables.append(variable.lstrip('{{').rstrip('}}').strip())

            missing = "Unable to find a substitute for {} in {} cloud-init script"

            # Iterate of the variables and substitute values from the
            # datastore.
            for variable in variables:

                # Handle a reference to a VDU by ID
                if variable.startswith('vdu['):
                    value = datastore.get(variable)
                    if value is None:
                        raise ValueError(missing.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle a reference to the current VDU
                if variable.startswith('vdu'):
                    value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
                    if value is None:
                        # Same check as above; str.replace rejects None.
                        raise ValueError(missing.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle unrecognized variables
                msg = 'unrecognized cloud-config variable: {}'
                raise ValueError(msg.format(variable))

        # Instantiate the VDU
        with self._dts.transaction() as vdu_xact:
            self._log.debug("Instantiating vdu: %s", vdu)
            yield from vdu.instantiate(vdu_xact, vnfr, config=config)
            if self._state == VirtualNetworkFunctionRecordState.FAILED:
                self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                self.vnfr_id, vdu)

    # First create a set of tasks to monitor the state of the VDUs and
    # report when they have entered a terminal state
    for vdu in self._vdus:
        self._loop.create_task(instantiate_monitor(vdu))

    for vdu in self._vdus:
        self._loop.create_task(instantiate(vdu))
1775
def has_mgmt_interface(self, vdu):
    """Return True when the VNFD's management interface names this VDU."""
    # TODO: Support additional mgmt_interface type options
    return self.vnfd.mgmt_interface.vdu_id == vdu.id
1781
def vlr_xpath(self, vlr_id):
    """Project-qualified data xpath of the VLR with the given id."""
    catalog_path = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(vlr_id)
    return self._project.add_project(catalog_path)
1786
def ext_vlr_by_id(self, vlr_id):
    """Return the fetched external VLR with the given id.

    An id that was never fetched raises KeyError.
    """
    external_vlr = self._ext_vlrs[vlr_id]
    return external_vlr
1790
@asyncio.coroutine
def publish_inventory(self, xact):
    """Create and publish a VCS component for every component of the VNFD.

    Each component is stored in the inventory under its name. Publishing
    stops at the first component whose name is already in the inventory.
    """
    self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)

    for component in self._rw_vnfd.component:
        self._log.debug("Creating inventory component %s", component)
        inventory_name = VcsComponent.mangle_name(component.component_name,
                                                  self.vnf_name,
                                                  self.vnfd_id)
        vcs_component = VcsComponent(dts=self._dts,
                                     log=self._log,
                                     loop=self._loop,
                                     cluster_name=self._cluster_name,
                                     vcs_handler=self._vcs_handler,
                                     component=component,
                                     mangled_name=inventory_name)

        if vcs_component.name in self._inventory:
            self._log.debug("Duplicate entries in inventory  %s for vnfr %s",
                            component, self._vnfd_id)
            return

        self._log.debug("Adding component %s for vnrf %s",
                        vcs_component.name, self._vnfr_id)
        self._inventory[vcs_component.name] = vcs_component
        yield from vcs_component.publish(xact)
1818
def all_vdus_active(self):
    """Return True when every VDU record of this VNFR reports active."""
    if any(not vdu_record.active for vdu_record in self._vdus):
        return False

    self._log.debug("Inside all_vdus_active. Returning True")
    return True
1827
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VNFR as FAILED, record the reason and republish it."""
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
    self._state_failed_reason = failed_reason
    self.set_state(VirtualNetworkFunctionRecordState.FAILED)

    # Make the changed status visible in the published VNFR
    yield from self.publish(None)
1837
@asyncio.coroutine
def is_ready(self):
    """Move this VNFR to READY, unless it has already FAILED, and republish."""
    self._log.debug("VNFR id %s is ready", self.vnfr_id)

    if self._state == VirtualNetworkFunctionRecordState.FAILED:
        # A failed VNFR stays failed.
        self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
    else:
        self.set_state(VirtualNetworkFunctionRecordState.READY)

    # Make the current status visible in the published VNFR
    yield from self.publish(None)
1851
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """Update the named connection point record with its runtime details.

    Arguments:
        cp_name    - name of the connection point record to update
        ip_address - value stored as the record's ip_address
        mac_addr   - value stored as the record's mac_address
        cp_id      - value stored as the record's connection_point_id

    Raises:
        VirtualDeploymentUnitRecordError if this VNFR has no connection
        point record with that name.
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the name that was asked for. The loop variable would name the
    # last record inspected, and is unbound when there are no records.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1866
def set_state(self, state):
    """Record ``state`` as the current state of this VNFR."""
    self._state = state
1870
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ instantiate this VNF

    Steps, in order: fetch the VNFD, fetch the external VLRs for the
    connection points, publish the inventory, create and instantiate the
    internal VLs, create the VDU records, then schedule VDU instantiation
    and the uptime update as background tasks. The VNFR is republished
    along the way.

    A failure while instantiating VLs or while resolving VDU cloud-init
    marks the VNFR as FAILED, publishes it and stops; no VDUs are launched.

    Arguments:
        xact         - transaction used for publishing
        restart_mode - passed on to VL instantiation and VDU creation so
                       existing records can be reused
    """
    self._log.info("Instantiate VNF {}: {}".format(self._vnfr_id, self._state))
    self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
    self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)

    @asyncio.coroutine
    def fetch_vlrs():
        """ Fetch VLRs """
        # Iterate over all the connection points in VNFR and fetch the
        # associated VLRs

        def cpr_from_cp(cp):
            """ Creates a record level connection point from the desciptor cp"""
            cp_fields = ["name", "image", "vm-flavor", "port_security_enabled"]
            cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
            cpr_dict = {}
            cpr_dict.update(cp_copy_dict)
            return VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)

        self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                        self._vnfr_id, self._vnfr.connection_point)

        for cp in self._vnfr.connection_point:
            cpr = cpr_from_cp(cp)
            self._cprs.append(cpr)
            self._log.debug("Adding Connection point record  %s ", cp)

            vlr_path = self.vlr_xpath(cp.vlr_ref)
            self._log.debug("Fetching VLR with path = %s", vlr_path)
            res_iter = yield from self._dts.query_read(vlr_path,
                                                       rwdts.XactFlag.MERGE)
            for i in res_iter:
                r = yield from i
                d = r.result
                self._ext_vlrs[cp.vlr_ref] = d
                cpr.vlr_ref = cp.vlr_ref
                self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)

    # Increase the VNFD reference count
    self.vnfd_ref()

    assert self.vnfd

    # Fetch External VLRs
    self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
    yield from fetch_vlrs()

    # Publish inventory
    self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
    yield from self.publish_inventory(xact)

    # Create the internal VL records
    self._log.debug("Create VLs {}: {}".format(self._vnfr_id, self._state))
    yield from self.create_vls()

    # publish the VNFR
    self._log.debug("Publish VNFR {}: {}".format(self._vnfr_id, self._state))
    yield from self.publish(xact)

    # instantiate VLs
    self._log.debug("VNFR-ID %s: Instantiate VLs, restart mode %s", self._vnfr_id, restart_mode)
    try:
        yield from self.instantiate_vls(xact, restart_mode)
    except Exception as e:
        self._log.exception("VL instantiation failed (%s)", str(e))
        yield from self.instantiation_failed(str(e))
        return

    self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

    # create the VDU records
    self._log.debug("VNFR-ID %s: Create VDUs, restart mode %s", self._vnfr_id, restart_mode)
    yield from self.create_vdus(self, restart_mode)

    try:
        yield from self.vdu_cloud_init_instantiation()
    except Exception as e:
        self._log.exception("VDU cloud-init resolution failed (%s)", str(e))
        self.set_state(VirtualNetworkFunctionRecordState.FAILED)
        self._state_failed_reason = str(e)
        yield from self.publish(xact)
        # Stop here, as for a VL failure: do not launch VDUs for a VNFR
        # that has just been marked FAILED.
        return

    # publish the VNFR
    self._log.debug("VNFR {}: Publish VNFR with state {}".
                    format(self._vnfr_id, self._state))
    yield from self.publish(xact)

    # instantiate VDUs
    # ToDo: Check if this should be prevented during restart
    self._log.debug("Instantiate VDUs {}: {}".format(self._vnfr_id, self._state))
    _ = self._loop.create_task(self.instantiate_vdus(xact, self))

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)

    # create task updating uptime for this vnfr
    self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
    self._loop.create_task(self.vnfr_uptime_update(xact))
1974
@asyncio.coroutine
def terminate(self, xact):
    """ Terminate this virtual network function

    Stops and drops the VNF monitor (if one is set), then terminates
    every VL record followed by every VDU record, moving the VNFR state
    through TERMINATE, VL_TERMINATE_PHASE, VDU_TERMINATE_PHASE and
    finally TERMINATED.

    Arguments:
        xact - transaction handle forwarded to each VL/VDU terminate call
    """
    states = VirtualNetworkFunctionRecordState

    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(states.TERMINATE)

    # Monitoring goes away first, before any resource is released
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    # Virtual links
    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(states.VL_TERMINATE_PHASE)
    for vlr in self._vlrs:
        yield from vlr.terminate(xact)

    # Virtual deployment units
    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(states.VDU_TERMINATE_PHASE)
    for vdur in self._vdus:
        yield from vdur.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(states.TERMINATED)
2011
@asyncio.coroutine
def vnfr_uptime_update(self, xact):
    """ Re-publish this VNFR every 2 seconds while it is still live

    The loop ends as soon as the record state is anything other than
    INIT, VL_INIT_PHASE, VM_INIT_PHASE or READY (e.g. FAILED or
    TERMINATED).

    Arguments:
        xact - transaction handle passed to publish()
    """
    live_states = (
        VirtualNetworkFunctionRecordState.INIT,
        VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
        VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
        VirtualNetworkFunctionRecordState.READY,
    )

    while self._state in live_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
2023
2024
2025
class VnfdDtsHandler(object):
    """ DTS handler for VNFD config changes """
    XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        """
        Arguments:
            dts  - DTS API handle used to create the appconf group
            log  - logger
            loop - event loop (stored, not used by this handler itself)
            vnfm - owning VnfManager; its _project supplies the xpath prefix
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm
        self._regh = None

    # NOTE(review): the sibling handlers in this file expose `regh` as a
    # @property; here it is a coroutine, so callers must use
    # `yield from handler.regh()`. Left unchanged to keep the interface.
    @asyncio.coroutine
    def regh(self):
        """ DTS registration handle """
        return self._regh

    def deregister(self):
        '''De-register from DTS'''
        self._log.debug("De-register VNFD DTS handler for project {}".
                        format(self._vnfm._project.name))
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD configuration"""

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration (nothing to apply; only logged)"""
            self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback

            Deletes are handled here: a VNFD that is still in use is
            refused by raising VirtualNetworkFunctionDescriptorRefCountExists;
            otherwise the VNFM is asked to delete it. Every request that
            does not raise is ACKed.
            """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
                            ks_path.to_xpath(RwVnfmYang.get_schema()),
                            xact_info.query_action, msg)
            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            # Handle deletes in prepare_callback
            if fref.is_field_deleted():
                self._log.debug("Deleting VNFD with id %s", msg.id)
                if self._vnfm.vnfd_in_use(msg.id):
                    self._log.debug("Cannot delete VNFD in use - %s", msg)
                    err = "Cannot delete a VNFD in use - %s" % msg
                    raise VirtualNetworkFunctionDescriptorRefCountExists(err)
                # Delete a VNFD record
                yield from self._vnfm.delete_vnfd(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        xpath = self._vnfm._project.add_project(VnfdDtsHandler.XPATH)
        self._log.debug("Registering for VNFD config using xpath: {}".
                        format(xpath))

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._regh = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
2093
2094
class VcsComponentDtsHandler(object):
    """ Vcs Component DTS handler

    Registers as a publisher on the manifest operational-inventory
    component path and creates elements under that registration.
    """
    XPATH = ("D,/rw-manifest:manifest"
             "/rw-manifest:operational-inventory"
             "/rw-manifest:component")

    def __init__(self, dts, log, loop, vnfm):
        self._vnfm = vnfm
        self._dts, self._log, self._loop = dts, log, loop
        # Filled in by register(), cleared again by deregister()
        self._regh = None

    @property
    def regh(self):
        """ DTS registration handle """
        return self._regh

    def deregister(self):
        '''De-register from DTS'''
        project = self._vnfm._project
        self._log.debug("De-register VCS DTS handler for project {}".format(project))
        if not self._regh:
            return
        self._regh.deregister()
        self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Registers VCS component dts publisher registration"""
        self._log.debug("VCS Comp publisher DTS handler registering path %s",
                        VcsComponentDtsHandler.XPATH)

        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        with self._dts.group_create(handler=group_handler) as reg_group:
            publisher_flags = (rwdts.Flag.PUBLISHER |
                               rwdts.Flag.NO_PREP_READ |
                               rwdts.Flag.DATASTORE)
            self._regh = reg_group.register(
                xpath=VcsComponentDtsHandler.XPATH,
                handler=reg_handler,
                flags=publisher_flags,
            )

    @asyncio.coroutine
    def publish(self, xact, path, msg):
        """ Publishes the VCS component

        Arguments:
            xact - transaction handle (only logged)
            path - xpath of the element to create
            msg  - message to store at that path
        """
        self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                        VcsComponentDtsHandler.XPATH, xact, path, msg)
2144
class VnfrConsoleOperdataDtsHandler(object):
    """
    Registers 'D,/rw-project:project/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]'
    and handles CRUD from DTS
    """

    @property
    def vnfr_vdu_console_xpath(self):
        """ Console xpath for this handler's VNFR id / VDUR id pair """
        # Format the complete path before handing it to add_project().
        # Calling .format() directly on the second literal of a "+"
        # concatenation would only fill that literal's placeholder and
        # leave the VNFR key as a raw "{}".
        xpath = ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']"
                 "/rw-vnfr:vdur[vnfr:id='{}']").format(self._vnfr_id,
                                                       self._vdur_id)
        return self._project.add_project(xpath)

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        """
        Arguments:
            dts     - DTS API handle
            log     - logger
            loop    - event loop
            vnfm    - owning VnfManager (used to look up the VNFR)
            vnfr_id - id of the VNFR this console entry belongs to
            vdur_id - id of the VDU record published in the response
            vdu_id  - id of the VDU used to look up the VDUR on the VNFR
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._regh = None
        self._vnfm = vnfm

        self._vnfr_id = vnfr_id
        self._vdur_id = vdur_id
        self._vdu_id = vdu_id

        self._project = vnfm._project

    def deregister(self):
        '''De-register from DTS'''
        self._log.debug("De-register VNFR console DTS handler for project {}".
                        format(self._project))
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Register for VNFR VDU Operational Data read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            READ: responds with the console URL of the VDUR, or with
            'none' when the URL is empty or reading the resource failed.
            An unknown VNFR or a VDUR that is not READY gets a bare ACK.
            Any other action is logged as unsupported and ACKed.
            """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
                )

            if action != rwdts.QueryAction.READ:
                self._log.error("Not supported operation %s", action)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            schema = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            self._log.debug("VDU Opdata path is {}".format(path_entry.key00.id))
            try:
                vnfr = self._vnfm.get_vnfr(self._vnfr_id)
            except VnfRecordError:
                self._log.error("VNFR id %s not found", self._vnfr_id)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            try:
                vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)
                if vdur._state != VDURecordState.READY:
                    self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return
                with self._dts.transaction() as new_xact:
                    resp = yield from vdur.read_resource(new_xact)
                    vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
                    vdur_console.id = self._vdur_id
                    if resp.console_url:
                        vdur_console.console_url = resp.console_url
                    else:
                        vdur_console.console_url = 'none'
                    self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id, vdur_console))
            except Exception:
                # Best effort: still answer the query, with a placeholder URL
                self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                vdur_console = RwVnfrYang.YangData_RwProject_Project_VnfrConsole_Vnfr_Vdur()
                vdur_console.id = self._vdur_id
                vdur_console.console_url = 'none'

            xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                    xpath=self.vnfr_vdu_console_xpath,
                                    msg=vdur_console)

        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        self.vnfr_vdu_console_xpath)
        hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
                                        handler=hdl,
                                        flags=rwdts.Flag.PUBLISHER,
                                        )
2240
2241
class VnfrDtsHandler(object):
    """ registers 'D,/rw-project:project/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        """
        Arguments:
            dts  - DTS API handle
            log  - logger
            loop - event loop used to schedule re-instantiation tasks
            vnfm - owning VnfManager; its _project supplies the xpath prefix
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        self._regh = None
        self._project = vnfm._project

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    def deregister(self):
        '''De-register from DTS'''
        self._log.debug("De-register VNFR DTS handler for project {}".
                        format(self._project))
        if self._regh:
            self._regh.deregister()
            self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts """
        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        # NOTE(review): on_abort is defined but not passed to the
        # registration handler below - confirm whether that is intended.
        def on_abort(*args):
            """ Abort callback """
            self._log.debug("VNF transaction got aborted")

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback; on INSTALL re-creates the stored VNFRs """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - VNF record to instantiate in restart mode

                """

                yield from vnfr.instantiate(None, restart_mode=True)

            if xact_event == rwdts.MemberEvent.INSTALL:
                curr_cfg = self.regh.elements
                for cfg in curr_cfg:
                    vnfr = self.vnfm.create_vnfr(cfg)
                    self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            self._log.debug("Got on_event in vnfm")

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            CREATE instantiates a new VNFR (marking it FAILED and
            publishing it if instantiation raises), DELETE terminates and
            removes it, UPDATE copies the config status from msg and
            re-publishes. Any other action raises NotImplementedError.
            """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
                )

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd"):
                    err = "Vnfd not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)

                vnfr = self.vnfm.create_vnfr(msg)
                try:
                    # RIFT-9105: Unable to add a READ query under an existing transaction
                    # xact = xact_info.xact
                    yield from vnfr.instantiate(None)
                except Exception as e:
                    self._log.exception(e)
                    self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                    vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                    yield from vnfr.publish(None)
            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = self._vnfm.get_vnfr(path_entry.key00.id)

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
                    # Interpolate the id: exception constructors do not
                    # apply %-formatting to extra positional arguments.
                    raise VirtualNetworkFunctionRecordNotFound(
                        "VNFR id %s" % path_entry.key00.id)

                try:
                    yield from vnfr.terminate(xact_info.xact)
                    # Unref the VNFD
                    vnfr.vnfd_unref()
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
                except Exception:
                    self._log.debug("No vnfr found with id %s", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                self._log.debug("VNFR {} update config status {} (current {})".
                                format(vnfr.name, msg.config_status, vnfr.config_status))
                # Update the config status and publish
                vnfr._config_status = msg.config_status
                yield from vnfr.publish(None)

            else:
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported"
                    % action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        xpath = self._project.add_project(VnfrDtsHandler.XPATH)
        self._log.debug("Registering for VNFR using xpath: {}".
                        format(xpath))

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=xpath,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, xpath, msg):
        """
        Create a VNFR record in DTS with path and message

        The xpath is passed through the project's add_project() before
        the element is created; xact is only logged.
        """
        path = self._project.add_project(xpath)
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, xpath, msg):
        """
        Update a VNFR record in DTS with path and message

        The xpath is passed through the project's add_project() before
        the element is updated; xact is only logged.
        """
        path = self._project.add_project(xpath)
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, xpath):
        """
        Delete a VNFR record in DTS at the given path

        The xpath is passed through the project's add_project() before
        the element is deleted; xact is only logged.
        """
        path = self._project.add_project(xpath)
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2430
2431
class VnfdRefCountDtsHandler(object):
    """ The VNFD Ref Count DTS handler

    Publishes, on READ queries, the ref-count entries returned by the
    VNF manager's get_vnfd_refcount().
    """
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        self._vnfm = vnfm
        self._dts, self._log, self._loop = dts, log, loop

        # Set by register(), reset by deregister()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def vnfm(self):
        """ Return the manager instance this handler was created with """
        return self._vnfm

    def deregister(self):
        '''De-register from DTS'''
        project = self._vnfm._project
        self._log.debug("De-register VNFD Ref DTS handler for project {}".format(project))
        if not self._regh:
            return
        self._regh.deregister()
        self._regh = None

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD ref count read from dts """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            Only READ is supported; anything else raises VnfRecordError.
            Each ref-count entry is sent as a MORE response, followed by
            a final ACK.
            """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
                )

            if action != rwdts.QueryAction.READ:
                raise VnfRecordError("Not supported operation %s" % action)

            schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            ref_entries = yield from self._vnfm.get_vnfd_refcount(
                path_entry.key00.vnfd_id_ref)

            for entry_xpath, entry_msg in ref_entries:
                self._log.debug("Responding to ref count query path:%s, msg:%s",
                                entry_xpath, entry_msg)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=entry_xpath,
                                        msg=entry_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as reg_group:
            project_xpath = self._vnfm._project.add_project(
                VnfdRefCountDtsHandler.XPATH)
            self._regh = reg_group.register(
                xpath=project_xpath,
                handler=reg_handler,
                flags=rwdts.Flag.PUBLISHER,
            )
2496
2497
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        self._vdur_data = dict()
        # Raw string: "\[", "\]" and "\." are regex escapes, not valid
        # Python string escapes.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attributes(vdur):
        """Return the (key, value) pairs exposed for a VDUR.

        Single source of truth for add() and update().
        """
        return (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
            # The below can be used for hostname
            ('vdur_name', vdur.unique_short_name),
        )

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Attributes whose value is None are not stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        self._vdur_data[vdur.vdu_id] = {
            key: value
            for key, value in self._exposed_attributes(vdur)
            if value is not None
        }

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes whose value is now None are removed from the entry.

        Arguments:
            vdur - a VDUR object exposing the same attributes as in add()

        Raises:
            A ValueError is raised if the VDUR (1) has no vdu_id or (2) is
            not in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]
        for key, value in self._exposed_attributes(vdur):
            if value is None:
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key)
2620
2621
class VnfManager(object):
    """ The virtual network function manager class """
    def __init__(self, dts, log, loop, project, cluster_name):
        """
        Arguments:
            dts          - DTS API handle
            log          - logger
            loop         - event loop
            project      - project object; its add_project() prefixes xpaths
            cluster_name - passed on to every VirtualNetworkFunctionRecord
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(
            log, dts, loop, project, callback=self.handle_nsr)

        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vnfr_handler,
                              self._vcs_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler]
        # vnfr id -> VirtualNetworkFunctionRecord
        self._vnfrs = {}
        # vnfd id -> number of VNFRs created from that VNFD (an int)
        self._vnfds_to_vnfr = {}
        # nsr id -> NSR config, maintained by handle_nsr()
        self._nsrs = {}

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    def deregister(self):
        """ De-register all static DTS handlers """
        self._log.debug("De-register VNFM project {}".format(self._project.name))
        for hdl in self._dts_handlers:
            hdl.deregister()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ NSR config callback: cache the NSR on CREATE, drop it on DELETE """
        if action == rwdts.QueryAction.CREATE:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            if nsr.id in self._nsrs:
                del self._nsrs[nsr.id]

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The name of the first VLD flagged as mgmt_network, or None.

        Raises:
            VnfRecordError if the NSR referenced by the VNFR is not known.
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError:
            # A bare string cannot be raised (that is a TypeError), so wrap
            # the message in a real exception type.
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError if no VNFR with that id exists.
        """

        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance

        Builds a VirtualNetworkFunctionRecord for the given message, stores
        it by id and bumps the reference count of its VNFD.

        Raises:
            VnfRecordError if a VNFR with the same id already exists.
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )

        # Update ref count
        self._vnfds_to_vnfr[vnfr.vnfd.id] = self._vnfds_to_vnfr.get(vnfr.vnfd.id, 0) + 1

        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance

        Removes the record from DTS, decrements the reference count of its
        VNFD (never below zero) and forgets the VNFR. Unknown VNFRs are
        ignored.
        """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)

            if vnfr.vnfd.id in self._vnfds_to_vnfr:
                if self._vnfds_to_vnfr[vnfr.vnfd.id]:
                    self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1

            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch VNFDs based with the vnfd id

        Raises:
            VnfRecordError if the DTS read yields no result.
        """
        vnfd_path = self._project.add_project(
            VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id))
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path,
                                                   rwdts.XactFlag.MERGE)

        # If several results come back, the last one wins
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use (reference count greater than zero) """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            return (self._vnfds_to_vnfr[vnfd_id] > 0)
        return False

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorRefCountExists if VNFRs still
            reference the VNFD.
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            # The map stores plain integer counts
            ref_count = self._vnfds_to_vnfr[vnfd_id]
            if ref_count:
                self._log.debug("Cannot delete VNFD id %s reference exists %s",
                                vnfd_id,
                                ref_count)
                raise VirtualNetworkFunctionDescriptorRefCountExists(
                    "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

            del self._vnfds_to_vnfr[vnfd_id]

        # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            # Best-effort cleanup. The ref-count entry is already gone at
            # this point, so report the id rather than looking it up again.
            self._log.error("Exception in cleaning up VNFD {}: {}".
                            format(vnfd_id, e))
            self._log.exception(e)

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return self._project.add_project(
            (VnfdRefCountDtsHandler.XPATH +
             "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id))

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd_list from this VNFM

        Returns:
            A list of (xpath, message) tuples: one per known VNFD when
            vnfd_id is None or empty, otherwise at most one for vnfd_id.
        """
        vnfd_list = []
        if vnfd_id is None or vnfd_id == "":
            for vnfd in self._vnfds_to_vnfr.keys():
                vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
                vnfd_msg.vnfd_id_ref = vnfd
                vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd]
                vnfd_list.append((self.vnfd_refcount_xpath(vnfd), vnfd_msg))
        elif vnfd_id in self._vnfds_to_vnfr:
            vnfd_msg = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = vnfd_id
            vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id]
            vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))

        return vnfd_list
2837
2838
class VnfmProject(ManoProject):
    """ Per-project wrapper that owns one VnfManager instance """

    def __init__(self, name, tasklet, **kw):
        """
        Arguments:
            name    - project name
            tasklet - owning tasklet; supplies the logger and is passed to
                      update()
            kw      - accepted for call compatibility, not used here
        """
        super(VnfmProject, self).__init__(tasklet.log, name)
        self.update(tasklet)

        # Created by register(); stays None until that succeeds
        self._vnfm = None

    @asyncio.coroutine
    def register(self):
        """ Create the VnfManager for this project and run it """
        try:
            vm_parent_name = self._tasklet.tasklet_info.get_parent_vm_parent_instance_name()
            assert vm_parent_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, self, vm_parent_name)
            yield from self._vnfm.run()
        except Exception:
            print("Caught Exception in VNFM init:", sys.exc_info()[0])
            raise

    def deregister(self):
        """ De-register the project's VnfManager, if one was created """
        self._log.debug("De-register project {} for VnfmProject".
                        format(self.name))
        # register() may not have run (or may have failed before the
        # manager was created); there is nothing to tear down in that case.
        if self._vnfm is not None:
            self._vnfm.deregister()
2862
2863
class VnfmTasklet(rift.tasklets.Tasklet):
    """ VNF Manager tasklet class """
    def __init__(self, *args, **kwargs):
        super(VnfmTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        self._dts = None
        # init() stores the handler under this public name, so declare the
        # same attribute here instead of a differently named private one.
        self.project_handler = None
        self.projects = {}

    @property
    def dts(self):
        """ DTS API handle (None until start() has run) """
        return self._dts

    def start(self):
        """ Start the tasklet and create the DTS API handle """
        try:
            super(VnfmTasklet, self).start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            self._dts = rift.tasklets.DTS(self.tasklet_info,
                                          RwVnfmYang.get_schema(),
                                          self.loop,
                                          self.on_dts_state_change)

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            print("Caught Exception in VNFM start:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    def stop(self):
        """ Stop the tasklet by de-initialising the DTS handle """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNFM stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """ Task init callback """
        self.log.debug("creating project handler")
        self.project_handler = ProjectHandler(self, VnfmProject)
        self.project_handler.register()

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to move to once the handler for `state` has run
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run for a given DTS state
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self._dts.handle.set_state(next_state)