Merge branch 'v1.0'
[osm/SO.git] / rwlaunchpad / plugins / rwvnfm / rift / tasklets / rwvnfmtasklet / rwvnfmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import collections
19 import enum
20 import logging
21 import uuid
22 import time
23 import os.path
24 import re
25 import shutil
26 import sys
27
28 import gi
29 gi.require_version('RwDts', '1.0')
30 gi.require_version('RwVnfrYang', '1.0')
31 gi.require_version('RwVnfmYang', '1.0')
32 gi.require_version('RwVlrYang', '1.0')
33 gi.require_version('RwManifestYang', '1.0')
34 gi.require_version('RwBaseYang', '1.0')
35 gi.require_version('RwResourceMgrYang', '1.0')
36
37 from gi.repository import (
38 RwDts as rwdts,
39 RwVnfrYang,
40 RwVnfmYang,
41 RwVlrYang,
42 VnfrYang,
43 RwManifestYang,
44 RwBaseYang,
45 RwResourceMgrYang,
46 ProtobufC,
47 )
48
49 import rift.tasklets
50 import rift.package.store
51 import rift.package.cloud_init
52 import rift.mano.dts as mano_dts
53
54
class VMResourceError(Exception):
    """Error concerning a VM resource."""
58
59
class VnfRecordError(Exception):
    """VNF record instantiation failed."""
63
64
class VduRecordError(Exception):
    """VDU record instantiation failed."""
68
69
# NOTE(review): this name shadows the builtin ``NotImplemented`` singleton
# for the whole module; it is kept because it is part of the module's
# public interface.
class NotImplemented(Exception):
    """Requested operation is not implemented."""
73
74
class VnfrRecordExistsError(Exception):
    """A VNFR record with the same VNFR id already exists."""
78
79
class InternalVirtualLinkRecordError(Exception):
    """Error concerning an internal virtual link record."""
83
84
class VDUImageNotFound(Exception):
    """The image for a VDU could not be found."""
88
89
class VirtualDeploymentUnitRecordError(Exception):
    """VDU instantiation failed."""
93
94
class VMNotReadyError(Exception):
    """The VM has not yet been received from the resource manager."""
98
99
class VDURecordNotFound(Exception):
    """A VDU record could not be found."""
103
104
class VirtualNetworkFunctionRecordDescNotFound(Exception):
    """The Virtual Network Function Record descriptor cannot be found."""
108
109
class VirtualNetworkFunctionDescriptorError(Exception):
    """Error concerning a Virtual Network Function descriptor."""
113
114
class VirtualNetworkFunctionDescriptorNotFound(Exception):
    """The Virtual Network Function descriptor was not found."""
118
119
class VirtualNetworkFunctionRecordNotFound(Exception):
    """The Virtual Network Function Record was not found."""
123
124
class VirtualNetworkFunctionDescriptorRefCountExists(Exception):
    """A reference count still exists on the Virtual Network Function descriptor."""
128
129
class VnfrInstantiationFailed(Exception):
    """Virtual Network Function instantiation failed."""
133
134
class VNFMPlacementGroupError(Exception):
    """Error raised while processing VNFM placement groups."""
137
@enum.unique
class VirtualNetworkFunctionRecordState(enum.Enum):
    """Lifecycle states of a VNFR.

    Every member has a distinct value.  ``TERMINATED`` previously shared
    the value 7 with ``VDU_TERMINATE_PHASE``, which made it an alias of
    that member (``TERMINATED.name`` was ``"VDU_TERMINATE_PHASE"`` and the
    two compared equal).  ``@enum.unique`` guards against reintroducing
    such an alias.
    """
    INIT = 1
    VL_INIT_PHASE = 2
    VM_INIT_PHASE = 3
    READY = 4
    TERMINATE = 5
    VL_TERMINATE_PHASE = 6
    VDU_TERMINATE_PHASE = 7
    TERMINATED = 8
    FAILED = 10
149
150
class VDURecordState(enum.Enum):
    """Lifecycle states of a VDU record."""
    INIT = 1                    # record created, nothing requested yet
    INSTANTIATING = 2           # instantiate() has started
    RESOURCE_ALLOC_PENDING = 3  # VM requested from the resource manager
    READY = 4                   # VDU is active
    TERMINATING = 5             # terminate() in progress
    TERMINATED = 6              # terminate() completed
    FAILED = 10                 # instantiation failed
160
161
class VcsComponent(object):
    """VCS component declared within a VNF descriptor.

    Wraps one component message and knows how to build the DTS paths and
    messages used to publish the component and to request its start.
    """

    def __init__(self, dts, log, loop, cluster_name, vcs_handler, component,
                 mangled_name, vnfd_name=None, vnfd_id=None):
        """Create the component wrapper.

        Arguments:
            dts          - DTS handle used for the start query
            log          - logger
            loop         - asyncio event loop
            cluster_name - VCS instance name the component is started in
            vcs_handler  - handler exposing ``publish(xact, path, msg)``
            component    - component message; must provide ``as_dict()``
            mangled_name - already mangled name of this component
            vnfd_name    - VNF name used to mangle nested component names
                           in ``msg`` (optional, backward compatible)
            vnfd_id      - VNFD id used to mangle nested component names
                           in ``msg`` (optional, backward compatible)
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._component = component
        self._cluster_name = cluster_name
        self._vcs_handler = vcs_handler
        self._mangled_name = mangled_name
        # ``msg`` reads these two attributes; they used to be never
        # assigned, so they are always initialised here.
        self._vnfd_name = vnfd_name
        self._vnfd_id = vnfd_id

    @staticmethod
    def mangle_name(component_name, vnf_name, vnfd_id):
        """Return ``<vnf_name>:<component_name>:<vnfd_id>``."""
        return vnf_name + ":" + component_name + ":" + vnfd_id

    @property
    def name(self):
        """Mangled name of this component."""
        return self._mangled_name

    @property
    def path(self):
        """Operational-inventory xpath of this component."""
        return ("D,/rw-manifest:manifest" +
                "/rw-manifest:operational-inventory" +
                "/rw-manifest:component" +
                "[rw-manifest:component-name = '{}']").format(self.name)

    @property
    def instance_xpath(self):
        """Xpath of the VCS instance (cluster) this component belongs to."""
        return ("D,/rw-base:vcs" +
                "/instances" +
                "/instance" +
                "[instance-name = '{}']".format(self._cluster_name))

    @property
    def start_comp_xpath(self):
        """Xpath of the START-REQ child used to start the component."""
        return (self.instance_xpath +
                "/child-n[instance-name = 'START-REQ']")

    def get_start_comp_msg(self, ip_address):
        """Build the START request message for this component."""
        start_msg = RwBaseYang.VcsInstance_Instance_ChildN()
        start_msg.instance_name = 'START-REQ'
        start_msg.component_name = self.name
        start_msg.admin_command = "START"
        start_msg.ip_address = ip_address

        return start_msg

    def _mangle_comp_names(self, comp_dict):
        """Mangle every ``component_name`` value of comp_dict in place.

        Nested dicts, and dicts contained in lists, are processed
        recursively.  Returns comp_dict.

        Raises AttributeError when a name has to be mangled but the VNF
        name / VNFD id were not supplied.  AttributeError is kept because
        that is what the missing attributes used to raise.
        """
        for key, val in comp_dict.items():
            if isinstance(val, dict):
                self._mangle_comp_names(val)
            elif isinstance(val, list):
                for ent in val:
                    if isinstance(ent, dict):
                        self._mangle_comp_names(ent)
            elif key == "component_name":
                if self._vnfd_name is None or self._vnfd_id is None:
                    raise AttributeError(
                        "VcsComponent %s: vnfd_name and vnfd_id are required "
                        "to mangle component names" % self.name)
                comp_dict[key] = VcsComponent.mangle_name(val,
                                                          self._vnfd_name,
                                                          self._vnfd_id)
        return comp_dict

    @property
    def msg(self):
        """Operational-inventory message for this component.

        Built from the component's dict form with all component names
        mangled.
        """
        mangled_dict = self._mangle_comp_names(self._component.as_dict())
        return RwManifestYang.OpInventory_Component.from_dict(mangled_dict)

    @asyncio.coroutine
    def publish(self, xact):
        """Publish this component through the VCS handler."""
        self._log.debug("Publishing the VcsComponent %s, path = %s comp = %s",
                        self.name, self.path, self.msg)
        yield from self._vcs_handler.publish(xact, self.path, self.msg)

    @asyncio.coroutine
    def start(self, xact, parent, ip_addr=None):
        """Start this component by creating its START-REQ entry.

        ``xact`` and ``parent`` are accepted for interface compatibility
        and are not used.
        """
        # ATTN RV - replace with block add
        start_msg = self.get_start_comp_msg(ip_addr)
        self._log.debug("starting component %s %s",
                        self.start_comp_xpath, start_msg)
        yield from self._dts.query_create(self.start_comp_xpath,
                                          0,
                                          start_msg)
        self._log.debug("started component %s, %s",
                        self.start_comp_xpath, start_msg)
262 self.start_comp_xpath, start_msg)
263
264
class VirtualDeploymentUnitRecord(object):
    """Virtual Deployment Unit Record (VDUR).

    Tracks one VDU of a VNFR: it requests the VM from the resource
    manager over DTS, follows the resource state notifications, and
    builds the VDUR message from the descriptor and the VM response.
    """

    def __init__(self,
                 dts,
                 log,
                 loop,
                 vdud,
                 vnfr,
                 mgmt_intf,
                 mgmt_network,
                 cloud_account_name,
                 vnfd_package_store,
                 vdur_id=None,
                 placement_groups=None):
        """Create the record.

        Arguments:
            dts                - DTS handle
            log                - logger
            loop               - asyncio event loop
            vdud               - VDU descriptor message
            vnfr               - owning VNFR object
            mgmt_intf          - value sent as ``allocate_public_address``
            mgmt_network       - management network (may be empty/None)
            cloud_account_name - cloud account the VM is created in
            vnfd_package_store - package store used to read cloud-init files
            vdur_id            - record id; a new uuid4 is used when falsy
            placement_groups   - placement group messages; defaults to an
                                 empty list (a fresh one per instance)
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vdud = vdud
        self._vnfr = vnfr
        self._mgmt_intf = mgmt_intf
        self._cloud_account_name = cloud_account_name
        self._vnfd_package_store = vnfd_package_store
        self._mgmt_network = mgmt_network

        self._vdur_id = vdur_id or str(uuid.uuid4())
        self._int_intf = []
        self._ext_intf = []
        self._state = VDURecordState.INIT
        self._state_failed_reason = None
        self._request_id = str(uuid.uuid4())
        self._name = vnfr.name + "__" + vdud.id
        # A mutable default argument would be shared by every instance.
        self._placement_groups = [] if placement_groups is None else placement_groups
        self._rm_regh = None
        self._vm_resp = None
        self._vdud_cloud_init = None
        self._vdur_console_handler = VnfrConsoleOperdataDtsHandler(
            dts, log, loop, self._vnfr._vnfm, self._vnfr.vnfr_id,
            self._vdur_id, self.vdu_id)

    @asyncio.coroutine
    def vdu_opdata_register(self):
        """Register the console operational-data handler."""
        yield from self._vdur_console_handler.register()

    def _find_conn_point(self, cp_name):
        """Return the VM response connection point named cp_name, or None."""
        if self._vm_resp is None:
            return None
        for conn_point in self._vm_resp.connection_points:
            if conn_point.name == cp_name:
                return conn_point
        return None

    def cp_ip_addr(self, cp_name):
        """IP address of connection point cp_name ("0.0.0.0" if unknown)."""
        conn_point = self._find_conn_point(cp_name)
        return "0.0.0.0" if conn_point is None else conn_point.ip_address

    def cp_mac_addr(self, cp_name):
        """MAC address of connection point cp_name (all zeros if unknown)."""
        conn_point = self._find_conn_point(cp_name)
        return "00:00:00:00:00:00" if conn_point is None else conn_point.mac_addr

    def cp_id(self, cp_name):
        """Connection point id of cp_name ('' if unknown)."""
        conn_point = self._find_conn_point(cp_name)
        return '' if conn_point is None else conn_point.connection_point_id

    @property
    def vdu_id(self):
        """Id of the VDU descriptor."""
        return self._vdud.id

    @property
    def vm_resp(self):
        """Last VM response received from the resource manager (or None)."""
        return self._vm_resp

    @property
    def name(self):
        """Name of this VDUR: ``<vnfr name>__<vdud id>``."""
        return self._name

    @property
    def cloud_account_name(self):
        """Cloud account this VDU should be created in."""
        return self._cloud_account_name

    @property
    def image_name(self):
        """Base name of the descriptor image, or None when no image is set."""
        if 'image' not in self._vdud:
            return None
        return os.path.basename(self._vdud.image)

    @property
    def image_checksum(self):
        """Checksum of the descriptor image, or None when not set."""
        return self._vdud.image_checksum if self._vdud.has_field("image_checksum") else None

    @property
    def management_ip(self):
        """Public IP when the VM response has one, else its management IP.

        None while the VDU is not active.
        """
        if not self.active:
            return None
        if self._vm_resp.has_field('public_ip'):
            return self._vm_resp.public_ip
        return self._vm_resp.management_ip

    @property
    def vm_management_ip(self):
        """Management IP from the VM response; None while not active."""
        if not self.active:
            return None
        return self._vm_resp.management_ip

    @property
    def operational_status(self):
        """Operational status string for the current state."""
        # NOTE(review): TERMINATING is reported as "terminated" - confirm
        # this is intended.
        op_stats_dict = {"INIT": "init",
                         "INSTANTIATING": "vm_init_phase",
                         "RESOURCE_ALLOC_PENDING": "vm_alloc_pending",
                         "READY": "running",
                         "FAILED": "failed",
                         "TERMINATING": "terminated",
                         "TERMINATED": "terminated",
                         }
        return op_stats_dict[self._state.name]

    @property
    def msg(self):
        """Build the VDUR message.

        Side effect: every external connection point is pushed to the
        VNFR through ``update_cp`` with its current addresses and id.
        """
        vdu_fields = ["vm_flavor",
                      "guest_epa",
                      "vswitch_epa",
                      "hypervisor_epa",
                      "host_epa",
                      "volumes",
                      "name"]
        vdu_copy_dict = {k: v for k, v in
                         self._vdud.as_dict().items() if k in vdu_fields}
        vdur_dict = {"id": self._vdur_id,
                     "vdu_id_ref": self._vdud.id,
                     "operational_status": self.operational_status,
                     "operational_status_details": self._state_failed_reason,
                     }
        if self._vm_resp is not None:
            vdur_dict.update({"vim_id": self._vm_resp.vdu_id,
                              "flavor_id": self._vm_resp.flavor_id
                              })
            if self._vm_resp.has_field('image_id'):
                vdur_dict.update({"image_id": self._vm_resp.image_id})

        if self.management_ip is not None:
            vdur_dict["management_ip"] = self.management_ip

        if self.vm_management_ip is not None:
            vdur_dict["vm_management_ip"] = self.vm_management_ip

        vdur_dict.update(vdu_copy_dict)

        if self._vm_resp is not None and self._vm_resp.has_field('volumes'):
            # Attach the allocated volume id to the descriptor volume of
            # the same name; the descriptor may define no volumes at all.
            descriptor_volumes = vdur_dict.get('volumes', [])
            for opvolume in self._vm_resp.volumes:
                vdurvol_data = [vduvol for vduvol in descriptor_volumes
                                if vduvol['name'] == opvolume.name]
                if len(vdurvol_data) == 1:
                    vdurvol_data[0]["volume_id"] = opvolume.volume_id

        icp_list = []
        ii_list = []

        for intf, cp_id, vlr in self._int_intf:
            cp = self.find_internal_cp_by_cp_id(cp_id)

            icp_list.append({"name": cp.name,
                             "id": cp.id,
                             "type_yang": "VPORT",
                             "ip_address": self.cp_ip_addr(cp.id),
                             "mac_address": self.cp_mac_addr(cp.id)})

            ii_list.append({"name": intf.name,
                            "vdur_internal_connection_point_ref": cp.id,
                            "virtual_interface": {}})

        vdur_dict["internal_connection_point"] = icp_list
        self._log.debug("internal_connection_point:%s", vdur_dict["internal_connection_point"])
        vdur_dict["internal_interface"] = ii_list

        ei_list = []
        for intf, cp, vlr in self._ext_intf:
            ei_list.append({"name": cp,
                            "vnfd_connection_point_ref": cp,
                            "virtual_interface": {}})
            self._vnfr.update_cp(cp,
                                 self.cp_ip_addr(cp),
                                 self.cp_mac_addr(cp),
                                 self.cp_id(cp))

        vdur_dict["external_interface"] = ei_list

        vdur_dict['placement_groups_info'] = [group.as_dict()
                                              for group in self._placement_groups]

        return RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur.from_dict(vdur_dict)

    @property
    def resmgr_path(self):
        """Resource manager vdu-event xpath for this record's request id."""
        return ("D,/rw-resource-mgr:resource-mgmt" +
                "/vdu-event" +
                "/vdu-event-data[event-id='{}']".format(self._request_id))

    @property
    def vm_flavor_msg(self):
        """Copy of the descriptor's VM flavor message."""
        flavor = self._vdud.vm_flavor.__class__()
        flavor.copy_from(self._vdud.vm_flavor)

        return flavor

    @property
    def vdud_cloud_init(self):
        """Cloud-init contents for the VDU, read on first non-None use."""
        if self._vdud_cloud_init is None:
            self._vdud_cloud_init = self.cloud_init()

        return self._vdud_cloud_init

    def cloud_init(self):
        """Return the cloud-config script for this VDU.

        The inline ``cloud_init`` of the descriptor wins; otherwise the
        script named by ``cloud_init_file`` is read from the stored VNFD
        package.  Returns None when neither is provided.

        Raises VirtualDeploymentUnitRecordError when the script cannot be
        extracted from the package.
        """
        if self._vdud.cloud_init is not None:
            self._log.debug("cloud_init script provided inline %s", self._vdud.cloud_init)
            return self._vdud.cloud_init

        if self._vdud.cloud_init_file is not None:
            # Get cloud-init script contents from the file provided in the cloud_init_file param
            self._log.debug("cloud_init script provided in file %s", self._vdud.cloud_init_file)
            filename = self._vdud.cloud_init_file
            self._vnfd_package_store.refresh()
            stored_package = self._vnfd_package_store.get_package(self._vnfr.vnfd_id)
            cloud_init_extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
            try:
                return cloud_init_extractor.read_script(stored_package, filename)
            except rift.package.cloud_init.CloudInitExtractionError as e:
                raise VirtualDeploymentUnitRecordError(e) from e

        self._log.debug("VDU Instantiation: cloud-init script not provided")
        return None

    def process_openstack_placement_group_construct(self, vm_create_msg_dict):
        """Add the OpenStack placement constructs to vm_create_msg_dict.

        Raises VNFMPlacementGroupError when more than one availability
        zone or more than one server group is requested.
        """
        host_aggregates = []
        availability_zones = []
        server_groups = []
        for group in self._placement_groups:
            if group.has_field('host_aggregate'):
                for aggregate in group.host_aggregate:
                    host_aggregates.append(aggregate.as_dict())
            if group.has_field('availability_zone'):
                availability_zones.append(group.availability_zone.as_dict())
            if group.has_field('server_group'):
                server_groups.append(group.server_group.as_dict())

        if len(availability_zones) > 1:
            self._log.error("Can not launch VDU: %s in multiple availability zones. Requested Zones: %s", self.name, availability_zones)
            # The format string used to lack the second placeholder, so the
            # requested zones never reached the exception message.
            raise VNFMPlacementGroupError(
                "Can not launch VDU: {} in multiple availability zones. "
                "Requested Zones: {}".format(self.name, availability_zones))
        if availability_zones:
            vm_create_msg_dict['availability_zone'] = availability_zones[0]

        if len(server_groups) > 1:
            self._log.error("Can not launch VDU: %s in multiple Server Group. Requested Groups: %s", self.name, server_groups)
            raise VNFMPlacementGroupError(
                "Can not launch VDU: {} in multiple Server Groups. "
                "Requested Groups: {}".format(self.name, server_groups))
        if server_groups:
            vm_create_msg_dict['server_group'] = server_groups[0]

        if host_aggregates:
            vm_create_msg_dict['host_aggregate'] = host_aggregates

    def process_placement_groups(self, vm_create_msg_dict):
        """Process the placement_groups and fill the resource-mgr request.

        Raises VNFMPlacementGroupError when the groups do not all share
        one cloud type, or when the OpenStack constructs are inconsistent.
        """
        if not self._placement_groups:
            return

        cloud_set = {group.cloud_type for group in self._placement_groups}
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if len(cloud_set) != 1:
            raise VNFMPlacementGroupError(
                "Placement groups of VDU: {} refer to multiple cloud types: {}".format(
                    self.name, sorted(str(cloud) for cloud in cloud_set)))
        cloud_type = cloud_set.pop()

        if cloud_type == 'openstack':
            self.process_openstack_placement_group_construct(vm_create_msg_dict)
        else:
            self._log.info("Ignoring placement group with cloud construct for cloud-type: %s", cloud_type)

    @staticmethod
    def _connection_point_request(intf, cp_name, vlr, with_security_group):
        """Build one connection point entry of the VM create request."""
        cp_info = {"name": cp_name,
                   "virtual_link_id": vlr.network_id,
                   "type_yang": intf.virtual_interface.type_yang}

        if (intf.virtual_interface.has_field('vpci') and
                intf.virtual_interface.vpci is not None):
            cp_info["vpci"] = intf.virtual_interface.vpci

        if (with_security_group and
                vlr.has_field('ip_profile_params') and
                vlr.ip_profile_params.has_field('security_group')):
            cp_info['security_group'] = vlr.ip_profile_params.security_group

        return cp_info

    def resmgr_msg(self, config=None):
        """Build the VDU event message sent to the resource manager.

        Arguments:
            config - optional user data placed in ``vdu_init``
        """
        vdu_fields = ["vm_flavor",
                      "guest_epa",
                      "vswitch_epa",
                      "hypervisor_epa",
                      "host_epa"]

        self._log.debug("Creating params based on VDUD: %s", self._vdud)
        vdu_copy_dict = {k: v for k, v in self._vdud.as_dict().items() if k in vdu_fields}

        vm_create_msg_dict = {
            "name": self.name,
        }

        if self.image_name is not None:
            vm_create_msg_dict["image_name"] = self.image_name

        if self.image_checksum is not None:
            vm_create_msg_dict["image_checksum"] = self.image_checksum

        vm_create_msg_dict["allocate_public_address"] = self._mgmt_intf
        if self._vdud.has_field('mgmt_vpci'):
            vm_create_msg_dict["mgmt_vpci"] = self._vdud.mgmt_vpci

        if config is not None:
            vm_create_msg_dict['vdu_init'] = {'userdata': config}

        if self._mgmt_network:
            vm_create_msg_dict['mgmt_network'] = self._mgmt_network

        # Only external connection points carry the security group.
        cp_list = [self._connection_point_request(intf, cp, vlr, True)
                   for intf, cp, vlr in self._ext_intf]
        cp_list.extend(self._connection_point_request(intf, cp, vlr, False)
                       for intf, cp, vlr in self._int_intf)

        vm_create_msg_dict["connection_points"] = cp_list
        vm_create_msg_dict.update(vdu_copy_dict)

        self.process_placement_groups(vm_create_msg_dict)

        msg = RwResourceMgrYang.VDUEventData()
        msg.event_id = self._request_id
        msg.cloud_account = self.cloud_account_name
        msg.request_info.from_dict(vm_create_msg_dict)

        for volume in self._vdud.volumes:
            v = msg.request_info.volumes.add()
            v.from_dict(volume.as_dict())
        return msg

    @asyncio.coroutine
    def terminate(self, xact):
        """Delete the resource in the VIM and drop the DTS registrations.

        The request is ignored unless the record is READY or FAILED.
        Errors while deleting the resource are logged and do not stop the
        deregistration.  ``xact`` is not used; a new transaction is opened
        for the delete.
        """
        if self._state not in (VDURecordState.READY, VDURecordState.FAILED):
            self._log.warning("VDU terminate in not ready state - Ignoring request")
            return

        self._state = VDURecordState.TERMINATING
        if self._vm_resp is not None:
            try:
                with self._dts.transaction() as new_xact:
                    yield from self.delete_resource(new_xact)
            except Exception:
                self._log.exception("Caught exception while deleting VDU %s", self.vdu_id)

        if self._rm_regh is not None:
            self._log.debug("Deregistering resource manager registration handle")
            self._rm_regh.deregister()
            self._rm_regh = None

        # The console handler has no registration handle until it has been
        # registered, so guard against a missing/None handle.
        console_regh = getattr(self._vdur_console_handler, "_regh", None)
        if console_regh is not None:
            self._log.debug("Deregistering vnfr vdur registration handle")
            console_regh.deregister()
            self._vdur_console_handler._regh = None

        self._state = VDURecordState.TERMINATED

    def find_internal_cp_by_cp_id(self, cp_id):
        """Return the descriptor's internal connection point with id cp_id.

        Raises VduRecordError when the descriptor has no such point.
        """
        self._log.debug("find_internal_cp_by_cp_id(%s) called",
                        cp_id)

        for int_cp in self._vdud.internal_connection_point:
            self._log.debug("Checking for int cp %s in internal connection points",
                            int_cp.id)
            if int_cp.id == cp_id:
                return int_cp

        self._log.debug("Failed to find cp %s in internal connection points",
                        cp_id)
        raise VduRecordError("Failed to find cp %s in internal connection points" % cp_id)

    @staticmethod
    def _has_resource_state(resp):
        """True when resp carries a resource_info with a resource_state."""
        return (resp is not None and
                resp.has_field('resource_info') and
                resp.resource_info.has_field('resource_state'))

    @asyncio.coroutine
    def create_resource(self, xact, vnfr, config=None):
        """Request the VM from the resource manager.

        Resolves the external and internal interfaces of the descriptor
        into ``(interface, cp name/id, vlr)`` tuples, sends the create
        request and returns the ``resource_info`` of the response.

        Raises:
            VduRecordError  - an internal connection point cannot be resolved
            VMResourceError - the response carries no resource state
        """
        def find_cp_by_name(cp_name):
            """Find an external connection point record of the VNFR by name."""
            self._log.debug("find_cp_by_name(%s) called", cp_name)
            for ext_cp in vnfr._cprs:
                self._log.debug("Checking ext cp (%s) called", ext_cp.name)
                if ext_cp.name == cp_name:
                    return ext_cp
            self._log.debug("Failed to find cp %s in external connection points",
                            cp_name)
            return None

        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: create",
                        self._request_id)

        # Resolve the networks associated external interfaces
        for ext_intf in self._vdud.external_interface:
            self._log.debug("Resolving external interface name [%s], cp[%s]",
                            ext_intf.name, ext_intf.vnfd_connection_point_ref)
            cp = find_cp_by_name(ext_intf.vnfd_connection_point_ref)
            if cp is None:
                self._log.debug("Failed to find connection point - %s",
                                ext_intf.vnfd_connection_point_ref)
                continue
            self._log.debug("Connection point name [%s], type[%s]",
                            cp.name, cp.type_yang)

            vlr = vnfr.ext_vlr_by_id(cp.vlr_ref)

            etuple = (ext_intf, cp.name, vlr)
            self._ext_intf.append(etuple)

            self._log.debug("Created external interface tuple : %s", etuple)

        # Resolve the networks associated internal interfaces
        for intf in self._vdud.internal_interface:
            cp_id = intf.vdu_internal_connection_point_ref
            self._log.debug("Resolving internal interface name [%s], cp[%s]",
                            intf.name, cp_id)

            try:
                # The descriptor must declare the connection point before
                # its VLR is looked up on the VNFR.
                self.find_internal_cp_by_cp_id(cp_id)
                vlr = vnfr.find_vlr_by_cp(cp_id)
            except Exception as e:
                self._log.debug("Failed to find cp %s in internal VLR list", cp_id)
                msg = "Failed to find cp %s in internal VLR list, e = %s" % (cp_id, e)
                raise VduRecordError(msg) from e

            ituple = (intf, cp_id, vlr)
            self._int_intf.append(ituple)

            self._log.debug("Created internal interface tuple : %s", ituple)

        resmgr_path = self.resmgr_path
        resmgr_msg = self.resmgr_msg(config)

        self._log.debug("Creating new VM request at: %s, params: %s", resmgr_path, resmgr_msg)
        block.add_query_create(resmgr_path, resmgr_msg)

        res_iter = yield from block.execute(now=True)

        # Keep the last result of the iteration.
        resp = None
        for pending in res_iter:
            result = yield from pending
            resp = result.result

        if not self._has_resource_state(resp):
            raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
        self._log.debug("Got vm request response: %s", resp.resource_info)
        return resp.resource_info

    @asyncio.coroutine
    def delete_resource(self, xact):
        """Ask the resource manager to delete this record's VM request."""
        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: delete",
                        self._request_id)

        block.add_query_delete(self.resmgr_path)

        yield from block.execute(flags=0, now=True)

    @asyncio.coroutine
    def read_resource(self, xact):
        """Read this record's VM request and return its ``resource_info``.

        Raises VMResourceError when nothing usable is returned.
        """
        block = xact.block_create()

        self._log.debug("Executing vm request id: %s, action: read",
                        self._request_id)

        block.add_query_read(self.resmgr_path)

        res_iter = yield from block.execute(flags=0, now=True)

        # ``resp`` must exist even when the iteration yields nothing.
        resp = None
        for pending in res_iter:
            result = yield from pending
            resp = result.result

        if not self._has_resource_state(resp):
            raise VMResourceError("Did not get a vm resource response (resp: %s)" % (resp,))
        self._log.debug("Got vm request response: %s", resp.resource_info)
        return resp.resource_info

    @asyncio.coroutine
    def start_component(self):
        """Start the VCS component referenced by the descriptor."""
        self._log.debug("Starting component %s for vdud %s vdur %s",
                        self._vdud.vcs_component_ref,
                        self._vdud,
                        self._vdur_id)
        yield from self._vnfr.start_component(self._vdud.vcs_component_ref,
                                              self.vm_resp.management_ip)

    @property
    def active(self):
        """True when this VDU is in the READY state."""
        return self._state is VDURecordState.READY

    @asyncio.coroutine
    def instantiation_failed(self, failed_reason=None):
        """Mark the VDU as FAILED and propagate the failure to the VNFR."""
        self._log.debug("VDU %s instantiation failed ", self._vdur_id)
        self._state = VDURecordState.FAILED
        self._state_failed_reason = failed_reason
        yield from self._vnfr.instantiation_failed(failed_reason)

    @asyncio.coroutine
    def vdu_is_active(self):
        """Move the VDU to READY; notify the VNFR once all VDUs are active."""
        if self.active:
            self._log.warning("VDU %s was already marked as active", self._vdur_id)
            return

        self._log.debug("VDUR id %s in VNFR %s is active", self._vdur_id, self._vnfr.vnfr_id)

        if self._vdud.vcs_component_ref is not None:
            yield from self.start_component()

        self._state = VDURecordState.READY

        if self._vnfr.all_vdus_active():
            self._log.debug("Inside vdu_is_active. VNFR is READY. Info: %s", self._vnfr)
            yield from self._vnfr.is_ready()

    @asyncio.coroutine
    def instantiate(self, xact, vnfr, config=None):
        """Instantiate this VDU.

        Subscribes to the resource-info notifications of the request
        first, then sends the create request.  Any failure is logged and
        reported through ``instantiation_failed``; nothing is re-raised.
        """
        self._state = VDURecordState.INSTANTIATING

        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """Handle a resource-info notification for this VDU."""
            self._log.debug("Received VDUR instantiate on_prepare (%s:%s:%s)",
                            query_action,
                            ks_path,
                            msg)

            if query_action in (rwdts.QueryAction.UPDATE,
                                rwdts.QueryAction.CREATE):
                self._vm_resp = msg

                if msg.resource_state == "active":
                    # Move this VDU to ready state
                    yield from self.vdu_is_active()
                elif msg.resource_state == "failed":
                    yield from self.instantiation_failed(msg.resource_errors)
            elif query_action == rwdts.QueryAction.DELETE:
                self._log.debug("DELETE action in on_prepare for VDUR instantiation, ignoring")
            else:
                raise NotImplementedError(
                    "%s action on VirtualDeployementUnitRecord not supported" %
                    (query_action,))

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        try:
            reg_event = asyncio.Event(loop=self._loop)

            @asyncio.coroutine
            def on_ready(regh, status):
                reg_event.set()

            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
            self._rm_regh = yield from self._dts.register(self.resmgr_path + '/resource-info',
                                                          flags=rwdts.Flag.SUBSCRIBER,
                                                          handler=handler)
            # Wait for the subscription to be ready before the request goes
            # out; notifications are handled by on_prepare.
            yield from reg_event.wait()

            vm_resp = yield from self.create_resource(xact, vnfr, config)
            self._vm_resp = vm_resp

            self._state = VDURecordState.RESOURCE_ALLOC_PENDING
            self._log.debug("Requested VM from resource manager response %s",
                            vm_resp)
            if vm_resp.resource_state == "active":
                self._log.debug("Resourcemgr responded wih an active vm resp %s",
                                vm_resp)
                # vdu_is_active() leaves the record in the READY state.
                yield from self.vdu_is_active()
            elif vm_resp.resource_state in ("pending", "inactive"):
                # The state change arrives later through on_prepare.
                self._log.debug("Resourcemgr responded wih a pending vm resp %s",
                                vm_resp)
            else:
                self._log.debug("Resourcemgr responded wih an error vm resp %s",
                                vm_resp)
                raise VirtualDeploymentUnitRecordError(
                    "Failed VDUR instantiation %s " % vm_resp)

        except Exception as e:
            # log.exception records the traceback; instantiation_failed()
            # sets the FAILED state and the failure reason.
            self._log.exception(e)
            self._log.error("Instantiation of VDU record failed: %s", str(e))
            yield from self.instantiation_failed(str(e))
928
929
class VlRecordState(enum.Enum):
    """Lifecycle states of a virtual link record."""
    INIT = 101                   # record created
    INSTANTIATION_PENDING = 102  # instantiate() in progress
    ACTIVE = 103                 # instantiate() completed
    TERMINATE_PENDING = 104      # terminate() in progress
    TERMINATED = 105             # terminate() completed
    FAILED = 106                 # VL creation failed
938
939
940 class InternalVirtualLinkRecord(object):
941 """ Internal Virtual Link record """
942 def __init__(self, dts, log, loop, ivld_msg, vnfr_name, cloud_account_name):
943 self._dts = dts
944 self._log = log
945 self._loop = loop
946 self._ivld_msg = ivld_msg
947 self._vnfr_name = vnfr_name
948 self._cloud_account_name = cloud_account_name
949
950 self._vlr_req = self.create_vlr()
951 self._vlr = None
952 self._state = VlRecordState.INIT
953
954 @property
955 def vlr_id(self):
956 """ Find VLR by id """
957 return self._vlr_req.id
958
959 @property
960 def name(self):
961 """ Name of this VL """
962 return self._vnfr_name + "." + self._ivld_msg.name
963
964 @property
965 def network_id(self):
966 """ Find VLR by id """
967 return self._vlr.network_id if self._vlr else None
968
969 def vlr_path(self):
970 """ VLR path for this VLR instance"""
971 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self.vlr_id)
972
973 def create_vlr(self):
974 """ Create the VLR record which will be instantiated """
975
976 vld_fields = ["short_name",
977 "vendor",
978 "description",
979 "version",
980 "type_yang",
981 "provider_network"]
982
983 vld_copy_dict = {k: v for k, v in self._ivld_msg.as_dict().items() if k in vld_fields}
984
985 vlr_dict = {"id": str(uuid.uuid4()),
986 "name": self.name,
987 "cloud_account": self._cloud_account_name,
988 }
989 vlr_dict.update(vld_copy_dict)
990
991 vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
992 return vlr
993
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ Instantiate this VL by creating, or re-attaching to, its VLR

    Arguments:
        xact - not used by this method; the VLR create query runs in a
               DTS transaction opened here
        restart_mode - when True, a VLR already present at vlr_path()
                       is reused and a new one is created only when
                       none is found

    Raises:
        VnfrInstantiationFailed if the created VLR reports an
        operational status of 'failed'. Any exception raised while
        executing the create query is logged and re-raised.
    """

    @asyncio.coroutine
    def instantiate_vlr():
        """ Create the VLR through DTS and keep the result in self._vlr """
        self._log.debug("Create VL with xpath %s and vlr %s",
                        self.vlr_path(), self._vlr_req)

        # Use a distinct name so the enclosing xact argument is not shadowed
        with self._dts.transaction(flags=0) as create_xact:
            block = create_xact.block_create()
            block.add_query_create(xpath=self.vlr_path(), msg=self._vlr_req)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.vlr_path(), self._vlr_req)

            try:
                res_iter = yield from block.execute()
            except Exception:
                self._state = VlRecordState.FAILED
                self._log.exception("Caught exception while instantial VL")
                raise

            # The last result returned by the query becomes the held VLR
            for ent in res_iter:
                res = yield from ent
                self._vlr = res.result

        if self._vlr.operational_status == 'failed':
            self._log.debug("VL creation failed for vlr id %s", self._vlr.id)
            self._state = VlRecordState.FAILED
            raise VnfrInstantiationFailed("instantiation due to VL failure %s" % (self._vlr.id))

        self._log.info("Created VL with xpath %s and vlr %s",
                       self.vlr_path(), self._vlr)

    @asyncio.coroutine
    def get_vlr():
        """ Read the VLR at vlr_path(); returns None when nothing is found """
        res_iter = yield from self._dts.query_read(self.vlr_path(), rwdts.XactFlag.MERGE)
        vlr = None
        for ent in res_iter:
            res = yield from ent
            vlr = res.result

        if vlr is None:
            # Not an error on its own: the caller creates the VLR instead
            err = "Failed to get VLR for path %s" % self.vlr_path()
            self._log.warn(err)
        return vlr

    self._state = VlRecordState.INSTANTIATION_PENDING

    existing_vlr = None
    if restart_mode:
        existing_vlr = yield from get_vlr()

    if existing_vlr is None:
        yield from instantiate_vlr()
    else:
        # Re-attach to the VLR that survived the restart so that
        # network_id reflects it
        self._vlr = existing_vlr

    self._state = VlRecordState.ACTIVE
1055
def vlr_in_vns(self):
    """ True when this VL is in a state (active, instantiation pending or
    failed) in which terminate() issues a delete for its VLR """
    return self._state in (VlRecordState.ACTIVE,
                           VlRecordState.INSTANTIATION_PENDING,
                           VlRecordState.FAILED)
1064
@asyncio.coroutine
def terminate(self, xact):
    """ Delete this VL's VLR using the supplied transaction

    Does nothing, apart from a debug log, when vlr_in_vns() reports that
    there is no VLR to delete.
    """
    if self.vlr_in_vns():
        self._log.debug("Terminating VL with path %s", self.vlr_path())
        self._state = VlRecordState.TERMINATE_PENDING

        delete_block = xact.block_create()
        delete_block.add_query_delete(self.vlr_path())
        yield from delete_block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL with path %s", self.vlr_path())
    else:
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.vlr_id, self._state)
1080
1081
1082 class VirtualNetworkFunctionRecord(object):
1083 """ Virtual Network Function Record """
def __init__(self, dts, log, loop, cluster_name, vnfm, vcs_handler, vnfr_msg, mgmt_network=None):
    # Handles to the surrounding tasklet infrastructure
    self._dts = dts
    self._log = log
    self._loop = loop
    self._cluster_name = cluster_name
    self._vnfm = vnfm
    self._vcs_handler = vcs_handler
    self._mgmt_network = mgmt_network

    # The incoming VNFR message and the identifiers taken from it
    self._vnfr_msg = vnfr_msg
    self._vnfr = vnfr_msg
    self._vnfr_id = vnfr_msg.id
    self._vnfd = vnfr_msg.vnfd
    self._vnfd_id = vnfr_msg.vnfd.id
    self._config_status = vnfr_msg.config_status

    # Life-cycle state
    self._state = VirtualNetworkFunctionRecordState.INIT
    self._state_failed_reason = None
    self._create_time = int(time.time())

    # Child records and lookup tables, filled in during instantiation
    self._ext_vlrs = {}  # external virtual links, keyed by vlr id
    self._vlrs = []      # internal virtual link records
    self._vdus = []      # VDU records
    self._vlr_by_cp = {}
    self._cprs = []
    self._inventory = {}
    self._vnf_mon = None

    self._vnfd_package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
    self._rw_vnfd = None
    self._vnfd_ref_count = 0
1112
def _get_vdur_from_vdu_id(self, vdu_id):
    """ Return the VDU record whose vdu_id equals the given id

    Arguments:
        vdu_id - the descriptor level VDU id to look for

    Raises:
        VDURecordNotFound if no record in self._vdus matches
    """
    self._log.debug("Finding vdur for vdu_id %s", vdu_id)
    self._log.debug("Searching through vdus: %s", self._vdus)
    for vdu in self._vdus:
        self._log.debug("vdu_id: %s", vdu.vdu_id)
        if vdu.vdu_id == vdu_id:
            return vdu

    # Exceptions do not interpolate logging-style arguments, so the
    # message has to be formatted before it is handed over
    raise VDURecordNotFound("Could not find vdu record from id: %s" % (vdu_id,))
1122
@property
def operational_status(self):
    """ Operational status string published for the current state

    The lookup is keyed by the name of the current state member; a state
    name missing from the table raises KeyError.
    """
    state_name = self._state.name
    status_by_state = dict(
        INIT="init",
        VL_INIT_PHASE="vl_init_phase",
        VM_INIT_PHASE="vm_init_phase",
        READY="running",
        TERMINATE="terminate",
        VL_TERMINATE_PHASE="vl_terminate_phase",
        VDU_TERMINATE_PHASE="vm_terminate_phase",
        TERMINATED="terminated",
        FAILED="failed",
    )
    return status_by_state[state_name]
1136
@staticmethod
def vnfd_xpath(vnfd_id):
    """ Config xpath of the VNFD catalog entry with the given id """
    template = "C,/vnfd:vnfd-catalog/vnfd:vnfd[vnfd:id = '{}']"
    return template.format(vnfd_id)
1141
@property
def vnfd_ref_count(self):
    """ Current value of the VNFD reference counter kept by this VNFR """
    count = self._vnfd_ref_count
    return count
1146
def vnfd_in_use(self):
    """ True while at least one reference on the VNFD is outstanding """
    return self._vnfd_ref_count > 0
1150
def vnfd_ref(self):
    """ Increment the VNFD reference counter and return its new value """
    self._vnfd_ref_count = self._vnfd_ref_count + 1
    return self._vnfd_ref_count
1155
def vnfd_unref(self):
    """ Decrement the VNFD reference counter and return its new value

    Raises:
        VnfRecordError when the counter is already below one, i.e. there
        is no reference left to release
    """
    if self._vnfd_ref_count >= 1:
        self._log.debug("Releasing ref on VNFD %s - curr vnfd_ref_count:%s",
                        self.vnfd.id, self._vnfd_ref_count)
        self._vnfd_ref_count -= 1
        return self._vnfd_ref_count

    msg = ("Unref on a VNFD object - vnfd id %s, vnfd_ref_count = %s" %
           (self.vnfd.id, self._vnfd_ref_count))
    self._log.critical(msg)
    raise VnfRecordError(msg)
1167
@property
def vnfd(self):
    """ The VNFD message taken from the VNFR message at construction """
    descriptor = self._vnfd
    return descriptor
1172
@property
def vnf_name(self):
    """ Name carried by the VNFD of this VNFR """
    descriptor = self.vnfd
    return descriptor.name
1177
@property
def name(self):
    """ Name of this VNF as given in the VNFR message """
    record = self._vnfr
    return record.name
1182
@property
def cloud_account_name(self):
    """ Cloud account named in the VNFR message """
    record = self._vnfr
    return record.cloud_account
1187
@property
def vnfd_id(self):
    """ Id of the VNFD this VNFR was created from """
    descriptor = self.vnfd
    return descriptor.id
1192
@property
def vnfr_id(self):
    """ Id of this VNFR, as captured from the VNFR message """
    record_id = self._vnfr_id
    return record_id
1197
@property
def member_vnf_index(self):
    """ The member_vnf_index_ref value of the VNFR message """
    record = self._vnfr
    return record.member_vnf_index_ref
1202
@property
def config_status(self):
    """ Config status captured from the VNFR message at construction """
    status = self._config_status
    return status
1207
def component_by_name(self, component_name):
    """ Return the inventory component registered for component_name

    The inventory is keyed by the mangled name built from the component
    name, the VNF name and the VNFD id; an unknown name raises KeyError.
    """
    inventory_key = VcsComponent.mangle_name(component_name,
                                             self.vnf_name,
                                             self.vnfd_id)
    return self._inventory[inventory_key]
1214
1215
1216
@asyncio.coroutine
def get_nsr_config(self):
    """ Read the NS instance config and return the NSR entry of this VNFR

    Returns the nsr whose id equals the nsr_id_ref of the VNFR message,
    or None when no entry matches. Needed for runtime resolution until
    deployment flavors are implemented.
    """
    config_xpath = "C,/nsr:ns-instance-config"
    query_results = yield from self._dts.query_read(config_xpath, rwdts.XactFlag.MERGE)

    for pending in query_results:
        response = yield from pending
        for nsr in response.result.nsr:
            if nsr.id == self._vnfr_msg.nsr_id_ref:
                return nsr

    return None
1231
@asyncio.coroutine
def start_component(self, component_name, ip_addr):
    """ Look up an inventory component by name and start it with ip_addr """
    component = self.component_by_name(component_name)
    yield from component.start(None, None, ip_addr)
1237
def cp_ip_addr(self, cp_name):
    """ IP address of the named connection point record

    Returns "0.0.0.0" when no record with that name has an address set.
    """
    self._log.debug("cp_ip_addr()")
    addresses = (cpr.ip_address
                 for cpr in self._cprs
                 if cpr.name == cp_name and cpr.ip_address is not None)
    return next(addresses, "0.0.0.0")
1245
def mgmt_intf_info(self):
    """ Return the (ip address, port) pair of the management interface

    The address comes from the connection point when the descriptor
    names one, else from the management IP of the referenced VDU (None
    when that VDU record does not exist), else from the descriptor's
    own ip_address field. The port is always taken from the descriptor.
    """
    descriptor = self.vnfd.mgmt_interface

    if descriptor.has_field("cp"):
        ip_addr = self.cp_ip_addr(descriptor.cp)
    elif descriptor.has_field("vdu_id"):
        try:
            ip_addr = self._get_vdur_from_vdu_id(descriptor.vdu_id).management_ip
        except VDURecordNotFound:
            self._log.debug("Did not find mgmt interface for vnfr id %s", self._vnfr_id)
            ip_addr = None
    else:
        ip_addr = descriptor.ip_address

    return ip_addr, descriptor.port
1264
@property
def msg(self):
    """ Build a fresh VNFR message describing the current record state

    A new message is assembled on every access from the identifiers,
    status, management interface, internal VLR references, VDURs,
    connection points, monitoring params, VNF configuration and
    placement groups held by this record.
    """
    copied_vnfd_fields = ("short_name", "vendor", "description", "version")
    vnfd_subset = {}
    for key, value in self.vnfd.as_dict().items():
        if key in copied_vnfd_fields:
            vnfd_subset[key] = value

    # Management interface: only populate what is actually known
    mgmt_intf = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MgmtInterface()
    ip_address, port = self.mgmt_intf_info()
    if ip_address is not None:
        mgmt_intf.ip_address = ip_address
    if port is not None:
        mgmt_intf.port = port

    record_fields = {"id": self._vnfr_id,
                     "nsr_id_ref": self._vnfr_msg.nsr_id_ref,
                     "name": self.name,
                     "member_vnf_index_ref": self.member_vnf_index,
                     "operational_status": self.operational_status,
                     "operational_status_details": self._state_failed_reason,
                     "cloud_account": self.cloud_account_name,
                     "config_status": self._config_status
                     }
    record_fields.update(vnfd_subset)

    record = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(record_fields)
    record.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict())

    record.create_time = self._create_time
    record.uptime = int(time.time()) - self._create_time
    record.mgmt_interface = mgmt_intf

    # Reference every internal VLR from the VNFR
    for vlr in self._vlrs:
        internal_vlr = record.internal_vlr.add()
        internal_vlr.vlr_ref = vlr.vlr_id

    # Copy every VDUR into the VNFR
    if self._vdus is not None:
        for vdu in self._vdus:
            record.vdur.add().from_dict(vdu.msg.as_dict())

    if self.vnfd.mgmt_interface.has_field('dashboard_params'):
        record.dashboard_url = self.dashboard_url

    for cpr in self._cprs:
        record.connection_point.append(
            VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr.as_dict()))

    if self._vnf_mon is not None:
        for monp in self._vnf_mon.msg:
            record.monitoring_param.append(
                VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(monp.as_dict()))

    if self._vnfr.vnf_configuration is not None:
        record.vnf_configuration.from_dict(self._vnfr.vnf_configuration.as_dict())
        # Fall back to the management address when the configuration
        # does not carry its own access address
        config_access = record.vnf_configuration.config_access
        if ip_address is not None and config_access.mgmt_ip_address is None:
            record.vnf_configuration.config_access.mgmt_ip_address = ip_address

    for group in self._vnfr_msg.placement_groups_info:
        group_info = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        group_info.from_dict(group.as_dict())
        record.placement_groups_info.append(group_info)

    return record
1333
@property
def dashboard_url(self):
    """ Dashboard URL built from the management IP and dashboard params

    The scheme is https (default port 443) only when the descriptor's
    https field is set and is True, otherwise http (default port 80).
    An explicit port in the dashboard params overrides the default, and
    leading slashes are stripped from the configured path.
    """
    ip, cfg_port = self.mgmt_intf_info()
    params = self.vnfd.mgmt_interface.dashboard_params

    use_https = params.has_field('https') and params.https is True
    if use_https:
        protocol, http_port = 'https', 443
    else:
        protocol, http_port = 'http', 80

    if params.has_field('port'):
        http_port = params.port

    return "{protocol}://{ip_address}:{port}/{path}".format(
        protocol=protocol,
        ip_address=ip,
        port=http_port,
        path=params.path.lstrip("/"),
    )
1354
@property
def xpath(self):
    """ DTS xpath of this VNFR's entry in the VNFR catalog """
    template = ("D,/vnfr:vnfr-catalog"
                "/vnfr:vnfr[vnfr:id='{}']")
    return template.format(self.vnfr_id)
1360
@asyncio.coroutine
def publish(self, xact):
    """ Publish the current VNFR message through the VNFM

    Arguments:
        xact - transaction handed through to the VNFM's publish_vnfr;
               callers in this class also pass None
    """
    # self.msg assembles a complete record on every access, so build it
    # once and use that same object for logging and publishing
    vnfr = self.msg
    path = self.xpath
    self._log.debug("Publishing VNFR path = [%s], record = [%s]",
                    path, vnfr)
    vnfr.create_time = self._create_time
    yield from self._vnfm.publish_vnfr(xact, path, vnfr)
    self._log.debug("Published VNFR path = [%s], record = [%s]",
                    path, vnfr)
1371
@asyncio.coroutine
def create_vls(self):
    """ Create an internal VL record for every internal VLD of the VNFD

    Each record is appended to self._vlrs and registered in
    self._vlr_by_cp under every internal connection point it lists.

    Raises:
        InternalVirtualLinkRecordError when a connection point is
        already bound to a VL record
    """
    self._log.debug("Publishing Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for ivld_msg in self.vnfd.internal_vld:
        self._log.debug("Creating internal vld: %s, int_cp_ref = %s",
                        ivld_msg, ivld_msg.internal_connection_point)

        vlr = InternalVirtualLinkRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            ivld_msg=ivld_msg,
            vnfr_name=self.name,
            cloud_account_name=self.cloud_account_name,
        )
        self._vlrs.append(vlr)

        for int_cp in ivld_msg.internal_connection_point:
            cp_ref = int_cp.id_ref
            if cp_ref in self._vlr_by_cp:
                # A connection point may belong to one VL only
                raise InternalVirtualLinkRecordError(
                    "Connection point %s already  bound %s" %
                    (cp_ref, self._vlr_by_cp[cp_ref]))

            self._log.debug("Setting vlr %s to internal cp = %s",
                            vlr, cp_ref)
            self._vlr_by_cp[cp_ref] = vlr
1399
@asyncio.coroutine
def instantiate_vls(self, xact, restart_mode=False):
    """ Instantiate the internal VL records one after another

    Arguments:
        xact - transaction passed to each VL record's instantiate()
        restart_mode - forwarded unchanged to each VL record
    """
    self._log.debug("Instantiating Internal Virtual Links for vnfd id: %s",
                    self.vnfd_id)

    for internal_vl in self._vlrs:
        self._log.debug("Instantiating VLR %s", internal_vl)
        yield from internal_vl.instantiate(xact, restart_mode)
1409
def find_vlr_by_cp(self, cp_name):
    """ Return the VL record bound to cp_name; KeyError when unbound """
    bound_vlr = self._vlr_by_cp[cp_name]
    return bound_vlr
1413
def resolve_placement_group_cloud_construct(self, input_group, nsr_config):
    """ Return the cloud specific construct for a placement group

    Arguments:
        input_group: VNFD PlacementGroup
        nsr_config: Configuration for VNFDGroup MAP in the NSR config

    Returns:
        A VDUR PlacementGroupsInfo built from the first NSR map entry
        that references both this group's name and this VNFR's VNFD id,
        with name, requirement and strategy taken from input_group; None
        when no map entry matches.
    """
    descriptor_attrs = ('name', 'requirement', 'strategy')

    for group_info in nsr_config.vnfd_placement_group_maps:
        if not (group_info.placement_group_ref == input_group.name and
                group_info.vnfd_id_ref == self.vnfd_id):
            continue

        group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_PlacementGroupsInfo()

        # Everything from the map entry except the two reference keys
        group_dict = {}
        for key, value in group_info.as_dict().items():
            if (key != 'placement_group_ref' and key != 'vnfd_id_ref'):
                group_dict[key] = value

        for attr in descriptor_attrs:
            group_dict[attr] = getattr(input_group, attr)

        group.from_dict(group_dict)
        return group

    return None
1434
@asyncio.coroutine
def get_vdu_placement_groups(self, vdu):
    """ Collect the placement groups that apply to a VDU

    The result starts with every VNF level placement group from the
    VNFR message, followed by the resolved cloud construct of each VNFD
    placement group that lists the VDU as a member. Groups that cannot
    be resolved are logged and left out.
    """
    # VNF level placement groups apply to every VDU
    placement_groups = list(self._vnfr_msg.placement_groups_info)

    # The NSR config is needed to resolve the cloud constructs
    nsr_config = yield from self.get_nsr_config()

    # VDU level placement groups
    for group in self.vnfd.placement_groups:
        for member_vdu in group.member_vdus:
            if member_vdu.member_vdu_ref != vdu.id:
                continue

            group_info = self.resolve_placement_group_cloud_construct(group,
                                                                      nsr_config)
            if group_info is None:
                self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
                continue

            self._log.info("Successfully resolved cloud construct for placement group: %s for VDU: %s in VNF: %s (Member Index: %s)",
                           str(group_info),
                           vdu.name,
                           self.vnf_name,
                           self.member_vnf_index)
            placement_groups.append(group_info)

    return placement_groups
1465
@asyncio.coroutine
def create_vdus(self, vnfr, restart_mode=False):
    """ Create a VDU record for every VDU of the fetched VNFD

    Arguments:
        vnfr - the VNF record handed to each VDU record; its _vnfr
               message is searched for existing VDURs in restart mode
        restart_mode - when True, ids of already existing VDURs are
                       reused so that no duplicate VDURs are created
    """

    def get_vdur_id(vdud):
        """ Id of the existing VDUR for vdud, None outside restart mode
        or when no VDUR references the descriptor """
        if not restart_mode or vdud is None:
            return None

        try:
            matching_ids = [vdur.id for vdur in vnfr._vnfr.vdur
                            if vdur.vdu_id_ref == vdud.id]
            return matching_ids[0]
        except IndexError:
            self._log.error("Unable to find a VDUR for VDUD {}".format(vdud))
            return None

    self._log.info("Creating VDU's for vnfd id: %s", self.vnfd_id)

    for vdud in self._rw_vnfd.vdu:
        self._log.debug("Creating vdu: %s", vdud)
        existing_vdur_id = get_vdur_id(vdud)

        placement_groups = yield from self.get_vdu_placement_groups(vdud)
        self._log.info("Launching VDU: %s from VNFD :%s (Member Index: %s) with Placement Groups: %s",
                       vdud.name,
                       self.vnf_name,
                       self.member_vnf_index,
                       [group.name for group in placement_groups])

        vdu_record = VirtualDeploymentUnitRecord(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            vdud=vdud,
            vnfr=vnfr,
            mgmt_intf=self.has_mgmt_interface(vdud),
            mgmt_network=self._mgmt_network,
            cloud_account_name=self.cloud_account_name,
            vnfd_package_store=self._vnfd_package_store,
            vdur_id=existing_vdur_id,
            placement_groups=placement_groups,
        )
        yield from vdu_record.vdu_opdata_register()

        self._vdus.append(vdu_record)
1517
@asyncio.coroutine
def instantiate_vdus(self, xact, vnfr):
    """ Schedule the instantiation of all VDUs of this VNF

    VDUs whose cloud-init script references another VDU through
    "{{ vdu[<id>]... }}" are started only after the referenced VDU has
    reached a terminal state. This method only creates the tasks on the
    loop; it does not wait for them to finish.

    Arguments:
        xact - not used here; each VDU is instantiated in a transaction
               of its own
        vnfr - the VNF record passed on to every VDU's instantiate()
    """
    self._log.debug("Instantiating VDU's for vnfd id %s: %s", self.vnfd_id, self._vdus)

    lookup = {vdu.vdu_id: vdu for vdu in self._vdus}

    # Identify any dependencies among the VDUs
    dependencies = collections.defaultdict(list)
    vdu_id_pattern = re.compile(r"\{\{ vdu\[([^]]+)\]\S* \}\}")

    # Matches one "{{ variable }}" placeholder of a cloud-init script.
    # Raw string: the pattern value is unchanged, but the invalid escape
    # sequences of the non-raw form are avoided.
    variable_pattern = re.compile(r"\{\{ ([^\}]+) \}\}")

    for vdu in self._vdus:
        if vdu.vdud_cloud_init is not None:
            for vdu_id in vdu_id_pattern.findall(vdu.vdud_cloud_init):
                if vdu_id != vdu.vdu_id:
                    # This means that vdu.vdu_id depends upon vdu_id,
                    # i.e. vdu_id must be instantiated before
                    # vdu.vdu_id.
                    dependencies[vdu.vdu_id].append(lookup[vdu_id])

    # Define the terminal states of VDU instantiation
    terminal = (
        VDURecordState.READY,
        VDURecordState.TERMINATED,
        VDURecordState.FAILED,
    )

    datastore = VdurDatastore()
    processed = set()

    @asyncio.coroutine
    def instantiate_monitor(vdu):
        """Monitor the state of the VDU during instantiation

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        """
        # wait for the VDUR to enter a terminal state
        while vdu._state not in terminal:
            yield from asyncio.sleep(1, loop=self._loop)

        # update the datastore
        datastore.update(vdu)

        # add the VDU to the set of processed VDUs
        processed.add(vdu.vdu_id)

    @asyncio.coroutine
    def instantiate(vdu):
        """Instantiate the specified VDU

        Arguments:
            vdu - a VirtualDeploymentUnitRecord

        Raises:
            VirtualDeploymentUnitRecordError if any of the VDUs this
            VDU depends upon is not active once it has been processed.
            ValueError if a cloud-init variable is unrecognized or has
            no value in the datastore.

        """
        for dependency in dependencies[vdu.vdu_id]:
            self._log.debug("{}: waiting for {}".format(vdu.vdu_id, dependency.vdu_id))

            while dependency.vdu_id not in processed:
                yield from asyncio.sleep(1, loop=self._loop)

            if not dependency.active:
                raise VirtualDeploymentUnitRecordError()

        self._log.debug('instantiating {}'.format(vdu.vdu_id))

        # Populate the datastore with the current values of the VDU
        datastore.add(vdu)

        # Substitute any variables contained in the cloud config script
        config = str(vdu.vdud_cloud_init) if vdu.vdud_cloud_init is not None else ""

        parts = variable_pattern.split(config)
        if len(parts) > 1:

            # Every second element of the split is a captured variable
            variables = [variable.lstrip('{{').rstrip('}}').strip()
                         for variable in parts[1::2]]

            # Iterate over the variables and substitute values from the
            # datastore.
            for variable in variables:

                # Handle a reference to a VDU by ID
                if variable.startswith('vdu['):
                    value = datastore.get(variable)
                    if value is None:
                        msg = "Unable to find a substitute for {} in {} cloud-init script"
                        raise ValueError(msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle a reference to the current VDU
                if variable.startswith('vdu'):
                    value = datastore.get('vdu[{}]'.format(vdu.vdu_id) + variable[3:])
                    if value is None:
                        # Same failure mode as the by-ID branch instead
                        # of a TypeError from str.replace
                        msg = "Unable to find a substitute for {} in {} cloud-init script"
                        raise ValueError(msg.format(variable, vdu.vdu_id))

                    config = config.replace("{{ %s }}" % variable, value)
                    continue

                # Handle unrecognized variables
                msg = 'unrecognized cloud-config variable: {}'
                raise ValueError(msg.format(variable))

        # Instantiate the VDU
        with self._dts.transaction() as xact:
            self._log.debug("Instantiating vdu: %s", vdu)
            yield from vdu.instantiate(xact, vnfr, config=config)
            if self._state == VirtualNetworkFunctionRecordState.FAILED:
                self._log.error("Instatiation of VNF %s failed while instantiating vdu %s",
                                self.vnfr_id, vdu)

    # First create a set of tasks to monitor the state of the VDUs and
    # report when they have entered a terminal state
    for vdu in self._vdus:
        self._loop.create_task(instantiate_monitor(vdu))

    for vdu in self._vdus:
        self._loop.create_task(instantiate(vdu))
1643
def has_mgmt_interface(self, vdu):
    """ True when the VNFD management interface references this VDU's id """
    # TODO: Support additional mgmt_interface type options
    return True if self.vnfd.mgmt_interface.vdu_id == vdu.id else False
1649
def vlr_xpath(self, vlr_id):
    """ DTS xpath of the VLR catalog entry with the given id """
    template = ("D,/vlr:vlr-catalog/"
                "vlr:vlr[vlr:id = '{}']")
    return template.format(vlr_id)
1655
def ext_vlr_by_id(self, vlr_id):
    """ Return the fetched external VLR with this id; KeyError if unknown """
    external_vlr = self._ext_vlrs[vlr_id]
    return external_vlr
1659
@asyncio.coroutine
def publish_inventory(self, xact):
    """ Create and publish an inventory component per VNFD component

    Each component is stored in self._inventory under its name and then
    published. Processing stops at the first component whose name is
    already present in the inventory.
    """
    self._log.debug("Publishing inventory for VNFR id: %s", self._vnfr_id)

    for component in self._rw_vnfd.component:
        self._log.debug("Creating inventory component %s", component)
        inventory_name = VcsComponent.mangle_name(component.component_name,
                                                  self.vnf_name,
                                                  self.vnfd_id)
        vcs_component = VcsComponent(
            dts=self._dts,
            log=self._log,
            loop=self._loop,
            cluster_name=self._cluster_name,
            vcs_handler=self._vcs_handler,
            component=component,
            mangled_name=inventory_name,
        )

        if vcs_component.name in self._inventory:
            self._log.debug("Duplicate entries in inventory  %s for vnfr %s",
                            component, self._vnfd_id)
            return

        self._log.debug("Adding component %s for vnrf %s",
                        vcs_component.name, self._vnfr_id)
        self._inventory[vcs_component.name] = vcs_component
        yield from vcs_component.publish(xact)
1687
def all_vdus_active(self):
    """ True when every VDU record of this VNFR reports itself active

    Also True when there are no VDU records at all.
    """
    if any(not vdu.active for vdu in self._vdus):
        return False

    self._log.debug("Inside all_vdus_active. Returning True")
    return True
1696
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this VNFR as failed and publish the changed status

    Arguments:
        failed_reason - stored and reported as the operational status
                        details of the published record
    """
    self._log.debug("VNFR %s instantiation failed ", self.vnfr_id)

    self.set_state(VirtualNetworkFunctionRecordState.FAILED)
    self._state_failed_reason = failed_reason

    # Make the new status visible to subscribers
    yield from self.publish(None)
1706
@asyncio.coroutine
def is_ready(self):
    """ Move this VNFR to READY, unless it already failed, and publish it """
    self._log.debug("VNFR id %s is ready", self.vnfr_id)

    if self._state == VirtualNetworkFunctionRecordState.FAILED:
        # A failed VNFR keeps its failed state
        self._log.debug("VNFR id %s ignoring state change", self.vnfr_id)
    else:
        self.set_state(VirtualNetworkFunctionRecordState.READY)

    # Make the (possibly unchanged) status visible to subscribers
    yield from self.publish(None)
1720
def update_cp(self, cp_name, ip_address, mac_addr, cp_id):
    """ Update the named connection point record with its addresses and id

    Arguments:
        cp_name - name of the connection point record to update
        ip_address - value stored as the record's ip_address
        mac_addr - value stored as the record's mac_address
        cp_id - value stored as the record's connection_point_id

    Raises:
        VirtualDeploymentUnitRecordError if no connection point record
        with that name exists in this VNFR
    """
    for cp in self._cprs:
        if cp.name == cp_name:
            self._log.debug("Setting ip address and id for cp %s, cpr %s with ip %s id %s",
                            cp_name, cp, ip_address, cp_id)
            cp.ip_address = ip_address
            cp.mac_address = mac_addr
            cp.connection_point_id = cp_id
            return

    # Report the name that was asked for; the loop variable is either
    # the last, non-matching record or unbound when there are no records
    err = "No connection point %s found in VNFR id %s" % (cp_name, self._vnfr_id)
    self._log.debug(err)
    raise VirtualDeploymentUnitRecordError(err)
1735
def set_state(self, state):
    """ Record the given state as the current state of this VNFR """
    self._state = state
1739
@asyncio.coroutine
def instantiate(self, xact, restart_mode=False):
    """ Instantiate this VNF

    The steps run in this order: fetch the VNFD, take a VNFD reference,
    fetch the external VLRs of the connection points, publish the
    inventory, create and instantiate the internal VLs, create the VDU
    records, schedule VDU instantiation and start the periodic uptime
    publisher. The VNFR is published after the main steps.

    A failure while instantiating the VLs marks the VNFR as failed and
    ends the sequence.

    Arguments:
        xact - transaction handed to the publish and instantiate steps
        restart_mode - forwarded to VL instantiation and VDU creation
    """
    self.set_state(VirtualNetworkFunctionRecordState.VL_INIT_PHASE)
    self._rw_vnfd = yield from self._vnfm.fetch_vnfd(self._vnfd_id)

    def cpr_from_cp(cp):
        """ Build a record level connection point from the descriptor cp """
        copied_fields = ("name", "image", "vm-flavor")
        cpr_fields = {key: value
                      for key, value in cp.as_dict().items()
                      if key in copied_fields}
        return VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint.from_dict(cpr_fields)

    # Take a reference on the VNFD for the lifetime of this VNFR
    self.vnfd_ref()

    assert self.vnfd

    # Fetch the external VLR behind every connection point of the VNFR
    self._log.debug("VNFR-ID %s: Fetching vlrs", self._vnfr_id)
    self._log.debug("Fetching VLRs for VNFR id = %s, cps = %s",
                    self._vnfr_id, self._vnfr.connection_point)

    for cp in self._vnfr.connection_point:
        cpr = cpr_from_cp(cp)
        self._cprs.append(cpr)
        self._log.debug("Adding Connection point record  %s ", cp)

        vlr_path = self.vlr_xpath(cp.vlr_ref)
        self._log.debug("Fetching VLR with path = %s", vlr_path)
        res_iter = yield from self._dts.query_read(self.vlr_xpath(cp.vlr_ref),
                                                   rwdts.XactFlag.MERGE)
        for pending in res_iter:
            response = yield from pending
            ext_vlr = response.result
            self._ext_vlrs[cp.vlr_ref] = ext_vlr
            cpr.vlr_ref = cp.vlr_ref
            self._log.debug("Fetched VLR [%s] with path = [%s]", ext_vlr, vlr_path)

    # Publish the inventory
    self._log.debug("VNFR-ID %s: Publishing Inventory", self._vnfr_id)
    yield from self.publish_inventory(xact)

    # Create the internal VL records
    self._log.debug("VNFR-ID %s: Creating VLs", self._vnfr_id)
    yield from self.create_vls()

    # Publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # Instantiate the VLs; any failure here fails the whole VNFR
    self._log.debug("VNFR-ID %s: Instantiate VLs", self._vnfr_id)
    try:
        yield from self.instantiate_vls(xact, restart_mode)
    except Exception as e:
        self._log.exception("VL instantiation failed (%s)", str(e))
        yield from self.instantiation_failed(str(e))
        return

    self.set_state(VirtualNetworkFunctionRecordState.VM_INIT_PHASE)

    # Create the VDU records
    self._log.debug("VNFR-ID %s: Create VDUs", self._vnfr_id)
    yield from self.create_vdus(self, restart_mode)

    # Publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    # Schedule VDU instantiation as a separate task
    # ToDo: Check if this should be prevented during restart
    self._log.debug("VNFR-ID %s: Instantiate VDUs", self._vnfr_id)
    self._loop.create_task(self.instantiate_vdus(xact, self))

    # Publish the VNFR
    self._log.debug("VNFR-ID %s: Publish VNFR", self._vnfr_id)
    yield from self.publish(xact)

    self._log.debug("VNFR-ID %s: Instantiation Done", self._vnfr_id)

    # Start the task that keeps republishing the VNFR with its uptime
    self._log.debug("VNFR-ID %s: Starting task to update uptime", self._vnfr_id)
    self._loop.create_task(self.vnfr_uptime_update(xact))
1833
@asyncio.coroutine
def terminate(self, xact):
    """ Terminate this virtual network function

    Stops and deregisters monitoring when it is running, then terminates
    the internal VLs followed by the VDUs, moving through the matching
    terminate states and ending in TERMINATED.

    Arguments:
        xact - transaction handed to every VL and VDU terminate()
    """
    self._log.debug("Terminatng VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATE)

    # Stop monitoring before tearing resources down
    monitor = self._vnf_mon
    if monitor is not None:
        monitor.stop()
        monitor.deregister()
        self._vnf_mon = None

    self._log.debug("Terminatng VLs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VL_TERMINATE_PHASE)
    for internal_vl in self._vlrs:
        yield from internal_vl.terminate(xact)

    self._log.debug("Terminatng VDUs in VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.VDU_TERMINATE_PHASE)
    for vdu_record in self._vdus:
        yield from vdu_record.terminate(xact)

    self._log.debug("Terminated VNF id %s", self.vnfr_id)
    self.set_state(VirtualNetworkFunctionRecordState.TERMINATED)
1870
@asyncio.coroutine
def vnfr_uptime_update(self, xact):
    """ Republish the VNFR every two seconds while it is coming up or ready

    The loop ends as soon as the state is anything other than INIT,
    VL_INIT_PHASE, VM_INIT_PHASE or READY (e.g. FAILED or TERMINATED).
    """
    publishing_states = (VirtualNetworkFunctionRecordState.INIT,
                         VirtualNetworkFunctionRecordState.VL_INIT_PHASE,
                         VirtualNetworkFunctionRecordState.VM_INIT_PHASE,
                         VirtualNetworkFunctionRecordState.READY)

    while self._state in publishing_states:
        yield from self.publish(xact)
        yield from asyncio.sleep(2, loop=self._loop)
1882
1883
1884
class VnfdDtsHandler(object):
    """ DTS handler for VNFD config changes

    Subscribes to the VNFD catalog and removes a VNFD from the VNFM when
    its deletion is prepared, refusing the delete while it is in use.
    """
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, vnfm):
        self._vnfm = vnfm
        self._dts = dts
        self._log = log
        self._loop = loop
        self._regh = None  # set by register()

    @asyncio.coroutine
    def regh(self):
        """ DTS registration handle; note that this is a coroutine """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as a subscriber for VNFD configuration """

        def on_apply(dts, acg, xact, action, scratch):
            """ Apply callback: only logs the transaction and action """
            self._log.debug("Got VNFM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # Evaluated as in the original; the value is not used here
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback: handles VNFD deletes, then ACKs """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s)",
                            ks_path.to_xpath(RwVnfmYang.get_schema()), msg)

            field_ref = ProtobufC.FieldReference.alloc()
            field_ref.goto_whole_message(msg.to_pbcm())

            # Deletes are handled here, in the prepare phase
            if field_ref.is_field_deleted():
                self._log.debug("Deleting VNFD with id %s", msg.id)

                if self._vnfm.vnfd_in_use(msg.id):
                    self._log.debug("Cannot delete VNFD in use - %s", msg)
                    err = "Cannot delete a VNFD in use - %s" % msg
                    raise VirtualNetworkFunctionDescriptorRefCountExists(err)

                # Not referenced any more: drop the VNFD record
                yield from self._vnfm.delete_vnfd(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
        )
        apply_handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=apply_handler) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
1943
1944
class VcsComponentDtsHandler(object):
    """ Vcs Component DTS handler

    Registers as publisher for the operational inventory components and
    creates component elements through that registration.
    """
    XPATH = ("D,/rw-manifest:manifest" +
             "/rw-manifest:operational-inventory" +
             "/rw-manifest:component")

    def __init__(self, dts, log, loop, vnfm):
        self._vnfm = vnfm
        self._dts = dts
        self._log = log
        self._loop = loop
        self._regh = None  # set by register()

    @property
    def regh(self):
        """ DTS registration handle, None until register() has run """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register as publisher for the VCS component xpath """
        self._log.debug("VCS Comp publisher DTS handler registering path %s",
                        VcsComponentDtsHandler.XPATH)

        registration_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        publisher_flags = (rwdts.Flag.PUBLISHER |
                           rwdts.Flag.NO_PREP_READ |
                           rwdts.Flag.DATASTORE)

        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=VcsComponentDtsHandler.XPATH,
                                        handler=registration_handler,
                                        flags=publisher_flags,)

    @asyncio.coroutine
    def publish(self, xact, path, msg):
        """ Create the VCS component element at path; xact is only logged """
        self._log.debug("Publishing the VcsComponent xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Published the VcsComponent to %s xact = %s, %s:%s",
                        VcsComponentDtsHandler.XPATH, xact, path, msg)
1986
class VnfrConsoleOperdataDtsHandler(object):
    """Publishes the console URL of one VDUR of one VNFR.

    Registers 'D,/vnfr:vnfr-console/vnfr:vnfr[id]/vdur[id]' and answers READ
    queries from DTS.
    """

    def __init__(self, dts, log, loop, vnfm, vnfr_id, vdur_id, vdu_id):
        """Remember the collaborators and the identifiers of the served VDUR."""
        self._dts, self._log, self._loop = dts, log, loop
        self._regh = None
        self._vnfm = vnfm

        self._vnfr_id, self._vdur_id, self._vdu_id = vnfr_id, vdur_id, vdu_id

    @property
    def vnfr_vdu_console_xpath(self):
        """Keyspec of the console entry for this handler's VNFR/VDUR pair."""
        template = ("D,/rw-vnfr:vnfr-console/rw-vnfr:vnfr[rw-vnfr:id='{}']"
                    "/rw-vnfr:vdur[vnfr:id='{}']")
        return template.format(self._vnfr_id, self._vdur_id)

    @asyncio.coroutine
    def register(self):
        """Register as publisher of the console operational data."""

        def console_record(url):
            """Build the console message for this VDUR with the given URL."""
            record = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur()
            record.id = self._vdur_id
            record.console_url = url
            return record

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: answer READ with the VDUR console URL."""
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFR VDU Opdata xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
            )

            # Only READ is served; everything else is logged and ACKed empty.
            if action != rwdts.QueryAction.READ:
                self._log.error("Not supported operation %s" % action)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            schema = RwVnfrYang.YangData_RwVnfr_VnfrConsole_Vnfr_Vdur.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            self._log.debug("VDU Opdata path is {}".format(path_entry))

            try:
                vnfr = self._vnfm.get_vnfr(self._vnfr_id)
            except VnfRecordError:
                self._log.error("VNFR id %s not found", self._vnfr_id)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                return

            try:
                vdur = vnfr._get_vdur_from_vdu_id(self._vdu_id)
                if vdur._state != VDURecordState.READY:
                    self._log.debug("VDUR state is not READY. current state is {}".format(vdur._state))
                    xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK)
                    return

                with self._dts.transaction() as new_xact:
                    resp = yield from vdur.read_resource(new_xact)
                    # An empty/absent URL is reported as the literal 'none'.
                    vdur_console = console_record(resp.console_url or 'none')
                    self._log.debug("Recevied console URL for vdu {} is {}".format(self._vdu_id,vdur_console))
            except Exception:
                # Any failure while reading degrades to a 'none' console URL.
                self._log.exception("Caught exception while reading VDU %s", self._vdu_id)
                vdur_console = console_record('none')

            xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
                                    xpath=self.vnfr_vdu_console_xpath,
                                    msg=vdur_console)

        console_xpath = self.vnfr_vdu_console_xpath
        self._log.debug("Registering for VNFR VDU using xpath: %s",
                        console_xpath)
        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(
                xpath=self.vnfr_vdu_console_xpath,
                handler=reg_handler,
                flags=rwdts.Flag.PUBLISHER,
            )
2067
2068
class VnfrDtsHandler(object):
    """ registers 'D,/vnfr:vnfr-catalog/vnfr:vnfr' and handles CRUD from DTS"""
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, vnfm):
        """ Store the collaborators; registration happens in register() """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnfm = vnfm

        self._regh = None

    @property
    def regh(self):
        """ Return registration handle"""
        return self._regh

    @property
    def vnfm(self):
        """ Return VNF manager instance """
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/read requests from dts

        The path is registered as PUBLISHER | NO_PREP_READ | CACHE |
        DATASTORE, with on_prepare/on_commit as registration callbacks and
        on_event as the group callback.
        """
        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        # NOTE: defined but not passed to the RegistrationHandler below.
        def on_abort(*args):
            """ Abort callback """
            self._log.debug("VNF transaction got aborted")

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback

            On INSTALL, every element already held by the registration is
            turned back into a VNFR and its instantiation is scheduled in
            restart mode.
            """

            @asyncio.coroutine
            def instantiate_realloc_vnfr(vnfr):
                """Re-populate the vnfm after restart

                Arguments:
                    vnfr - the VNFR record returned by create_vnfr

                """

                yield from vnfr.instantiate(None, restart_mode=True)

            if xact_event == rwdts.MemberEvent.INSTALL:
                curr_cfg = self.regh.elements
                for cfg in curr_cfg:
                    vnfr = self.vnfm.create_vnfr(cfg)
                    self._loop.create_task(instantiate_realloc_vnfr(vnfr))

            self._log.debug("Got on_event in vnfm")

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            Raises:
                VnfRecordError on CREATE without a vnfd
                VirtualNetworkFunctionRecordNotFound on DELETE of a missing VNFR
                NotImplementedError for any action other than
                CREATE/DELETE/UPDATE
            """
            self._log.debug(
                "Got vnfr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
            )

            if action == rwdts.QueryAction.CREATE:
                if not msg.has_field("vnfd"):
                    err = "Vnfd not provided"
                    self._log.error(err)
                    raise VnfRecordError(err)

                vnfr = self.vnfm.create_vnfr(msg)
                try:
                    # RIFT-9105: Unable to add a READ query under an existing transaction
                    # xact = xact_info.xact
                    yield from vnfr.instantiate(None)
                except Exception as e:
                    # A failed instantiation is recorded on the VNFR and
                    # published; the query itself is still ACKed below.
                    self._log.exception(e)
                    self._log.error("Error while instantiating vnfr:%s", vnfr.vnfr_id)
                    vnfr.set_state(VirtualNetworkFunctionRecordState.FAILED)
                    yield from vnfr.publish(None)

            elif action == rwdts.QueryAction.DELETE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr_id = path_entry.key00.id
                vnfr = self._vnfm.get_vnfr(vnfr_id)

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for delete", vnfr_id)
                    # Interpolate the id: exceptions do not apply
                    # logging-style lazy formatting to their arguments.
                    raise VirtualNetworkFunctionRecordNotFound(
                        "VNFR id %s not found" % vnfr_id)

                try:
                    yield from vnfr.terminate(xact_info.xact)
                    # Unref the VNFD
                    vnfr.vnfd_unref()
                    yield from self._vnfm.delete_vnfr(xact_info.xact, vnfr)
                except Exception as e:
                    self._log.exception(e)
                    self._log.error("Caught exception while deleting vnfr %s", vnfr_id)

            elif action == rwdts.QueryAction.UPDATE:
                schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
                path_entry = schema.keyspec_to_entry(ks_path)
                vnfr_id = path_entry.key00.id
                vnfr = None
                try:
                    vnfr = self._vnfm.get_vnfr(vnfr_id)
                except Exception:
                    self._log.debug("No vnfr found with id %s", vnfr_id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                if vnfr is None:
                    self._log.debug("VNFR id %s not found for update", vnfr_id)
                    xact_info.respond_xpath(rwdts.XactRspCode.NA)
                    return

                self._log.debug("VNFR {} update config status {} (current {})".
                                format(vnfr.name, msg.config_status, vnfr.config_status))
                # Update the config status and publish
                vnfr._config_status = msg.config_status
                yield from vnfr.publish(None)

            else:
                raise NotImplementedError(
                    "%s action on VirtualNetworkFunctionRecord not supported"
                    % action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        handlers = rift.tasklets.Group.Handler(on_event=on_event,)
        with self._dts.group_create(handler=handlers) as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.PUBLISHER |
                                               rwdts.Flag.NO_PREP_READ |
                                               rwdts.Flag.CACHE |
                                               rwdts.Flag.DATASTORE),)

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create a VNFR record in DTS with path and message

        xact is only used in the log messages.
        """
        self._log.debug("Creating VNFR xact = %s, %s:%s",
                        xact, path, msg)

        self.regh.create_element(path, msg)
        self._log.debug("Created VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update a VNFR record in DTS with path and message

        xact is only used in the log messages.
        """
        self._log.debug("Updating VNFR xact = %s, %s:%s",
                        xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VNFR xact = %s, %s:%s",
                        xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VNFR record at path from DTS

        xact is only used in the log messages.
        """
        self._log.debug("Deleting VNFR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VNFR xact = %s, %s", xact, path)
2244
2245
class VnfdRefCountDtsHandler(object):
    """Publishes VNFD reference counts under the VNFR catalog."""

    XPATH = "D,/vnfr:vnfr-catalog/rw-vnfr:vnfd-ref-count"

    def __init__(self, dts, log, loop, vnfm):
        """Store the collaborators; registration happens in register()."""
        self._regh = None
        self._dts, self._log, self._loop = dts, log, loop
        self._vnfm = vnfm

    @property
    def regh(self):
        """The DTS registration handle (None until register() has run)."""
        return self._regh

    @property
    def vnfm(self):
        """The VNF manager instance given at construction."""
        return self._vnfm

    @asyncio.coroutine
    def register(self):
        """Register as publisher answering VNFD ref count reads from DTS."""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Prepare callback: stream matching ref counts, then ACK.

            Raises VnfRecordError for any action other than READ.
            """
            xpath = ks_path.to_xpath(RwVnfrYang.get_schema())
            self._log.debug(
                "Got VNFD ref count get xact_info: %s, action: %s): %s:%s",
                xact_info, action, xpath, msg
            )

            if action != rwdts.QueryAction.READ:
                raise VnfRecordError("Not supported operation %s" % action)

            schema = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            ref_counts = yield from self._vnfm.get_vnfd_refcount(
                path_entry.key00.vnfd_id_ref)

            # One MORE response per entry, closed by a final empty ACK.
            for ref_xpath, ref_msg in ref_counts:
                self._log.debug("Responding to ref count query path:%s, msg:%s",
                                ref_xpath, ref_msg)
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=ref_xpath,
                                        msg=ref_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        reg_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(
                xpath=VnfdRefCountDtsHandler.XPATH,
                handler=reg_handler,
                flags=rwdts.Flag.PUBLISHER,
            )
2301
2302
class VdurDatastore(object):
    """
    This VdurDatastore is intended to expose select information about a VDUR
    such that it can be referenced in a cloud config file. The data that is
    exposed does not necessarily follow the structure of the data in the yang
    model. This is intentional. The data that are exposed are intended to be
    agnostic of the yang model so that changes in the model do not necessarily
    require changes to the interface provided to the user. It also means that
    the user does not need to be familiar with the RIFT.ware yang models.

    Entries are keyed by the VDU id (``vdur.vdu_id``); each entry maps an
    exposed attribute name ('name', 'mgmt.ip') to its value.
    """

    def __init__(self):
        """Create an instance of VdurDatastore"""
        self._vdur_data = dict()
        # Raw string: '\[', '\]' and '\.' are regex escapes, not string
        # escapes, and are invalid escape sequences in a normal literal.
        self._pattern = re.compile(r"vdu\[([^]]+)\]\.(.+)")

    @staticmethod
    def _exposed_attributes(vdur):
        """Return the (key, value) pairs this datastore exposes for a VDUR"""
        return (
            ('name', vdur._vdud.name),
            ('mgmt.ip', vdur.vm_management_ip),
        )

    def add(self, vdur):
        """Add a new VDUR to the datastore

        Attributes whose value is None are not stored.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no VDU id or (2) is
            already in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VDURs are required to have an ID')

        if vdur.vdu_id in self._vdur_data:
            raise ValueError('cannot add a VDUR more than once')

        # Build the entry before publishing it so that a failure while reading
        # the VDUR does not leave a half-initialised record behind.
        entry = {
            key: value
            for key, value in self._exposed_attributes(vdur)
            if value is not None
        }
        self._vdur_data[vdur.vdu_id] = entry

    def update(self, vdur):
        """Update the VDUR information in the datastore

        Attributes whose value is now None are removed from the entry; all
        others are overwritten.

        Arguments:
            vdur - a VirtualDeploymentUnitRecord instance

        Raises:
            A ValueError is raised if the VDUR (1) has no VDU id or (2) is
            not in the datastore.

        """
        if vdur.vdu_id is None:
            raise ValueError('VNFDs are required to have an ID')

        if vdur.vdu_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        entry = self._vdur_data[vdur.vdu_id]
        for key, value in self._exposed_attributes(vdur):
            if value is None:
                entry.pop(key, None)
            else:
                entry[key] = value

    def remove(self, vdur_id):
        """Remove all of the data associated with specified VDUR

        Arguments:
            vdur_id - the identifier under which the VDUR was added

        Raises:
            A ValueError is raised if the VDUR is not contained in the
            datastore.

        """
        if vdur_id not in self._vdur_data:
            raise ValueError('VNF is not recognized')

        del self._vdur_data[vdur_id]

    def get(self, expr):
        """Retrieve VDUR information from the datastore

        An expression should be of the form,

            vdu[<id>].<attr>

        where <id> is the VDUR ID (an unquoted UUID), and <attr> is the name of
        the exposed attribute that the user wishes to retrieve.

        If the requested data is not available, None is returned.

        Arguments:
            expr - a string that specifies the data to return

        Raises:
            A ValueError is raised if the provided expression cannot be parsed.

        Returns:
            The requested data or None

        """
        result = self._pattern.match(expr)
        if result is None:
            raise ValueError('data expression not recognized ({})'.format(expr))

        vdur_id, key = result.groups()

        return self._vdur_data.get(vdur_id, {}).get(key)
2421
2422
class VnfManager(object):
    """ The virtual network function manager class

    Keeps the VNFR records (by VNFR id), the number of VNFRs created from
    each VNFD (by VNFD id) and the NSR configs reported by the NSR subscriber
    (by NSR id), and owns the static DTS handlers.
    """
    def __init__(self, dts, log, loop, cluster_name):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._cluster_name = cluster_name

        self._vcs_handler = VcsComponentDtsHandler(dts, log, loop, self)
        self._vnfr_handler = VnfrDtsHandler(dts, log, loop, self)
        self._vnfr_ref_handler = VnfdRefCountDtsHandler(dts, log, loop, self)
        self._nsr_handler = mano_dts.NsInstanceConfigSubscriber(log, dts, loop, callback=self.handle_nsr)

        # Handlers registered, in this order, by register()
        self._dts_handlers = [VnfdDtsHandler(dts, log, loop, self),
                              self._vnfr_handler,
                              self._vcs_handler,
                              self._vnfr_ref_handler,
                              self._nsr_handler]
        self._vnfrs = {}           # vnfr id -> VirtualNetworkFunctionRecord
        self._vnfds_to_vnfr = {}   # vnfd id -> number of VNFRs using it (int)
        self._nsrs = {}            # nsr id -> NSR config message

    @property
    def vnfr_handler(self):
        """ VNFR dts handler """
        return self._vnfr_handler

    @property
    def vcs_handler(self):
        """ VCS dts handler """
        return self._vcs_handler

    @asyncio.coroutine
    def register(self):
        """ Register all static DTS handlers """
        for hdl in self._dts_handlers:
            yield from hdl.register()

    @asyncio.coroutine
    def run(self):
        """ Run this VNFM instance """
        self._log.debug("Run VNFManager - registering static DTS handlers")
        yield from self.register()

    def handle_nsr(self, nsr, action):
        """ Track NSR configs: remember on CREATE, forget on DELETE """
        if action == rwdts.QueryAction.CREATE:
            self._nsrs[nsr.id] = nsr
        elif action == rwdts.QueryAction.DELETE:
            self._nsrs.pop(nsr.id, None)

    def get_linked_mgmt_network(self, vnfr):
        """For the given VNFR get the related mgmt network from the NSD, if
        available.

        Returns:
            The name of the first VLD of the NSD flagged as mgmt_network, or
            None when there is none.

        Raises:
            VnfRecordError if the NSR referenced by the VNFR is not known.
        """
        nsr_id = vnfr.nsr_id_ref

        # for the given related VNFR, get the corresponding NSR-config
        try:
            nsr_obj = self._nsrs[nsr_id]
        except KeyError:
            # The previous `raise("...")` raised a plain string, which Python
            # rejects with a TypeError that hides the real problem.
            raise VnfRecordError(
                "Unable to find the NS with the ID: {}".format(nsr_id))

        # for the related NSD check if a VLD exists such that it's a mgmt
        # network
        for vld in nsr_obj.nsd.vld:
            if vld.mgmt_network:
                return vld.name

        return None

    def get_vnfr(self, vnfr_id):
        """ get VNFR by vnfr id

        Raises:
            VnfRecordError if no VNFR with that id is known.
        """

        if vnfr_id not in self._vnfrs:
            raise VnfRecordError("VNFR id %s not found" % vnfr_id)

        return self._vnfrs[vnfr_id]

    def create_vnfr(self, vnfr):
        """ Create a VNFR instance and account for it in the VNFD ref count

        Raises:
            VnfRecordError if a VNFR with the same id already exists or its
            NSR is unknown.
        """
        if vnfr.id in self._vnfrs:
            msg = "Vnfr id %s already exists" % vnfr.id
            self._log.error(msg)
            raise VnfRecordError(msg)

        self._log.info("Create VirtualNetworkFunctionRecord %s from vnfd_id: %s",
                       vnfr.id,
                       vnfr.vnfd.id)

        mgmt_network = self.get_linked_mgmt_network(vnfr)

        self._vnfrs[vnfr.id] = VirtualNetworkFunctionRecord(
            self._dts, self._log, self._loop, self._cluster_name, self, self.vcs_handler, vnfr,
            mgmt_network=mgmt_network
            )

        # Update ref count
        vnfd_id = vnfr.vnfd.id
        self._vnfds_to_vnfr[vnfd_id] = self._vnfds_to_vnfr.get(vnfd_id, 0) + 1

        return self._vnfrs[vnfr.id]

    @asyncio.coroutine
    def delete_vnfr(self, xact, vnfr):
        """ Delete a VNFR instance

        Nothing happens if the VNFR is not known. Otherwise its DTS record is
        deleted, the ref count of its VNFD is decremented (never below zero)
        and the VNFR is forgotten.
        """
        if vnfr.vnfr_id in self._vnfrs:
            self._log.debug("Deleting VNFR id %s", vnfr.vnfr_id)
            yield from self._vnfr_handler.delete(xact, vnfr.xpath)

            if vnfr.vnfd.id in self._vnfds_to_vnfr:
                if self._vnfds_to_vnfr[vnfr.vnfd.id]:
                    self._vnfds_to_vnfr[vnfr.vnfd.id] -= 1

            del self._vnfrs[vnfr.vnfr_id]

    @asyncio.coroutine
    def fetch_vnfd(self, vnfd_id):
        """ Fetch the VNFD with the given vnfd id from DTS

        Raises:
            VnfRecordError if the read returns no result.
        """
        vnfd_path = VirtualNetworkFunctionRecord.vnfd_xpath(vnfd_id)
        self._log.debug("Fetch vnfd with path %s", vnfd_path)
        vnfd = None

        res_iter = yield from self._dts.query_read(vnfd_path, rwdts.XactFlag.MERGE)

        # If several results come back, the last one wins.
        for ent in res_iter:
            res = yield from ent
            vnfd = res.result

        if vnfd is None:
            err = "Failed to get  Vnfd %s" % vnfd_id
            self._log.error(err)
            raise VnfRecordError(err)

        self._log.debug("Fetched vnfd for path %s, vnfd - %s", vnfd_path, vnfd)

        return vnfd

    def vnfd_in_use(self, vnfd_id):
        """ Is this VNFD in use (ref count greater than zero) """
        self._log.debug("Is this VNFD in use - msg:%s", vnfd_id)
        return self._vnfds_to_vnfr.get(vnfd_id, 0) > 0

    @asyncio.coroutine
    def publish_vnfr(self, xact, path, msg):
        """ Publish a VNFR """
        self._log.debug("publish_vnfr called with path %s, msg %s",
                        path, msg)
        yield from self.vnfr_handler.update(xact, path, msg)

    @asyncio.coroutine
    def delete_vnfd(self, vnfd_id):
        """ Delete the Virtual Network Function descriptor with the passed id

        Raises:
            VirtualNetworkFunctionDescriptorRefCountExists if VNFRs still
            reference the descriptor.
        """
        self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)

        # The map stores plain integer counts.
        ref_count = self._vnfds_to_vnfr.get(vnfd_id, 0)
        if ref_count:
            self._log.debug("Cannot delete VNFD id %s reference exists %s",
                            vnfd_id,
                            ref_count)
            raise VirtualNetworkFunctionDescriptorRefCountExists(
                "Cannot delete :%s, ref_count:%s" % (vnfd_id, ref_count))

        self._vnfds_to_vnfr.pop(vnfd_id, None)

        # Remove any files uploaded with VNFD and stored under $RIFT_ARTIFACTS/libs/<id>
        try:
            rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
            vnfd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', vnfd_id)
            if os.path.exists(vnfd_dir):
                shutil.rmtree(vnfd_dir, ignore_errors=True)
        except Exception as e:
            # Best effort: the ref count entry is already gone at this point,
            # so report the failure by VNFD id.
            self._log.exception("Exception in cleaning up VNFD {}: {}".
                                format(vnfd_id, e))

    def vnfd_refcount_xpath(self, vnfd_id):
        """ xpath for ref count entry """
        return (VnfdRefCountDtsHandler.XPATH +
                "[rw-vnfr:vnfd-id-ref = '{}']").format(vnfd_id)

    @asyncio.coroutine
    def get_vnfd_refcount(self, vnfd_id):
        """ Get the list of (xpath, ref count message) pairs from this VNFM

        With vnfd_id None or empty every known VNFD is reported; otherwise
        only the given one, or nothing when it is unknown.
        """
        if vnfd_id is None or vnfd_id == "":
            vnfd_ids = list(self._vnfds_to_vnfr)
        elif vnfd_id in self._vnfds_to_vnfr:
            vnfd_ids = [vnfd_id]
        else:
            vnfd_ids = []

        vnfd_list = []
        for ref_id in vnfd_ids:
            vnfd_msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_VnfdRefCount()
            vnfd_msg.vnfd_id_ref = ref_id
            vnfd_msg.instance_ref_count = self._vnfds_to_vnfr[ref_id]
            vnfd_list.append((self.vnfd_refcount_xpath(ref_id), vnfd_msg))

        return vnfd_list
2629
2630
class VnfmTasklet(rift.tasklets.Tasklet):
    """Tasklet that hosts the VNF manager."""

    def __init__(self, *args, **kwargs):
        """Initialise the base tasklet and set the log categories."""
        super().__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("vnfm")

        # Both are created later: _dts in start(), _vnfm in init().
        self._dts = None
        self._vnfm = None

    def start(self):
        """Start the tasklet and create the DTS API object."""
        try:
            super().start()
            self.log.info("Starting VnfmTasklet")

            self.log.setLevel(logging.DEBUG)

            self.log.debug("Registering with dts")
            self._dts = rift.tasklets.DTS(
                self.tasklet_info,
                RwVnfmYang.get_schema(),
                self.loop,
                self.on_dts_state_change,
            )

            self.log.debug("Created DTS Api GI Object: %s", self._dts)
        except Exception:
            print("Caught Exception in VNFM start:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """Instance-started callback; only logs."""
        self.log.debug("Got instance started callback")

    def stop(self):
        """Deinitialise DTS; failures are printed and re-raised."""
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in VNFM stop:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def init(self):
        """Init callback: build the VnfManager and run it."""
        try:
            parent_vm_name = self.tasklet_info.get_parent_vm_parent_instance_name()
            assert parent_vm_name is not None
            self._vnfm = VnfManager(self._dts, self.log, self.loop, parent_vm_name)
            yield from self._vnfm.run()
        except Exception:
            print("Caught Exception in VNFM init:", sys.exc_info()[0])
            raise

    @asyncio.coroutine
    def run(self):
        """Run callback; nothing to do."""
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """React to a DTS state change.

        The application callback mapped to ``state`` (if any) is run first,
        then DTS is moved to the follow-up state mapped to ``state`` (if any).

        Arguments
            state - current dts state
        """
        # DTS state -> state DTS is moved to afterwards
        dts_transitions = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # DTS state -> application coroutine to run
        app_callbacks = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        callback = app_callbacks.get(state)
        if callback is not None:
            yield from callback()

        following = dts_transitions.get(state)
        if following is not None:
            self._dts.handle.set_state(following)