460b27390fc67bfd2bec88dbaabb8efbf194cfa4
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.mano.dts as mano_dts
53
54
55 class VMResourceError(Exception):
56 """ VM resource Error"""
57 pass
58
59
60 class VnfRecordError(Exception):
61 """ VNF record instatiation failed"""
62 pass
63
64
65 class VduRecordError(Exception):
66 """ VDU record instatiation failed"""
67 pass
68
69
70 class NotImplemented(Exception):
71 """Not implemented """
72 pass
73
74
75 class VnfrRecordExistsError(Exception):
76 """VNFR record already exist with the same VNFR id"""
77 pass
78
79
80 class InternalVirtualLinkRecordError(Exception):
81 """Internal virtual link record error"""
82 pass
83
84
85 class VDUImageNotFound(Exception):
86 """VDU Image not found error"""
87 pass
88
89
90 class VirtualDeploymentUnitRecordError(Exception):
91 """VDU Instantiation failed"""
92 pass
93
94
95 class VMNotReadyError(Exception):
96 """ VM Not yet received from resource manager """
97 pass
98
99
100 class VDURecordNotFound(Exception):
101 """ Could not find a VDU record """
102 pass
103
104
105 class VirtualNetworkFunctionRecordDescNotFound(Exception):
106 """ Cannot find Virtual Network Function Record Descriptor """
107 pass
108
109
110 class VirtualNetworkFunctionDescriptorError(Exception):
111 """ Virtual Network Function Record Descriptor Error """
112 pass
113
114
115 class VirtualNetworkFunctionDescriptorNotFound(Exception):
116 """ Virtual Network Function Record Descriptor Not Found """
117 pass
118
119
120 class VirtualNetworkFunctionRecordNotFound(Exception):
121 """ Virtual Network Function Record Not Found """
122 pass
123
124
125 class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
126 """ Virtual Network Funtion Descriptor reference count exists """
127 pass
128
129
130 class VnfrInstantiationFailed(Exception):
131 """ Virtual Network Funtion Instantiation failed"""
132 pass
133
134
135 class VNFMPlacementGroupError(Exception):
136 pass
137
138 class VirtualNetworkFunctionRecordState(enum.Enum):
139 """ VNFR state """
140 INIT = 1
141 VL_INIT_PHASE = 2
142 VM_INIT_PHASE = 3
143 READY = 4
144 TERMINATE = 5
145 VL_TERMINATE_PHASE = 6
146 VDU_TERMINATE_PHASE = 7
147 TERMINATED = 7
148 FAILED = 10
149
150
151 class VDURecordState(enum.Enum):
152 """VDU record state """
153 INIT = 1
154 INSTANTIATING = 2
155 RESOURCE_ALLOC_PENDING = 3
156 READY = 4
157 TERMINATING = 5
158 TERMINATED = 6
159 FAILED = 10
160
161
162 class VcsComponent(object):
163 """ VCS Component within the VNF descriptor """
164 def __init__(self, dts, log, loop, cluster_name, vcs_handler, component, mangled_name):
165 self._dts = dts
166 self._log = log
167 self._loop = loop
168 self._component = component
169 self._cluster_name = cluster_name
170 self._vcs_handler = vcs_handler
171 self._mangled_name = mangled_name
172
173 @staticmethod
174 def mangle_name(component_name, vnf_name, vnfd_id):
175 """ mangled component name """
176 return vnf_name + ":" + component_name + ":" + vnfd_id
177
178 @property
179 def name(self):
180 """ name of this component"""
181 return self._mangled_name
182
183 @property
184 def path(self):
185 """ The path for this object """
186 return("D,/rw-manifest:manifest" +
187 "/rw-manifest:operational-inventory" +
188 "/rw-manifest:component" +
189 "[rw-manifest:component-name = '{}']").format(self.name)
190
191 @property
192 def instance_xpath(self):
193 """ The path for this object """
194 return("D,/rw-base:vcs" +
195 "/instances" +
196 "/instance" +
197 "[instance-name = '{}']".format(self._cluster_name))
198
199 @property
200 def start_comp_xpath(self):
201 """ start component xpath """
202 return (self.instance_xpath +
203 "/child-n[instance-name = 'START-REQ']")
204
205 def get_start_comp_msg(self, ip_address):
206 """ start this component """
207 start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
208 start_msg.instance_name = 'START-REQ'
209 start_msg.component_name = self.name
210 start_msg.admin_command = "START"
211 start_msg.ip_address = ip_address
212
213 return start_msg
214
215 @property
216 def msg(self):
217 """ Returns the message for this vcs component"""
218
219 vcs_comp_dict = self._component.as_dict()
220
221 def mangle_comp_names(comp_dict):
222 """ mangle component name with VNF name, id"""
223 for key, val in comp_dict.items():
224 if isinstance(val, dict):
225 comp_dict[key] = mangle_comp_names(val)
226 elif isinstance(val, list):
227 i = 0
228 for ent in val:
229 if isinstance(ent, dict):
230 val[i] = mangle_comp_names(ent)
231 else:
232 val[i] = ent
233 i += 1
234 elif key == "component_name":
235 comp_dict[key] = VcsComponent.mangle_name(val,
236 self._vnfd_name,
237 self._vnfd_id)
238 return comp_dict
239
240 mangled_dict = mangle_comp_names(vcs_comp_dict)
241 msg = RwManifestYang.OpInventory_Component.from_dict(mangled_dict)
242 return msg
243
244 @asyncio.coroutine
245 def publish(self, xact):
246 """ Publishes the VCS component """
247 self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
248 self.name, self.path, self.msg)
249 yield from self._vcs_handler.publish(xact, self.path, self.msg)
250
251 @asyncio.coroutine
252 def start(self, xact, parent, ip_addr=None):
253 """ Starts this VCS component """
254 # ATTN RV - replace with block add
255 start_msg = self.get_start_comp_msg(ip_addr)
256 self._log.debug("starting component %s %s",
257 self.start_comp_xpath, start_msg)
258 yield from self._dts.query_create(self.start_comp_xpath,
259 0,
260 start_msg)
261 self._log.debug("started component %s, %s",
262 self.start_comp_xpath, start_msg)
263
264
265 class VirtualDeploymentUnitRecord(object):
266 """ Virtual Deployment Unit Record """
267 def __init__(self,
268 dts,
269 log,
270 loop,
271 vdud,
272 vnfr,
273 mgmt_intf,
274 mgmt_network,
275 cloud_account_name,
276 vnfd_package_store,
277 vdur_id=None,
278 placement_groups=[]):
279 self._dts = dts
280 self._log = log
281 self._loop = loop
282 self._vdud = vdud
283 self._vnfr = vnfr
284 self._mgmt_intf = mgmt_intf
285 self._cloud_account_name = cloud_account_name
286 self._vnfd_package_store = vnfd_package_store
287 self._mgmt_network = mgmt_network
288
289 self._vdur_id = vdur_id or str(uuid.uuid4())
290 self._int_intf = []
291 self._ext_intf = []
292 self._state = VDURecordState.INIT
293 self._state_failed_reason = None
294 self._request_id = str(uuid.uuid4())
295 self._name = vnfr.name + "__" + vdud.id
296 self._placement_groups = placement_groups
297 self._rm_regh = None
298 self._vm_resp = None
299 self._vdud_cloud_init = None
300 self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id, self._vdur_id,self.vdu_id)
301
302 @asyncio.coroutine
303 def vdu_opdata_register(self):
304 yield from self._vdur_console_handler.register()
305
306 def cp_ip_addr(self, cp_name):
307 """ Find ip address by connection point name """
308 if self._vm_resp is not None:
309 for conn_point in self._vm_resp.connection_points:
310 if conn_point.name == cp_name:
311 return conn_point.ip_address
312 return "0.0.0.0"
313
314 def cp_id(self, cp_name):
315 """ Find connection point id by connection point name """
316 if self._vm_resp is not None:
317 for conn_point in self._vm_resp.connection_points:
318 if conn_point.name == cp_name:
319 return conn_point.connection_point_id
320 return ''
321
322 @property
323 def vdu_id(self):
324 return self._vdud.id
325
326 @property
327 def vm_resp(self):
328 return self._vm_resp
329
330 @property
331 def name(self):
332 """ Return this VDUR's name """
333 return self._name
334
335 @property
336 def cloud_account_name(self):
337 """ Cloud account this VDU should be created in """
338 return self._cloud_account_name
339
340 @property
341 def image_name(self):
342 """ name that should be used to lookup the image on the CMP """
343 if 'image' not in self._vdud:
344 return None
345 return os.path.basename(self._vdud.image)
346
347 @property
348 def image_checksum(self):
349 """ name that should be used to lookup the image on the CMP """
350 return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None
351
352 @property
353 def management_ip(self):
354 if not self.active:
355 return None
356 return self._vm_resp.public_ip if self._vm_resp.has_field('public_ip') else self._vm_resp.management_ip
357
358 @property
359 def vm_management_ip(self):
360 if not self.active:
361 return None
362 return self._vm_resp.management_ip
363
364 @property
365 def operational_status(self):
366 """ Operational status of this VDU"""
367 op_stats_dict = {"INIT": "init",
368 "INSTANTIATING": "vm_init_phase",
369 "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
370 "READY": "running",
371 "FAILED": "failed",
372 "TERMINATING": "terminated",
373 "TERMINATED": "terminated",
374 }
375 return op_stats_dict[self._state.name]
376
377 @property
378 def msg(self):
379 """ VDU message """
380 vdu_fields = ["vm_flavor",
381 "guest_epa",
382 "vswitch_epa",
383 "hypervisor_epa",
384 "host_epa",
385 "volumes",
386 "name"]
387 vdu_copy_dict = {k: v for k, v in
388 self._vdud.as_dict().items() if k in vdu_fields}
389 vdur_dict = {"id": self._vdur_id,
390 "vdu_id_ref": self._vdud.id,
391 "operational_status": self.operational_status,
392 "operational_status_details": self._state_failed_reason,
393 }
394 if self.vm_resp is not None:
395 vdur_dict.update({"vim_id": self.vm_resp.vdu_id,
396 "flavor_id": self.vm_resp.flavor_id
397 })
398 if self._vm_resp.has_field('image_id'):
399 vdur_dict.update({ "image_id": self.vm_resp.image_id })
400
401 if self.management_ip is not None:
402 vdur_dict["management_ip"] = self.management_ip
403
404 if self.vm_management_ip is not None:
405 vdur_dict["vm_management_ip"] = self.vm_management_ip
406
407 vdur_dict.update(vdu_copy_dict)
408
409 if self.vm_resp is not None:
410 if self._vm_resp.has_field('volumes'):
411 for opvolume in self._vm_resp.volumes:
412 vdurvol_data = [vduvol for vduvol in vdur_dict['volumes'] if vduvol['name'] == opvolume.name]
413 if len(vdurvol_data) == 1:
414 vdurvol_data[0]["volume_id"] = opvolume.volume_id
415
416 icp_list = []
417 ii_list = []
418
419 for intf, cp_id, vlr in self._int_intf:
420 cp = self.find_internal_cp_by_cp_id(cp_id)
421
422 icp_list.append({"name": cp.name,
423 "id": cp.id,
424 "type_yang": "VPORT",
425 "ip_address": self.cp_ip_addr(cp.name)})
426
427 ii_list.append({"name": intf.name,
428 "vdur_internal_connection_point_ref": cp.id,
429 "virtual_interface": {}})
430
431 vdur_dict["internal_connection_point"] = icp_list
432 self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
433 vdur_dict["internal_interface"] = ii_list
434
435 ei_list = []
436 for intf, cp, vlr in self._ext_intf:
437 ei_list.append({"name": cp.name,
438 "vnfd_connection_point_ref": cp.name,
439 "virtual_interface": {}})
440 self._vnfr.update_cp(cp.name, self.cp_ip_addr(cp.name), self.cp_id(cp.name))
441
442 vdur_dict["external_interface"] = ei_list
443
444 placement_groups = []
445 for group in self._placement_groups:
446 placement_groups.append(group.as_dict())
447 vdur_dict['placement_groups_info'] = placement_groups
448
449 return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)
450
451 @property
452 def resmgr_path(self):
453 """ path for resource-mgr"""
454 return ("D,/rw-resource-mgr:resource-mgmt" +
455 "/vdu-event" +
456 "/vdu-event-data[event-id='{}']".format(self._request_id))
457
458 @property
459 def vm_flavor_msg(self):
460 """ VM flavor message """
461 flavor = self._vdud.vm_flavor.__class__()
462 flavor.copy_from(self._vdud.vm_flavor)
463
464 return flavor
465
466 @property
467 def vdud_cloud_init(self):
468 """ Return the cloud-init contents for the VDU """
469 if self._vdud_cloud_init is None:
470 self._vdud_cloud_init = self.cloud_init()
471
472 return self._vdud_cloud_init
473
474 def cloud_init(self):
475 """ Populate cloud_init with cloud-config script from
476 either the inline contents or from the file provided
477 """
478 if self._vdud.cloud_init is not None:
479 self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
480 return self._vdud.cloud_init
481 elif self._vdud.cloud_init_file is not None:
482 # Get cloud-init script contents from the file provided in the cloud_init_file param
483 self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
484 filename = self._vdud.cloud_init_file
485 self._vnfd_package_store.refresh()
486 stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
487 cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
488 try:
489 return cloud_init_extractor.read_script(stored_package, filename)
490 except rift.package.cloud_init.CloudInitExtractionError as e:
491 raise VirtualDeploymentUnitRecordError(e)
492 else:
493 self._log.debug("VDU Instantiation: cloud-init script not provided")
494
495 def process_openstack_placement_group_construct(self, vm_create_msg_dict):
496 host_aggregates = []
497 availability_zones = []
498 server_groups = []
499 for group in self._placement_groups:
500 if group.has_field('host_aggregate'):
501 for aggregate in group.host_aggregate:
502 host_aggregates.append(aggregate.as_dict())
503 if group.has_field('availability_zone'):
504 availability_zones.append(group.availability_zone.as_dict())
505 if group.has_field('server_group'):
506 server_groups.append(group.server_group.as_dict())
507
508 if availability_zones:
509 if len(availability_zones) > 1:
510 self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
511 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple availability zones. Requsted Zones".format(self.name, availability_zones))
512 else:
513 vm_create_msg_dict['availability_zone'] = availability_zones[0]
514
515 if server_groups:
516 if len(server_groups) > 1:
517 self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
518 raise VNFMPlacementGroupError("Can not launch VDU: {} in multiple Server Groups. Requsted Groups".format(self.name, server_groups))
519 else:
520 vm_create_msg_dict['server_group'] = server_groups[0]
521
522 if host_aggregates:
523 vm_create_msg_dict['host_aggregate'] = host_aggregates
524
525 return
526
527 def process_placement_groups(self, vm_create_msg_dict):
528 """Process the placement_groups and fill resource-mgr request"""
529 if not self._placement_groups:
530 return
531
532 cloud_set = set([group.cloud_type for group in self._placement_groups])
533 assert len(cloud_set) == 1
534 cloud_type = cloud_set.pop()
535
536 if cloud_type == 'openstack':
537 self.process_openstack_placement_group_construct(vm_create_msg_dict)
538
539 else:
540 self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)
541 return
542
543 def resmgr_msg(self, config=None):
544 vdu_fields = ["vm_flavor",
545 "guest_epa",
546 "vswitch_epa",
547 "hypervisor_epa",
548 "host_epa"]
549
550 self._log.debug("Creating params based on VDUD: %s", self._vdud)
551 vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}
552
553 vm_create_msg_dict = {
554 "name": self.name,
555 }
556
557 if self.image_name is not None:
558 vm_create_msg_dict["image_name"] = self.image_name
559
560 if self.image_checksum is not None:
561 vm_create_msg_dict["image_checksum"] = self.image_checksum
562
563 vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
564 if self._vdud.has_field('mgmt_vpci'):
565 vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci
566
567 self._log.debug("VDUD: %s", self._vdud)
568 if config is not None:
569 vm_create_msg_dict['vdu_init'] = {'userdata': config}
570
571 if self._mgmt_network:
572 vm_create_msg_dict['mgmt_network'] = self._mgmt_network
573
574 cp_list = []
575 for intf, cp, vlr in self._ext_intf:
576 cp_info = {"name": cp.name,
577 "virtual_link_id": vlr.network_id,
578 "type_yang": intf.virtual_interface.type_yang}
579
580 try:
581 if cp.static_ip_address:
582 cp_info["static_ip_address"] = cp.static_ip_address
583 except AttributeError as e:
584 pass
585
586 if (intf.virtual_interface.has_field('vpci') and
587 intf.virtual_interface.vpci is not None):
588 cp_info["vpci"] = intf.virtual_interface.vpci
589
590 if (vlr.has_field('ip_profile_params')) and (vlr.ip_profile_params.has_field('security_group')):
591 cp_info['security_group'] = vlr.ip_profile_params.security_group
592
593 self._log.debug("External CP info {}".format(cp_info))
594 cp_list.append(cp_info)
595
596 for intf, cp_id, vlr in self._int_intf:
597 cp = self.find_internal_cp_by_cp_id(cp_id)
598
599 cp_dict = {"name": cp_id,
600 "virtual_link_id": vlr.network_id,
601 "type_yang": intf.virtual_interface.type_yang}
602
603 if (intf.virtual_interface.has_field('vpci') and
604 intf.virtual_interface.vpci is not None):
605 cp_dict["vpci"] = intf.virtual_interface.vpci
606
607 try:
608 if cp.static_ip_address:
609 cp_dict["static_ip_address"] = cp.static_ip_address
610 except AttributeError as e:
611 pass
612
613 self._log.debug("Internal CP info {}".format(cp_info))
614 cp_list.append(cp_dict)
615
616 vm_create_msg_dict["connection_points"] = cp_list
617 vm_create_msg_dict.update(vdu_copy_dict)
618
619 self.process_placement_groups(vm_create_msg_dict)
620
621 msg = RwResourceMgrYang.VDUEventData()
622 msg.event_id = self._request_id
623 msg.cloud_account = self.cloud_account_name
624 msg.request_info.from_dict(vm_create_msg_dict)
625
626 for volume in self._vdud.volumes:
627 v = msg.request_info.volumes.add()
628 v.from_dict(volume.as_dict())
629 return msg
630
631 @asyncio.coroutine
632 def terminate(self, xact):
633 """ Delete resource in VIM """
634 if self._state != VDURecordState.READY and self._state != VDURecordState.FAILED:
635 self._log.warning("VDU terminate in not ready state - Ignoring request")
636 return
637
638 self._state = VDURecordState.TERMINATING
639 if self._vm_resp is not None:
640 try:
641 with self._dts.transaction() as new_xact:
642 yield from self.delete_resource(new_xact)
643 except Exception:
644 self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)
645
646 if self._rm_regh is not None:
647 self._log.debug("Deregistering resource manager registration handle")
648 self._rm_regh.deregister()
649 self._rm_regh = None
650
651 if self._vdur_console_handler is not None:
652 self._log.error("Deregistering vnfr vdur registration handle")
653 self._vdur_console_handler._regh.deregister()
654 self._vdur_console_handler._regh = None
655
656 self._state = VDURecordState.TERMINATED
657
658 def find_internal_cp_by_cp_id(self, cp_id):
659 """ Find the CP corresponding to the connection point id"""
660 cp = None
661
662 self._log.debug("find_internal_cp_by_cp_id(%s) called",
663 cp_id)
664
665 for int_cp in self._vdud.internal_connection_point:
666 self._log.debug("Checking for int cp %s in internal connection points",
667 int_cp.id)
668 if int_cp.id == cp_id:
669 cp = int_cp
670 break
671
672 if cp is None:
673 self._log.debug("Failed to find cp %s in internal connection points",
674 cp_id)
675 msg = "Failed to find cp %s in internal connection points" % cp_id
676 raise VduRecordError(msg)
677
678 # return the VLR associated with the connection point
679 return cp
680
681 @asyncio.coroutine
682 def create_resource(self, xact, vnfr, config=None):
683 """ Request resource from ResourceMgr """
684 def find_cp_by_name(cp_name):
685 """ Find a connection point by name """
686 cp = None
687 self._log.debug("find_cp_by_name(%s) called", cp_name)
688 for ext_cp in vnfr._cprs:
689 self._log.debug("Checking ext cp (%s) called", ext_cp.name)
690 if ext_cp.name == cp_name:
691 cp = ext_cp
692 break
693 if cp is None:
694 self._log.debug("Failed to find cp %s in external connection points",
695 cp_name)
696 return cp
697
698 def find_internal_vlr_by_cp_id(cp_id):
699 self._log.debug("find_internal_vlr_by_cp_id(%s) called",
700 cp_id)
701
702 # Validate the cp
703 cp = self.find_internal_cp_by_cp_id(cp_id)
704
705 # return the VLR associated with the connection point
706 return vnfr.find_vlr_by_cp(cp_id)
707
708 block = xact.block_create()
709
710 self._log.debug("Executing vm request id: %s, action: create",
711 self._request_id)
712
713 # Resolve the networks associated external interfaces
714 for ext_intf in self._vdud.external_interface:
715 self._log.debug("Resolving external interface name [%s], cp[%s]",
716 ext_intf.name, ext_intf.vnfd_connection_point_ref)
717 cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
718 if cp is None:
719 self._log.debug("Failed to find connection point - %s",
720 ext_intf.vnfd_connection_point_ref)
721 continue
722 self._log.debug("Connection point name [%s], type[%s]",
723 cp.name, cp.type_yang)
724
725 vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)
726
727 etuple = (ext_intf, cp, vlr)
728 self._ext_intf.append(etuple)
729
730 self._log.debug("Created external interface tuple : %s", etuple)
731
732 # Resolve the networks associated internal interfaces
733 for intf in self._vdud.internal_interface:
734 cp_id = intf.vdu_internal_connection_point_ref
735 self._log.debug("Resolving internal interface name [%s], cp[%s]",
736 intf.name, cp_id)
737
738 try:
739 vlr = find_internal_vlr_by_cp_id(cp_id)
740 except Exception as e:
741 self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
742 msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
743 raise VduRecordError(msg)
744
745 ituple = (intf, cp_id, vlr)
746 self._int_intf.append(ituple)
747
748 self._log.debug("Created internal interface tuple : %s", ituple)
749
750 resmgr_path = self.resmgr_path
751 resmgr_msg = self.resmgr_msg(config)
752
753 self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
754 block.add_query_create(resmgr_path, resmgr_msg)
755
756 res_iter = yield from block.execute(now=True)
757
758 resp = None
759
760 for i in res_iter:
761 r = yield from i
762 resp = r.result
763
764 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
765 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
766 self._log.debug("Got vm request response: %s", resp.resource_info)
767 return resp.resource_info
768
769 @asyncio.coroutine
770 def delete_resource(self, xact):
771 block = xact.block_create()
772
773 self._log.debug("Executing vm request id: %s, action: delete",
774 self._request_id)
775
776 block.add_query_delete(self.resmgr_path)
777
778 yield from block.execute(flags=0, now=True)
779
780 @asyncio.coroutine
781 def read_resource(self, xact):
782 block = xact.block_create()
783
784 self._log.debug("Executing vm request id: %s, action: delete",
785 self._request_id)
786
787 block.add_query_read(self.resmgr_path)
788
789 res_iter = yield from block.execute(flags=0, now=True)
790 for i in res_iter:
791 r = yield from i
792 resp = r.result
793
794 if resp is None or not (resp.has_field('resource_info') and resp.resource_info.has_field('resource_state')):
795 raise VMResourceError("Did not get a vm resource response (resp: %s)", resp)
796 self._log.debug("Got vm request response: %s", resp.resource_info)
797 #self._vm_resp = resp.resource_info
798 return resp.resource_info
799
800
801 @asyncio.coroutine
802 def start_component(self):
803 """ This VDUR is active """
804 self._log.debug("Starting component %s for vdud %s vdur %s",
805 self._vdud.vcs_component_ref,
806 self._vdud,
807 self._vdur_id)
808 yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
809 self.vm_resp.management_ip)
810
811 @property
812 def active(self):
813 """ Is this VDU active """
814 return True if self._state is VDURecordState.READY else False
815
816 @asyncio.coroutine
817 def instantiation_failed(self, failed_reason=None):
818 """ VDU instantiation failed """
819 self._log.debug("VDU %s instantiation failed ", self._vdur_id)
820 self._state = VDURecordState.FAILED
821 self._state_failed_reason = failed_reason
822 yield from self._vnfr.instantiation_failed(failed_reason)
823
824 @asyncio.coroutine
825 def vdu_is_active(self):
826 """ This VDU is active"""
827 if self.active:
828 self._log.warning("VDU %s was already marked as active", self._vdur_id)
829 return
830
831 self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)
832
833 if self._vdud.vcs_component_ref is not None:
834 yield from self.start_component()
835
836 self._state = VDURecordState.READY
837
838 if self._vnfr.all_vdus_active():
839 self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
840 yield from self._vnfr.is_ready()
841
842 @asyncio.coroutine
843 def instantiate(self, xact, vnfr, config=None):
844 """ Instantiate this VDU """
845 self._state = VDURecordState.INSTANTIATING
846
847 @asyncio.coroutine
848 def on_prepare(xact_info, query_action, ks_path, msg):
849 """ This VDUR is active """
850 self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
851 query_action,
852 ks_path,
853 msg)
854
855 if (query_action == rwdts.QueryAction.UPDATE or
856 query_action == rwdts.QueryAction.CREATE):
857 self._vm_resp = msg
858 if msg.resource_state == "active":
859 # Move this VDU to ready state
860 yield from self.vdu_is_active()
861 elif msg.resource_state == "failed":
862 yield from self.instantiation_failed(msg.resource_errors)
863 elif query_action == rwdts.QueryAction.DELETE:
864 self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
865 else:
866 raise NotImplementedError(
867 "%s action on VirtualDeployementUnitRecord not supported",
868 query_action)
869 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
870
871 try:
872 reg_event = asyncio.Event(loop=self._loop)
873
874 @asyncio.coroutine
875 def on_ready(regh, status):
876 reg_event.set()
877
878 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
879 self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
880 flags=rwdts.Flag.SUBSCRIBER,
881 handler=handler)
882 yield from reg_event.wait()
883
884 vm_resp = yield from self.create_resource(xact, vnfr, config)
885 self._vm_resp = vm_resp
886 self._state = VDURecordState.RESOURCE_ALLOC_PENDING
887 self._log.debug("Requested VM from resource manager response %s",
888 vm_resp)
889 if vm_resp.resource_state == "active":
890 self._log.debug("Resourcemgr responded wih an active vm resp %s",
891 vm_resp)
892 yield from self.vdu_is_active()
893 self._state = VDURecordState.READY
894 elif (vm_resp.resource_state == "pending" or
895 vm_resp.resource_state == "inactive"):
896 self._log.debug("Resourcemgr responded wih a pending vm resp %s",
897 vm_resp)
898 # handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
899 # self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
900 # flags=rwdts.Flag.SUBSCRIBER,
901 # handler=handler)
902 else:
903 self._log.debug("Resourcemgr responded wih an error vm resp %s",
904 vm_resp)
905 raise VirtualDeploymentUnitRecordError(
906 "Failed VDUR instantiation %s " % vm_resp)
907
908 except Exception as e:
909 import traceback
910 traceback.print_exc()
911 self._log.exception(e)
912 self._log.error("Instantiation of VDU record failed: %s", str(e))
913 self._state = VDURecordState.FAILED
914 yield from self.instantiation_failed(str(e))
915
916
917 class VlRecordState(enum.Enum):
918 """ VL Record State """
919 INIT = 101
920 INSTANTIATION_PENDING = 102
921 ACTIVE = 103
922 TERMINATE_PENDING = 104
923 TERMINATED = 105
924 FAILED = 106
925
926
927 class InternalVirtualLinkRecord(object):
928 """ Internal Virtual Link record """
929 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
930 self._dts = dts
931 self._log = log
932 self._loop = loop
933 self._ivld_msg = ivld_msg
934 self._vnfr_name = vnfr_name
935 self._cloud_account_name = cloud_account_name
936
937 self._vlr_req = self.create_vlr()
938 self._vlr = None
939 self._state = VlRecordState.INIT
940
941 @property
942 def vlr_id(self):
943 """ Find VLR by id """
944 return self._vlr_req.id
945
946 @property
947 def name(self):
948 """ Name of this VL """
949 return self._vnfr_name + "." + self._ivld_msg.name
950
951 @property
952 def network_id(self):
953 """ Find VLR by id """
954 return self._vlr.network_id if self._vlr else None
955
956 def vlr_path(self):
957 """ VLR path for this VLR instance"""
958 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
959
960 def create_vlr(self):
961 """ Create the VLR record which will be instantiated """
962
963 vld_fields = ["short_name",
964 "vendor",
965 "description",
966 "version",
967 "type_yang",
968 "provider_network"]
969
970 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
971
972 vlr_dict = {"id": str(uuid.uuid4()),
973 "name": self.name,
974 "cloud_account": self._cloud_account_name,
975 }
976 vlr_dict.update(vld_copy_dict)
977
978 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
979 return vlr
980
981 @asyncio.coroutine
982 def instantiate(self, xact, restart_mode=False):
983 """ Instantiate VL """
984
985 @asyncio.coroutine
986 def instantiate_vlr():
987 """ Instantiate VLR"""
988 self._log.debug("Create VL with xpath %s and vlr %s",
989 self.vlr_path(), self._vlr_req)
990
991 with self._dts.transaction(flags=0) as xact:
992 block = xact.block_create()
993 block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
994 self._log.debug("Executing VL create path:%s msg:%s",
995 self.vlr_path(), self._vlr_req)
996
997 res_iter = None
998 try:
999 res_iter = yield from block.execute()
1000 except Exception:
1001 self._state = VlRecordState.FAILED
1002 self._log.exception("Caught exception while instantial VL")
1003 raise
1004
1005 for ent in res_iter:
1006 res = yield from ent
1007 self._vlr = res.result
1008
1009 if self._vlr.operational_status == 'failed':
1010 self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
1011 self._state = VlRecordState.FAILED
1012 raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))
1013
1014 self._log.info("Created VL with xpath %s and vlr %s",
1015 self.vlr_path(), self._vlr)
1016
1017 @asyncio.coroutine
1018 def get_vlr():
1019 """ Get the network id """
1020 res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
1021 vlr = None
1022 for ent in res_iter:
1023 res = yield from ent
1024 vlr = res.result
1025
1026 if vlr is None:
1027 err = "Failed to get VLR for path %s" % self.vlr_path()
1028 self._log.warn(err)
1029 raise InternalVirtualLinkRecordError(err)
1030 return vlr
1031
1032 self._state = VlRecordState.INSTANTIATION_PENDING
1033
1034 if restart_mode:
1035 vl = yield from get_vlr()
1036 if vl is None:
1037 yield from instantiate_vlr()
1038 else:
1039 yield from instantiate_vlr()
1040
1041 self._state = VlRecordState.ACTIVE
1042
1043 def vlr_in_vns(self):
1044 """ Is there a VLR record in VNS """
1045 if (self._state == VlRecordState.ACTIVE or
1046 self._state == VlRecordState.INSTANTIATION_PENDING or
1047 self._state == VlRecordState.FAILED):
1048 return True
1049
1050 return False
1051
1052 @asyncio.coroutine
1053 def terminate(self, xact):
1054 """Terminate this VL """
1055 if not self.vlr_in_vns():
1056 self._log.debug("Ignoring terminate request for id %s in state %s",
1057 self.vlr_id, self._state)
1058 return
1059
1060 self._log.debug("Terminating VL with path %s", self.vlr_path())
1061 self._state = VlRecordState.TERMINATE_PENDING
1062 block = xact.block_create()
1063 block.add_query_delete(self.vlr_path())
1064 yield from block.execute(flags=0, now=True)
1065 self._state = VlRecordState.TERMINATED
1066 self._log.debug("Terminated VL with path %s", self.vlr_path())
1067
1068
1069 class VirtualNetworkFunctionRecord(object):
1070 """ Virtual Network Function Record """
1071 def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
1072 self._dts = dts
1073 self._log = log
1074 self._loop = loop
1075 self._cluster_name = cluster_name
1076 self._vnfr_msg = vnfr_msg
1077 self._vnfr_id = vnfr_msg.id
1078 self._vnfd_id = vnfr_msg.vnfd.id
1079 self._vnfm = vnfm
1080 self._vcs_handler = vcs_handler
1081 self._vnfr = vnfr_msg
1082 self._mgmt_network = mgmt_network
1083
1084 self._vnfd = vnfr_msg.vnfd
1085 self._state = VirtualNetworkFunctionRecordState.INIT
1086 self._state_failed_reason = None
1087 self._ext_vlrs = {} # The list of external virtual links
1088 self._vlrs = [] # The list of internal virtual links
1089 self._vdus = [] # The list of vdu
1090 self._vlr_by_cp = {}
1091 self._cprs = []
1092 self._inventory = {}
1093 self._create_time = int(time.time())
1094 self._vnf_mon = None
1095 self._config_status = vnfr_msg.config_status
1096 self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
1097 self._rw_vnfd = None
1098 self._vnfd_ref_count = 0
1099
1100 def _get_vdur_from_vdu_id(self, vdu_id):
1101 self._log.debug("Finding vdur for vdu_id %s", vdu_id)
1102 self._log.debug("Searching through vdus: %s", self._vdus)
1103 for vdu in self._vdus:
1104 self._log.debug("vdu_id: %s", vdu.vdu_id)
1105 if vdu.vdu_id == vdu_id:
1106 return vdu
1107
1108 raise VDURecordNotFound("Could not find vdu record from id: %s", vdu_id)
1109
1110 @property
1111 def operational_status(self):
1112 """ Operational status of this VNFR """
1113 op_status_map = {"INIT": "init",
1114 "VL_INIT_PHASE": "vl_init_phase",
1115 "VM_INIT_PHASE": "vm_init_phase",
1116 "READY": "running",
1117 "TERMINATE": "terminate",
1118 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1119 "VDU_TERMINATE_PHASE": "vm_terminate_phase",
1120 "TERMINATED": "terminated",
1121 "FAILED": "failed", }
1122 return op_status_map[self._state.name]
1123
1124 @staticmethod
1125 def vnfd_xpath(vnfd_id):
1126 """ VNFD xpath associated with this VNFR """
1127 return "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']".format(vnfd_id)
1128
1129 @property
1130 def vnfd_ref_count(self):
1131 """ Returns the VNFD reference count associated with this VNFR """
1132 return self._vnfd_ref_count
1133
1134 def vnfd_in_use(self):
1135 """ Returns whether vnfd is in use or not """
1136 return True if self._vnfd_ref_count > 0 else False
1137
1138 def vnfd_ref(self):
1139 """ Take a reference on this object """
1140 self._vnfd_ref_count += 1
1141 return self._vnfd_ref_count
1142
1143 def vnfd_unref(self):
1144 """ Release reference on this object """
1145 if self._vnfd_ref_count < 1:
1146 msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
1147 (self.vnfd.id, self._vnfd_ref_count))
1148 self._log.critical(msg)
1149 raise VnfRecordError(msg)
1150 self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
1151 self.vnfd.id, self._vnfd_ref_count)
1152 self._vnfd_ref_count -= 1
1153 return self._vnfd_ref_count
1154
1155 @property
1156 def vnfd(self):
1157 """ VNFD for this VNFR """
1158 return self._vnfd
1159
1160 @property
1161 def vnf_name(self):
1162 """ VNFD name associated with this VNFR """
1163 return self.vnfd.name
1164
1165 @property
1166 def name(self):
1167 """ Name of this VNF in the record """
1168 return self._vnfr.name
1169
1170 @property
1171 def cloud_account_name(self):
1172 """ Name of the cloud account this VNFR is instantiated in """
1173 return self._vnfr.cloud_account
1174
1175 @property
1176 def vnfd_id(self):
1177 """ VNFD Id associated with this VNFR """
1178 return self.vnfd.id
1179
1180 @property
1181 def vnfr_id(self):
1182 """ VNFR Id associated with this VNFR """
1183 return self._vnfr_id
1184
1185 @property
1186 def member_vnf_index(self):
1187 """ Member VNF index associated with this VNFR """
1188 return self._vnfr.member_vnf_index_ref
1189
1190 @property
1191 def config_status(self):
1192 """ Config agent status for this VNFR """
1193 return self._config_status
1194
1195 def component_by_name(self, component_name):
1196 """ Find a component by name in the inventory list"""
1197 mangled_name = VcsComponent.mangle_name(component_name,
1198 self.vnf_name,
1199 self.vnfd_id)
1200 return self._inventory[mangled_name]
1201
1202
1203
1204 @asyncio.coroutine
1205 def get_nsr_config(self):
1206 ### Need access to NS instance configuration for runtime resolution.
1207 ### This shall be replaced when deployment flavors are implemented
1208 xpath = "C,/nsr:ns-instance-config"
1209 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
1210
1211 for result in results:
1212 entry = yield from result
1213 ns_instance_config = entry.result
1214 for nsr in ns_instance_config.nsr:
1215 if nsr.id == self._vnfr_msg.nsr_id_ref:
1216 return nsr
1217 return None
1218
1219 @asyncio.coroutine
1220 def start_component(self, component_name, ip_addr):
1221 """ Start a component in the VNFR by name """
1222 comp = self.component_by_name(component_name)
1223 yield from comp.start(None, None, ip_addr)
1224
1225 def cp_ip_addr(self, cp_name):
1226 """ Get ip address for connection point """
1227 self._log.debug("cp_ip_addr()")
1228 for cp in self._cprs:
1229 if cp.name == cp_name and cp.ip_address is not None:
1230 return cp.ip_address
1231 return "0.0.0.0"
1232
1233 def mgmt_intf_info(self):
1234 """ Get Management interface info for this VNFR """
1235 mgmt_intf_desc = self.vnfd.mgmt_interface
1236 ip_addr = None
1237 if mgmt_intf_desc.has_field("cp"):
1238 ip_addr = self.cp_ip_addr(mgmt_intf_desc.cp)
1239 elif mgmt_intf_desc.has_field("vdu_id"):
1240 try:
1241 vdur = self._get_vdur_from_vdu_id(mgmt_intf_desc.vdu_id)
1242 ip_addr = vdur.management_ip
1243 except VDURecordNotFound:
1244 self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
1245 ip_addr = None
1246 else:
1247 ip_addr = mgmt_intf_desc.ip_address
1248 port = mgmt_intf_desc.port
1249
1250 return ip_addr, port
1251
1252 @property
1253 def msg(self):
1254 """ Message associated with this VNFR """
1255 vnfd_fields = ["short_name", "vendor", "description", "version"]
1256 vnfd_copy_dict = {k: v for k, v in self.vnfd.as_dict().items() if k in vnfd_fields}
1257
1258 mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
1259 ip_address, port = self.mgmt_intf_info()
1260 if ip_address is not None:
1261 mgmt_intf.ip_address = ip_address
1262 if port is not None:
1263 mgmt_intf.port = port
1264
1265 vnfr_dict = {"id": self._vnfr_id,
1266 "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
1267 "name": self.name,
1268 "member_vnf_index_ref": self.member_vnf_index,
1269 "operational_status": self.operational_status,
1270 "operational_status_details": self._state_failed_reason,
1271 "cloud_account": self.cloud_account_name,
1272 "config_status": self._config_status
1273 }
1274
1275 vnfr_dict.update(vnfd_copy_dict)
1276
1277 vnfr_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1278 vnfr_msg.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())
1279
1280 vnfr_msg.create_time = self._create_time
1281 vnfr_msg.uptime = int(time.time()) - self._create_time
1282 vnfr_msg.mgmt_interface = mgmt_intf
1283
1284 # Add all the VLRs to VNFR
1285 for vlr in self._vlrs:
1286 ivlr = vnfr_msg.internal_vlr.add()
1287 ivlr.vlr_ref = vlr.vlr_id
1288
1289 # Add all the VDURs to VDUR
1290 if self._vdus is not None:
1291 for vdu in self._vdus:
1292 vdur = vnfr_msg.vdur.add()
1293 vdur.from_dict(vdu.msg.as_dict())
1294
1295 if self.vnfd.mgmt_interface.has_field('dashboard_params'):
1296 vnfr_msg.dashboard_url = self.dashboard_url
1297
1298 for cpr in self._cprs:
1299 new_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict())
1300 vnfr_msg.connection_point.append(new_cp)
1301
1302 if self._vnf_mon is not None:
1303 for monp in self._vnf_mon.msg:
1304 vnfr_msg.monitoring_param.append(
1305 VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))
1306
1307 if self._vnfr.vnf_configuration is not None:
1308 vnfr_msg.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
1309 if (ip_address is not None and
1310 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address is None):
1311 vnfr_msg.vnf_configuration.config_access.mgmt_ip_address = ip_address
1312
1313 for group in self._vnfr_msg.placement_groups_info:
1314 group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1315 group_info.from_dict(group.as_dict())
1316 vnfr_msg.placement_groups_info.append(group_info)
1317
1318 return vnfr_msg
1319
1320 @property
1321 def dashboard_url(self):
1322 ip, cfg_port = self.mgmt_intf_info()
1323 protocol = 'http'
1324 http_port = 80
1325 if self.vnfd.mgmt_interface.dashboard_params.has_field('https'):
1326 if self.vnfd.mgmt_interface.dashboard_params.https is True:
1327 protocol = 'https'
1328 http_port = 443
1329 if self.vnfd.mgmt_interface.dashboard_params.has_field('port'):
1330 http_port = self.vnfd.mgmt_interface.dashboard_params.port
1331
1332 url = "{protocol}://{ip_address}:{port}/{path}".format(
1333 protocol=protocol,
1334 ip_address=ip,
1335 port=http_port,
1336 path=self.vnfd.mgmt_interface.dashboard_params.path.lstrip("/"),
1337 )
1338
1339 return url
1340
1341 @property
1342 def xpath(self):
1343 """ path for this VNFR """
1344 return("D,/vnfr:vnfr-catalog"
1345 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id))
1346
1347 @asyncio.coroutine
1348 def publish(self, xact):
1349 """ publish this VNFR """
1350 vnfr = self.msg
1351 self._log.debug("Publishing VNFR path = [%s], record = [%s]",
1352 self.xpath, self.msg)
1353 vnfr.create_time = self._create_time
1354 yield from self._vnfm.publish_vnfr(xact, self.xpath, self.msg)
1355 self._log.debug("Published VNFR path = [%s], record = [%s]",
1356 self.xpath, self.msg)
1357
1358 @asyncio.coroutine
1359 def create_vls(self):
1360 """ Publish The VLs associated with this VNF """
1361 self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
1362 self.vnfd_id)
1363 for ivld_msg in self.vnfd.internal_vld:
1364 self._log.debug("Creating internal vld:"
1365 " %s, int_cp_ref = %s",
1366 ivld_msg, ivld_msg.internal_connection_point
1367 )
1368 vlr = InternalVirtualLinkRecord(dts=self._dts,
1369 log=self._log,
1370 loop=self._loop,
1371 ivld_msg=ivld_msg,
1372 vnfr_name=self.name,
1373 cloud_account_name=self.cloud_account_name
1374 )
1375 self._vlrs.append(vlr)
1376
1377 for int_cp in ivld_msg.internal_connection_point:
1378 if int_cp.id_ref in self._vlr_by_cp:
1379 msg = ("Connection point %s already "
1380 " bound %s" % (int_cp.id_ref, self._vlr_by_cp[int_cp.id_ref]))
1381 raise InternalVirtualLinkRecordError(msg)
1382 self._log.debug("Setting vlr %s to internal cp = %s",
1383 vlr, int_cp.id_ref)
1384 self._vlr_by_cp[int_cp.id_ref] = vlr
1385
1386 @asyncio.coroutine
1387 def instantiate_vls(self, xact, restart_mode=False):
1388 """ Instantiate the VLs associated with this VNF """
1389 self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
1390 self.vnfd_id)
1391
1392 for vlr in self._vlrs:
1393 self._log.debug("Instantiating VLR %s", vlr)
1394 yield from vlr.instantiate(xact, restart_mode)
1395
1396 def find_vlr_by_cp(self, cp_name):
1397 """ Find the VLR associated with the cp name """
1398 return self._vlr_by_cp[cp_name]
1399
1400 def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
1401 """
1402 Returns the cloud specific construct for placement group
1403 Arguments:
1404 input_group: VNFD PlacementGroup
1405 nsr_config: Configuration for VNFDGroup MAP in the NSR config
1406 """
1407 copy_dict = ['name', 'requirement', 'strategy']
1408 for group_info in nsr_config.vnfd_placement_group_maps:
1409 if group_info.placement_group_ref == input_group.name and \
1410 group_info.vnfd_id_ref == self.vnfd_id:
1411 group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1412 group_dict = {k:v for k,v in
1413 group_info.as_dict().items()
1414 if (k != 'placement_group_ref' and k !='vnfd_id_ref')}
1415 for param in copy_dict:
1416 group_dict.update({param: getattr(input_group, param)})
1417 group.from_dict(group_dict)
1418 return group
1419 return None
1420
1421 @asyncio.coroutine
1422 def get_vdu_placement_groups(self, vdu):
1423 placement_groups = []
1424 ### Step-1: Get VNF level placement groups
1425 for group in self._vnfr_msg.placement_groups_info:
1426 #group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()
1427 #group_info.from_dict(group.as_dict())
1428 placement_groups.append(group)
1429
1430 ### Step-2: Get NSR config. This is required for resolving placement_groups cloud constructs
1431 nsr_config = yield from self.get_nsr_config()
1432
1433 ### Step-3: Get VDU level placement groups
1434 for group in self.vnfd.placement_groups:
1435 for member_vdu in group.member_vdus:
1436 if member_vdu.member_vdu_ref == vdu.id:
1437 group_info = self.resolve_placement_group_cloud_construct(group,
1438 nsr_config)
1439 if group_info is None:
1440 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
1441 ### raise VNFMPlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1442 else:
1443 self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
1444 str(group_info),
1445 vdu.name,
1446 self.vnf_name,
1447 self.member_vnf_index)
1448 placement_groups.append(group_info)
1449
1450 return placement_groups
1451
1452 @asyncio.coroutine
1453 def create_vdus(self, vnfr, restart_mode=False):
1454 """ Create the VDUs associated with this VNF """
1455
1456 def get_vdur_id(vdud):
1457 """Get the corresponding VDUR's id for the VDUD. This is useful in
1458 case of a restart.
1459
1460 In restart mode we check for exiting VDUR's ID and use them, if
1461 available. This way we don't end up creating duplicate VDURs
1462 """
1463 vdur_id = None
1464
1465 if restart_mode and vdud is not None:
1466 try:
1467 vdur = [vdur.id for vdur in vnfr._vnfr.vdur if vdur.vdu_id_ref == vdud.id]
1468 vdur_id = vdur[0]
1469 except IndexError:
1470 self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
1471
1472 return vdur_id
1473
1474
1475 self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)
1476 for vdu in self._rw_vnfd.vdu:
1477 self._log.debug("Creating vdu: %s", vdu)
1478 vdur_id = get_vdur_id(vdu)
1479
1480 placement_groups = yield from self.get_vdu_placement_groups(vdu)
1481 self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
1482 vdu.name,
1483 self.vnf_name,
1484 self.member_vnf_index,
1485 [ group.name for group in placement_groups])
1486
1487 vdur = VirtualDeploymentUnitRecord(
1488 dts=self._dts,
1489 log=self._log,
1490 loop=self._loop,
1491 vdud=vdu,
1492 vnfr=vnfr,
1493 mgmt_intf=self.has_mgmt_interface(vdu),
1494 mgmt_network=self._mgmt_network,
1495 cloud_account_name=self.cloud_account_name,
1496 vnfd_package_store=self._vnfd_package_store,
1497 vdur_id=vdur_id,
1498 placement_groups = placement_groups,
1499 )
1500 yield from vdur.vdu_opdata_register()
1501
1502 self._vdus.append(vdur)
1503
1504 @asyncio.coroutine
1505 def instantiate_vdus(self, xact, vnfr):
1506 """ Instantiate the VDUs associated with this VNF """
1507 self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)
1508
1509 lookup = {vdu.vdu_id: vdu for vdu in self._vdus}
1510
1511 # Identify any dependencies among the VDUs
1512 dependencies = collections.defaultdict(list)
1513 vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")
1514
1515 for vdu in self._vdus:
1516 if vdu.vdud_cloud_init is not None:
1517 for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
1518 if vdu_id != vdu.vdu_id:
1519 # This means that vdu.vdu_id depends upon vdu_id,
1520 # i.e. vdu_id must be instantiated before
1521 # vdu.vdu_id.
1522 dependencies[vdu.vdu_id].append(lookup[vdu_id])
1523
1524 # Define the terminal states of VDU instantiation
1525 terminal = (
1526 VDURecordState.READY,
1527 VDURecordState.TERMINATED,
1528 VDURecordState.FAILED,
1529 )
1530
1531 datastore = VdurDatastore()
1532 processed = set()
1533
1534 @asyncio.coroutine
1535 def instantiate_monitor(vdu):
1536 """Monitor the state of the VDU during instantiation
1537
1538 Arguments:
1539 vdu - a VirtualDeploymentUnitRecord
1540
1541 """
1542 # wait for the VDUR to enter a terminal state
1543 while vdu._state not in terminal:
1544 yield from asyncio.sleep(1, loop=self._loop)
1545
1546 # update the datastore
1547 datastore.update(vdu)
1548
1549 # add the VDU to the set of processed VDUs
1550 processed.add(vdu.vdu_id)
1551
1552 @asyncio.coroutine
1553 def instantiate(vdu):
1554 """Instantiate the specified VDU
1555
1556 Arguments:
1557 vdu - a VirtualDeploymentUnitRecord
1558
1559 Raises:
1560 if the VDU, or any of the VDUs this VDU depends upon, are
1561 terminated or fail to instantiate properly, a
1562 VirtualDeploymentUnitRecordError is raised.
1563
1564 """
1565 for dependency in dependencies[vdu.vdu_id]:
1566 self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))
1567
1568 while dependency.vdu_id not in processed:
1569 yield from asyncio.sleep(1, loop=self._loop)
1570
1571 if not dependency.active:
1572 raise VirtualDeploymentUnitRecordError()
1573
1574 self._log.debug('instantiating {}'.format(vdu.vdu_id))
1575
1576 # Populate the datastore with the current values of the VDU
1577 datastore.add(vdu)
1578
1579 # Substitute any variables contained in the cloud config script
1580 config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""
1581
1582 parts = re.split("\{\{ ([^\}]+) \}\}", config)
1583 if len(parts) > 1:
1584
1585 # Extract the variable names
1586 variables = list()
1587 for variable in parts[1::2]:
1588 variables.append(variable.lstrip('{{').rstrip('}}').strip())
1589
1590 # Iterate of the variables and substitute values from the
1591 # datastore.
1592 for variable in variables:
1593
1594 # Handle a reference to a VDU by ID
1595 if variable.startswith('vdu['):
1596 value = datastore.get(variable)
1597 if value is None:
1598 msg = "Unable to find a substitute for {} in {} cloud-init script"
1599 raise ValueError(msg.format(variable, vdu.vdu_id))
1600
1601 config = config.replace("{{ %s }}" % variable, value)
1602 continue
1603
1604 # Handle a reference to the current VDU
1605 if variable.startswith('vdu'):
1606 value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
1607 config = config.replace("{{ %s }}" % variable, value)
1608 continue
1609
1610 # Handle unrecognized variables
1611 msg = 'unrecognized cloud-config variable: {}'
1612 raise ValueError(msg.format(variable))
1613
1614 # Instantiate the VDU
1615 with self._dts.transaction() as xact:
1616 self._log.debug("Instantiating vdu: %s", vdu)
1617 yield from vdu.instantiate(xact, vnfr, config=config)
1618 if self._state == VirtualNetworkFunctionRecordState.FAILED:
1619 self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
1620 self.vnfr_id, vdu)
1621
1622 # First create a set of tasks to monitor the state of the VDUs and
1623 # report when they have entered a terminal state
1624 for vdu in self._vdus:
1625 self._loop.create_task(instantiate_monitor(vdu))
1626
1627 for vdu in self._vdus:
1628 self._loop.create_task(instantiate(vdu))
1629
1630 def has_mgmt_interface(self, vdu):
1631 # ## TODO: Support additional mgmt_interface type options
1632 if self.vnfd.mgmt_interface.vdu_id == vdu.id:
1633 return True
1634 return False
1635
1636 def vlr_xpath(self, vlr_id):
1637 """ vlr xpath """
1638 return(
1639 "D,/vlr:vlr-catalog/"
1640 "vlr:vlr[vlr:id = '{}']".format(vlr_id))
1641
1642 def ext_vlr_by_id(self, vlr_id):
1643 """ find ext vlr by id """
1644 return self._ext_vlrs[vlr_id]
1645
1646 @asyncio.coroutine
1647 def publish_inventory(self, xact):
1648 """ Publish the inventory associated with this VNF """
1649 self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)
1650
1651 for component in self._rw_vnfd.component:
1652 self._log.debug("Creating inventory component %s", component)
1653 mangled_name = VcsComponent.mangle_name(component.component_name,
1654 self.vnf_name,
1655 self.vnfd_id
1656 )
1657 comp = VcsComponent(dts=self._dts,
1658 log=self._log,
1659 loop=self._loop,
1660 cluster_name=self._cluster_name,
1661 vcs_handler=self._vcs_handler,
1662 component=component,
1663 mangled_name=mangled_name,
1664 )
1665 if comp.name in self._inventory:
1666 self._log.debug("Duplicate entries in inventory %s for vnfr %s",
1667 component, self._vnfd_id)
1668 return
1669 self._log.debug("Adding component %s for vnrf %s",
1670 comp.name, self._vnfr_id)
1671 self._inventory[comp.name] = comp
1672 yield from comp.publish(xact)
1673
1674 def all_vdus_active(self):
1675 """ Are all VDUS in this VNFR active? """
1676 for vdu in self._vdus:
1677 if not vdu.active:
1678 return False
1679
1680 self._log.debug("Inside all_vdus_active. Returning True")
1681 return True
1682
1683 @asyncio.coroutine
1684 def instantiation_failed(self, failed_reason=None):
1685 """ VNFR instantiation failed """
1686 self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)
1687 self.set_state(VirtualNetworkFunctionRecordState.FAILED)
1688 self._state_failed_reason = failed_reason
1689
1690 # Update the VNFR with the changed status
1691 yield from self.publish(None)
1692
1693 @asyncio.coroutine
1694 def is_ready(self):
1695 """ This VNF is ready"""
1696 self._log.debug("VNFR id %s is ready", self.vnfr_id)
1697
1698 if self._state != VirtualNetworkFunctionRecordState.FAILED:
1699 self.set_state(VirtualNetworkFunctionRecordState.READY)
1700
1701 else:
1702 self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
1703
1704 # Update the VNFR with the changed status
1705 yield from self.publish(None)
1706
1707 def update_cp(self, cp_name, ip_address, cp_id):
1708 """Updated the connection point with ip address"""
1709 for cp in self._cprs:
1710 if cp.name == cp_name:
1711 self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
1712 cp_name, cp, ip_address, cp_id)
1713 cp.ip_address = ip_address
1714 cp.connection_point_id = cp_id
1715 return
1716
1717 err = "No connection point %s found in VNFR id %s" % (cp.name, self._vnfr_id)
1718 self._log.debug(err)
1719 raise VirtualDeploymentUnitRecordError(err)
1720
1721 def set_state(self, state):
1722 """ Set state for this VNFR"""
1723 self._state = state
1724
1725 @asyncio.coroutine
1726 def instantiate(self, xact, restart_mode=False):
1727 """ instantiate this VNF """
1728 self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
1729 self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)
1730
1731 @asyncio.coroutine
1732 def fetch_vlrs():
1733 """ Fetch VLRs """
1734 # Iterate over all the connection points in VNFR and fetch the
1735 # associated VLRs
1736
1737 def cpr_from_cp(cp):
1738 """ Creates a record level connection point from the desciptor cp"""
1739 cp_fields = ["name", "image", "vm-flavor", "static_ip_address"]
1740 cp_copy_dict = {k: v for k, v in cp.as_dict().items() if k in cp_fields}
1741 cpr_dict = {}
1742 cpr_dict.update(cp_copy_dict)
1743 return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_dict)
1744
1745 self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
1746 self._vnfr_id, self._vnfr.connection_point)
1747
1748 for cp in self._vnfr.connection_point:
1749 cpr = cpr_from_cp(cp)
1750 self._cprs.append(cpr)
1751 self._log.debug("Adding Connection point record %s ", cp)
1752
1753 vlr_path = self.vlr_xpath(cp.vlr_ref)
1754 self._log.debug("Fetching VLR with path = %s", vlr_path)
1755 res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
1756 rwdts.XactFlag.MERGE)
1757 for i in res_iter:
1758 r = yield from i
1759 d = r.result
1760 self._ext_vlrs[cp.vlr_ref] = d
1761 cpr.vlr_ref = cp.vlr_ref
1762 self._log.debug("Fetched VLR [%s] with path = [%s]", d, vlr_path)
1763
1764 # Increase the VNFD reference count
1765 self.vnfd_ref()
1766
1767 assert self.vnfd
1768
1769 # Fetch External VLRs
1770 self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
1771 yield from fetch_vlrs()
1772
1773 # Publish inventory
1774 self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
1775 yield from self.publish_inventory(xact)
1776
1777 # Publish inventory
1778 self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
1779 yield from self.create_vls()
1780
1781 # publish the VNFR
1782 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1783 yield from self.publish(xact)
1784
1785 # instantiate VLs
1786 self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
1787 try:
1788 yield from self.instantiate_vls(xact, restart_mode)
1789 except Exception as e:
1790 self._log.exception("VL instantiation failed (%s)", str(e))
1791 yield from self.instantiation_failed(str(e))
1792 return
1793
1794 self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)
1795
1796 # instantiate VDUs
1797 self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
1798 yield from self.create_vdus(self, restart_mode)
1799
1800 # publish the VNFR
1801 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1802 yield from self.publish(xact)
1803
1804 # instantiate VDUs
1805 # ToDo: Check if this should be prevented during restart
1806 self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
1807 _ = self._loop.create_task(self.instantiate_vdus(xact, self))
1808
1809 # publish the VNFR
1810 self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
1811 yield from self.publish(xact)
1812
1813 self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)
1814
1815 # create task updating uptime for this vnfr
1816 self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
1817 self._loop.create_task(self.vnfr_uptime_update(xact))
1818
1819 @asyncio.coroutine
1820 def terminate(self, xact):
1821 """ Terminate this virtual network function """
1822
1823 self._log.debug("Terminatng VNF id %s", self.vnfr_id)
1824
1825 self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)
1826
1827 # stop monitoring
1828 if self._vnf_mon is not None:
1829 self._vnf_mon.stop()
1830 self._vnf_mon.deregister()
1831 self._vnf_mon = None
1832
1833 @asyncio.coroutine
1834 def terminate_vls():
1835 """ Terminate VLs in this VNF """
1836 for vl in self._vlrs:
1837 yield from vl.terminate(xact)
1838
1839 @asyncio.coroutine
1840 def terminate_vdus():
1841 """ Terminate VDUS in this VNF """
1842 for vdu in self._vdus:
1843 yield from vdu.terminate(xact)
1844
1845 self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
1846 self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
1847 yield from terminate_vls()
1848
1849 self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
1850 self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
1851 yield from terminate_vdus()
1852
1853 self._log.debug("Terminated VNF id %s", self.vnfr_id)
1854 self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1855
1856 @asyncio.coroutine
1857 def vnfr_uptime_update(self, xact):
1858 while True:
1859 # Return when vnfr state is FAILED or TERMINATED etc
1860 if self._state not in [VirtualNetworkFunctionRecordState.INIT,
1861 VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
1862 VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
1863 VirtualNetworkFunctionRecordState.READY]:
1864 return
1865 yield from self.publish(xact)
1866 yield from asyncio.sleep(2, loop=self._loop)
1867
1868
1869
1870 class VnfdDtsHandler(object):
1871 """ DTS handler for VNFD config changes """
1872 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
1873
1874 def __init__(self, dts, log, loop, vnfm):
1875 self._dts = dts
1876 self._log = log
1877 self._loop = loop
1878 self._vnfm = vnfm
1879 self._regh = None
1880
1881 @asyncio.coroutine
1882 def regh(self):
1883 """ DTS registration handle """
1884 return self._regh
1885
1886 @asyncio.coroutine
1887 def register(self):
1888 """ Register for VNFD configuration"""
1889
1890 def on_apply(dts, acg, xact, action, scratch):
1891 """Apply the configuration"""
1892 self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
1893 xact, action, scratch)
1894
1895 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
1896
1897 @asyncio.coroutine
1898 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
1899 """ on prepare callback """
1900 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
1901 ks_path.to_xpath(RwVnfmYang.get_schema()), msg)
1902 fref = ProtobufC.FieldReference.alloc()
1903 fref.goto_whole_message(msg.to_pbcm())
1904
1905 # Handle deletes in prepare_callback
1906 if fref.is_field_deleted():
1907 # Delete an VNFD record
1908 self._log.debug("Deleting VNFD with id %s", msg.id)
1909 if self._vnfm.vnfd_in_use(msg.id):
1910 self._log.debug("Cannot delete VNFD in use - %s", msg)
1911 err = "Cannot delete a VNFD in use - %s" % msg
1912 raise VirtualNetworkFunctionDescriptorRefCountExists(err)
1913 # Delete a VNFD record
1914 yield from self._vnfm.delete_vnfd(msg.id)
1915
1916 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
1917
1918 self._log.debug(
1919 "Registering for VNFD config using xpath: %s",
1920 VnfdDtsHandler.XPATH,
1921 )
1922 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
1923 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
1924 self._regh = acg.register(
1925 xpath=VnfdDtsHandler.XPATH,
1926 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
1927 on_prepare=on_prepare)
1928
1929
1930 class VcsComponentDtsHandler(object):
1931 """ Vcs Component DTS handler """
1932 XPATH = ("D,/rw-manifest:manifest" +
1933 "/rw-manifest:operational-inventory" +
1934 "/rw-manifest:component")
1935
1936 def __init__(self, dts, log, loop, vnfm):
1937 self._dts = dts
1938 self._log = log
1939 self._loop = loop
1940 self._regh = None
1941 self._vnfm = vnfm
1942
1943 @property
1944 def regh(self):
1945 """ DTS registration handle """
1946 return self._regh
1947
1948 @asyncio.coroutine
1949 def register(self):
1950 """ Registers VCS component dts publisher registration"""
1951 self._log.debug("VCS Comp publisher DTS handler registering path %s",
1952 VcsComponentDtsHandler.XPATH)
1953
1954 hdl = rift.tasklets.DTS.RegistrationHandler()
1955 handlers = rift.tasklets.Group.Handler()
1956 with self._dts.group_create(handler=handlers) as group:
1957 self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
1958 handler=hdl,
1959 flags=(rwdts.Flag.PUBLISHER |
1960 rwdts.Flag.NO_PREP_READ |
1961 rwdts.Flag.DATASTORE),)
1962
1963 @asyncio.coroutine
1964 def publish(self, xact, path, msg):
1965 """ Publishes the VCS component """
1966 self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
1967 xact, path, msg)
1968 self.regh.create_element(path, msg)
1969 self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
1970 VcsComponentDtsHandler.XPATH, xact, path, msg)
1971
1972 class VnfrConsoleOperdataDtsHandler(object):
1973 """ registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and handles CRUD from DTS"""
1974 @property
1975 def vnfr_vdu_console_xpath(self):
1976 """ path for resource-mgr"""
1977 return ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']/rw-vnfr:vdur[vnfr:id='{}']".format(self._vnfr_id,self._vdur_id))
1978
1979 def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
1980 self._dts = dts
1981 self._log = log
1982 self._loop = loop
1983 self._regh = None
1984 self._vnfm = vnfm
1985
1986 self._vnfr_id = vnfr_id
1987 self._vdur_id = vdur_id
1988 self._vdu_id = vdu_id
1989
1990 @asyncio.coroutine
1991 def register(self):
1992 """ Register for VNFR VDU Operational Data read from dts """
1993
1994 @asyncio.coroutine
1995 def on_prepare(xact_info, action, ks_path, msg):
1996 """ prepare callback from dts """
1997 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
1998 self._log.debug(
1999 "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
2000 xact_info, action, xpath, msg
2001 )
2002
2003 if action == rwdts.QueryAction.READ:
2004 schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
2005 path_entry = schema.keyspec_to_entry(ks_path)
2006 self._log.debug("VDU Opdata path is {}".format(path_entry))
2007 try:
2008 vnfr = self._vnfm.get_vnfr(self._vnfr_id)
2009 except VnfRecordError as e:
2010 self._log.error("VNFR id %s not found", self._vnfr_id)
2011 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2012 return
2013 try:
2014 vdur= vnfr._get_vdur_from_vdu_id(self._vdu_id)
2015 if not vdur._state == VDURecordState.READY:
2016 self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
2017 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2018 return
2019 with self._dts.transaction() as new_xact:
2020 resp = yield from vdur.read_resource(new_xact)
2021 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2022 vdur_console.id = self._vdur_id
2023 if resp.console_url:
2024 vdur_console.console_url = resp.console_url
2025 else:
2026 vdur_console.console_url = 'none'
2027 self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
2028 except Exception:
2029 self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
2030 vdur_console = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
2031 vdur_console.id = self._vdur_id
2032 vdur_console.console_url = 'none'
2033
2034 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
2035 xpath=self.vnfr_vdu_console_xpath,
2036 msg=vdur_console)
2037 else:
2038 #raise VnfRecordError("Not supported operation %s" % action)
2039 self._log.error("Not supported operation %s" % action)
2040 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
2041 return
2042
2043
2044 self._log.debug("Registering for VNFR VDU using xpath: %s",
2045 self.vnfr_vdu_console_xpath)
2046 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2047 with self._dts.group_create() as group:
2048 self._regh = group.register(xpath=self.vnfr_vdu_console_xpath,
2049 handler=hdl,
2050 flags=rwdts.Flag.PUBLISHER,
2051 )
2052
2053
2054 class VnfrDtsHandler(object):
2055 """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
2056 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
2057
2058 def __init__(self, dts, log, loop, vnfm):
2059 self._dts = dts
2060 self._log = log
2061 self._loop = loop
2062 self._vnfm = vnfm
2063
2064 self._regh = None
2065
2066 @property
2067 def regh(self):
2068 """ Return registration handle"""
2069 return self._regh
2070
2071 @property
2072 def vnfm(self):
2073 """ Return VNF manager instance """
2074 return self._vnfm
2075
2076 @asyncio.coroutine
2077 def register(self):
2078 """ Register for vnfr create/update/delete/read requests from dts """
2079 def on_commit(xact_info):
2080 """ The transaction has been committed """
2081 self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
2082 return rwdts.MemberRspCode.ACTION_OK
2083
2084 def on_abort(*args):
2085 """ Abort callback """
2086 self._log.debug("VNF transaction got aborted")
2087
2088 @asyncio.coroutine
2089 def on_event(dts, g_reg, xact, xact_event, scratch_data):
2090
2091 @asyncio.coroutine
2092 def instantiate_realloc_vnfr(vnfr):
2093 """Re-populate the vnfm after restart
2094
2095 Arguments:
2096 vlink
2097
2098 """
2099
2100 yield from vnfr.instantiate(None, restart_mode=True)
2101
2102 if xact_event == rwdts.MemberEvent.INSTALL:
2103 curr_cfg = self.regh.elements
2104 for cfg in curr_cfg:
2105 vnfr = self.vnfm.create_vnfr(cfg)
2106 self._loop.create_task(instantiate_realloc_vnfr(vnfr))
2107
2108 self._log.debug("Got on_event in vnfm")
2109
2110 return rwdts.MemberRspCode.ACTION_OK
2111
2112 @asyncio.coroutine
2113 def on_prepare(xact_info, action, ks_path, msg):
2114 """ prepare callback from dts """
2115 self._log.debug(
2116 "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
2117 xact_info, action, msg
2118 )
2119
2120 if action == rwdts.QueryAction.CREATE:
2121 if not msg.has_field("vnfd"):
2122 err = "Vnfd not provided"
2123 self._log.error(err)
2124 raise VnfRecordError(err)
2125
2126 vnfr = self.vnfm.create_vnfr(msg)
2127 try:
2128 # RIFT-9105: Unable to add a READ query under an existing transaction
2129 # xact = xact_info.xact
2130 yield from vnfr.instantiate(None)
2131 except Exception as e:
2132 self._log.exception(e)
2133 self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
2134 vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
2135 yield from vnfr.publish(None)
2136 elif action == rwdts.QueryAction.DELETE:
2137 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2138 path_entry = schema.keyspec_to_entry(ks_path)
2139 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2140
2141 if vnfr is None:
2142 self._log.debug("VNFR id %s not found for delete", path_entry.key00.id)
2143 raise VirtualNetworkFunctionRecordNotFound(
2144 "VNFR id %s", path_entry.key00.id)
2145
2146 try:
2147 yield from vnfr.terminate(xact_info.xact)
2148 # Unref the VNFD
2149 vnfr.vnfd_unref()
2150 yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
2151 except Exception as e:
2152 self._log.exception(e)
2153 self._log.error("Caught exception while deleting vnfr %s", path_entry.key00.id)
2154
2155 elif action == rwdts.QueryAction.UPDATE:
2156 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
2157 path_entry = schema.keyspec_to_entry(ks_path)
2158 vnfr = None
2159 try:
2160 vnfr = self._vnfm.get_vnfr(path_entry.key00.id)
2161 except Exception as e:
2162 self._log.debug("No vnfr found with id %s", path_entry.key00.id)
2163 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2164 return
2165
2166 if vnfr is None:
2167 self._log.debug("VNFR id %s not found for update", path_entry.key00.id)
2168 xact_info.respond_xpath(rwdts.XactRspCode.NA)
2169 return
2170
2171 self._log.debug("VNFR {} update config status {} (current {})".
2172 format(vnfr.name, msg.config_status, vnfr.config_status))
2173 # Update the config status and publish
2174 vnfr._config_status = msg.config_status
2175 yield from vnfr.publish(None)
2176
2177 else:
2178 raise NotImplementedError(
2179 "%s action on VirtualNetworkFunctionRecord not supported",
2180 action)
2181
2182 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2183
2184 self._log.debug("Registering for VNFR using xpath: %s",
2185 VnfrDtsHandler.XPATH,)
2186
2187 hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
2188 on_prepare=on_prepare,)
2189 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
2190 with self._dts.group_create(handler=handlers) as group:
2191 self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
2192 handler=hdl,
2193 flags=(rwdts.Flag.PUBLISHER |
2194 rwdts.Flag.NO_PREP_READ |
2195 rwdts.Flag.CACHE |
2196 rwdts.Flag.DATASTORE),)
2197
2198 @asyncio.coroutine
2199 def create(self, xact, path, msg):
2200 """
2201 Create a VNFR record in DTS with path and message
2202 """
2203 self._log.debug("Creating VNFR xact = %s, %s:%s",
2204 xact, path, msg)
2205
2206 self.regh.create_element(path, msg)
2207 self._log.debug("Created VNFR xact = %s, %s:%s",
2208 xact, path, msg)
2209
2210 @asyncio.coroutine
2211 def update(self, xact, path, msg):
2212 """
2213 Update a VNFR record in DTS with path and message
2214 """
2215 self._log.debug("Updating VNFR xact = %s, %s:%s",
2216 xact, path, msg)
2217 self.regh.update_element(path, msg)
2218 self._log.debug("Updated VNFR xact = %s, %s:%s",
2219 xact, path, msg)
2220
2221 @asyncio.coroutine
2222 def delete(self, xact, path):
2223 """
2224 Delete a VNFR record in DTS with path and message
2225 """
2226 self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
2227 self.regh.delete_element(path)
2228 self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2229
2230
2231 class VnfdRefCountDtsHandler(object):
2232 """ The VNFD Ref Count DTS handler """
2233 XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"
2234
2235 def __init__(self, dts, log, loop, vnfm):
2236 self._dts = dts
2237 self._log = log
2238 self._loop = loop
2239 self._vnfm = vnfm
2240
2241 self._regh = None
2242
2243 @property
2244 def regh(self):
2245 """ Return registration handle """
2246 return self._regh
2247
2248 @property
2249 def vnfm(self):
2250 """ Return the NS manager instance """
2251 return self._vnfm
2252
2253 @asyncio.coroutine
2254 def register(self):
2255 """ Register for VNFD ref count read from dts """
2256
2257 @asyncio.coroutine
2258 def on_prepare(xact_info, action, ks_path, msg):
2259 """ prepare callback from dts """
2260 xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
2261 self._log.debug(
2262 "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
2263 xact_info, action, xpath, msg
2264 )
2265
2266 if action == rwdts.QueryAction.READ:
2267 schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
2268 path_entry = schema.keyspec_to_entry(ks_path)
2269 vnfd_list = yield from self._vnfm.get_vnfd_refcount(path_entry.key00.vnfd_id_ref)
2270 for xpath, msg in vnfd_list:
2271 self._log.debug("Responding to ref count query path:%s, msg:%s",
2272 xpath, msg)
2273 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
2274 xpath=xpath,
2275 msg=msg)
2276 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2277 else:
2278 raise VnfRecordError("Not supported operation %s" % action)
2279
2280 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
2281 with self._dts.group_create() as group:
2282 self._regh = group.register(xpath=VnfdRefCountDtsHandler.XPATH,
2283 handler=hdl,
2284 flags=rwdts.Flag.PUBLISHER,
2285 )
2286
2287
2288 class VdurDatastore(object):
2289 """
2290 This VdurDatastore is intended to expose select information about a VDUR
2291 such that it can be referenced in a cloud config file. The data that is
2292 exposed does not necessarily follow the structure of the data in the yang
2293 model. This is intentional. The data that are exposed are intended to be
2294 agnostic of the yang model so that changes in the model do not necessarily
2295 require changes to the interface provided to the user. It also means that
2296 the user does not need to be familiar with the RIFT.ware yang models.
2297 """
2298
2299 def __init__(self):
2300 """Create an instance of VdurDatastore"""
2301 self._vdur_data = dict()
2302 self._pattern = re.compile("vdu\[([^]]+)\]\.(.+)")
2303
2304 def add(self, vdur):
2305 """Add a new VDUR to the datastore
2306
2307 Arguments:
2308 vdur - a VirtualDeploymentUnitRecord instance
2309
2310 Raises:
2311 A ValueError is raised if the VDUR is (1) None or (2) already in
2312 the datastore.
2313
2314 """
2315 if vdur.vdu_id is None:
2316 raise ValueError('VDURs are required to have an ID')
2317
2318 if vdur.vdu_id in self._vdur_data:
2319 raise ValueError('cannot add a VDUR more than once')
2320
2321 self._vdur_data[vdur.vdu_id] = dict()
2322
2323 def set_if_not_none(key, attr):
2324 if attr is not None:
2325 self._vdur_data[vdur.vdu_id][key] = attr
2326
2327 set_if_not_none('name', vdur._vdud.name)
2328 set_if_not_none('mgmt.ip', vdur.vm_management_ip)
2329
2330 def update(self, vdur):
2331 """Update the VDUR information in the datastore
2332
2333 Arguments:
2334 vdur - a GI representation of a VDUR
2335
2336 Raises:
2337 A ValueError is raised if the VDUR is (1) None or (2) already in
2338 the datastore.
2339
2340 """
2341 if vdur.vdu_id is None:
2342 raise ValueError('VNFDs are required to have an ID')
2343
2344 if vdur.vdu_id not in self._vdur_data:
2345 raise ValueError('VNF is not recognized')
2346
2347 def set_or_delete(key, attr):
2348 if attr is None:
2349 if key in self._vdur_data[vdur.vdu_id]:
2350 del self._vdur_data[vdur.vdu_id][key]
2351
2352 else:
2353 self._vdur_data[vdur.vdu_id][key] = attr
2354
2355 set_or_delete('name', vdur._vdud.name)
2356 set_or_delete('mgmt.ip', vdur.vm_management_ip)
2357
2358 def remove(self, vdur_id):
2359 """Remove all of the data associated with specified VDUR
2360
2361 Arguments:
2362 vdur_id - the identifier of a VNFD in the datastore
2363
2364 Raises:
2365 A ValueError is raised if the VDUR is not contained in the
2366 datastore.
2367
2368 """
2369 if vdur_id not in self._vdur_data:
2370 raise ValueError('VNF is not recognized')
2371
2372 del self._vdur_data[vdur_id]
2373
2374 def get(self, expr):
2375 """Retrieve VDUR information from the datastore
2376
2377 An expression should be of the form,
2378
2379 vdu[<id>].<attr>
2380
2381 where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
2382 the exposed attribute that the user wishes to retrieve.
2383
2384 If the requested data is not available, None is returned.
2385
2386 Arguments:
2387 expr - a string that specifies the data to return
2388
2389 Raises:
2390 A ValueError is raised if the provided expression cannot be parsed.
2391
2392 Returns:
2393 The requested data or None
2394
2395 """
2396 result = self._pattern.match(expr)
2397 if result is None:
2398 raise ValueError('data expression not recognized ({})'.format(expr))
2399
2400 vdur_id, key = result.groups()
2401
2402 if vdur_id not in self._vdur_data:
2403 return None
2404
2405 return self._vdur_data[vdur_id].get(key, None)
2406
2407
2408 class VnfManager(object):
2409 """ The virtual network function manager class """
2410 def __init__(self, dts, log, loop, cluster_name):
2411 self._dts = dts
2412 self._log = log
2413 self._loop = loop
2414 self._cluster_name = cluster_name
2415
2416 self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
2417 self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
2418 self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
2419 self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)
2420
2421 self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
2422 self._vnfr_handler,
2423 self._vcs_handler,
2424 self._vnfr_ref_handler,
2425 self._nsr_handler]
2426 self._vnfrs = {}
2427 self._vnfds_to_vnfr = {}
2428 self._nsrs = {}
2429
2430 @property
2431 def vnfr_handler(self):
2432 """ VNFR dts handler """
2433 return self._vnfr_handler
2434
2435 @property
2436 def vcs_handler(self):
2437 """ VCS dts handler """
2438 return self._vcs_handler
2439
2440 @asyncio.coroutine
2441 def register(self):
2442 """ Register all static DTS handlers """
2443 for hdl in self._dts_handlers:
2444 yield from hdl.register()
2445
2446 @asyncio.coroutine
2447 def run(self):
2448 """ Run this VNFM instance """
2449 self._log.debug("Run VNFManager - registering static DTS handlers""")
2450 yield from self.register()
2451
2452 def handle_nsr(self, nsr, action):
2453 if action in [rwdts.QueryAction.CREATE]:
2454 self._nsrs[nsr.id] = nsr
2455 elif action == rwdts.QueryAction.DELETE:
2456 if nsr.id in self._nsrs:
2457 del self._nsrs[nsr.id]
2458
2459 def get_linked_mgmt_network(self, vnfr):
2460 """For the given VNFR get the related mgmt network from the NSD, if
2461 available.
2462 """
2463 vnfd_id = vnfr.vnfd.id
2464 nsr_id = vnfr.nsr_id_ref
2465
2466 # for the given related VNFR, get the corresponding NSR-config
2467 nsr_obj = None
2468 try:
2469 nsr_obj = self._nsrs[nsr_id]
2470 except KeyError:
2471 raise("Unable to find the NS with the ID: {}".format(nsr_id))
2472
2473 # for the related NSD check if a VLD exists such that it's a mgmt
2474 # network
2475 for vld in nsr_obj.nsd.vld:
2476 if vld.mgmt_network:
2477 return vld.vim_network_name
2478
2479 return None
2480
2481 def get_vnfr(self, vnfr_id):
2482 """ get VNFR by vnfr id """
2483
2484 if vnfr_id not in self._vnfrs:
2485 raise VnfRecordError("VNFR id %s not found", vnfr_id)
2486
2487 return self._vnfrs[vnfr_id]
2488
2489 def create_vnfr(self, vnfr):
2490 """ Create a VNFR instance """
2491 if vnfr.id in self._vnfrs:
2492 msg = "Vnfr id %s already exists" % vnfr.id
2493 self._log.error(msg)
2494 raise VnfRecordError(msg)
2495
2496 self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
2497 vnfr.id,
2498 vnfr.vnfd.id)
2499
2500 mgmt_network = self.get_linked_mgmt_network(vnfr)
2501
2502 self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
2503 self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
2504 mgmt_network=mgmt_network
2505 )
2506 self._vnfds_to_vnfr[vnfr.vnfd.id] = self._vnfrs[vnfr.id]
2507 return self._vnfrs[vnfr.id]
2508
2509 @asyncio.coroutine
2510 def delete_vnfr(self, xact, vnfr):
2511 """ Create a VNFR instance """
2512 if vnfr.vnfr_id in self._vnfrs:
2513 self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
2514 yield from self._vnfr_handler.delete(xact, vnfr.xpath)
2515 del self._vnfrs[vnfr.vnfr_id]
2516
2517 @asyncio.coroutine
2518 def fetch_vnfd(self, vnfd_id):
2519 """ Fetch VNFDs based with the vnfd id"""
2520 vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
2521 self._log.debug("Fetch vnfd with path %s", vnfd_path)
2522 vnfd = None
2523
2524 res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)
2525
2526 for ent in res_iter:
2527 res = yield from ent
2528 vnfd = res.result
2529
2530 if vnfd is None:
2531 err = "Failed to get Vnfd %s" % vnfd_id
2532 self._log.error(err)
2533 raise VnfRecordError(err)
2534
2535 self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)
2536
2537 return vnfd
2538
2539 def vnfd_in_use(self, vnfd_id):
2540 """ Is this VNFD in use """
2541 self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
2542 if vnfd_id in self._vnfds_to_vnfr:
2543 return self._vnfds_to_vnfr[vnfd_id].in_use()
2544 return False
2545
2546 @asyncio.coroutine
2547 def publish_vnfr(self, xact, path, msg):
2548 """ Publish a VNFR """
2549 self._log.debug("publish_vnfr called with path %s, msg %s",
2550 path, msg)
2551 yield from self.vnfr_handler.update(xact, path, msg)
2552
2553 @asyncio.coroutine
2554 def delete_vnfd(self, vnfd_id):
2555 """ Delete the Virtual Network Function descriptor with the passed id """
2556 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
2557 if vnfd_id not in self._vnfds_to_vnfr:
2558 self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
2559 raise VirtualNetworkFunctionDescriptorNotFound("Cannot find %s", vnfd_id)
2560
2561 if self._vnfds_to_vnfr[vnfd_id].in_use():
2562 self._log.debug("Cannot delete VNFD id %s reference exists %s",
2563 vnfd_id,
2564 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2565 raise VirtualNetworkFunctionDescriptorRefCountExists(
2566 "Cannot delete :%s, ref_count:%s",
2567 vnfd_id,
2568 self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count)
2569
2570 # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
2571 try:
2572 rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
2573 vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
2574 if os.path.exists(vnfd_dir):
2575 shutil.rmtree(vnfd_dir, ignore_errors=True)
2576 except Exception as e:
2577 self._log.error("Exception in cleaning up VNFD {}: {}".
2578 format(self._vnfds_to_vnfr[vnfd_id].vnfd.name, e))
2579 self._log.exception(e)
2580
2581 del self._vnfds_to_vnfr[vnfd_id]
2582
2583 def vnfd_refcount_xpath(self, vnfd_id):
2584 """ xpath for ref count entry """
2585 return (VnfdRefCountDtsHandler.XPATH +
2586 "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)
2587
2588 @asyncio.coroutine
2589 def get_vnfd_refcount(self, vnfd_id):
2590 """ Get the vnfd_list from this VNFM"""
2591 vnfd_list = []
2592 if vnfd_id is None or vnfd_id == "":
2593 for vnfr in self._vnfds_to_vnfr.values():
2594 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2595 vnfd_msg.vnfd_id_ref = vnfr.vnfd.id
2596 vnfd_msg.instance_ref_count = vnfr.vnfd_ref_count
2597 vnfd_list.append((self.vnfd_refcount_xpath(vnfr.vnfd.id), vnfd_msg))
2598 elif vnfd_id in self._vnfds_to_vnfr:
2599 vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
2600 vnfd_msg.vnfd_id_ref = self._vnfds_to_vnfr[vnfd_id].vnfd.id
2601 vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[vnfd_id].vnfd_ref_count
2602 vnfd_list.append((self.vnfd_refcount_xpath(vnfd_id), vnfd_msg))
2603
2604 return vnfd_list
2605
2606
2607 class VnfmTasklet(rift.tasklets.Tasklet):
2608 """ VNF Manager tasklet class """
2609 def __init__(self, *args, **kwargs):
2610 super(VnfmTasklet, self).__init__(*args, **kwargs)
2611 self.rwlog.set_category("rw-mano-log")
2612 self.rwlog.set_subcategory("vnfm")
2613
2614 self._dts = None
2615 self._vnfm = None
2616
2617 def start(self):
2618 try:
2619 super(VnfmTasklet, self).start()
2620 self.log.info("Starting VnfmTasklet")
2621
2622 self.log.setLevel(logging.DEBUG)
2623
2624 self.log.debug("Registering with dts")
2625 self._dts = rift.tasklets.DTS(self.tasklet_info,
2626 RwVnfmYang.get_schema(),
2627 self.loop,
2628 self.on_dts_state_change)
2629
2630 self.log.debug("Created DTS Api GI Object: %s", self._dts)
2631 except Exception:
2632 print("Caught Exception in VNFM start:", sys.exc_info()[0])
2633 raise
2634
2635 def on_instance_started(self):
2636 """ Task insance started callback """
2637 self.log.debug("Got instance started callback")
2638
2639 def stop(self):
2640 try:
2641 self._dts.deinit()
2642 except Exception:
2643 print("Caught Exception in VNFM stop:", sys.exc_info()[0])
2644 raise
2645
2646 @asyncio.coroutine
2647 def init(self):
2648 """ Task init callback """
2649 try:
2650 vm_parent_name = self.tasklet_info.get_parent_vm_parent_instance_name()
2651 assert vm_parent_name is not None
2652 self._vnfm = VnfManager(self._dts, self.log, self.loop, vm_parent_name)
2653 yield from self._vnfm.run()
2654 except Exception:
2655 print("Caught Exception in VNFM init:", sys.exc_info()[0])
2656 raise
2657
2658 @asyncio.coroutine
2659 def run(self):
2660 """ Task run callback """
2661 pass
2662
2663 @asyncio.coroutine
2664 def on_dts_state_change(self, state):
2665 """Take action according to current dts state to transition
2666 application into the corresponding application state
2667
2668 Arguments
2669 state - current dts state
2670 """
2671 switch = {
2672 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
2673 rwdts.State.CONFIG: rwdts.State.RUN,
2674 }
2675
2676 handlers = {
2677 rwdts.State.INIT: self.init,
2678 rwdts.State.RUN: self.run,
2679 }
2680
2681 # Transition application to next state
2682 handler = handlers.get(state, None)
2683 if handler is not None:
2684 yield from handler()
2685
2686 # Transition dts to next state
2687 next_state = switch.get(state, None)
2688 if next_state is not None:
2689 self._dts.handle.set_state(next_state)