RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
18 import asyncio
19 import ncclient
20 import ncclient.asyncio_manager
21 import os
22 import shutil
23 import sys
24 import tempfile
25 import time
26 import uuid
27 import yaml
28
29
30 from collections import deque
31 from collections import defaultdict
32 from enum import Enum
33
34 import gi
35 gi.require_version('RwYang', '1.0')
36 gi.require_version('RwNsdYang', '1.0')
37 gi.require_version('RwDts', '1.0')
38 gi.require_version('RwNsmYang', '1.0')
39 gi.require_version('RwNsrYang', '1.0')
40 gi.require_version('RwTypes', '1.0')
41 gi.require_version('RwVlrYang', '1.0')
42 gi.require_version('RwVnfrYang', '1.0')
43 from gi.repository import (
44 RwYang,
45 RwNsrYang,
46 NsrYang,
47 NsdYang,
48 RwVlrYang,
49 VnfrYang,
50 RwVnfrYang,
51 RwNsmYang,
52 RwsdnYang,
53 RwDts as rwdts,
54 RwTypes,
55 ProtobufC,
56 )
57
58 import rift.tasklets
59 import rift.mano.ncclient
60 import rift.mano.config_data.config
61 import rift.mano.dts as mano_dts
62
63 from . import rwnsm_conman as conman
64 from . import cloud
65 from . import publisher
66 from . import xpath
67 from . import config_value_pool
68 from . import rwvnffgmgr
69 from . import scale_group
70
71
class NetworkServiceRecordState(Enum):
    """Lifecycle states of a network service record (NSR).

    The numeric values are part of the existing definition and are kept
    unchanged (note that 105 is not assigned).
    """
    # Bring-up phases
    INIT = 101
    VL_INIT_PHASE = 102
    VNF_INIT_PHASE = 103
    VNFFG_INIT_PHASE = 104

    # Steady state and scaling
    RUNNING = 106
    SCALING_OUT = 107
    SCALING_IN = 108

    # Tear-down phases
    TERMINATE = 109
    TERMINATE_RCVD = 110
    VL_TERMINATE_PHASE = 111
    VNF_TERMINATE_PHASE = 112
    VNFFG_TERMINATE_PHASE = 113
    TERMINATED = 114

    FAILED = 115

    # Individual virtual link operations
    VL_INSTANTIATE = 116
    VL_TERMINATE = 117
90
91
class NetworkServiceRecordError(Exception):
    """Raised for errors concerning a network service record (NSR)."""
95
96
class NetworkServiceDescriptorError(Exception):
    """Raised for errors concerning a network service descriptor (NSD)."""
100
101
class VirtualNetworkFunctionRecordError(Exception):
    """Raised for errors concerning a virtual network function record (VNFR)."""
105
106
class NetworkServiceDescriptorNotFound(Exception):
    """Raised when a network service descriptor cannot be found."""
110
111
class NetworkServiceDescriptorRefCountExists(Exception):
    """Raised when a network service descriptor still has a reference count."""
115
116
class NetworkServiceDescriptorUnrefError(Exception):
    """Raised when dropping a reference to a network service descriptor fails."""
120
121
class NsrInstantiationFailed(Exception):
    """Raised when a network service could not be instantiated."""
125
126
class VnfInstantiationFailed(Exception):
    """Raised when a virtual network function could not be instantiated."""
130
131
class VnffgInstantiationFailed(Exception):
    """Raised when a VNF forwarding graph (VNFFG) could not be instantiated."""
135
136
class VnfDescriptorError(Exception):
    """Raised for errors concerning a virtual network function descriptor (VNFD)."""
140
141
class ScalingOperationError(Exception):
    """Raised for errors concerning a scaling operation."""
144
145
class ScaleGroupMissingError(Exception):
    """Raised when a scale group is missing."""
148
149
class PlacementGroupError(Exception):
    """Raised for errors concerning placement groups."""
152
153
class NsrNsdUpdateError(Exception):
    """Raised for errors while updating the NSD of an NSR."""
156
157
class NsrVlUpdateError(NsrNsdUpdateError):
    """NSD update error that concerns a virtual link of the NSR."""
160
161
class VlRecordState(Enum):
    """Lifecycle states of a virtual link record (VLR)."""
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103

    TERMINATE_PENDING = 104
    TERMINATED = 105

    FAILED = 106
170
171
class VnffgRecordState(Enum):
    """Lifecycle states of a VNF forwarding graph record (VNFFGR)."""
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103

    TERMINATE_PENDING = 104
    TERMINATED = 105

    FAILED = 106
180
181
class VnffgRecord(object):
    """Record for one VNF forwarding graph (VNFFG) instance of an NSR.

    The record builds the VNFFGR request message from the VNFFG descriptor
    and the NSR's VNF records, hands it to the VNFFG manager and tracks the
    resulting state as a ``VnffgRecordState``.
    """
    SFF_DP_PORT = 4790
    SFF_MGMT_PORT = 5000

    def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name):
        """Store the collaborators and allocate a fresh VNFFGR id.

        A ``None`` ``sdn_account_name`` is stored as the empty string.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnffgmgr = vnffgmgr
        self._nsr = nsr
        self._nsr_name = nsr_name
        self._vnffgd_msg = vnffgd_msg
        if sdn_account_name is None:
            self._sdn_account_name = ''
        else:
            self._sdn_account_name = sdn_account_name

        self._vnffgr_id = str(uuid.uuid4())
        self._vnffgr_rsp_id = list()
        self._vnffgr_state = VnffgRecordState.INIT

    @property
    def id(self):
        """ VNFFGR id """
        return self._vnffgr_id

    @property
    def state(self):
        """ State of this VNFFGR """
        return self._vnffgr_state

    def _vnffgr_dict(self, operational_status=None):
        """Return the dict of fields shared by every VNFFGR message.

        ``operational_status`` is only included when it is given.
        """
        vnffgr_dict = {"id": self._vnffgr_id,
                       "nsd_id": self._nsr.nsd_id,
                       "vnffgd_id_ref": self._vnffgd_msg.id,
                       "vnffgd_name_ref": self._vnffgd_msg.name,
                       "sdn_account": self._sdn_account_name,
                       }
        if operational_status is not None:
            vnffgr_dict["operational_status"] = operational_status
        return vnffgr_dict

    def _status_only_vnffgr(self, operational_status):
        """Build a VNFFGR message that carries only ids and a status."""
        return NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(
            self._vnffgr_dict(operational_status))

    def fetch_vnffgr(self):
        """
        Get VNFFGR message to be published

        In the INIT and TERMINATED states a locally built message with the
        matching operational status is returned. Otherwise the message is
        fetched from the VNFFG manager; if that raises, the record moves to
        FAILED and a locally built 'failed' message is returned instead.
        """
        if self._vnffgr_state == VnffgRecordState.INIT:
            return self._status_only_vnffgr('init')

        if self._vnffgr_state == VnffgRecordState.TERMINATED:
            return self._status_only_vnffgr('terminated')

        try:
            return self._vnffgmgr.fetch_vnffgr(self._vnffgr_id)
        except Exception:
            self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id)
            self._vnffgr_state = VnffgRecordState.FAILED
            return self._status_only_vnffgr('failed')

    @asyncio.coroutine
    def _fetch_running_vnfr(self, nsr_vnfr):
        """Fetch the VNFR for ``nsr_vnfr``, polling until it is 'running'.

        The VNFR is re-fetched every 2 seconds while its operational status
        is not 'running'.

        Raises:
            NsrInstantiationFailed: if the VNFR reports status 'failed'.
        """
        vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
        self._log.debug(" Received VNFR is %s", vnfr)
        while vnfr.operational_status != 'running':
            self._log.info("Received vnf op status is %s; retrying", vnfr.operational_status)
            if vnfr.operational_status == 'failed':
                self._log.error("Fetching VNFR for %s failed", vnfr.id)
                raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id))
            yield from asyncio.sleep(2, loop=self._loop)
            vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
            self._log.debug("Received VNFR is %s", vnfr)
        return vnfr

    @asyncio.coroutine
    def vnffgr_create_msg(self):
        """ VNFFGR message used to create this VNFFG in the VNFFG manager

        Builds one rendered service path (RSP) per descriptor RSP and one
        classifier per descriptor classifier, filling in connection point
        details from the (running) VNFRs of the NSR.
        """
        vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(self._vnffgr_dict())

        for rsp in self._vnffgd_msg.rsp:
            vnffgr_rsp = vnffgr.rsp.add()
            vnffgr_rsp.id = str(uuid.uuid4())
            vnffgr_rsp.name = self._nsr.name + '.' + rsp.name
            self._vnffgr_rsp_id.append(vnffgr_rsp.id)
            vnffgr_rsp.vnffgd_rsp_id_ref = rsp.id
            vnffgr_rsp.vnffgd_rsp_name_ref = rsp.name
            for rsp_cp_ref in rsp.vnfd_connection_point_ref:
                vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref]
                self._log.debug("VNFD message during VNFFG instantiation is %s", vnfd)
                if len(vnfd) > 0 and vnfd[0].has_field('service_function_type'):
                    self._log.debug("Service Function Type for VNFD ID %s is %s", rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type)
                else:
                    # Hops without a service function type are left out of the chain
                    self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain", rsp_cp_ref.vnfd_id_ref)
                    continue

                vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add()
                vnfr_cp_ref.member_vnf_index_ref = rsp_cp_ref.member_vnf_index_ref
                vnfr_cp_ref.hop_number = rsp_cp_ref.order
                vnfr_cp_ref.vnfd_id_ref = rsp_cp_ref.vnfd_id_ref
                vnfr_cp_ref.service_function_type = vnfd[0].service_function_type
                for nsr_vnfr in self._nsr.vnfrs.values():
                    if (nsr_vnfr.vnfd.id == vnfr_cp_ref.vnfd_id_ref and
                            nsr_vnfr.member_vnf_index == vnfr_cp_ref.member_vnf_index_ref):
                        vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id
                        vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name
                        vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref

                        vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                        vnfr_cp_ref.connection_point_params.mgmt_address = vnfr.mgmt_interface.ip_address
                        for cp in vnfr.connection_point:
                            if cp.name == vnfr_cp_ref.vnfr_connection_point_ref:
                                vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id
                                vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name
                                for vdu in vnfr.vdur:
                                    for ext_intf in vdu.external_interface:
                                        if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref:
                                            vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id
                                            self._log.debug("VIM ID for CP %s in VNFR %s is %s", cp.name, nsr_vnfr.id,
                                                            vnfr_cp_ref.connection_point_params.vm_id)
                                            break

                                vnfr_cp_ref.connection_point_params.address = cp.ip_address
                                vnfr_cp_ref.connection_point_params.port = VnffgRecord.SFF_DP_PORT

        for vnffgd_classifier in self._vnffgd_msg.classifier:
            _rsp = [rsp for rsp in vnffgr.rsp if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref]
            if len(_rsp) > 0:
                rsp_id_ref = _rsp[0].id
                rsp_name = _rsp[0].name
            else:
                self._log.error("RSP with ID %s not found during classifier creation for classifier id %s", vnffgd_classifier.rsp_id_ref, vnffgd_classifier.id)
                continue
            vnffgr_classifier = vnffgr.classifier.add()
            vnffgr_classifier.id = vnffgd_classifier.id
            vnffgr_classifier.name = self._nsr.name + '.' + vnffgd_classifier.name
            _rsp[0].classifier_name = vnffgr_classifier.name
            vnffgr_classifier.rsp_id_ref = rsp_id_ref
            vnffgr_classifier.rsp_name = rsp_name
            for nsr_vnfr in self._nsr.vnfrs.values():
                if (nsr_vnfr.vnfd.id == vnffgd_classifier.vnfd_id_ref and
                        nsr_vnfr.member_vnf_index == vnffgd_classifier.member_vnf_index_ref):
                    vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id
                    vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name
                    vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref

                    if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER':
                        vnffgr_classifier.sff_name = nsr_vnfr.name

                    vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                    for cp in vnfr.connection_point:
                        if cp.name == vnffgr_classifier.vnfr_connection_point_ref:
                            vnffgr_classifier.port_id = cp.connection_point_id
                            vnffgr_classifier.ip_address = cp.ip_address
                            for vdu in vnfr.vdur:
                                for ext_intf in vdu.external_interface:
                                    if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref:
                                        vnffgr_classifier.vm_id = vdu.vim_id
                                        # Log the classifier's own VM id; the RSP loop's
                                        # vnfr_cp_ref is stale (or undefined) at this point.
                                        self._log.debug("VIM ID for CP %s in VNFR %s is %s", cp.name, nsr_vnfr.id,
                                                        vnffgr_classifier.vm_id)
                                        break

        self._log.info("VNFFGR msg to be sent is %s", vnffgr)
        return vnffgr

    @asyncio.coroutine
    def vnffgr_nsr_sff_list(self):
        """ SFF list for the NSR, keyed by VNFD id

        One ``RwsdnYang.VNFFGSff`` entry is built for every VNFR whose VNFD
        service function chain role is 'CLASSIFIER' or 'SFF'. 'SFF' entries
        additionally list the names of all VNFRs with role 'SF'.
        """
        sff_list = {}
        sf_list = [nsr_vnfr.name for nsr_vnfr in self._nsr.vnfrs.values() if nsr_vnfr.vnfd.service_function_chain == 'SF']

        for nsr_vnfr in self._nsr.vnfrs.values():
            if (nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER' or nsr_vnfr.vnfd.service_function_chain == 'SFF'):
                vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

                sff = RwsdnYang.VNFFGSff()
                sff_list[nsr_vnfr.vnfd.id] = sff
                sff.name = nsr_vnfr.name
                sff.function_type = nsr_vnfr.vnfd.service_function_chain

                sff.mgmt_address = vnfr.mgmt_interface.ip_address
                sff.mgmt_port = VnffgRecord.SFF_MGMT_PORT
                for cp in vnfr.connection_point:
                    sff_dp = sff.dp_endpoints.add()
                    sff_dp.name = self._nsr.name + '.' + cp.name
                    sff_dp.address = cp.ip_address
                    sff_dp.port = VnffgRecord.SFF_DP_PORT
                if nsr_vnfr.vnfd.service_function_chain == 'SFF':
                    for sf_name in sf_list:
                        _sf = sff.vnfr_list.add()
                        _sf.vnfr_name = sf_name

        return sff_list

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VNFFG

        Raises:
            NsrInstantiationFailed: if the VNFFG manager fails to create the
                VNFFGR (the record is moved to FAILED first), or if a VNFR
                needed for the request is in 'failed' state.
        """

        self._log.info("Instaniating VNFFGR with vnffgd %s",
                       self._vnffgd_msg)

        vnffgr_request = yield from self.vnffgr_create_msg()
        vnffg_sff_list = yield from self.vnffgr_nsr_sff_list()

        try:
            vnffgr = self._vnffgmgr.create_vnffgr(vnffgr_request, self._vnffgd_msg.classifier, vnffg_sff_list)
        except Exception as e:
            self._log.exception("VNFFG instantiation failed: %s", str(e))
            self._vnffgr_state = VnffgRecordState.FAILED
            raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self.id, vnffgr_request.id))

        self._vnffgr_state = VnffgRecordState.INSTANTIATION_PENDING

        self._log.info("Instantiated VNFFGR :%s", vnffgr)
        self._vnffgr_state = VnffgRecordState.ACTIVE

        self._log.info("Invoking update_state to update NSR state for NSR ID: %s", self._nsr.id)
        yield from self._nsr.update_state()

    def vnffgr_in_vnffgrm(self):
        """ Is this VNFFGR in a state that terminate() acts on

        True for ACTIVE, INSTANTIATION_PENDING and FAILED.
        """
        if (self._vnffgr_state == VnffgRecordState.ACTIVE or
                self._vnffgr_state == VnffgRecordState.INSTANTIATION_PENDING or
                self._vnffgr_state == VnffgRecordState.FAILED):
            return True

        return False

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VNFFGR

        The request is ignored (and logged) unless vnffgr_in_vnffgrm() is true.
        """
        if not self.vnffgr_in_vnffgrm():
            self._log.error("Ignoring terminate request for id %s in state %s",
                            self.id, self._vnffgr_state)
            return

        self._log.info("Terminating VNFFGR id:%s", self.id)
        self._vnffgr_state = VnffgRecordState.TERMINATE_PENDING

        self._vnffgmgr.terminate_vnffgr(self._vnffgr_id)

        self._vnffgr_state = VnffgRecordState.TERMINATED
        self._log.debug("Terminated VNFFGR id:%s", self.id)
457
458
class VirtualLinkRecord(object):
    """ Virtual Link Record (VLR) of a network service record

    Wraps a virtual link descriptor (VLD) message, creates and deletes the
    matching VLR through DTS and tracks its state as a ``VlRecordState``.
    """
    # Base DTS path of the VLR catalog; vlr_xpath() relies on it.
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    @staticmethod
    @asyncio.coroutine
    def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, ip_profile, nsr_id, restart_mode=False):
        """Creates a new VLR object based on the given data.

        If restart mode is enabled, then we look for existing records in the
        DTS and create a VLR record using the existing data (ID)

        Returns:
            VirtualLinkRecord
        """
        vlr_obj = VirtualLinkRecord(
            dts,
            log,
            loop,
            nsr_name,
            vld_msg,
            cloud_account_name,
            ip_profile,
            nsr_id,
        )

        if restart_mode:
            res_iter = yield from dts.query_read(
                VirtualLinkRecord.XPATH,
                rwdts.XactFlag.MERGE)

            for fut in res_iter:
                response = yield from fut
                vlr = response.result

                # Check if the record is already present, if so use the ID of
                # the existing record. Since the name of the record is uniquely
                # formed we can use it as a search key!
                if vlr.name == vlr_obj.name:
                    vlr_obj.reset_id(vlr.id)
                    break

        return vlr_obj

    def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, ip_profile, nsr_id):
        """Store the inputs, allocate a fresh VLR id and start in INIT."""
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsr_name = nsr_name
        self._vld_msg = vld_msg
        self._cloud_account_name = cloud_account_name
        self._assigned_subnet = None
        self._nsr_id = nsr_id
        self._ip_profile = ip_profile
        self._vlr_id = str(uuid.uuid4())
        self._state = VlRecordState.INIT
        self._prev_state = None

    @property
    def xpath(self):
        """ path for this object """
        return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(self._vlr_id)

    @property
    def id(self):
        """ VLR id """
        return self._vlr_id

    @property
    def nsr_name(self):
        """ Get NSR name for this VL """
        # Must read the private attribute; returning self.nsr_name would
        # call this property again and recurse without end.
        return self._nsr_name

    @property
    def vld_msg(self):
        """ Virtual Link Descriptor """
        return self._vld_msg

    @property
    def assigned_subnet(self):
        """ Subnet assigned to this VL (None until instantiate() succeeds) """
        return self._assigned_subnet

    @property
    def name(self):
        """
        Get the name for this VLR.

        The VLD's vim_network_name is used when set; a VLD named
        "multisite" keeps its own name; otherwise the name is
        "<nsr name>.<VLD name>".
        """
        if self.vld_msg.vim_network_name:
            return self.vld_msg.vim_network_name
        elif self.vld_msg.name == "multisite":
            # This is a temporary hack to identify manually provisioned inter-site network
            return self.vld_msg.name
        else:
            return self._nsr_name + "." + self.vld_msg.name

    @property
    def cloud_account_name(self):
        """ Cloud account that this VLR should be created in """
        return self._cloud_account_name

    @staticmethod
    def vlr_xpath(vlr):
        """ Get the VLR path from VLR """
        return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(vlr.id)

    @property
    def state(self):
        """ VLR state """
        return self._state

    @state.setter
    def state(self, value):
        """ VLR set state """
        self._state = value

    @property
    def prev_state(self):
        """ VLR previous state """
        return self._prev_state

    @prev_state.setter
    def prev_state(self, value):
        """ VLR set previous state """
        self._prev_state = value

    @property
    def vlr_msg(self):
        """ Virtual Link Record message for Creating VLR in VNS

        A new message is built on every access from the record's ids, the
        selected VLD fields and, when present, the IP profile parameters.
        """
        vld_fields = ["short_name",
                      "vendor",
                      "description",
                      "version",
                      "type_yang",
                      "vim_network_name",
                      "provider_network"]

        vld_copy_dict = {k: v for k, v in self.vld_msg.as_dict().items()
                         if k in vld_fields}

        vlr_dict = {"id": self._vlr_id,
                    "nsr_id_ref": self._nsr_id,
                    "vld_ref": self.vld_msg.id,
                    "name": self.name,
                    "cloud_account": self.cloud_account_name,
                    }

        if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
            vlr_dict['ip_profile_params'] = self._ip_profile.ip_profile_params.as_dict()

        vlr_dict.update(vld_copy_dict)
        vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)
        return vlr

    def reset_id(self, vlr_id):
        """ Replace the generated VLR id with the given one """
        self._vlr_id = vlr_id

    def create_nsr_vlr_msg(self, vnfrs):
        """ The VLR message published under the NSR

        A connection point entry is added for every VLD connection point
        reference that matches one of ``vnfrs`` by VNFD id, member VNF index
        and cloud account.
        """
        nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
        nsr_vlr.vlr_ref = self._vlr_id
        nsr_vlr.assigned_subnet = self.assigned_subnet
        nsr_vlr.cloud_account = self.cloud_account_name

        for conn in self.vld_msg.vnfd_connection_point_ref:
            for vnfr in vnfrs:
                if (vnfr.vnfd.id == conn.vnfd_id_ref and
                        vnfr.member_vnf_index == conn.member_vnf_index_ref and
                        self.cloud_account_name == vnfr.cloud_account_name):
                    cp_entry = nsr_vlr.vnfr_connection_point_ref.add()
                    cp_entry.vnfr_id = vnfr.id
                    cp_entry.connection_point = conn.vnfd_connection_point_ref

        return nsr_vlr

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VL

        Raises:
            NsrInstantiationFailed: if the create query yields no result or
                the returned VLR reports operational status 'failed'. The
                record is moved to FAILED in both cases.
        """

        self._log.debug("Instaniating VLR key %s, vld %s",
                        self.xpath, self._vld_msg)
        vlr = None
        self._state = VlRecordState.INSTANTIATION_PENDING

        # Build the message once; the vlr_msg property creates a new
        # message object on every access.
        vlr_msg = self.vlr_msg
        self._log.debug("Executing VL create path:%s msg:%s",
                        self.xpath, vlr_msg)

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_create(self.xpath, vlr_msg)
            self._log.debug("Executing VL create path:%s msg:%s",
                            self.xpath, vlr_msg)
            res_iter = yield from block.execute(now=True)
            for ent in res_iter:
                res = yield from ent
                vlr = res.result

            if vlr is None:
                self._state = VlRecordState.FAILED
                raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self.id)

        if vlr.operational_status == 'failed':
            self._log.debug("NS Id:%s VL creation failed for vlr id %s", self.id, vlr.id)
            self._state = VlRecordState.FAILED
            raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr.id, vlr.operational_status_details))

        self._log.info("Instantiated VL with xpath %s and vlr:%s",
                       self.xpath, vlr)
        self._state = VlRecordState.ACTIVE
        self._assigned_subnet = vlr.assigned_subnet

    def vlr_in_vns(self):
        """ Is this VLR in a state that terminate() acts on

        True for ACTIVE, INSTANTIATION_PENDING, TERMINATE_PENDING and FAILED.
        """
        if (self._state == VlRecordState.ACTIVE or
                self._state == VlRecordState.INSTANTIATION_PENDING or
                self._state == VlRecordState.TERMINATE_PENDING or
                self._state == VlRecordState.FAILED):
            return True

        return False

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VL

        The request is ignored (and logged) unless vlr_in_vns() is true.
        """
        if not self.vlr_in_vns():
            self._log.debug("Ignoring terminate request for id %s in state %s",
                            self.id, self._state)
            return

        self._log.debug("Terminating VL id:%s", self.id)
        self._state = VlRecordState.TERMINATE_PENDING

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_delete(self.xpath)
            yield from block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL id:%s", self.id)
696
697
class VnfRecordState(Enum):
    """Lifecycle states of a virtual network function record (VNFR)."""
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103

    TERMINATE_PENDING = 104
    TERMINATED = 105

    FAILED = 106
706
707
708 class VirtualNetworkFunctionRecord(object):
709 """ Virtual Network Function Record class"""
710 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
711
@staticmethod
@asyncio.coroutine
def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name,
                  cloud_account_name, nsr_id, group_name, group_instance_id,
                  placement_groups, restart_mode=False):
    """Build a VirtualNetworkFunctionRecord from the given data.

    In restart mode the VNFR catalog is read from DTS; if a record with
    the same name as the new object is found, the new object takes over
    that record's id instead of its freshly generated one.

    Returns:
        VirtualNetworkFunctionRecord
    """
    record = VirtualNetworkFunctionRecord(
        dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name,
        cloud_account_name, nsr_id, group_name, group_instance_id,
        placement_groups, restart_mode=restart_mode)

    if not restart_mode:
        return record

    pending_results = yield from dts.query_read(
        "D,/vnfr:vnfr-catalog/vnfr:vnfr",
        rwdts.XactFlag.MERGE)

    for pending in pending_results:
        reply = yield from pending
        existing = reply.result

        # The first catalog entry with a matching name wins
        if existing.name == record.name:
            record.reset_id(existing.id)
            break

    return record
754
755 def __init__(self,
756 dts,
757 log,
758 loop,
759 vnfd,
760 const_vnfd_msg,
761 nsd_id,
762 nsr_name,
763 cloud_account_name,
764 nsr_id,
765 group_name=None,
766 group_instance_id=None,
767 placement_groups = [],
768 restart_mode = False):
769 self._dts = dts
770 self._log = log
771 self._loop = loop
772 self._vnfd = vnfd
773 self._const_vnfd_msg = const_vnfd_msg
774 self._nsd_id = nsd_id
775 self._nsr_name = nsr_name
776 self._nsr_id = nsr_id
777 self._cloud_account_name = cloud_account_name
778 self._group_name = group_name
779 self._group_instance_id = group_instance_id
780 self._placement_groups = placement_groups
781 self._config_status = NsrYang.ConfigStates.INIT
782
783 self._prev_state = VnfRecordState.INIT
784 self._state = VnfRecordState.INIT
785 self._state_failed_reason = None
786
787 self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
788 self.configure()
789
790 self._vnfr_id = str(uuid.uuid4())
791 self._name = None
792 self._vnfr_msg = self.create_vnfr_msg()
793 self._log.debug("Set VNFR {} config type to {}".
794 format(self.name, self.config_type))
795 self.restart_mode = restart_mode
796
797
798 if group_name is None and group_instance_id is not None:
799 raise ValueError("Group instance id must not be provided with an empty group name")
800
801 @property
802 def id(self):
803 """ VNFR id """
804 return self._vnfr_id
805
806 @property
807 def xpath(self):
808 """ VNFR xpath """
809 return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self.id)
810
811 @property
812 def vnfr_msg(self):
813 """ VNFR message """
814 return self._vnfr_msg
815
@property
def const_vnfr_msg(self):
    """Constituent VNFR reference message (VNFR id and cloud account)."""
    ref_cls = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef
    return ref_cls(vnfr_id=self.id, cloud_account=self.cloud_account_name)
820
821 @property
822 def vnfd(self):
823 """ vnfd """
824 return self._vnfd
825
826 @property
827 def cloud_account_name(self):
828 """ Cloud account that this VNF should be created in """
829 return self._cloud_account_name
830
831
@property
def active(self):
    """True when this VNF is in the ACTIVE state, False otherwise."""
    if self._state == VnfRecordState.ACTIVE:
        return True
    return False
836
837 @property
838 def state(self):
839 """ state of this VNF """
840 return self._state
841
842 @property
843 def state_failed_reason(self):
844 """ Error message in case this VNF is in failed state """
845 return self._state_failed_reason
846
847 @property
848 def member_vnf_index(self):
849 """ Member VNF index """
850 return self._const_vnfd_msg.member_vnf_index
851
852 @property
853 def nsr_name(self):
854 """ NSR name"""
855 return self._nsr_name
856
857 @property
858 def name(self):
859 """ Name of this VNFR """
860 if self._name is not None:
861 return self._name
862
863 name_tags = [self._nsr_name]
864
865 if self._group_name is not None:
866 name_tags.append(self._group_name)
867
868 if self._group_instance_id is not None:
869 name_tags.append(str(self._group_instance_id))
870
871 name_tags.extend([self.vnfd.name, str(self.member_vnf_index)])
872
873 self._name = "__".join(name_tags)
874
875 return self._name
876
877 @staticmethod
878 def vnfr_xpath(vnfr):
879 """ Get the VNFR path from VNFR """
880 return (VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']").format(vnfr.id)
881
882 @property
883 def config_type(self):
884 cfg_types = ['netconf', 'juju', 'script']
885 for method in cfg_types:
886 if self._vnfd.vnf_configuration.has_field(method):
887 return method
888 return 'none'
889
890 @property
891 def config_status(self):
892 """Return the config status as YANG ENUM string"""
893 self._log.debug("Map VNFR {} config status {} ({})".
894 format(self.name, self._config_status, self.config_type))
895 if self.config_type == 'none':
896 return 'config_not_needed'
897 elif self._config_status == NsrYang.ConfigStates.CONFIGURED:
898 return 'configured'
899 elif self._config_status == NsrYang.ConfigStates.FAILED:
900 return 'failed'
901
902 return 'configuring'
903
904 def set_state(self, state):
905 """ set the state of this object """
906 self._prev_state = self._state
907 self._state = state
908
def reset_id(self, vnfr_id):
    """Adopt ``vnfr_id`` as this record's id and rebuild the VNFR message."""
    self._vnfr_id = vnfr_id
    # The message embeds the id, so it has to be regenerated
    rebuilt_msg = self.create_vnfr_msg()
    self._vnfr_msg = rebuilt_msg
912
913 def configure(self):
914 self.config_store.merge_vnfd_config(
915 self._nsd_id,
916 self._vnfd,
917 self.member_vnf_index,
918 )
919
def create_vnfr_msg(self):
    """Build and return a new VNFR message for this record.

    The message carries the record's ids, name, cloud account and config
    status, a fixed set of fields copied from the VNFD, the VNFD's VNF
    configuration, the management port (when the VNFD sets one) and the
    placement groups.
    """
    copied_fields = (
        "short_name",
        "vendor",
        "description",
        "version",
        "type_yang",
    )
    from_vnfd = {}
    for field, value in self._vnfd.as_dict().items():
        if field in copied_fields:
            from_vnfd[field] = value

    fields = {
        "id": self.id,
        "nsr_id_ref": self._nsr_id,
        "vnfd_ref": self.vnfd.id,
        "name": self.name,
        "cloud_account": self._cloud_account_name,
        "config_status": self.config_status
    }
    fields.update(from_vnfd)

    msg = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(fields)
    msg.member_vnf_index_ref = self.member_vnf_index
    msg.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict())

    mgmt = self._vnfd.mgmt_interface
    if mgmt.has_field("port"):
        msg.mgmt_interface.port = self._vnfd.mgmt_interface.port

    for placement in self._placement_groups:
        entry = msg.placement_groups_info.add()
        entry.from_dict(placement.as_dict())

    # UI expects the monitoring param field to exist
    msg.monitoring_param = []

    self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, msg))
    return msg
956
@asyncio.coroutine
def update_vnfm(self):
    """Send the current VNFR message as a DTS update on this VNFR's path."""
    outgoing = self.vnfr_msg
    self._log.debug("Send an update to VNFM for VNFR {} with {}".
                    format(self.name, outgoing))
    yield from self._dts.query_update(self.xpath,
                                      rwdts.XactFlag.TRACE,
                                      self.vnfr_msg)
966
967 def get_config_status(self):
968 """Return the config status as YANG ENUM"""
969 return self._config_status
970
@asyncio.coroutine
def set_config_status(self, status):
    """Record a new config status and publish it.

    Nothing is changed when the record is already CONFIGURED (an error is
    logged) or when ``status`` equals the current status. Otherwise the
    status is stored, mirrored as a string into the VNFR message, and,
    unless the new status is INIT, sent out via update_vnfm(). Exceptions
    from either step are logged and not propagated.
    """
    self._log.debug("Update VNFR {} from {} ({}) to {}".
                    format(self.name, self._config_status,
                           self.config_type, status))

    if self._config_status == NsrYang.ConfigStates.CONFIGURED:
        self._log.error("Updating already configured VNFR {}".
                        format(self.name))
        return

    if self._config_status == status:
        return

    try:
        self._config_status = status
        # The message field takes the YANG enum string, so translate the
        # enum value; a value missing from the table raises and is logged.
        status_names = {
            NsrYang.ConfigStates.INIT : 'init',
            NsrYang.ConfigStates.CONFIGURING : 'configuring',
            NsrYang.ConfigStates.CONFIG_NOT_NEEDED : 'config_not_needed',
            NsrYang.ConfigStates.CONFIGURED : 'configured',
            NsrYang.ConfigStates.FAILED : 'failed',
        }
        self.vnfr_msg.config_status = status_names[status]
    except Exception as e:
        self._log.error("Exception=%s", str(e))

    self._log.debug("Updated VNFR {} status to {}".format(self.name, status))

    if self._config_status == NsrYang.ConfigStates.INIT:
        return

    try:
        # Publish only after VNFM has the VNFR created
        yield from self.update_vnfm()
    except Exception as e:
        self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}".
                        format(status, self.name, e))
        self._log.exception(e)
1014
1015 def is_configured(self):
1016 if self.config_type == 'none':
1017 return True
1018
1019 if self._config_status == NsrYang.ConfigStates.CONFIGURED:
1020 return True
1021
1022 return False
1023
@asyncio.coroutine
def instantiate(self, nsr):
    """Instantiate this VNFR.

    Moves the record to INSTANTIATION_PENDING, appends one connection
    point entry to the VNFR message for every VNFD connection point that
    has a matching VLR in ``nsr.vlrs``, then sends the message through DTS
    (an update in restart mode, a create otherwise).
    """
    self._log.debug("Instaniating VNFR key %s, vnfd %s",
                    self.xpath, self._vnfd)

    self._log.debug("Create VNF with xpath %s and vnfr %s",
                    self.xpath, self.vnfr_msg)

    self.set_state(VnfRecordState.INSTANTIATION_PENDING)

    def find_vlr_for_cp(conn):
        """Return the VLR that references connection point ``conn``, or None."""
        for candidate in nsr.vlrs:
            for cp_ref in candidate.vld_msg.vnfd_connection_point_ref:
                if cp_ref.vnfd_id_ref != self._vnfd.id:
                    continue
                if cp_ref.vnfd_connection_point_ref != conn.name:
                    continue
                if cp_ref.member_vnf_index_ref != self.member_vnf_index:
                    continue
                if candidate.cloud_account_name != self.cloud_account_name:
                    continue
                self._log.debug("Found VLR for cp_name:%s and vnf-index:%d",
                                conn.name, self.member_vnf_index)
                return candidate
        return None

    # For every connection point in the VNFD fill in the identifier
    for vnfd_cp in self._vnfd.connection_point:
        cp_entry = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
        cp_entry.name = vnfd_cp.name
        cp_entry.type_yang = vnfd_cp.type_yang

        matched_vlr = find_vlr_for_cp(vnfd_cp)
        if matched_vlr is None:
            # A connection point without a VLR is skipped, not an error
            self._log.debug("%s", "Failed to find VLR for cp = %s" % vnfd_cp.name)
            continue

        cp_entry.vlr_ref = matched_vlr.id
        self.vnfr_msg.connection_point.append(cp_entry)
        self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
                        cp_entry, self.vnfr_msg.id, self.vnfr_msg.vnfd_ref)

    if self.restart_mode:
        yield from self._dts.query_update(self.xpath,
                                          0,
                                          self.vnfr_msg)
    else:
        yield from self._dts.query_create(self.xpath,
                                          0,   # this is sub
                                          self.vnfr_msg)

    self._log.info("Created VNF with xpath %s and vnfr %s",
                   self.xpath, self.vnfr_msg)

    self._log.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
                   self.xpath, self._vnfd, self.vnfr_msg)
1080
@asyncio.coroutine
def update_state(self, vnfr_msg):
    """React to the operational status reported in ``vnfr_msg``.

    A "failed" status triggers instantiation_failed() with the reported
    details.  A "running" status triggers is_active(), but only while the
    locally held message is not yet marked "running".  Any other status
    is ignored.
    """
    reported = vnfr_msg.operational_status
    if reported == "failed":
        yield from self.instantiation_failed(failed_reason=vnfr_msg.operational_status_details)
    elif reported == "running" and self.vnfr_msg.operational_status != "running":
        yield from self.is_active()
1089
@asyncio.coroutine
def is_active(self):
    """Record that this VNFR became active (state ACTIVE)."""
    vnfr_id = self._vnfr_id
    self._log.debug("VNFR %s is active", vnfr_id)
    self.set_state(VnfRecordState.ACTIVE)
1095
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Record that instantiation of this VNFR failed.

    Args:
        failed_reason: optional failure details, kept in
            ``_state_failed_reason``
    """
    vnfr_id = self._vnfr_id
    self._log.error("VNFR %s instantiation failed", vnfr_id)
    self.set_state(VnfRecordState.FAILED)
    self._state_failed_reason = failed_reason
1102
def vnfr_in_vnfm(self):
    """Tell whether the VNFM is expected to hold a record for this VNFR.

    Returns:
        True in the ACTIVE, INSTANTIATION_PENDING and FAILED states,
        False in every other state.
    """
    states_with_record = (
        VnfRecordState.ACTIVE,
        VnfRecordState.INSTANTIATION_PENDING,
        VnfRecordState.FAILED,
    )
    for candidate in states_with_record:
        if self._state == candidate:
            return True
    return False
1111
@asyncio.coroutine
def terminate(self):
    """Terminate this VNF by deleting its VNFR entry through DTS.

    The request is ignored unless vnfr_in_vnfm() is true.  Otherwise the
    state moves to TERMINATE_PENDING, a delete query for the VNFR xpath
    is executed inside its own DTS transaction, and the state ends up as
    TERMINATED.
    """
    if self.vnfr_in_vnfm():
        self._log.debug("Terminating VNF id:%s", self.id)
        self.set_state(VnfRecordState.TERMINATE_PENDING)
        with self._dts.transaction(flags=0) as txn:
            delete_block = txn.block_create()
            delete_block.add_query_delete(self.xpath)
            yield from delete_block.execute(flags=0)
        self.set_state(VnfRecordState.TERMINATED)
        self._log.debug("Terminated VNF id:%s", self.id)
    else:
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.id, self._state)
1128
1129
class NetworkServiceStatus(object):
    """Operational state and recent event history of a network service.

    Holds the current NetworkServiceRecordState together with a bounded
    history of recorded events.  Each recorded event is additionally sent
    out as an ``nsm-notification`` through DTS.
    """
    # Number of most recent events kept in the history
    MAX_EVENTS_RECORDED = 10

    # NetworkServiceRecordState member name -> yang enum string
    _STATE_TO_YANG_STR = {
        "INIT": "init",
        "VL_INIT_PHASE": "vl_init_phase",
        "VNF_INIT_PHASE": "vnf_init_phase",
        "VNFFG_INIT_PHASE": "vnffg_init_phase",
        "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
        "RUNNING": "running",
        "SCALING_OUT": "scaling_out",
        "SCALING_IN": "scaling_in",
        "TERMINATE_RCVD": "terminate_rcvd",
        "TERMINATE": "terminate",
        "VL_TERMINATE_PHASE": "vl_terminate_phase",
        "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
        "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
        "TERMINATED": "terminated",
        "FAILED": "failed",
        "VL_INSTANTIATE": "vl_instantiate",
        "VL_TERMINATE": "vl_terminate",
    }

    def __init__(self, dts, log, loop):
        """Start in the INIT state with an empty event history.

        Args:
            dts: DTS API handle used to send notifications
            log: logger
            loop: asyncio event loop on which notifications are scheduled
        """
        self._dts = dts
        self._log = log
        self._loop = loop

        self._state = NetworkServiceRecordState.INIT
        # A bounded deque discards the oldest entry by itself once full
        self._events = deque(maxlen=NetworkServiceStatus.MAX_EVENTS_RECORDED)

    @asyncio.coroutine
    def create_notification(self, evt, evt_desc, evt_details):
        """Send one event as an ``nsm-notification`` through DTS.

        Args:
            evt: event identifier
            evt_desc: short description of the event
            evt_details: optional details, may be None
        """
        xp = "N,/rw-nsr:nsm-notification"
        notif = RwNsrYang.YangNotif_RwNsr_NsmNotification()
        notif.event = evt
        notif.description = evt_desc
        notif.details = evt_details

        yield from self._dts.query_create(xp, rwdts.XactFlag.ADVISE, notif)
        self._log.info("Notification called by creating dts query: %s", notif)

    def record_event(self, evt, evt_desc, evt_details):
        """Append an event to the history and schedule its notification.

        The entry is stamped with the current time in whole seconds.
        Once MAX_EVENTS_RECORDED entries are held, adding another one
        drops the oldest.
        """
        self._log.debug("Recording event - evt %s, evt_descr %s len = %s",
                        evt, evt_desc, len(self._events))
        self._events.append((int(time.time()), evt, evt_desc, evt_details))

        # Fire and forget: the notification is sent from its own task
        self._loop.create_task(self.create_notification(evt, evt_desc, evt_details))

    def set_state(self, state):
        """ set the state of this status object """
        self._state = state

    def yang_str(self):
        """Return the state as a yang enum string.

        Raises:
            KeyError: the current state has no entry in the mapping
        """
        return self._STATE_TO_YANG_STR[self._state.name]

    @property
    def state(self):
        """ State of this status object """
        return self._state

    @property
    def msg(self):
        """Recorded events as a list of OperationalEvents messages.

        Events are numbered from 1 in the order they are held, oldest
        first.
        """
        event_list = []
        for idx, entry in enumerate(self._events, start=1):
            event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
            event.id = idx
            event.timestamp, event.event, event.description, event.details = entry
            event_list.append(event)
        return event_list
1207
1208
1209 class NetworkServiceRecord(object):
1210 """ Network service record """
1211 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
1212
def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, restart_mode=False):
    """Set up an empty record for one network service instance.

    Args:
        dts: DTS API handle
        log: logger
        loop: asyncio event loop
        nsm: owning NS manager, used to look up VNFDs
        nsm_plugin: plugin used to instantiate and terminate VLs
        nsr_cfg_msg: NSR config message (id, name, nsd, cloud account, ...)
        sdn_account_name: SDN account name handed to VNFFG records
        restart_mode: kept as given; handed on when VL records are created
    """
    # Collaborators
    self._dts = dts
    self._log = log
    self._loop = loop
    self._nsm = nsm
    self._nsr_cfg_msg = nsr_cfg_msg
    self._nsm_plugin = nsm_plugin
    self._sdn_account_name = sdn_account_name

    # Descriptor and message caches, filled in later
    self._nsd = None
    self._nsr_msg = None
    self._nsr_regh = None

    # Child records, empty until create() runs
    self._vlrs = []
    self._vnfrs = {}
    self._vnfds = {}
    self._vnffgrs = {}
    self._param_pools = {}
    self._scaling_groups = {}

    # Status bookkeeping
    self._create_time = int(time.time())
    self._op_status = NetworkServiceStatus(dts, log, loop)
    self._config_status = NsrYang.ConfigStates.CONFIGURING
    self._config_status_details = None
    self._job_id = 0
    self.restart_mode = restart_mode
    self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
    self._debug_running = False
    self._is_active = False
    self._vl_phase_completed = False
    self._vnf_phase_completed = False

    # Initialise the state to INIT.  set_state() inspects the current
    # state first, so _op_status and the phase flags must exist by now.
    self.set_state(NetworkServiceRecordState.INIT)

    self.substitute_input_parameters = InputParameterSubstitution(self._log)
1253
@property
def nsm_plugin(self):
    """Plugin object through which this record drives VL operations."""
    plugin = self._nsm_plugin
    return plugin
1258
def set_state(self, state):
    """Switch this NSR to ``state``.

    Leaving VL_INIT_PHASE or VNF_INIT_PHASE marks the respective phase
    as completed, whatever the new state is (it may be FAILED as well as
    the next init phase).
    """
    self._log.debug("Setting state to %s", state)

    leaving = self.state
    if leaving == NetworkServiceRecordState.VL_INIT_PHASE:
        self._vl_phase_completed = True

    if leaving == NetworkServiceRecordState.VNF_INIT_PHASE:
        self._vnf_phase_completed = True

    self._op_status.set_state(state)
1271
@property
def id(self):
    """Identifier of this NSR, taken from the NSR config message."""
    cfg = self._nsr_cfg_msg
    return cfg.id
1276
@property
def name(self):
    """Name of this NSR, taken from the NSR config message."""
    cfg = self._nsr_cfg_msg
    return cfg.name
1281
@property
def cloud_account_name(self):
    """Cloud account named in the NSR config message."""
    cfg = self._nsr_cfg_msg
    return cfg.cloud_account
1285
@property
def state(self):
    """Current state of this NSR, as held by its status object."""
    status = self._op_status
    return status.state
1290
@property
def active(self):
    """True while this NSR is in the RUNNING state, False otherwise."""
    if self._op_status.state == NetworkServiceRecordState.RUNNING:
        return True
    return False
1295
@property
def vlrs(self):
    """List of virtual link records belonging to this NSR."""
    records = self._vlrs
    return records
1300
@property
def vnfrs(self):
    """Mapping of the VNF records belonging to this NSR."""
    records = self._vnfrs
    return records
1305
@property
def vnffgrs(self):
    """Mapping of VNFFG record id to VNFFG record for this NSR."""
    records = self._vnffgrs
    return records
1310
@property
def scaling_groups(self):
    """Mapping of scaling group name to scaling group for this NSR."""
    groups = self._scaling_groups
    return groups
1315
@property
def param_pools(self):
    """Mapping of the parameter value pools belonging to this NSR."""
    pools = self._param_pools
    return pools
1320
@property
def nsr_cfg_msg(self):
    """NSR config message backing this record."""
    cfg = self._nsr_cfg_msg
    return cfg

@nsr_cfg_msg.setter
def nsr_cfg_msg(self, msg):
    # Replace the backing config message; the cached NSD is left untouched
    self._nsr_cfg_msg = msg
1328
@property
def nsd_msg(self):
    """NSD carried by the NSR config message, cached after first use."""
    if self._nsd is None:
        self._nsd = self._nsr_cfg_msg.nsd
    return self._nsd
1336
@property
def nsd_id(self):
    """Identifier of the NSD this NSR was created from."""
    descriptor = self.nsd_msg
    return descriptor.id
1341
@property
def job_id(self):
    '''Hand out the next job id for a config primitive.

    Every read advances the internal counter by one, so two reads never
    return the same value.
    '''
    next_id = self._job_id + 1
    self._job_id = next_id
    return next_id
1347
@property
def config_status(self):
    """Configuration status currently stored for this NSR."""
    current = self._config_status
    return current
1352
def resolve_placement_group_cloud_construct(self, input_group):
    """Build the cloud specific placement group info for ``input_group``.

    The NSR config is searched for the first placement group map whose
    ``placement_group_ref`` equals the group's name.  Its fields (minus
    the reference itself) are combined with the group's name, requirement
    and strategy.

    Returns:
        A PlacementGroupsInfo message, or None when the NSR config holds
        no map for this group.
    """
    copied_fields = ('name', 'requirement', 'strategy')

    for mapping in self._nsr_cfg_msg.nsd_placement_group_maps:
        if mapping.placement_group_ref != input_group.name:
            continue

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        fields = dict((key, value)
                      for key, value in mapping.as_dict().items()
                      if key != 'placement_group_ref')
        for field in copied_fields:
            fields[field] = getattr(input_group, field)
        # NOTE(review): relies on from_dict() filling `resolved` in place;
        # its return value is not used - confirm against the bindings.
        resolved.from_dict(fields)
        return resolved

    return None
1369
1370
def __str__(self):
    """Short description naming the NSR, its NSD id and cloud account."""
    template = "NSR(name={}, nsd_id={}, cloud_account={})"
    return template.format(self.name, self.nsd_id, self.cloud_account_name)
1375
def _get_vnfd(self, vnfd_id, config_xact):
    """Look up the VNFD message for ``vnfd_id`` through the NS manager."""
    vnfd_msg = self._nsm.get_vnfd(vnfd_id, config_xact)
    return vnfd_msg
1379
def _get_vnfd_cloud_account(self, vnfd_member_index):
    """Pick the cloud account for the VNF with the given member index.

    Returns:
        The account of the first ``vnf_cloud_account_map`` entry for that
        index when it names one, otherwise the NSR's default account.
    """
    account_map = self._nsr_cfg_msg.vnf_cloud_account_map
    if account_map:
        for entry in account_map:
            if vnfd_member_index != entry.member_vnf_index_ref:
                continue
            # Only the first entry for the index counts, even if empty
            account = entry.cloud_account
            if account:
                return account
            break
    return self.cloud_account_name
1388
def _get_constituent_vnfd_msg(self, vnf_index):
    """Find the constituent VNFD entry of the NSD with ``vnf_index``.

    Raises:
        ValueError: the NSD has no constituent VNFD with that index
    """
    found = None
    for candidate in self.nsd_msg.constituent_vnfd:
        if candidate.member_vnf_index == vnf_index:
            found = candidate
            break
    else:
        raise ValueError("Constituent VNF index %s not found" % vnf_index)

    return found
1395
def record_event(self, evt, evt_desc, evt_details=None, state=None):
    """Record an event and optionally move the NSR to a new state.

    Args:
        evt: event identifier
        evt_desc: short description of the event
        evt_details: optional details
        state: when not None, set_state() is called with it after the
            event was recorded
    """
    self._op_status.record_event(evt, evt_desc, evt_details)
    if state is None:
        return
    self.set_state(state)
1401
def scaling_trigger_str(self, trigger):
    """Give the yang string for a NsdYang.ScalingTrigger value.

    Returns:
        'pre-scale-in', 'post-scale-in', 'pre-scale-out' or
        'post-scale-out'; "Unknown trigger" (after logging the problem)
        when the lookup fails.
    """
    trigger_names = {
        NsdYang.ScalingTrigger.PRE_SCALE_IN : 'pre-scale-in',
        NsdYang.ScalingTrigger.POST_SCALE_IN : 'post-scale-in',
        NsdYang.ScalingTrigger.PRE_SCALE_OUT : 'pre-scale-out',
        NsdYang.ScalingTrigger.POST_SCALE_OUT : 'post-scale-out',
    }
    try:
        label = trigger_names[trigger]
    except Exception as exc:
        self._log.error("Scaling trigger mapping error for {} : {}".
                        format(trigger, exc))
        self._log.exception(exc)
        label = "Unknown trigger"
    return label
1416
@asyncio.coroutine
def instantiate_vls(self):
    """
    Instantiate every VLR of this network service, one after the other,
    through the NSM plugin; each VLR is marked ACTIVE once its call
    returned.
    """
    pending = self._vlrs
    self._log.debug("Instantiating %d VLs in NSD id %s", len(pending),
                    self.id)
    for record in pending:
        yield from self.nsm_plugin.instantiate_vl(self, record)
        record.state = VlRecordState.ACTIVE
1427
@asyncio.coroutine
def create(self, config_xact):
    """Create all records that make up this network service.

    Runs, in this order: VL records, VNF records, VNFFG records, scaling
    groups and parameter pools.  Only the first two are coroutines.

    Args:
        config_xact: config transaction handed on to create_vnfs()
    """
    # Virtual links for the external connection points of the VNFs
    yield from self.create_vls()

    # VNFs that are part of this network service
    yield from self.create_vnfs(config_xact)

    # Forwarding graphs of the network service
    self.create_vnffgs()

    # One scaling group per scaling group descriptor of the NSD
    self.create_scaling_groups()

    # Parameter pools
    self.create_param_pools()
1446
@asyncio.coroutine
def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None):
    """Run a user supplied script to configure a scaling group instance.

    The script is called with a single argument: the path of a temporary
    YAML file holding the trigger, the scaling group config, data about
    the VNFRs and the NSR name.  The file is removed once the script has
    finished.

    Args:
        script: absolute path of the script, or a name that is resolved
            below ``$RIFT_INSTALL/usr/bin``
        group: scaling group the instance belongs to
        scale_instance: scaling group instance being configured
        trigger: NsdYang.ScalingTrigger value that caused this call
        vnfrs: optional VNFRs reported as "in group"; when empty the
            VNFRs of ``scale_instance`` are used

    Returns:
        True when the script exited with status 0; False when no script
        was given, the script does not exist, or it exited non-zero.
    """
    import shlex

    @asyncio.coroutine
    def add_vnfrs_data(vnfrs_list):
        """ Add as a dict each of the VNFRs data """
        vnfrs_data = []
        for vnfr in vnfrs_list:
            self._log.debug("Add VNFR {} data".format(vnfr))
            vnfr_data = dict()
            vnfr_data['name'] = vnfr.name
            if trigger in [NsdYang.ScalingTrigger.PRE_SCALE_IN, NsdYang.ScalingTrigger.POST_SCALE_OUT]:
                # Get VNF management and other IPs, etc
                opdata = yield from self.fetch_vnfr(vnfr.xpath)
                self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata))
                # Both lookups are best effort: missing op data is
                # logged and the script still runs with what is known
                try:
                    vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address
                    vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port
                except Exception as e:
                    self._log.error("Unable to get management IP for vnfr {}:{}".
                                    format(vnfr.name, e))

                try:
                    vnfr_data['connection_points'] = []
                    for cp in opdata.connection_point:
                        con_pt = dict()
                        con_pt['name'] = cp.name
                        con_pt['ip_address'] = cp.ip_address
                        vnfr_data['connection_points'].append(con_pt)
                except Exception as e:
                    self._log.error("Exception getting connections points for VNFR {}: {}".
                                    format(vnfr.name, e))

            vnfrs_data.append(vnfr_data)
            self._log.debug("VNFRs data: {}".format(vnfrs_data))

        return vnfrs_data

    def add_nsr_data(nsr):
        nsr_data = dict()
        nsr_data['name'] = nsr.name
        return nsr_data

    if not script:
        self._log.error("Script not provided for scale group config: {}".format(group.name))
        return False

    if script[0] == '/':
        path = script
    else:
        path = os.path.join(os.environ['RIFT_INSTALL'], "usr/bin", script)
    if not os.path.exists(path):
        self._log.error("Config faled for scale group {}: Script does not exist at {}".
                        format(group.name, path))
        return False

    # Build a YAML file with all parameters for the script to execute
    # The data consists of 5 sections
    # 1. Trigger
    # 2. Scale group config
    # 3. VNFRs in the scale group
    # 4. All VNFRs tracked by this NSR
    # 5. NSR data
    data = dict()
    data['trigger'] = group.trigger_map(trigger)
    data['config'] = group.group_msg.as_dict()

    if vnfrs:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs)
    else:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance.vnfrs)

    data["vnfrs_others"] = yield from add_vnfrs_data(self.vnfrs.values())
    data["nsr"] = add_nsr_data(self)

    # delete=False because the script opens the file by name after this
    # process has closed it; it is removed explicitly further down
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(yaml.dump(data, default_flow_style=True)
                       .encode("UTF-8"))

    try:
        self._log.debug("Creating a temp file: {} with input data: {}".
                        format(tmp_file.name, data))

        # Quote both parts so that blanks or shell metacharacters in a
        # path cannot change the command that gets executed
        cmd = "{} {}".format(shlex.quote(path), shlex.quote(tmp_file.name))
        self._log.debug("Running the CMD: {}".format(cmd))
        proc = yield from asyncio.create_subprocess_shell(cmd, loop=self._loop)
        rc = yield from proc.wait()
    finally:
        # The input file is of no use once the script is done (or could
        # not be started); failing to remove it must not hide the result
        try:
            os.unlink(tmp_file.name)
        except OSError as e:
            self._log.error("Could not remove temp file {}: {}".
                            format(tmp_file.name, e))

    if rc:
        self._log.error("The script {} for scale group {} config returned: {}".
                        format(script, group.name, rc))
        return False

    # Success
    return True
1542
1543
@asyncio.coroutine
def apply_scaling_group_config(self, trigger, group, scale_instance, vnfrs=None):
    """Apply the scaling group config that belongs to ``trigger``.

    Args:
        trigger: NsdYang.ScalingTrigger value
        group: scaling group, may be None
        scale_instance: scaling group instance, may be None
        vnfrs: optional VNFRs handed on to the config script

    Returns:
        False when group or instance is missing, when the config names no
        NS config primitive, or when the config script failed.  True when
        the group has no config for the trigger or the script succeeded.

    Raises:
        ValueError: the referenced config primitive is not in the NSD
        NotImplementedError: the primitive has no user defined script
    """
    if group is None or scale_instance is None:
        return False

    @asyncio.coroutine
    def update_config_status(success=True, err_msg=None):
        self._log.debug("Update %s config status to %r : %s",
                        scale_instance, success, err_msg)
        current = scale_instance.config_status
        if current == "failed":
            # Do not update the config status if it is already in failed state
            return

        if current == "configured":
            # Update only to failed state an already configured scale instance
            if success:
                return
            scale_instance.config_status = "failed"
            scale_instance.config_err_msg = err_msg
            yield from self.update_state()
            return

        # We are in configuring state
        # Only after post scale out mark instance as configured
        if trigger != NsdYang.ScalingTrigger.POST_SCALE_OUT:
            return
        if success:
            scale_instance.config_status = "configured"
        else:
            scale_instance.config_status = "failed"
            scale_instance.config_err_msg = err_msg
        yield from self.update_state()

    config = group.trigger_config(trigger)
    if config is None:
        return True

    self._log.debug("Scaling group {} config: {}".format(group.name, config))
    if not config.has_field("ns_config_primitive_name_ref"):
        err_msg = "Failed config for trigger {} as config primitive is not specified".\
                  format(self.scaling_trigger_str(trigger))
        yield from update_config_status(success=False, err_msg=err_msg)
        self._log.error("Config primitive not specified for config action in scale group %s" %
                        (group.name))
        return False

    # Resolve the primitive named by the config inside the NSD
    config_name = config.ns_config_primitive_name_ref
    config_primitive = None
    for candidate in self.nsd_msg.service_primitive:
        if candidate.name == config_name:
            config_primitive = candidate
            break

    if config_primitive is None:
        raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name, self.name))

    self._log.debug("Scaling group {} config primitive: {}".format(group.name, config_primitive))
    if not config_primitive.has_field("user_defined_script"):
        err_msg = "Failed config for trigger {} as config script is not specified". \
                  format(self.scaling_trigger_str(trigger))
        yield from update_config_status(success=False, err_msg=err_msg)
        raise NotImplementedError("Only script based config support for scale group for now: {}".
                                  format(group.name))

    rc = yield from self.apply_scale_group_config_script(config_primitive.user_defined_script,
                                                         group, scale_instance, trigger, vnfrs)
    err_msg = None
    if not rc:
        err_msg = "Failed config for trigger {} using config script '{}'". \
                  format(self.scaling_trigger_str(trigger),
                         config_primitive.user_defined_script)
    yield from update_config_status(success=rc, err_msg=err_msg)
    return rc
1616
def create_scaling_groups(self):
    """Create a ScalingGroup for every scaling group descriptor of the
    NSD and register it under the group's name."""
    for descriptor in self.nsd_msg.scaling_group_descriptor:
        self._log.debug("Found scaling_group %s in nsr id %s",
                        descriptor.name, self.id)

        new_group = scale_group.ScalingGroup(self._log, descriptor)
        self._scaling_groups[new_group.name] = new_group
1631
@asyncio.coroutine
def create_scale_group_instance(self, group_name, index, config_xact, is_default=False):
    """Create and start one instance of a scaling group.

    VNF records are created for every member VNF of the group, the NSR
    is published, the pre-scale-out config is applied and, when that
    succeeded, the new VNFRs are instantiated.  A failing config or an
    exception marks the instance as "failed" instead of propagating.

    Args:
        group_name: name of the scaling group (KeyError when unknown)
        index: instance id within the group
        config_xact: config transaction used for the VNFD lookups
        is_default: handed to the group when creating the instance
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.create_instance(index, is_default)

    @asyncio.coroutine
    def create_vnfs():
        # The last argument names the scaling group, not the NSR itself
        self._log.debug("Creating %u VNFs associated with NS id %s scaling group %s",
                        len(self.nsd_msg.constituent_vnfd), self.id, group_name)

        vnfrs = []
        for vnf_index, count in group.vnf_index_count_map.items():
            const_vnfd_msg = self._get_constituent_vnfd_msg(vnf_index)
            vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact)

            cloud_account_name = self._get_vnfd_cloud_account(const_vnfd_msg.member_vnf_index)
            if cloud_account_name is None:
                cloud_account_name = self.cloud_account_name
            for _ in range(count):
                vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, group_name, index)
                scale_instance.add_vnfr(vnfr)
                vnfrs.append(vnfr)

        return vnfrs

    @asyncio.coroutine
    def instantiate_instance():
        self._log.debug("Creating %s VNFRS", scale_instance)
        vnfrs = yield from create_vnfs()
        yield from self.publish()

        self._log.debug("Instantiating %s VNFRS for %s", len(vnfrs), scale_instance)
        scale_instance.operational_status = "vnf_init_phase"
        yield from self.update_state()

        try:
            rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_OUT,
                                                            group, scale_instance, vnfrs)
            if not rc:
                self._log.error("Pre scale out config for scale group {} ({}) failed".
                                format(group.name, index))
                scale_instance.operational_status = "failed"
            else:
                yield from self.instantiate_vnfs(vnfrs)

        except Exception as e:
            # exception() already records the traceback, so one call is enough
            self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
                                format(group.name, e))
            scale_instance.operational_status = "failed"

        yield from self.update_state()

    yield from instantiate_instance()
1686
@asyncio.coroutine
def delete_scale_group_instance(self, group_name, index):
    """Terminate one instance of a scaling group.

    The pre-scale-in config is applied first; its failure is logged but
    does not stop the termination of the instance's VNFRs.

    Args:
        group_name: name of the scaling group (KeyError when unknown)
        index: instance id within the group

    Raises:
        ScalingOperationError: the instance is a default instance
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.get_instance(index)
    if scale_instance.is_default:
        raise ScalingOperationError("Cannot terminate a default scaling group instance")

    scale_instance.operational_status = "terminate"
    yield from self.update_state()

    self._log.debug("Terminating %s VNFRS" % scale_instance)
    pre_scale_in_ok = yield from self.apply_scaling_group_config(
        NsdYang.ScalingTrigger.PRE_SCALE_IN, group, scale_instance)
    if not pre_scale_in_ok:
        self._log.error("Pre scale in config for scale group {} ({}) failed".
                        format(group.name, index))

    # Going ahead with terminate, even if there is an error in pre-scale-in config
    # as this could be result of scale out failure and we need to cleanup this group
    yield from self.terminate_vnfrs(scale_instance.vnfrs)
    group.delete_instance(index)

    scale_instance.operational_status = "vnf_terminate_phase"
    yield from self.update_state()
1715
@asyncio.coroutine
def _update_scale_group_instances_status(self):
    """Advance scaling group instances according to their VNFR states.

    Instances in "vnf_init_phase" become "running" once all their VNFRs
    are ACTIVE (the post-scale-out config is then applied from a separate
    task) or "failed" as soon as one VNFR is FAILED.  Instances in
    "vnf_terminate_phase" become "terminated" once all their VNFRs are
    TERMINATED, after which the post-scale-in config is applied.
    """
    @asyncio.coroutine
    def post_scale_out_task(group, instance):
        # Apply post scale out config once all VNFRs are active
        configured = yield from self.apply_scaling_group_config(
            NsdYang.ScalingTrigger.POST_SCALE_OUT, group, instance)
        instance.operational_status = "running"
        if not configured:
            self._log.error("Post scale out config for scale group {} ({}) failed".
                            format(group.name, instance.instance_id))
        else:
            self._log.debug("Scale out for group {} and instance {} succeeded".
                            format(group.name, instance.instance_id))

        yield from self.update_state()

    # The mapping is built up front, before the first yield
    instances_by_group = {grp: grp.instances for grp in self._scaling_groups.values()}
    for group, instances in instances_by_group.items():
        self._log.debug("Updating %s instance status", group)
        for instance in instances:
            vnf_states = [vnfr.state for vnfr in instance.vnfrs]
            self._log.debug("Got vnfr instance states: %s", vnf_states)

            phase = instance.operational_status
            if phase == "vnf_init_phase":
                if all(current == VnfRecordState.ACTIVE for current in vnf_states):
                    instance.operational_status = "running"

                    # Create a task for post scale out to allow us to sleep before attempting
                    # to configure newly created VM's
                    self._loop.create_task(post_scale_out_task(group, instance))

                elif any(current == VnfRecordState.FAILED for current in vnf_states):
                    self._log.debug("Scale out for group {} and instance {} failed".
                                    format(group.name, instance.instance_id))
                    instance.operational_status = "failed"

            elif phase == "vnf_terminate_phase":
                if not all(current == VnfRecordState.TERMINATED for current in vnf_states):
                    continue

                instance.operational_status = "terminated"
                configured = yield from self.apply_scaling_group_config(
                    NsdYang.ScalingTrigger.POST_SCALE_IN, group, instance)
                if configured:
                    self._log.debug("Scale in for group {} and instance {} succeeded".
                                    format(group.name, instance.instance_id))
                else:
                    self._log.error("Post scale in config for scale group {} ({}) failed".
                                    format(group.name, instance.instance_id))
1763
def create_vnffgs(self):
    """Create a VnffgRecord for every VNFFG descriptor of the NSD and
    register it under the record's id."""
    for descriptor in self.nsd_msg.vnffgd:
        self._log.debug("Found vnffgd %s in nsr id %s", descriptor, self.id)
        record = VnffgRecord(self._dts,
                             self._log,
                             self._loop,
                             self._nsm._vnffgmgr,
                             self,
                             self.name,
                             descriptor,
                             self._sdn_account_name)
        self._vnffgrs[record.id] = record
1780
def resolve_vld_ip_profile(self, nsd_msg, vld):
    """Find the IP profile of the NSD that a VLD refers to.

    Returns:
        The first profile in ``nsd_msg.ip_profiles`` whose name equals
        ``vld.ip_profile_ref``; None when the VLD carries no reference or
        no profile of that name exists.
    """
    if vld.has_field('ip_profile_ref'):
        for candidate in nsd_msg.ip_profiles:
            if candidate.name == vld.ip_profile_ref:
                return candidate
    return None
1786
@asyncio.coroutine
def _create_vls(self, vld, cloud_account):
    """Create a VLR in the given cloud account from a VLD

    Args:
        vld : VLD yang obj
        cloud_account : Cloud account name

    Returns:
        The record produced by VirtualLinkRecord.create_record()
    """
    ip_profile = self.resolve_vld_ip_profile(self.nsd_msg, vld)
    record = yield from VirtualLinkRecord.create_record(
        self._dts,
        self._log,
        self._loop,
        self.name,
        vld,
        cloud_account,
        ip_profile,
        self.id,
        restart_mode=self.restart_mode)

    return record
1810
def _extract_cloud_accounts_for_vl(self, vld):
    """
    Work out in which cloud accounts a VL has to be created.

    Sources, taken from the NSR config:
        1. vnf_cloud_account_map: for every connection point of the VLD
           the account mapped to its member VNF, or the NSR default
           account when the VNF has no (non-None) mapping
        2. vl_cloud_account_map: the accounts listed for this VLD's id
    When neither yields anything, the NSR default account is used.

    Args:
        vld : VLD yang object

    Returns:
        set: the distinct cloud account names
    """
    nsr_cfg = self._nsr_cfg_msg
    accounts = []

    if nsr_cfg.vnf_cloud_account_map:
        # Entries without an account are skipped so that those VNFs end
        # up with the default account below
        account_by_vnf = {
            entry.member_vnf_index_ref: entry.cloud_account
            for entry in nsr_cfg.vnf_cloud_account_map
            if entry.cloud_account is not None
        }
        accounts += [
            account_by_vnf.get(cp_ref.member_vnf_index_ref, self.cloud_account_name)
            for cp_ref in vld.vnfd_connection_point_ref
        ]

    if nsr_cfg.vl_cloud_account_map:
        for vl_entry in nsr_cfg.vl_cloud_account_map:
            if vl_entry.vld_id_ref == vld.id:
                accounts.extend(vl_entry.cloud_accounts)

    # Nothing configured at all: fall back to the default account
    if not accounts:
        accounts = [self.cloud_account_name]

    self._log.debug("VL {} cloud accounts: {}".
                    format(vld.name, accounts))
    return set(accounts)
1852
@asyncio.coroutine
def create_vls(self):
    """Create VL records for every VLD of the NSD, one per cloud
    account the VLD has to exist in, and append them to this NSR."""
    for descriptor in self.nsd_msg.vld:
        self._log.debug("Found vld %s in nsr id %s", descriptor, self.id)
        for account in self._extract_cloud_accounts_for_vl(descriptor):
            record = yield from self._create_vls(descriptor, account)
            self._vlrs.append(record)
1863
1864
@asyncio.coroutine
def create_vl_instance(self, vld):
    """Instantiate the VL described by ``vld`` inside this NSR.

    An existing VLR for the VLD is reused when it is TERMINATED;
    otherwise new VLRs are created for the VLD's cloud accounts.  A
    failure while instantiating is logged and leaves the VLR in the
    FAILED state; it is not raised to the caller.

    Args:
        vld: VLD yang object

    Raises:
        NsrVlUpdateError: a VLR for this VLD exists and is not TERMINATED
    """
    self._log.debug("Create VL for {}: {}".format(self.id, vld.as_dict()))
    # Check if the VL is already present
    vlr = None
    for vl in self._vlrs:
        if vl.vld_msg.id == vld.id:
            self._log.debug("The VLD %s already in NSR %s as VLR %s with status %s",
                            vld.id, self.id, vl.id, vl.state)
            vlr = vl
            if vlr.state != VlRecordState.TERMINATED:
                # Format the message here: a bare "fmt, a, b" would build
                # a tuple and both the log entry and the exception would
                # carry that tuple instead of readable text
                err_msg = "VLR for VL %s in NSR %s already instantiated" % \
                          (vld.id, self.id)
                self._log.error(err_msg)
                raise NsrVlUpdateError(err_msg)
            break

    if vlr is None:
        cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
        # NOTE(review): with several cloud accounts one VLR is created
        # per account, but only the last one is instantiated below -
        # confirm this is intended.
        for account in cloud_account_list:
            vlr = yield from self._create_vls(vld, account)
            self._vlrs.append(vlr)

    vlr.state = VlRecordState.INSTANTIATION_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.instantiate_vl(self, vlr)
        vlr.state = VlRecordState.ACTIVE

    except Exception as e:
        err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
                  format(self.id, vld.id, e)
        self._log.error(err_msg)
        self._log.exception(e)
        vlr.state = VlRecordState.FAILED

    yield from self.update_state()
1903
@asyncio.coroutine
def delete_vl_instance(self, vld):
    """Terminate the first VLR of this NSR that belongs to ``vld``.

    The VLR is removed from the NSR once the plugin terminated it.  A
    failure is logged and leaves the VLR in the FAILED state; it is not
    raised.  Nothing happens when no VLR matches the VLD id.

    Args:
        vld: VLD yang object
    """
    target = None
    for candidate in self._vlrs:
        if candidate.vld_msg.id == vld.id:
            target = candidate
            break

    if target is None:
        return

    self._log.debug("Found VLR %s for VLD %s in NSR %s",
                    target.id, vld.id, self.id)
    target.state = VlRecordState.TERMINATE_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.terminate_vl(target)
        target.state = VlRecordState.TERMINATED
        self._vlrs.remove(target)

    except Exception as exc:
        err_msg = "Error terminating VL for NSR {} and VLD {}: {}". \
                  format(self.id, vld.id, exc)
        self._log.error(err_msg)
        self._log.exception(exc)
        target.state = VlRecordState.FAILED

    yield from self.update_state()
1927
@asyncio.coroutine
def create_vnfs(self, config_xact):
    """
    Create a VNF record for every constituent VNFD of the NSD that is
    flagged start_by_default; the others are skipped.

    Args:
        config_xact: config transaction used for the VNFD lookups
    """
    members = self.nsd_msg.constituent_vnfd
    self._log.debug("Creating %u VNFs associated with this NS id %s",
                    len(members), self.id)

    for member in members:
        if member.start_by_default:
            vnfd_msg = self._get_vnfd(member.vnfd_id_ref, config_xact)
            account = self._get_vnfd_cloud_account(member.member_vnf_index)
            if account is None:
                account = self.cloud_account_name
            yield from self.create_vnf_record(vnfd_msg, member, account)
        else:
            self._log.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
                            member.member_vnf_index)
1948
1949
def get_placement_groups(self, vnfd_msg, const_vnfd):
    """ Collect the resolved NSD placement groups that apply to a constituent VNF

    A group applies when one of its members references both the VNFD id and
    the member-vnf-index of the constituent VNF. Groups whose cloud
    construct cannot be resolved are logged and left out.

    Arguments:
        vnfd_msg   - the VNFD message of the VNF
        const_vnfd - the constituent-vnfd entry of the NSD

    Returns:
        List of resolved placement group infos (possibly empty)
    """
    resolved = []
    for group in self.nsd_msg.placement_groups:
        for member in group.member_vnfd:
            if member.vnfd_id_ref != vnfd_msg.id or \
               member.member_vnf_index_ref != const_vnfd.member_vnf_index:
                continue

            info = self.resolve_placement_group_cloud_construct(group)
            if info is not None:
                self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
                               str(info),
                               vnfd_msg.name,
                               const_vnfd.member_vnf_index)
                resolved.append(info)
            else:
                # Unresolvable groups are reported but are not fatal
                self._log.error("Could not resolve cloud-construct for placement group: %s", group.name)
    return resolved
1967
@asyncio.coroutine
def create_vnf_record(self, vnfd_msg, const_vnfd, cloud_account_name, group_name=None, group_instance_id=None):
    """ Create a VNFR for a constituent VNF and register it with this NSR and the NSM

    Arguments:
        vnfd_msg           - the VNFD message of the VNF
        const_vnfd         - the constituent-vnfd entry of the NSD
        cloud_account_name - cloud account the VNF is launched in
        group_name         - scaling group name, if any
        group_instance_id  - scaling group instance id, if any

    Raises:
        NetworkServiceRecordError if a VNFR with the same id is already tracked

    Returns:
        The newly created VNFR
    """
    # Placement groups of the NSD that this constituent VNF is a member of
    groups = self.get_placement_groups(vnfd_msg, const_vnfd)
    self._log.info("Cloud Account for VNF %d is %s", const_vnfd.member_vnf_index, cloud_account_name)
    self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
                   vnfd_msg.name,
                   const_vnfd.member_vnf_index,
                   [entry.name for entry in groups])

    record = yield from VirtualNetworkFunctionRecord.create_record(
        self._dts,
        self._log,
        self._loop,
        vnfd_msg,
        const_vnfd,
        self.nsd_id,
        self.name,
        cloud_account_name,
        self.id,
        group_name,
        group_instance_id,
        groups,
        restart_mode=self.restart_mode,
    )

    if record.id in self._vnfrs:
        raise NetworkServiceRecordError(
            "VNF with VNFR id %s already in vnf list" % (record.id,)
        )

    # Track the record both in this NSR and in the NSM
    self._vnfrs[record.id] = record
    self._nsm.vnfrs[record.id] = record

    yield from record.set_config_status(NsrYang.ConfigStates.INIT)

    self._log.debug("Added VNFR %s to NSM VNFR list with id %s",
                    record.name,
                    record.id)

    return record
2005
def create_param_pools(self):
    """ Create a ParameterValuePool for every parameter pool declared in the NSD

    Raises:
        NetworkServiceRecordError if a pool's end value is below its start value
    """
    for param_pool in self.nsd_msg.parameter_pool:
        self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)

        start_value = param_pool.range.start_value
        end_value = param_pool.range.end_value
        if end_value < start_value:
            # The message used to mix a "%s" placeholder with str.format(),
            # so the pool name was never inserted into the text.
            raise NetworkServiceRecordError(
                "Parameter pool {} has invalid range (start: {}, end: {})".format(
                    param_pool.name, start_value, end_value
                )
            )

        # NOTE(review): range() excludes end_value itself - confirm that the
        # pool is meant to be end-exclusive.
        self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
            self._log,
            param_pool.name,
            range(start_value, end_value)
        )
2024
@asyncio.coroutine
def fetch_vnfr(self, vnfr_path):
    """ Read a VNFR from DTS

    Arguments:
        vnfr_path - xpath to query

    Returns:
        The result of the last query response, or None when the query
        produced no responses
    """
    latest = None
    self._log.debug("Fetching VNFR with key %s while instantiating %s",
                    vnfr_path, self.id)
    responses = yield from self._dts.query_read(vnfr_path, rwdts.XactFlag.MERGE)

    for pending in responses:
        response = yield from pending
        latest = response.result

    return latest
2038
@asyncio.coroutine
def instantiate_vnfs(self, vnfrs):
    """ Ask the NSM plugin to instantiate each of the given VNFRs, one after another

    Arguments:
        vnfrs - sized iterable of VNFRs belonging to this Network Service
    """
    self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id)
    plugin_owner = self
    for record in vnfrs:
        self._log.debug("Instantiating VNF: %s in NS %s", record, self.id)
        yield from self.nsm_plugin.instantiate_vnf(plugin_owner, record)
2048
@asyncio.coroutine
def instantiate_vnffgs(self):
    """ Instantiate every VNFFG record of this Network Service

    Polls each VNFR until it has left its pending states. If any VNFR did
    not end up ACTIVE the VNFFG state is marked FAILED and nothing is
    instantiated. Otherwise waits a fixed 90 seconds and then instantiates
    the VNFFG records one by one.
    """
    self._log.debug("Instantiating %u VNFFGs in NS %s",
                    len(self.nsd_msg.vnffgd), self.id)

    for record in self.vnfrs.values():
        # Poll every 2 seconds while the VNF is still coming up
        while record.state in [VnfRecordState.INSTANTIATION_PENDING, VnfRecordState.INIT]:
            self._log.debug("Received vnfr state for vnfr %s is %s; retrying", record.name, record.state)
            yield from asyncio.sleep(2, loop=self._loop)

        if record.state != VnfRecordState.ACTIVE:
            self._log.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation", record.name, record.state)
            self._vnffgr_state = VnffgRecordState.FAILED
            return

        self._log.debug("Received vnfr state for vnfr %s is %s ", record.name, record.state)

    self._log.info("Waiting for 90 seconds for VMs to come up")
    yield from asyncio.sleep(90, loop=self._loop)
    self._log.info("Starting VNFFG orchestration")

    for graph in self._vnffgrs.values():
        self._log.debug("Instantiating VNFFG: %s in NS %s", graph, self.id)
        yield from graph.instantiate()
2074
@asyncio.coroutine
def instantiate_scaling_instances(self, config_xact):
    """ Instantiate the scaling group instances of this Network Service

    For every scaling group, first the default instances (one per
    min_instance_count) are created, then the instances listed for that
    group in the NSR config message are re-created.

    Arguments:
        config_xact - configuration transaction handed to each instance creation
    """
    for group in self._scaling_groups.values():
        for index in range(group.min_instance_count):
            self._log.debug("Instantiating %s default scaling instance %s", group, index)
            yield from self.create_scale_group_instance(
                group.name, index, config_xact, is_default=True
            )

        for group_cfg in self._nsr_cfg_msg.scaling_group:
            # Only config entries that refer to the current group are relevant
            if group_cfg.scaling_group_name_ref == group.name:
                for instance in group_cfg.instance:
                    self._log.debug("Reloading %s scaling instance %s", group_cfg, instance.id)
                    yield from self.create_scale_group_instance(
                        group.name, instance.id, config_xact, is_default=False
                    )
2094
def has_scaling_instances(self):
    """ Tell whether any scaling instance has to be created for this network service

    Returns:
        True if a scaling group has a positive min_instance_count or the
        NSR config lists at least one scaling group instance, else False
    """
    if any(group.min_instance_count > 0
           for group in self._scaling_groups.values()):
        return True

    return any(len(group_cfg.instance) > 0
               for group_cfg in self._nsr_cfg_msg.scaling_group)
2106
@asyncio.coroutine
def publish(self):
    """ Rebuild the NSR message and publish it through the NSM's NSR handler """
    message = self.create_msg()
    self._nsr_msg = message

    self._log.debug("Publishing the NSR with xpath %s and nsr %s",
                    self.nsr_xpath,
                    self._nsr_msg)

    # Debug aid: flags publishes that happen after RUNNING was already published
    if self._debug_running:
        self._log.debug("Publishing NSR in RUNNING state!")

    with self._dts.transaction() as xact:
        yield from self._nsm.nsr_handler.update(xact, self.nsr_xpath, self._nsr_msg)
        running = self._op_status.state == NetworkServiceRecordState.RUNNING
        if running:
            self._debug_running = True
2124
@asyncio.coroutine
def unpublish(self, xact):
    """ Delete this NSR's published record through the NSM's NSR handler

    Arguments:
        xact - the transaction to perform the delete in
    """
    self._log.debug("Unpublishing Network service id %s", self.id)
    handler = self._nsm.nsr_handler
    yield from handler.delete(xact, self.nsr_xpath)
2130
@property
def nsr_xpath(self):
    """ The operational-data xpath of this NSR, keyed by the NSR id """
    template = (
        "D,/nsr:ns-instance-opdata" +
        "/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
    )
    return template.format(self.id)
2138
@staticmethod
def xpath_from_nsr(nsr):
    """ Build the NSR op-data xpath for the passed NSR from the class-level XPATH

    Arguments:
        nsr - an object exposing the NSR id as `id`
    """
    key_predicate = "[nsr:ns-instance-config-ref = '{}']"
    return (NetworkServiceRecord.XPATH + key_predicate).format(nsr.id)
2144
@property
def nsd_xpath(self):
    """ The config xpath of the NSD this NSR refers to, keyed by the NSD id """
    template = "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
    return template.format(self.nsd_id)
2151
@asyncio.coroutine
def instantiate(self, config_xact):
    """ Instantiate this NetworkServiceRecord

    The preparation runs inline: the state moves to INIT, the NSD is taken
    from the NSR config message, config is merged, input parameters are
    substituted, the record is created and published once.

    The remaining work is scheduled as a separate task on the event loop
    and is NOT awaited by this coroutine:

    * Instantiate every VL of the NSD (instantiate_vls)
    * Instantiate every VNF record of this NSR (instantiate_vnfs)
    * Instantiate VNFFGs and scaling group instances, when there are any
    * Let the NSM plugin deploy the service and publish the NSR

    If that task fails, instantiation_failed() is scheduled with the
    failure text.

    Arguments:
        config_xact: The configuration transaction which initiated the instantiation

    Raises:
        NetworkServiceRecordError if the NSR config carries no NSD

    Returns:
        No return value
    """

    self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)

    # Move the state to INIT
    self.set_state(NetworkServiceRecordState.INIT)

    event_descr = "Instantiation Request Received NSR Id:%s" % self.id
    self.record_event("instantiating", event_descr)

    # The NSD is carried inside the NSR config message
    self._nsd = self._nsr_cfg_msg.nsd

    try:
        # Update ref count if nsd present in catalog
        self._nsm.get_nsd_ref(self.nsd_id)

    except NetworkServiceDescriptorError:
        # This could be an NSD not in the nsd-catalog
        pass

    # Merge any config and initial config primitive values
    self.config_store.merge_nsd_config(self.nsd_msg)
    self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))

    event_descr = "Fetched NSD with descriptor id %s" % self.nsd_id
    self.record_event("nsd-fetched", event_descr)

    # NOTE(review): this check runs after the NSD has already been used by
    # the merge above - confirm whether it should move up.
    if self._nsd is None:
        # Raise with the actual failure text (the record object itself used
        # to be passed as the exception argument) and log it as an error.
        msg = ("Failed to fetch NSD with nsd-id [%s] for nsr-id %s" %
               (self.nsd_id, self.id))
        self._log.error(msg)
        raise NetworkServiceRecordError(msg)

    self._log.debug("Got nsd result %s", self._nsd)

    # Substitute any input parameters
    self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)

    # Create the record
    yield from self.create(config_xact)

    # Publish the NSR to DTS
    yield from self.publish()

    @asyncio.coroutine
    def do_instantiate():
        """
        Instantiate network service
        """
        self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
                        self.id, self.nsd_id)

        # instantiate the VLs
        event_descr = ("Instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("begin-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)

        yield from self.instantiate_vls()

        # Publish the NSR to DTS
        yield from self.publish()

        event_descr = ("Finished instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("end-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)

        self._log.debug("Instantiating VNFs  ...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # instantiate the VNFs
        event_descr = ("Instantiating %s VNFS for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))

        self.record_event("begin-vnf-instantiation", event_descr)

        yield from self.instantiate_vnfs(self._vnfrs.values())

        self._log.debug(" Finished instantiating %d VNFs for NSR id %s",
                        len(self.nsd_msg.constituent_vnfd), self.id)

        event_descr = ("Finished instantiating %s VNFs for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))
        self.record_event("end-vnf-instantiation", event_descr)

        if len(self.vnffgrs) > 0:
            event_descr = ("Instantiating %s VNFFGS for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))

            self.record_event("begin-vnffg-instantiation", event_descr)

            yield from self.instantiate_vnffgs()

            event_descr = ("Finished instantiating %s VNFFGDs for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))
            self.record_event("end-vnffg-instantiation", event_descr)

        if self.has_scaling_instances():
            event_descr = ("Instantiating %s Scaling Groups for NSR id %s" %
                           (len(self._scaling_groups), self.id))

            self.record_event("begin-scaling-group-instantiation", event_descr)
            yield from self.instantiate_scaling_instances(config_xact)
            self.record_event("end-scaling-group-instantiation", event_descr)

        # Give the plugin a chance to deploy the network service now that all
        # virtual links and vnfs are instantiated
        yield from self.nsm_plugin.deploy(self._nsr_msg)

        self._log.debug("Publishing  NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # Publish the NSR to DTS
        yield from self.publish()

        self._log.debug("Published  NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

    def on_instantiate_done(fut):
        """ Done-callback: publish the NSR as failed if do_instantiate raised """
        # Future.exception() raises CancelledError on a cancelled task, so
        # cancellation has to be ruled out before asking for the exception.
        if fut.cancelled():
            self._log.debug("NSR instantiation task cancelled for NSR id %s", self.id)
            return

        failure = fut.exception()
        if failure is not None:
            self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(failure))
            self._loop.create_task(self.instantiation_failed(failed_reason=str(failure)))

    instantiate_task = self._loop.create_task(do_instantiate())
    instantiate_task.add_done_callback(on_instantiate_done)
2302
@asyncio.coroutine
def set_config_status(self, status, status_details=None):
    """ Store a new config status and publish the NSR; a no-op if the status is unchanged

    A transition to FAILED additionally records a "config-failed" event.

    Arguments:
        status         - the new config status
        status_details - optional details stored alongside the status
    """
    if self.config_status == status:
        # Nothing changed, so there is nothing to store or publish
        return

    self._log.debug("Updating NSR {} status for {} to {}".
                    format(self.name, self.config_status, status))
    self._config_status = status
    self._config_status_details = status_details

    failed = self._config_status == NsrYang.ConfigStates.FAILED
    if failed:
        self.record_event("config-failed", "NS configuration failed",
                          evt_details=self._config_status_details)

    yield from self.publish()
2316
@asyncio.coroutine
def is_active(self):
    """ Move this NS to RUNNING; the first time also record the event and publish """
    self.set_state(NetworkServiceRecordState.RUNNING)

    if not self._is_active:
        # First transition to active: remember it, record the event, publish
        self._log.debug("Network service %s is active ", self.id)
        self._is_active = True

        running_descr = "NSR in running state for NSR id %s" % self.id
        self.record_event("ns-running", running_descr)

        yield from self.publish()
2332
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this NS as FAILED, record an "ns-failed" event and publish the NSR

    Arguments:
        failed_reason - optional text stored as the event details
    """
    self._log.error("Network service id:%s, name:%s instantiation failed",
                    self.id, self.name)
    self.set_state(NetworkServiceRecordState.FAILED)

    self.record_event("ns-failed",
                      "Instantiation of NS %s failed" % self.id,
                      evt_details=failed_reason)

    # Make the failed state visible
    yield from self.publish()
2345
@asyncio.coroutine
def terminate_vnfrs(self, vnfrs):
    """ Ask the NSM plugin to terminate each of the given VNFRs, one after another

    Arguments:
        vnfrs - iterable of VNFRs to terminate
    """
    self._log.debug("Terminating VNFs in network service %s", self.id)
    plugin = None
    for record in vnfrs:
        plugin = self.nsm_plugin
        yield from plugin.terminate_vnf(record)
2352
@asyncio.coroutine
def terminate(self):
    """ Terminate this NetworkServiceRecord

    Tears the service down phase by phase: VNFFGRs, then VNFRs, then
    VLRs, then the network service itself through the NSM plugin. Each
    phase sets the matching state and records an event first.
    """
    self._log.debug("Terminating network service id %s", self.id)

    # Move the state to TERMINATE
    self.set_state(NetworkServiceRecordState.TERMINATE)
    self.record_event("terminate",
                      "Terminate being processed for NS Id:%s" % self.id)

    # Move the state to VNFFG_TERMINATE_PHASE and terminate the VNFFGRs
    self._log.debug("Terminating VNFFGs in NS ID: %s", self.id)
    self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
    self.record_event("terminating-vnffgss",
                      "Terminating VNFFGS in NS Id:%s" % self.id)
    self._log.debug("Terminating VNFFGRs in network service %s", self.id)
    for graph in self.vnffgrs.values():
        yield from graph.terminate()

    # Move the state to VNF_TERMINATE_PHASE and terminate the VNFRs
    self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
    self.record_event("terminating-vnfs",
                      "Terminating VNFS in NS Id:%s" % self.id)
    yield from self.terminate_vnfrs(self.vnfrs.values())

    # Move the state to VL_TERMINATE_PHASE and terminate the VLRs
    self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
    self.record_event("terminating-vls",
                      "Terminating VLs in NS Id:%s" % self.id)
    self._log.debug("Terminating VLs in network service %s", self.id)
    for link in self.vlrs:
        yield from self.nsm_plugin.terminate_vl(link)
        link.state = VlRecordState.TERMINATED

    yield from self.nsm_plugin.terminate_ns(self)

    # Move the state to TERMINATED
    self.set_state(NetworkServiceRecordState.TERMINATED)
    self.record_event("terminated", "Terminated NS Id:%s" % self.id)
2401
def enable(self):
    """ Enable a NetworkServiceRecord (currently a no-op) """
    return None
2405
def disable(self):
    """ Disable a NetworkServiceRecord (currently a no-op) """
    return None
2409
def map_config_status(self):
    """ Translate the internal config status into its published string form

    Returns:
        'configuring' or 'failed' for the matching states, 'configured'
        for every other status
    """
    self._log.debug("Config status for ns {} is {}".
                    format(self.name, self._config_status))

    current = self._config_status
    if current == NsrYang.ConfigStates.CONFIGURING:
        return 'configuring'
    return 'failed' if current == NsrYang.ConfigStates.FAILED else 'configured'
2418
def vl_phase_completed(self):
    """ Return the flag telling whether the VL phase of this NS has completed """
    completed = self._vl_phase_completed
    return completed
2422
def vnf_phase_completed(self):
    """ Return the flag telling whether the VNF phase of this NS has completed """
    completed = self._vnf_phase_completed
    return completed
2426
def create_msg(self):
    """ Build the NSR operational-data message from the current state of this record

    VLR entries are included only once the VL phase has completed;
    constituent VNFR references, VNFFGRs and scaling group records only
    once the VNF phase has completed.

    Returns:
        A RwNsrYang NsInstanceOpdata Nsr message
    """
    message = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(
        {"ns_instance_config_ref": self.id}
    )
    message.sdn_account = self._sdn_account_name
    message.name_ref = self.name
    message.nsd_ref = self.nsd_id
    message.nsd_name_ref = self.nsd_msg.name
    message.operational_events = self._op_status.msg
    message.operational_status = self._op_status.yang_str()
    message.config_status = self.map_config_status()
    message.config_status_details = self._config_status_details
    message.create_time = self._create_time

    # Copy the NSD's primitives over as their NSR counterparts
    for nsd_prim in self.nsd_msg.service_primitive:
        message.service_primitive.append(
            NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict(
                nsd_prim.as_dict()))

    for nsd_init in self.nsd_msg.initial_config_primitive:
        message.initial_config_primitive.append(
            NsrYang.NsrInitialConfigPrimitive.from_dict(nsd_init.as_dict()))

    if self.vl_phase_completed():
        for link in self.vlrs:
            message.vlr.append(link.create_nsr_vlr_msg(self.vnfrs.values()))

    if self.vnf_phase_completed():
        for record_id in self.vnfrs:
            record = self.vnfrs[record_id]
            message.constituent_vnfr_ref.append(record.const_vnfr_msg)
        for graph in self.vnffgrs.values():
            message.vnffgr.append(graph.fetch_vnffgr())
        for group in self._scaling_groups.values():
            message.scaling_group_record.append(group.create_record_msg())

    return message
2465
def all_vnfs_active(self):
    """ Tell whether every VNFR of this NS reports active as exactly True

    Returns:
        True when all VNFRs are active (also when there are none), else False
    """
    return all(record.active is True for record in self.vnfrs.values())
2472
@asyncio.coroutine
def update_state(self):
    """ Re-evaluate this NS's state from its VNFRs, VLRs, VNFFGRs and scaling groups

    The candidate state starts as RUNNING and is downgraded while walking
    the records. A TERMINATED NS is left untouched. The NSR is published at
    the end, whether or not the state changed.
    """
    curr_state = self._op_status.state

    if curr_state == NetworkServiceRecordState.TERMINATED:
        self._log.debug("NS (%s) in terminated state, not updating state", self.id)
        return

    new_state = NetworkServiceRecordState.RUNNING
    self._log.info("Received update_state for nsr: %s, curr-state: %s",
                   self.id, curr_state)

    # Check the state of every VNFR
    for vnfr in self.vnfrs.values():
        if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]:
            pass
        elif vnfr.state == VnfRecordState.FAILED:
            if vnfr._prev_state != vnfr.state:
                # Record the failure event only on the transition into FAILED
                event_descr = "Instantiation of VNF %s failed" % vnfr.id
                event_error_details = vnfr.state_failed_reason
                self.record_event("vnf-failed", event_descr, evt_details=event_error_details)
                vnfr.set_state(VnfRecordState.FAILED)
            else:
                self._log.info("VNF state did not change, curr=%s, prev=%s",
                               vnfr.state, vnfr._prev_state)
            new_state = NetworkServiceRecordState.FAILED
            break
        else:
            self._log.info("VNF %s in NSR %s is still not active; current state is: %s",
                           vnfr.id, self.id, vnfr.state)
            new_state = curr_state

    # If new state is RUNNING; check all VLs
    if new_state == NetworkServiceRecordState.RUNNING:
        for vl in self.vlrs:

            if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]:
                pass
            elif vl.state == VlRecordState.FAILED:
                if vl.prev_state != vl.state:
                    event_descr = "Instantiation of VL %s failed" % vl.id
                    event_error_details = vl.state_failed_reason
                    self.record_event("vl-failed", event_descr, evt_details=event_error_details)
                    vl.prev_state = vl.state
                else:
                    # The "%s" placeholder previously had no argument
                    self._log.debug("VL %s already in failed state", vl.id)
            else:
                if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]:
                    new_state = NetworkServiceRecordState.VL_INSTANTIATE
                    break

                if vl.state in [VlRecordState.TERMINATE_PENDING]:
                    new_state = NetworkServiceRecordState.VL_TERMINATE
                    break

    # If new state is RUNNING; check VNFFGRs are also active
    if new_state == NetworkServiceRecordState.RUNNING:
        for vnffgr in self.vnffgrs.values():
            self._log.info("Checking vnffgr state for nsr %s is: %s",
                           self.id, vnffgr.state)
            if vnffgr.state == VnffgRecordState.ACTIVE:
                pass
            elif vnffgr.state == VnffgRecordState.FAILED:
                event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id
                self.record_event("vnffg-failed", event_descr)
                new_state = NetworkServiceRecordState.FAILED
                break
            else:
                self._log.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
                               vnffgr.id, self.id, vnffgr.state)
                new_state = curr_state

    # Update all the scaling group instance operational status to
    # reflect the state of all VNFR within that instance
    yield from self._update_scale_group_instances_status()

    # A scaling operation in progress overrides whatever was derived above
    for group in self._scaling_groups.values():
        if group.state == scale_group.ScaleGroupState.SCALING_OUT:
            new_state = NetworkServiceRecordState.SCALING_OUT
            break
        elif group.state == scale_group.ScaleGroupState.SCALING_IN:
            new_state = NetworkServiceRecordState.SCALING_IN
            break

    if new_state != curr_state:
        self._log.debug("Changing state of Network service %s from %s to %s",
                        self.id, curr_state, new_state)
        if new_state == NetworkServiceRecordState.RUNNING:
            yield from self.is_active()
        elif new_state == NetworkServiceRecordState.FAILED:
            # If the NS is already active and we entered scaling_in, scaling_out,
            # do not mark the NS as failing if scaling operation failed.
            if curr_state in [NetworkServiceRecordState.SCALING_OUT,
                              NetworkServiceRecordState.SCALING_IN] and self._is_active:
                new_state = NetworkServiceRecordState.RUNNING
                self.set_state(new_state)
            else:
                yield from self.instantiation_failed()
        else:
            self.set_state(new_state)

    yield from self.publish()
2576
2577
class InputParameterSubstitution(object):
    """
    Callable that applies the input parameters of an NSR config to an NSD.
    """

    def __init__(self, log):
        """Create an instance of InputParameterSubstitution

        Arguments:
            log - a logger for this object to use

        """
        self.log = log

    def __call__(self, nsd, nsr_config):
        """Write the NSR config's input parameters into the NSD

        The NSD is modified in place. Only xpaths that the NSD itself lists
        as input parameters may be set; any other parameter is logged as an
        error and skipped. A failure while setting a single value is logged
        and does not stop the remaining parameters.

        Arguments:
            nsd        - a GI NSD object
            nsr_config - a GI NSR config object

        """
        if nsd is None or nsr_config is None:
            return

        # The xpaths this descriptor allows to be modified
        allowed_xpaths = {entry.xpath for entry in nsd.input_parameter_xpath}

        if not nsr_config.input_parameter:
            return

        for parameter in nsr_config.input_parameter:
            if parameter.xpath not in allowed_xpaths:
                rejected = "tried to set an invalid input parameter ({})"
                self.log.error(rejected.format(parameter.xpath))
                continue

            self.log.debug(
                "input-parameter:{} = {}".format(
                    parameter.xpath,
                    parameter.value,
                )
            )

            try:
                xpath.setxattr(nsd, parameter.xpath, parameter.value)
            except Exception as exc:
                # Best effort: keep going with the remaining parameters
                self.log.exception(exc)
2632
2633
class NetworkServiceDescriptor(object):
    """
    Network service descriptor class

    Wraps an NSD message together with a reference count that tells
    whether the descriptor is currently in use.
    """

    def __init__(self, dts, log, loop, nsd, nsm):
        """ Create a descriptor wrapper

        Arguments:
            dts  - DTS handle
            log  - logger
            loop - event loop
            nsd  - the NSD message being wrapped
            nsm  - the NS manager owning this descriptor
        """
        self._dts = dts
        self._log = log
        self._loop = loop

        self._nsd = nsd
        self._ref_count = 0

        self._nsm = nsm

    @property
    def id(self):
        """ Returns nsd id """
        return self._nsd.id

    @property
    def name(self):
        """ Returns name of nsd """
        return self._nsd.name

    @property
    def ref_count(self):
        """ Returns reference count"""
        return self._ref_count

    def in_use(self):
        """ Returns whether nsd is in use or not """
        return self.ref_count > 0

    def ref(self):
        """ Take a reference on this object """
        self._ref_count += 1

    def unref(self):
        """ Release reference on this object

        Raises:
            NetworkServiceDescriptorError if no reference is currently held
        """
        if self.ref_count < 1:
            msg = ("Unref on a NSD object - nsd id %s, ref_count = %s" %
                   (self.id, self.ref_count))
            self._log.critical(msg)
            raise NetworkServiceDescriptorError(msg)
        self._ref_count -= 1

    @property
    def msg(self):
        """ Return the message associated with this NetworkServiceDescriptor"""
        return self._nsd

    @staticmethod
    def path_for_id(nsd_id):
        """ Return the catalog xpath for the passed nsd_id"""
        # The key predicate was previously left unterminated (missing "]")
        return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']".format(nsd_id)

    def path(self):
        """ Return the catalog xpath of this NetworkServiceDescriptor"""
        return NetworkServiceDescriptor.path_for_id(self.id)

    def update(self, nsd):
        """ Update the NSD descriptor """
        self._nsd = nsd
2698
2699
class NsdDtsHandler(object):
    """ The network service descriptor DTS handler """
    XPATH = "C,/nsd:nsd-catalog/nsd:nsd"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for Nsd create/update/delete/read requests from dts """

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration: create/update the NSDs seen in prepare"""
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
            self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
                            xact, action)
            # Create/Update an NSD record
            for cfg in self._regh.get_xact_elements(xact):
                # Only interested in those NSD cfgs whose ID was received in prepare callback
                if cfg.id in scratch.get('nsds', []) or is_recovery:
                    self._nsm.update_nsd(cfg)

            scratch.pop('nsds', None)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def delete_nsd_libs(nsd_id):
            """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/launchpad/libs/<id> """
            try:
                rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
                nsd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', nsd_id)

                if os.path.exists(nsd_dir):
                    shutil.rmtree(nsd_dir, ignore_errors=True)
            except Exception as e:
                # Cleanup is best effort; a failure must not abort the delete.
                self._log.error("Exception in cleaning up NSD libs {}: {}".
                                format(nsd_id, e))
                # Was misspelled "excpetion", which raised AttributeError
                # from inside this handler.
                self._log.exception(e)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for NSD config """

            self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
                           msg.id, msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if fref.is_field_deleted():
                # Delete an NSD record
                self._log.debug("Deleting NSD with id %s", msg.id)
                if self._nsm.nsd_in_use(msg.id):
                    self._log.debug("Cannot delete NSD in use - %s", msg.id)
                    err = "Cannot delete an NSD in use - %s" % msg.id
                    raise NetworkServiceDescriptorRefCountExists(err)

                yield from delete_nsd_libs(msg.id)
                self._nsm.delete_nsd(msg.id)
            else:
                # Add this NSD to scratch to create/update in apply callback
                nsds = scratch.setdefault('nsds', [])
                nsds.append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for NSD config using xpath: %s",
            NsdDtsHandler.XPATH,
        )

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._regh = acg.register(
                xpath=NsdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare)
2791
2792
class VnfdDtsHandler(object):
    """ Handles VNFD configuration changes received over DTS """
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log = dts, log
        self._loop, self._nsm = loop, nsm
        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ The DTS registration handle (None until registered) """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register with DTS for VNFD configuration """

        @asyncio.coroutine
        def on_apply(dts, acg, xact, action, scratch):
            """ Apply callback: act on the VNFD ids collected during prepare """
            self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # Create/update the VNFDs whose ids were noted in prepare
            for changed in self._regh.get_xact_elements(xact):
                if changed.id not in scratch.get('vnfds', []):
                    continue
                self._nsm.update_vnfd(changed)

            # Delete the VNFDs whose ids were noted as deleted in prepare
            for known in self._regh.elements:
                if known.id not in scratch.get('deleted_vnfds', []):
                    continue
                yield from self._nsm.delete_vnfd(known.id)

            for key in ('vnfds', 'deleted_vnfds'):
                scratch.pop(key, None)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback: note the VNFD id as deleted or as created/updated """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
                            ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg)

            field_ref = ProtobufC.FieldReference.alloc()
            field_ref.goto_whole_message(msg.to_pbcm())

            # Both kinds of change are only recorded here; the work happens in apply
            if field_ref.is_field_deleted():
                self._log.debug("Adding msg to deleted field")
                scratch.setdefault('deleted_vnfds', []).append(msg.id)
            else:
                scratch.setdefault('vnfds', []).append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
        )
        handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=handler) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
2866
class NsrRpcDtsHandler(object):
    """ The network service instantiation RPC DTS handler

    Serves the start-network-service RPC: the RPC input is turned into an
    ns-instance-config NSR entry (with a freshly generated id and a copy of
    the referenced NSD) which is written to the "running" datastore over a
    NETCONF session.
    """
    EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service"
    EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service"
    NETCONF_IP_ADDRESS = "127.0.0.1"
    NETCONF_PORT = 2022
    NETCONF_USER = "admin"
    NETCONF_PW = "admin"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm
        self._nsd = None

        # DTS registration handle for the RPC, set by register()
        self._ns_regh = None

        # NETCONF manager, connected lazily on the first RPC
        self._manager = None

        # Yang model used to serialise the NSR config to XML
        self._model = RwYang.Model.create_libncx()
        self._model.load_schema_ypbc(RwNsrYang.get_schema())

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def wrap_netconf_config_xml(xml):
        """ Wrap an XML fragment in a NETCONF <config> element and return it """
        xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
        return xml

    @asyncio.coroutine
    def _connect(self, timeout_secs=240):
        """ Open the NETCONF session, retrying every 5 seconds on SSH errors

        Arguments:
            timeout_secs - how long to keep retrying before giving up

        Returns:
            The connected ncclient asyncio manager

        Raises:
            NsrInstantiationFailed if no attempt succeeded within timeout_secs
        """
        start_time = time.time()
        while (time.time() - start_time) < timeout_secs:

            try:
                self._log.debug("Attemping NsmTasklet netconf connection.")

                manager = yield from ncclient.asyncio_manager.asyncio_connect(
                        loop=self._loop,
                        host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS,
                        port=NsrRpcDtsHandler.NETCONF_PORT,
                        username=NsrRpcDtsHandler.NETCONF_USER,
                        password=NsrRpcDtsHandler.NETCONF_PW,
                        allow_agent=False,
                        look_for_keys=False,
                        hostkey_verify=False,
                        )

                return manager

            except ncclient.transport.errors.SSHError as e:
                self._log.warning("Netconf connection to launchpad %s failed: %s",
                                  NsrRpcDtsHandler.NETCONF_IP_ADDRESS, str(e))

            # Back off before the next attempt
            yield from asyncio.sleep(5, loop=self._loop)

        raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
                                     timeout_secs)

    @asyncio.coroutine
    def register(self):
        """ Register as publisher for the start-network-service RPC """
        @asyncio.coroutine
        def on_ns_config_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts start-network-service

            Responds ACK with the generated nsr_id on success, NACK when any
            step raises.
            """
            assert action == rwdts.QueryAction.RPC
            rpc_ip = msg
            rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({
                    "nsr_id": str(uuid.uuid4())
                })

            # NOTE: a missing mandatory parameter is only logged here; the
            # request is still processed and any failure that follows is
            # NACKed by the except block below.
            if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and 'cloud_account' in rpc_ip):
                self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip))

            self._log.debug("start-network-service RPC input: {}".format(rpc_ip))

            try:
                self._log.debug("RPC output: {}".format(rpc_op))
                nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)

                if not self._manager:
                    self._manager = yield from self._connect()

                self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
                                rpc_ip.name, rpc_ip.nsd_ref)

                # Start from the generated id, then copy over every RPC input
                # field that also exists on the ns-instance-config NSR.
                ns_instance_config_dict = {"id": rpc_op.nsr_id, "admin_status": "ENABLED"}
                # Loop-invariant: build the set of valid NSR fields once
                nsr_fields = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields
                ns_instance_config_copy_dict = {k: v for k, v in rpc_ip.as_dict().items()
                                                if k in nsr_fields}
                ns_instance_config_dict.update(ns_instance_config_copy_dict)

                ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict)
                ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
                ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())

                xml = ns_instance_config.to_xml_v2(self._model)
                netconf_xml = self.wrap_netconf_config_xml(xml)

                # Arguments follow the format string: destination first, payload second
                self._log.debug("Sending configure ns-instance-config xml to %s: %s",
                                NsrRpcDtsHandler.NETCONF_IP_ADDRESS, netconf_xml)

                response = yield from self._manager.edit_config(
                           target="running",
                           config=netconf_xml,
                           )
                self._log.debug("Received edit config response: %s", str(response))

                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
                                        rpc_op)
            except Exception as e:
                self._log.error("Exception processing the "
                                "start-network-service: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)

        hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)

        with self._dts.group_create() as group:
            self._ns_regh = group.register(xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH,
                                           handler=hdl_ns,
                                           flags=rwdts.Flag.PUBLISHER,
                                           )
2999
3000
class NsrDtsHandler(object):
    """ Subscriber for ns-instance-config NSR and scaling-group instance config """
    NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
    SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm

        # Registration handles, set by register()
        self._nsr_regh = None
        self._scale_regh = None

    @property
    def nsm(self):
        """ The NS manager this handler drives """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Subscribe to NSR config and scaling instance config in one appconf group """

        def nsr_id_from_keyspec(ks):
            """ NSR id key taken from a keyspec below ns-instance-config/nsr """
            nsr_schema = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema()
            return nsr_schema.keyspec_to_entry(ks).key00.id

        def group_name_from_keyspec(ks):
            """ Scaling group name key taken from a keyspec below .../scaling-group """
            group_schema = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema()
            return group_schema.keyspec_to_entry(ks).key00.scaling_group_name_ref

        def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
            """ Tell whether the given scaling group instance is among the registration elements.

            Walks the elements returned by the NSR registration handle (called
            without a transaction) and matches on NSR id, group name and
            instance id, so no extra application state has to be kept.
            """
            for cfg, ks in self._nsr_regh.get_xact_elements(include_keyspec=True):
                owner_nsr_id = nsr_id_from_keyspec(ks)
                owner_group = group_name_from_keyspec(ks)

                if owner_nsr_id == nsr_id and owner_group == group_name and cfg.id == instance_id:
                    return True

            return False

        def get_scale_group_instance_delta(nsr_id, group_name, xact):
            """ Return {"added": [...], "deleted": [...]} instance ids for one scaling group.

            Instance ids seen with the transaction start out as "added"; ids
            seen without it either cancel a matching "added" entry or are
            reported as "deleted".
            """
            def in_requested_group(ks):
                if nsr_id_from_keyspec(ks) != nsr_id:
                    return False
                return group_name_from_keyspec(ks) == group_name

            added_ids, deleted_ids = [], []

            for cfg, ks in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
                if in_requested_group(ks):
                    added_ids.append(cfg.id)

            for cfg, ks in self._scale_regh.get_xact_elements(include_keyspec=True):
                if not in_requested_group(ks):
                    continue

                if cfg.id in added_ids:
                    added_ids.remove(cfg.id)
                else:
                    deleted_ids.append(cfg.id)

            return {"added": added_ids, "deleted": deleted_ids}

        @asyncio.coroutine
        def update_nsr_nsd(nsr_id, xact, scratch):
            """ Instantiate / terminate VLs so the NSR follows the VLDs of its updated NSD """

            @asyncio.coroutine
            def get_nsr_vl_delta(nsr_id, xact, scratch):
                """ Return {"added": [...], "deleted": [...]} VLDs for the NSR """
                new_vlds, stale_vlds = [], []

                for nsr_cfg, ks in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
                    if nsr_id_from_keyspec(ks) != nsr_id:
                        continue

                    if 'vld' in nsr_cfg.nsd:
                        for vld in nsr_cfg.nsd.vld:
                            new_vlds.append(vld)

                for nsr_cfg, ks in self._nsr_regh.get_xact_elements(include_keyspec=True):
                    self._log.debug("NSR update: %s", nsr_cfg)
                    if nsr_id_from_keyspec(ks) != nsr_id:
                        continue

                    if 'vld' not in nsr_cfg.nsd:
                        continue

                    for vld in nsr_cfg.nsd.vld:
                        if vld in new_vlds:
                            new_vlds.remove(vld)
                        else:
                            stale_vlds.append(vld)

                return {"added": new_vlds, "deleted": stale_vlds}

            vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
            self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)

            for new_vld in vl_delta["added"]:
                yield from self._nsm.nsr_instantiate_vl(nsr_id, new_vld)

            for stale_vld in vl_delta["deleted"]:
                yield from self._nsm.nsr_terminate_vl(nsr_id, stale_vld)

        def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch):
            """ Split the registration's configs into (added, deleted, updated) lists.

            Without Pbdelta support (RIFT-4916) the change set is derived by
            comparing the elements of this xact with the registration's
            current elements, keyed by the attribute named key_name.
            """
            in_xact = {getattr(cfg, key_name): cfg
                       for cfg in list(dts_member_reg.get_xact_elements(xact))}
            current = {getattr(cfg, key_name): cfg
                       for cfg in list(dts_member_reg.elements)}

            # Only in the xact -> added
            added_cfgs = [in_xact[key] for key in set(in_xact) - set(current)]

            # Only in the current elements -> deleted
            deleted_cfgs = [current[key] for key in set(current) - set(in_xact)]

            # In both but different -> updated (the xact version is reported)
            updated_cfgs = [in_xact[key] for key in set(current) & set(in_xact)
                            if in_xact[key] != current[key]]

            return added_cfgs, deleted_cfgs, updated_cfgs

        def on_apply(dts, acg, xact, action, scratch):
            """ Apply callback: create, terminate or update NSRs from the config delta """
            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            def handle_create_nsr(msg, restart_mode=False):
                """ Validate the NSR config and create the NSR in the NSM """
                if not msg.has_field("nsd"):
                    err = "NSD not provided"
                    self._log.error(err)
                    raise NetworkServiceRecordError(err)

                self._log.debug("Creating NetworkServiceRecord %s  from nsr config  %s",
                                msg.id, msg.as_dict())
                return self.nsm.create_nsr(msg, restart_mode=restart_mode)

            def handle_delete_nsr(msg):
                """ Mark the NSR as terminate-received and schedule its termination """
                @asyncio.coroutine
                def delete_instantiation(ns_id):
                    """ Terminate the NS inside a new DTS transaction """
                    with self._dts.transaction() as xact:
                        yield from self._nsm.terminate_ns(ns_id, xact)

                self._log.info("Delete req for  NSR Id: %s received", msg.id)
                nsr = self._nsm.get_ns_by_nsr_id(msg.id)

                nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
                event_descr = "Terminate rcvd for NS Id:%s" % msg.id
                nsr.record_event("terminate-rcvd", event_descr)

                self._loop.create_task(delete_instantiation(msg.id))

            @asyncio.coroutine
            def begin_instantiation(nsr):
                """ Kick off instantiation of the NSR using the apply xact """
                self._log.info("Beginning NS instantiation: %s", nsr.id)
                yield from self._nsm.instantiate_ns(nsr.id, xact)

            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # INSTALL with no xact id: re-create every known element in restart mode
            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                for element in self._nsr_regh.elements:
                    restarted_nsr = handle_create_nsr(element, restart_mode=True)
                    self._loop.create_task(begin_instantiation(restarted_nsr))

            added_msgs, deleted_msgs, updated_msgs = get_add_delete_update_cfgs(
                self._nsr_regh, xact, "id", scratch)
            self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs,
                            deleted_msgs, updated_msgs)

            for added in added_msgs:
                if added.id in self._nsm.nsrs:
                    continue
                self._log.info("Create NSR received in on_apply to instantiate NS:%s", added.id)
                new_nsr = handle_create_nsr(added)
                self._loop.create_task(begin_instantiation(new_nsr))

            for deleted in deleted_msgs:
                self._log.info("Delete NSR received in on_apply to terminate NS:%s", deleted.id)
                try:
                    handle_delete_nsr(deleted)
                except Exception:
                    self._log.exception("Failed to terminate NS:%s", deleted.id)

            for updated in updated_msgs:
                self._log.info("Update NSR received in on_apply: %s", updated)

                self._nsm.nsr_update_cfg(updated.id, updated)

                if 'nsd' in updated:
                    self._loop.create_task(update_nsr_nsd(updated.id, xact, scratch))

                for group in updated.scaling_group:
                    group_name = group.scaling_group_name_ref
                    instance_delta = get_scale_group_instance_delta(updated.id, group_name, xact)
                    self._log.debug("Got NSR:%s scale group instance delta: %s", updated.id, instance_delta)

                    for instance_id in instance_delta["added"]:
                        self._nsm.scale_nsr_out(updated.id, group.scaling_group_name_ref, instance_id, xact)

                    for instance_id in instance_delta["deleted"]:
                        self._nsm.scale_nsr_in(updated.id, group.scaling_group_name_ref, instance_id)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback: validate the NSR config change, then complete the prepare

            Raises NsrInstantiationFailed, NsrVlUpdateError or
            ScalingOperationError when the requested change is not acceptable.
            """
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            action = xact_info.query_action
            self._log.debug(
                    "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
                    xact, action, xact_info, xpath, msg
                    )

            # The two helpers below are defined but not invoked by this callback
            @asyncio.coroutine
            def delete_instantiation(ns_id):
                """ Terminate the NS without a transaction """
                yield from self._nsm.terminate_ns(ns_id, None)

            def handle_delete_nsr():
                """ Mark the NSR as terminate-received and schedule its termination """
                self._log.info("Delete req for  NSR Id: %s received", msg.id)
                nsr = self._nsm.get_ns_by_nsr_id(msg.id)

                nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
                event_descr = "Terminate rcvd for NS Id:%s" % msg.id
                nsr.record_event("terminate-rcvd", event_descr)

                self._loop.create_task(delete_instantiation(msg.id))

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            write_actions = [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]
            if action in write_actions + [rwdts.QueryAction.DELETE]:
                creating_nsr = action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs
                if creating_nsr:
                    # A new NSR needs a cloud account or datacenter ...
                    if not (msg.has_field("cloud_account") or msg.has_field("om_datacenter")):
                        raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")

                    # ... and an NSD
                    if not msg.has_field("nsd"):
                        raise NsrInstantiationFailed("NSD not specified in NSR")

                else:
                    nsr = self._nsm.nsrs[msg.id]

                    if msg.has_field("nsd"):
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
                        if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
                            raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")

                    if msg.has_field("scaling_group"):
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")

                        if len(msg.scaling_group) > 1:
                            raise ScalingOperationError("Only a single scaling group can be configured at a time")

                        for group_msg in msg.scaling_group:
                            instance_count = len(group_msg.instance)
                            if instance_count > 1:
                                raise ScalingOperationError("Only a single scaling instance can be modified at a time")

                            if instance_count != 1:
                                continue

                            scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
                            if action in write_actions:
                                if len(scale_group.instances) == scale_group.max_instance_count:
                                    raise ScalingOperationError("Max instances for %s reached" % scale_group)

            acg.handle.prepare_complete_ok(xact_info.handle)

        self._log.debug("Registering for NSR config using xpath: %s",
                        NsrDtsHandler.NSR_XPATH)

        apply_handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=apply_handler) as acg:
            self._nsr_regh = acg.register(
                xpath=NsrDtsHandler.NSR_XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare)

            # The scaling instance registration has no prepare callback of its own
            self._scale_regh = acg.register(
                xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                )
3324
3325
class NsrOpDataDtsHandler(object):
    """ Publisher for NSR operational data (ns-instance-opdata/nsr) """
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm
        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Publisher registration handle (None until register() ran) """
        return self._regh

    @property
    def nsm(self):
        """ The NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register as publisher of the NSR op data xpath """
        self._log.debug("Registering Nsr op data path %s as publisher",
                        NsrOpDataDtsHandler.XPATH)

        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        publisher_flags = rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ | rwdts.Flag.DATASTORE
        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH,
                                        handler=reg_handler,
                                        flags=publisher_flags)

    @asyncio.coroutine
    def create(self, path, msg):
        """ Create the NS record msg at path through the registration handle """
        self._log.debug("Creating NSR %s:%s", path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Created NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
        """ Update the NS record at path with msg (REPLACE semantics by default) """
        self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, self.regh)
        self.regh.update_element(path, msg, flags)
        self._log.debug("Updated NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def delete(self, path):
        """ Delete the NS record at path through the registration handle """
        self._log.debug("Deleting NSR path:%s", path)
        self.regh.delete_element(path)
        self._log.debug("Deleted NSR path:%s", path)
3386
3387
class VnfrDtsHandler(object):
    """ The virtual network function record DTS handler

    Subscribes to the VNFR catalog and forwards create/update/delete
    notifications for VNFRs known to the NSM.
    """
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/ advises from dts """

        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts

            Responds NA for VNFRs the NSM does not track; otherwise applies
            the create/update or delete to the NSM and responds ACK.
            """
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            self._log.debug(
                "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
                xact_info, action, ks_path, msg
                )

            schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            if path_entry.key00.id not in self._nsm._vnfrs:
                self._log.error("%s request for non existent record path %s",
                                action, xpath)
                xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath)

                return

            # The "Deleting VNFR" message is logged only on the delete branch;
            # it used to be emitted for creates and updates as well.
            if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
                yield from self._nsm.update_vnfr(msg)
            elif action == rwdts.QueryAction.DELETE:
                self._log.debug("Deleting VNFR with id %s", path_entry.key00.id)
                self._nsm.delete_vnfr(path_entry.key00.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.SUBSCRIBER),)
3455
3456
class NsdRefCountDtsHandler(object):
    """ Publisher answering reads of the NSD reference counts """
    XPATH = "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count"

    def __init__(self, dts, log, loop, nsm):
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Publisher registration handle (None until register() ran) """
        return self._regh

    @property
    def nsm(self):
        """ The NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register as publisher for NSD ref count reads """

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ Prepare callback: answer READ with the ref counts, reject anything else """
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())

            if action != rwdts.QueryAction.READ:
                raise NetworkServiceRecordError("Not supported operation %s" % action)

            refcount_schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount.schema()
            path_entry = refcount_schema.keyspec_to_entry(ks_path)
            nsd_list = yield from self._nsm.get_nsd_refcount(path_entry.key00.nsd_id_ref)

            # One MORE response per entry, closed by a final ACK
            for rsp_xpath, rsp_msg in nsd_list:
                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                        xpath=rsp_xpath,
                                        msg=rsp_msg)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        read_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=NsdRefCountDtsHandler.XPATH,
                                        handler=read_handler,
                                        flags=rwdts.Flag.PUBLISHER,)
3505
3506
3507 class NsManager(object):
3508 """ The Network Service Manager class"""
def __init__(self, dts, log, loop,
             nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector,
             vnffgmgr, vnfd_pub_handler, cloud_account_handler):
    """ Store the collaborators and build the static DTS handlers """
    self._dts, self._log, self._loop = dts, log, loop

    # Publication handlers and helpers supplied by the caller
    self._nsr_handler = nsr_handler
    self._vnfr_pub_handler = vnfr_handler
    self._vlr_pub_handler = vlr_handler
    self._vnffgmgr = vnffgmgr
    self._vnfd_pub_handler = vnfd_pub_handler
    self._cloud_account_handler = cloud_account_handler

    self._ro_plugin_selector = ro_plugin_selector

    # NETCONF client pointed at the local endpoint
    self._ncclient = rift.mano.ncclient.NcClient(
        host="127.0.0.1",
        port=2022,
        username="admin",
        password="admin",
        loop=self._loop)

    # Records and descriptors tracked by this manager
    self._nsrs = {}
    self._nsds = {}
    self._vnfds = {}
    self._vnfrs = {}

    self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self)

    # TODO: All these handlers should move to tasklet level.
    # Passing self is often an indication of bad design
    self._nsd_dts_handler = NsdDtsHandler(dts, log, loop, self)
    self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self)

    # register() walks this list in order
    static_handlers = [
        self._nsd_dts_handler,
        VnfrDtsHandler(dts, log, loop, self),
        NsdRefCountDtsHandler(dts, log, loop, self),
        NsrDtsHandler(dts, log, loop, self),
        ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback),
        NsrRpcDtsHandler(dts, log, loop, self),
        self._vnfd_dts_handler,
        self.cfgmgr_obj,
    ]
    self._dts_handlers = static_handlers
3550
3551
@property
def log(self):
    """ Logger used by this manager """
    return self._log
3556
@property
def loop(self):
    """ Event loop used by this manager """
    return self._loop
3561
@property
def dts(self):
    """ DTS API handle used by this manager """
    return self._dts
3566
@property
def nsr_handler(self):
    """ NSR handler passed in at construction """
    return self._nsr_handler
3571
@property
def so_obj(self):
    """ SO object handler, or None when none has been attached

    The constructor visible in this class does not assign _so_obj, so the
    attribute is read defensively rather than raising AttributeError.
    """
    return getattr(self, "_so_obj", None)
3576
@property
def nsrs(self):
    """ Dict of NSRs tracked by this NSM, keyed by NSR id """
    return self._nsrs
3581
@property
def nsds(self):
    """ Dict of NSDs tracked by this NSM """
    return self._nsds
3586
@property
def vnfds(self):
    """ Dict of VNFDs tracked by this NSM """
    return self._vnfds
3591
@property
def vnfrs(self):
    """ Dict of VNFRs tracked by this NSM, keyed by VNFR id """
    return self._vnfrs
3596
@property
def nsr_pub_handler(self):
    """ NSR publication handler (the same object nsr_handler returns) """
    return self._nsr_handler
3601
@property
def vnfr_pub_handler(self):
    """ Handler used to publish VNFRs """
    return self._vnfr_pub_handler
3606
@property
def vlr_pub_handler(self):
    """ Handler used to publish VLRs """
    return self._vlr_pub_handler
3611
@property
def vnfd_pub_handler(self):
    """ Handler used to publish VNFDs """
    return self._vnfd_pub_handler
3615
@asyncio.coroutine
def register(self):
    """ Register every static DTS handler, one after the other, in list order """
    for static_handler in self._dts_handlers:
        yield from static_handler.register()
3621
3622
def get_ns_by_nsr_id(self, nsr_id):
    """ Return the NSR registered under nsr_id

    Raises NetworkServiceRecordError when no such NSR is known.
    """
    known_nsrs = self._nsrs
    if nsr_id in known_nsrs:
        return known_nsrs[nsr_id]

    raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
3629
def scale_nsr_out(self, nsr_id, scale_group_name, instance_id, config_xact):
    """ Schedule creation of a scaling group instance on a running NSR

    Raises ScalingOperationError when the NSR is not in RUNNING state.
    """
    self.log.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                   nsr_id, scale_group_name, instance_id)

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    # Runs in its own task; this method returns right after scheduling
    self._loop.create_task(
        record.create_scale_group_instance(scale_group_name, instance_id, config_xact))
3641
def scale_nsr_in(self, nsr_id, scale_group_name, instance_id):
    """ Schedule deletion of a scaling group instance on a running NSR

    Raises ScalingOperationError when the NSR is not in RUNNING state.
    """
    self.log.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
                   nsr_id, scale_group_name, instance_id)

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    # Runs in its own task; this method returns right after scheduling
    self._loop.create_task(
        record.delete_scale_group_instance(scale_group_name, instance_id))
3653
def scale_rpc_callback(self, xact, msg, action):
    """ Callback for the scaling RPCs

    Reads the NSR's current config, adds or removes the requested scaling
    group instance and writes the config back through NETCONF. The work is
    scheduled as a task on the loop.

    Args:
        xact : Transaction Handler
        msg : RPC input
        action : Scaling Action
    """
    instance_cls = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance
    group_cls = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup

    nsr_xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format(
        msg.nsr_id_ref)
    instance = instance_cls.from_dict({"id": msg.instance_id})

    @asyncio.coroutine
    def get_nsr_scaling_group():
        """ Return (nsr_config, scaling_group), adding the group when it is missing """
        results = yield from self._dts.query_read(nsr_xpath, rwdts.XactFlag.MERGE)

        for pending in results:
            reply = yield from pending
            nsr_config = reply.result

        wanted_name = msg.scaling_group_name_ref
        scaling_group = next(
            (grp for grp in nsr_config.scaling_group
             if grp.scaling_group_name_ref == wanted_name),
            None)
        if scaling_group is None:
            scaling_group = nsr_config.scaling_group.add()
            scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref

        return (nsr_config, scaling_group)

    @asyncio.coroutine
    def update_config(nsr_config):
        """ Replace the NSR config in the running datastore over NETCONF """
        payload = self._ncclient.convert_to_xml(RwNsrYang, nsr_config)
        payload = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(payload)
        yield from self._ncclient.connect()
        yield from self._ncclient.manager.edit_config(target="running", config=payload, default_operation="replace")

    @asyncio.coroutine
    def rescale(scale_out):
        """ Append (scale out) or remove (scale in) the instance, then push the config """
        nsr_config, scaling_group = yield from get_nsr_scaling_group()
        if scale_out:
            scaling_group.instance.append(instance)
        else:
            scaling_group.instance.remove(instance)
        yield from update_config(nsr_config)

    # Any action other than SCALE_OUT is treated as scale in
    self._loop.create_task(rescale(action == ScalingRpcHandler.ACTION.SCALE_OUT))

    # The opdata based path (calling scale_nsr_out / scale_nsr_in directly
    # from here) is disabled for now; scaling goes through the config above.
3721
def nsr_update_cfg(self, nsr_id, msg):
    """ Store msg as the config message of the NSR registered under nsr_id """
    self._nsrs[nsr_id].nsr_cfg_msg = msg
3725
def nsr_instantiate_vl(self, nsr_id, vld):
    """ Create a VL instance for vld on a running NSR

    This is a generator meant to be driven with "yield from" by the caller.
    Raises NsrVlUpdateError when the NSR is not in RUNNING state.
    """
    self.log.debug("NSR {} create VL {}".format(nsr_id, vld))
    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")

    # No separate task here: the caller already runs inside its own task
    yield from record.create_vl_instance(vld)
3734
def nsr_terminate_vl(self, nsr_id, vld):
    """ Delete the VL instance for vld on a running NSR

    This is a generator meant to be driven with "yield from" by the caller.
    Raises NsrVlUpdateError when the NSR is not in RUNNING state.
    """
    self.log.debug("NSR {} delete VL {}".format(nsr_id, vld.id))
    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")

    # No separate task here: the caller already runs inside its own task
    yield from record.delete_vl_instance(vld)
3743
def create_nsr(self, nsr_msg, restart_mode=False):
    """ Build a NetworkServiceRecord from nsr_msg, track it and return it

    Raises NetworkServiceRecordError when an NSR with the same id exists.
    """
    if nsr_msg.id in self._nsrs:
        duplicate_err = "NSR id %s already exists" % nsr_msg.id
        self._log.error(duplicate_err)
        raise NetworkServiceRecordError(duplicate_err)

    self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
                   nsr_msg.id,
                   nsr_msg.nsd.id)

    ro_plugin = self._ro_plugin_selector.ro_plugin
    sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.cloud_account)

    record = NetworkServiceRecord(self._dts,
                                  self._log,
                                  self._loop,
                                  self,
                                  ro_plugin,
                                  nsr_msg,
                                  sdn_account_name,
                                  restart_mode=restart_mode
                                  )
    # The record is tracked before the RO plugin is told about it
    self._nsrs[nsr_msg.id] = record
    ro_plugin.create_nsr(nsr_msg, nsr_msg.nsd)

    return record
3771
def delete_nsr(self, nsr_id):
    """ Stop tracking the NSR registered under nsr_id (KeyError if unknown) """
    self._nsrs.pop(nsr_id)
3777
@asyncio.coroutine
def instantiate_ns(self, nsr_id, config_xact):
    """ Instantiate the NS registered under nsr_id through its NSM plugin

    Raises NetworkServiceRecordError when the NSR id is unknown.
    """
    self._log.debug("Instantiating Network service id %s", nsr_id)
    if nsr_id in self._nsrs:
        record = self._nsrs[nsr_id]
        yield from record.nsm_plugin.instantiate_ns(record, config_xact)
        return

    err = "NSR id %s not found " % nsr_id
    self._log.error(err)
    raise NetworkServiceRecordError(err)
3789
@asyncio.coroutine
def update_vnfr(self, vnfr):
    """ Create/Update a VNFR

    Applies the passed VNFR message to the tracked VNFR record and then
    asks the NSR that owns it to update its own state.

    Arguments:
        vnfr - VNFR message; vnfr.id must be a key of self._vnfrs
               (KeyError otherwise)
    """
    record = self._vnfrs[vnfr.id]
    self._log.debug("Updating VNFR with state %s: vnfr %s", record.state, vnfr)

    yield from record.update_state(vnfr)

    nsr = self.find_nsr_for_vnfr(vnfr.id)
    if nsr is None:
        # find_nsr_for_vnfr returns None when no NSR lists this VNFR;
        # calling update_state() on it would die with an AttributeError.
        self._log.error("No NSR found for VNFR id %s - skipping NSR state update",
                        vnfr.id)
        return

    yield from nsr.update_state()
3800
def find_nsr_for_vnfr(self, vnfr_id):
    """ Find the NSR which has the passed vnfr id

    Returns the first NSR whose vnfrs contain a record with that id,
    or None when there is no such NSR.
    """
    # Iterate over snapshots of the value views, as the original did
    for candidate in tuple(self.nsrs.values()):
        owned = tuple(candidate.vnfrs.values())
        if any(record.id == vnfr_id for record in owned):
            return candidate
    return None
3808
def delete_vnfr(self, vnfr_id):
    """ Stop tracking the VNFR with the passed id

    Raises KeyError when no VNFR with that id is tracked.
    """
    self._vnfrs.pop(vnfr_id)
3812
def get_nsd_ref(self, nsd_id):
    """ Look up the network service descriptor for the passed nsd_id,
    take a reference on it and return it """
    descriptor = self.get_nsd(nsd_id)
    descriptor.ref()
    return descriptor
3819
@asyncio.coroutine
def get_nsr_config(self, nsd_id):
    """ Read ns-instance-config through DTS and return the first NSR config
    entry whose nsd.id equals nsd_id, or None when no entry matches """
    query_path = "C,/nsr:ns-instance-config"
    pending = yield from self._dts.query_read(query_path, rwdts.XactFlag.MERGE)

    for future in pending:
        response = yield from future
        instance_config = response.result

        for nsr_cfg in instance_config.nsr:
            if nsr_cfg.nsd.id == nsd_id:
                return nsr_cfg

    return None
3834
@asyncio.coroutine
def nsd_unref_by_nsr_id(self, nsr_id):
    """ Unref the network service descriptor based on NSR id

    Arguments:
        nsr_id - id of an NSR tracked in self._nsrs

    Raises:
        NetworkServiceDescriptorUnrefError - no NSR with nsr_id is tracked
    """
    self._log.debug("NSR Unref called for Nsr Id:%s", nsr_id)
    if nsr_id not in self._nsrs:
        self._log.error("Cannot find NSR with id %s", nsr_id)
        # The message needs a conversion specifier: "..." % nsr_id without
        # one raises TypeError before the intended exception is built.
        raise NetworkServiceDescriptorUnrefError("No NSR with id %s" % nsr_id)

    nsr = self._nsrs[nsr_id]
    try:
        nsd = self.get_nsd(nsr.nsd_id)
        self._log.debug("Releasing ref on NSD %s held by NSR %s - Curr %d",
                        nsd.id, nsr.id, nsd.ref_count)
        nsd.unref()
    except NetworkServiceDescriptorError:
        # We store a copy of NSD in NSR and the NSD in nsd-catalog
        # could be deleted
        pass
3855
@asyncio.coroutine
def nsd_unref(self, nsd_id):
    """ Drop one reference on the network service descriptor with the
    passed id (looked up through get_nsd) """
    self.get_nsd(nsd_id).unref()
3861
def get_nsd(self, nsd_id):
    """ Get network service descriptor for the passed nsd_id

    Raises:
        NetworkServiceDescriptorError - no NSD with nsd_id is known
    """
    if nsd_id not in self._nsds:
        # Interpolate once: exceptions, unlike logger calls, do not apply
        # "%s" arguments, so passing them separately leaves a raw tuple.
        msg = "Cannot find NSD id:%s" % nsd_id
        self._log.error(msg)
        raise NetworkServiceDescriptorError(msg)

    return self._nsds[nsd_id]
3869
def create_nsd(self, nsd_msg):
    """ Create a network service descriptor and track it by its id

    Arguments:
        nsd_msg - NSD message; nsd_msg.id keys the new descriptor

    Returns:
        The newly created NetworkServiceDescriptor

    Raises:
        NetworkServiceDescriptorError - an NSD with the same id exists
    """
    self._log.debug("Create network service descriptor - %s", nsd_msg)
    if nsd_msg.id in self._nsds:
        self._log.error("Cannot create NSD %s -NSD ID already exists", nsd_msg)
        # Exceptions do not interpolate "%s" arguments the way loggers do
        raise NetworkServiceDescriptorError("NSD already exists-%s" % nsd_msg.id)

    nsd = NetworkServiceDescriptor(
        self._dts,
        self._log,
        self._loop,
        nsd_msg,
        self
        )
    self._nsds[nsd_msg.id] = nsd

    return nsd
3887
def update_nsd(self, nsd):
    """ Update the Network service descriptor, creating it through
    create_nsd when its id is not tracked yet """
    self._log.debug("Update network service descriptor - %s", nsd)

    if nsd.id in self._nsds:
        self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd)
        self._nsds[nsd.id].update(nsd)
        return

    self._log.debug("No NSD found - creating NSD id = %s", nsd.id)
    self.create_nsd(nsd)
3897
def delete_nsd(self, nsd_id):
    """ Delete the Network service descriptor with the passed id

    Raises:
        NetworkServiceDescriptorNotFound - no NSD with nsd_id is known
        NetworkServiceDescriptorRefCountExists - the NSD is still in use
    """
    self._log.debug("Deleting the network service descriptor - %s", nsd_id)
    if nsd_id not in self._nsds:
        self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
        raise NetworkServiceDescriptorNotFound("Cannot find %s" % nsd_id)

    nsd = self._nsds[nsd_id]
    # This guard used to repeat the "not in" test above, which made it
    # unreachable; the reference check is what its log/exception describe.
    if nsd.in_use():
        self._log.debug("Cannot delete NSD id %s reference exists %s",
                        nsd_id,
                        nsd.ref_count)
        raise NetworkServiceDescriptorRefCountExists(
            "Cannot delete :%s, ref_count:%s" % (nsd_id, nsd.ref_count))

    del self._nsds[nsd_id]
3915
def get_vnfd_config(self, xact):
    """ Create a VNFD for every element of the passed xact (as reported by
    the VNFD DTS registration handle) whose id is not tracked yet """
    registration = self._vnfd_dts_handler.regh
    for element in registration.get_xact_elements(xact):
        if element.id in self._vnfds:
            continue
        self.create_vnfd(element)
3921
def get_vnfd(self, vnfd_id, xact):
    """ Get virtual network function descriptor for the passed vnfd_id

    When the id is not tracked yet, the VNFDs carried by xact are loaded
    through get_vnfd_config before giving up.

    Raises:
        VnfDescriptorError - the VNFD is unknown even after that load
    """
    if vnfd_id not in self._vnfds:
        self._log.error("Cannot find VNFD id:%s", vnfd_id)
        self.get_vnfd_config(xact)

        if vnfd_id not in self._vnfds:
            # Interpolate once: exceptions do not apply "%s" arguments
            msg = "Cannot find VNFD id:%s" % vnfd_id
            self._log.error(msg)
            raise VnfDescriptorError(msg)

    return self._vnfds[vnfd_id]
3933
def create_vnfd(self, vnfd):
    """ Create a virtual network function descriptor

    The passed vnfd object itself is stored under vnfd.id and returned.

    Raises:
        VnfDescriptorError - a VNFD with the same id is already tracked
    """
    self._log.debug("Create virtual network function descriptor - %s", vnfd)
    if vnfd.id in self._vnfds:
        self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
        # Exceptions do not interpolate "%s" arguments the way loggers do
        raise VnfDescriptorError("VNFD already exists-%s" % vnfd.id)

    self._vnfds[vnfd.id] = vnfd
    return self._vnfds[vnfd.id]
3943
def update_vnfd(self, vnfd):
    """ Update the virtual network function descriptor, creating it
    through create_vnfd when its id is not tracked yet """
    self._log.debug("Update virtual network function descriptor- %s", vnfd)

    # Hack to remove duplicates from leaf-lists - to be fixed by RIFT-6511
    for internal_vld in vnfd.internal_vld:
        unique_refs = set(internal_vld.internal_connection_point_ref)
        internal_vld.internal_connection_point_ref = list(unique_refs)

    if vnfd.id in self._vnfds:
        self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
        self._vnfds[vnfd.id] = vnfd
        return

    self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
    self.create_vnfd(vnfd)
3958
@asyncio.coroutine
def delete_vnfd(self, vnfd_id):
    """ Delete the virtual network function descriptor with the passed id

    Raises:
        VnfDescriptorError - no VNFD with vnfd_id is tracked
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id not in self._vnfds:
        self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
        # Exceptions do not interpolate "%s" arguments the way loggers do
        raise VnfDescriptorError("Cannot find %s" % vnfd_id)

    del self._vnfds[vnfd_id]
3968
def nsd_in_use(self, nsd_id):
    """ Is the NSD with the passed id in use

    Returns False for an unknown id, otherwise whatever the tracked
    descriptor's in_use() reports.
    """
    self._log.debug("Is this NSD in use - msg:%s", nsd_id)
    if nsd_id not in self._nsds:
        return False
    return self._nsds[nsd_id].in_use()
3975
@asyncio.coroutine
def publish_nsr(self, xact, path, msg):
    """ Publish a NSR by handing xact, path and msg to the NSR handler's
    update """
    self._log.debug("Publish NSR with path %s, msg %s", path, msg)
    handler = self.nsr_handler
    yield from handler.update(xact, path, msg)
3982
@asyncio.coroutine
def unpublish_nsr(self, xact, path):
    """ Un Publish an NSR

    Arguments:
        xact - transaction the delete is issued under
        path - xpath of the NSR record to remove
    """
    self._log.debug("Publishing delete NSR with path %s", path)
    # Transaction first, then path - the same order the handler's update()
    # takes and the order NsmRecordsPublisherProxy.unpublish_nsr uses.
    # NOTE(review): confirm against the publisher handler's delete().
    yield from self.nsr_handler.delete(xact, path)
3988
def vnfr_is_ready(self, vnfr_id):
    """ VNFR with the id is ready

    Forwards the notification to the tracked VNFR record.

    Raises:
        VirtualNetworkFunctionRecordError - no VNFR with vnfr_id is tracked
    """
    self._log.debug("VNFR id %s ready", vnfr_id)
    # Records live in _vnfrs; _vnfds holds descriptors keyed by VNFD id,
    # so testing membership there rejected valid VNFR ids.
    if vnfr_id not in self._vnfrs:
        err = "Did not find VNFR ID with id %s" % vnfr_id
        self._log.critical(err)
        raise VirtualNetworkFunctionRecordError(err)
    self._vnfrs[vnfr_id].is_ready()
3997
@asyncio.coroutine
def get_nsd_refcount(self, nsd_id):
    """ Get the nsd ref count list from this NSM

    Arguments:
        nsd_id - id of one NSD, or None / "" for every tracked NSD

    Returns:
        A list of (xpath, NsdRefCount message) tuples; empty when nsd_id
        is given but not tracked.
    """

    def nsd_refcount_xpath(nsd_id):
        """ xpath for ref count entry """
        return (NsdRefCountDtsHandler.XPATH +
                "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id)

    def refcount_entry(key, descriptor):
        """ (xpath, message) pair describing one descriptor """
        entry_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
        entry_msg.nsd_id_ref = descriptor.id
        entry_msg.instance_ref_count = descriptor.ref_count
        return (nsd_refcount_xpath(key), entry_msg)

    if nsd_id is None or nsd_id == "":
        # No id given: report every descriptor, keyed by its own id
        return [refcount_entry(nsd.id, nsd) for nsd in self._nsds.values()]

    if nsd_id in self._nsds:
        return [refcount_entry(nsd_id, self._nsds[nsd_id])]

    return []
4021
@asyncio.coroutine
def terminate_ns(self, nsr_id, xact):
    """ Terminate network service for the given NSR Id

    Runs, in order: NSR terminate, NSD unref, NSR unpublish under xact,
    and finally removal of the NSR from this manager.
    """
    log = self._log

    # Step 1: tear down the instances/networks associated with the service
    log.debug("Terminating the network service %s", nsr_id)
    yield from self._nsrs[nsr_id].terminate()

    # Step 2: release the reference this NSR holds on its NSD
    yield from self.nsd_unref_by_nsr_id(nsr_id)

    # Step 3: withdraw the published NSR record
    log.debug("Unpublishing the network service %s", nsr_id)
    yield from self._nsrs[nsr_id].unpublish(xact)

    # Step 4: finally drop the NS instance from this NS Manager
    log.debug("Deletng the network service %s", nsr_id)
    self.delete_nsr(nsr_id)
4042
4043
class NsmRecordsPublisherProxy(object):
    """ Publisher facade handed to plugin objects so that they can
    publish/unpublish NSR, VNFR and VLR records.

    Every method computes the record's xpath and delegates to the matching
    publisher handler, returning whatever the handler call yields.
    """

    def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr):
        self._dts, self._log, self._loop = dts, log, loop
        self._nsr_pub_hdlr = nsr_pub_hdlr
        self._vlr_pub_hdlr = vlr_pub_hdlr
        self._vnfr_pub_hdlr = vnfr_pub_hdlr

    @asyncio.coroutine
    def publish_nsr(self, xact, nsr):
        """ Publish an NSR """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.update(xact, nsr_path, nsr)
        return outcome

    @asyncio.coroutine
    def unpublish_nsr(self, xact, nsr):
        """ Unpublish an NSR """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.delete(xact, nsr_path)
        return outcome

    @asyncio.coroutine
    def publish_vnfr(self, xact, vnfr):
        """ Publish a VNFR """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.update(xact, vnfr_path, vnfr)
        return outcome

    @asyncio.coroutine
    def unpublish_vnfr(self, xact, vnfr):
        """ Unpublish a VNFR """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.delete(xact, vnfr_path)
        return outcome

    @asyncio.coroutine
    def publish_vlr(self, xact, vlr):
        """ Publish a VLR """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.update(xact, vlr_path, vlr)
        return outcome

    @asyncio.coroutine
    def unpublish_vlr(self, xact, vlr):
        """ Unpublish a VLR """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.delete(xact, vlr_path)
        return outcome
4091
4092
class ScalingRpcHandler(mano_dts.DtsHandler):
    """ DTS handler for the exec-scale-in / exec-scale-out RPCs

    Each RPC is forwarded to the optional callback and then answered on
    the matching output xpath with the instance id that was acted upon.
    """
    SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in"
    SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in"

    SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
    SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"

    ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')

    def __init__(self, log, dts, loop, callback=None):
        """
        Arguments:
            log, dts, loop - passed to mano_dts.DtsHandler
            callback - optional callable invoked as
                       callback(xact, msg, ACTION member) for every RPC
        """
        super().__init__(log, dts, loop)
        self.callback = callback
        # Last auto-assigned instance id, per scaling group name
        self.last_instance_id = defaultdict(int)

    @asyncio.coroutine
    def register(self):
        """ Register the scale-in and scale-out RPC handlers with DTS """

        @asyncio.coroutine
        def on_scale_in_prepare(xact_info, action, ks_path, msg):
            """ Handle exec-scale-in: notify the callback, ACK with the
            instance id, NACK on any failure """
            assert action == rwdts.QueryAction.RPC

            try:
                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)

                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
                    "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH,
                    rpc_op)

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH)

        @asyncio.coroutine
        def on_scale_out_prepare(xact_info, action, ks_path, msg):
            """ Handle exec-scale-out: assign an instance id when the
            request carries none, notify the callback, ACK with the
            instance id, NACK on any failure """
            assert action == rwdts.QueryAction.RPC

            try:
                scaling_group = msg.scaling_group_name_ref
                if not msg.instance_id:
                    # Key the counter by the group name from the request.
                    # It used to be keyed by "scale_group", which is the
                    # imported module, so every group shared one counter.
                    next_instance_id = self.last_instance_id[scaling_group] + 1
                    msg.instance_id = next_instance_id
                    self.last_instance_id[scaling_group] = next_instance_id

                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)

                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
                    "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH,
                    rpc_op)

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH)

        scale_in_hdl = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_scale_in_prepare)
        scale_out_hdl = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_scale_out_prepare)

        with self.dts.group_create() as group:
            group.register(
                xpath=self.__class__.SCALE_IN_INPUT_XPATH,
                handler=scale_in_hdl,
                flags=rwdts.Flag.PUBLISHER)
            group.register(
                xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
                handler=scale_out_hdl,
                flags=rwdts.Flag.PUBLISHER)
4175
4176
class NsmTasklet(rift.tasklets.Tasklet):
    """
    The network service manager tasklet

    start() creates the DTS API object; the publisher handlers, plugin
    selector, cloud account subscriber, VNFFG manager and the NsManager
    itself are created and registered in init(), which runs when DTS
    reports the INIT state (see on_dts_state_change).
    """
    def __init__(self, *args, **kwargs):
        super(NsmTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("nsm")

        self._dts = None
        self._nsm = None

        self._ro_plugin_selector = None
        self._vnffgmgr = None

        # Not assigned anywhere in this class; kept for compatibility
        self._nsr_handler = None
        # Everything below is populated by init(); declared here so that
        # the attributes exist before DTS reaches the INIT state
        self._nsr_pub_handler = None
        self._vnfr_pub_handler = None
        self._vlr_pub_handler = None
        self._vnfd_pub_handler = None
        self._scale_cfg_handler = None
        self._cloud_account_handler = None

        self._records_publisher_proxy = None

    def start(self):
        """ The task start callback """
        super(NsmTasklet, self).start()
        self.log.info("Starting NsmTasklet")

        self.log.debug("Registering with dts")
        self._dts = rift.tasklets.DTS(self.tasklet_info,
                                      RwNsmYang.get_schema(),
                                      self.loop,
                                      self.on_dts_state_change)

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def stop(self):
        """ The task stop callback - deinit DTS, re-raising any failure """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in NSM stop:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def init(self):
        """ Task init callback

        Creates and registers, in order: the NSR/VNFR/VLR publisher
        handlers, the VNFD publisher, the records publisher proxy, the RO
        plugin selector, the cloud account subscriber, the VNFFG manager
        and finally the NsManager.
        """
        self.log.debug("Got init callback")

        self.log.debug("creating config account handler")

        self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop)
        yield from self._nsr_pub_handler.register()

        self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vnfr_pub_handler.register()

        self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vlr_pub_handler.register()

        # SSL settings for the VNFD publisher come from the tasklet manifest
        manifest = self.tasklet_info.get_pb_manifest()
        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
        ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
        ssl_key = manifest.bootstrap_phase.rwsecurity.key

        self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop)

        self._records_publisher_proxy = NsmRecordsPublisherProxy(
            self._dts,
            self.log,
            self.loop,
            self._nsr_pub_handler,
            self._vnfr_pub_handler,
            self._vlr_pub_handler,
            )

        # Register the NSM to receive the nsm plugin
        # when cloud account is configured
        self._ro_plugin_selector = cloud.ROAccountPluginSelector(
            self._dts,
            self.log,
            self.loop,
            self._records_publisher_proxy,
            )
        yield from self._ro_plugin_selector.register()

        self._cloud_account_handler = cloud.CloudAccountConfigSubscriber(
            self._log,
            self._dts,
            self.log_hdl)

        yield from self._cloud_account_handler.register()

        self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop)
        yield from self._vnffgmgr.register()

        self._nsm = NsManager(
            self._dts,
            self.log,
            self.loop,
            self._nsr_pub_handler,
            self._vnfr_pub_handler,
            self._vlr_pub_handler,
            self._ro_plugin_selector,
            self._vnffgmgr,
            self._vnfd_pub_handler,
            self._cloud_account_handler
            )

        yield from self._nsm.register()

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to request once the handler for `state` has completed
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run on entering `state`
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state)
        if next_state is not None:
            self.log.debug("Changing state to %s", next_state)
            self._dts.handle.set_state(next_state)