RIFT-14562
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.mano.dts as mano_dts
53
54
class VMResourceError(Exception):
    """VM resource error."""


class VnfRecordError(Exception):
    """VNF record instantiation failed."""


class VduRecordError(Exception):
    """VDU record instantiation failed."""


class NotImplemented(Exception):
    """Not implemented.

    NOTE(review): inside this module the class name shadows the builtin
    ``NotImplemented`` singleton.
    """


class VnfrRecordExistsError(Exception):
    """A VNFR record with the same VNFR id already exists."""


class InternalVirtualLinkRecordError(Exception):
    """Internal virtual link record error."""


class VDUImageNotFound(Exception):
    """VDU image not found."""


class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""


class VMNotReadyError(Exception):
    """VM not yet received from the resource manager."""


class VDURecordNotFound(Exception):
    """Could not find a VDU record."""


class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """Cannot find the Virtual Network Function Record descriptor."""


class VirtualNetworkFunctionDescriptorError(Exception):
    """Virtual Network Function descriptor error."""


class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """Virtual Network Function descriptor not found."""


class VirtualNetworkFunctionRecordNotFound(Exception):
    """Virtual Network Function Record not found."""


class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A Virtual Network Function descriptor reference count exists."""


class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""


class VNFMPlacementGroupError(Exception):
    """Invalid placement-group request for a VDU."""
137
@enum.unique
class VirtualNetworkFunctionRecordState(enum.Enum):
    """Lifecycle states of a VNF record.

    Every member carries a distinct value.  ``TERMINATED`` used to share the
    value 7 with ``VDU_TERMINATE_PHASE``, which made it a mere alias: the two
    states compared equal and ``TERMINATED.name`` reported
    ``"VDU_TERMINATE_PHASE"``.  ``enum.unique`` guards against a repeat.
    """
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    TERMINATED = 8
    FAILED = 10
149
150
class VDURecordState(enum.Enum):
    """States a VDU record moves through between creation and teardown."""
    # Creation path
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    READY = 4
    # Teardown path
    TERMINATING = 5
    TERMINATED = 6
    # Error state
    FAILED = 10
160
161
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
    """Remember the handles and descriptor data for one VCS component.

    All arguments are stored unchanged on private attributes; nothing is
    published or started here.
    """
    # Infrastructure handles
    self._dts, self._log, self._loop = dts, log, loop
    # Component description and identity
    self._component, self._mangled_name = component, mangled_name
    # Where and through whom the component is published/started
    self._cluster_name, self._vcs_handler = cluster_name, vcs_handler
172
@staticmethod
def mangle_name(component_name, vnf_name, vnfd_id):
    """Return the mangled name ``<vnf_name>:<component_name>:<vnfd_id>``."""
    return ":".join((vnf_name, component_name, vnfd_id))
177
@property
def name(self):
    """Mangled name this component was constructed with."""
    mangled = self._mangled_name
    return mangled
182
@property
def path(self):
    """Keyed xpath of this component in the manifest operational inventory."""
    xpath_template = (
        "D,/rw-manifest:manifest"
        "/rw-manifest:operational-inventory"
        "/rw-manifest:component"
        "[rw-manifest:component-name = '{}']"
    )
    return xpath_template.format(self.name)
190
@property
def instance_xpath(self):
    """Xpath of the VCS instance entry keyed by this component's cluster name."""
    xpath_template = (
        "D,/rw-base:vcs"
        "/instances"
        "/instance"
        "[instance-name = '{}']"
    )
    return xpath_template.format(self._cluster_name)
198
@property
def start_comp_xpath(self):
    """Xpath of the ``START-REQ`` child entry under this component's instance."""
    start_req_suffix = "/child-n[instance-name = 'START-REQ']"
    return self.instance_xpath + start_req_suffix
204
def get_start_comp_msg(self, ip_address):
    """Build the ``START`` request message for this component.

    Returns a ``VcsInstance_Instance_ChildN`` whose instance name is
    ``START-REQ``, naming this component and carrying ``ip_address``.
    """
    request = RwBaseYang.VcsInstance_Instance_ChildN()
    request.instance_name = 'START-REQ'
    request.component_name = self.name
    request.admin_command = "START"
    request.ip_address = ip_address
    return request
214
@property
def msg(self):
    """Return the operational-inventory message for this VCS component.

    The component descriptor is converted to a dict, every
    ``component_name`` value found at any nesting depth is replaced by its
    mangled form, and the result is converted back into an
    ``OpInventory_Component``.

    NOTE(review): ``self._vnfd_name`` and ``self._vnfd_id`` are read below
    but are not assigned by the ``__init__`` of this class; confirm who is
    expected to set them.
    """
    def mangle_comp_names(node):
        """Mangle component names in ``node`` in place and return it."""
        for key, val in node.items():
            if isinstance(val, dict):
                node[key] = mangle_comp_names(val)
            elif isinstance(val, list):
                # Only nested dicts need rewriting; other entries stay put.
                for idx, ent in enumerate(val):
                    if isinstance(ent, dict):
                        val[idx] = mangle_comp_names(ent)
            elif key == "component_name":
                node[key] = VcsComponent.mangle_name(val,
                                                     self._vnfd_name,
                                                     self._vnfd_id)
        return node

    mangled = mangle_comp_names(self._component.as_dict())
    return RwManifestYang.OpInventory_Component.from_dict(mangled)
243
@asyncio.coroutine
def publish(self, xact):
    """Publish this component's message at its path via the VCS handler."""
    comp_path = self.path
    comp_msg = self.msg
    self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                    self.name, comp_path, comp_msg)
    yield from self._vcs_handler.publish(xact, comp_path, comp_msg)
250
@asyncio.coroutine
def start(self, xact, parent, ip_addr=None):
    """Issue the DTS create that starts this VCS component.

    ``xact`` and ``parent`` are accepted but not used by this method.
    """
    # ATTN RV - replace with block add
    request = self.get_start_comp_msg(ip_addr)
    xpath = self.start_comp_xpath
    self._log.debug("starting component %s %s", xpath, request)
    yield from self._dts.query_create(xpath, 0, request)
    self._log.debug("started component %s, %s", xpath, request)
261 self._log.debug("started component %s, %s",
262 self.start_comp_xpath, start_msg)
263
264
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
def __init__(self,
             dts,
             log,
             loop,
             vdud,
             vnfr,
             mgmt_intf,
             mgmt_network,
             cloud_account_name,
             vnfd_package_store,
             vdur_id=None,
             placement_groups=None):
    """Create a VDU record in the ``INIT`` state.

    vdud is the VDU descriptor and vnfr the owning VNF record.  When
    ``vdur_id`` is not given a fresh UUID is generated.  ``placement_groups``
    defaults to a new empty list per instance (a shared ``[]`` default would
    be visible to every record created without the argument).
    """
    self._dts = dts
    self._log = log
    self._loop = loop
    self._vdud = vdud
    self._vnfr = vnfr
    self._mgmt_intf = mgmt_intf
    self._cloud_account_name = cloud_account_name
    self._vnfd_package_store = vnfd_package_store
    self._mgmt_network = mgmt_network

    self._vdur_id = vdur_id or str(uuid.uuid4())
    # (interface, connection point or cp id, vlr) tuples, filled in by
    # create_resource()
    self._int_intf = []
    self._ext_intf = []
    self._state = VDURecordState.INIT
    self._state_failed_reason = None
    # Keys this record's vdu-event entry at the resource manager
    self._request_id = str(uuid.uuid4())
    self._name = vnfr.name + "__" + vdud.id
    self._placement_groups = placement_groups if placement_groups is not None else []
    self._rm_regh = None
    self._vm_resp = None
    self._vdud_cloud_init = None
    self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
        dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id,
        self._vdur_id, self.vdu_id)
301
@asyncio.coroutine
def vdu_opdata_register(self):
    """Register this VDUR's console operational-data handler."""
    console_handler = self._vdur_console_handler
    yield from console_handler.register()
305
def cp_ip_addr(self, cp_name):
    """Return the IP address of the connection point named ``cp_name``.

    Falls back to ``"0.0.0.0"`` when no VM response has been stored yet or
    no connection point in it carries that name.
    """
    vm_resp = self._vm_resp
    if vm_resp is None:
        return "0.0.0.0"
    addresses = (point.ip_address
                 for point in vm_resp.connection_points
                 if point.name == cp_name)
    return next(addresses, "0.0.0.0")
313
def cp_id(self, cp_name):
    """Return the connection point id for the connection point ``cp_name``.

    Falls back to the empty string when no VM response has been stored yet
    or no connection point in it carries that name.
    """
    vm_resp = self._vm_resp
    if vm_resp is None:
        return ''
    ids = (point.connection_point_id
           for point in vm_resp.connection_points
           if point.name == cp_name)
    return next(ids, '')
321
@property
def vdu_id(self):
    """Id of the VDU descriptor this record was created from."""
    descriptor = self._vdud
    return descriptor.id
325
@property
def vm_resp(self):
    """Last stored VM response, or None if none has been stored yet."""
    response = self._vm_resp
    return response
329
@property
def name(self):
    """Name of this VDUR as computed at construction time."""
    vdur_name = self._name
    return vdur_name
334
@property
def cloud_account_name(self):
    """Name of the cloud account this VDU should be created in."""
    account = self._cloud_account_name
    return account
339
@property
def image_name(self):
    """Basename of the descriptor's image, used to look the image up on the CMP.

    Returns None when the descriptor does not contain ``image``.
    """
    vdud = self._vdud
    return os.path.basename(vdud.image) if 'image' in vdud else None
346
@property
def image_checksum(self):
    """Image checksum from the descriptor, or None when the field is unset."""
    vdud = self._vdud
    if not vdud.has_field("image_checksum"):
        return None
    return vdud.image_checksum
351
@property
def management_ip(self):
    """Management address of an active VDU; None while the VDU is not active.

    The VM response's ``public_ip`` wins when that field is set, otherwise
    its ``management_ip`` is returned.
    """
    if not self.active:
        return None
    vm_resp = self._vm_resp
    if vm_resp.has_field('public_ip'):
        return vm_resp.public_ip
    return vm_resp.management_ip
357
@property
def vm_management_ip(self):
    """``management_ip`` of the VM response; None while the VDU is not active."""
    return self._vm_resp.management_ip if self.active else None
363
@property
def operational_status(self):
    """Operational-status string for the current state of this VDU.

    The lookup is keyed by the state's name; note that both ``TERMINATING``
    and ``TERMINATED`` are reported as ``"terminated"``.
    """
    status_by_state_name = dict(
        INIT="init",
        INSTANTIATING="vm_init_phase",
        RESOURCE_ALLOC_PENDING="vm_alloc_pending",
        READY="running",
        FAILED="failed",
        TERMINATING="terminated",
        TERMINATED="terminated",
    )
    return status_by_state_name[self._state.name]
376
@property
def msg(self):
    """Build the VDUR message describing this record.

    Combines selected descriptor fields, the operational status, details
    from the VM response (when one is stored), the internal/external
    interface lists and the placement groups into a
    ``YangData_Vnfr_VnfrCatalog_Vnfr_Vdur``.

    Side effect: for every external interface the owning VNFR is told the
    connection point's current address and id via ``update_cp``.
    """
    copied_fields = ["vm_flavor",
                     "guest_epa",
                     "vswitch_epa",
                     "hypervisor_epa",
                     "host_epa",
                     "volumes",
                     "name"]
    descriptor_part = {key: value
                       for key, value in self._vdud.as_dict().items()
                       if key in copied_fields}

    vdur_dict = {"id": self._vdur_id,
                 "vdu_id_ref": self._vdud.id,
                 "operational_status": self.operational_status,
                 "operational_status_details": self._state_failed_reason,
                 }

    have_vm_resp = self.vm_resp is not None
    if have_vm_resp:
        vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
                          "flavor_id": self.vm_resp.flavor_id
                          })
        if self._vm_resp.has_field('image_id'):
            vdur_dict.update({"image_id": self.vm_resp.image_id})

    mgmt_ip = self.management_ip
    if mgmt_ip is not None:
        vdur_dict["management_ip"] = mgmt_ip

    vm_mgmt_ip = self.vm_management_ip
    if vm_mgmt_ip is not None:
        vdur_dict["vm_management_ip"] = vm_mgmt_ip

    # Descriptor fields are merged after the runtime fields above.
    vdur_dict.update(descriptor_part)

    if have_vm_resp and self._vm_resp.has_field('volumes'):
        # Attach the allocated volume id to the uniquely matching volume.
        for op_volume in self._vm_resp.volumes:
            same_name = [vol for vol in vdur_dict['volumes']
                         if vol['name'] == op_volume.name]
            if len(same_name) == 1:
                same_name[0]["volume_id"] = op_volume.volume_id

    internal_cps = []
    internal_intfs = []
    for intf, cp_id, _vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)
        internal_cps.append({"name": cp.name,
                             "id": cp.id,
                             "type_yang": "VPORT",
                             "ip_address": self.cp_ip_addr(cp.name)})
        internal_intfs.append({"name": intf.name,
                               "vdur_internal_connection_point_ref": cp.id,
                               "virtual_interface": {}})

    vdur_dict["internal_connection_point"] = internal_cps
    self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
    vdur_dict["internal_interface"] = internal_intfs

    external_intfs = []
    for _intf, cp, _vlr in self._ext_intf:
        external_intfs.append({"name": cp.name,
                               "vnfd_connection_point_ref": cp.name,
                               "virtual_interface": {}})
        self._vnfr.update_cp(cp.name, self.cp_ip_addr(cp.name), self.cp_id(cp.name))

    vdur_dict["external_interface"] = external_intfs

    vdur_dict['placement_groups_info'] = [group.as_dict()
                                          for group in self._placement_groups]

    return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
450
@property
def resmgr_path(self):
    """Resource-manager xpath of this record's vdu-event entry (keyed by request id)."""
    xpath_template = (
        "D,/rw-resource-mgr:resource-mgmt"
        "/vdu-event"
        "/vdu-event-data[event-id='{}']"
    )
    return xpath_template.format(self._request_id)
457
@property
def vm_flavor_msg(self):
    """Return a fresh copy of the descriptor's ``vm_flavor`` message."""
    source = self._vdud.vm_flavor
    duplicate = source.__class__()
    duplicate.copy_from(self._vdud.vm_flavor)
    return duplicate
465
@property
def vdud_cloud_init(self):
    """Cloud-init contents for the VDU, computed on first use and cached.

    A ``None`` result from ``cloud_init()`` is not cached, so it is looked
    up again on the next access.
    """
    cached = self._vdud_cloud_init
    if cached is None:
        cached = self._vdud_cloud_init = self.cloud_init()
    return cached
473
def cloud_init(self):
    """Return the cloud-config script for this VDU, or None if there is none.

    The inline ``cloud_init`` of the descriptor takes precedence; otherwise
    the script named by ``cloud_init_file`` is read from the stored VNFD
    package.

    Raises VirtualDeploymentUnitRecordError when the script cannot be
    extracted from the package.
    """
    vdud = self._vdud

    if vdud.cloud_init is not None:
        self._log.debug("cloud_init script provided inline %s", vdud.cloud_init)
        return vdud.cloud_init

    if vdud.cloud_init_file is None:
        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None

    # Get cloud-init script contents from the file provided in the cloud_init_file param
    self._log.debug("cloud_init script provided in file %s", vdud.cloud_init_file)
    script_name = vdud.cloud_init_file
    self._vnfd_package_store.refresh()
    package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
    extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
    try:
        return extractor.read_script(package, script_name)
    except rift.package.cloud_init.CloudInitExtractionError as e:
        raise VirtualDeploymentUnitRecordError(e)
494
def process_openstack_placement_group_construct(self, vm_create_msg_dict):
    """Translate OpenStack placement groups into VM-create request fields.

    Collects host aggregates, availability zones and server groups from
    ``self._placement_groups`` and stores them in ``vm_create_msg_dict``
    under ``host_aggregate``, ``availability_zone`` and ``server_group``.
    Keys are only written when the matching construct is present.

    Raises VNFMPlacementGroupError when more than one availability zone or
    more than one server group is requested; the message lists the
    conflicting entries.
    """
    host_aggregates = []
    availability_zones = []
    server_groups = []
    for group in self._placement_groups:
        if group.has_field('host_aggregate'):
            host_aggregates.extend(aggregate.as_dict()
                                   for aggregate in group.host_aggregate)
        if group.has_field('availability_zone'):
            availability_zones.append(group.availability_zone.as_dict())
        if group.has_field('server_group'):
            server_groups.append(group.server_group.as_dict())

    if len(availability_zones) > 1:
        self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple availability zones. Requested Zones: {}".format(
                self.name, availability_zones))
    if availability_zones:
        vm_create_msg_dict['availability_zone'] = availability_zones[0]

    if len(server_groups) > 1:
        self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
        raise VNFMPlacementGroupError(
            "Can not launch VDU: {} in multiple Server Groups. Requested Groups: {}".format(
                self.name, server_groups))
    if server_groups:
        vm_create_msg_dict['server_group'] = server_groups[0]

    if host_aggregates:
        vm_create_msg_dict['host_aggregate'] = host_aggregates
526
def process_placement_groups(self, vm_create_msg_dict):
    """Fill the resource-mgr request from this VDU's placement groups.

    Does nothing without placement groups.  All groups are asserted to
    share a single cloud type; only ``openstack`` groups are processed,
    any other cloud type is logged and ignored.
    """
    groups = self._placement_groups
    if not groups:
        return

    cloud_types = {group.cloud_type for group in groups}
    assert len(cloud_types) == 1
    cloud_type = cloud_types.pop()

    if cloud_type != 'openstack':
        self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
        return

    self.process_openstack_placement_group_construct(vm_create_msg_dict)
542
def resmgr_msg(self, config=None):
    """Build the VDU event message sent to the resource manager.

    The request carries the VM name, image name/checksum (when set), the
    management-interface flag, optional mgmt vpci/network, ``config`` as
    userdata (when given), one connection-point entry per resolved
    external and internal interface, selected descriptor EPA/flavor
    fields, placement constraints and the descriptor's volumes.

    Returns a ``RwResourceMgrYang.VDUEventData`` keyed by this record's
    request id.
    """
    vdu_fields = ["vm_flavor",
                  "guest_epa",
                  "vswitch_epa",
                  "hypervisor_epa",
                  "host_epa"]

    self._log.debug("Creating params based on VDUD: %s", self._vdud)
    vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}

    vm_create_msg_dict = {
        "name": self.name,
    }

    if self.image_name is not None:
        vm_create_msg_dict["image_name"] = self.image_name

    if self.image_checksum is not None:
        vm_create_msg_dict["image_checksum"] = self.image_checksum

    vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
    if self._vdud.has_field('mgmt_vpci'):
        vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci

    self._log.debug("VDUD: %s", self._vdud)
    if config is not None:
        vm_create_msg_dict['vdu_init'] = {'userdata': config}

    if self._mgmt_network:
        vm_create_msg_dict['mgmt_network'] = self._mgmt_network

    cp_list = []
    for intf, cp, vlr in self._ext_intf:
        cp_info = {"name": cp.name,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang}

        # Not every connection point type exposes static_ip_address.
        static_ip = getattr(cp, "static_ip_address", None)
        if static_ip:
            cp_info["static_ip_address"] = static_ip

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_info["vpci"] = intf.virtual_interface.vpci

        if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
            cp_info['security_group'] = vlr.ip_profile_params.security_group

        self._log.debug("External CP info %s", cp_info)
        cp_list.append(cp_info)

    for intf, cp_id, vlr in self._int_intf:
        cp = self.find_internal_cp_by_cp_id(cp_id)

        cp_dict = {"name": cp_id,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang}

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_dict["vpci"] = intf.virtual_interface.vpci

        static_ip = getattr(cp, "static_ip_address", None)
        if static_ip:
            cp_dict["static_ip_address"] = static_ip

        # Log the internal entry itself; the previous code logged the
        # external loop's variable, which is unbound when a VDU has no
        # external interfaces.
        self._log.debug("Internal CP info %s", cp_dict)
        cp_list.append(cp_dict)

    vm_create_msg_dict["connection_points"] = cp_list
    vm_create_msg_dict.update(vdu_copy_dict)

    self.process_placement_groups(vm_create_msg_dict)

    msg = RwResourceMgrYang.VDUEventData()
    msg.event_id = self._request_id
    msg.cloud_account = self.cloud_account_name
    msg.request_info.from_dict(vm_create_msg_dict)

    for volume in self._vdud.volumes:
        v = msg.request_info.volumes.add()
        v.from_dict(volume.as_dict())
    return msg
630
@asyncio.coroutine
def terminate(self, xact):
    """Delete this VDU's resource in the VIM and drop its registrations.

    Only records in the ``READY`` or ``FAILED`` state are terminated; any
    other state is logged and ignored.  A failure while deleting the
    resource is logged and does not stop the rest of the teardown.  The
    passed ``xact`` is not used; the delete runs in its own transaction.
    """
    if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
        self._log.warning("VDU terminate in not ready state - Ignoring request")
        return

    self._state = VDURecordState.TERMINATING
    if self._vm_resp is not None:
        try:
            with self._dts.transaction() as new_xact:
                yield from self.delete_resource(new_xact)
        except Exception:
            # Best effort: keep tearing down the local state.
            self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

    if self._rm_regh is not None:
        self._log.debug("Deregistering resource manager registration handle")
        self._rm_regh.deregister()
        self._rm_regh = None

    console_handler = self._vdur_console_handler
    # The console handler may not hold a registration handle (anymore).
    if console_handler is not None and console_handler._regh is not None:
        # Routine teardown step, so log at debug level like the one above.
        self._log.debug("Deregistering vnfr vdur registration handle")
        console_handler._regh.deregister()
        console_handler._regh = None

    self._state = VDURecordState.TERMINATED
657
def find_internal_cp_by_cp_id(self, cp_id):
    """Return the descriptor's internal connection point with id ``cp_id``.

    Raises VduRecordError when the descriptor has no such connection point.
    """
    self._log.debug("find_internal_cp_by_cp_id(%s) called",
                    cp_id)

    for candidate in self._vdud.internal_connection_point:
        self._log.debug("Checking for int cp %s in internal connection points",
                        candidate.id)
        if candidate.id == cp_id:
            return candidate

    self._log.debug("Failed to find cp %s in internal connection points",
                    cp_id)
    raise VduRecordError("Failed to find cp %s in internal connection points" % cp_id)
680
@asyncio.coroutine
def create_resource(self, xact, vnfr, config=None):
    """Request the VM for this VDU from the resource manager.

    Resolves every external and internal interface of the descriptor to a
    ``(interface, connection point / cp id, vlr)`` tuple (stored on the
    record), then issues a create at ``resmgr_path`` in ``xact``.

    Returns the ``resource_info`` of the last response.

    Raises VduRecordError when an internal connection point cannot be
    resolved, and VMResourceError when no response with a resource state
    comes back.
    """
    def find_cp_by_name(cp_name):
        """ Find a connection point by name; None when it does not exist """
        self._log.debug("find_cp_by_name(%s) called", cp_name)
        for ext_cp in vnfr._cprs:
            self._log.debug("Checking ext cp (%s) called", ext_cp.name)
            if ext_cp.name == cp_name:
                return ext_cp
        self._log.debug("Failed to find cp %s in external connection points",
                        cp_name)
        return None

    def find_internal_vlr_by_cp_id(cp_id):
        """ Return the VLR associated with an internal connection point """
        self._log.debug("find_internal_vlr_by_cp_id(%s) called",
                        cp_id)

        # Validate the cp; raises when the descriptor does not define it
        self.find_internal_cp_by_cp_id(cp_id)

        return vnfr.find_vlr_by_cp(cp_id)

    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: create",
                    self._request_id)

    # Resolve the networks associated external interfaces
    for ext_intf in self._vdud.external_interface:
        self._log.debug("Resolving external interface name [%s], cp[%s]",
                        ext_intf.name, ext_intf.vnfd_connection_point_ref)
        cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
        if cp is None:
            self._log.debug("Failed to find connection point - %s",
                            ext_intf.vnfd_connection_point_ref)
            continue
        self._log.debug("Connection point name [%s], type[%s]",
                        cp.name, cp.type_yang)

        vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

        etuple = (ext_intf, cp, vlr)
        self._ext_intf.append(etuple)

        self._log.debug("Created external interface tuple : %s", etuple)

    # Resolve the networks associated internal interfaces
    for intf in self._vdud.internal_interface:
        cp_id = intf.vdu_internal_connection_point_ref
        self._log.debug("Resolving internal interface name [%s], cp[%s]",
                        intf.name, cp_id)

        try:
            vlr = find_internal_vlr_by_cp_id(cp_id)
        except Exception as e:
            self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
            msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
            raise VduRecordError(msg)

        ituple = (intf, cp_id, vlr)
        self._int_intf.append(ituple)

        self._log.debug("Created internal interface tuple : %s", ituple)

    resmgr_path = self.resmgr_path
    resmgr_msg = self.resmgr_msg(config)

    self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
    block.add_query_create(resmgr_path, resmgr_msg)

    res_iter = yield from block.execute(now=True)

    resp = None

    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        # Exceptions do not apply %-formatting to extra args; format here.
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
768
@asyncio.coroutine
def delete_resource(self, xact):
    """Issue a delete for this record's vdu-event entry within ``xact``."""
    self._log.debug("Executing vm request id: %s, action: delete",
                    self._request_id)

    delete_block = xact.block_create()
    delete_block.add_query_delete(self.resmgr_path)

    yield from delete_block.execute(flags=0, now=True)
779
@asyncio.coroutine
def read_resource(self, xact):
    """Read this record's vdu-event entry from the resource manager.

    Returns the ``resource_info`` of the last response without storing it
    on the record.

    Raises VMResourceError when no response with a resource state comes
    back (including the case of an empty result set).
    """
    block = xact.block_create()

    self._log.debug("Executing vm request id: %s, action: read",
                    self._request_id)

    block.add_query_read(self.resmgr_path)

    res_iter = yield from block.execute(flags=0, now=True)

    # Stays None when the query yields nothing, so the check below raises
    # VMResourceError rather than an unbound-variable error.
    resp = None
    for i in res_iter:
        r = yield from i
        resp = r.result

    if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
        raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
    self._log.debug("Got vm request response: %s", resp.resource_info)
    return resp.resource_info
799
800
@asyncio.coroutine
def start_component(self):
    """Ask the VNFR to start this VDU's VCS component at the VM's management IP."""
    component_ref = self._vdud.vcs_component_ref
    self._log.debug("Starting component %s for vdud %s vdur %s",
                    component_ref,
                    self._vdud,
                    self._vdur_id)
    yield from self._vnfr.start_component(component_ref,
                                          self.vm_resp.management_ip)
810
@property
def active(self):
    """True exactly when this VDU is in the ``READY`` state."""
    return self._state is VDURecordState.READY
815
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VDU as failed, keep the reason and notify the owning VNFR."""
    self._log.debug("VDU %s instantiation failed ", self._vdur_id)
    self._state, self._state_failed_reason = VDURecordState.FAILED, failed_reason
    yield from self._vnfr.instantiation_failed(failed_reason)
823
@asyncio.coroutine
def vdu_is_active(self):
    """Move this VDU to ``READY`` and tell the VNFR once all VDUs are active.

    A VDU that is already active is only logged.  When the descriptor
    references a VCS component, that component is started before the state
    changes.
    """
    if self.active:
        self._log.warning("VDU %s was already marked as active", self._vdur_id)
        return

    self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)

    has_component = self._vdud.vcs_component_ref is not None
    if has_component:
        yield from self.start_component()

    self._state = VDURecordState.READY

    if not self._vnfr.all_vdus_active():
        return
    self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
    yield from self._vnfr.is_ready()
841
@asyncio.coroutine
def instantiate(self, xact, vnfr, config=None):
    """Instantiate this VDU.

    Subscribes to ``<resmgr_path>/resource-info`` so later state changes
    of the VM are picked up, waits for the registration to become ready,
    then requests the VM via ``create_resource``.  An ``active`` response
    marks the VDU ready immediately; ``pending``/``inactive`` leaves it in
    ``RESOURCE_ALLOC_PENDING`` until the subscription reports a change.

    Any exception is logged and turned into ``instantiation_failed``; it
    is not propagated to the caller.
    """
    self._state = VDURecordState.INSTANTIATING

    @asyncio.coroutine
    def on_prepare(xact_info, query_action, ks_path, msg):
        """ Track resource-info updates for this VDUR """
        self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                        query_action,
                        ks_path,
                        msg)

        if (query_action == rwdts.QueryAction.UPDATE or
                query_action == rwdts.QueryAction.CREATE):
            self._vm_resp = msg

            if msg.resource_state == "active":
                # Move this VDU to ready state
                yield from self.vdu_is_active()
            elif msg.resource_state == "failed":
                yield from self.instantiation_failed(msg.resource_errors)
        elif query_action == rwdts.QueryAction.DELETE:
            self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
        else:
            # Exceptions do not apply %-formatting to extra args.
            raise NotImplementedError(
                "%s action on VirtualDeployementUnitRecord not supported" %
                (query_action,))
        xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    try:
        reg_event = asyncio.Event(loop=self._loop)

        @asyncio.coroutine
        def on_ready(regh, status):
            reg_event.set()

        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
        self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                      flags=rwdts.Flag.SUBSCRIBER,
                                                      handler=handler)
        # Do not request the VM before the subscription is in place.
        yield from reg_event.wait()

        vm_resp = yield from self.create_resource(xact, vnfr, config)
        self._vm_resp = vm_resp

        self._state = VDURecordState.RESOURCE_ALLOC_PENDING
        self._log.debug("Requested VM from resource manager response %s",
                        vm_resp)
        if vm_resp.resource_state == "active":
            self._log.debug("Resourcemgr responded wih an active vm resp %s",
                            vm_resp)
            yield from self.vdu_is_active()
            self._state = VDURecordState.READY
        elif (vm_resp.resource_state == "pending" or
              vm_resp.resource_state == "inactive"):
            # The subscription registered above reports the final state.
            self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                            vm_resp)
        else:
            self._log.debug("Resourcemgr responded wih an error vm resp %s",
                            vm_resp)
            raise VirtualDeploymentUnitRecordError(
                "Failed VDUR instantiation %s " % vm_resp)

    except Exception as e:
        # log.exception records the traceback; no separate stderr dump.
        self._log.exception(e)
        self._log.error("Instantiation of VDU record failed: %s", str(e))
        self._state = VDURecordState.FAILED
        yield from self.instantiation_failed(str(e))
917
918
class VlRecordState(enum.Enum):
    """States of an internal virtual link record."""
    # Creation path
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Teardown path
    TERMINATE_PENDING = 104
    TERMINATED = 105
    # Error state
    FAILED = 106
927
928
929 class InternalVirtualLinkRecord(object):
930 """ Internal Virtual Link record """
def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
    """Create an internal VL record in the ``INIT`` state.

    The VLR request message is built right away from ``ivld_msg``; the
    instantiated VLR stays ``None`` until ``instantiate`` stores one.
    """
    self._dts, self._log, self._loop = dts, log, loop
    self._ivld_msg = ivld_msg
    self._vnfr_name = vnfr_name
    self._cloud_account_name = cloud_account_name

    # create_vlr() reads the attributes assigned above.
    self._vlr_req = self.create_vlr()
    self._vlr = None
    self._state = VlRecordState.INIT
942
@property
def vlr_id(self):
    """Id of the VLR request built for this virtual link."""
    request = self._vlr_req
    return request.id
947
@property
def name(self):
    """Name of this VL: ``<vnfr name>.<internal VLD name>``."""
    return ".".join((self._vnfr_name, self._ivld_msg.name))
952
@property
def network_id(self):
    """Network id of the instantiated VLR, or None while no VLR is stored."""
    vlr = self._vlr
    if not vlr:
        return None
    return vlr.network_id
957
def vlr_path(self):
    """Return the VLR catalog xpath keyed by this record's VLR id."""
    xpath_template = "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']"
    return xpath_template.format(self.vlr_id)
961
def create_vlr(self):
    """Build the VLR message that will be instantiated for this link.

    The record gets a fresh UUID, this VL's name and the cloud account;
    the descriptive fields listed below are copied from the internal VLD
    when present and override the generated values on a key clash.
    """
    copied_fields = ["short_name",
                     "vendor",
                     "description",
                     "version",
                     "type_yang",
                     "provider_network"]

    from_descriptor = {key: value
                       for key, value in self._ivld_msg.as_dict().items()
                       if key in copied_fields}

    vlr_dict = {"id": str(uuid.uuid4()),
                "name": self.name,
                "cloud_account": self._cloud_account_name,
                }
    vlr_dict.update(from_descriptor)

    return RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
982
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """Instantiate this virtual link and move the record to ``ACTIVE``.

    Normally the VLR is created at ``vlr_path()``.  In ``restart_mode`` an
    existing VLR is looked up first and adopted; it is only created when
    the lookup finds nothing.  The passed ``xact`` is not used; the create
    runs in its own transaction.

    Raises VnfrInstantiationFailed when the create yields no VLR or a VLR
    whose operational status is ``failed``; exceptions raised while
    executing the create are re-raised after marking the record failed.
    """

    @asyncio.coroutine
    def instantiate_vlr():
        """ Instantiate VLR"""
        self._log.debug("Create VL with xpath %s and vlr %s",
                        self.vlr_path(), self._vlr_req)

        with self._dts.transaction(flags=0) as create_xact:
            block = create_xact.block_create()
            block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.vlr_path(), self._vlr_req)

            try:
                res_iter = yield from block.execute()
            except Exception:
                self._state = VlRecordState.FAILED
                self._log.exception("Caught exception while instantial VL")
                raise

            for ent in res_iter:
                res = yield from ent
                self._vlr = res.result

        if self._vlr is None:
            # The create produced no result at all.
            self._state = VlRecordState.FAILED
            raise VnfrInstantiationFailed(
                "instantiation due to VL failure %s" % (self.vlr_id))

        if self._vlr.operational_status == 'failed':
            self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
            self._state = VlRecordState.FAILED
            raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

        self._log.info("Created VL with xpath %s and vlr %s",
                       self.vlr_path(), self._vlr)

    @asyncio.coroutine
    def get_vlr():
        """ Read the existing VLR; returns None when there is none """
        res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
        vlr = None
        for ent in res_iter:
            res = yield from ent
            vlr = res.result

        if vlr is None:
            self._log.warning("Failed to get VLR for path %s" % self.vlr_path())
        return vlr

    self._state = VlRecordState.INSTANTIATION_PENDING

    if restart_mode:
        vl = yield from get_vlr()
        if vl is None:
            # Nothing survived the restart: create the link afresh.
            yield from instantiate_vlr()
        else:
            # Adopt the existing record so network_id is available.
            self._vlr = vl
    else:
        yield from instantiate_vlr()

    self._state = VlRecordState.ACTIVE
1044
def vlr_in_vns(self):
    """ Tell whether a VLR record for this link is considered present in VNS

    Returns True while the record state is ACTIVE, INSTANTIATION_PENDING
    or FAILED, and False for every other state.
    """
    states_with_vlr = (
        VlRecordState.ACTIVE,
        VlRecordState.INSTANTIATION_PENDING,
        VlRecordState.FAILED,
    )
    return any(self._state == candidate for candidate in states_with_vlr)
1053
@asyncio.coroutine
def terminate(self, xact):
    """ Delete this VL's VLR using the supplied transaction

    Nothing is sent when vlr_in_vns() reports that no VLR is present.
    Otherwise the state moves to TERMINATE_PENDING, a delete query for
    vlr_path() is executed in a block created on xact, and the state
    ends up TERMINATED.
    """
    if self.vlr_in_vns():
        self._log.debug("Terminating VL with path %s", self.vlr_path())
        self._state = VlRecordState.TERMINATE_PENDING

        delete_block = xact.block_create()
        delete_block.add_query_delete(self.vlr_path())
        yield from delete_block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL with path %s", self.vlr_path())
    else:
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
1069
1070
1071 class VirtualNetworkFunctionRecord(object):
1072 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    """ Set up the record from the VNFR message it is created for

    Arguments:
        dts, log, loop - handles stored for later use by this record
        cluster_name   - stored and passed on to inventory components
        vnfm           - owner object used to fetch VNFDs and publish VNFRs
        vcs_handler    - stored and passed on to inventory components
        vnfr_msg       - VNFR message; supplies the id, the embedded VNFD
                         and the initial config status
        mgmt_network   - optional value handed to each VDU record
    """
    # Collaborators
    self._dts = dts
    self._log = log
    self._loop = loop
    self._vnfm = vnfm
    self._vcs_handler = vcs_handler
    self._cluster_name = cluster_name
    self._mgmt_network = mgmt_network

    # The source message is kept under two names used by different methods
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd = vnfr_msg.vnfd
    self._vnfd_id = vnfr_msg.vnfd.id
    self._config_status = vnfr_msg.config_status

    # Lifecycle state
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())

    # Child records and lookup tables, filled in during instantiation
    self._ext_vlrs = {}  # The list of external virtual links
    self._vlrs = []  # The list of internal virtual links
    self._vdus = []  # The list of vdu
    self._vlr_by_cp = {}
    self._cprs = []
    self._inventory = {}

    self._vnf_mon = None
    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0
1101
def _get_vdur_from_vdu_id(self, vdu_id):
    """ Find the VDU record whose vdu_id equals the given id

    Arguments:
        vdu_id - value compared against each record's vdu_id attribute

    Returns:
        the first entry of self._vdus that matches

    Raises:
        VDURecordNotFound - no entry matches
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Unlike the logging calls above, an exception constructor does not
    # interpolate %-style arguments, so the message is formatted here.
    raise VDURecordNotFound("Could not find vdu record from id: %s" % (vdu_id,))
1111
@property
def operational_status(self):
    """ Operational status string for this VNFR

    Translates the name of the current state into the lower-case status
    value placed in the VNFR message. A state name missing from the table
    raises KeyError.
    """
    status_by_state_name = dict(
        INIT="init",
        VL_INIT_PHASE="vl_init_phase",
        VM_INIT_PHASE="vm_init_phase",
        READY="running",
        TERMINATE="terminate",
        VL_TERMINATE_PHASE="vl_terminate_phase",
        VDU_TERMINATE_PHASE="vm_terminate_phase",
        TERMINATED="terminated",
        FAILED="failed",
    )
    return status_by_state_name[self._state.name]
1125
1126 @staticmethod
1127 def vnfd_xpath(vnfd_id):
1128 """ VNFD xpath associated with this VNFR """
1129 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
1130
@property
def vnfd_ref_count(self):
    """ Current number of references this VNFR holds on its VNFD """
    return self._vnfd_ref_count
1135
def vnfd_in_use(self):
    """ True when at least one reference is held on the VNFD """
    return self._vnfd_ref_count > 0
1139
def vnfd_ref(self):
    """ Add one VNFD reference and return the new reference count """
    updated_count = self._vnfd_ref_count + 1
    self._vnfd_ref_count = updated_count
    return updated_count
1144
def vnfd_unref(self):
    """ Drop one VNFD reference and return the remaining count

    Raises:
        VnfRecordError - the count is already below one; the condition is
                         also logged at critical level
    """
    if self._vnfd_ref_count >= 1:
        self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                        self.vnfd.id, self._vnfd_ref_count)
        self._vnfd_ref_count -= 1
        return self._vnfd_ref_count

    msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
           (self.vnfd.id, self._vnfd_ref_count))
    self._log.critical(msg)
    raise VnfRecordError(msg)
1156
@property
def vnfd(self):
    """ The VNFD message this record was created with """
    return self._vnfd
1161
@property
def vnf_name(self):
    """ Name taken from the VNFD of this VNFR """
    return self.vnfd.name
1166
@property
def name(self):
    """ Name carried by the VNFR message itself """
    return self._vnfr.name
1171
@property
def cloud_account_name(self):
    """ Cloud account named in the VNFR message """
    return self._vnfr.cloud_account
1176
@property
def vnfd_id(self):
    """ Id of the VNFD this VNFR was created from """
    return self.vnfd.id
1181
@property
def vnfr_id(self):
    """ Id of this VNFR, as captured from the VNFR message at creation """
    return self._vnfr_id
1186
@property
def member_vnf_index(self):
    """ member_vnf_index_ref value of the VNFR message """
    return self._vnfr.member_vnf_index_ref
1191
@property
def config_status(self):
    """ Config status stored for this VNFR """
    return self._config_status
1196
def component_by_name(self, component_name):
    """ Look up an inventory component by its descriptor name

    The inventory is keyed by the mangled name, so the name is mangled
    with this VNF's name and VNFD id first. An unknown component raises
    KeyError.
    """
    inventory_key = VcsComponent.mangle_name(component_name,
                                             self.vnf_name,
                                             self.vnfd_id)
    return self._inventory[inventory_key]
1203
1204
1205
@asyncio.coroutine
def get_nsr_config(self):
    """ Read the NS instance config and return the entry for this VNFR's NSR

    Returns the first nsr whose id equals the nsr_id_ref of the VNFR
    message, or None when no result contains one.
    """
    # Need access to NS instance configuration for runtime resolution.
    # This shall be replaced when deployment flavors are implemented
    pending_results = yield from self._dts.query_read("C,/nsr:ns-instance-config",
                                                      rwdts.XactFlag.MERGE)

    for pending in pending_results:
        reply = yield from pending
        for candidate in reply.result.nsr:
            if candidate.id == self._vnfr_msg.nsr_id_ref:
                return candidate

    return None
1220
@asyncio.coroutine
def start_component(self, component_name, ip_addr):
    """ Start the named inventory component, passing it the ip address """
    component = self.component_by_name(component_name)
    yield from component.start(None, None, ip_addr)
1226
def cp_ip_addr(self, cp_name):
    """ IP address of the named connection point

    Returns the address of the first connection point record with that
    name that has an address, or "0.0.0.0" when there is none.
    """
    self._log.debug("cp_ip_addr()")
    known_addresses = (
        cpr.ip_address
        for cpr in self._cprs
        if cpr.name == cp_name and cpr.ip_address is not None
    )
    return next(known_addresses, "0.0.0.0")
1234
def mgmt_intf_info(self):
    """ Management address and port of this VNFR

    The address comes from, in order of preference: the connection point
    named by the descriptor, the management ip of the VDU record named by
    the descriptor (None if that record does not exist), or the
    descriptor's own ip_address field.

    Returns:
        (ip address, port) where port is the descriptor's port field
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        address = self.cp_ip_addr(descriptor.cp)
    elif descriptor.has_field("vdu_id"):
        try:
            address = self._get_vdur_from_vdu_id(descriptor.vdu_id).management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            address = None
    else:
        address = descriptor.ip_address

    return address, descriptor.port
1253
@property
def msg(self):
    """ VNFR message describing the current state of this record

    A new RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr is assembled on every
    access. It carries the identity and status fields, selected
    descriptor fields, a copy of the VNFD, the management interface,
    internal VLR references, VDURs, connection points, monitoring
    params, the VNF configuration and the placement groups.
    """
    inherited_fields = ["short_name", "vendor", "description", "version"]
    inherited = {key: value
                 for key, value in self.vnfd.as_dict().items()
                 if key in inherited_fields}

    # Management interface: only fields with a known value are set
    mgmt_interface = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
    ip_address, port = self.mgmt_intf_info()
    if ip_address is not None:
        mgmt_interface.ip_address = ip_address
    if port is not None:
        mgmt_interface.port = port

    fields = {
        "id": self._vnfr_id,
        "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
        "name": self.name,
        "member_vnf_index_ref": self.member_vnf_index,
        "operational_status": self.operational_status,
        "operational_status_details": self._state_failed_reason,
        "cloud_account": self.cloud_account_name,
        "config_status": self._config_status,
    }
    fields.update(inherited)

    record = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(fields)
    record.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())

    record.create_time = self._create_time
    record.uptime = int(time.time()) - self._create_time
    record.mgmt_interface = mgmt_interface

    # Internal virtual links are referenced by id only
    for link in self._vlrs:
        link_ref = record.internal_vlr.add()
        link_ref.vlr_ref = link.vlr_id

    # Each VDU record contributes its own message
    if self._vdus is not None:
        for vdu_record in self._vdus:
            vdur_entry = record.vdur.add()
            vdur_entry.from_dict(vdu_record.msg.as_dict())

    if self.vnfd.mgmt_interface.has_field('dashboard_params'):
        record.dashboard_url = self.dashboard_url

    for cpr in self._cprs:
        record.connection_point.append(
            VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict()))

    if self._vnf_mon is not None:
        for mon_param in self._vnf_mon.msg:
            record.monitoring_param.append(
                VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(mon_param.as_dict()))

    if self._vnfr.vnf_configuration is not None:
        record.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
        # Fall back to the management address when the configuration
        # does not name one itself
        if (ip_address is not None and
                record.vnf_configuration.config_access.mgmt_ip_address is None):
            record.vnf_configuration.config_access.mgmt_ip_address = ip_address

    for group in self._vnfr_msg.placement_groups_info:
        group_entry = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        group_entry.from_dict(group.as_dict())
        record.placement_groups_info.append(group_entry)

    return record
1322
@property
def dashboard_url(self):
    """ Dashboard URL built from the descriptor's dashboard parameters

    The scheme is https (default port 443) only when the https field is
    present and True, otherwise http (default port 80). An explicit port
    field overrides the default. The host is the management ip address,
    and the path has any leading slashes removed.
    """
    ip, cfg_port = self.mgmt_intf_info()

    if (self.vnfd.mgmt_interface.dashboard_params.has_field('https') and
            self.vnfd.mgmt_interface.dashboard_params.https is True):
        scheme, url_port = 'https', 443
    else:
        scheme, url_port = 'http', 80

    if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
        url_port = self.vnfd.mgmt_interface.dashboard_params.port

    return "{protocol}://{ip_address}:{port}/{path}".format(
        protocol=scheme,
        ip_address=ip,
        port=url_port,
        path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
    )
1343
@property
def xpath(self):
    """ Operational-data xpath of this VNFR in the VNFR catalog """
    template = ("D,/vnfr:vnfr-catalog"
                "/vnfr:vnfr[vnfr:id='{}']")
    return template.format(self.vnfr_id)
1349
@asyncio.coroutine
def publish(self, xact):
    """ publish this VNFR

    Arguments:
        xact - transaction handed through unchanged to the VNFM's
               publish_vnfr(); callers in this class also pass None
    """
    # self.msg assembles a complete new message on every access, so one
    # snapshot is taken and used for the log lines and the publish call.
    vnfr = self.msg
    path = self.xpath

    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    vnfr.create_time = self._create_time
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
1360
@asyncio.coroutine
def create_vls(self):
    """ Create a record for every internal VLD of the VNFD

    Each record is appended to self._vlrs and registered in
    self._vlr_by_cp under every internal connection point it serves.

    Raises:
        InternalVirtualLinkRecordError - a connection point is referenced
                                         by more than one internal VLD
    """
    self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for ivld in self.vnfd.internal_vld:
        self._log.debug("Creating internal vld: %s, int_cp_ref = %s",
                        ivld, ivld.internal_connection_point)
        link_record = InternalVirtualLinkRecord(dts=self._dts,
                                                log=self._log,
                                                loop=self._loop,
                                                ivld_msg=ivld,
                                                vnfr_name=self.name,
                                                cloud_account_name=self.cloud_account_name)
        self._vlrs.append(link_record)

        for cp_ref in ivld.internal_connection_point:
            # A connection point may belong to one internal link only
            if cp_ref.id_ref in self._vlr_by_cp:
                msg = ("Connection point %s already  bound %s" %
                       (cp_ref.id_ref, self._vlr_by_cp[cp_ref.id_ref]))
                raise InternalVirtualLinkRecordError(msg)

            self._log.debug("Setting vlr %s to internal cp = %s",
                            link_record, cp_ref.id_ref)
            self._vlr_by_cp[cp_ref.id_ref] = link_record
1388
@asyncio.coroutine
def instantiate_vls(self, xact, restart_mode=False):
    """ Instantiate every internal VL record, one after the other

    xact and restart_mode are passed unchanged to each record's
    instantiate().
    """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for link_record in self._vlrs:
        self._log.debug("Instantiating VLR %s", link_record)
        yield from link_record.instantiate(xact, restart_mode)
1398
def find_vlr_by_cp(self, cp_name):
    """ Internal VL record bound to the named connection point

    Raises KeyError when no record is bound to that name.
    """
    return self._vlr_by_cp[cp_name]
1402
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """
    Returns the cloud specific construct for placement group, or None
    when the NSR config has no map for this group and this VNFD
    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config
    """
    # Taken from the descriptor group rather than from the NSR map
    descriptor_params = ['name', 'requirement', 'strategy']

    for group_map in nsr_config.vnfd_placement_group_maps:
        if not (group_map.placement_group_ref == input_group.name and
                group_map.vnfd_id_ref == self.vnfd_id):
            continue

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
        # Everything in the map except its two lookup keys
        values = {key: value
                  for key, value in group_map.as_dict().items()
                  if (key != 'placement_group_ref' and key != 'vnfd_id_ref')}
        for param in descriptor_params:
            values[param] = getattr(input_group, param)
        resolved.from_dict(values)
        return resolved

    return None
1423
@asyncio.coroutine
def get_vdu_placement_groups(self, vdu):
    """ Collect the placement groups that apply to the given VDU

    The list starts with every VNF level group of the VNFR message.
    Groups of the VNFD that list the VDU as a member are then appended,
    after being resolved against the NSR config. A group that cannot be
    resolved is logged and left out.
    """
    # Step-1: VNF level placement groups are taken over as they are
    collected = [group for group in self._vnfr_msg.placement_groups_info]

    # Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
    nsr_config = yield from self.get_nsr_config()

    # Step-3: VDU level placement groups
    for group in self.vnfd.placement_groups:
        for member in group.member_vdus:
            if member.member_vdu_ref != vdu.id:
                continue

            resolved = self.resolve_placement_group_cloud_construct(group, nsr_config)
            if resolved is None:
                # Logged only; an unresolved group does not fail the VDU
                self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
                continue

            self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                           str(resolved),
                           vdu.name,
                           self.vnf_name,
                           self.member_vnf_index)
            collected.append(resolved)

    return collected
1454
@asyncio.coroutine
def create_vdus(self, vnfr, restart_mode=False):
    """ Create a VDU record for every VDU of the fetched VNFD

    Arguments:
        vnfr - the VNFR record handed to each VDU record; in restart mode
               its VNFR message is searched for existing VDUR ids
        restart_mode - reuse the ids of existing VDURs where possible

    Each record registers its opdata before being added to self._vdus.
    """

    def existing_vdur_id(vdud):
        """Id of the VDUR already created for this VDUD, if any.

        Only consulted in restart mode, so that a restart does not end
        up creating duplicate VDURs. Returns None otherwise, or when no
        VDUR refers to the VDUD.
        """
        if not restart_mode or vdud is None:
            return None

        matching_ids = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
        if matching_ids:
            return matching_ids[0]

        self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
        return None

    self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
    for vdud in self._rw_vnfd.vdu:
        self._log.debug("Creating vdu: %s", vdud)
        reused_id = existing_vdur_id(vdud)

        groups = yield from self.get_vdu_placement_groups(vdud)
        self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
                       vdud.name,
                       self.vnf_name,
                       self.member_vnf_index,
                       [group.name for group in groups])

        vdu_record = VirtualDeploymentUnitRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            vdud=vdud,
            vnfr=vnfr,
            mgmt_intf=self.has_mgmt_interface(vdud),
            mgmt_network=self._mgmt_network,
            cloud_account_name=self.cloud_account_name,
            vnfd_package_store=self._vnfd_package_store,
            vdur_id=reused_id,
            placement_groups=groups,
        )
        yield from vdu_record.vdu_opdata_register()

        self._vdus.append(vdu_record)
1506
@asyncio.coroutine
def instantiate_vdus(self, xact, vnfr):
    """ Instantiate the VDUs associated with this VNF

    Two tasks are scheduled on the loop for every VDU record: one that
    waits for the record to reach a terminal state, and one that
    instantiates it once the VDUs its cloud-init script refers to have
    been processed. This coroutine returns after scheduling the tasks.

    Arguments:
        xact - not used here; every VDU is instantiated in a transaction
               of its own
        vnfr - the VNFR record passed on to each VDU's instantiate()
    """
    self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

    lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

    # Identify any dependencies among the VDUs
    dependencies = collections.defaultdict(list)
    vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")

    # Matches every "{{ name }}" placeholder of a cloud-init script.
    # Written as a raw string so the backslashes reach the regex engine
    # without relying on Python keeping unknown string escapes.
    variable_pattern = re.compile(r"\{\{ ([^\}]+) \}\}")

    for vdu in self._vdus:
        if vdu.vdud_cloud_init is not None:
            for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
                if vdu_id != vdu.vdu_id:
                    # This means that vdu.vdu_id depends upon vdu_id,
                    # i.e. vdu_id must be instantiated before
                    # vdu.vdu_id.
                    dependencies[vdu.vdu_id].append(lookup[vdu_id])

    # Define the terminal states of VDU instantiation
    terminal = (
        VDURecordState.READY,
        VDURecordState.TERMINATED,
        VDURecordState.FAILED,
    )

    datastore = VdurDatastore()
    processed = set()

    @asyncio.coroutine
    def instantiate_monitor(vdu):
        """Monitor the state of the VDU during instantiation

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        """
        # wait for the VDUR to enter a terminal state
        while vdu._state not in terminal:
            yield from asyncio.sleep(1, loop=self._loop)

        # update the datastore
        datastore.update(vdu)

        # add the VDU to the set of processed VDUs
        processed.add(vdu.vdu_id)

    @asyncio.coroutine
    def instantiate(vdu):
        """Instantiate the specified VDU

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        Raises:
            VirtualDeploymentUnitRecordError if any VDU this VDU depends
            upon is not active once it has been processed.
            ValueError if a cloud-init variable is not recognized or has
            no value in the datastore.

        """
        for dependency in dependencies[vdu.vdu_id]:
            self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

            while dependency.vdu_id not in processed:
                yield from asyncio.sleep(1, loop=self._loop)

            if not dependency.active:
                raise VirtualDeploymentUnitRecordError()

        self._log.debug('instantiating {}'.format(vdu.vdu_id))

        # Populate the datastore with the current values of the VDU
        datastore.add(vdu)

        # Substitute any variables contained in the cloud config script
        config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""

        # With one capture group, split() returns the captured variable
        # names at the odd indexes
        parts = variable_pattern.split(config)
        if len(parts) > 1:

            # Extract the variable names
            variables = [
                variable.lstrip('{{').rstrip('}}').strip()
                for variable in parts[1::2]
            ]

            # Iterate of the variables and substitute values from the
            # datastore.
            for variable in variables:

                if variable.startswith('vdu['):
                    # Handle a reference to a VDU by ID
                    key = variable
                elif variable.startswith('vdu'):
                    # Handle a reference to the current VDU
                    key = 'vdu[{}]'.format(vdu.vdu_id) + variable[3:]
                else:
                    # Handle unrecognized variables
                    msg = 'unrecognized cloud-config variable: {}'
                    raise ValueError(msg.format(variable))

                value = datastore.get(key)
                if value is None:
                    # Checked for both kinds of reference: str.replace()
                    # would otherwise fail with an unhelpful TypeError
                    msg = "Unable to find a substitute for {} in {} cloud-init script"
                    raise ValueError(msg.format(variable, vdu.vdu_id))

                config = config.replace("{{ %s }}" % variable, value)

        # Instantiate the VDU
        with self._dts.transaction() as vdu_xact:
            self._log.debug("Instantiating vdu: %s", vdu)
            yield from vdu.instantiate(vdu_xact, vnfr, config=config)
            if self._state == VirtualNetworkFunctionRecordState.FAILED:
                self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                self.vnfr_id, vdu)

    # First create a set of tasks to monitor the state of the VDUs and
    # report when they have entered a terminal state
    for vdu in self._vdus:
        self._loop.create_task(instantiate_monitor(vdu))

    for vdu in self._vdus:
        self._loop.create_task(instantiate(vdu))
1632
def has_mgmt_interface(self, vdu):
    """ True when the descriptor's management interface names this VDU """
    # TODO: Support additional mgmt_interface type options
    return self.vnfd.mgmt_interface.vdu_id == vdu.id
1638
def vlr_xpath(self, vlr_id):
    """ Operational-data xpath of the VLR with the given id """
    template = ("D,/vlr:vlr-catalog/"
                "vlr:vlr[vlr:id = '{}']")
    return template.format(vlr_id)
1644
def ext_vlr_by_id(self, vlr_id):
    """ External VLR stored under the given id

    Raises KeyError when no external VLR with that id has been fetched.
    """
    return self._ext_vlrs[vlr_id]
1648
@asyncio.coroutine
def publish_inventory(self, xact):
    """ Create and publish a VcsComponent for each component of the VNFD

    Components are stored in self._inventory under the component's name
    and published through xact one at a time.
    """
    self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)

    for descriptor in self._rw_vnfd.component:
        self._log.debug("Creating inventory component %s", descriptor)
        inventory_name = VcsComponent.mangle_name(descriptor.component_name,
                                                  self.vnf_name,
                                                  self.vnfd_id)
        component = VcsComponent(dts=self._dts,
                                 log=self._log,
                                 loop=self._loop,
                                 cluster_name=self._cluster_name,
                                 vcs_handler=self._vcs_handler,
                                 component=descriptor,
                                 mangled_name=inventory_name)

        if component.name in self._inventory:
            # NOTE(review): a duplicate ends the whole method, so any
            # remaining components are not published - confirm that this
            # is intended rather than skipping just the duplicate.
            self._log.debug("Duplicate entries in inventory  %s for vnfr %s",
                            descriptor, self._vnfd_id)
            return

        self._log.debug("Adding component %s for vnrf %s",
                        component.name, self._vnfr_id)
        self._inventory[component.name] = component
        yield from component.publish(xact)
1676
def all_vdus_active(self):
    """ Check whether every VDU record of this VNFR is active

    Returns True when no record reports inactive (also when there are no
    records at all), False as soon as one does.
    """
    if not all(vdu.active for vdu in self._vdus):
        return False

    self._log.debug("Inside all_vdus_active. Returning True")
    return True
1685
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as FAILED, record the reason and publish it """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    self.set_state(VirtualNetworkFunctionRecordState.FAILED)
    self._state_failed_reason = failed_reason

    # Let subscribers see the new status; no transaction is passed on
    yield from self.publish(None)
1695
@asyncio.coroutine
def is_ready(self):
    """ Move this VNFR to READY and publish it

    A VNFR that is already FAILED keeps its state; it is still published.
    """
    self._log.debug("VNFR id %s is ready", self.vnfr_id)

    if self._state == VirtualNetworkFunctionRecordState.FAILED:
        self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
    else:
        self.set_state(VirtualNetworkFunctionRecordState.READY)

    # Let subscribers see the current status; no transaction is passed on
    yield from self.publish(None)
1709
def update_cp(self, cp_name, ip_address, cp_id):
    """Updated the connection point with ip address

    Arguments:
        cp_name    - name of the connection point record to update
        ip_address - value stored as the record's ip_address
        cp_id      - value stored as the record's connection_point_id

    Raises:
        VirtualDeploymentUnitRecordError - no connection point record has
                                           the given name
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.connection_point_id = cp_id
            return

    # Report the name that was searched for. The loop variable would name
    # the last record visited instead, and is not bound at all when there
    # are no records.
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1723
def set_state(self, state):
    """ Replace the current state of this VNFR with the given one """
    self._state = state
1727
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ Instantiate this VNF

    Steps, in order: fetch the VNFD, take a VNFD reference, fetch the
    external VLRs of the connection points, publish the inventory, create
    and instantiate the internal VLs, create the VDU records and schedule
    their instantiation, then schedule the periodic uptime publication.
    The VNFR is published after each major step.

    A failure while instantiating the VLs is logged, recorded through
    instantiation_failed() and ends the method without raising.
    """

    def cpr_from_cp(cp):
        """ Creates a record level connection point from the desciptor cp"""
        wanted = ["name", "image", "vm-flavor", "static_ip_address"]
        values = {}
        values.update({key: value for key, value in cp.as_dict().items() if key in wanted})
        return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(values)

    @asyncio.coroutine
    def fetch_vlrs():
        """ Fetch the VLR behind every connection point of the VNFR """
        self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                        self._vnfr_id, self._vnfr.connection_point)

        for cp in self._vnfr.connection_point:
            cpr = cpr_from_cp(cp)
            self._cprs.append(cpr)
            self._log.debug("Adding Connection point record  %s ", cp)

            vlr_path = self.vlr_xpath(cp.vlr_ref)
            self._log.debug("Fetching VLR with path = %s", vlr_path)
            pending_results = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
                                                              rwdts.XactFlag.MERGE)
            for pending in pending_results:
                reply = yield from pending
                vlr = reply.result
                self._ext_vlrs[cp.vlr_ref] = vlr
                cpr.vlr_ref = cp.vlr_ref
                self._log.debug("Fetched VLR [%s] with path = [%s]", vlr, vlr_path)

    self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
    self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)

    # Increase the VNFD reference count
    self.vnfd_ref()

    assert self.vnfd

    # Fetch External VLRs
    self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
    yield from fetch_vlrs()

    # Publish inventory
    self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
    yield from self.publish_inventory(xact)

    # Create the internal VL records
    self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
    yield from self.create_vls()

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VLs
    self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
    try:
        yield from self.instantiate_vls(xact, restart_mode)
    except Exception as vl_error:
        self._log.exception("VL instantiation failed (%s)", str(vl_error))
        yield from self.instantiation_failed(str(vl_error))
        return

    self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

    # create the VDU records
    self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
    yield from self.create_vdus(self, restart_mode)

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # instantiate VDUs; scheduled as a task and not waited for
    # ToDo: Check if this should be prevented during restart
    self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
    self._loop.create_task(self.instantiate_vdus(xact, self))

    # publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)

    # create task updating uptime for this vnfr
    self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
    self._loop.create_task(self.vnfr_uptime_update(xact))
1821
@asyncio.coroutine
def terminate(self, xact):
    """ Terminate this virtual network function

    Monitoring is stopped first, then the internal VLs and after them
    the VDUs are terminated one at a time through xact. The state is
    updated before each phase and ends up TERMINATED.
    """
    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # stop monitoring
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for link_record in self._vlrs:
        yield from link_record.terminate(xact)

    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdu_record in self._vdus:
        yield from vdu_record.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1858
@asyncio.coroutine
def vnfr_uptime_update(self, xact):
    """ Re-publish this VNFR every two seconds while it is starting or running

    The loop ends as soon as the state is anything other than INIT,
    VL_INIT_PHASE, VM_INIT_PHASE or READY, e.g. FAILED or TERMINATED.
    """
    publishing_states = [VirtualNetworkFunctionRecordState.INIT,
                         VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
                         VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
                         VirtualNetworkFunctionRecordState.READY]

    while self._state in publishing_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1870
1871
1872
class VnfdDtsHandler(object):
    """ DTS handler for VNFD config changes """
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        """ Store the collaborators; registration happens in register() """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm
        self._regh = None

    # NOTE(review): VcsComponentDtsHandler exposes regh as a @property,
    # while here it is a coroutine that has to be called. This looks
    # unintended but is kept because callers are not visible from this
    # class - confirm before changing.
    @asyncio.coroutine
    def regh(self):
        """ DTS registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for VNFD configuration"""

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration

            Only logs the callback; deletions are handled in on_prepare.
            """
            self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback

            Raises:
                VirtualNetworkFunctionDescriptorRefCountExists - the VNFD
                being deleted is still in use
            """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
                            ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            # Handle deletes in prepare_callback
            if fref.is_field_deleted():
                self._log.debug("Deleting VNFD with id %s", msg.id)
                if self._vnfm.vnfd_in_use(msg.id):
                    self._log.debug("Cannot delete VNFD in use - %s", msg)
                    err = "Cannot delete a VNFD in use - %s" % msg
                    raise VirtualNetworkFunctionDescriptorRefCountExists(err)
                # Delete a VNFD record
                yield from self._vnfm.delete_vnfd(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
            )
        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
1931
1932
class VcsComponentDtsHandler(object):
    """ Publishes VCS components into the manifest's operational inventory """
    XPATH = ("D,/rw-manifest:manifest"
             "/rw-manifest:operational-inventory"
             "/rw-manifest:component")

    def __init__(self, dts, log, loop, vnfm):
        """ Store the collaborators; registration happens in register() """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm
        self._regh = None

    @property
    def regh(self):
        """ Registration handle; None until register() has run """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as publisher for the VCS component xpath """
        self._log.debug("VCS Comp publisher DTS handler registering path %s",
                        VcsComponentDtsHandler.XPATH)

        registration_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        publisher_flags = (rwdts.Flag.PUBLISHER |
                           rwdts.Flag.NO_PREP_READ |
                           rwdts.Flag.DATASTORE)

        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
                                        handler=registration_handler,
                                        flags=publisher_flags,)

    @asyncio.coroutine
    def publish(self, xact, path, msg):
        """ Create the element for one VCS component at the given path

        xact only appears in the log lines; the element is created
        directly on the registration handle.
        """
        self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                        VcsComponentDtsHandler.XPATH, xact, path, msg)
1974
class VnfrConsoleOperdataDtsHandler(object):
    """Serves console operational data for a single VDUR.

    Registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' as a publisher
    and answers READ queries with the console URL of the VDU.
    """

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm
        self._regh = None

        # Identifiers that select the VNFR / VDUR / VDU this handler serves
        self._vnfr_id, self._vdur_id, self._vdu_id = vnfr_id, vdur_id, vdu_id

    @property
    def vnfr_vdu_console_xpath(self):
        """Keyed xpath of the console entry for this VNFR/VDUR pair."""
        template = "D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']"
        return template.format(self._vnfr_id, self._vdur_id)

    @asyncio.coroutine
    def register(self):
        """Register for VNFR VDU operational data reads from DTS."""

        def build_console(url):
            """Build the console response message carrying ``url``."""
            console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
            console.id = self._vdur_id
            console.console_url = url
            return console

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: respond to READ, ACK anything else."""
            request_xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, request_xpath, msg
            )

            # Only READ is served; other actions are logged and ACKed.
            if action != rwdts.QueryAction.READ:
                unsupported = "Not supported operation %s" % action
                self._log.error(unsupported)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            console_schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
            entry = console_schema.keyspec_to_entry(ks_path)
            self._log.debug("VDU Opdata path is {}".format(entry))

            try:
                vnfr = self._vnfm.get_vnfr(self._vnfr_id)
            except VnfRecordError:
                self._log.error("VNFR id %s not found", self._vnfr_id)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            try:
                vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)
                if vdur._state != VDURecordState.READY:
                    self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return

                with self._dts.transaction() as read_xact:
                    resource = yield from vdur.read_resource(read_xact)
                    # An empty/unset URL is reported as the literal 'none'
                    console = build_console(resource.console_url or 'none')
                    self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id, console))
            except Exception:
                # Best effort: any failure still yields a response, with
                # the console URL set to 'none'.
                self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                console = build_console('none')

            xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                    xpath=self.vnfr_vdu_console_xpath,
                                    msg=console)

        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        self.vnfr_vdu_console_xpath)
        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        with self._dts.group_create() as reg_group:
            self._regh = reg_group.register(
                xpath=self.vnfr_vdu_console_xpath,
                handler=reg_handler,
                flags=rwdts.Flag.PUBLISHER,
            )
2055
2056
class VnfrDtsHandler(object):
    """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts

        Installs an on_prepare callback that dispatches on the query action
        (CREATE / DELETE / UPDATE) and a group on_event callback that
        re-creates VNFRs from the registration's existing elements on the
        INSTALL member event.
        """
        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        # NOTE(review): on_abort is defined but is not passed to any
        # handler created in this method.
        def on_abort(*args):
            """ Abort callback """
            self._log.debug("VNF transaction got aborted")

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback; rebuilds VNFRs on INSTALL """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - the VNFR object returned by create_vnfr

                """

                yield from vnfr.instantiate(None, restart_mode=True)

            if xact_event == rwdts.MemberEvent.INSTALL:
                curr_cfg = self.regh.elements
                for cfg in curr_cfg:
                    vnfr = self.vnfm.create_vnfr(cfg)
                    # Instantiate in the background; on_event does not wait
                    self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            self._log.debug("Got on_event in vnfm")

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            Raises:
                VnfRecordError - CREATE without a "vnfd" field
                NotImplementedError - any action other than
                    CREATE / DELETE / UPDATE
            """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
                )

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd"):
                    err = "Vnfd not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)

                vnfr = self.vnfm.create_vnfr(msg)
                try:
                    # RIFT-9105: Unable to add a READ query under an existing transaction
                    # xact = xact_info.xact
                    yield from vnfr.instantiate(None)
                except Exception as e:
                    # Instantiation failure is recorded on the VNFR and
                    # published; the query itself is still ACKed below.
                    self._log.exception(e)
                    self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                    vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                    yield from vnfr.publish(None)
            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = self._vnfm.get_vnfr(path_entry.key00.id)

                # NOTE(review): VnfManager.get_vnfr raises VnfRecordError
                # for an unknown id rather than returning None, so this
                # branch looks unreachable - confirm.
                if vnfr is None:
                    self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
                    # Interpolate the id; passing it as a second positional
                    # argument leaves the "%s" unformatted in the message.
                    raise VirtualNetworkFunctionRecordNotFound(
                            "VNFR id %s" % path_entry.key00.id)

                try:
                    yield from vnfr.terminate(xact_info.xact)
                    # Unref the VNFD
                    vnfr.vnfd_unref()
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    # Failures are logged only; the query is ACKed below.
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
                except Exception:
                    self._log.debug("No vnfr found with id %s", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                self._log.debug("VNFR {} update config status {} (current {})".
                                format(vnfr.name, msg.config_status, vnfr.config_status))
                # Update the config status and publish
                vnfr._config_status = msg.config_status
                yield from vnfr.publish(None)

            else:
                # Format the message here; exception constructors do not
                # apply %-style arguments.
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported"
                    % action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message

        xact is only used for logging here.
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message

        xact is only used for logging here.
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at path from DTS

        xact is only used for logging here.
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2232
2233
class VnfdRefCountDtsHandler(object):
    """DTS handler answering reads of the VNFD reference counts."""
    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm

        # Populated by register()
        self._regh = None

    @property
    def regh(self):
        """The DTS registration handle."""
        return self._regh

    @property
    def vnfm(self):
        """The VNF manager instance this handler queries."""
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """Register as publisher for VNFD ref count reads from DTS."""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: only READ is supported."""
            query_xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, query_xpath, msg
                )

            if action != rwdts.QueryAction.READ:
                raise VnfRecordError("Not supported operation %s" % action)

            refcount_schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
            entry = refcount_schema.keyspec_to_entry(ks_path)
            refcounts = yield from self._vnfm.get_vnfd_refcount(entry.key00.vnfd_id_ref)

            # Each entry goes out as a MORE response; a final ACK closes
            # the query.
            for entry_xpath, entry_msg in refcounts:
                self._log.debug("Responding to ref count query path:%s, msg:%s",
                                entry_xpath, entry_msg)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=entry_xpath,
                                        msg=entry_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        with self._dts.group_create() as reg_group:
            self._regh = reg_group.register(
                xpath=VnfdRefCountDtsHandler.XPATH,
                handler=reg_handler,
                flags=rwdts.Flag.PUBLISHER,
            )
2289
2290
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.

    Entries are keyed by the VDUR's ``vdu_id``; the exposed attributes are
    'name' and 'mgmt.ip'.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        self._vdur_data = dict()
        # Raw string so that "\[" and "\." reach the regex engine as written
        # instead of being treated as (invalid) string escape sequences.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attrs(vdur):
        """Return the (key, value) pairs this datastore exposes for a VDUR"""
        return (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Only attributes whose value is not None are stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no ID or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        # Read the attributes before creating the entry so a failing
        # attribute read does not leave an empty entry behind.
        self._vdur_data[vdur.vdu_id] = {
            key: value
            for key, value in self._exposed_attrs(vdur)
            if value is not None
        }

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes that are now None are removed from the entry; all others
        are overwritten.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no ID or (2) is not
            in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        entry = self._vdur_data[vdur.vdu_id]
        for key, value in self._exposed_attrs(vdur):
            if value is None:
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier of a VDUR in the datastore

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VDUR is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        if vdur_id not in self._vdur_data:
            return None

        return self._vdur_data[vdur_id].get(key)
2409
2410
class VnfManager(object):
    """ The virtual network function manager class

    Owns the static DTS handlers and keeps three in-memory maps:
    VNFR id -> VNFR object, VNFD id -> VNFR object and NSR id -> NSR config.
    """
    def __init__(self, dts, log, loop, cluster_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)

        # Handlers registered, in this order, by register()
        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vnfr_handler,
                              self._vcs_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler]
        self._vnfrs = {}
        self._vnfds_to_vnfr = {}
        self._nsrs = {}

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ Track NSR configs: store on CREATE, forget on DELETE """
        if action == rwdts.QueryAction.CREATE:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            # Deleting an unknown NSR is silently ignored
            self._nsrs.pop(nsr.id, None)

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The vim_network_name of the first VLD flagged as mgmt network,
            or None when the NSD has no such VLD.

        Raises:
            VnfRecordError - no NSR is known for vnfr.nsr_id_ref
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError as err:
            # Raising a bare string is itself a TypeError ("exceptions must
            # derive from BaseException"); raise a real exception instead.
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id)) from err

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.vim_network_name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError - the id is not known to this manager
        """

        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance

        Raises:
            VnfRecordError - a VNFR with the same id already exists
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )
        self._vnfds_to_vnfr[vnfr.vnfd.id] = self._vnfrs[vnfr.id]
        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance

        Removes the record from DTS and from the id map; a VNFR that is
        not in the map is ignored.
        """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)
            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch the VNFD with the given vnfd id from DTS

        Raises:
            VnfRecordError - the read query produced no VNFD
        """
        vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)

        # If several results come back, the last one is kept
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        if vnfd_id in self._vnfds_to_vnfr:
            return self._vnfds_to_vnfr[vnfd_id].in_use()
        return False

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorNotFound - unknown vnfd id
            VirtualNetworkFunctionDescriptorRefCountExists - still in use
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
        if vnfd_id not in self._vnfds_to_vnfr:
            self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
            raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s" % vnfd_id)

        if self._vnfds_to_vnfr[vnfd_id].in_use():
            ref_count = self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

        # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            # Cleanup is best effort; the descriptor is still forgotten below
            self._log.error("Exception in cleaning up VNFD {}: {}".
                            format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
            self._log.exception(e)

        del self._vnfds_to_vnfr[vnfd_id]

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return (VnfdRefCountDtsHandler.XPATH +
                "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the vnfd ref count list from this VNFM

        Arguments:
            vnfd_id - a VNFD id, or None / "" to list every known VNFD

        Returns:
            A list of (xpath, ref-count message) tuples; empty when the
            given id is unknown.
        """
        def refcount_entry(key_id, vnfr):
            """ Build the (xpath, message) pair for one VNFD """
            vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = vnfr.vnfd.id
            vnfd_msg.instance_ref_count = vnfr.vnfd_ref_count
            return (self.vnfd_refcount_xpath(key_id), vnfd_msg)

        if vnfd_id is None or vnfd_id == "":
            return [refcount_entry(vnfr.vnfd.id, vnfr)
                    for vnfr in self._vnfds_to_vnfr.values()]

        if vnfd_id in self._vnfds_to_vnfr:
            return [refcount_entry(vnfd_id, self._vnfds_to_vnfr[vnfd_id])]

        return []
2608
2609
class VnfmTasklet(rift.tasklets.Tasklet):
    """Tasklet that hosts the VNF Manager."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        # Both are created later: _dts in start(), _vnfm in init()
        self._dts = None
        self._vnfm = None

    def start(self):
        """Start the tasklet and create the DTS API object."""
        try:
            super().start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            self._dts = rift.tasklets.DTS(
                self.tasklet_info,
                RwVnfmYang.get_schema(),
                self.loop,
                self.on_dts_state_change,
            )

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            exc_type = sys.exc_info()[0]
            print("Caught Exception in VNFM start:", exc_type)
            raise

    def on_instance_started(self):
        """Task instance started callback."""
        self.log.debug("Got instance started callback")

    def stop(self):
        """Stop the tasklet by de-initialising DTS."""
        try:
            self._dts.deinit()
        except Exception:
            exc_type = sys.exc_info()[0]
            print("Caught Exception in VNFM stop:", exc_type)
            raise

    @asyncio.coroutine
    def init(self):
        """Task init callback: create the VnfManager and run it."""
        try:
            vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
            assert vm_parent_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
            yield from self._vnfm.run()
        except Exception:
            exc_type = sys.exc_info()[0]
            print("Caught Exception in VNFM init:", exc_type)
            raise

    @asyncio.coroutine
    def run(self):
        """Task run callback (nothing to do)."""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Drive the application and DTS through their state machines.

        Runs the application handler mapped to the current DTS state, if
        any, and then moves DTS to the mapped follow-up state, if any.

        Arguments
            state - current dts state
        """
        # DTS state -> state DTS is moved to afterwards
        next_dts_state = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # DTS state -> application coroutine to run for it
        app_handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        app_handler = app_handlers.get(state)
        if app_handler is not None:
            yield from app_handler()

        # Transition dts to next state
        target = next_dts_state.get(state)
        if target is not None:
            self._dts.handle.set_state(target)