Bug 36 Multi site NS support and support for IP profile params
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
18 import asyncio
19 import ncclient
20 import ncclient.asyncio_manager
21 import os
22 import shutil
23 import sys
24 import tempfile
25 import time
26 import uuid
27 import yaml
28
29
30 from collections import deque
31 from collections import defaultdict
32 from enum import Enum
33
34 import gi
35 gi.require_version('RwYang', '1.0')
36 gi.require_version('RwNsdYang', '1.0')
37 gi.require_version('RwDts', '1.0')
38 gi.require_version('RwNsmYang', '1.0')
39 gi.require_version('RwNsrYang', '1.0')
40 gi.require_version('RwTypes', '1.0')
41 gi.require_version('RwVlrYang', '1.0')
42 gi.require_version('RwVnfrYang', '1.0')
43 from gi.repository import (
44 RwYang,
45 RwNsrYang,
46 NsrYang,
47 NsdYang,
48 RwVlrYang,
49 VnfrYang,
50 RwVnfrYang,
51 RwNsmYang,
52 RwsdnYang,
53 RwDts as rwdts,
54 RwTypes,
55 ProtobufC,
56 )
57
58 import rift.tasklets
59 import rift.mano.ncclient
60 import rift.mano.config_data.config
61 import rift.mano.dts as mano_dts
62
63 from . import rwnsm_conman as conman
64 from . import cloud
65 from . import publisher
66 from . import xpath
67 from . import config_value_pool
68 from . import rwvnffgmgr
69 from . import scale_group
70
71
class NetworkServiceRecordState(Enum):
    """States of a network service record (NSR).

    Values are explicit integers starting at 101; the value 105 is not
    assigned to any member.
    """
    # Initialisation phases
    INIT = 101
    VL_INIT_PHASE = 102
    VNF_INIT_PHASE = 103
    VNFFG_INIT_PHASE = 104

    # Running / scaling
    RUNNING = 106
    SCALING_OUT = 107
    SCALING_IN = 108

    # Termination phases and terminal states
    TERMINATE = 109
    TERMINATE_RCVD = 110
    VL_TERMINATE_PHASE = 111
    VNF_TERMINATE_PHASE = 112
    VNFFG_TERMINATE_PHASE = 113
    TERMINATED = 114
    FAILED = 115

    # Virtual-link specific states
    VL_INSTANTIATE = 116
    VL_TERMINATE = 117
90
91
class NetworkServiceRecordError(Exception):
    """Error related to a network service record."""
95
96
class NetworkServiceDescriptorError(Exception):
    """Error related to a network service descriptor."""
100
101
class VirtualNetworkFunctionRecordError(Exception):
    """Error related to a virtual network function record."""
105
106
class NetworkServiceDescriptorNotFound(Exception):
    """The requested network service descriptor could not be found."""
110
111
class NetworkServiceDescriptorRefCountExists(Exception):
    """A reference count still exists on a network service descriptor."""
115
116
class NetworkServiceDescriptorUnrefError(Exception):
    """Releasing a reference on a network service descriptor failed."""
120
121
class NsrInstantiationFailed(Exception):
    """Instantiation of a network service failed."""
125
126
class VnfInstantiationFailed(Exception):
    """Instantiation of a virtual network function failed."""
130
131
class VnffgInstantiationFailed(Exception):
    """Instantiation of a VNF forwarding graph (VNFFG) failed."""
135
136
class VnfDescriptorError(Exception):
    """Error related to a VNF descriptor."""
140
141
class ScalingOperationError(Exception):
    """Error related to a scaling operation."""
144
145
class ScaleGroupMissingError(Exception):
    """A scale group is missing."""
148
149
class PlacementGroupError(Exception):
    """Error related to placement groups."""
152
153
class NsrNsdUpdateError(Exception):
    """Error while updating the NSD of a network service record."""


class NsrVlUpdateError(NsrNsdUpdateError):
    """Virtual-link specific NSD update error (subclass of NsrNsdUpdateError)."""
160
161
class VlRecordState(Enum):
    """States of a virtual link record (VLR)."""
    INIT = 101
    # Create request issued, result not yet known
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    # Delete request issued, result not yet known
    TERMINATE_PENDING = 104
    TERMINATED = 105
    FAILED = 106
170
171
class VnffgRecordState(Enum):
    """States of a VNF forwarding graph record (VNFFGR)."""
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    FAILED = 106
180
181
class VnffgRecord(object):
    """VNF forwarding graph record (VNFFGR) belonging to a network service.

    Builds the VNFFGR request from a VNFFG descriptor message, submits it
    through the VNFFG manager and tracks the record's VnffgRecordState.

    NOTE(review): the source this class was recovered from had lost its
    indentation; loop/branch nesting below was reconstructed from the
    control flow and should be confirmed against the repository copy.
    """
    SFF_DP_PORT = 4790
    SFF_MGMT_PORT = 5000

    def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name):
        """Create a record in INIT state with a newly generated UUID.

        A ``sdn_account_name`` of None is stored as the empty string.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnffgmgr = vnffgmgr
        self._nsr = nsr
        self._nsr_name = nsr_name
        self._vnffgd_msg = vnffgd_msg
        self._sdn_account_name = '' if sdn_account_name is None else sdn_account_name

        self._vnffgr_id = str(uuid.uuid4())
        self._vnffgr_rsp_id = list()
        self._vnffgr_state = VnffgRecordState.INIT

    @property
    def id(self):
        """ VNFFGR id """
        return self._vnffgr_id

    @property
    def state(self):
        """ State of this VNFFGR (a VnffgRecordState) """
        return self._vnffgr_state

    def _vnffgr_status_msg(self, operational_status):
        """ Build a VNFFGR message carrying only references and a status """
        vnffgr_dict = {"id": self._vnffgr_id,
                       "nsd_id": self._nsr.nsd_id,
                       "vnffgd_id_ref": self._vnffgd_msg.id,
                       "vnffgd_name_ref": self._vnffgd_msg.name,
                       "sdn_account": self._sdn_account_name,
                       "operational_status": operational_status,
                       }
        return NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)

    def fetch_vnffgr(self):
        """
        Get VNFFGR message to be published

        In INIT and TERMINATED state a locally built status message is
        returned. Otherwise the record is fetched from the VNFFG manager; if
        that raises, the state becomes FAILED and a 'failed' status message
        is returned instead.
        """
        if self._vnffgr_state == VnffgRecordState.INIT:
            return self._vnffgr_status_msg('init')

        if self._vnffgr_state == VnffgRecordState.TERMINATED:
            return self._vnffgr_status_msg('terminated')

        try:
            return self._vnffgmgr.fetch_vnffgr(self._vnffgr_id)
        except Exception:
            self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id)
            self._vnffgr_state = VnffgRecordState.FAILED
            return self._vnffgr_status_msg('failed')

    @asyncio.coroutine
    def _fetch_running_vnfr(self, nsr_vnfr):
        """ Poll the VNFR behind nsr_vnfr until its status is 'running'

        Re-fetches every 2 seconds. Raises NsrInstantiationFailed as soon as
        the fetched operational status is 'failed'.
        """
        vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
        self._log.debug(" Received VNFR is %s", vnfr)
        while vnfr.operational_status != 'running':
            self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
            if vnfr.operational_status == 'failed':
                self._log.error("Fetching VNFR for %s failed", vnfr.id)
                raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id))
            yield from asyncio.sleep(2, loop=self._loop)
            vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
            self._log.debug("Received VNFR is %s", vnfr)
        return vnfr

    @asyncio.coroutine
    def vnffgr_create_msg(self):
        """ VNFFGR message for creating this VNFFG through the VNFFG manager

        Adds one RSP entry per descriptor RSP (skipping hops whose VNFD has
        no service_function_type) and one classifier entry per descriptor
        classifier whose RSP reference resolves. Waits for each referenced
        VNFR to be 'running' before reading its connection points.
        """
        vnffgr_dict = {"id": self._vnffgr_id,
                       "nsd_id": self._nsr.nsd_id,
                       "vnffgd_id_ref": self._vnffgd_msg.id,
                       "vnffgd_name_ref": self._vnffgd_msg.name,
                       "sdn_account": self._sdn_account_name,
                       }
        vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
        for rsp in self._vnffgd_msg.rsp:
            vnffgr_rsp = vnffgr.rsp.add()
            vnffgr_rsp.id = str(uuid.uuid4())
            vnffgr_rsp.name = self._nsr.name + '.' + rsp.name
            self._vnffgr_rsp_id.append(vnffgr_rsp.id)
            vnffgr_rsp.vnffgd_rsp_id_ref = rsp.id
            vnffgr_rsp.vnffgd_rsp_name_ref = rsp.name
            for rsp_cp_ref in rsp.vnfd_connection_point_ref:
                vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref]
                self._log.debug("VNFD message during VNFFG instantiation is %s",vnfd)
                if len(vnfd) > 0 and vnfd[0].has_field('service_function_type'):
                    self._log.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type)
                else:
                    self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref.vnfd_id_ref)
                    continue

                vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add()
                vnfr_cp_ref.member_vnf_index_ref = rsp_cp_ref.member_vnf_index_ref
                vnfr_cp_ref.hop_number = rsp_cp_ref.order
                vnfr_cp_ref.vnfd_id_ref =rsp_cp_ref.vnfd_id_ref
                vnfr_cp_ref.service_function_type = vnfd[0].service_function_type
                for nsr_vnfr in self._nsr.vnfrs.values():
                    if (nsr_vnfr.vnfd.id == vnfr_cp_ref.vnfd_id_ref and
                            nsr_vnfr.member_vnf_index == vnfr_cp_ref.member_vnf_index_ref):
                        vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id
                        vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name
                        vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref

                        vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                        vnfr_cp_ref.connection_point_params.mgmt_address = vnfr.mgmt_interface.ip_address
                        for cp in vnfr.connection_point:
                            if cp.name == vnfr_cp_ref.vnfr_connection_point_ref:
                                vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id
                                vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name
                                for vdu in vnfr.vdur:
                                    for ext_intf in vdu.external_interface:
                                        if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref:
                                            vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id
                                            self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id,
                                                            vnfr_cp_ref.connection_point_params.vm_id)
                                            # Only leaves the interface loop; later VDUs are still scanned
                                            break

                                vnfr_cp_ref.connection_point_params.address = cp.ip_address
                                vnfr_cp_ref.connection_point_params.port = VnffgRecord.SFF_DP_PORT

        for vnffgd_classifier in self._vnffgd_msg.classifier:
            _rsp = [rsp for rsp in vnffgr.rsp if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref]
            if len(_rsp) > 0:
                rsp_id_ref = _rsp[0].id
                rsp_name = _rsp[0].name
            else:
                self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id)
                continue
            vnffgr_classifier = vnffgr.classifier.add()
            vnffgr_classifier.id = vnffgd_classifier.id
            vnffgr_classifier.name = self._nsr.name + '.' + vnffgd_classifier.name
            _rsp[0].classifier_name = vnffgr_classifier.name
            vnffgr_classifier.rsp_id_ref = rsp_id_ref
            vnffgr_classifier.rsp_name = rsp_name
            for nsr_vnfr in self._nsr.vnfrs.values():
                if (nsr_vnfr.vnfd.id == vnffgd_classifier.vnfd_id_ref and
                        nsr_vnfr.member_vnf_index == vnffgd_classifier.member_vnf_index_ref):
                    vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id
                    vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name
                    vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref

                    if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER':
                        vnffgr_classifier.sff_name = nsr_vnfr.name

                    vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                    for cp in vnfr.connection_point:
                        if cp.name == vnffgr_classifier.vnfr_connection_point_ref:
                            vnffgr_classifier.port_id = cp.connection_point_id
                            vnffgr_classifier.ip_address = cp.ip_address
                            for vdu in vnfr.vdur:
                                for ext_intf in vdu.external_interface:
                                    if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref:
                                        vnffgr_classifier.vm_id = vdu.vim_id
                                        # Log the classifier's own vm_id; the RSP-loop
                                        # variable may be stale or unbound here.
                                        self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id,
                                                        vnffgr_classifier.vm_id)
                                        break

        self._log.info("VNFFGR msg to be sent is %s", vnffgr)
        return vnffgr

    @asyncio.coroutine
    def vnffgr_nsr_sff_list(self):
        """ SFF list for the VNFRs of this NSR

        Returns a dict keyed by VNFD id with one RwsdnYang.VNFFGSff per VNFR
        whose service_function_chain is 'CLASSIFIER' or 'SFF'. Entries of
        type 'SFF' additionally list the names of all 'SF' VNFRs.
        """
        sff_list = {}
        sf_list = [nsr_vnfr.name for nsr_vnfr in self._nsr.vnfrs.values() if nsr_vnfr.vnfd.service_function_chain == 'SF']

        for nsr_vnfr in self._nsr.vnfrs.values():
            if nsr_vnfr.vnfd.service_function_chain not in ('CLASSIFIER', 'SFF'):
                continue

            vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

            sff = RwsdnYang.VNFFGSff()
            sff_list[nsr_vnfr.vnfd.id] = sff
            sff.name = nsr_vnfr.name
            sff.function_type = nsr_vnfr.vnfd.service_function_chain

            sff.mgmt_address = vnfr.mgmt_interface.ip_address
            sff.mgmt_port = VnffgRecord.SFF_MGMT_PORT
            for cp in vnfr.connection_point:
                sff_dp = sff.dp_endpoints.add()
                sff_dp.name = self._nsr.name + '.' + cp.name
                sff_dp.address = cp.ip_address
                sff_dp.port = VnffgRecord.SFF_DP_PORT
            if nsr_vnfr.vnfd.service_function_chain == 'SFF':
                for sf_name in sf_list:
                    _sf = sff.vnfr_list.add()
                    _sf.vnfr_name = sf_name

        return sff_list

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VNFFG

        Raises NsrInstantiationFailed (after marking the record FAILED) when
        the VNFFG manager fails to create the record.
        """

        self._log.info("Instaniating VNFFGR with vnffgd %s",
                       self._vnffgd_msg)

        vnffgr_request = yield from self.vnffgr_create_msg()
        vnffg_sff_list = yield from self.vnffgr_nsr_sff_list()

        try:
            vnffgr = self._vnffgmgr.create_vnffgr(vnffgr_request,self._vnffgd_msg.classifier,vnffg_sff_list)
        except Exception as e:
            self._log.exception("VNFFG instantiation failed: %s", str(e))
            self._vnffgr_state = VnffgRecordState.FAILED
            raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self.id, vnffgr_request.id))

        self._vnffgr_state = VnffgRecordState.INSTANTIATION_PENDING

        self._log.info("Instantiated VNFFGR :%s", vnffgr)
        self._vnffgr_state = VnffgRecordState.ACTIVE

        self._log.info("Invoking update_state to update NSR state for NSR ID: %s", self._nsr.id)
        yield from self._nsr.update_state()

    def vnffgr_in_vnffgrm(self):
        """ Is this record in a state where the VNFFG manager may hold it

        True for ACTIVE, INSTANTIATION_PENDING and FAILED.
        """
        return self._vnffgr_state in (VnffgRecordState.ACTIVE,
                                      VnffgRecordState.INSTANTIATION_PENDING,
                                      VnffgRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VNFFGR

        Requests in any state other than those accepted by
        vnffgr_in_vnffgrm() are logged and ignored.
        """
        if not self.vnffgr_in_vnffgrm():
            self._log.error("Ignoring terminate request for id %s in state %s",
                            self.id, self._vnffgr_state)
            return

        self._log.info("Terminating VNFFGR id:%s", self.id)
        self._vnffgr_state = VnffgRecordState.TERMINATE_PENDING

        self._vnffgmgr.terminate_vnffgr(self._vnffgr_id)

        self._vnffgr_state = VnffgRecordState.TERMINATED
        self._log.debug("Terminated VNFFGR id:%s", self.id)
457
458
class VirtualLinkRecord(object):
    """ Virtual Link Record (VLR) created from a virtual link descriptor

    NOTE(review): the source this class was recovered from had lost its
    indentation; nesting (in particular what sits inside the DTS
    transaction ``with`` blocks) was reconstructed and should be confirmed
    against the repository copy.
    """
    # Catalog path of all VLRs; vlr_xpath() appends the id predicate.
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    @staticmethod
    @asyncio.coroutine
    def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False):
        """Creates a new VLR object based on the given data.

        If restart mode is enabled, then we look for existing records in the
        DTS and create a VLR record using the existing data (ID).

        Returns:
            VirtualLinkRecord
        """
        vlr_obj = VirtualLinkRecord(
                      dts,
                      log,
                      loop,
                      nsr_name,
                      vld_msg,
                      cloud_account_name,
                      om_datacenter,
                      ip_profile,
                      nsr_id,
                      )

        if restart_mode:
            res_iter = yield from dts.query_read(
                              "D,/vlr:vlr-catalog/vlr:vlr",
                              rwdts.XactFlag.MERGE)

            for fut in res_iter:
                response = yield from fut
                vlr = response.result

                # Check if the record is already present, if so use the ID of
                # the existing record. Since the name of the record is uniquely
                # formed we can use it as a search key!
                if vlr.name == vlr_obj.name:
                    vlr_obj.reset_id(vlr.id)
                    break

        return vlr_obj

    def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id):
        """ Create a VLR in INIT state with a newly generated UUID """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsr_name = nsr_name
        self._vld_msg = vld_msg
        self._cloud_account_name = cloud_account_name
        self._om_datacenter_name = om_datacenter
        self._assigned_subnet = None
        self._nsr_id = nsr_id
        self._ip_profile = ip_profile
        self._vlr_id = str(uuid.uuid4())
        self._state = VlRecordState.INIT
        self._prev_state = None

    @property
    def xpath(self):
        """ path for this object """
        return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self._vlr_id)

    @property
    def id(self):
        """ VLR id """
        return self._vlr_id

    @property
    def nsr_name(self):
        """ Get NSR name for this VL """
        # Must read the private attribute; returning self.nsr_name here
        # would re-enter this property and recurse without end.
        return self._nsr_name

    @property
    def vld_msg(self):
        """ Virtual Link Descriptor """
        return self._vld_msg

    @property
    def assigned_subnet(self):
        """ Subnet assigned to this VL (None until instantiate() succeeds) """
        return self._assigned_subnet

    @property
    def name(self):
        """
        Get the name for this VLR.

        The VLD's vim_network_name wins when set; a VLD literally named
        "multisite" keeps its name; otherwise the name is
        "<nsr name>.<VLD name>".
        """
        if self.vld_msg.vim_network_name:
            return self.vld_msg.vim_network_name
        elif self.vld_msg.name == "multisite":
            # This is a temporary hack to identify manually provisioned inter-site network
            return self.vld_msg.name
        else:
            return self._nsr_name + "." + self.vld_msg.name

    @property
    def cloud_account_name(self):
        """ Cloud account that this VLR should be created in """
        return self._cloud_account_name

    @property
    def om_datacenter_name(self):
        """ Datacenter that this VLR should be created in """
        return self._om_datacenter_name

    @staticmethod
    def vlr_xpath(vlr):
        """ Get the VLR path from VLR """
        return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(vlr.id)

    @property
    def state(self):
        """ VLR state """
        return self._state

    @state.setter
    def state(self, value):
        """ VLR set state """
        self._state = value

    @property
    def prev_state(self):
        """ VLR previous state """
        return self._prev_state

    @prev_state.setter
    def prev_state(self, value):
        """ VLR set previous state """
        self._prev_state = value

    @property
    def vlr_msg(self):
        """ Virtual Link Record message for Creating VLR in VNS """
        vld_fields = ["short_name",
                      "vendor",
                      "description",
                      "version",
                      "type_yang",
                      "vim_network_name",
                      "provider_network"]

        vld_copy_dict = {k: v for k, v in self.vld_msg.as_dict().items()
                         if k in vld_fields}

        vlr_dict = {"id": self._vlr_id,
                    "nsr_id_ref": self._nsr_id,
                    "vld_ref": self.vld_msg.id,
                    "name": self.name,
                    "cloud_account": self.cloud_account_name,
                    "om_datacenter": self.om_datacenter_name,
                    }

        # IP profile parameters are optional and only copied when present
        if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
            vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()

        vlr_dict.update(vld_copy_dict)
        vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
        return vlr

    def reset_id(self, vlr_id):
        """ Replace the generated id, e.g. with one found in restart mode """
        self._vlr_id = vlr_id

    def create_nsr_vlr_msg(self, vnfrs):
        """ The VLR message for the NSR opdata

        Adds a connection point entry for every VNFR in ``vnfrs`` that
        matches a VLD connection point reference (VNFD id and member index)
        and lives in the same cloud account and datacenter as this VLR.
        """
        nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
        nsr_vlr.vlr_ref = self._vlr_id
        nsr_vlr.assigned_subnet = self.assigned_subnet
        nsr_vlr.cloud_account = self.cloud_account_name
        nsr_vlr.om_datacenter = self.om_datacenter_name

        for conn in self.vld_msg.vnfd_connection_point_ref:
            for vnfr in vnfrs:
                if (vnfr.vnfd.id == conn.vnfd_id_ref and
                        vnfr.member_vnf_index == conn.member_vnf_index_ref and
                        self.cloud_account_name == vnfr.cloud_account_name and
                        self.om_datacenter_name == vnfr.om_datacenter_name):
                    cp_entry = nsr_vlr.vnfr_connection_point_ref.add()
                    cp_entry.vnfr_id = vnfr.id
                    cp_entry.connection_point = conn.vnfd_connection_point_ref

        return nsr_vlr

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VL

        Issues a DTS create for the VLR. Raises NsrInstantiationFailed (and
        marks the record FAILED) when no response is received or the
        returned VLR reports operational status 'failed'. On success the
        record becomes ACTIVE and the assigned subnet is stored.
        """

        self._log.debug("Instaniating VLR key %s, vld %s",
                        self.xpath, self._vld_msg)
        vlr = None
        self._state = VlRecordState.INSTANTIATION_PENDING
        self._log.debug("Executing VL create path:%s msg:%s",
                        self.xpath, self.vlr_msg)

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_create(self.xpath, self.vlr_msg)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.xpath, self.vlr_msg)
            res_iter = yield from block.execute(now=True)
            # The last response received wins
            for ent in res_iter:
                res = yield from ent
                vlr = res.result

            if vlr is None:
                self._state = VlRecordState.FAILED
                raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self.id)

        if vlr.operational_status == 'failed':
            self._log.debug("NS Id:%s VL creation failed for vlr id %s", self.id, vlr.id)
            self._state = VlRecordState.FAILED
            raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr.id, vlr.operational_status_details))

        self._log.info("Instantiated VL with xpath %s and vlr:%s",
                       self.xpath, vlr)
        self._state = VlRecordState.ACTIVE
        self._assigned_subnet = vlr.assigned_subnet

    def vlr_in_vns(self):
        """ Is there a VLR record in VNS

        True for ACTIVE, INSTANTIATION_PENDING, TERMINATE_PENDING and FAILED.
        """
        return self._state in (VlRecordState.ACTIVE,
                               VlRecordState.INSTANTIATION_PENDING,
                               VlRecordState.TERMINATE_PENDING,
                               VlRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VL

        Requests in a state not accepted by vlr_in_vns() are logged and
        ignored; otherwise a DTS delete is issued for the VLR.
        """
        if not self.vlr_in_vns():
            self._log.debug("Ignoring terminate request for id %s in state %s",
                            self.id, self._state)
            return

        self._log.debug("Terminating VL id:%s", self.id)
        self._state = VlRecordState.TERMINATE_PENDING

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_delete(self.xpath)
            yield from block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL id:%s", self.id)
706
707
class VnfRecordState(Enum):
    """States of a virtual network function record (VNFR)."""
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    FAILED = 106
716
717
718 class VirtualNetworkFunctionRecord(object):
719 """ Virtual Network Function Record class"""
720 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
721
@staticmethod
@asyncio.coroutine
def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name,
                  cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id,
                  placement_groups, restart_mode=False):
    """Build a VNFR object, reusing an existing record id on restart.

    In restart mode the VNFR catalog is read from DTS and, if a record with
    the same name as the new object exists, its id replaces the freshly
    generated one.

    Returns:
        VirtualNetworkFunctionRecord
    """
    record = VirtualNetworkFunctionRecord(
        dts, log, loop,
        vnfd, const_vnfd_msg,
        nsd_id, nsr_name,
        cloud_account_name, om_datacenter_name,
        nsr_id,
        group_name, group_instance_id,
        placement_groups,
        restart_mode=restart_mode)

    if not restart_mode:
        return record

    catalog_reads = yield from dts.query_read(
        "D,/vnfr:vnfr-catalog/vnfr:vnfr",
        rwdts.XactFlag.MERGE)

    for pending in catalog_reads:
        reply = yield from pending
        existing = reply.result

        # First record with a matching name supplies the id
        if existing.name == record.name:
            record.reset_id(existing.id)
            break

    return record
765
def __init__(self,
             dts,
             log,
             loop,
             vnfd,
             const_vnfd_msg,
             nsd_id,
             nsr_name,
             cloud_account_name,
             om_datacenter_name,
             nsr_id,
             group_name=None,
             group_instance_id=None,
             placement_groups=None,
             restart_mode=False):
    """Create a VNFR in INIT state with a newly generated UUID.

    Merges the VNFD configuration into a new ConfigStore and builds the
    initial VNFR message.

    placement_groups defaults to a fresh empty list per instance; a list
    passed by the caller is stored as-is (not copied).

    Raises:
        ValueError: group_instance_id was given without a group_name.
    """
    # Validate first so an invalid call has no side effects.
    if group_name is None and group_instance_id is not None:
        raise ValueError("Group instance id must not be provided with an empty group name")

    self._dts = dts
    self._log = log
    self._loop = loop
    self._vnfd = vnfd
    self._const_vnfd_msg = const_vnfd_msg
    self._nsd_id = nsd_id
    self._nsr_name = nsr_name
    self._nsr_id = nsr_id
    self._cloud_account_name = cloud_account_name
    self._om_datacenter_name = om_datacenter_name
    self._group_name = group_name
    self._group_instance_id = group_instance_id
    # A shared mutable default would leak between instances.
    self._placement_groups = [] if placement_groups is None else placement_groups
    self._config_status = NsrYang.ConfigStates.INIT

    self._prev_state = VnfRecordState.INIT
    self._state = VnfRecordState.INIT
    self._state_failed_reason = None

    self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
    self.configure()

    self._vnfr_id = str(uuid.uuid4())
    # Cache for the lazily computed `name` property
    self._name = None
    self._vnfr_msg = self.create_vnfr_msg()
    self._log.debug("Set VNFR {} config type to {}".
                    format(self.name, self.config_type))
    self.restart_mode = restart_mode
813
@property
def id(self):
    """ Identifier of this VNFR """
    return self._vnfr_id
818
@property
def xpath(self):
    """ DTS path of this VNFR in the VNFR catalog, keyed by its id """
    record_id = self.id
    return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(record_id)
823
@property
def vnfr_msg(self):
    """ The stored VNFR message built by create_vnfr_msg() """
    return self._vnfr_msg
828
@property
def const_vnfr_msg(self):
    """ Constituent VNFR reference message (id, cloud account, datacenter) """
    return RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(
        vnfr_id=self.id,
        cloud_account=self.cloud_account_name,
        om_datacenter=self._om_datacenter_name)
833
@property
def vnfd(self):
    """ The VNF descriptor this record was created from """
    return self._vnfd
838
@property
def cloud_account_name(self):
    """ Name of the cloud account this VNF should be created in """
    return self._cloud_account_name
843
@property
def om_datacenter_name(self):
    """ Name of the datacenter this VNF should be created in """
    return self._om_datacenter_name
848
849
@property
def active(self):
    """ True when this VNF is in the ACTIVE state """
    return self._state == VnfRecordState.ACTIVE
854
@property
def state(self):
    """ Current state of this VNFR """
    return self._state
859
@property
def state_failed_reason(self):
    """ Stored failure reason for this VNF (None when none was recorded) """
    return self._state_failed_reason
864
@property
def member_vnf_index(self):
    """ Member VNF index taken from the constituent VNFD message """
    constituent = self._const_vnfd_msg
    return constituent.member_vnf_index
869
@property
def nsr_name(self):
    """ Name of the NSR this VNFR belongs to """
    return self._nsr_name
874
@property
def name(self):
    """ Name of this VNFR

    Computed once and cached. The parts are joined with "__" in this
    order: NSR name, group name (if set), group instance id (if set),
    VNFD name, member VNF index.
    """
    if self._name is None:
        group_tags = []
        if self._group_name is not None:
            group_tags.append(self._group_name)
        if self._group_instance_id is not None:
            group_tags.append(str(self._group_instance_id))

        parts = [self._nsr_name] + group_tags
        parts += [self.vnfd.name, str(self.member_vnf_index)]
        self._name = "__".join(parts)

    return self._name
894
@staticmethod
def vnfr_xpath(vnfr):
    """ Build the catalog path for the given VNFR from its id """
    template = VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']"
    return template.format(vnfr.id)
899
@property
def config_type(self):
    """ Configuration method declared by the VNFD

    Returns the first of 'netconf', 'juju', 'script' present in the VNFD's
    vnf_configuration, or 'none' when none of them is set.
    """
    vnf_config = self._vnfd.vnf_configuration
    candidates = ('netconf', 'juju', 'script')
    return next((kind for kind in candidates if vnf_config.has_field(kind)),
                'none')
907
@property
def config_status(self):
    """ Config status as a YANG enum string

    'config_not_needed' when the VNFD declares no config method, otherwise
    'configured' or 'failed' for those states and 'configuring' for
    every other state.
    """
    self._log.debug("Map VNFR {} config status {} ({})".
                    format(self.name, self._config_status, self.config_type))

    if self.config_type == 'none':
        return 'config_not_needed'

    if self._config_status == NsrYang.ConfigStates.CONFIGURED:
        return 'configured'

    if self._config_status == NsrYang.ConfigStates.FAILED:
        return 'failed'

    return 'configuring'
921
def set_state(self, state):
    """ Move to a new state, remembering the current one as previous """
    self._prev_state, self._state = self._state, state
926
def reset_id(self, vnfr_id):
    """ Adopt the given id and rebuild the VNFR message so it carries it """
    self._vnfr_id = vnfr_id
    rebuilt = self.create_vnfr_msg()
    self._vnfr_msg = rebuilt
930
def configure(self):
    """ Merge this VNFD's configuration into the config store

    Passes the NSD id, the VNFD and the member VNF index to
    config_store.merge_vnfd_config().
    """
    store = self.config_store
    store.merge_vnfd_config(self._nsd_id, self._vnfd, self.member_vnf_index)
937
def create_vnfr_msg(self):
    """ Build the VNFR message for this record

    Combines the record's own references (id, NSR, VNFD, name, cloud
    account, datacenter, config status) with a fixed set of descriptive
    fields copied from the VNFD, then adds the member index, the VNF
    configuration, the management port (when the VNFD has one) and the
    placement groups.
    """
    copied_fields = (
        "short_name",
        "vendor",
        "description",
        "version",
        "type_yang",
    )
    from_vnfd = {}
    for field, value in self._vnfd.as_dict().items():
        if field in copied_fields:
            from_vnfd[field] = value

    vnfr_dict = {
        "id": self.id,
        "nsr_id_ref": self._nsr_id,
        "vnfd_ref": self.vnfd.id,
        "name": self.name,
        "cloud_account": self._cloud_account_name,
        "om_datacenter": self._om_datacenter_name,
        "config_status": self.config_status
    }
    vnfr_dict.update(from_vnfd)

    vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
    vnfr.member_vnf_index_ref = self.member_vnf_index
    vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict())

    mgmt = self._vnfd.mgmt_interface
    if mgmt.has_field("port"):
        vnfr.mgmt_interface.port = self._vnfd.mgmt_interface.port

    for placement in self._placement_groups:
        entry = vnfr.placement_groups_info.add()
        entry.from_dict(placement.as_dict())

    # UI expects the monitoring param field to exist
    vnfr.monitoring_param = []

    self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, vnfr))
    return vnfr
975
@asyncio.coroutine
def update_vnfm(self):
    """ Send the current VNFR message to DTS as an update on this record's path """
    message = self.vnfr_msg
    self._log.debug("Send an update to VNFM for VNFR {} with {}".
                    format(self.name, message))
    yield from self._dts.query_update(self.xpath, rwdts.XactFlag.TRACE, self.vnfr_msg)
985
def get_config_status(self):
    """ Return the stored config status value (YANG enum, not the string form) """
    return self._config_status
989
@asyncio.coroutine
def set_config_status(self, status):
    """ Record a new config status and publish it through update_vnfm()

    A record that is already CONFIGURED is never changed: the call logs an
    error and returns. When the status actually changes, it is stored, the
    string form is written to the VNFR message, and (unless the new status
    is INIT) the VNFR is published via update_vnfm(). Failures while writing
    the message or publishing are logged and not re-raised.

    NOTE(review): the source this method was recovered from had lost its
    indentation. The "Updated VNFR" log and the publish step are placed
    inside the `status changed` branch here - confirm against the
    repository copy.
    """

    def status_to_string(status):
        # Map the YANG enum value to the string used in the VNFR message
        status_dc = {
            NsrYang.ConfigStates.INIT : 'init',
            NsrYang.ConfigStates.CONFIGURING : 'configuring',
            NsrYang.ConfigStates.CONFIG_NOT_NEEDED : 'config_not_needed',
            NsrYang.ConfigStates.CONFIGURED : 'configured',
            NsrYang.ConfigStates.FAILED : 'failed',
        }

        return status_dc[status]

    self._log.debug("Update VNFR {} from {} ({}) to {}".
                    format(self.name, self._config_status,
                           self.config_type, status))
    # CONFIGURED is final as far as this method is concerned
    if self._config_status == NsrYang.ConfigStates.CONFIGURED:
        self._log.error("Updating already configured VNFR {}".
                        format(self.name))
        return

    if self._config_status != status:
        try:
            self._config_status = status
            # I don't think this is used. Original implementor can check.
            # Caused Exception, so corrected it by status_to_string
            # But not sure whats the use of this variable?
            self.vnfr_msg.config_status = status_to_string(status)
        except Exception as e:
            # Best effort: a status missing from the map (KeyError) or a
            # rejected assignment is logged and otherwise ignored
            self._log.error("Exception=%s", str(e))
            pass

        self._log.debug("Updated VNFR {} status to {}".format(self.name, status))

        if self._config_status != NsrYang.ConfigStates.INIT:
            try:
                # Publish only after VNFM has the VNFR created
                yield from self.update_vnfm()
            except Exception as e:
                self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}".
                                format(status, self.name, e))
                self._log.exception(e)
1033
def is_configured(self):
    """Tell whether this VNFR needs no further configuration.

    True when the config type is 'none' or the stored config status
    is CONFIGURED; False otherwise.
    """
    if self.config_type == 'none':
        return True

    return bool(self._config_status == NsrYang.ConfigStates.CONFIGURED)
1042
@asyncio.coroutine
def instantiate(self, nsr):
    """Create (or, in restart mode, update) this VNFR through DTS.

    Moves the record to INSTANTIATION_PENDING, attaches a connection
    point entry to the VNFR message for every VNFD connection point
    that has a matching VLR in the NSR, and then issues the DTS query.

    Arguments:
        nsr - network service record whose vlrs are searched
    """
    self._log.debug("Instaniating VNFR key %s, vnfd %s",
                    self.xpath, self._vnfd)

    self._log.debug("Create VNF with xpath %s and vnfr %s",
                    self.xpath, self.vnfr_msg)

    self.set_state(VnfRecordState.INSTANTIATION_PENDING)

    def matching_vlr(conn_point):
        """Return the VLR serving conn_point in this VNF's cloud account, or None."""
        for candidate in nsr.vlrs:
            for ref in candidate.vld_msg.vnfd_connection_point_ref:
                if ref.vnfd_id_ref != self._vnfd.id:
                    continue
                if ref.vnfd_connection_point_ref != conn_point.name:
                    continue
                if ref.member_vnf_index_ref != self.member_vnf_index:
                    continue
                if candidate.cloud_account_name != self.cloud_account_name:
                    continue
                self._log.debug("Found VLR for cp_name:%s and vnf-index:%d",
                                conn_point.name, self.member_vnf_index)
                return candidate
        return None

    # Fill in the VLR reference for every connection point of the VNFD
    for vnfd_cp in self._vnfd.connection_point:
        cp_record = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
        cp_record.name = vnfd_cp.name
        cp_record.type_yang = vnfd_cp.type_yang

        vlr = matching_vlr(vnfd_cp)
        if vlr is None:
            # A connection point without a VLR is skipped, not an error
            self._log.debug("%s", "Failed to find VLR for cp = %s" % vnfd_cp.name)
            continue

        cp_record.vlr_ref = vlr.id
        self.vnfr_msg.connection_point.append(cp_record)
        self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
                        cp_record, self.vnfr_msg.id, self.vnfr_msg.vnfd_ref)

    if self.restart_mode:
        yield from self._dts.query_update(self.xpath, 0, self.vnfr_msg)
    else:
        yield from self._dts.query_create(self.xpath, 0, self.vnfr_msg)

    self._log.info("Created VNF with xpath %s and vnfr %s",
                   self.xpath, self.vnfr_msg)

    self._log.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
                   self.xpath, self._vnfd, self.vnfr_msg)
1099
@asyncio.coroutine
def update_state(self, vnfr_msg):
    """React to the operational status reported in vnfr_msg.

    A "running" report marks this record active unless its own VNFR
    message already says "running"; a "failed" report marks the
    instantiation as failed, passing along the reported details.
    """
    reported = vnfr_msg.operational_status

    if reported == "running":
        if self.vnfr_msg.operational_status != "running":
            yield from self.is_active()
    elif reported == "failed":
        details = vnfr_msg.operational_status_details
        yield from self.instantiation_failed(failed_reason=details)
1108
@asyncio.coroutine
def is_active(self):
    """Log that the VNFR is active and move it to the ACTIVE state."""
    self._log.debug("VNFR %s is active", self._vnfr_id)
    self.set_state(VnfRecordState.ACTIVE)
1114
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Move the VNFR to FAILED and remember why.

    Arguments:
        failed_reason - optional description stored on the record
    """
    self._log.error("VNFR %s instantiation failed", self._vnfr_id)
    self.set_state(VnfRecordState.FAILED)
    self._state_failed_reason = failed_reason
1121
def vnfr_in_vnfm(self):
    """Tell whether this record's state implies a VNFR exists in the VNFM.

    That is the case for the ACTIVE, INSTANTIATION_PENDING and FAILED
    states.
    """
    states_with_record = (
        VnfRecordState.ACTIVE,
        VnfRecordState.INSTANTIATION_PENDING,
        VnfRecordState.FAILED,
    )
    return self._state in states_with_record
1130
@asyncio.coroutine
def terminate(self):
    """Delete this VNFR through DTS.

    Does nothing (beyond a debug log) when vnfr_in_vnfm() reports no
    record. Otherwise the state goes to TERMINATE_PENDING, a delete
    query for the record's xpath is executed in a DTS transaction,
    and the state is set to TERMINATED.
    """
    if not self.vnfr_in_vnfm():
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.id, self._state)
        return

    self._log.debug("Terminating VNF id:%s", self.id)
    self.set_state(VnfRecordState.TERMINATE_PENDING)

    with self._dts.transaction(flags=0) as xact:
        delete_block = xact.block_create()
        delete_block.add_query_delete(self.xpath)
        yield from delete_block.execute(flags=0)

    self.set_state(VnfRecordState.TERMINATED)
    self._log.debug("Terminated VNF id:%s", self.id)
1147
1148
class NetworkServiceStatus(object):
    """Operational state and recent event history of a network service.

    Holds the current NetworkServiceRecordState together with a bounded
    history of the most recently recorded events. Every recorded event
    is also sent out as an nsm-notification through DTS.
    """
    # Upper bound on the number of events kept in the history
    MAX_EVENTS_RECORDED = 10

    # NetworkServiceRecordState member name -> yang enum string
    _STATE_TO_YANG_STR = {
        "INIT": "init",
        "VL_INIT_PHASE": "vl_init_phase",
        "VNF_INIT_PHASE": "vnf_init_phase",
        "VNFFG_INIT_PHASE": "vnffg_init_phase",
        "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
        "RUNNING": "running",
        "SCALING_OUT": "scaling_out",
        "SCALING_IN": "scaling_in",
        "TERMINATE_RCVD": "terminate_rcvd",
        "TERMINATE": "terminate",
        "VL_TERMINATE_PHASE": "vl_terminate_phase",
        "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
        "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
        "TERMINATED": "terminated",
        "FAILED": "failed",
        "VL_INSTANTIATE": "vl_instantiate",
        "VL_TERMINATE": "vl_terminate",
    }

    def __init__(self, dts, log, loop):
        self._dts = dts
        self._log = log
        self._loop = loop

        self._state = NetworkServiceRecordState.INIT
        # Bounded deque: appending to a full history drops the oldest entry
        self._events = deque(maxlen=self.MAX_EVENTS_RECORDED)

    @asyncio.coroutine
    def create_notification(self, evt, evt_desc, evt_details):
        """Send an nsm-notification describing the event through DTS."""
        xp = "N,/rw-nsr:nsm-notification"
        notif = RwNsrYang.YangNotif_RwNsr_NsmNotification()
        notif.event = evt
        notif.description = evt_desc
        notif.details = evt_details

        yield from self._dts.query_create(xp, rwdts.XactFlag.ADVISE, notif)
        self._log.info("Notification called by creating dts query: %s", notif)

    def record_event(self, evt, evt_desc, evt_details):
        """Append an event to the history and schedule its notification.

        Each entry is stored as (timestamp, event, description, details)
        with the timestamp in whole seconds since the epoch.
        """
        self._log.debug("Recording event - evt %s, evt_descr %s len = %s",
                        evt, evt_desc, len(self._events))
        self._events.append((int(time.time()), evt, evt_desc, evt_details))

        # The notification is sent asynchronously on the event loop
        self._loop.create_task(self.create_notification(evt, evt_desc, evt_details))

    def set_state(self, state):
        """Set the state of this status object."""
        self._state = state

    def yang_str(self):
        """Return the state as a yang enum string.

        Raises KeyError if the state's name has no yang mapping.
        """
        return self._STATE_TO_YANG_STR[self._state.name]

    @property
    def state(self):
        """State of this status object."""
        return self._state

    @property
    def msg(self):
        """Recorded events as a list of OperationalEvents messages.

        The ids are 1-based positions in the current history, oldest
        event first.
        """
        event_list = []
        for idx, entry in enumerate(self._events, start=1):
            event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
            event.id = idx
            event.timestamp, event.event, event.description, event.details = entry
            event_list.append(event)
        return event_list
1226
1227
1228 class NetworkServiceRecord(object):
1229 """ Network service record """
1230 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
1231
def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, restart_mode=False):
    """Set up an empty network service record in the INIT state.

    Only stores the collaborators and creates empty containers; the
    VLRs, VNFRs, VNFFGRs, scaling groups and parameter pools are
    filled in later.
    """
    # Collaborators handed in by the caller
    self._dts = dts
    self._log = log
    self._loop = loop
    self._nsm = nsm
    self._nsm_plugin = nsm_plugin
    self._nsr_cfg_msg = nsr_cfg_msg
    self._sdn_account_name = sdn_account_name
    self.restart_mode = restart_mode

    # Lazily resolved / registered later
    self._nsd = None
    self._nsr_msg = None
    self._nsr_regh = None

    # Child records and helpers, all empty to begin with
    self._vlrs = []
    self._vnfrs = {}
    self._vnfds = {}
    self._vnffgrs = {}
    self._param_pools = {}
    self._scaling_groups = {}

    # Status bookkeeping
    self._create_time = int(time.time())
    self._op_status = NetworkServiceStatus(dts, log, loop)
    self._config_status = NsrYang.ConfigStates.CONFIGURING
    self._config_status_details = None
    self._job_id = 0
    self._debug_running = False
    self._is_active = False
    self._vl_phase_completed = False
    self._vnf_phase_completed = False

    self.config_store = rift.mano.config_data.config.ConfigStore(self._log)

    # Initialise the state to INIT.
    # The NSR moves through the following transitions
    # 1. INIT -> VLS_READY once all the VLs in the NSD are created
    # 2. VLS_READY -> VNFS_READY when all the VNFs in the NSD are created
    # 3. VNFS_READY -> READY when the NSR is published
    self.set_state(NetworkServiceRecordState.INIT)

    self.substitute_input_parameters = InputParameterSubstitution(self._log)
1272
@property
def nsm_plugin(self):
    """The NSM plugin stored on this record."""
    plugin = self._nsm_plugin
    return plugin
1277
def set_state(self, state):
    """Move this NSR to a new state.

    Leaving VL_INIT_PHASE or VNF_INIT_PHASE, whatever the new state
    (it may well be FAILED), marks the corresponding phase as
    completed before the new state is stored.
    """
    self._log.debug("Setting state to %s", state)

    leaving = self.state
    if leaving == NetworkServiceRecordState.VL_INIT_PHASE:
        self._vl_phase_completed = True

    if leaving == NetworkServiceRecordState.VNF_INIT_PHASE:
        self._vnf_phase_completed = True

    self._op_status.set_state(state)
1290
@property
def id(self):
    """Id of this NSR, read from its config message."""
    cfg = self._nsr_cfg_msg
    return cfg.id
1295
@property
def name(self):
    """Name of this NSR, read from its config message."""
    cfg = self._nsr_cfg_msg
    return cfg.name
1300
@property
def cloud_account_name(self):
    """Cloud account named in the NSR config message."""
    cfg = self._nsr_cfg_msg
    return cfg.cloud_account
1304
@property
def om_datacenter_name(self):
    """om_datacenter from the NSR config message, or None if the field is not set."""
    cfg = self._nsr_cfg_msg
    return cfg.om_datacenter if cfg.has_field('om_datacenter') else None
1310
@property
def state(self):
    """Current state, as held by the operational status object."""
    status = self._op_status
    return status.state
1315
@property
def active(self):
    """True when the operational state is RUNNING, False otherwise."""
    is_running = self._op_status.state == NetworkServiceRecordState.RUNNING
    return bool(is_running)
1320
@property
def vlrs(self):
    """List of VLRs held by this NSR."""
    records = self._vlrs
    return records
1325
@property
def vnfrs(self):
    """Dictionary of VNFRs held by this NSR."""
    records = self._vnfrs
    return records
1330
@property
def vnffgrs(self):
    """Dictionary of VNFFGRs held by this NSR."""
    records = self._vnffgrs
    return records
1335
@property
def scaling_groups(self):
    """Dictionary of scaling groups held by this NSR."""
    groups = self._scaling_groups
    return groups
1340
@property
def param_pools(self):
    """Dictionary of parameter value pools held by this NSR."""
    pools = self._param_pools
    return pools
1345
@property
def nsr_cfg_msg(self):
    """The NSR config message backing this record."""
    cfg = self._nsr_cfg_msg
    return cfg

@nsr_cfg_msg.setter
def nsr_cfg_msg(self, msg):
    """Replace the NSR config message backing this record."""
    self._nsr_cfg_msg = msg
1353
@property
def nsd_msg(self):
    """NSD message for this NSR.

    Taken from the config message on first access and cached on the
    record afterwards.
    """
    if self._nsd is None:
        self._nsd = self._nsr_cfg_msg.nsd
    return self._nsd
1361
@property
def nsd_id(self):
    """Id of the NSD message of this NSR."""
    descriptor = self.nsd_msg
    return descriptor.id
1366
@property
def job_id(self):
    """Allocate and return a new job id.

    Note that every read of this property increments the counter.
    """
    next_id = self._job_id + 1
    self._job_id = next_id
    return next_id
1372
@property
def config_status(self):
    """Stored config status of this NSR."""
    status = self._config_status
    return status
1377
def resolve_placement_group_cloud_construct(self, input_group):
    """Build the cloud specific placement group info for input_group.

    Looks for the first entry in the NSR config's
    nsd_placement_group_maps whose placement_group_ref equals the
    group's name. The result carries that entry's fields (except
    placement_group_ref) with name, requirement and strategy taken
    from input_group.

    Returns:
        A PlacementGroupsInfo message, or None when no map entry matches
    """
    copied_attrs = ['name', 'requirement', 'strategy']

    for mapping in self._nsr_cfg_msg.nsd_placement_group_maps:
        if mapping.placement_group_ref != input_group.name:
            continue

        result = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        fields = dict(
            (key, value) for key, value in mapping.as_dict().items()
            if key != 'placement_group_ref'
        )
        for attr in copied_attrs:
            fields[attr] = getattr(input_group, attr)
        result.from_dict(fields)
        return result

    return None
1394
1395
def __str__(self):
    """Short description with the NSR name, NSD id and cloud account."""
    template = "NSR(name={}, nsd_id={}, cloud_account={})"
    return template.format(self.name, self.nsd_id, self.cloud_account_name)
1400
def _get_vnfd(self, vnfd_id, config_xact):
    """Ask the NSM for the VNFD message with the given id."""
    vnfd_msg = self._nsm.get_vnfd(vnfd_id, config_xact)
    return vnfd_msg
1404
def _get_vnfd_cloud_account(self, vnfd_member_index):
    """Look up the (cloud_account, om_datacenter) pair for a member VNF.

    The first vnf_cloud_account_map entry whose member_vnf_index_ref
    equals vnfd_member_index wins, even if both of its values are
    None. Without a matching entry the NSR level defaults are returned.
    """
    account_map = self._nsr_cfg_msg.vnf_cloud_account_map
    if account_map:
        for entry in account_map:
            if vnfd_member_index == entry.member_vnf_index_ref:
                return (entry.cloud_account, entry.om_datacenter)

    return (self.cloud_account_name, self.om_datacenter_name)
1413
def _get_constituent_vnfd_msg(self, vnf_index):
    """Return the NSD constituent-vnfd entry with the given member index.

    Raises:
        ValueError if the NSD has no constituent VNFD with that index
    """
    not_found = object()
    match = next(
        (entry for entry in self.nsd_msg.constituent_vnfd
         if entry.member_vnf_index == vnf_index),
        not_found,
    )
    if match is not_found:
        raise ValueError("Constituent VNF index %s not found" % vnf_index)

    return match
1420
def record_event(self, evt, evt_desc, evt_details=None, state=None):
    """Record an event and optionally move the NSR to a new state.

    The event is handed to the operational status object; the state is
    only changed when a state other than None is given.
    """
    self._op_status.record_event(evt, evt_desc, evt_details)
    if state is None:
        return
    self.set_state(state)
1426
def scaling_trigger_str(self, trigger):
    """Return the hyphenated name of a scaling trigger.

    Any failure to map the trigger is logged and answered with
    "Unknown trigger" instead of raising.
    """
    trigger_names = {
        NsdYang.ScalingTrigger.PRE_SCALE_IN: 'pre-scale-in',
        NsdYang.ScalingTrigger.POST_SCALE_IN: 'post-scale-in',
        NsdYang.ScalingTrigger.PRE_SCALE_OUT: 'pre-scale-out',
        NsdYang.ScalingTrigger.POST_SCALE_OUT: 'post-scale-out',
    }
    try:
        label = trigger_names[trigger]
    except Exception as e:
        self._log.error("Scaling trigger mapping error for {} : {}".format(trigger, e))
        self._log.exception(e)
        label = "Unknown trigger"
    return label
1441
@asyncio.coroutine
def instantiate_vls(self):
    """Instantiate every VLR of this network service, one after another.

    Each VLR is handed to the NSM plugin and marked ACTIVE once the
    plugin call returns.
    """
    vl_count = len(self._vlrs)
    self._log.debug("Instantiating %d VLs in NSD id %s", vl_count, self.id)

    for link_record in self._vlrs:
        yield from self.nsm_plugin.instantiate_vl(self, link_record)
        link_record.state = VlRecordState.ACTIVE
1452
@asyncio.coroutine
def create(self, config_xact):
    """Create all records that make up this network service.

    Runs, in order: virtual links, VNFs, VNFFGs, scaling groups and
    parameter pools. Only the first two steps are coroutines.

    Arguments:
        config_xact - transaction passed on to create_vnfs()
    """
    # Virtual links for all the external VNF connection points
    yield from self.create_vls()

    # VNFs of this network service
    yield from self.create_vnfs(config_xact)

    # The remaining steps are plain synchronous calls
    self.create_vnffgs()
    self.create_scaling_groups()
    self.create_param_pools()
1471
@asyncio.coroutine
def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None):
    """Run a user supplied script to configure a scaling group instance.

    The script is run through the shell with a single argument: the
    path of a temporary YAML file holding the trigger, the scale group
    config, the VNFRs of the scale instance, the VNFRs in self.vnfrs
    and the NSR name. The temporary file is removed once the script
    has finished.

    Arguments:
        script - script path; a path not starting with '/' is looked up
                 under $RIFT_INSTALL/usr/bin
        group - scaling group the config applies to
        scale_instance - scale group instance being configured
        trigger - NsdYang.ScalingTrigger value that caused the call
        vnfrs - VNFRs reported as "vnfrs_in_group"; when empty or None
                the VNFRs of scale_instance are used

    Returns:
        True when the script exits with status 0. False when no script
        is given, the script does not exist, or it exits non-zero.
    """
    # Local import: shlex is only needed here to quote the shell command
    import shlex

    @asyncio.coroutine
    def add_vnfrs_data(vnfrs_list):
        """ Add as a dict each of the VNFRs data """
        vnfrs_data = []
        for vnfr in vnfrs_list:
            self._log.debug("Add VNFR {} data".format(vnfr))
            vnfr_data = dict()
            vnfr_data['name'] = vnfr.name
            if trigger in [NsdYang.ScalingTrigger.PRE_SCALE_IN, NsdYang.ScalingTrigger.POST_SCALE_OUT]:
                # Get VNF management and other IPs, etc
                opdata = yield from self.fetch_vnfr(vnfr.xpath)
                self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata))
                try:
                    vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address
                    vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port
                except Exception as e:
                    # Best effort: the script still runs without the mgmt address
                    self._log.error("Unable to get management IP for vnfr {}:{}".
                                    format(vnfr.name, e))

                try:
                    vnfr_data['connection_points'] = []
                    for cp in opdata.connection_point:
                        con_pt = dict()
                        con_pt['name'] = cp.name
                        con_pt['ip_address'] = cp.ip_address
                        vnfr_data['connection_points'].append(con_pt)
                except Exception as e:
                    self._log.error("Exception getting connections points for VNFR {}: {}".
                                    format(vnfr.name, e))

            vnfrs_data.append(vnfr_data)
            self._log.debug("VNFRs data: {}".format(vnfrs_data))

        return vnfrs_data

    def add_nsr_data(nsr):
        """ Collect the NSR level data handed to the script """
        nsr_data = dict()
        nsr_data['name'] = nsr.name
        return nsr_data

    if not script:
        self._log.error("Script not provided for scale group config: {}".format(group.name))
        return False

    if script[0] == '/':
        path = script
    else:
        path = os.path.join(os.environ['RIFT_INSTALL'], "usr/bin", script)
    if not os.path.exists(path):
        self._log.error("Config faled for scale group {}: Script does not exist at {}".
                        format(group.name, path))
        return False

    # Build a YAML file with all parameters for the script to execute
    # The data consists of 5 sections
    # 1. Trigger
    # 2. Scale group config
    # 3. VNFRs in the scale group
    # 4. VNFRs held in self.vnfrs
    # 5. NSR data
    data = dict()
    data['trigger'] = group.trigger_map(trigger)
    data['config'] = group.group_msg.as_dict()

    if vnfrs:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs)
    else:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance.vnfrs)

    data["vnfrs_others"] = yield from add_vnfrs_data(self.vnfrs.values())
    data["nsr"] = add_nsr_data(self)

    # delete=False keeps the file on disk after the handle is closed so
    # the script can read it; it is removed explicitly below.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(yaml.dump(data, default_flow_style=True)
                       .encode("UTF-8"))

    try:
        self._log.debug("Creating a temp file: {} with input data: {}".
                        format(tmp_file.name, data))

        # Quote both paths so whitespace or shell metacharacters in them
        # cannot alter the command that gets executed.
        cmd = "{} {}".format(shlex.quote(path), shlex.quote(tmp_file.name))
        self._log.debug("Running the CMD: {}".format(cmd))
        proc = yield from asyncio.create_subprocess_shell(cmd, loop=self._loop)
        rc = yield from proc.wait()
    finally:
        # The input file is of no use once the script is done (or failed to start)
        try:
            os.remove(tmp_file.name)
        except OSError as e:
            self._log.debug("Could not remove temp file {}: {}".format(tmp_file.name, e))

    if rc:
        self._log.error("The script {} for scale group {} config returned: {}".
                        format(script, group.name, rc))
        return False

    # Success
    return True
1567
1568
@asyncio.coroutine
def apply_scaling_group_config(self, trigger, group, scale_instance, vnfrs=None):
    """ Apply the config for the scaling group based on trigger

    Arguments:
        trigger - NsdYang.ScalingTrigger value being handled
        group - scaling group; its trigger_config() supplies the config
        scale_instance - scale group instance whose config status is updated
        vnfrs - optional VNFRs passed on to the config script

    Returns:
        False if group or scale_instance is None, or if the trigger
        config names no config primitive.
        True if the group has no config for this trigger.
        Otherwise the result of the config script.

    Raises:
        ValueError if the referenced config primitive is not in the NSD
        NotImplementedError if the primitive has no user defined script
    """
    if group is None or scale_instance is None:
        return False

    @asyncio.coroutine
    def update_config_status(success=True, err_msg=None):
        # Move the instance's config status according to the outcome and
        # publish the NSR state whenever the status was touched.
        self._log.debug("Update %s config status to %r : %s",
                        scale_instance, success, err_msg)
        if (scale_instance.config_status == "failed"):
            # Do not update the config status if it is already in failed state
            return

        if scale_instance.config_status == "configured":
            # Update only to failed state an already configured scale instance
            if not success:
                scale_instance.config_status = "failed"
                scale_instance.config_err_msg = err_msg
                yield from self.update_state()
        else:
            # We are in configuring state
            # Only after post scale out mark instance as configured
            if trigger == NsdYang.ScalingTrigger.POST_SCALE_OUT:
                if success:
                    scale_instance.config_status = "configured"
                else:
                    scale_instance.config_status = "failed"
                    scale_instance.config_err_msg = err_msg
                yield from self.update_state()

    config = group.trigger_config(trigger)
    if config is None:
        # Nothing configured for this trigger counts as success
        return True

    self._log.debug("Scaling group {} config: {}".format(group.name, config))
    if config.has_field("ns_config_primitive_name_ref"):
        config_name = config.ns_config_primitive_name_ref
        nsd_msg = self.nsd_msg
        config_primitive = None
        # Find the NSD service primitive the trigger config refers to
        for ns_cfg_prim in nsd_msg.service_primitive:
            if ns_cfg_prim.name == config_name:
                config_primitive = ns_cfg_prim
                break

        if config_primitive is None:
            raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name, self.name))

        self._log.debug("Scaling group {} config primitive: {}".format(group.name, config_primitive))
        if config_primitive.has_field("user_defined_script"):
            rc = yield from self.apply_scale_group_config_script(config_primitive.user_defined_script,
                                                                 group, scale_instance, trigger, vnfrs)
            err_msg = None
            if not rc:
                err_msg = "Failed config for trigger {} using config script '{}'". \
                          format(self.scaling_trigger_str(trigger),
                                 config_primitive.user_defined_script)
            yield from update_config_status(success=rc, err_msg=err_msg)
            return rc
        else:
            # The status is marked failed before the exception is raised
            err_msg = "Failed config for trigger {} as config script is not specified". \
                      format(self.scaling_trigger_str(trigger))
            yield from update_config_status(success=False, err_msg=err_msg)
            raise NotImplementedError("Only script based config support for scale group for now: {}".
                                      format(group.name))
    else:
        err_msg = "Failed config for trigger {} as config primitive is not specified".\
                  format(self.scaling_trigger_str(trigger))
        yield from update_config_status(success=False, err_msg=err_msg)
        self._log.error("Config primitive not specified for config action in scale group %s" %
                        (group.name))
    return False
1641
def create_scaling_groups(self):
    """Create a ScalingGroup for every scaling group descriptor in the NSD.

    The groups are stored in self._scaling_groups keyed by group name.
    """
    for descriptor in self.nsd_msg.scaling_group_descriptor:
        self._log.debug("Found scaling_group %s in nsr id %s",
                        descriptor.name, self.id)

        new_group = scale_group.ScalingGroup(self._log, descriptor)
        self._scaling_groups[new_group.name] = new_group
1656
@asyncio.coroutine
def create_scale_group_instance(self, group_name, index, config_xact, is_default=False):
    """Create one instance of a scaling group and start its VNFs.

    Creates the VNF records of the instance, publishes the NSR, runs
    the pre-scale-out config and, if that succeeds, starts the VNF
    instantiation. A failed config or any exception marks the instance
    as "failed"; the exception is logged, not re-raised.

    Arguments:
        group_name - key of the scaling group in self._scaling_groups
        index - instance index passed to the group
        config_xact - transaction used to fetch the VNFDs
        is_default - passed through to the group's create_instance()
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.create_instance(index, is_default)

    @asyncio.coroutine
    def build_vnf_records():
        """Create the VNF records of this scale instance."""
        self._log.debug("Creating %u VNFs associated with NS id %s scaling group %s",
                        len(self.nsd_msg.constituent_vnfd), self.id, self)

        created = []
        for member_index, instance_count in group.vnf_index_count_map.items():
            const_vnfd_msg = self._get_constituent_vnfd_msg(member_index)
            vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact)

            account, datacenter = self._get_vnfd_cloud_account(const_vnfd_msg.member_vnf_index)
            if account is None:
                # Fall back to the NSR level cloud account
                account = self.cloud_account_name

            for _ in range(instance_count):
                record = yield from self.create_vnf_record(
                    vnfd_msg, const_vnfd_msg, account, datacenter, group_name, index)
                scale_instance.add_vnfr(record)
                created.append(record)

        return created

    self._log.debug("Creating %s VNFRS", scale_instance)
    vnfrs = yield from build_vnf_records()
    yield from self.publish()

    self._log.debug("Instantiating %s VNFRS for %s", len(vnfrs), scale_instance)
    scale_instance.operational_status = "vnf_init_phase"
    yield from self.update_state()

    try:
        configured = yield from self.apply_scaling_group_config(
            NsdYang.ScalingTrigger.PRE_SCALE_OUT, group, scale_instance, vnfrs)
        if configured:
            yield from self.instantiate_vnfs(vnfrs)
        else:
            self._log.error("Pre scale out config for scale group {} ({}) failed".format(
                group.name, index))
            scale_instance.operational_status = "failed"

    except Exception as e:
        self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".format(
            group.name, e))
        self._log.exception(e)
        scale_instance.operational_status = "failed"

    yield from self.update_state()
1711
@asyncio.coroutine
def delete_scale_group_instance(self, group_name, index):
    """Terminate one non-default instance of a scaling group.

    Runs the pre-scale-in config, terminates the instance's VNFRs and
    removes the instance from its group. A failing pre-scale-in config
    is only logged; termination goes ahead regardless.

    Raises:
        ScalingOperationError if the instance is a default instance
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.get_instance(index)
    if scale_instance.is_default:
        raise ScalingOperationError("Cannot terminate a default scaling group instance")

    scale_instance.operational_status = "terminate"
    yield from self.update_state()

    self._log.debug("Terminating %s VNFRS" % scale_instance)
    configured = yield from self.apply_scaling_group_config(
        NsdYang.ScalingTrigger.PRE_SCALE_IN, group, scale_instance)
    if not configured:
        self._log.error("Pre scale in config for scale group {} ({}) failed".format(
            group.name, index))

    # Going ahead with terminate, even if there is an error in pre-scale-in config
    # as this could be result of scale out failure and we need to cleanup this group
    yield from self.terminate_vnfrs(scale_instance.vnfrs)
    # NOTE(review): the instance is dropped from the group before its status
    # is set to vnf_terminate_phase - confirm later status polling still sees it
    group.delete_instance(index)

    scale_instance.operational_status = "vnf_terminate_phase"
    yield from self.update_state()
1740
@asyncio.coroutine
def _update_scale_group_instances_status(self):
    """ Advance the operational status of every scale group instance

    An instance in "vnf_init_phase" becomes "running" once all of its
    VNFRs are ACTIVE (the post-scale-out config is then scheduled as a
    separate task), or "failed" as soon as any VNFR is FAILED.
    An instance in "vnf_terminate_phase" becomes "terminated" once all
    of its VNFRs are TERMINATED, after which the post-scale-in config
    is applied.
    """
    @asyncio.coroutine
    def post_scale_out_task(group, instance):
        # Apply post scale out config once all VNFRs are active
        rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_OUT,
                                                        group, instance)
        # The instance stays "running" whether or not the config succeeded
        instance.operational_status = "running"
        if rc:
            self._log.debug("Scale out for group {} and instance {} succeeded".
                            format(group.name, instance.instance_id))
        else:
            self._log.error("Post scale out config for scale group {} ({}) failed".
                            format(group.name, instance.instance_id))

        yield from self.update_state()

    # Snapshot of each group's instances, taken before any status changes
    group_instances = {group: group.instances for group in self._scaling_groups.values()}
    for group, instances in group_instances.items():
        self._log.debug("Updating %s instance status", group)
        for instance in instances:
            instance_vnf_state_list = [vnfr.state for vnfr in instance.vnfrs]
            self._log.debug("Got vnfr instance states: %s", instance_vnf_state_list)
            if instance.operational_status == "vnf_init_phase":
                if all([state == VnfRecordState.ACTIVE for state in instance_vnf_state_list]):
                    instance.operational_status = "running"

                    # Create a task for post scale out to allow us to sleep before attempting
                    # to configure newly created VM's
                    self._loop.create_task(post_scale_out_task(group, instance))

                elif any([state == VnfRecordState.FAILED for state in instance_vnf_state_list]):
                    self._log.debug("Scale out for group {} and instance {} failed".
                                    format(group.name, instance.instance_id))
                    instance.operational_status = "failed"

            elif instance.operational_status == "vnf_terminate_phase":
                if all([state == VnfRecordState.TERMINATED for state in instance_vnf_state_list]):
                    instance.operational_status = "terminated"
                    # Unlike post scale out, this config runs inline, not as a task
                    rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.POST_SCALE_IN,
                                                                    group, instance)
                    if rc:
                        self._log.debug("Scale in for group {} and instance {} succeeded".
                                        format(group.name, instance.instance_id))
                    else:
                        self._log.error("Post scale in config for scale group {} ({}) failed".
                                        format(group.name, instance.instance_id))
1788
def create_vnffgs(self):
    """Create a VnffgRecord for every VNFFGD in the NSD.

    The records are stored in self._vnffgrs keyed by record id.
    """
    for descriptor in self.nsd_msg.vnffgd:
        self._log.debug("Found vnffgd %s in nsr id %s", descriptor, self.id)

        record = VnffgRecord(self._dts, self._log, self._loop,
                             self._nsm._vnffgmgr, self, self.name,
                             descriptor, self._sdn_account_name)
        self._vnffgrs[record.id] = record
1805
def resolve_vld_ip_profile(self, nsd_msg, vld):
    """Find the IP profile a VLD refers to.

    Returns:
        The first profile in nsd_msg.ip_profiles whose name equals the
        VLD's ip_profile_ref; None when the VLD has no ip_profile_ref
        or no profile of that name exists.
    """
    if vld.has_field('ip_profile_ref'):
        for candidate in nsd_msg.ip_profiles:
            if candidate.name == vld.ip_profile_ref:
                return candidate
    return None
1811
@asyncio.coroutine
def _create_vls(self, vld, cloud_account,om_datacenter):
    """Create a VLR for the given VLD in one cloud account / datacenter.

    Args:
        vld : VLD yang obj
        cloud_account : Cloud account name
        om_datacenter : om_datacenter value handed to the record

    Returns:
        VirtualLinkRecord
    """
    ip_profile = self.resolve_vld_ip_profile(self.nsd_msg, vld)

    record = yield from VirtualLinkRecord.create_record(
        self._dts, self._log, self._loop,
        self.name, vld, cloud_account, om_datacenter,
        ip_profile, self.id,
        restart_mode=self.restart_mode)

    return record
1836
def _extract_cloud_accounts_for_vl(self, vld):
    """
    Work out where a VL has to be created, based on the NS config

    Rules:
        1. With a vnf_cloud_account_map: one entry per connection point
           of the VLD, taken from the member VNF's mapping, or the NSR
           default when that VNF has no usable mapping
        2. With a vl_cloud_account_map entry for this VLD: one entry per
           listed cloud account and one per listed om datacenter
        3. Without either, the NSR default

    Args:
        vld : VLD yang object

    Returns:
        set of (cloud_account, om_datacenter) tuples
    """
    default_account = (self.cloud_account_name, self.om_datacenter_name)
    cloud_account_list = []

    if self._nsr_cfg_msg.vnf_cloud_account_map:
        # Only mappings that name a cloud account or a datacenter count
        vnf_cloud_map = {
            vnf.member_vnf_index_ref: (vnf.cloud_account, vnf.om_datacenter)
            for vnf in self._nsr_cfg_msg.vnf_cloud_account_map
            if vnf.cloud_account is not None or vnf.om_datacenter is not None
        }

        for vnfc in vld.vnfd_connection_point_ref:
            cloud_account_list.append(
                vnf_cloud_map.get(vnfc.member_vnf_index_ref, default_account))

    if self._nsr_cfg_msg.vl_cloud_account_map:
        for vld_map in self._nsr_cfg_msg.vl_cloud_account_map:
            if vld_map.vld_id_ref != vld.id:
                continue
            # Append whole tuples: extend() would flatten each pair into
            # two separate list elements and break callers that unpack
            # (cloud_account, om_datacenter).
            for cloud_account in vld_map.cloud_accounts:
                cloud_account_list.append((cloud_account, None))
            for om_datacenter in vld_map.om_datacenters:
                cloud_account_list.append((None, om_datacenter))

    # If no config has been provided then fall-back to the default
    # account
    if not cloud_account_list:
        cloud_account_list = [default_account]

    self._log.debug("VL {} cloud accounts: {}".
                    format(vld.name, cloud_account_list))
    return set(cloud_account_list)
1881
@asyncio.coroutine
def create_vls(self):
    """Create the VLRs for every VLD of the NSD.

    One VLR is created per (cloud_account, om_datacenter) pair that
    _extract_cloud_accounts_for_vl() returns for the VLD, and appended
    to self._vlrs.
    """
    for vld in self.nsd_msg.vld:
        self._log.debug("Found vld %s in nsr id %s", vld, self.id)

        targets = self._extract_cloud_accounts_for_vl(vld)
        for account, datacenter in targets:
            record = yield from self._create_vls(vld, account, datacenter)
            self._vlrs.append(record)
1892
1893
@asyncio.coroutine
def create_vl_instance(self, vld):
    """Create (if needed) and instantiate the VL for a VLD of this NSR.

    An existing VLR for the VLD is reused when it is TERMINATED. When
    there is none, a VLR is created for every (cloud_account,
    om_datacenter) pair resolved for the VLD. An instantiation error is
    logged and leaves the VLR in the FAILED state; it is not re-raised.

    Raises:
        NsrVlUpdateError if a VLR for the VLD exists and is not TERMINATED
    """
    self._log.debug("Create VL for {}: {}".format(self.id, vld.as_dict()))
    # Check if the VL is already present
    vlr = None
    for vl in self._vlrs:
        if vl.vld_msg.id == vld.id:
            self._log.debug("The VLD %s already in NSR %s as VLR %s with status %s",
                            vld.id, self.id, vl.id, vl.state)
            vlr = vl
            if vlr.state != VlRecordState.TERMINATED:
                # Format the message here: a bare comma would build a tuple
                err_msg = "VLR for VL %s in NSR %s already instantiated" % \
                          (vld, self.id)
                self._log.error(err_msg)
                raise NsrVlUpdateError(err_msg)
            break

    if vlr is None:
        cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
        # Each entry is a (cloud_account, om_datacenter) pair and
        # _create_vls() needs both values as separate arguments
        for cloud_account, om_datacenter in cloud_account_list:
            vlr = yield from self._create_vls(vld, cloud_account, om_datacenter)
            self._vlrs.append(vlr)

    # NOTE(review): when several VLRs were just created, only the last
    # one is instantiated below - confirm this is intended
    vlr.state = VlRecordState.INSTANTIATION_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.instantiate_vl(self, vlr)
        vlr.state = VlRecordState.ACTIVE

    except Exception as e:
        err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
                  format(self.id, vld.id, e)
        self._log.error(err_msg)
        self._log.exception(e)
        vlr.state = VlRecordState.FAILED

    yield from self.update_state()
1932
@asyncio.coroutine
def delete_vl_instance(self, vld):
    """Terminate and drop the first VLR that was created from ``vld``.

    Does nothing when no VLR references the VLD. A termination failure is
    logged and leaves the VLR in the FAILED state (still in ``self._vlrs``).
    """
    target = next(
        (record for record in self._vlrs if record.vld_msg.id == vld.id),
        None,
    )
    if target is None:
        return

    self._log.debug("Found VLR %s for VLD %s in NSR %s",
                    target.id, vld.id, self.id)
    target.state = VlRecordState.TERMINATE_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.terminate_vl(target)
        target.state = VlRecordState.TERMINATED
        self._vlrs.remove(target)
    except Exception as exc:
        failure = "Error terminating VL for NSR {} and VLD {}: {}".format(
            self.id, vld.id, exc)
        self._log.error(failure)
        self._log.exception(exc)
        target.state = VlRecordState.FAILED

    yield from self.update_state()
1956
@asyncio.coroutine
def create_vnfs(self, config_xact):
    """Create a VNF record for each constituent VNFD that starts by default.

    Constituent VNFDs with ``start_by_default`` unset are skipped. When no
    cloud account is mapped for a member VNF, the NSR-level cloud account
    is used.
    """
    constituents = self.nsd_msg.constituent_vnfd
    self._log.debug("Creating %u VNFs associated with this NS id %s",
                    len(constituents), self.id)

    for member in constituents:
        if member.start_by_default:
            descriptor = self._get_vnfd(member.vnfd_id_ref, config_xact)
            account, datacenter = self._get_vnfd_cloud_account(member.member_vnf_index)
            if account is None:
                account = self.cloud_account_name
            yield from self.create_vnf_record(descriptor, member, account, datacenter)
        else:
            self._log.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
                            member.member_vnf_index)
1977
1978
def get_placement_groups(self, vnfd_msg, const_vnfd):
    """Return the resolved placement-group info for one constituent VNF.

    Every NSD placement group that lists this VNF (same VNFD id and same
    member index) is resolved to its cloud construct. Groups that cannot be
    resolved are logged and left out of the result.
    """
    resolved = []
    for pg in self.nsd_msg.placement_groups:
        for member in pg.member_vnfd:
            if member.vnfd_id_ref != vnfd_msg.id:
                continue
            if member.member_vnf_index_ref != const_vnfd.member_vnf_index:
                continue

            info = self.resolve_placement_group_cloud_construct(pg)
            if info is None:
                # Deliberately not fatal: the failure is only logged.
                self._log.error("Could not resolve cloud-construct for placement group: %s", pg.name)
                continue

            self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
                           str(info),
                           vnfd_msg.name,
                           const_vnfd.member_vnf_index)
            resolved.append(info)
    return resolved
1996
@asyncio.coroutine
def create_vnf_record(self, vnfd_msg, const_vnfd, cloud_account_name, om_datacenter_name, group_name=None, group_instance_id=None):
    """Create a VNFR for one constituent VNF and register it with this NSR.

    The record is stored in both this NSR's and the NSM's VNFR tables and
    its config status is initialised to INIT.

    Raises:
        NetworkServiceRecordError if a VNFR with the same id is already
        tracked by this NSR

    Returns:
        The newly created VirtualNetworkFunctionRecord
    """
    placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd)
    member_index = const_vnfd.member_vnf_index
    group_names = [group.name for group in placement_groups]

    self._log.info("Cloud Account for VNF %d is %s", member_index, cloud_account_name)
    self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
                   vnfd_msg.name, member_index, group_names)

    record = yield from VirtualNetworkFunctionRecord.create_record(
        self._dts,
        self._log,
        self._loop,
        vnfd_msg,
        const_vnfd,
        self.nsd_id,
        self.name,
        cloud_account_name,
        om_datacenter_name,
        self.id,
        group_name,
        group_instance_id,
        placement_groups,
        restart_mode=self.restart_mode,
    )

    record_id = record.id
    if record_id in self._vnfrs:
        raise NetworkServiceRecordError(
            "VNF with VNFR id %s already in vnf list" % (record_id,))

    self._vnfrs[record_id] = record
    self._nsm.vnfrs[record_id] = record

    yield from record.set_config_status(NsrYang.ConfigStates.INIT)

    self._log.debug("Added VNFR %s to NSM VNFR list with id %s",
                    record.name, record.id)

    return record
2035
def create_param_pools(self):
    """Build a ParameterValuePool for every parameter pool in the NSD.

    Each pool is stored in ``self._param_pools`` under the pool's name and
    is backed by ``range(start_value, end_value)``.

    Raises:
        NetworkServiceRecordError if a pool's end value is below its start
        value
    """
    for param_pool in self.nsd_msg.parameter_pool:
        self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)

        start_value = param_pool.range.start_value
        end_value = param_pool.range.end_value
        if end_value < start_value:
            # The message previously mixed a "%s" placeholder with
            # str.format(), so the pool name was never substituted.
            raise NetworkServiceRecordError(
                "Parameter pool {} has invalid range (start: {}, end: {})".format(
                    param_pool.name, start_value, end_value
                )
            )

        # NOTE(review): range() excludes end_value itself - confirm the
        # pool is meant to be end-exclusive.
        self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
            self._log,
            param_pool.name,
            range(start_value, end_value)
        )
2054
@asyncio.coroutine
def fetch_vnfr(self, vnfr_path):
    """Read the VNFR stored at ``vnfr_path`` from DTS.

    Returns the result of the last entry yielded by the query, or None when
    the query yields nothing.
    """
    self._log.debug("Fetching VNFR with key %s while instantiating %s",
                    vnfr_path, self.id)
    pending_results = yield from self._dts.query_read(
        vnfr_path, rwdts.XactFlag.MERGE)

    latest = None
    for pending in pending_results:
        response = yield from pending
        latest = response.result

    return latest
2068
@asyncio.coroutine
def instantiate_vnfs(self, vnfrs):
    """Ask the NSM plugin to instantiate each of the given VNFRs in turn."""
    self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id)
    plugin = self.nsm_plugin
    for record in vnfrs:
        self._log.debug("Instantiating VNF: %s in NS %s", record, self.id)
        yield from plugin.instantiate_vnf(self, record)
2078
@asyncio.coroutine
def instantiate_vnffgs(self):
    """Instantiate every VNFFG record once all VNFs have become active.

    Each VNFR is polled every 2 seconds while it is still pending. If any
    VNFR settles in a state other than ACTIVE the VNFFG state is marked
    FAILED and nothing is instantiated. Otherwise the method sleeps for a
    fixed 90 seconds and then instantiates each VNFFGR.
    """
    self._log.debug("Instantiating %u VNFFGs in NS %s",
                    len(self.nsd_msg.vnffgd), self.id)

    pending_states = [VnfRecordState.INSTANTIATION_PENDING, VnfRecordState.INIT]
    for record in self.vnfrs.values():
        while record.state in pending_states:
            self._log.debug("Received vnfr state for vnfr %s is %s; retrying", record.name, record.state)
            yield from asyncio.sleep(2, loop=self._loop)

        if record.state != VnfRecordState.ACTIVE:
            self._log.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation", record.name, record.state)
            self._vnffgr_state = VnffgRecordState.FAILED
            return

        self._log.debug("Received vnfr state for vnfr %s is %s ", record.name, record.state)

    self._log.info("Waiting for 90 seconds for VMs to come up")
    yield from asyncio.sleep(90, loop=self._loop)
    self._log.info("Starting VNFFG orchestration")
    for graph in self._vnffgrs.values():
        self._log.debug("Instantiating VNFFG: %s in NS %s", graph, self.id)
        yield from graph.instantiate()
2104
@asyncio.coroutine
def instantiate_scaling_instances(self, config_xact):
    """Instantiate default and previously configured scaling instances.

    For every scaling group, ``min_instance_count`` default instances are
    created first, followed by the instances listed for that group in the
    NSR config message.
    """
    for group in self._scaling_groups.values():
        for index in range(group.min_instance_count):
            self._log.debug("Instantiating %s default scaling instance %s", group, index)
            yield from self.create_scale_group_instance(
                group.name, index, config_xact, is_default=True
            )

        for cfg_group in self._nsr_cfg_msg.scaling_group:
            if cfg_group.scaling_group_name_ref == group.name:
                for cfg_instance in cfg_group.instance:
                    self._log.debug("Reloading %s scaling instance %s", cfg_group, cfg_instance.id)
                    yield from self.create_scale_group_instance(
                        group.name, cfg_instance.id, config_xact, is_default=False
                    )
2124
def has_scaling_instances(self):
    """Tell whether any scaling instance needs to be instantiated.

    True when a scaling group has a positive ``min_instance_count`` or when
    the NSR config lists at least one instance for a scaling group.
    """
    has_default = any(
        group.min_instance_count > 0
        for group in self._scaling_groups.values()
    )
    if has_default:
        return True

    return any(
        len(cfg_group.instance) > 0
        for cfg_group in self._nsr_cfg_msg.scaling_group
    )
2136
@asyncio.coroutine
def publish(self):
    """Rebuild this NSR's message and publish it to DTS.

    ``_debug_running`` is latched once the record has been published while
    in the RUNNING state, so later publishes emit an extra debug line.
    """
    message = self.create_msg()
    self._nsr_msg = message

    self._log.debug("Publishing the NSR with xpath %s and nsr %s",
                    self.nsr_xpath,
                    message)

    if self._debug_running:
        self._log.debug("Publishing NSR in RUNNING state!")

    with self._dts.transaction() as xact:
        yield from self._nsm.nsr_handler.update(xact, self.nsr_xpath, self._nsr_msg)
        if self._op_status.state == NetworkServiceRecordState.RUNNING:
            self._debug_running = True
2154
@asyncio.coroutine
def unpublish(self, xact):
    """Delete this NSR's published record within the given transaction."""
    self._log.debug("Unpublishing Network service id %s", self.id)
    handler = self._nsm.nsr_handler
    yield from handler.delete(xact, self.nsr_xpath)
2160
@property
def nsr_xpath(self):
    """Xpath of this NSR's op-data entry, keyed by the NSR id."""
    template = ("D,/nsr:ns-instance-opdata"
                "/nsr:nsr[nsr:ns-instance-config-ref = '{}']")
    return template.format(self.id)
2168
@staticmethod
def xpath_from_nsr(nsr):
    """Build the op-data xpath for the given NSR from the class XPATH."""
    template = NetworkServiceRecord.XPATH + "[nsr:ns-instance-config-ref = '{}']"
    return template.format(nsr.id)
2174
@property
def nsd_xpath(self):
    """Config xpath of the NSD catalog entry referenced by this NSR."""
    template = "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
    return template.format(self.nsd_id)
2181
@asyncio.coroutine
def instantiate(self, config_xact):
    """Instantiate this NetworkServiceRecord.

    The record is created and published synchronously; the remaining work
    is scheduled as a background task that performs these steps:

    * Instantiate every VL in the NSD.
    * Instantiate every VNF in the NSD.
    * Instantiate VNFFGs and default scaling instances, when present.
    * Let the NSM plugin deploy the network service.
    * Publish the NSR details to DTS.

    If the background task fails, ``instantiation_failed`` is scheduled
    with the failure reason.

    Arguments:
        config_xact: The configuration transaction which initiated the
                     instantiation

    Raises:
        NetworkServiceRecordError if the NSR config carries no NSD

    Returns:
        No return value
    """

    self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)

    # Move the state to INIT
    self.set_state(NetworkServiceRecordState.INIT)

    event_descr = "Instantiation Request Received NSR Id:%s" % self.id
    self.record_event("instantiating", event_descr)

    # The NSD is carried inside the NSR config message
    self._nsd = self._nsr_cfg_msg.nsd

    try:
        # Update ref count if nsd present in catalog
        self._nsm.get_nsd_ref(self.nsd_id)

    except NetworkServiceDescriptorError:
        # This could be an NSD not in the nsd-catalog
        pass

    # Merge any config and initial config primitive values
    self.config_store.merge_nsd_config(self.nsd_msg)
    self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))

    event_descr = "Fetched NSD with descriptor id %s" % self.nsd_id
    self.record_event("nsd-fetched", event_descr)

    # NOTE(review): the NSD has already been dereferenced above, so this
    # check may be unreachable when the NSD is missing - confirm.
    if self._nsd is None:
        # Raise with the formatted message; previously the record object
        # itself was passed as the exception argument.
        msg = ("Failed to fetch NSD with nsd-id [%s] for nsr-id %s" %
               (self.nsd_id, self.id))
        self._log.error(msg)
        raise NetworkServiceRecordError(msg)

    self._log.debug("Got nsd result %s", self._nsd)

    # Substitute any input parameters
    self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)

    # Create the record
    yield from self.create(config_xact)

    # Publish the NSR to DTS
    yield from self.publish()

    @asyncio.coroutine
    def do_instantiate():
        """
        Instantiate network service
        """
        self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
                        self.id, self.nsd_id)

        # instantiate the VLs
        event_descr = ("Instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("begin-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)

        yield from self.instantiate_vls()

        # Publish the NSR to DTS
        yield from self.publish()

        event_descr = ("Finished instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("end-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)

        self._log.debug("Instantiating VNFs  ...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # instantiate the VNFs
        event_descr = ("Instantiating %s VNFS for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))

        self.record_event("begin-vnf-instantiation", event_descr)

        yield from self.instantiate_vnfs(self._vnfrs.values())

        self._log.debug(" Finished instantiating %d VNFs for NSR id %s",
                        len(self.nsd_msg.constituent_vnfd), self.id)

        event_descr = ("Finished instantiating %s VNFs for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))
        self.record_event("end-vnf-instantiation", event_descr)

        if len(self.vnffgrs) > 0:
            event_descr = ("Instantiating %s VNFFGS for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))

            self.record_event("begin-vnffg-instantiation", event_descr)

            yield from self.instantiate_vnffgs()

            event_descr = ("Finished instantiating %s VNFFGDs for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))
            self.record_event("end-vnffg-instantiation", event_descr)

        if self.has_scaling_instances():
            event_descr = ("Instantiating %s Scaling Groups for NSR id %s" %
                           (len(self._scaling_groups), self.id))

            self.record_event("begin-scaling-group-instantiation", event_descr)
            yield from self.instantiate_scaling_instances(config_xact)
            self.record_event("end-scaling-group-instantiation", event_descr)

        # Give the plugin a chance to deploy the network service now that all
        # virtual links and vnfs are instantiated
        yield from self.nsm_plugin.deploy(self._nsr_msg)

        self._log.debug("Publishing  NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # Publish the NSR to DTS
        yield from self.publish()

        self._log.debug("Published  NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

    def on_instantiate_done(fut):
        # If the do_instantiate fails, then publish NSR with failed result
        exc = fut.exception()
        if exc is not None:
            self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(exc))
            self._loop.create_task(self.instantiation_failed(failed_reason=str(exc)))

    # Run the long-lived part in the background so the caller is not blocked
    instantiate_task = self._loop.create_task(do_instantiate())
    instantiate_task.add_done_callback(on_instantiate_done)
2332
@asyncio.coroutine
def set_config_status(self, status, status_details=None):
    """Record a new config status for this NSR and publish the change.

    Nothing happens when ``status`` equals the current config status. A
    transition to FAILED additionally records a "config-failed" event.
    """
    previous = self.config_status
    if previous != status:
        self._log.debug("Updating NSR {} status for {} to {}".format(
            self.name, previous, status))
        self._config_status, self._config_status_details = status, status_details

        if self._config_status == NsrYang.ConfigStates.FAILED:
            self.record_event("config-failed", "NS configuration failed",
                              evt_details=self._config_status_details)

        yield from self.publish()
2346
@asyncio.coroutine
def is_active(self):
    """Move this NS to RUNNING and, on the first call only, announce it.

    The state is always set to RUNNING; the "ns-running" event and the
    publish happen only while ``_is_active`` is still unset.
    """
    self.set_state(NetworkServiceRecordState.RUNNING)
    if not self._is_active:
        self._log.debug("Network service %s is active ", self.id)
        self._is_active = True

        self.record_event("ns-running",
                          "NSR in running state for NSR id %s" % self.id)

        # Publish the NSR to DTS
        yield from self.publish()
2362
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark the NS as FAILED, record an "ns-failed" event and publish."""
    self._log.error("Network service id:%s, name:%s instantiation failed",
                    self.id, self.name)
    self.set_state(NetworkServiceRecordState.FAILED)

    description = "Instantiation of NS %s failed" % self.id
    self.record_event("ns-failed", description, evt_details=failed_reason)

    # Make the failure visible in DTS
    yield from self.publish()
2375
@asyncio.coroutine
def terminate_vnfrs(self, vnfrs):
    """Ask the NSM plugin to terminate each of the given VNFRs in turn."""
    self._log.debug("Terminating VNFs in network service %s", self.id)
    plugin = self.nsm_plugin
    for record in vnfrs:
        yield from plugin.terminate_vnf(record)
2382
@asyncio.coroutine
def terminate(self):
    """Terminate this NetworkServiceRecord.

    Tears down VNFFGRs, then VNFRs, then VLRs, then the NS itself through
    the NSM plugin, moving through the matching terminate states and
    recording an event for each phase.
    """
    self._log.debug("Terminating network service id %s", self.id)

    # Move the state to TERMINATE
    self.set_state(NetworkServiceRecordState.TERMINATE)
    self.record_event("terminate",
                      "Terminate being processed for NS Id:%s" % self.id)

    # Move the state to VNFFG_TERMINATE_PHASE
    self._log.debug("Terminating VNFFGs in NS ID: %s", self.id)
    self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
    self.record_event("terminating-vnffgss",
                      "Terminating VNFFGS in NS Id:%s" % self.id)
    self._log.debug("Terminating VNFFGRs in network service %s", self.id)
    for graph in self.vnffgrs.values():
        yield from graph.terminate()

    # Move the state to VNF_TERMINATE_PHASE
    self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
    self.record_event("terminating-vnfs",
                      "Terminating VNFS in NS Id:%s" % self.id)
    yield from self.terminate_vnfrs(self.vnfrs.values())

    # Move the state to VL_TERMINATE_PHASE
    self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
    self.record_event("terminating-vls",
                      "Terminating VLs in NS Id:%s" % self.id)
    self._log.debug("Terminating VLs in network service %s", self.id)
    for link in self.vlrs:
        yield from self.nsm_plugin.terminate_vl(link)
        link.state = VlRecordState.TERMINATED

    yield from self.nsm_plugin.terminate_ns(self)

    # Move the state to TERMINATED
    self.set_state(NetworkServiceRecordState.TERMINATED)
    self.record_event("terminated", "Terminated NS Id:%s" % self.id)
2431
def enable(self):
    """Enable a NetworkServiceRecord (currently a no-op)."""
2435
def disable(self):
    """Disable a NetworkServiceRecord (currently a no-op)."""
2439
def map_config_status(self):
    """Translate the internal config status into its published string.

    CONFIGURING maps to 'configuring', FAILED to 'failed' and every other
    status to 'configured'.
    """
    status = self._config_status
    self._log.debug("Config status for ns {} is {}".format(self.name, status))

    if status == NsrYang.ConfigStates.CONFIGURING:
        label = 'configuring'
    elif status == NsrYang.ConfigStates.FAILED:
        label = 'failed'
    else:
        label = 'configured'
    return label
2448
def vl_phase_completed(self):
    """Return the flag recording whether the VL phase has completed."""
    completed = self._vl_phase_completed
    return completed
2452
def vnf_phase_completed(self):
    """Return the flag recording whether the VNF phase has completed."""
    completed = self._vnf_phase_completed
    return completed
2456
def create_msg(self):
    """Build the op-data message describing this network service record.

    VLR entries are included only once the VL phase has completed;
    constituent VNFR references, VNFFGRs and scaling group records only
    once the VNF phase has completed.
    """
    message = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(
        {"ns_instance_config_ref": self.id})
    message.sdn_account = self._sdn_account_name
    message.name_ref = self.name
    message.nsd_ref = self.nsd_id
    message.nsd_name_ref = self.nsd_msg.name
    message.operational_events = self._op_status.msg
    message.operational_status = self._op_status.yang_str()
    message.config_status = self.map_config_status()
    message.config_status_details = self._config_status_details
    message.create_time = self._create_time

    # Copy the NSD primitives into their op-data counterparts
    for service_prim in self.nsd_msg.service_primitive:
        message.service_primitive.append(
            NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict(
                service_prim.as_dict()))

    for initial_prim in self.nsd_msg.initial_config_primitive:
        message.initial_config_primitive.append(
            NsrYang.NsrInitialConfigPrimitive.from_dict(initial_prim.as_dict()))

    if self.vl_phase_completed():
        for link in self.vlrs:
            message.vlr.append(link.create_nsr_vlr_msg(self.vnfrs.values()))

    if self.vnf_phase_completed():
        for record_id in self.vnfrs:
            message.constituent_vnfr_ref.append(self.vnfrs[record_id].const_vnfr_msg)
        for graph in self.vnffgrs.values():
            message.vnffgr.append(graph.fetch_vnffgr())
        for group in self._scaling_groups.values():
            message.scaling_group_record.append(group.create_record_msg())

    return message
2495
def all_vnfs_active(self):
    """Return True only when every VNFR's ``active`` attribute is True."""
    return all(record.active is True for record in self.vnfrs.values())
2502
@asyncio.coroutine
def update_state(self):
    """Re-evaluate this NS's state from its VNFRs, VLRs, VNFFGRs and
    scaling groups, then publish the record.

    A TERMINATED NS is left untouched (and is not published). Otherwise the
    candidate state starts as RUNNING and is downgraded by the checks
    below; the state is only changed when the result differs from the
    current state.
    """
    curr_state = self._op_status.state

    if curr_state == NetworkServiceRecordState.TERMINATED:
        self._log.debug("NS (%s) in terminated state, not updating state", self.id)
        return

    new_state = NetworkServiceRecordState.RUNNING
    self._log.info("Received update_state for nsr: %s, curr-state: %s",
                   self.id, curr_state)

    # Check all the VNFRs are present
    for _, vnfr in self.vnfrs.items():
        if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]:
            pass
        elif vnfr.state == VnfRecordState.FAILED:
            # Record the failure event only on the transition into FAILED
            if vnfr._prev_state != vnfr.state:
                event_descr = "Instantiation of VNF %s failed" % vnfr.id
                event_error_details = vnfr.state_failed_reason
                self.record_event("vnf-failed", event_descr, evt_details=event_error_details)
                vnfr.set_state(VnfRecordState.FAILED)
            else:
                self._log.info("VNF state did not change, curr=%s, prev=%s",
                               vnfr.state, vnfr._prev_state)
            new_state = NetworkServiceRecordState.FAILED
            break
        else:
            self._log.info("VNF %s in NSR %s is still not active; current state is: %s",
                           vnfr.id, self.id, vnfr.state)
            new_state = curr_state

    # If new state is RUNNING; check all VLs
    if new_state == NetworkServiceRecordState.RUNNING:
        for vl in self.vlrs:

            if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]:
                pass
            elif vl.state == VlRecordState.FAILED:
                # NOTE(review): a failed VL only records an event; it does
                # not move the NS to FAILED - confirm this is intended.
                if vl.prev_state != vl.state:
                    event_descr = "Instantiation of VL %s failed" % vl.id
                    event_error_details = vl.state_failed_reason
                    self.record_event("vl-failed", event_descr, evt_details=event_error_details)
                    vl.prev_state = vl.state
                else:
                    # The "%s" placeholder previously had no argument.
                    self._log.debug("VL %s already in failed state", vl.id)
            else:
                if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]:
                    new_state = NetworkServiceRecordState.VL_INSTANTIATE
                    break

                if vl.state in [VlRecordState.TERMINATE_PENDING]:
                    new_state = NetworkServiceRecordState.VL_TERMINATE
                    break

    # If new state is RUNNING; check VNFFGRs are also active
    if new_state == NetworkServiceRecordState.RUNNING:
        for _, vnffgr in self.vnffgrs.items():
            self._log.info("Checking vnffgr state for nsr %s is: %s",
                           self.id, vnffgr.state)
            if vnffgr.state == VnffgRecordState.ACTIVE:
                pass
            elif vnffgr.state == VnffgRecordState.FAILED:
                event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id
                self.record_event("vnffg-failed", event_descr)
                new_state = NetworkServiceRecordState.FAILED
                break
            else:
                self._log.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
                               vnffgr.id, self.id, vnffgr.state)
                new_state = curr_state

    # Update all the scaling group instance operational status to
    # reflect the state of all VNFR within that instance
    yield from self._update_scale_group_instances_status()

    # An in-progress scaling operation overrides whatever was derived above
    for _, group in self._scaling_groups.items():
        if group.state == scale_group.ScaleGroupState.SCALING_OUT:
            new_state = NetworkServiceRecordState.SCALING_OUT
            break
        elif group.state == scale_group.ScaleGroupState.SCALING_IN:
            new_state = NetworkServiceRecordState.SCALING_IN
            break

    if new_state != curr_state:
        self._log.debug("Changing state of Network service %s from %s to %s",
                        self.id, curr_state, new_state)
        if new_state == NetworkServiceRecordState.RUNNING:
            yield from self.is_active()
        elif new_state == NetworkServiceRecordState.FAILED:
            # If the NS is already active and we entered scaling_in, scaling_out,
            # do not mark the NS as failing if scaling operation failed.
            if curr_state in [NetworkServiceRecordState.SCALING_OUT,
                              NetworkServiceRecordState.SCALING_IN] and self._is_active:
                new_state = NetworkServiceRecordState.RUNNING
                self.set_state(new_state)
            else:
                yield from self.instantiation_failed()
        else:
            self.set_state(new_state)

    yield from self.publish()
2606
2607
class InputParameterSubstitution(object):
    """
    Applies the input parameters of an NSR config to an NSD.
    """

    def __init__(self, log):
        """Create an instance of InputParameterSubstitution

        Arguments:
            log - a logger for this object to use

        """
        self.log = log

    def __call__(self, nsd, nsr_config):
        """Substitutes input parameters from the NSR config into the NSD

        The NSD is modified in place. Only xpaths that the NSD declares in
        its input-parameter-xpath list may be set; any other parameter is
        logged as an error and skipped. A failure to set one value is
        logged and does not stop the remaining parameters.

        Arguments:
            nsd         - a GI NSD object
            nsr_config  - a GI NSR config object

        """
        if nsd is None or nsr_config is None:
            return

        # The xpaths this descriptor allows to be overridden
        allowed_xpaths = {entry.xpath for entry in nsd.input_parameter_xpath}

        requested = nsr_config.input_parameter
        if not requested:
            return

        for requested_param in requested:
            if requested_param.xpath not in allowed_xpaths:
                template = "tried to set an invalid input parameter ({})"
                self.log.error(template.format(requested_param.xpath))
                continue

            self.log.debug(
                "input-parameter:{} = {}".format(
                    requested_param.xpath,
                    requested_param.value,
                )
            )

            try:
                xpath.setxattr(nsd, requested_param.xpath, requested_param.value)
            except Exception as exc:
                self.log.exception(exc)
2662
2663
class NetworkServiceDescriptor(object):
    """
    Network service descriptor class

    Wraps an NSD message together with a reference count of its users.
    """

    def __init__(self, dts, log, loop, nsd, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop

        self._nsd = nsd
        self._ref_count = 0

        self._nsm = nsm

    @property
    def id(self):
        """ Returns nsd id """
        return self._nsd.id

    @property
    def name(self):
        """ Returns name of nsd """
        return self._nsd.name

    @property
    def ref_count(self):
        """ Returns reference count"""
        return self._ref_count

    def in_use(self):
        """ Returns whether nsd is in use or not """
        return self.ref_count > 0

    def ref(self):
        """ Take a reference on this object """
        self._ref_count += 1

    def unref(self):
        """ Release reference on this object

        Raises:
            NetworkServiceDescriptorError if no reference is currently held
        """
        if self.ref_count < 1:
            msg = ("Unref on a NSD object - nsd id %s, ref_count = %s" %
                   (self.id, self.ref_count))
            self._log.critical(msg)
            raise NetworkServiceDescriptorError(msg)
        self._ref_count -= 1

    @property
    def msg(self):
        """ Return the message associated with this NetworkServiceDescriptor"""
        return self._nsd

    @staticmethod
    def path_for_id(nsd_id):
        """ Return path for the passed nsd_id"""
        # The key predicate must be closed with "]"; it was missing before.
        return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']".format(nsd_id)

    def path(self):
        """ Return the catalog path of this NetworkServiceDescriptor"""
        return NetworkServiceDescriptor.path_for_id(self.id)

    def update(self, nsd):
        """ Update the NSD descriptor """
        self._nsd = nsd
2728
2729
class NsdDtsHandler(object):
    """ The network service descriptor DTS handler """
    XPATH = "C,/nsd:nsd-catalog/nsd:nsd"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for Nsd create/update/delete/read requests from dts """

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
            self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
                            xact, action)
            # Create/Update an NSD record
            for cfg in self._regh.get_xact_elements(xact):
                # Only interested in those NSD cfgs whose ID was received in prepare callback
                if cfg.id in scratch.get('nsds', []) or is_recovery:
                    self._nsm.update_nsd(cfg)

            scratch.pop('nsds', None)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def delete_nsd_libs(nsd_id):
            """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/libs/<id> """
            try:
                rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
                nsd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', nsd_id)

                if os.path.exists(nsd_dir):
                    shutil.rmtree(nsd_dir, ignore_errors=True)
            except Exception as e:
                # Best-effort cleanup: log and carry on with the delete
                self._log.error("Exception in cleaning up NSD libs {}: {}".
                                format(nsd_id, e))
                # Was misspelled "excpetion", which raised AttributeError
                # from inside this handler.
                self._log.exception(e)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for NSD config """

            self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
                           msg.id, msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if fref.is_field_deleted():
                # Delete an NSD record
                self._log.debug("Deleting NSD with id %s", msg.id)
                if self._nsm.nsd_in_use(msg.id):
                    self._log.debug("Cannot delete NSD in use - %s", msg.id)
                    err = "Cannot delete an NSD in use - %s" % msg.id
                    raise NetworkServiceDescriptorRefCountExists(err)

                yield from delete_nsd_libs(msg.id)
                self._nsm.delete_nsd(msg.id)
            else:
                # Add this NSD to scratch to create/update in apply callback
                nsds = scratch.setdefault('nsds', [])
                nsds.append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for NSD config using xpath: %s",
            NsdDtsHandler.XPATH,
            )

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            # The 'nsds' scratch list is created lazily in on_prepare
            self._regh = acg.register(
                xpath=NsdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare)
2821
2822
2823 class VnfdDtsHandler(object):
2824 """ DTS handler for VNFD config changes """
2825 XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2826
def __init__(self, dts, log, loop, nsm):
    """Store the DTS handle, logger, event loop and NSM.

    The registration handle starts out as None.
    """
    self._regh = None
    self._dts, self._log = dts, log
    self._loop, self._nsm = loop, nsm
2833
@property
def regh(self):
    """ The DTS registration handle (None until one has been stored) """
    handle = self._regh
    return handle
2838
2839 @asyncio.coroutine
2840 def register(self):
2841 """ Register for VNFD configuration"""
2842
2843 @asyncio.coroutine
2844 def on_apply(dts, acg, xact, action, scratch):
2845 """Apply the configuration"""
2846 self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2847 xact, action, scratch)
2848
2849 # Create/Update a VNFD record
2850 for cfg in self._regh.get_xact_elements(xact):
2851 # Only interested in those VNFD cfgs whose ID was received in prepare callback
2852 if cfg.id in scratch.get('vnfds', []):
2853 self._nsm.update_vnfd(cfg)
2854
2855 for cfg in self._regh.elements:
2856 if cfg.id in scratch.get('deleted_vnfds', []):
2857 yield from self._nsm.delete_vnfd(cfg.id)
2858
2859 scratch.pop('vnfds', None)
2860 scratch.pop('deleted_vnfds', None)
2861
2862 @asyncio.coroutine
2863 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
2864 """ on prepare callback """
2865 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
2866 ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg)
2867
2868 fref = ProtobufC.FieldReference.alloc()
2869 fref.goto_whole_message(msg.to_pbcm())
2870
2871 # Handle deletes in prepare_callback, but adds/updates in apply_callback
2872 if fref.is_field_deleted():
2873 self._log.debug("Adding msg to deleted field")
2874 deleted_vnfds = scratch.setdefault('deleted_vnfds', [])
2875 deleted_vnfds.append(msg.id)
2876 else:
2877 # Add this VNFD to scratch to create/update in apply callback
2878 vnfds = scratch.setdefault('vnfds', [])
2879 vnfds.append(msg.id)
2880
2881 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
2882
2883 self._log.debug(
2884 "Registering for VNFD config using xpath: %s",
2885 VnfdDtsHandler.XPATH,
2886 )
2887 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
2888 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
2889 # Need a list in scratch to store VNFDs to create/update later
2890 # acg._scratch['vnfds'] = list()
2891 # acg._scratch['deleted_vnfds'] = list()
2892 self._regh = acg.register(
2893 xpath=VnfdDtsHandler.XPATH,
2894 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
2895 on_prepare=on_prepare)
2896
class NsrRpcDtsHandler(object):
    """ The network service instantiation RPC DTS handler

    Serves the start-network-service RPC: a new NSR id is generated and an
    ns-instance-config entry, built from the RPC input and the referenced
    NSD, is written to the netconf server at NETCONF_IP_ADDRESS:NETCONF_PORT.
    """
    EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service"
    EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service"
    NETCONF_IP_ADDRESS = "127.0.0.1"
    NETCONF_PORT = 2022
    NETCONF_USER = "admin"
    NETCONF_PW = "admin"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm
        self._nsd = None

        self._ns_regh = None

        # Netconf session; opened lazily by the first RPC that needs it
        self._manager = None

        # Yang model used to serialise the ns-instance-config message to XML
        self._model = RwYang.Model.create_libncx()
        self._model.load_schema_ypbc(RwNsrYang.get_schema())

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def wrap_netconf_config_xml(xml):
        """ Wrap an XML fragment in the netconf <config> envelope """
        xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
        return xml

    @asyncio.coroutine
    def _connect(self, timeout_secs=240):
        """ Open a netconf session, retrying every 5 seconds on SSH errors

        Arguments:
            timeout_secs - how long to keep retrying before giving up

        Returns:
            The connected ncclient asyncio manager

        Raises:
            NsrInstantiationFailed if no connection could be made in time
        """
        start_time = time.time()
        while (time.time() - start_time) < timeout_secs:

            try:
                self._log.debug("Attemping NsmTasklet netconf connection.")

                manager = yield from ncclient.asyncio_manager.asyncio_connect(
                        loop=self._loop,
                        host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS,
                        port=NsrRpcDtsHandler.NETCONF_PORT,
                        username=NsrRpcDtsHandler.NETCONF_USER,
                        password=NsrRpcDtsHandler.NETCONF_PW,
                        allow_agent=False,
                        look_for_keys=False,
                        hostkey_verify=False,
                        )

                return manager

            except ncclient.transport.errors.SSHError as e:
                self._log.warning("Netconf connection to launchpad %s failed: %s",
                                  NsrRpcDtsHandler.NETCONF_IP_ADDRESS, str(e))

            yield from asyncio.sleep(5, loop=self._loop)

        raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
                                     timeout_secs)

    @asyncio.coroutine
    def register(self):
        """ Register as publisher of the start-network-service RPC """

        @asyncio.coroutine
        def on_ns_config_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts start-network-service

            Responds ACK with the generated nsr_id on success, NACK when
            anything in the processing raises.
            """
            assert action == rwdts.QueryAction.RPC
            rpc_ip = msg
            rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({
                    "nsr_id": str(uuid.uuid4())
                })

            if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and 'cloud_account' in rpc_ip):
                # NOTE(review): the request is only logged, not rejected here;
                # processing continues and any exception raised below is
                # answered with a NACK by the except clause.
                self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip))

            self._log.debug("start-network-service RPC input: {}".format(rpc_ip))

            try:
                # Add used value to the pool
                self._log.debug("RPC output: {}".format(rpc_op))
                nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)

                if not self._manager:
                    self._manager = yield from self._connect()

                self._log.debug("Configuring ns-instance-config with name  %s nsd-ref: %s",
                                rpc_ip.name, rpc_ip.nsd_ref)

                # Copy over only those RPC input fields that the
                # ns-instance-config message also defines.
                config_fields = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields
                ns_instance_config_dict = {"id": rpc_op.nsr_id, "admin_status": "ENABLED"}
                ns_instance_config_dict.update(
                    {k: v for k, v in rpc_ip.as_dict().items() if k in config_fields}
                )

                ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict)
                ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
                ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())

                xml = ns_instance_config.to_xml_v2(self._model)
                netconf_xml = self.wrap_netconf_config_xml(xml)

                # Arguments follow the format string: destination, then xml
                self._log.debug("Sending configure ns-instance-config xml to %s: %s",
                                NsrRpcDtsHandler.NETCONF_IP_ADDRESS, netconf_xml)

                response = yield from self._manager.edit_config(
                           target="running",
                           config=netconf_xml,
                           )
                self._log.debug("Received edit config response: %s", str(response))

                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
                                        rpc_op)
            except Exception as e:
                self._log.error("Exception processing the "
                                "start-network-service: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)

        hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)

        with self._dts.group_create() as group:
            self._ns_regh = group.register(xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH,
                                           handler=hdl_ns,
                                           flags=rwdts.Flag.PUBLISHER,
                                           )
3029
3030
class NsrDtsHandler(object):
    """ The network service DTS handler

    Subscribes to the ns-instance-config NSR entries and to their scaling
    group instances. Changes are validated in the prepare callback and
    handed to the NS manager in the apply callback.
    """
    NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
    SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Registration handles, set by register()
        self._nsr_regh = None
        self._scale_regh = None

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for Nsr create/update/delete/read requests from dts """

        def nsr_id_from_keyspec(ks):
            """ Return the NSR id key carried by the keyspec """
            nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
            nsr_id = nsr_path_entry.key00.id
            return nsr_id

        def group_name_from_keyspec(ks):
            """ Return the scaling group name key carried by the keyspec """
            group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
            group_name = group_path_entry.key00.scaling_group_name_ref
            return group_name

        def get_scale_group_instance_delta(nsr_id, group_name, xact):
            """ Compare scaling instances in the xact with the committed ones

            Returns a dict with the instance ids "added" (present only in the
            xact view) and "deleted" (present only in the committed view) for
            the given NSR and scaling group.
            """
            delta = {"added": [], "deleted": []}
            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                if elem_nsr_id != nsr_id:
                    continue

                elem_group_name = group_name_from_keyspec(keyspec)
                if elem_group_name != group_name:
                    continue

                delta["added"].append(instance_cfg.id)

            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                if elem_nsr_id != nsr_id:
                    continue

                elem_group_name = group_name_from_keyspec(keyspec)
                if elem_group_name != group_name:
                    continue

                # Present in both views: unchanged, so not an addition
                if instance_cfg.id in delta["added"]:
                    delta["added"].remove(instance_cfg.id)
                else:
                    delta["deleted"].append(instance_cfg.id)

            return delta

        @asyncio.coroutine
        def update_nsr_nsd(nsr_id, xact, scratch):
            """ Instantiate/terminate VLs according to the NSD change in xact """

            @asyncio.coroutine
            def get_nsr_vl_delta(nsr_id, xact, scratch):
                """ Return the VLDs added to / deleted from the NSR's NSD """
                delta = {"added": [], "deleted": []}
                for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
                    elem_nsr_id = nsr_id_from_keyspec(keyspec)
                    if elem_nsr_id != nsr_id:
                        continue

                    if 'vld' in instance_cfg.nsd:
                        for vld in instance_cfg.nsd.vld:
                            delta["added"].append(vld)

                for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
                    self._log.debug("NSR update: %s", instance_cfg)
                    elem_nsr_id = nsr_id_from_keyspec(keyspec)
                    if elem_nsr_id != nsr_id:
                        continue

                    if 'vld' in instance_cfg.nsd:
                        for vld in instance_cfg.nsd.vld:
                            if vld in delta["added"]:
                                delta["added"].remove(vld)
                            else:
                                delta["deleted"].append(vld)

                return delta

            vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
            self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)

            for vld in vl_delta["added"]:
                yield from self._nsm.nsr_instantiate_vl(nsr_id, vld)

            for vld in vl_delta["deleted"]:
                yield from self._nsm.nsr_terminate_vl(nsr_id, vld)

        def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch):
            """ Return (added, deleted, updated) configs for this xact """
            # Unfortunately, it is currently difficult to figure out what has exactly
            # changed in this xact without Pbdelta support (RIFT-4916)
            # As a workaround, we can fetch the pre and post xact elements and
            # perform a comparison to figure out adds/deletes/updates
            xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
            curr_cfgs = list(dts_member_reg.elements)

            xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
            curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}

            # Find Adds
            added_keys = set(xact_key_map) - set(curr_key_map)
            added_cfgs = [xact_key_map[key] for key in added_keys]

            # Find Deletes
            deleted_keys = set(curr_key_map) - set(xact_key_map)
            deleted_cfgs = [curr_key_map[key] for key in deleted_keys]

            # Find Updates
            updated_keys = set(curr_key_map) & set(xact_key_map)
            updated_cfgs = [xact_key_map[key] for key in updated_keys
                            if xact_key_map[key] != curr_key_map[key]]

            return added_cfgs, deleted_cfgs, updated_cfgs

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            def handle_create_nsr(msg, restart_mode=False):
                """ Validate the NSR config and create its record in the NSM """
                if not msg.has_field("nsd"):
                    err = "NSD not provided"
                    self._log.error(err)
                    raise NetworkServiceRecordError(err)

                self._log.debug("Creating NetworkServiceRecord %s  from nsr config  %s",
                                msg.id, msg.as_dict())
                nsr = self.nsm.create_nsr(msg, restart_mode=restart_mode)
                return nsr

            def handle_delete_nsr(msg):
                """ Mark the NSR as terminating and schedule its termination """
                @asyncio.coroutine
                def delete_instantiation(ns_id):
                    """ Delete instantiation """
                    with self._dts.transaction() as xact:
                        yield from self._nsm.terminate_ns(ns_id, xact)

                self._log.info("Delete req for  NSR Id: %s received", msg.id)
                # Terminate the NSR instance
                nsr = self._nsm.get_ns_by_nsr_id(msg.id)

                nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
                event_descr = "Terminate rcvd for NS Id:%s" % msg.id
                nsr.record_event("terminate-rcvd", event_descr)

                self._loop.create_task(delete_instantiation(msg.id))

            @asyncio.coroutine
            def begin_instantiation(nsr):
                """ Ask the NSM to instantiate the NS within this xact """
                self._log.info("Beginning NS instantiation: %s", nsr.id)
                yield from self._nsm.instantiate_ns(nsr.id, xact)

            # INSTALL without a transaction id: recreate records for all
            # elements already held by the registration, in restart mode.
            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                for element in self._nsr_regh.elements:
                    nsr = handle_create_nsr(element, restart_mode=True)
                    self._loop.create_task(begin_instantiation(nsr))

            (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh,
                                                                                  xact,
                                                                                  "id",
                                                                                  scratch)
            self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs,
                            deleted_msgs, updated_msgs)

            for msg in added_msgs:
                if msg.id not in self._nsm.nsrs:
                    self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
                    nsr = handle_create_nsr(msg)
                    self._loop.create_task(begin_instantiation(nsr))

            for msg in deleted_msgs:
                self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
                try:
                    handle_delete_nsr(msg)
                except Exception:
                    # One failed termination must not block the others
                    self._log.exception("Failed to terminate NS:%s", msg.id)

            for msg in updated_msgs:
                self._log.info("Update NSR received in on_apply: %s", msg)

                self._nsm.nsr_update_cfg(msg.id, msg)

                if 'nsd' in msg:
                    self._loop.create_task(update_nsr_nsd(msg.id, xact, scratch))

                for group in msg.scaling_group:
                    instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
                    self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)

                    for instance_id in instance_delta["added"]:
                        self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)

                    for instance_id in instance_delta["deleted"]:
                        self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for NSR

            Validates create/update/delete requests and raises when the
            request cannot be accepted; otherwise completes the prepare.
            """
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            action = xact_info.query_action
            self._log.debug(
                    "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
                    xact, action, xact_info, xpath, msg
                    )

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]:
                # if this is an NSR create
                if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs:
                    # Ensure the Cloud account/datacenter has been specified
                    if not msg.has_field("cloud_account") and not msg.has_field("om_datacenter"):
                        raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")

                    # Check if nsd is specified
                    if not msg.has_field("nsd"):
                        raise NsrInstantiationFailed("NSD not specified in NSR")

                else:
                    nsr = self._nsm.nsrs[msg.id]

                    if msg.has_field("nsd"):
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
                        if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
                            raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")

                    if msg.has_field("scaling_group"):
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")

                        if len(msg.scaling_group) > 1:
                            raise ScalingOperationError("Only a single scaling group can be configured at a time")

                        for group_msg in msg.scaling_group:
                            num_new_group_instances = len(group_msg.instance)
                            if num_new_group_instances > 1:
                                raise ScalingOperationError("Only a single scaling instance can be modified at a time")

                            elif num_new_group_instances == 1:
                                scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
                                if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                                    # >= so the guard still holds if the count
                                    # ever ends up above the configured maximum
                                    if len(scale_group.instances) >= scale_group.max_instance_count:
                                        raise ScalingOperationError("Max instances for %s reached" % scale_group)

            acg.handle.prepare_complete_ok(xact_info.handle)

        self._log.debug("Registering for NSR config using xpath: %s",
                        NsrDtsHandler.NSR_XPATH)

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                                          on_prepare=on_prepare)

            self._scale_regh = acg.register(
                                      xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
                                      flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                                      )
3354
3355
class NsrOpDataDtsHandler(object):
    """ Publisher of the network service operational data records """
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm
        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ The publisher registration handle (None before register()) """
        return self._regh

    @property
    def nsm(self):
        """ The NS manager this handler was created with """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register as publisher for the NSR op data path """
        self._log.debug("Registering Nsr op data path %s as publisher",
                        NsrOpDataDtsHandler.XPATH)

        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(
                xpath=NsrOpDataDtsHandler.XPATH,
                handler=reg_handler,
                flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ | rwdts.Flag.DATASTORE,
            )

    @asyncio.coroutine
    def create(self, path, msg):
        """
        Publish a new NS record: create the element at path with msg
        """
        self._log.debug("Creating NSR %s:%s", path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Created NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
        """
        Update the NS record at path with msg, using the given xact flags
        """
        self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, self.regh)
        self.regh.update_element(path, msg, flags)
        self._log.debug("Updated NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def delete(self, path):
        """
        Delete the NS record published at path
        """
        self._log.debug("Deleting NSR path:%s", path)
        self.regh.delete_element(path)
        self._log.debug("Deleted NSR path:%s", path)
3416
3417
class VnfrDtsHandler(object):
    """ The virtual network function record DTS handler

    Subscribes to the VNFR catalog and forwards create/update/delete
    notifications for VNFRs known to the NS manager.
    """
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/ advises from dts """

        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            Responds NA for VNFRs the NS manager does not track, otherwise
            applies the create/update/delete and responds ACK.
            """
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            self._log.debug(
                "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
                xact_info, action, ks_path, msg
                )

            schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            vnfr_id = path_entry.key00.id

            if vnfr_id not in self._nsm.vnfrs:
                self._log.error("%s request for non existent record path %s",
                                action, xpath)
                xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath)

                return

            if action in (rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE):
                yield from self._nsm.update_vnfr(msg)
            elif action == rwdts.QueryAction.DELETE:
                # Logged only here so create/update are not reported as deletes
                self._log.debug("Deleting VNFR with id %s", vnfr_id)
                self._nsm.delete_vnfr(vnfr_id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.SUBSCRIBER),)
3485
3486
class NsdRefCountDtsHandler(object):
    """ Publisher answering read requests for the NSD reference counts """
    XPATH = "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm
        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ The registration handle (None before register()) """
        return self._regh

    @property
    def nsm(self):
        """ The NS manager this handler was created with """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register as publisher of the NSD ref count path """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ Answer a READ with the ref count entries; reject anything else """
            request_xpath = ks_path.to_xpath(RwNsrYang.get_schema())

            if action != rwdts.QueryAction.READ:
                raise NetworkServiceRecordError("Not supported operation %s" % action)

            refcount_schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount.schema()
            key_entry = refcount_schema.keyspec_to_entry(ks_path)
            refcounts = yield from self._nsm.get_nsd_refcount(key_entry.key00.nsd_id_ref)

            # One MORE response per entry, closed by a final ACK
            for entry_xpath, entry_msg in refcounts:
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=entry_xpath,
                                        msg=entry_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        read_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(
                xpath=NsdRefCountDtsHandler.XPATH,
                handler=read_handler,
                flags=rwdts.Flag.PUBLISHER,
            )
3535
3536
3537 class NsManager(object):
3538 """ The Network Service Manager class"""
def __init__(self, dts, log, loop,
             nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector,
             vnffgmgr, vnfd_pub_handler, cloud_account_handler):
    """ Store the collaborators and build the DTS handlers owned by the NSM """
    self._dts, self._log, self._loop = dts, log, loop

    # Handlers and helpers supplied by the caller
    self._nsr_handler = nsr_handler
    self._vnfr_pub_handler = vnfr_handler
    self._vlr_pub_handler = vlr_handler
    self._vnffgmgr = vnffgmgr
    self._vnfd_pub_handler = vnfd_pub_handler
    self._cloud_account_handler = cloud_account_handler
    self._ro_plugin_selector = ro_plugin_selector

    # Netconf client used by the scaling RPC callback
    self._ncclient = rift.mano.ncclient.NcClient(
        host="127.0.0.1",
        port=2022,
        username="admin",
        password="admin",
        loop=self._loop)

    # Record/descriptor registries, all empty at start
    self._nsrs, self._nsds, self._vnfds, self._vnfrs = {}, {}, {}, {}

    self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self)

    # TODO: All these handlers should move to tasklet level.
    # Passing self is often an indication of bad design
    self._nsd_dts_handler = NsdDtsHandler(dts, log, loop, self)
    self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self)

    # register() walks this list in order
    handlers = [self._nsd_dts_handler]
    handlers.append(VnfrDtsHandler(dts, log, loop, self))
    handlers.append(NsdRefCountDtsHandler(dts, log, loop, self))
    handlers.append(NsrDtsHandler(dts, log, loop, self))
    handlers.append(ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback))
    handlers.append(NsrRpcDtsHandler(dts, log, loop, self))
    handlers.append(self._vnfd_dts_handler)
    handlers.append(self.cfgmgr_obj)
    self._dts_handlers = handlers
3580
3581
@property
def log(self):
    """ The logger this NSM was created with """
    return self._log
3586
@property
def loop(self):
    """ The event loop this NSM was created with """
    return self._loop
3591
@property
def dts(self):
    """ The DTS handle this NSM was created with """
    return self._dts
3596
@property
def nsr_handler(self):
    """ The NSR handler passed to the constructor """
    return self._nsr_handler
3601
@property
def so_obj(self):
    """ The SO object, or None when none has been attached

    The constructor does not assign _so_obj, so a plain attribute read
    would raise AttributeError; fall back to None instead.
    """
    return getattr(self, "_so_obj", None)
3606
@property
def nsrs(self):
    """ The NSR registry dict held by this NSM """
    return self._nsrs
3611
@property
def nsds(self):
    """ The NSD registry dict held by this NSM """
    return self._nsds
3616
@property
def vnfds(self):
    """ The VNFD registry dict held by this NSM """
    return self._vnfds
3621
@property
def vnfrs(self):
    """ The VNFR registry dict held by this NSM """
    return self._vnfrs
3626
@property
def nsr_pub_handler(self):
    """ NSR publication handler (the nsr_handler given to the constructor) """
    return self._nsr_handler
3631
@property
def vnfr_pub_handler(self):
    """ VNFR publication handler given to the constructor """
    return self._vnfr_pub_handler
3636
@property
def vlr_pub_handler(self):
    """ VLR publication handler given to the constructor """
    return self._vlr_pub_handler
3641
@property
def vnfd_pub_handler(self):
    """ VNFD publication handler given to the constructor """
    return self._vnfd_pub_handler
3645
@asyncio.coroutine
def register(self):
    """ Run register() on every static DTS handler, one after the other """
    for handler in self._dts_handlers:
        yield from handler.register()
3651
3652
def get_ns_by_nsr_id(self, nsr_id):
    """ Look up an NSR by its id; raises NetworkServiceRecordError if unknown """
    if nsr_id in self._nsrs:
        return self._nsrs[nsr_id]

    raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
3659
def scale_nsr_out(self, nsr_id, scale_group_name, instance_id, config_xact):
    """ Schedule creation of a scaling group instance on a running NSR

    Raises ScalingOperationError when the NSR is not in running state.
    """
    self.log.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                   nsr_id, scale_group_name, instance_id)

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    scale_out_coro = record.create_scale_group_instance(scale_group_name, instance_id, config_xact)
    self._loop.create_task(scale_out_coro)
3671
def scale_nsr_in(self, nsr_id, scale_group_name, instance_id):
    """ Schedule deletion of a scaling group instance on a running NSR

    Raises ScalingOperationError when the NSR is not in running state.
    """
    self.log.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                   nsr_id, scale_group_name, instance_id)

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    scale_in_coro = record.delete_scale_group_instance(scale_group_name, instance_id)
    self._loop.create_task(scale_in_coro)
3683
def scale_rpc_callback(self, xact, msg, action):
    """Callback handler for RPC calls

    Reads the NSR config addressed by msg.nsr_id_ref, adds or removes the
    scaling instance msg.instance_id in the named scaling group and writes
    the config back over netconf. The work runs in a task on the loop;
    failures are logged from within that task.

    Args:
        xact : Transaction Handler
        msg : RPC input
        action : Scaling Action
    """
    ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance

    xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format(
              msg.nsr_id_ref)
    instance = ScalingGroupInstance.from_dict({"id": msg.instance_id})

    @asyncio.coroutine
    def get_nsr_scaling_group():
        """ Return (nsr_config, scaling_group), adding the group if missing """
        results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)

        # The last result read wins; stays None when nothing is returned
        nsr_config = None
        for result in results:
            res = yield from result
            nsr_config = res.result

        if nsr_config is None:
            raise NetworkServiceRecordError(
                "NSR config %s not found for scaling request" % msg.nsr_id_ref)

        for scaling_group in nsr_config.scaling_group:
            if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref:
                break
        else:
            scaling_group = nsr_config.scaling_group.add()
            scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref

        return (nsr_config, scaling_group)

    @asyncio.coroutine
    def update_config(nsr_config):
        """ Replace the running NSR config with nsr_config over netconf """
        xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config)
        xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
        yield from self._ncclient.connect()
        yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace")

    @asyncio.coroutine
    def scale_out():
        nsr_config, scaling_group = yield from get_nsr_scaling_group()
        scaling_group.instance.append(instance)
        yield from update_config(nsr_config)

    @asyncio.coroutine
    def scale_in():
        nsr_config, scaling_group = yield from get_nsr_scaling_group()
        scaling_group.instance.remove(instance)
        yield from update_config(nsr_config)

    @asyncio.coroutine
    def run_logged(scale_op):
        """ Run the scaling coroutine and log any failure it raises """
        try:
            yield from scale_op()
        except asyncio.CancelledError:
            # Never swallow task cancellation
            raise
        except Exception:
            # Nobody awaits this task, so this is the only place the
            # failure can be reported.
            self._log.exception(
                "Scaling RPC failed (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                msg.nsr_id_ref, msg.scaling_group_name_ref, msg.instance_id)

    if action == ScalingRpcHandler.ACTION.SCALE_OUT:
        self._loop.create_task(run_logged(scale_out))
    else:
        self._loop.create_task(run_logged(scale_in))

    # Opdata based calls, disabled for now!
    # if action == ScalingRpcHandler.ACTION.SCALE_OUT:
    #     self.scale_nsr_out(
    #           msg.nsr_id_ref,
    #           msg.scaling_group_name_ref,
    #           msg.instance_id,
    #           xact)
    # else:
    #     self.scale_nsr_in(
    #           msg.nsr_id_ref,
    #           msg.scaling_group_name_ref,
    #           msg.instance_id)
3751
def nsr_update_cfg(self, nsr_id, msg):
    """ Store msg as the config message of the NSR with the given id """
    self._nsrs[nsr_id].nsr_cfg_msg = msg
3755
@asyncio.coroutine
def nsr_instantiate_vl(self, nsr_id, vld):
    """ Create a VL instance for vld on a running NSR

    Coroutine; must be driven with "yield from". Raises NsrVlUpdateError
    when the NSR is not in running state.
    """
    self.log.debug("NSR {} create VL {}".format(nsr_id, vld))
    nsr = self._nsrs[nsr_id]
    if nsr.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")

    # Not calling in a separate task as this is called from a separate task
    yield from nsr.create_vl_instance(vld)
3764
@asyncio.coroutine
def nsr_terminate_vl(self, nsr_id, vld):
    """ Delete the VL instance for vld on a running NSR

    Coroutine; must be driven with "yield from". Raises NsrVlUpdateError
    when the NSR is not in running state.
    """
    self.log.debug("NSR {} delete VL {}".format(nsr_id, vld.id))
    nsr = self._nsrs[nsr_id]
    if nsr.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")

    # Not calling in a separate task as this is called from a separate task
    yield from nsr.delete_vl_instance(vld)
3773
def create_nsr(self, nsr_msg, restart_mode=False):
    """ Build a NetworkServiceRecord for the passed NSR config message

    The new record is stored under nsr_msg.id, the RO plugin is told
    about the NSR, and the record is returned.

    Arguments:
        nsr_msg      - NSR config message; id, nsd and cloud_account are read
        restart_mode - forwarded unchanged to NetworkServiceRecord

    Raises:
        NetworkServiceRecordError - an NSR with this id already exists
    """
    nsr_id = nsr_msg.id
    if nsr_id in self._nsrs:
        msg = "NSR id %s already exists" % nsr_id
        self._log.error(msg)
        raise NetworkServiceRecordError(msg)

    self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
                   nsr_id,
                   nsr_msg.nsd.id)

    plugin = self._ro_plugin_selector.ro_plugin
    sdn_name = self._cloud_account_handler.get_cloud_account_sdn_name(
        nsr_msg.cloud_account)

    record = NetworkServiceRecord(
        self._dts,
        self._log,
        self._loop,
        self,
        plugin,
        nsr_msg,
        sdn_name,
        restart_mode=restart_mode,
    )

    # Register the record first, then let the plugin know about the NSR
    self._nsrs[nsr_id] = record
    plugin.create_nsr(nsr_msg, nsr_msg.nsd)

    return record
3801
def delete_nsr(self, nsr_id):
    """ Drop the NSR stored under nsr_id from this manager

    Raises:
        KeyError - no NSR is stored under nsr_id
    """
    records = self._nsrs
    del records[nsr_id]
3807
@asyncio.coroutine
def instantiate_ns(self, nsr_id, config_xact):
    """ Ask the NSR's plugin to instantiate the network service

    Arguments:
        nsr_id      - key of the NSR in this manager's NSR table
        config_xact - transaction handed through to the plugin

    Raises:
        NetworkServiceRecordError - no NSR is stored under nsr_id
    """
    self._log.debug("Instantiating Network service id %s", nsr_id)

    known_nsrs = self._nsrs
    if nsr_id not in known_nsrs:
        err = "NSR id %s not found " % nsr_id
        self._log.error(err)
        raise NetworkServiceRecordError(err)

    record = known_nsrs[nsr_id]
    yield from record.nsm_plugin.instantiate_ns(record, config_xact)
3819
@asyncio.coroutine
def update_vnfr(self, vnfr):
    """ Apply a VNFR update, then refresh the state of the owning NSR

    Arguments:
        vnfr - VNFR message; its id selects the stored VNFR record

    Raises:
        KeyError - no VNFR record is stored under vnfr.id
    """
    record = self._vnfrs[vnfr.id]
    self._log.debug("Updating VNFR with state %s: vnfr %s", record.state, vnfr)

    yield from record.update_state(vnfr)

    # NOTE(review): find_nsr_for_vnfr returns None when no NSR owns the
    # VNFR; that case is not handled here and would fail on update_state.
    owner = self.find_nsr_for_vnfr(vnfr.id)
    yield from owner.update_state()
3830
def find_nsr_for_vnfr(self, vnfr_id):
    """ Find the NSR which has the passed vnfr id

    Returns the first NSR with a VNFR whose id equals vnfr_id, or None
    when no NSR has such a VNFR.
    """
    # Both collections are copied before scanning, as in the original loop
    for candidate in list(self.nsrs.values()):
        owned = list(candidate.vnfrs.values())
        if any(vnfr.id == vnfr_id for vnfr in owned):
            return candidate
    return None
3838
def delete_vnfr(self, vnfr_id):
    """ Drop the VNFR stored under vnfr_id from this manager

    Raises:
        KeyError - no VNFR is stored under vnfr_id
    """
    records = self._vnfrs
    del records[vnfr_id]
3842
def get_nsd_ref(self, nsd_id):
    """ Look up the NSD for nsd_id, take a reference on it and return it

    The lookup is done by get_nsd, so whatever it raises for an unknown
    id propagates from here.
    """
    descriptor = self.get_nsd(nsd_id)
    descriptor.ref()
    return descriptor
3849
@asyncio.coroutine
def get_nsr_config(self, nsd_id):
    """ Read ns-instance-config from DTS and return the NSR using nsd_id

    Returns the first configured NSR whose nsd.id equals nsd_id, or None
    when no result contains such an NSR.
    """
    query_path = "C,/nsr:ns-instance-config"
    responses = yield from self._dts.query_read(query_path, rwdts.XactFlag.MERGE)

    for response in responses:
        entry = yield from response
        configured = entry.result.nsr
        match = next((cfg for cfg in configured if cfg.nsd.id == nsd_id), None)
        if match is not None:
            return match

    return None
3864
@asyncio.coroutine
def nsd_unref_by_nsr_id(self, nsr_id):
    """ Unref the network service descriptor based on NSR id

    Arguments:
        nsr_id - key of the NSR whose NSD reference is released

    Raises:
        NetworkServiceDescriptorUnrefError - no NSR is stored under nsr_id
    """
    self._log.debug("NSR Unref called for Nsr Id:%s", nsr_id)

    if nsr_id not in self._nsrs:
        self._log.error("Cannot find NSR with id %s", nsr_id)
        # The original format string had no placeholder, so the "%"
        # operator raised TypeError instead of the intended error.
        raise NetworkServiceDescriptorUnrefError("No NSR with id %s" % nsr_id)

    nsr = self._nsrs[nsr_id]
    try:
        nsd = self.get_nsd(nsr.nsd_id)
        self._log.debug("Releasing ref on NSD %s held by NSR %s - Curr %d",
                        nsd.id, nsr.id, nsd.ref_count)
        nsd.unref()
    except NetworkServiceDescriptorError:
        # We store a copy of NSD in NSR and the NSD in nsd-catalog
        # could be deleted
        pass
3885
@asyncio.coroutine
def nsd_unref(self, nsd_id):
    """ Release one reference on the NSD stored under nsd_id

    The lookup is done by get_nsd, so whatever it raises for an unknown
    id propagates from here.
    """
    self.get_nsd(nsd_id).unref()
3891
def get_nsd(self, nsd_id):
    """ Get network service descriptor for the passed nsd_id

    Raises:
        NetworkServiceDescriptorError - no NSD is stored under nsd_id
    """
    if nsd_id not in self._nsds:
        self._log.error("Cannot find NSD id:%s", nsd_id)
        # Exceptions do not interpolate logging-style arguments; format
        # the message here so it does not carry a raw "%s".
        raise NetworkServiceDescriptorError("Cannot find NSD id:%s" % (nsd_id,))

    return self._nsds[nsd_id]
3899
def create_nsd(self, nsd_msg):
    """ Create a network service descriptor

    Wraps nsd_msg in a NetworkServiceDescriptor, stores it under
    nsd_msg.id and returns it.

    Raises:
        NetworkServiceDescriptorError - an NSD with this id already exists
    """
    self._log.debug("Create network service descriptor - %s", nsd_msg)
    if nsd_msg.id in self._nsds:
        self._log.error("Cannot create NSD %s -NSD ID already exists", nsd_msg)
        # Exceptions do not interpolate logging-style arguments; format
        # the message here so it does not carry a raw "%s".
        raise NetworkServiceDescriptorError("NSD already exists-%s" % (nsd_msg.id,))

    nsd = NetworkServiceDescriptor(
        self._dts,
        self._log,
        self._loop,
        nsd_msg,
        self
        )
    self._nsds[nsd_msg.id] = nsd

    return nsd
3917
def update_nsd(self, nsd):
    """ Update the stored NSD with nsd, creating it when it is unknown

    Arguments:
        nsd - NSD message; its id selects the stored descriptor
    """
    self._log.debug("Update network service descriptor - %s", nsd)

    if nsd.id in self._nsds:
        self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd)
        self._nsds[nsd.id].update(nsd)
        return

    self._log.debug("No NSD found - creating NSD id = %s", nsd.id)
    self.create_nsd(nsd)
3927
def delete_nsd(self, nsd_id):
    """ Delete the Network service descriptor with the passed id

    Raises:
        NetworkServiceDescriptorNotFound       - no NSD is stored under nsd_id
        NetworkServiceDescriptorRefCountExists - the NSD reports it is in use
    """
    self._log.debug("Deleting the network service descriptor - %s", nsd_id)
    if nsd_id not in self._nsds:
        self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
        raise NetworkServiceDescriptorNotFound("Cannot find %s" % (nsd_id,))

    # The original repeated the "not in" test here, which made this guard
    # unreachable. Its log message and exception show it was meant to
    # refuse deletion while the descriptor is still referenced.
    nsd = self._nsds[nsd_id]
    if nsd.in_use():
        self._log.debug("Cannot delete NSD id %s reference exists %s",
                        nsd_id,
                        nsd.ref_count)
        raise NetworkServiceDescriptorRefCountExists(
            "Cannot delete :%s, ref_count:%s" % (nsd_id, nsd.ref_count))

    del self._nsds[nsd_id]
3945
def get_vnfd_config(self, xact):
    """ Create every VNFD of the transaction that is not stored yet

    Walks the elements the VNFD DTS registration reports for xact and
    calls create_vnfd for each one whose id is not in the VNFD table.
    """
    registration = self._vnfd_dts_handler.regh
    for vnfd_cfg in registration.get_xact_elements(xact):
        if vnfd_cfg.id in self._vnfds:
            continue
        self.create_vnfd(vnfd_cfg)
3951
def get_vnfd(self, vnfd_id, xact):
    """ Get virtual network function descriptor for the passed vnfd_id

    When the id is not stored yet, the VNFDs of xact are loaded through
    get_vnfd_config before the lookup is retried.

    Raises:
        VnfDescriptorError - the VNFD is still unknown after the reload
    """
    if vnfd_id not in self._vnfds:
        self._log.error("Cannot find VNFD id:%s", vnfd_id)
        self.get_vnfd_config(xact)

        if vnfd_id not in self._vnfds:
            self._log.error("Cannot find VNFD id:%s", vnfd_id)
            # Exceptions do not interpolate logging-style arguments;
            # format the message here so it does not carry a raw "%s".
            raise VnfDescriptorError("Cannot find VNFD id:%s" % (vnfd_id,))

    return self._vnfds[vnfd_id]
3963
def create_vnfd(self, vnfd):
    """ Create a virtual network function descriptor

    Stores vnfd under vnfd.id and returns the stored object.

    Raises:
        VnfDescriptorError - a VNFD with this id already exists
    """
    self._log.debug("Create virtual network function descriptor - %s", vnfd)
    if vnfd.id in self._vnfds:
        self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
        # Exceptions do not interpolate logging-style arguments; format
        # the message here so it does not carry a raw "%s".
        raise VnfDescriptorError("VNFD already exists-%s" % (vnfd.id,))

    self._vnfds[vnfd.id] = vnfd
    return self._vnfds[vnfd.id]
3973
def update_vnfd(self, vnfd):
    """ Store vnfd as the current VNFD for its id, creating it if unknown

    The internal connection point references of every internal VLD are
    de-duplicated on the passed message before it is stored.
    """
    self._log.debug("Update virtual network function descriptor- %s", vnfd)

    # Hack to remove duplicates from leaf-lists - to be fixed by RIFT-6511
    for internal_vld in vnfd.internal_vld:
        unique_refs = set(internal_vld.internal_connection_point_ref)
        internal_vld.internal_connection_point_ref = list(unique_refs)

    if vnfd.id in self._vnfds:
        self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
        self._vnfds[vnfd.id] = vnfd
    else:
        self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
        self.create_vnfd(vnfd)
3988
@asyncio.coroutine
def delete_vnfd(self, vnfd_id):
    """ Delete the virtual network function descriptor with the passed id

    Raises:
        VnfDescriptorError - no VNFD is stored under vnfd_id
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id not in self._vnfds:
        self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
        # Exceptions do not interpolate logging-style arguments; format
        # the message here so it does not carry a raw "%s".
        raise VnfDescriptorError("Cannot find %s" % (vnfd_id,))

    del self._vnfds[vnfd_id]
3998
def nsd_in_use(self, nsd_id):
    """ Report whether the NSD with the passed id is in use

    Returns what the stored descriptor's in_use() returns, or False
    when no NSD is stored under nsd_id.
    """
    self._log.debug("Is this NSD in use - msg:%s", nsd_id)
    if nsd_id not in self._nsds:
        return False
    return self._nsds[nsd_id].in_use()
4005
@asyncio.coroutine
def publish_nsr(self, xact, path, msg):
    """ Publish an NSR message at path through the NSR handler

    Arguments:
        xact - transaction passed to the handler's update
        path - xpath of the NSR record
        msg  - NSR message to publish
    """
    self._log.debug("Publish NSR with path %s, msg %s",
                    path, msg)
    handler = self.nsr_handler
    yield from handler.update(xact, path, msg)
4012
@asyncio.coroutine
def unpublish_nsr(self, xact, path):
    """ Remove the published NSR record at path through the NSR handler

    Arguments:
        xact - transaction passed to the handler's delete
        path - xpath of the NSR record
    """
    self._log.debug("Publishing delete NSR with path %s", path)
    handler = self.nsr_handler
    # NOTE(review): delete is called with (path, xact) here, while
    # NsmRecordsPublisherProxy.unpublish_nsr passes (xact, path) to its
    # handler's delete - confirm against the handler's signature.
    yield from handler.delete(path, xact)
4018
def vnfr_is_ready(self, vnfr_id):
    """ Mark the VNFR with the passed id as ready

    Raises:
        VirtualNetworkFunctionRecordError - no VNFR is stored under vnfr_id
    """
    self._log.debug("VNFR id %s ready", vnfr_id)
    # The lookup below is in the VNFR table, so the existence check has to
    # use the same table (the original tested the VNFD table by mistake).
    if vnfr_id not in self._vnfrs:
        err = "Did not find VNFR ID with id %s" % vnfr_id
        # Log the message itself, not the literal string "err"
        self._log.critical(err)
        raise VirtualNetworkFunctionRecordError(err)
    self._vnfrs[vnfr_id].is_ready()
4027
@asyncio.coroutine
def get_nsd_refcount(self, nsd_id):
    """ Build the NSD ref-count entries known to this NSM

    Arguments:
        nsd_id - id of one NSD, or None / "" to report every stored NSD

    Returns a list of (xpath, NsdRefCount message) tuples; the list is
    empty when nsd_id names an NSD that is not stored.
    """

    def refcount_entry(key, nsd):
        """ (xpath, message) pair describing the ref count of one NSD """
        entry_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
        entry_msg.nsd_id_ref = nsd.id
        entry_msg.instance_ref_count = nsd.ref_count
        entry_path = (NsdRefCountDtsHandler.XPATH +
                      "[rw-nsr:nsd-id-ref = '{}']").format(key)
        return (entry_path, entry_msg)

    if nsd_id is None or nsd_id == "":
        return [refcount_entry(nsd.id, nsd) for nsd in self._nsds.values()]

    if nsd_id in self._nsds:
        return [refcount_entry(nsd_id, self._nsds[nsd_id])]

    return []
4051
@asyncio.coroutine
def terminate_ns(self, nsr_id, xact):
    """
    Terminate network service for the given NSR Id

    Runs four steps in order: terminate the NSR, release its NSD
    reference, unpublish the NSR record and remove the NSR from this
    manager.

    Arguments:
        nsr_id - key of the NSR in this manager's NSR table
        xact   - transaction passed to the NSR's unpublish
    """

    # Terminate the instances/networks associated with this nw service
    self._log.debug("Terminating the network service %s", nsr_id)
    yield from self._nsrs[nsr_id].terminate()

    # Unref the NSD
    yield from self.nsd_unref_by_nsr_id(nsr_id)

    # Unpublish the NSR record
    self._log.debug("Unpublishing the network service %s", nsr_id)
    yield from self._nsrs[nsr_id].unpublish(xact)

    # Finally delete the NS instance from this NS Manager
    self._log.debug("Deletng the network service %s", nsr_id)
    self.delete_nsr(nsr_id)
4072
4073
class NsmRecordsPublisherProxy(object):
    """ Publisher facade handed to plugin objects

    Each publish_* / unpublish_* coroutine derives the record's xpath and
    forwards to the update / delete of the matching NSR, VNFR or VLR
    publisher handler, returning whatever the handler returns.
    """

    def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr):
        self._dts = dts
        self._log = log
        self._loop = loop
        # One publisher handler per record type
        self._nsr_pub_hdlr = nsr_pub_hdlr
        self._vnfr_pub_hdlr = vnfr_pub_hdlr
        self._vlr_pub_hdlr = vlr_pub_hdlr

    @asyncio.coroutine
    def publish_nsr(self, xact, nsr):
        """ Publish the NSR at the xpath derived from it """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.update(xact, nsr_path, nsr)
        return outcome

    @asyncio.coroutine
    def unpublish_nsr(self, xact, nsr):
        """ Delete the published NSR at the xpath derived from it """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.delete(xact, nsr_path)
        return outcome

    @asyncio.coroutine
    def publish_vnfr(self, xact, vnfr):
        """ Publish the VNFR at the xpath derived from it """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.update(xact, vnfr_path, vnfr)
        return outcome

    @asyncio.coroutine
    def unpublish_vnfr(self, xact, vnfr):
        """ Delete the published VNFR at the xpath derived from it """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.delete(xact, vnfr_path)
        return outcome

    @asyncio.coroutine
    def publish_vlr(self, xact, vlr):
        """ Publish the VLR at the xpath derived from it """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.update(xact, vlr_path, vlr)
        return outcome

    @asyncio.coroutine
    def unpublish_vlr(self, xact, vlr):
        """ Delete the published VLR at the xpath derived from it """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.delete(xact, vlr_path)
        return outcome
4121
4122
class ScalingRpcHandler(mano_dts.DtsHandler):
    """ DTS handler for the exec-scale-in and exec-scale-out RPCs

    For every RPC the optional callback is invoked as
    callback(xact, msg, action), and the instance id is echoed back in
    the RPC output. Scale-out requests that carry no instance id get the
    next id of a per-scaling-group counter.
    """
    SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in"
    SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in"

    SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
    SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"

    ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')

    def __init__(self, log, dts, loop, callback=None):
        super().__init__(log, dts, loop)
        self.callback = callback
        # Last instance id handed out, keyed by scaling group name
        self.last_instance_id = defaultdict(int)

    @asyncio.coroutine
    def register(self):
        """ Register the scale-in and scale-out RPC handlers with DTS """

        @asyncio.coroutine
        def on_scale_in_prepare(xact_info, action, ks_path, msg):
            """ Handle exec-scale-in: run the callback, then ACK or NACK """
            assert action == rwdts.QueryAction.RPC

            try:
                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)

                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
                    "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH,
                    rpc_op)

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH)

        @asyncio.coroutine
        def on_scale_out_prepare(xact_info, action, ks_path, msg):
            """ Handle exec-scale-out: pick an id, run the callback, ACK or NACK """
            assert action == rwdts.QueryAction.RPC

            try:
                scaling_group = msg.scaling_group_name_ref
                if not msg.instance_id:
                    # Key the counter on the requested scaling group. The
                    # original used "scale_group", which is the imported
                    # module, so every group shared a single counter.
                    next_instance_id = self.last_instance_id[scaling_group] + 1
                    msg.instance_id = next_instance_id
                    self.last_instance_id[scaling_group] = next_instance_id

                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)

                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
                    "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH,
                    rpc_op)

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH)

        scale_in_hdl = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_scale_in_prepare)
        scale_out_hdl = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_scale_out_prepare)

        with self.dts.group_create() as group:
            group.register(
                xpath=self.__class__.SCALE_IN_INPUT_XPATH,
                handler=scale_in_hdl,
                flags=rwdts.Flag.PUBLISHER)
            group.register(
                xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
                handler=scale_out_hdl,
                flags=rwdts.Flag.PUBLISHER)
4205
4206
class NsmTasklet(rift.tasklets.Tasklet):
    """
    The network service manager tasklet

    start() creates the DTS handle; init() (run on the DTS INIT state)
    registers the publishers, the RO plugin selector, the cloud account
    subscriber and the VNFFG manager, then builds and registers the
    NsManager.
    """
    def __init__(self, *args, **kwargs):
        super(NsmTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("nsm")

        self._dts = None
        self._nsm = None

        self._ro_plugin_selector = None
        self._vnffgmgr = None

        # _nsr_handler is not assigned anywhere else in this class; it is
        # kept for backward compatibility with code that may read it.
        self._nsr_handler = None
        # Everything below is assigned in init(); declared here so the
        # attributes exist (as None) before the DTS INIT state is reached.
        self._nsr_pub_handler = None
        self._vnfr_pub_handler = None
        self._vlr_pub_handler = None
        self._vnfd_pub_handler = None
        self._scale_cfg_handler = None
        self._cloud_account_handler = None

        self._records_publisher_proxy = None

    def start(self):
        """ The task start callback """
        super(NsmTasklet, self).start()
        self.log.info("Starting NsmTasklet")

        self.log.debug("Registering with dts")
        self._dts = rift.tasklets.DTS(self.tasklet_info,
                                      RwNsmYang.get_schema(),
                                      self.loop,
                                      self.on_dts_state_change)

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def stop(self):
        """ The task stop callback - tears down the DTS handle """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in NSM stop:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def init(self):
        """ Task init callback """
        # The original logged the "instance started" message here, which
        # made init indistinguishable from on_instance_started in the logs.
        self.log.debug("Got init callback")

        self.log.debug("creating config account handler")

        self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop)
        yield from self._nsr_pub_handler.register()

        self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vnfr_pub_handler.register()

        self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vlr_pub_handler.register()

        # SSL settings for the VNFD publisher come from the tasklet manifest
        manifest = self.tasklet_info.get_pb_manifest()
        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
        ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
        ssl_key = manifest.bootstrap_phase.rwsecurity.key

        self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop)

        self._records_publisher_proxy = NsmRecordsPublisherProxy(
            self._dts,
            self.log,
            self.loop,
            self._nsr_pub_handler,
            self._vnfr_pub_handler,
            self._vlr_pub_handler,
            )

        # Register the NSM to receive the nsm plugin
        # when cloud account is configured
        self._ro_plugin_selector = cloud.ROAccountPluginSelector(
            self._dts,
            self.log,
            self.loop,
            self._records_publisher_proxy,
            )
        yield from self._ro_plugin_selector.register()

        self._cloud_account_handler = cloud.CloudAccountConfigSubscriber(
            self._log,
            self._dts,
            self.log_hdl)

        yield from self._cloud_account_handler.register()

        self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop)
        yield from self._vnffgmgr.register()

        self._nsm = NsManager(
            self._dts,
            self.log,
            self.loop,
            self._nsr_pub_handler,
            self._vnfr_pub_handler,
            self._vlr_pub_handler,
            self._ro_plugin_selector,
            self._vnffgmgr,
            self._vnfd_pub_handler,
            self._cloud_account_handler
            )

        yield from self._nsm.register()

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.log.debug("Changing state to %s", next_state)
            self._dts.handle.set_state(next_state)