RIFT-14481 uptime for vlr, vnfr, nsr
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52
53
54 class VMResourceError(Exception):
55 """ VM resource Error"""
56 pass
57
58
59 class VnfRecordError(Exception):
60 """ VNF record instatiation failed"""
61 pass
62
63
64 class VduRecordError(Exception):
65 """ VDU record instatiation failed"""
66 pass
67
68
69 class NotImplemented(Exception):
70 """Not implemented """
71 pass
72
73
74 class VnfrRecordExistsError(Exception):
75 """VNFR record already exist with the same VNFR id"""
76 pass
77
78
79 class InternalVirtualLinkRecordError(Exception):
80 """Internal virtual link record error"""
81 pass
82
83
84 class VDUImageNotFound(Exception):
85 """VDU Image not found error"""
86 pass
87
88
89 class VirtualDeploymentUnitRecordError(Exception):
90 """VDU Instantiation failed"""
91 pass
92
93
94 class VMNotReadyError(Exception):
95 """ VM Not yet received from resource manager """
96 pass
97
98
99 class VDURecordNotFound(Exception):
100 """ Could not find a VDU record """
101 pass
102
103
104 class VirtualNetworkFunctionRecordDescNotFound(Exception):
105 """ Cannot find Virtual Network Function Record Descriptor """
106 pass
107
108
109 class VirtualNetworkFunctionDescriptorError(Exception):
110 """ Virtual Network Function Record Descriptor Error """
111 pass
112
113
114 class VirtualNetworkFunctionDescriptorNotFound(Exception):
115 """ Virtual Network Function Record Descriptor Not Found """
116 pass
117
118
119 class VirtualNetworkFunctionRecordNotFound(Exception):
120 """ Virtual Network Function Record Not Found """
121 pass
122
123
124 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
125 """ Virtual Network Funtion Descriptor reference count exists """
126 pass
127
128
129 class VnfrInstantiationFailed(Exception):
130 """ Virtual Network Funtion Instantiation failed"""
131 pass
132
133
134 class VNFMPlacementGroupError(Exception):
135 pass
136
137 class VirtualNetworkFunctionRecordState(enum.Enum):
138 """ VNFR state """
139 INIT = 1
140 VL_INIT_PHASE = 2
141 VM_INIT_PHASE = 3
142 READY = 4
143 TERMINATE = 5
144 VL_TERMINATE_PHASE = 6
145 VDU_TERMINATE_PHASE = 7
146 TERMINATED = 7
147 FAILED = 10
148
149
150 class VDURecordState(enum.Enum):
151 """VDU record state """
152 INIT = 1
153 INSTANTIATING = 2
154 RESOURCE_ALLOC_PENDING = 3
155 READY = 4
156 TERMINATING = 5
157 TERMINATED = 6
158 FAILED = 10
159
160
161 class VcsComponent(object):
162 """ VCS Component within the VNF descriptor """
163 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
164 self._dts = dts
165 self._log = log
166 self._loop = loop
167 self._component = component
168 self._cluster_name = cluster_name
169 self._vcs_handler = vcs_handler
170 self._mangled_name = mangled_name
171
172 @staticmethod
173 def mangle_name(component_name, vnf_name, vnfd_id):
174 """ mangled component name """
175 return vnf_name + ":" + component_name + ":" + vnfd_id
176
177 @property
178 def name(self):
179 """ name of this component"""
180 return self._mangled_name
181
182 @property
183 def path(self):
184 """ The path for this object """
185 return("D,/rw-manifest:manifest" +
186 "/rw-manifest:operational-inventory" +
187 "/rw-manifest:component" +
188 "[rw-manifest:component-name = '{}']").format(self.name)
189
190 @property
191 def instance_xpath(self):
192 """ The path for this object """
193 return("D,/rw-base:vcs" +
194 "/instances" +
195 "/instance" +
196 "[instance-name = '{}']".format(self._cluster_name))
197
198 @property
199 def start_comp_xpath(self):
200 """ start component xpath """
201 return (self.instance_xpath +
202 "/child-n[instance-name = 'START-REQ']")
203
204 def get_start_comp_msg(self, ip_address):
205 """ start this component """
206 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
207 start_msg.instance_name = 'START-REQ'
208 start_msg.component_name = self.name
209 start_msg.admin_command = "START"
210 start_msg.ip_address = ip_address
211
212 return start_msg
213
214 @property
215 def msg(self):
216 """ Returns the message for this vcs component"""
217
218 vcs_comp_dict = self._component.as_dict()
219
220 def mangle_comp_names(comp_dict):
221 """ mangle component name with VNF name, id"""
222 for key, val in comp_dict.items():
223 if isinstance(val, dict):
224 comp_dict[key] = mangle_comp_names(val)
225 elif isinstance(val, list):
226 i = 0
227 for ent in val:
228 if isinstance(ent, dict):
229 val[i] = mangle_comp_names(ent)
230 else:
231 val[i] = ent
232 i += 1
233 elif key == "component_name":
234 comp_dict[key] = VcsComponent.mangle_name(val,
235 self._vnfd_name,
236 self._vnfd_id)
237 return comp_dict
238
239 mangled_dict = mangle_comp_names(vcs_comp_dict)
240 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
241 return msg
242
243 @asyncio.coroutine
244 def publish(self, xact):
245 """ Publishes the VCS component """
246 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
247 self.name, self.path, self.msg)
248 yield from self._vcs_handler.publish(xact, self.path, self.msg)
249
250 @asyncio.coroutine
251 def start(self, xact, parent, ip_addr=None):
252 """ Starts this VCS component """
253 # ATTN RV - replace with block add
254 start_msg = self.get_start_comp_msg(ip_addr)
255 self._log.debug("starting component %s %s",
256 self.start_comp_xpath, start_msg)
257 yield from self._dts.query_create(self.start_comp_xpath,
258 0,
259 start_msg)
260 self._log.debug("started component %s, %s",
261 self.start_comp_xpath, start_msg)
262
263
264 class VirtualDeploymentUnitRecord(object):
265 """ Virtual Deployment Unit Record """
266 def __init__(self,
267 dts,
268 log,
269 loop,
270 vdud,
271 vnfr,
272 mgmt_intf,
273 cloud_account_name,
274 vnfd_package_store,
275 vdur_id=None,
276 placement_groups=[]):
277 self._dts = dts
278 self._log = log
279 self._loop = loop
280 self._vdud = vdud
281 self._vnfr = vnfr
282 self._mgmt_intf = mgmt_intf
283 self._cloud_account_name = cloud_account_name
284 self._vnfd_package_store = vnfd_package_store
285
286 self._vdur_id = vdur_id or str(uuid.uuid4())
287 self._int_intf = []
288 self._ext_intf = []
289 self._state = VDURecordState.INIT
290 self._state_failed_reason = None
291 self._request_id = str(uuid.uuid4())
292 self._name = vnfr.name + "__" + vdud.id
293 self._placement_groups = placement_groups
294 self._rm_regh = None
295 self._vm_resp = None
296 self._vdud_cloud_init = None
297 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
298
299 @asyncio.coroutine
300 def vdu_opdata_register(self):
301 yield from self._vdur_console_handler.register()
302
303 def cp_ip_addr(self, cp_name):
304 """ Find ip address by connection point name """
305 if self._vm_resp is not None:
306 for conn_point in self._vm_resp.connection_points:
307 if conn_point.name == cp_name:
308 return conn_point.ip_address
309 return "0.0.0.0"
310
311 def cp_mac_addr(self, cp_name):
312 """ Find mac address by connection point name """
313 if self._vm_resp is not None:
314 for conn_point in self._vm_resp.connection_points:
315 if conn_point.name == cp_name:
316 return conn_point.mac_addr
317 return "00:00:00:00:00:00"
318
319 def cp_id(self, cp_name):
320 """ Find connection point id by connection point name """
321 if self._vm_resp is not None:
322 for conn_point in self._vm_resp.connection_points:
323 if conn_point.name == cp_name:
324 return conn_point.connection_point_id
325 return ''
326
327 @property
328 def vdu_id(self):
329 return self._vdud.id
330
331 @property
332 def vm_resp(self):
333 return self._vm_resp
334
335 @property
336 def name(self):
337 """ Return this VDUR's name """
338 return self._name
339
340 @property
341 def cloud_account_name(self):
342 """ Cloud account this VDU should be created in """
343 return self._cloud_account_name
344
345 @property
346 def image_name(self):
347 """ name that should be used to lookup the image on the CMP """
348 return os.path.basename(self._vdud.image)
349
350 @property
351 def image_checksum(self):
352 """ name that should be used to lookup the image on the CMP """
353 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
354
355 @property
356 def management_ip(self):
357 if not self.active:
358 return None
359 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
360
361 @property
362 def vm_management_ip(self):
363 if not self.active:
364 return None
365 return self._vm_resp.management_ip
366
367 @property
368 def operational_status(self):
369 """ Operational status of this VDU"""
370 op_stats_dict = {"INIT": "init",
371 "INSTANTIATING": "vm_init_phase",
372 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
373 "READY": "running",
374 "FAILED": "failed",
375 "TERMINATING": "terminated",
376 "TERMINATED": "terminated",
377 }
378 return op_stats_dict[self._state.name]
379
380 @property
381 def msg(self):
382 """ VDU message """
383 vdu_fields = ["vm_flavor",
384 "guest_epa",
385 "vswitch_epa",
386 "hypervisor_epa",
387 "host_epa",
388 "name"]
389 vdu_copy_dict = {k: v for k, v in
390 self._vdud.as_dict().items() if k in vdu_fields}
391 vdur_dict = {"id": self._vdur_id,
392 "vdu_id_ref": self._vdud.id,
393 "operational_status": self.operational_status,
394 "operational_status_details": self._state_failed_reason,
395 }
396 if self.vm_resp is not None:
397 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
398 "flavor_id": self.vm_resp.flavor_id,
399 "image_id": self.vm_resp.image_id,
400 })
401
402 if self.management_ip is not None:
403 vdur_dict["management_ip"] = self.management_ip
404
405 if self.vm_management_ip is not None:
406 vdur_dict["vm_management_ip"] = self.vm_management_ip
407
408 vdur_dict.update(vdu_copy_dict)
409
410 icp_list = []
411 ii_list = []
412
413 for intf, cp_id, vlr in self._int_intf:
414 cp = self.find_internal_cp_by_cp_id(cp_id)
415
416 icp_list.append({"name": cp.name,
417 "id": cp.id,
418 "type_yang": "VPORT",
419 "ip_address": self.cp_ip_addr(cp.id),
420 "mac_address": self.cp_mac_addr(cp.id)})
421
422 ii_list.append({"name": intf.name,
423 "vdur_internal_connection_point_ref": cp.id,
424 "virtual_interface": {}})
425
426 vdur_dict["internal_connection_point"] = icp_list
427 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
428 vdur_dict["internal_interface"] = ii_list
429
430 ei_list = []
431 for intf, cp, vlr in self._ext_intf:
432 ei_list.append({"name": cp,
433 "vnfd_connection_point_ref": cp,
434 "virtual_interface": {}})
435 self._vnfr.update_cp(cp,
436 self.cp_ip_addr(cp),
437 self.cp_mac_addr(cp),
438 self.cp_id(cp))
439
440 vdur_dict["external_interface"] = ei_list
441
442 placement_groups = []
443 for group in self._placement_groups:
444 placement_groups.append(group.as_dict())
445
446 vdur_dict['placement_groups_info'] = placement_groups
447 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
448
449 @property
450 def resmgr_path(self):
451 """ path for resource-mgr"""
452 return ("D,/rw-resource-mgr:resource-mgmt" +
453 "/vdu-event" +
454 "/vdu-event-data[event-id='{}']".format(self._request_id))
455
456 @property
457 def vm_flavor_msg(self):
458 """ VM flavor message """
459 flavor = self._vdud.vm_flavor.__class__()
460 flavor.copy_from(self._vdud.vm_flavor)
461
462 return flavor
463
464 @property
465 def vdud_cloud_init(self):
466 """ Return the cloud-init contents for the VDU """
467 if self._vdud_cloud_init is None:
468 self._vdud_cloud_init = self.cloud_init()
469
470 return self._vdud_cloud_init
471
472 def cloud_init(self):
473 """ Populate cloud_init with cloud-config script from
474 either the inline contents or from the file provided
475 """
476 if self._vdud.cloud_init is not None:
477 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
478 return self._vdud.cloud_init
479 elif self._vdud.cloud_init_file is not None:
480 # Get cloud-init script contents from the file provided in the cloud_init_file param
481 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
482 filename = self._vdud.cloud_init_file
483 self._vnfd_package_store.refresh()
484 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
485 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
486 try:
487 return cloud_init_extractor.read_script(stored_package, filename)
488 except rift.package.cloud_init.CloudInitExtractionError as e:
489 raise VirtualDeploymentUnitRecordError(e)
490 else:
491 self._log.debug("VDU Instantiation: cloud-init script not provided")
492
493 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
494 host_aggregates = []
495 availability_zones = []
496 server_groups = []
497 for group in self._placement_groups:
498 if group.has_field('host_aggregate'):
499 for aggregate in group.host_aggregate:
500 host_aggregates.append(aggregate.as_dict())
501 if group.has_field('availability_zone'):
502 availability_zones.append(group.availability_zone.as_dict())
503 if group.has_field('server_group'):
504 server_groups.append(group.server_group.as_dict())
505
506 if availability_zones:
507 if len(availability_zones) > 1:
508 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
509 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
510 else:
511 vm_create_msg_dict['availability_zone'] = availability_zones[0]
512
513 if server_groups:
514 if len(server_groups) > 1:
515 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
516 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
517 else:
518 vm_create_msg_dict['server_group'] = server_groups[0]
519
520 if host_aggregates:
521 vm_create_msg_dict['host_aggregate'] = host_aggregates
522
523 return
524
525 def process_placement_groups(self, vm_create_msg_dict):
526 """Process the placement_groups and fill resource-mgr request"""
527 if not self._placement_groups:
528 return
529
530 cloud_set = set([group.cloud_type for group in self._placement_groups])
531 assert len(cloud_set) == 1
532 cloud_type = cloud_set.pop()
533
534 if cloud_type == 'openstack':
535 self.process_openstack_placement_group_construct(vm_create_msg_dict)
536
537 else:
538 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
539 return
540
541 def resmgr_msg(self, config=None):
542 vdu_fields = ["vm_flavor",
543 "guest_epa",
544 "vswitch_epa",
545 "hypervisor_epa",
546 "host_epa"]
547
548 self._log.debug("Creating params based on VDUD: %s", self._vdud)
549 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
550
551 vm_create_msg_dict = {
552 "name": self.name,
553 "image_name": self.image_name,
554 }
555
556 if self.image_checksum is not None:
557 vm_create_msg_dict["image_checksum"] = self.image_checksum
558
559 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
560 if self._vdud.has_field('mgmt_vpci'):
561 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
562
563 self._log.debug("VDUD: %s", self._vdud)
564 if config is not None:
565 vm_create_msg_dict['vdu_init'] = {'userdata': config}
566
567 cp_list = []
568 for intf, cp, vlr in self._ext_intf:
569 cp_info = {"name": cp,
570 "virtual_link_id": vlr.network_id,
571 "type_yang": intf.virtual_interface.type_yang}
572
573 if (intf.virtual_interface.has_field('vpci') and
574 intf.virtual_interface.vpci is not None):
575 cp_info["vpci"] = intf.virtual_interface.vpci
576
577 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
578 cp_info['security_group'] = vlr.ip_profile_params.security_group
579
580 cp_list.append(cp_info)
581
582 for intf, cp, vlr in self._int_intf:
583 if (intf.virtual_interface.has_field('vpci') and
584 intf.virtual_interface.vpci is not None):
585 cp_list.append({"name": cp,
586 "virtual_link_id": vlr.network_id,
587 "type_yang": intf.virtual_interface.type_yang,
588 "vpci": intf.virtual_interface.vpci})
589 else:
590 cp_list.append({"name": cp,
591 "virtual_link_id": vlr.network_id,
592 "type_yang": intf.virtual_interface.type_yang})
593
594 vm_create_msg_dict["connection_points"] = cp_list
595 vm_create_msg_dict.update(vdu_copy_dict)
596
597 self.process_placement_groups(vm_create_msg_dict)
598
599 msg = RwResourceMgrYang.VDUEventData()
600 msg.event_id = self._request_id
601 msg.cloud_account = self.cloud_account_name
602 msg.request_info.from_dict(vm_create_msg_dict)
603 return msg
604
605 @asyncio.coroutine
606 def terminate(self, xact):
607 """ Delete resource in VIM """
608 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
609 self._log.warning("VDU terminate in not ready state - Ignoring request")
610 return
611
612 self._state = VDURecordState.TERMINATING
613 if self._vm_resp is not None:
614 try:
615 with self._dts.transaction() as new_xact:
616 yield from self.delete_resource(new_xact)
617 except Exception:
618 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
619
620 if self._rm_regh is not None:
621 self._log.debug("Deregistering resource manager registration handle")
622 self._rm_regh.deregister()
623 self._rm_regh = None
624
625 if self._vdur_console_handler is not None:
626 self._log.error("Deregistering vnfr vdur registration handle")
627 self._vdur_console_handler._regh.deregister()
628 self._vdur_console_handler._regh = None
629
630 self._state = VDURecordState.TERMINATED
631
632 def find_internal_cp_by_cp_id(self, cp_id):
633 """ Find the CP corresponding to the connection point id"""
634 cp = None
635
636 self._log.debug("find_internal_cp_by_cp_id(%s) called",
637 cp_id)
638
639 for int_cp in self._vdud.internal_connection_point:
640 self._log.debug("Checking for int cp %s in internal connection points",
641 int_cp.id)
642 if int_cp.id == cp_id:
643 cp = int_cp
644 break
645
646 if cp is None:
647 self._log.debug("Failed to find cp %s in internal connection points",
648 cp_id)
649 msg = "Failed to find cp %s in internal connection points" % cp_id
650 raise VduRecordError(msg)
651
652 # return the VLR associated with the connection point
653 return cp
654
655 @asyncio.coroutine
656 def create_resource(self, xact, vnfr, config=None):
657 """ Request resource from ResourceMgr """
658 def find_cp_by_name(cp_name):
659 """ Find a connection point by name """
660 cp = None
661 self._log.debug("find_cp_by_name(%s) called", cp_name)
662 for ext_cp in vnfr._cprs:
663 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
664 if ext_cp.name == cp_name:
665 cp = ext_cp
666 break
667 if cp is None:
668 self._log.debug("Failed to find cp %s in external connection points",
669 cp_name)
670 return cp
671
672 def find_internal_vlr_by_cp_name(cp_name):
673 """ Find the VLR corresponding to the connection point name"""
674 cp = None
675
676 self._log.debug("find_internal_vlr_by_cp_name(%s) called",
677 cp_name)
678
679 for int_cp in self._vdud.internal_connection_point:
680 self._log.debug("Checking for int cp %s in internal connection points",
681 int_cp.id)
682 if int_cp.id == cp_name:
683 cp = int_cp
684 break
685
686 if cp is None:
687 self._log.debug("Failed to find cp %s in internal connection points",
688 cp_name)
689 msg = "Failed to find cp %s in internal connection points" % cp_name
690 raise VduRecordError(msg)
691
692 # return the VLR associated with the connection point
693 return vnfr.find_vlr_by_cp(cp_name)
694
695 block = xact.block_create()
696
697 self._log.debug("Executing vm request id: %s, action: create",
698 self._request_id)
699
700 # Resolve the networks associated external interfaces
701 for ext_intf in self._vdud.external_interface:
702 self._log.debug("Resolving external interface name [%s], cp[%s]",
703 ext_intf.name, ext_intf.vnfd_connection_point_ref)
704 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
705 if cp is None:
706 self._log.debug("Failed to find connection point - %s",
707 ext_intf.vnfd_connection_point_ref)
708 continue
709 self._log.debug("Connection point name [%s], type[%s]",
710 cp.name, cp.type_yang)
711
712 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
713
714 etuple = (ext_intf, cp.name, vlr)
715 self._ext_intf.append(etuple)
716
717 self._log.debug("Created external interface tuple : %s", etuple)
718
719 # Resolve the networks associated internal interfaces
720 for intf in self._vdud.internal_interface:
721 cp_id = intf.vdu_internal_connection_point_ref
722 self._log.debug("Resolving internal interface name [%s], cp[%s]",
723 intf.name, cp_id)
724
725 try:
726 vlr = find_internal_vlr_by_cp_name(cp_id)
727 except Exception as e:
728 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
729 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
730 raise VduRecordError(msg)
731
732 ituple = (intf, cp_id, vlr)
733 self._int_intf.append(ituple)
734
735 self._log.debug("Created internal interface tuple : %s", ituple)
736
737 resmgr_path = self.resmgr_path
738 resmgr_msg = self.resmgr_msg(config)
739
740 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
741 block.add_query_create(resmgr_path, resmgr_msg)
742
743 res_iter = yield from block.execute(now=True)
744
745 resp = None
746
747 for i in res_iter:
748 r = yield from i
749 resp = r.result
750
751 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
752 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
753 self._log.debug("Got vm request response: %s", resp.resource_info)
754 return resp.resource_info
755
756 @asyncio.coroutine
757 def delete_resource(self, xact):
758 block = xact.block_create()
759
760 self._log.debug("Executing vm request id: %s, action: delete",
761 self._request_id)
762
763 block.add_query_delete(self.resmgr_path)
764
765 yield from block.execute(flags=0, now=True)
766
767 @asyncio.coroutine
768 def read_resource(self, xact):
769 block = xact.block_create()
770
771 self._log.debug("Executing vm request id: %s, action: delete",
772 self._request_id)
773
774 block.add_query_read(self.resmgr_path)
775
776 res_iter = yield from block.execute(flags=0, now=True)
777 for i in res_iter:
778 r = yield from i
779 resp = r.result
780
781 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
782 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
783 self._log.debug("Got vm request response: %s", resp.resource_info)
784 #self._vm_resp = resp.resource_info
785 return resp.resource_info
786
787
788 @asyncio.coroutine
789 def start_component(self):
790 """ This VDUR is active """
791 self._log.debug("Starting component %s for vdud %s vdur %s",
792 self._vdud.vcs_component_ref,
793 self._vdud,
794 self._vdur_id)
795 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
796 self.vm_resp.management_ip)
797
798 @property
799 def active(self):
800 """ Is this VDU active """
801 return True if self._state is VDURecordState.READY else False
802
803 @asyncio.coroutine
804 def instantiation_failed(self, failed_reason=None):
805 """ VDU instantiation failed """
806 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
807 self._state = VDURecordState.FAILED
808 self._state_failed_reason = failed_reason
809 yield from self._vnfr.instantiation_failed(failed_reason)
810
811 @asyncio.coroutine
812 def vdu_is_active(self):
813 """ This VDU is active"""
814 if self.active:
815 self._log.warning("VDU %s was already marked as active", self._vdur_id)
816 return
817
818 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
819
820 if self._vdud.vcs_component_ref is not None:
821 yield from self.start_component()
822
823 self._state = VDURecordState.READY
824
825 if self._vnfr.all_vdus_active():
826 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
827 yield from self._vnfr.is_ready()
828
829 @asyncio.coroutine
830 def instantiate(self, xact, vnfr, config=None):
831 """ Instantiate this VDU """
832 self._state = VDURecordState.INSTANTIATING
833
834 @asyncio.coroutine
835 def on_prepare(xact_info, query_action, ks_path, msg):
836 """ This VDUR is active """
837 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
838 query_action,
839 ks_path,
840 msg)
841
842 if (query_action == rwdts.QueryAction.UPDATE or
843 query_action == rwdts.QueryAction.CREATE):
844 self._vm_resp = msg
845
846 if msg.resource_state == "active":
847 # Move this VDU to ready state
848 yield from self.vdu_is_active()
849 elif msg.resource_state == "failed":
850 yield from self.instantiation_failed(msg.resource_errors)
851 elif query_action == rwdts.QueryAction.DELETE:
852 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
853 else:
854 raise NotImplementedError(
855 "%s action on VirtualDeployementUnitRecord not supported",
856 query_action)
857
858 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
859
860 try:
861 reg_event = asyncio.Event(loop=self._loop)
862
863 @asyncio.coroutine
864 def on_ready(regh, status):
865 reg_event.set()
866
867 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
868 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
869 flags=rwdts.Flag.SUBSCRIBER,
870 handler=handler)
871 yield from reg_event.wait()
872
873 vm_resp = yield from self.create_resource(xact, vnfr, config)
874 self._vm_resp = vm_resp
875
876 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
877 self._log.debug("Requested VM from resource manager response %s",
878 vm_resp)
879 if vm_resp.resource_state == "active":
880 self._log.debug("Resourcemgr responded wih an active vm resp %s",
881 vm_resp)
882 yield from self.vdu_is_active()
883 self._state = VDURecordState.READY
884 elif (vm_resp.resource_state == "pending" or
885 vm_resp.resource_state == "inactive"):
886 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
887 vm_resp)
888 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
889 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
890 # flags=rwdts.Flag.SUBSCRIBER,
891 # handler=handler)
892 else:
893 self._log.debug("Resourcemgr responded wih an error vm resp %s",
894 vm_resp)
895 raise VirtualDeploymentUnitRecordError(
896 "Failed VDUR instantiation %s " % vm_resp)
897
898 except Exception as e:
899 import traceback
900 traceback.print_exc()
901 self._log.exception(e)
902 self._log.error("Instantiation of VDU record failed: %s", str(e))
903 self._state = VDURecordState.FAILED
904 yield from self.instantiation_failed(str(e))
905
906
907 class VlRecordState(enum.Enum):
908 """ VL Record State """
909 INIT = 101
910 INSTANTIATION_PENDING = 102
911 ACTIVE = 103
912 TERMINATE_PENDING = 104
913 TERMINATED = 105
914 FAILED = 106
915
916
917 class InternalVirtualLinkRecord(object):
918 """ Internal Virtual Link record """
919 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
920 self._dts = dts
921 self._log = log
922 self._loop = loop
923 self._ivld_msg = ivld_msg
924 self._vnfr_name = vnfr_name
925 self._cloud_account_name = cloud_account_name
926
927 self._vlr_req = self.create_vlr()
928 self._vlr = None
929 self._state = VlRecordState.INIT
930
931 @property
932 def vlr_id(self):
933 """ Find VLR by id """
934 return self._vlr_req.id
935
936 @property
937 def name(self):
938 """ Name of this VL """
939 return self._vnfr_name + "." + self._ivld_msg.name
940
941 @property
942 def network_id(self):
943 """ Find VLR by id """
944 return self._vlr.network_id if self._vlr else None
945
946 def vlr_path(self):
947 """ VLR path for this VLR instance"""
948 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
949
950 def create_vlr(self):
951 """ Create the VLR record which will be instantiated """
952
953 vld_fields = ["short_name",
954 "vendor",
955 "description",
956 "version",
957 "type_yang",
958 "provider_network"]
959
960 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
961
962 vlr_dict = {"id": str(uuid.uuid4()),
963 "name": self.name,
964 "cloud_account": self._cloud_account_name,
965 }
966 vlr_dict.update(vld_copy_dict)
967
968 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
969 return vlr
970
971 @asyncio.coroutine
972 def instantiate(self, xact, restart_mode=False):
973 """ Instantiate VL """
974
975 @asyncio.coroutine
976 def instantiate_vlr():
977 """ Instantiate VLR"""
978 self._log.debug("Create VL with xpath %s and vlr %s",
979 self.vlr_path(), self._vlr_req)
980
981 with self._dts.transaction(flags=0) as xact:
982 block = xact.block_create()
983 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
984 self._log.debug("Executing VL create path:%s msg:%s",
985 self.vlr_path(), self._vlr_req)
986
987 res_iter = None
988 try:
989 res_iter = yield from block.execute()
990 except Exception:
991 self._state = VlRecordState.FAILED
992 self._log.exception("Caught exception while instantial VL")
993 raise
994
995 for ent in res_iter:
996 res = yield from ent
997 self._vlr = res.result
998
999 if self._vlr.operational_status == 'failed':
1000 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1001 self._state = VlRecordState.FAILED
1002 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1003
1004 self._log.info("Created VL with xpath %s and vlr %s",
1005 self.vlr_path(), self._vlr)
1006
1007 @asyncio.coroutine
1008 def get_vlr():
1009 """ Get the network id """
1010 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1011 vlr = None
1012 for ent in res_iter:
1013 res = yield from ent
1014 vlr = res.result
1015
1016 if vlr is None:
1017 err = "Failed to get VLR for path %s" % self.vlr_path()
1018 self._log.warn(err)
1019 raise InternalVirtualLinkRecordError(err)
1020 return vlr
1021
1022 self._state = VlRecordState.INSTANTIATION_PENDING
1023
1024 if restart_mode:
1025 vl = yield from get_vlr()
1026 if vl is None:
1027 yield from instantiate_vlr()
1028 else:
1029 yield from instantiate_vlr()
1030
1031 self._state = VlRecordState.ACTIVE
1032
1033 def vlr_in_vns(self):
1034 """ Is there a VLR record in VNS """
1035 if (self._state == VlRecordState.ACTIVE or
1036 self._state == VlRecordState.INSTANTIATION_PENDING or
1037 self._state == VlRecordState.FAILED):
1038 return True
1039
1040 return False
1041
1042 @asyncio.coroutine
1043 def terminate(self, xact):
1044 """Terminate this VL """
1045 if not self.vlr_in_vns():
1046 self._log.debug("Ignoring terminate request for id %s in state %s",
1047 self.vlr_id, self._state)
1048 return
1049
1050 self._log.debug("Terminating VL with path %s", self.vlr_path())
1051 self._state = VlRecordState.TERMINATE_PENDING
1052 block = xact.block_create()
1053 block.add_query_delete(self.vlr_path())
1054 yield from block.execute(flags=0, now=True)
1055 self._state = VlRecordState.TERMINATED
1056 self._log.debug("Terminated VL with path %s", self.vlr_path())
1057
1058
1059 class VirtualNetworkFunctionRecord(object):
1060 """ Virtual Network Function Record """
1061 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg):
1062 self._dts = dts
1063 self._log = log
1064 self._loop = loop
1065 self._cluster_name = cluster_name
1066 self._vnfr_msg = vnfr_msg
1067 self._vnfr_id = vnfr_msg.id
1068 self._vnfd_id = vnfr_msg.vnfd_ref
1069 self._vnfm = vnfm
1070 self._vcs_handler = vcs_handler
1071 self._vnfr = vnfr_msg
1072
1073 self._vnfd = None
1074 self._state = VirtualNetworkFunctionRecordState.INIT
1075 self._state_failed_reason = None
1076 self._ext_vlrs = {} # The list of external virtual links
1077 self._vlrs = [] # The list of internal virtual links
1078 self._vdus = [] # The list of vdu
1079 self._vlr_by_cp = {}
1080 self._cprs = []
1081 self._inventory = {}
1082 self._create_time = int(time.time())
1083 self._vnf_mon = None
1084 self._config_status = vnfr_msg.config_status
1085 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1086
1087 def _get_vdur_from_vdu_id(self, vdu_id):
1088 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1089 self._log.debug("Searching through vdus: %s", self._vdus)
1090 for vdu in self._vdus:
1091 self._log.debug("vdu_id: %s", vdu.vdu_id)
1092 if vdu.vdu_id == vdu_id:
1093 return vdu
1094
1095 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1096
1097 @property
1098 def operational_status(self):
1099 """ Operational status of this VNFR """
1100 op_status_map = {"INIT": "init",
1101 "VL_INIT_PHASE": "vl_init_phase",
1102 "VM_INIT_PHASE": "vm_init_phase",
1103 "READY": "running",
1104 "TERMINATE": "terminate",
1105 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1106 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1107 "TERMINATED": "terminated",
1108 "FAILED": "failed", }
1109 return op_status_map[self._state.name]
1110
1111 @property
1112 def vnfd_xpath(self):
1113 """ VNFD xpath associated with this VNFR """
1114 return("C,/vnfd:vnfd-catalog/"
1115 "vnfd:vnfd[vnfd:id = '{}']".format(self._vnfd_id))
1116
1117 @property
1118 def vnfd(self):
1119 """ VNFD for this VNFR """
1120 return self._vnfd
1121
1122 @property
1123 def vnf_name(self):
1124 """ VNFD name associated with this VNFR """
1125 return self.vnfd.name
1126
1127 @property
1128 def name(self):
1129 """ Name of this VNF in the record """
1130 return self._vnfr.name
1131
1132 @property
1133 def cloud_account_name(self):
1134 """ Name of the cloud account this VNFR is instantiated in """
1135 return self._vnfr.cloud_account
1136
1137 @property
1138 def vnfd_id(self):
1139 """ VNFD Id associated with this VNFR """
1140 return self.vnfd.id
1141
1142 @property
1143 def vnfr_id(self):
1144 """ VNFR Id associated with this VNFR """
1145 return self._vnfr_id
1146
1147 @property
1148 def member_vnf_index(self):
1149 """ Member VNF index associated with this VNFR """
1150 return self._vnfr.member_vnf_index_ref
1151
1152 @property
1153 def config_status(self):
1154 """ Config agent status for this VNFR """
1155 return self._config_status
1156
1157 def component_by_name(self, component_name):
1158 """ Find a component by name in the inventory list"""
1159 mangled_name = VcsComponent.mangle_name(component_name,
1160 self.vnf_name,
1161 self.vnfd_id)
1162 return self._inventory[mangled_name]
1163
1164
1165
1166 @asyncio.coroutine
1167 def get_nsr_config(self):
1168 ### Need access to NS instance configuration for runtime resolution.
1169 ### This shall be replaced when deployment flavors are implemented
1170 xpath = "C,/nsr:ns-instance-config"
1171 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1172
1173 for result in results:
1174 entry = yield from result
1175 ns_instance_config = entry.result
1176 for nsr in ns_instance_config.nsr:
1177 if nsr.id == self._vnfr_msg.nsr_id_ref:
1178 return nsr
1179 return None
1180
1181 @asyncio.coroutine
1182 def start_component(self, component_name, ip_addr):
1183 """ Start a component in the VNFR by name """
1184 comp = self.component_by_name(component_name)
1185 yield from comp.start(None, None, ip_addr)
1186
1187 def cp_ip_addr(self, cp_name):
1188 """ Get ip address for connection point """
1189 self._log.debug("cp_ip_addr()")
1190 for cp in self._cprs:
1191 if cp.name == cp_name and cp.ip_address is not None:
1192 return cp.ip_address
1193 return "0.0.0.0"
1194
1195 def mgmt_intf_info(self):
1196 """ Get Management interface info for this VNFR """
1197 mgmt_intf_desc = self.vnfd.msg.mgmt_interface
1198 ip_addr = None
1199 if mgmt_intf_desc.has_field("cp"):
1200 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1201 elif mgmt_intf_desc.has_field("vdu_id"):
1202 try:
1203 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1204 ip_addr = vdur.management_ip
1205 except VDURecordNotFound:
1206 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1207 ip_addr = None
1208 else:
1209 ip_addr = mgmt_intf_desc.ip_address
1210 port = mgmt_intf_desc.port
1211
1212 return ip_addr, port
1213
1214 @property
1215 def msg(self):
1216 """ Message associated with this VNFR """
1217 vnfd_fields = ["short_name", "vendor", "description", "version"]
1218 vnfd_copy_dict = {k: v for k, v in self.vnfd.msg.as_dict().items() if k in vnfd_fields}
1219
1220 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1221 ip_address, port = self.mgmt_intf_info()
1222
1223 if ip_address is not None:
1224 mgmt_intf.ip_address = ip_address
1225 if port is not None:
1226 mgmt_intf.port = port
1227
1228 vnfr_dict = {"id": self._vnfr_id,
1229 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1230 "name": self.name,
1231 "member_vnf_index_ref": self.member_vnf_index,
1232 "vnfd_ref": self.vnfd_id,
1233 "operational_status": self.operational_status,
1234 "operational_status_details": self._state_failed_reason,
1235 "cloud_account": self.cloud_account_name,
1236 "config_status": self._config_status
1237 }
1238
1239 vnfr_dict.update(vnfd_copy_dict)
1240
1241 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1242 vnfr_msg.uptime = int(time.time()) - self._create_time
1243 vnfr_msg.mgmt_interface = mgmt_intf
1244
1245 # Add all the VLRs to VNFR
1246 for vlr in self._vlrs:
1247 ivlr = vnfr_msg.internal_vlr.add()
1248 ivlr.vlr_ref = vlr.vlr_id
1249
1250 # Add all the VDURs to VDUR
1251 if self._vdus is not None:
1252 for vdu in self._vdus:
1253 vdur = vnfr_msg.vdur.add()
1254 vdur.from_dict(vdu.msg.as_dict())
1255
1256 if self.vnfd.msg.mgmt_interface.has_field('dashboard_params'):
1257 vnfr_msg.dashboard_url = self.dashboard_url
1258
1259 for cpr in self._cprs:
1260 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1261 vnfr_msg.connection_point.append(new_cp)
1262
1263 if self._vnf_mon is not None:
1264 for monp in self._vnf_mon.msg:
1265 vnfr_msg.monitoring_param.append(
1266 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1267
1268 if self._vnfr.vnf_configuration is not None:
1269 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1270 if (ip_address is not None and
1271 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1272 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1273
1274 for group in self._vnfr_msg.placement_groups_info:
1275 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1276 group_info.from_dict(group.as_dict())
1277 vnfr_msg.placement_groups_info.append(group_info)
1278
1279 return vnfr_msg
1280
1281 @property
1282 def dashboard_url(self):
1283 ip, cfg_port = self.mgmt_intf_info()
1284 protocol = 'http'
1285 http_port = 80
1286 if self.vnfd.msg.mgmt_interface.dashboard_params.has_field('https'):
1287 if self.vnfd.msg.mgmt_interface.dashboard_params.https is True:
1288 protocol = 'https'
1289 http_port = 443
1290 if self.vnfd.msg.mgmt_interface.dashboard_params.has_field('port'):
1291 http_port = self.vnfd.msg.mgmt_interface.dashboard_params.port
1292
1293 url = "{protocol}://{ip_address}:{port}/{path}".format(
1294 protocol=protocol,
1295 ip_address=ip,
1296 port=http_port,
1297 path=self.vnfd.msg.mgmt_interface.dashboard_params.path.lstrip("/"),
1298 )
1299
1300 return url
1301
1302 @property
1303 def xpath(self):
1304 """ path for this VNFR """
1305 return("D,/vnfr:vnfr-catalog"
1306 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1307
1308 @asyncio.coroutine
1309 def publish(self, xact):
1310 """ publish this VNFR """
1311 vnfr = self.msg
1312 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1313 self.xpath, self.msg)
1314 vnfr.create_time = self._create_time
1315 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1316 self._log.debug("Published VNFR path = [%s], record = [%s]",
1317 self.xpath, self.msg)
1318
1319 @asyncio.coroutine
1320 def create_vls(self):
1321 """ Publish The VLs associated with this VNF """
1322 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1323 self.vnfd_id)
1324 for ivld_msg in self.vnfd.msg.internal_vld:
1325 self._log.debug("Creating internal vld:"
1326 " %s, int_cp_ref = %s",
1327 ivld_msg, ivld_msg.internal_connection_point
1328 )
1329 vlr = InternalVirtualLinkRecord(dts=self._dts,
1330 log=self._log,
1331 loop=self._loop,
1332 ivld_msg=ivld_msg,
1333 vnfr_name=self.name,
1334 cloud_account_name=self.cloud_account_name
1335 )
1336 self._vlrs.append(vlr)
1337
1338 for int_cp in ivld_msg.internal_connection_point:
1339 if int_cp.id_ref in self._vlr_by_cp:
1340 msg = ("Connection point %s already "
1341 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1342 raise InternalVirtualLinkRecordError(msg)
1343 self._log.debug("Setting vlr %s to internal cp = %s",
1344 vlr, int_cp.id_ref)
1345 self._vlr_by_cp[int_cp.id_ref] = vlr
1346
1347 @asyncio.coroutine
1348 def instantiate_vls(self, xact, restart_mode=False):
1349 """ Instantiate the VLs associated with this VNF """
1350 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1351 self.vnfd_id)
1352
1353 for vlr in self._vlrs:
1354 self._log.debug("Instantiating VLR %s", vlr)
1355 yield from vlr.instantiate(xact, restart_mode)
1356
1357 def find_vlr_by_cp(self, cp_name):
1358 """ Find the VLR associated with the cp name """
1359 return self._vlr_by_cp[cp_name]
1360
1361 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1362 """
1363 Returns the cloud specific construct for placement group
1364 Arguments:
1365 input_group: VNFD PlacementGroup
1366 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1367 """
1368 copy_dict = ['name', 'requirement', 'strategy']
1369 for group_info in nsr_config.vnfd_placement_group_maps:
1370 if group_info.placement_group_ref == input_group.name and \
1371 group_info.vnfd_id_ref == self.vnfd_id:
1372 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1373 group_dict = {k:v for k,v in
1374 group_info.as_dict().items()
1375 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1376 for param in copy_dict:
1377 group_dict.update({param: getattr(input_group, param)})
1378 group.from_dict(group_dict)
1379 return group
1380 return None
1381
1382 @asyncio.coroutine
1383 def get_vdu_placement_groups(self, vdu):
1384 placement_groups = []
1385 ### Step-1: Get VNF level placement groups
1386 for group in self._vnfr_msg.placement_groups_info:
1387 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1388 #group_info.from_dict(group.as_dict())
1389 placement_groups.append(group)
1390
1391 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1392 nsr_config = yield from self.get_nsr_config()
1393
1394 ### Step-3: Get VDU level placement groups
1395 for group in self.vnfd.msg.placement_groups:
1396 for member_vdu in group.member_vdus:
1397 if member_vdu.member_vdu_ref == vdu.id:
1398 group_info = self.resolve_placement_group_cloud_construct(group,
1399 nsr_config)
1400 if group_info is None:
1401 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1402 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1403 else:
1404 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1405 str(group_info),
1406 vdu.name,
1407 self.vnf_name,
1408 self.member_vnf_index)
1409 placement_groups.append(group_info)
1410
1411 return placement_groups
1412
1413 @asyncio.coroutine
1414 def create_vdus(self, vnfr, restart_mode=False):
1415 """ Create the VDUs associated with this VNF """
1416
1417 def get_vdur_id(vdud):
1418 """Get the corresponding VDUR's id for the VDUD. This is useful in
1419 case of a restart.
1420
1421 In restart mode we check for exiting VDUR's ID and use them, if
1422 available. This way we don't end up creating duplicate VDURs
1423 """
1424 vdur_id = None
1425
1426 if restart_mode and vdud is not None:
1427 try:
1428 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1429 vdur_id = vdur[0]
1430 except IndexError:
1431 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1432
1433 return vdur_id
1434
1435
1436 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1437 for vdu in self.vnfd.msg.vdu:
1438 self._log.debug("Creating vdu: %s", vdu)
1439 vdur_id = get_vdur_id(vdu)
1440
1441 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1442 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1443 vdu.name,
1444 self.vnf_name,
1445 self.member_vnf_index,
1446 [ group.name for group in placement_groups])
1447
1448 vdur = VirtualDeploymentUnitRecord(
1449 dts=self._dts,
1450 log=self._log,
1451 loop=self._loop,
1452 vdud=vdu,
1453 vnfr=vnfr,
1454 mgmt_intf=self.has_mgmt_interface(vdu),
1455 cloud_account_name=self.cloud_account_name,
1456 vnfd_package_store=self._vnfd_package_store,
1457 vdur_id=vdur_id,
1458 placement_groups = placement_groups,
1459 )
1460 yield from vdur.vdu_opdata_register()
1461
1462 self._vdus.append(vdur)
1463
1464 @asyncio.coroutine
1465 def instantiate_vdus(self, xact, vnfr):
1466 """ Instantiate the VDUs associated with this VNF """
1467 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1468
1469 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1470
1471 # Identify any dependencies among the VDUs
1472 dependencies = collections.defaultdict(list)
1473 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1474
1475 for vdu in self._vdus:
1476 if vdu.vdud_cloud_init is not None:
1477 for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
1478 if vdu_id != vdu.vdu_id:
1479 # This means that vdu.vdu_id depends upon vdu_id,
1480 # i.e. vdu_id must be instantiated before
1481 # vdu.vdu_id.
1482 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1483
1484 # Define the terminal states of VDU instantiation
1485 terminal = (
1486 VDURecordState.READY,
1487 VDURecordState.TERMINATED,
1488 VDURecordState.FAILED,
1489 )
1490
1491 datastore = VdurDatastore()
1492 processed = set()
1493
1494 @asyncio.coroutine
1495 def instantiate_monitor(vdu):
1496 """Monitor the state of the VDU during instantiation
1497
1498 Arguments:
1499 vdu - a VirtualDeploymentUnitRecord
1500
1501 """
1502 # wait for the VDUR to enter a terminal state
1503 while vdu._state not in terminal:
1504 yield from asyncio.sleep(1, loop=self._loop)
1505
1506 # update the datastore
1507 datastore.update(vdu)
1508
1509 # add the VDU to the set of processed VDUs
1510 processed.add(vdu.vdu_id)
1511
1512 @asyncio.coroutine
1513 def instantiate(vdu):
1514 """Instantiate the specified VDU
1515
1516 Arguments:
1517 vdu - a VirtualDeploymentUnitRecord
1518
1519 Raises:
1520 if the VDU, or any of the VDUs this VDU depends upon, are
1521 terminated or fail to instantiate properly, a
1522 VirtualDeploymentUnitRecordError is raised.
1523
1524 """
1525 for dependency in dependencies[vdu.vdu_id]:
1526 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1527
1528 while dependency.vdu_id not in processed:
1529 yield from asyncio.sleep(1, loop=self._loop)
1530
1531 if not dependency.active:
1532 raise VirtualDeploymentUnitRecordError()
1533
1534 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1535
1536 # Populate the datastore with the current values of the VDU
1537 datastore.add(vdu)
1538
1539 # Substitute any variables contained in the cloud config script
1540 config = str(vdu.vdud_cloud_init)
1541
1542 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1543 if len(parts) > 1:
1544
1545 # Extract the variable names
1546 variables = list()
1547 for variable in parts[1::2]:
1548 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1549
1550 # Iterate of the variables and substitute values from the
1551 # datastore.
1552 for variable in variables:
1553
1554 # Handle a reference to a VDU by ID
1555 if variable.startswith('vdu['):
1556 value = datastore.get(variable)
1557 if value is None:
1558 msg = "Unable to find a substitute for {} in {} cloud-init script"
1559 raise ValueError(msg.format(variable, vdu.vdu_id))
1560
1561 config = config.replace("{{ %s }}" % variable, value)
1562 continue
1563
1564 # Handle a reference to the current VDU
1565 if variable.startswith('vdu'):
1566 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1567 config = config.replace("{{ %s }}" % variable, value)
1568 continue
1569
1570 # Handle unrecognized variables
1571 msg = 'unrecognized cloud-config variable: {}'
1572 raise ValueError(msg.format(variable))
1573
1574 # Instantiate the VDU
1575 with self._dts.transaction() as xact:
1576 self._log.debug("Instantiating vdu: %s", vdu)
1577 yield from vdu.instantiate(xact, vnfr, config=config)
1578 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1579 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1580 self.vnfr_id, vdu)
1581
1582 # First create a set of tasks to monitor the state of the VDUs and
1583 # report when they have entered a terminal state
1584 for vdu in self._vdus:
1585 self._loop.create_task(instantiate_monitor(vdu))
1586
1587 for vdu in self._vdus:
1588 self._loop.create_task(instantiate(vdu))
1589
1590 def has_mgmt_interface(self, vdu):
1591 # ## TODO: Support additional mgmt_interface type options
1592 if self.vnfd.msg.mgmt_interface.vdu_id == vdu.id:
1593 return True
1594 return False
1595
1596 def vlr_xpath(self, vlr_id):
1597 """ vlr xpath """
1598 return(
1599 "D,/vlr:vlr-catalog/"
1600 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1601
1602 def ext_vlr_by_id(self, vlr_id):
1603 """ find ext vlr by id """
1604 return self._ext_vlrs[vlr_id]
1605
1606 @asyncio.coroutine
1607 def publish_inventory(self, xact):
1608 """ Publish the inventory associated with this VNF """
1609 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1610
1611 for component in self.vnfd.msg.component:
1612 self._log.debug("Creating inventory component %s", component)
1613 mangled_name = VcsComponent.mangle_name(component.component_name,
1614 self.vnf_name,
1615 self.vnfd_id
1616 )
1617 comp = VcsComponent(dts=self._dts,
1618 log=self._log,
1619 loop=self._loop,
1620 cluster_name=self._cluster_name,
1621 vcs_handler=self._vcs_handler,
1622 component=component,
1623 mangled_name=mangled_name,
1624 )
1625 if comp.name in self._inventory:
1626 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1627 component, self._vnfd_id)
1628 return
1629 self._log.debug("Adding component %s for vnrf %s",
1630 comp.name, self._vnfr_id)
1631 self._inventory[comp.name] = comp
1632 yield from comp.publish(xact)
1633
1634 def all_vdus_active(self):
1635 """ Are all VDUS in this VNFR active? """
1636 for vdu in self._vdus:
1637 if not vdu.active:
1638 return False
1639
1640 self._log.debug("Inside all_vdus_active. Returning True")
1641 return True
1642
1643 @asyncio.coroutine
1644 def instantiation_failed(self, failed_reason=None):
1645 """ VNFR instantiation failed """
1646 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1647 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1648 self._state_failed_reason = failed_reason
1649
1650 # Update the VNFR with the changed status
1651 yield from self.publish(None)
1652
1653 @asyncio.coroutine
1654 def is_ready(self):
1655 """ This VNF is ready"""
1656 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1657
1658 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1659 self.set_state(VirtualNetworkFunctionRecordState.READY)
1660
1661 else:
1662 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1663
1664 # Update the VNFR with the changed status
1665 yield from self.publish(None)
1666
1667 def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
1668 """Updated the connection point with ip address"""
1669 for cp in self._cprs:
1670 if cp.name == cp_name:
1671 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1672 cp_name, cp, ip_address, cp_id)
1673 cp.ip_address = ip_address
1674 cp.mac_address = mac_addr
1675 cp.connection_point_id = cp_id
1676 return
1677
1678 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1679 self._log.debug(err)
1680 raise VirtualDeploymentUnitRecordError(err)
1681
1682 def set_state(self, state):
1683 """ Set state for this VNFR"""
1684 self._state = state
1685
1686 @asyncio.coroutine
1687 def instantiate(self, xact, restart_mode=False):
1688 """ instantiate this VNF """
1689 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1690
1691 @asyncio.coroutine
1692 def fetch_vlrs():
1693 """ Fetch VLRs """
1694 # Iterate over all the connection points in VNFR and fetch the
1695 # associated VLRs
1696
1697 def cpr_from_cp(cp):
1698 """ Creates a record level connection point from the desciptor cp"""
1699 cp_fields = ["name", "image", "vm-flavor"]
1700 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1701 cpr_dict = {}
1702 cpr_dict.update(cp_copy_dict)
1703 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1704
1705 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1706 self._vnfr_id, self._vnfr.connection_point)
1707
1708 for cp in self._vnfr.connection_point:
1709 cpr = cpr_from_cp(cp)
1710 self._cprs.append(cpr)
1711 self._log.debug("Adding Connection point record %s ", cp)
1712
1713 vlr_path = self.vlr_xpath(cp.vlr_ref)
1714 self._log.debug("Fetching VLR with path = %s", vlr_path)
1715 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1716 rwdts.XactFlag.MERGE)
1717 for i in res_iter:
1718 r = yield from i
1719 d = r.result
1720 self._ext_vlrs[cp.vlr_ref] = d
1721 cpr.vlr_ref = cp.vlr_ref
1722 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1723
1724 # Fetch the VNFD associated with the VNFR
1725 self._log.debug("VNFR-ID %s: Fetching vnfds", self._vnfr_id)
1726 self._vnfd = yield from self._vnfm.get_vnfd_ref(self._vnfd_id)
1727 self._log.debug("VNFR-ID %s: Fetched vnfd:%s", self._vnfr_id, self._vnfd)
1728
1729 assert self.vnfd is not None
1730
1731 # Fetch External VLRs
1732 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1733 yield from fetch_vlrs()
1734
1735 # Publish inventory
1736 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1737 yield from self.publish_inventory(xact)
1738
1739 # Publish inventory
1740 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1741 yield from self.create_vls()
1742
1743 # publish the VNFR
1744 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1745 yield from self.publish(xact)
1746
1747 # instantiate VLs
1748 self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
1749 try:
1750 yield from self.instantiate_vls(xact, restart_mode)
1751 except Exception as e:
1752 self._log.exception("VL instantiation failed (%s)", str(e))
1753 yield from self.instantiation_failed(str(e))
1754 return
1755
1756 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1757
1758 # instantiate VDUs
1759 self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
1760 yield from self.create_vdus(self, restart_mode)
1761
1762 # publish the VNFR
1763 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1764 yield from self.publish(xact)
1765
1766 # instantiate VDUs
1767 # ToDo: Check if this should be prevented during restart
1768 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1769 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1770
1771 # publish the VNFR
1772 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1773 yield from self.publish(xact)
1774
1775 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1776
1777 # create task updating uptime for this vnfr
1778 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1779 self._loop.create_task(self.vnfr_uptime_update(xact))
1780
1781 @asyncio.coroutine
1782 def terminate(self, xact):
1783 """ Terminate this virtual network function """
1784
1785 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1786
1787 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1788
1789 # stop monitoring
1790 if self._vnf_mon is not None:
1791 self._vnf_mon.stop()
1792 self._vnf_mon.deregister()
1793 self._vnf_mon = None
1794
1795 @asyncio.coroutine
1796 def terminate_vls():
1797 """ Terminate VLs in this VNF """
1798 for vl in self._vlrs:
1799 yield from vl.terminate(xact)
1800
1801 @asyncio.coroutine
1802 def terminate_vdus():
1803 """ Terminate VDUS in this VNF """
1804 for vdu in self._vdus:
1805 yield from vdu.terminate(xact)
1806
1807 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1808 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1809 yield from terminate_vls()
1810
1811 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1812 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1813 yield from terminate_vdus()
1814
1815 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1816 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1817
1818 @asyncio.coroutine
1819 def vnfr_uptime_update(self, xact):
1820 while True:
1821 # Return when vnfr state is FAILED or TERMINATED etc
1822 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1823 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1824 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1825 VirtualNetworkFunctionRecordState.READY]:
1826 return
1827 yield from self.publish(xact)
1828 yield from asyncio.sleep(2, loop=self._loop)
1829
1830
1831
1832 class VnfdDtsHandler(object):
1833 """ DTS handler for VNFD config changes """
1834 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1835
1836 def __init__(self, dts, log, loop, vnfm):
1837 self._dts = dts
1838 self._log = log
1839 self._loop = loop
1840 self._vnfm = vnfm
1841 self._regh = None
1842
1843 @asyncio.coroutine
1844 def regh(self):
1845 """ DTS registration handle """
1846 return self._regh
1847
1848 @asyncio.coroutine
1849 def register(self):
1850 """ Register for VNFD configuration"""
1851
1852 def on_apply(dts, acg, xact, action, scratch):
1853 """Apply the configuration"""
1854 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1855 xact, action, scratch)
1856
1857 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
1858 # Create/Update a VNFD record
1859 for cfg in self._regh.get_xact_elements(xact):
1860 # Only interested in those VNFD cfgs whose ID was received in prepare callback
1861 if cfg.id in scratch.get('vnfds', []) or is_recovery:
1862 self._vnfm.update_vnfd(cfg)
1863
1864 scratch.pop('vnfds', None)
1865
1866 @asyncio.coroutine
1867 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
1868 """ on prepare callback """
1869 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1870 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
1871 fref = ProtobufC.FieldReference.alloc()
1872 fref.goto_whole_message(msg.to_pbcm())
1873
1874 # Handle deletes in prepare_callback, but adds/updates in apply_callback
1875 if fref.is_field_deleted():
1876 # Delete an VNFD record
1877 self._log.debug("Deleting VNFD with id %s", msg.id)
1878 if self._vnfm.vnfd_in_use(msg.id):
1879 self._log.debug("Cannot delete VNFD in use - %s", msg)
1880 err = "Cannot delete a VNFD in use - %s" % msg
1881 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1882 # Delete a VNFD record
1883 yield from self._vnfm.delete_vnfd(msg.id)
1884 else:
1885 # Handle actual adds/updates in apply_callback,
1886 # just check if VNFD in use in prepare_callback
1887 if self._vnfm.vnfd_in_use(msg.id):
1888 self._log.debug("Cannot modify an VNFD in use - %s", msg)
1889 err = "Cannot modify an VNFD in use - %s" % msg
1890 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1891
1892 # Add this VNFD to scratch to create/update in apply callback
1893 vnfds = scratch.setdefault('vnfds', [])
1894 vnfds.append(msg.id)
1895
1896 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
1897
1898 self._log.debug(
1899 "Registering for VNFD config using xpath: %s",
1900 VnfdDtsHandler.XPATH,
1901 )
1902 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
1903 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
1904 self._regh = acg.register(
1905 xpath=VnfdDtsHandler.XPATH,
1906 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
1907 on_prepare=on_prepare)
1908
1909
1910 class VcsComponentDtsHandler(object):
1911 """ Vcs Component DTS handler """
1912 XPATH = ("D,/rw-manifest:manifest" +
1913 "/rw-manifest:operational-inventory" +
1914 "/rw-manifest:component")
1915
1916 def __init__(self, dts, log, loop, vnfm):
1917 self._dts = dts
1918 self._log = log
1919 self._loop = loop
1920 self._regh = None
1921 self._vnfm = vnfm
1922
1923 @property
1924 def regh(self):
1925 """ DTS registration handle """
1926 return self._regh
1927
1928 @asyncio.coroutine
1929 def register(self):
1930 """ Registers VCS component dts publisher registration"""
1931 self._log.debug("VCS Comp publisher DTS handler registering path %s",
1932 VcsComponentDtsHandler.XPATH)
1933
1934 hdl = rift.tasklets.DTS.RegistrationHandler()
1935 handlers = rift.tasklets.Group.Handler()
1936 with self._dts.group_create(handler=handlers) as group:
1937 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
1938 handler=hdl,
1939 flags=(rwdts.Flag.PUBLISHER |
1940 rwdts.Flag.NO_PREP_READ |
1941 rwdts.Flag.DATASTORE),)
1942
1943 @asyncio.coroutine
1944 def publish(self, xact, path, msg):
1945 """ Publishes the VCS component """
1946 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
1947 xact, path, msg)
1948 self.regh.create_element(path, msg)
1949 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1950 VcsComponentDtsHandler.XPATH, xact, path, msg)
1951
1952 class VnfrConsoleOperdataDtsHandler(object):
1953 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1954 @property
1955 def vnfr_vdu_console_xpath(self):
1956 """ path for resource-mgr"""
1957 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
1958
1959 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
1960 self._dts = dts
1961 self._log = log
1962 self._loop = loop
1963 self._regh = None
1964 self._vnfm = vnfm
1965
1966 self._vnfr_id = vnfr_id
1967 self._vdur_id = vdur_id
1968 self._vdu_id = vdu_id
1969
1970 @asyncio.coroutine
1971 def register(self):
1972 """ Register for VNFR VDU Operational Data read from dts """
1973
1974 @asyncio.coroutine
1975 def on_prepare(xact_info, action, ks_path, msg):
1976 """ prepare callback from dts """
1977 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
1978 self._log.debug(
1979 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
1980 xact_info, action, xpath, msg
1981 )
1982
1983 if action == rwdts.QueryAction.READ:
1984 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
1985 path_entry = schema.keyspec_to_entry(ks_path)
1986 self._log.debug("VDU Opdata path is {}".format(path_entry))
1987 try:
1988 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
1989 except VnfRecordError as e:
1990 self._log.error("VNFR id %s not found", self._vnfr_id)
1991 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
1992 return
1993 try:
1994 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
1995 if not vdur._state == VDURecordState.READY:
1996 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
1997 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
1998 return
1999 with self._dts.transaction() as new_xact:
2000 resp = yield from vdur.read_resource(new_xact)
2001 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2002 vdur_console.id = self._vdur_id
2003 if resp.console_url:
2004 vdur_console.console_url = resp.console_url
2005 else:
2006 vdur_console.console_url = 'none'
2007 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2008 except Exception:
2009 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2010 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2011 vdur_console.id = self._vdur_id
2012 vdur_console.console_url = 'none'
2013
2014 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2015 xpath=self.vnfr_vdu_console_xpath,
2016 msg=vdur_console)
2017 else:
2018 #raise VnfRecordError("Not supported operation %s" % action)
2019 self._log.error("Not supported operation %s" % action)
2020 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2021 return
2022
2023
2024 self._log.debug("Registering for VNFR VDU using xpath: %s",
2025 self.vnfr_vdu_console_xpath)
2026 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2027 with self._dts.group_create() as group:
2028 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2029 handler=hdl,
2030 flags=rwdts.Flag.PUBLISHER,
2031 )
2032
2033
2034 class VnfrDtsHandler(object):
2035 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2036 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2037
2038 def __init__(self, dts, log, loop, vnfm):
2039 self._dts = dts
2040 self._log = log
2041 self._loop = loop
2042 self._vnfm = vnfm
2043
2044 self._regh = None
2045
2046 @property
2047 def regh(self):
2048 """ Return registration handle"""
2049 return self._regh
2050
2051 @property
2052 def vnfm(self):
2053 """ Return VNF manager instance """
2054 return self._vnfm
2055
2056 @asyncio.coroutine
2057 def register(self):
2058 """ Register for vnfr create/update/delete/read requests from dts """
2059 def on_commit(xact_info):
2060 """ The transaction has been committed """
2061 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2062 return rwdts.MemberRspCode.ACTION_OK
2063
2064 def on_abort(*args):
2065 """ Abort callback """
2066 self._log.debug("VNF transaction got aborted")
2067
2068 @asyncio.coroutine
2069 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2070
2071 @asyncio.coroutine
2072 def instantiate_realloc_vnfr(vnfr):
2073 """Re-populate the vnfm after restart
2074
2075 Arguments:
2076 vlink
2077
2078 """
2079
2080 yield from vnfr.instantiate(None, restart_mode=True)
2081
2082 if xact_event == rwdts.MemberEvent.INSTALL:
2083 curr_cfg = self.regh.elements
2084 for cfg in curr_cfg:
2085 vnfr = self.vnfm.create_vnfr(cfg)
2086 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2087
2088 self._log.debug("Got on_event in vnfm")
2089
2090 return rwdts.MemberRspCode.ACTION_OK
2091
2092 @asyncio.coroutine
2093 def on_prepare(xact_info, action, ks_path, msg):
2094 """ prepare callback from dts """
2095 self._log.debug(
2096 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2097 xact_info, action, msg
2098 )
2099
2100 if action == rwdts.QueryAction.CREATE:
2101 if not msg.has_field("vnfd_ref"):
2102 err = "Vnfd reference not provided"
2103 self._log.error(err)
2104 raise VnfRecordError(err)
2105
2106 vnfr = self.vnfm.create_vnfr(msg)
2107 try:
2108 # RIFT-9105: Unable to add a READ query under an existing transaction
2109 # xact = xact_info.xact
2110 yield from vnfr.instantiate(None)
2111 except Exception as e:
2112 self._log.exception(e)
2113 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2114 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2115 yield from vnfr.publish(None)
2116 elif action == rwdts.QueryAction.DELETE:
2117 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2118 path_entry = schema.keyspec_to_entry(ks_path)
2119 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2120
2121 if vnfr is None:
2122 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2123 raise VirtualNetworkFunctionRecordNotFound(
2124 "VNFR id %s", path_entry.key00.id)
2125
2126 try:
2127 yield from vnfr.terminate(xact_info.xact)
2128 # Unref the VNFD
2129 vnfr.vnfd.unref()
2130 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2131 except Exception as e:
2132 self._log.exception(e)
2133 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2134
2135 elif action == rwdts.QueryAction.UPDATE:
2136 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2137 path_entry = schema.keyspec_to_entry(ks_path)
2138 vnfr = None
2139 try:
2140 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2141 except Exception as e:
2142 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2143 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2144 return
2145
2146 if vnfr is None:
2147 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2148 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2149 return
2150
2151 self._log.debug("VNFR {} update config status {} (current {})".
2152 format(vnfr.name, msg.config_status, vnfr.config_status))
2153 # Update the config status and publish
2154 vnfr._config_status = msg.config_status
2155 yield from vnfr.publish(None)
2156
2157 else:
2158 raise NotImplementedError(
2159 "%s action on VirtualNetworkFunctionRecord not supported",
2160 action)
2161
2162 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2163
2164 self._log.debug("Registering for VNFR using xpath: %s",
2165 VnfrDtsHandler.XPATH,)
2166
2167 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2168 on_prepare=on_prepare,)
2169 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2170 with self._dts.group_create(handler=handlers) as group:
2171 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2172 handler=hdl,
2173 flags=(rwdts.Flag.PUBLISHER |
2174 rwdts.Flag.NO_PREP_READ |
2175 rwdts.Flag.CACHE |
2176 rwdts.Flag.DATASTORE),)
2177
2178 @asyncio.coroutine
2179 def create(self, xact, path, msg):
2180 """
2181 Create a VNFR record in DTS with path and message
2182 """
2183 self._log.debug("Creating VNFR xact = %s, %s:%s",
2184 xact, path, msg)
2185
2186 self.regh.create_element(path, msg)
2187 self._log.debug("Created VNFR xact = %s, %s:%s",
2188 xact, path, msg)
2189
2190 @asyncio.coroutine
2191 def update(self, xact, path, msg):
2192 """
2193 Update a VNFR record in DTS with path and message
2194 """
2195 self._log.debug("Updating VNFR xact = %s, %s:%s",
2196 xact, path, msg)
2197 self.regh.update_element(path, msg)
2198 self._log.debug("Updated VNFR xact = %s, %s:%s",
2199 xact, path, msg)
2200
2201 @asyncio.coroutine
2202 def delete(self, xact, path):
2203 """
2204 Delete a VNFR record in DTS with path and message
2205 """
2206 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2207 self.regh.delete_element(path)
2208 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2209
2210
2211 class VirtualNetworkFunctionDescriptor(object):
2212 """
2213 Virtual Network Function descriptor class
2214 """
2215
2216 def __init__(self, dts, log, loop, vnfm, vnfd):
2217 self._dts = dts
2218 self._log = log
2219 self._loop = loop
2220
2221 self._vnfm = vnfm
2222 self._vnfd = vnfd
2223 self._ref_count = 0
2224
2225 @property
2226 def ref_count(self):
2227 """ Returns the reference count associated with
2228 this Virtual Network Function Descriptor"""
2229 return self._ref_count
2230
2231 @property
2232 def id(self):
2233 """ Returns vnfd id """
2234 return self._vnfd.id
2235
2236 @property
2237 def name(self):
2238 """ Returns vnfd name """
2239 return self._vnfd.name
2240
2241 def in_use(self):
2242 """ Returns whether vnfd is in use or not """
2243 return True if self._ref_count > 0 else False
2244
2245 def ref(self):
2246 """ Take a reference on this object """
2247 self._ref_count += 1
2248 return self._ref_count
2249
2250 def unref(self):
2251 """ Release reference on this object """
2252 if self.ref_count < 1:
2253 msg = ("Unref on a VNFD object - vnfd id %s, ref_count = %s" %
2254 (self.id, self._ref_count))
2255 self._log.critical(msg)
2256 raise VnfRecordError(msg)
2257 self._log.debug("Releasing ref on VNFD %s - curr ref_count:%s",
2258 self.id, self.ref_count)
2259 self._ref_count -= 1
2260 return self._ref_count
2261
2262 @property
2263 def msg(self):
2264 """ Return the message associated with this NetworkServiceDescriptor"""
2265 return self._vnfd
2266
2267 @staticmethod
2268 def path_for_id(vnfd_id):
2269 """ Return path for the passed vnfd_id"""
2270 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
2271
2272 def path(self):
2273 """ Return the path associated with this NetworkServiceDescriptor"""
2274 return VirtualNetworkFunctionDescriptor.path_for_id(self.id)
2275
2276 def update(self, vnfd):
2277 """ Update the Virtual Network Function Descriptor """
2278 if self.in_use():
2279 self._log.error("Cannot update descriptor %s in use refcnt=%d",
2280 self.id, self.ref_count)
2281
2282 # The following loop is added to debug RIFT-13284
2283 for vnf_rec in self._vnfm._vnfrs.values():
2284 if vnf_rec.vnfd_id == self.id:
2285 self._log.error("descriptor %s in used by %s:%s",
2286 self.id, vnf_rec.vnfr_id, vnf_rec.msg)
2287 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot update descriptor in use %s" % self.id)
2288 self._vnfd = vnfd
2289
2290 def delete(self):
2291 """ Delete the Virtual Network Function Descriptor """
2292 if self.in_use():
2293 self._log.error("Cannot delete descriptor %s in use refcnt=%d",
2294 self.id)
2295
2296 # The following loop is added to debug RIFT-13284
2297 for vnf_rec in self._vnfm._vnfrs.values():
2298 if vnf_rec.vnfd_id == self.id:
2299 self._log.error("descriptor %s in used by %s:%s",
2300 self.id, vnf_rec.vnfr_id, vnf_rec.msg)
2301 raise VirtualNetworkFunctionDescriptorRefCountExists("Cannot delete descriptor in use %s" % self.id)
2302 self._vnfm.delete_vnfd(self.id)
2303
2304
2305 class VnfdRefCountDtsHandler(object):
2306 """ The VNFD Ref Count DTS handler """
2307 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2308
2309 def __init__(self, dts, log, loop, vnfm):
2310 self._dts = dts
2311 self._log = log
2312 self._loop = loop
2313 self._vnfm = vnfm
2314
2315 self._regh = None
2316
2317 @property
2318 def regh(self):
2319 """ Return registration handle """
2320 return self._regh
2321
2322 @property
2323 def vnfm(self):
2324 """ Return the NS manager instance """
2325 return self._vnfm
2326
2327 @asyncio.coroutine
2328 def register(self):
2329 """ Register for VNFD ref count read from dts """
2330
2331 @asyncio.coroutine
2332 def on_prepare(xact_info, action, ks_path, msg):
2333 """ prepare callback from dts """
2334 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2335 self._log.debug(
2336 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2337 xact_info, action, xpath, msg
2338 )
2339
2340 if action == rwdts.QueryAction.READ:
2341 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2342 path_entry = schema.keyspec_to_entry(ks_path)
2343 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2344 for xpath, msg in vnfd_list:
2345 self._log.debug("Responding to ref count query path:%s, msg:%s",
2346 xpath, msg)
2347 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2348 xpath=xpath,
2349 msg=msg)
2350 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2351 else:
2352 raise VnfRecordError("Not supported operation %s" % action)
2353
2354 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2355 with self._dts.group_create() as group:
2356 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2357 handler=hdl,
2358 flags=rwdts.Flag.PUBLISHER,
2359 )
2360
2361
2362 class VdurDatastore(object):
2363 """
2364 This VdurDatastore is intended to expose select information about a VDUR
2365 such that it can be referenced in a cloud config file. The data that is
2366 exposed does not necessarily follow the structure of the data in the yang
2367 model. This is intentional. The data that are exposed are intended to be
2368 agnostic of the yang model so that changes in the model do not necessarily
2369 require changes to the interface provided to the user. It also means that
2370 the user does not need to be familiar with the RIFT.ware yang models.
2371 """
2372
2373 def __init__(self):
2374 """Create an instance of VdurDatastore"""
2375 self._vdur_data = dict()
2376 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2377
2378 def add(self, vdur):
2379 """Add a new VDUR to the datastore
2380
2381 Arguments:
2382 vdur - a VirtualDeploymentUnitRecord instance
2383
2384 Raises:
2385 A ValueError is raised if the VDUR is (1) None or (2) already in
2386 the datastore.
2387
2388 """
2389 if vdur.vdu_id is None:
2390 raise ValueError('VDURs are required to have an ID')
2391
2392 if vdur.vdu_id in self._vdur_data:
2393 raise ValueError('cannot add a VDUR more than once')
2394
2395 self._vdur_data[vdur.vdu_id] = dict()
2396
2397 def set_if_not_none(key, attr):
2398 if attr is not None:
2399 self._vdur_data[vdur.vdu_id][key] = attr
2400
2401 set_if_not_none('name', vdur._vdud.name)
2402 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2403
2404 def update(self, vdur):
2405 """Update the VDUR information in the datastore
2406
2407 Arguments:
2408 vdur - a GI representation of a VDUR
2409
2410 Raises:
2411 A ValueError is raised if the VDUR is (1) None or (2) already in
2412 the datastore.
2413
2414 """
2415 if vdur.vdu_id is None:
2416 raise ValueError('VNFDs are required to have an ID')
2417
2418 if vdur.vdu_id not in self._vdur_data:
2419 raise ValueError('VNF is not recognized')
2420
2421 def set_or_delete(key, attr):
2422 if attr is None:
2423 if key in self._vdur_data[vdur.vdu_id]:
2424 del self._vdur_data[vdur.vdu_id][key]
2425
2426 else:
2427 self._vdur_data[vdur.vdu_id][key] = attr
2428
2429 set_or_delete('name', vdur._vdud.name)
2430 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2431
2432 def remove(self, vdur_id):
2433 """Remove all of the data associated with specified VDUR
2434
2435 Arguments:
2436 vdur_id - the identifier of a VNFD in the datastore
2437
2438 Raises:
2439 A ValueError is raised if the VDUR is not contained in the
2440 datastore.
2441
2442 """
2443 if vdur_id not in self._vdur_data:
2444 raise ValueError('VNF is not recognized')
2445
2446 del self._vdur_data[vdur_id]
2447
2448 def get(self, expr):
2449 """Retrieve VDUR information from the datastore
2450
2451 An expression should be of the form,
2452
2453 vdu[<id>].<attr>
2454
2455 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2456 the exposed attribute that the user wishes to retrieve.
2457
2458 If the requested data is not available, None is returned.
2459
2460 Arguments:
2461 expr - a string that specifies the data to return
2462
2463 Raises:
2464 A ValueError is raised if the provided expression cannot be parsed.
2465
2466 Returns:
2467 The requested data or None
2468
2469 """
2470 result = self._pattern.match(expr)
2471 if result is None:
2472 raise ValueError('data expression not recognized ({})'.format(expr))
2473
2474 vdur_id, key = result.groups()
2475
2476 if vdur_id not in self._vdur_data:
2477 return None
2478
2479 return self._vdur_data[vdur_id].get(key, None)
2480
2481
2482 class VnfManager(object):
2483 """ The virtual network function manager class """
2484 def __init__(self, dts, log, loop, cluster_name):
2485 self._dts = dts
2486 self._log = log
2487 self._loop = loop
2488 self._cluster_name = cluster_name
2489
2490 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2491 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2492
2493 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2494 self._vnfr_handler,
2495 self._vcs_handler,
2496 VnfdRefCountDtsHandler(dts, log, loop, self)]
2497 self._vnfrs = {}
2498 self._vnfds = {}
2499
2500 @property
2501 def vnfr_handler(self):
2502 """ VNFR dts handler """
2503 return self._vnfr_handler
2504
2505 @property
2506 def vcs_handler(self):
2507 """ VCS dts handler """
2508 return self._vcs_handler
2509
2510 @asyncio.coroutine
2511 def register(self):
2512 """ Register all static DTS handlers """
2513 for hdl in self._dts_handlers:
2514 yield from hdl.register()
2515
2516 @asyncio.coroutine
2517 def run(self):
2518 """ Run this VNFM instance """
2519 self._log.debug("Run VNFManager - registering static DTS handlers""")
2520 yield from self.register()
2521
2522 def get_vnfr(self, vnfr_id):
2523 """ get VNFR by vnfr id """
2524
2525 if vnfr_id not in self._vnfrs:
2526 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2527
2528 return self._vnfrs[vnfr_id]
2529
2530 def create_vnfr(self, vnfr):
2531 """ Create a VNFR instance """
2532 if vnfr.id in self._vnfrs:
2533 msg = "Vnfr id %s already exists" % vnfr.id
2534 self._log.error(msg)
2535 raise VnfRecordError(msg)
2536
2537 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2538 vnfr.id,
2539 vnfr.vnfd_ref)
2540
2541 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2542 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr
2543 )
2544 return self._vnfrs[vnfr.id]
2545
2546 @asyncio.coroutine
2547 def delete_vnfr(self, xact, vnfr):
2548 """ Create a VNFR instance """
2549 if vnfr.vnfr_id in self._vnfrs:
2550 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2551 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2552 del self._vnfrs[vnfr.vnfr_id]
2553
2554 @asyncio.coroutine
2555 def fetch_vnfd(self, vnfd_id):
2556 """ Fetch VNFDs based with the vnfd id"""
2557 vnfd_path = VirtualNetworkFunctionDescriptor.path_for_id(vnfd_id)
2558 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2559 vnfd = None
2560
2561 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2562
2563 for ent in res_iter:
2564 res = yield from ent
2565 vnfd = res.result
2566
2567 if vnfd is None:
2568 err = "Failed to get Vnfd %s" % vnfd_id
2569 self._log.error(err)
2570 raise VnfRecordError(err)
2571
2572 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2573
2574 return vnfd
2575
2576 @asyncio.coroutine
2577 def get_vnfd_ref(self, vnfd_id):
2578 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2579 vnfd = yield from self.get_vnfd(vnfd_id)
2580 vnfd.ref()
2581 return vnfd
2582
2583 @asyncio.coroutine
2584 def get_vnfd(self, vnfd_id):
2585 """ Get Virtual Network Function descriptor for the passed vnfd_id"""
2586 vnfd = None
2587 if vnfd_id not in self._vnfds:
2588 self._log.error("Cannot find VNFD id:%s", vnfd_id)
2589 vnfd = yield from self.fetch_vnfd(vnfd_id)
2590
2591 if vnfd is None:
2592 self._log.error("Cannot find VNFD id:%s", vnfd_id)
2593 raise VirtualNetworkFunctionDescriptorError("Cannot find VNFD id:%s", vnfd_id)
2594
2595 if vnfd.id != vnfd_id:
2596 self._log.error("Bad Recovery state {} found for {}".format(vnfd.id, vnfd_id))
2597 raise VirtualNetworkFunctionDescriptorError("Bad Recovery state {} found for {}".format(vnfd.id, vnfd_id))
2598
2599 if vnfd.id not in self._vnfds:
2600 self.create_vnfd(vnfd)
2601
2602 return self._vnfds[vnfd_id]
2603
2604 def vnfd_in_use(self, vnfd_id):
2605 """ Is this VNFD in use """
2606 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2607 if vnfd_id in self._vnfds:
2608 return self._vnfds[vnfd_id].in_use()
2609 return False
2610
2611 @asyncio.coroutine
2612 def publish_vnfr(self, xact, path, msg):
2613 """ Publish a VNFR """
2614 self._log.debug("publish_vnfr called with path %s, msg %s",
2615 path, msg)
2616 yield from self.vnfr_handler.update(xact, path, msg)
2617
2618 def create_vnfd(self, vnfd):
2619 """ Create a virtual network function descriptor """
2620 self._log.debug("Create virtual networkfunction descriptor - %s", vnfd)
2621 if vnfd.id in self._vnfds:
2622 self._log.error("Cannot create VNFD %s -VNFD id already exists", vnfd)
2623 raise VirtualNetworkFunctionDescriptorError("VNFD already exists-%s", vnfd.id)
2624
2625 self._vnfds[vnfd.id] = VirtualNetworkFunctionDescriptor(self._dts,
2626 self._log,
2627 self._loop,
2628 self,
2629 vnfd)
2630 return self._vnfds[vnfd.id]
2631
2632 def update_vnfd(self, vnfd):
2633 """ update the Virtual Network Function descriptor """
2634 self._log.debug("Update virtual network function descriptor - %s", vnfd)
2635
2636 if vnfd.id not in self._vnfds:
2637 self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
2638 self.create_vnfd(vnfd)
2639 else:
2640 self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
2641 self._vnfds[vnfd.id].update(vnfd)
2642
2643 @asyncio.coroutine
2644 def delete_vnfd(self, vnfd_id):
2645 """ Delete the Virtual Network Function descriptor with the passed id """
2646 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2647 if vnfd_id not in self._vnfds:
2648 self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
2649 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s", vnfd_id)
2650
2651 if self._vnfds[vnfd_id].in_use():
2652 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2653 vnfd_id,
2654 self._vnfds[vnfd_id].ref_count)
2655 raise VirtualNetworkFunctionDescriptorRefCountExists(
2656 "Cannot delete :%s, ref_count:%s",
2657 vnfd_id,
2658 self._vnfds[vnfd_id].ref_count)
2659
2660 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2661 try:
2662 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2663 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2664 if os.path.exists(vnfd_dir):
2665 shutil.rmtree(vnfd_dir, ignore_errors=True)
2666 except Exception as e:
2667 self._log.error("Exception in cleaning up VNFD {}: {}".
2668 format(self._vnfds[vnfd_id].name, e))
2669 self._log.exception(e)
2670
2671 del self._vnfds[vnfd_id]
2672
2673 def vnfd_refcount_xpath(self, vnfd_id):
2674 """ xpath for ref count entry """
2675 return (VnfdRefCountDtsHandler.XPATH +
2676 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2677
2678 @asyncio.coroutine
2679 def get_vnfd_refcount(self, vnfd_id):
2680 """ Get the vnfd_list from this VNFM"""
2681 vnfd_list = []
2682 if vnfd_id is None or vnfd_id == "":
2683 for vnfd in self._vnfds.values():
2684 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2685 vnfd_msg.vnfd_id_ref = vnfd.id
2686 vnfd_msg.instance_ref_count = vnfd.ref_count
2687 vnfd_list.append((self.vnfd_refcount_xpath(vnfd.id), vnfd_msg))
2688 elif vnfd_id in self._vnfds:
2689 vnfd_msg.vnfd_id_ref = self._vnfds[vnfd_id].id
2690 vnfd_msg.instance_ref_count = self._vnfds[vnfd_id].ref_count
2691 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2692
2693 return vnfd_list
2694
2695
2696 class VnfmTasklet(rift.tasklets.Tasklet):
2697 """ VNF Manager tasklet class """
2698 def __init__(self, *args, **kwargs):
2699 super(VnfmTasklet, self).__init__(*args, **kwargs)
2700 self.rwlog.set_category("rw-mano-log")
2701 self.rwlog.set_subcategory("vnfm")
2702
2703 self._dts = None
2704 self._vnfm = None
2705
2706 def start(self):
2707 try:
2708 super(VnfmTasklet, self).start()
2709 self.log.info("Starting VnfmTasklet")
2710
2711 self.log.setLevel(logging.DEBUG)
2712
2713 self.log.debug("Registering with dts")
2714 self._dts = rift.tasklets.DTS(self.tasklet_info,
2715 RwVnfmYang.get_schema(),
2716 self.loop,
2717 self.on_dts_state_change)
2718
2719 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2720 except Exception:
2721 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2722 raise
2723
2724 def on_instance_started(self):
2725 """ Task insance started callback """
2726 self.log.debug("Got instance started callback")
2727
2728 def stop(self):
2729 try:
2730 self._dts.deinit()
2731 except Exception:
2732 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2733 raise
2734
2735 @asyncio.coroutine
2736 def init(self):
2737 """ Task init callback """
2738 try:
2739 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2740 assert vm_parent_name is not None
2741 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2742 yield from self._vnfm.run()
2743 except Exception:
2744 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2745 raise
2746
2747 @asyncio.coroutine
2748 def run(self):
2749 """ Task run callback """
2750 pass
2751
2752 @asyncio.coroutine
2753 def on_dts_state_change(self, state):
2754 """Take action according to current dts state to transition
2755 application into the corresponding application state
2756
2757 Arguments
2758 state - current dts state
2759 """
2760 switch = {
2761 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2762 rwdts.State.CONFIG: rwdts.State.RUN,
2763 }
2764
2765 handlers = {
2766 rwdts.State.INIT: self.init,
2767 rwdts.State.RUN: self.run,
2768 }
2769
2770 # Transition application to next state
2771 handler = handlers.get(state, None)
2772 if handler is not None:
2773 yield from handler()
2774
2775 # Transition dts to next state
2776 next_state = switch.get(state, None)
2777 if next_state is not None:
2778 self._dts.handle.set_state(next_state)