7cbc194b99dd568c7c62dd46bc080473b7756556
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
18 import asyncio
19 import ncclient
20 import ncclient.asyncio_manager
21 import os
22 import shutil
23 import sys
24 import tempfile
25 import time
26 import uuid
27 import yaml
28 import requests
29 import json
30
31
32 from collections import deque
33 from collections import defaultdict
34 from enum import Enum
35
36 import gi
37 gi.require_version('RwYang', '1.0')
38 gi.require_version('RwNsdYang', '1.0')
39 gi.require_version('RwDts', '1.0')
40 gi.require_version('RwNsmYang', '1.0')
41 gi.require_version('RwNsrYang', '1.0')
42 gi.require_version('RwTypes', '1.0')
43 gi.require_version('RwVlrYang', '1.0')
44 gi.require_version('RwVnfrYang', '1.0')
45 from gi.repository import (
46 RwYang,
47 RwNsrYang,
48 NsrYang,
49 NsdYang,
50 RwVlrYang,
51 VnfrYang,
52 RwVnfrYang,
53 RwNsmYang,
54 RwsdnalYang,
55 RwDts as rwdts,
56 RwTypes,
57 ProtobufC,
58 )
59
60 import rift.tasklets
61 import rift.mano.ncclient
62 import rift.mano.config_data.config
63 import rift.mano.dts as mano_dts
64
65 from . import rwnsm_conman as conman
66 from . import cloud
67 from . import publisher
68 from . import xpath
69 from . import config_value_pool
70 from . import rwvnffgmgr
71 from . import scale_group
72
73
class NetworkServiceRecordState(Enum):
    """ Network Service Record State """
    # Each member is a distinct integer starting at 101. Note that the value
    # 105 is not assigned to any member.
    INIT = 101
    # Instantiation phases
    VL_INIT_PHASE = 102
    VNF_INIT_PHASE = 103
    VNFFG_INIT_PHASE = 104
    RUNNING = 106
    # Scaling
    SCALING_OUT = 107
    SCALING_IN = 108
    # Termination phases
    TERMINATE = 109
    TERMINATE_RCVD = 110
    VL_TERMINATE_PHASE = 111
    VNF_TERMINATE_PHASE = 112
    VNFFG_TERMINATE_PHASE = 113
    TERMINATED = 114
    FAILED = 115
    # Virtual-link specific states
    VL_INSTANTIATE = 116
    VL_TERMINATE = 117
92
93
class NetworkServiceRecordError(Exception):
    """ Error related to a Network Service Record (NSR) """
97
98
class NetworkServiceDescriptorError(Exception):
    """ Error related to a Network Service Descriptor (NSD) """
102
103
class VirtualNetworkFunctionRecordError(Exception):
    """ Error related to a Virtual Network Function Record (VNFR) """
107
108
class NetworkServiceDescriptorNotFound(Exception):
    """ The requested Network Service Descriptor could not be found """
112
113
class NetworkServiceDescriptorRefCountExists(Exception):
    """ Network Service Descriptor reference count exists

    This class used to be declared a second time under the name
    NetworkServiceDescriptorNotFound, which silently replaced the
    "cannot find" exception defined just above it. Its docstring shows it
    was meant to be a separate exception, so it now has its own name; the
    earlier NetworkServiceDescriptorNotFound definition stays in effect.

    NOTE(review): confirm the raise sites elsewhere in this module use this
    name.
    """
117
class NsrInstantiationFailed(Exception):
    """ Instantiation of a network service failed """
121
122
class VnfInstantiationFailed(Exception):
    """ Instantiation of a virtual network function failed """
126
127
class VnffgInstantiationFailed(Exception):
    """ Instantiation of a VNF forwarding graph (VNFFG) failed """
131
132
class VnfDescriptorError(Exception):
    """ Error related to a VNF descriptor (VNFD) """
136
137
class ScalingOperationError(Exception):
    """ Error related to a scaling operation """
140
141
class ScaleGroupMissingError(Exception):
    """ Error for a scale group that is missing """
144
145
class PlacementGroupError(Exception):
    """ Error related to a placement group """
148
149
class NsrNsdUpdateError(Exception):
    """ Error while updating the NSD of an NSR (base of NsrVlUpdateError) """
152
153
class NsrVlUpdateError(NsrNsdUpdateError):
    """ NSD update error specific to a virtual link of an NSR """
156
157
class VlRecordState(Enum):
    """ VL Record State """
    # Member names and values are the same as those of VnffgRecordState in
    # this module, but the two enums are separate types.
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    FAILED = 106
166
167
class VnffgRecordState(Enum):
    """ VNFFG Record State """
    # Member names and values are the same as those of VlRecordState in this
    # module, but the two enums are separate types.
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    FAILED = 106
176
177
class VnffgRecord(object):
    """ VNF Forwarding Graph Record (VNFFGR) owned by a network service record

    Builds the VNFFGR request (rendered service paths, classifiers and the
    SFF list) from the VNFFG descriptor and the VNFRs of the owning NSR, and
    hands it to the VNFFG manager for creation and termination.
    """
    SFF_DP_PORT = 4790
    SFF_MGMT_PORT = 5000
    # Seconds to wait between two polls of a VNFR that is not yet running
    VNFR_POLL_INTERVAL = 2

    def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name):
        """ Create the record in INIT state with a freshly generated id

        A sdn_account_name of None is stored as the empty string.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnffgmgr = vnffgmgr
        self._nsr = nsr
        self._nsr_name = nsr_name
        self._vnffgd_msg = vnffgd_msg
        self._sdn_account_name = '' if sdn_account_name is None else sdn_account_name

        self._vnffgr_id = str(uuid.uuid4())
        self._vnffgr_rsp_id = list()
        self._vnffgr_state = VnffgRecordState.INIT

    @property
    def id(self):
        """ VNFFGR id """
        return self._vnffgr_id

    @property
    def state(self):
        """ State of this VNFFGR (a VnffgRecordState member) """
        return self._vnffgr_state

    def _base_dict(self):
        """ Identification fields shared by every VNFFGR message of this record """
        return {"id": self._vnffgr_id,
                "vnffgd_id_ref": self._vnffgd_msg.id,
                "vnffgd_name_ref": self._vnffgd_msg.name,
                "sdn_account": self._sdn_account_name,
                }

    def _status_msg(self, operational_status):
        """ Build a VNFFGR message carrying only ids and an operational status """
        vnffgr_dict = self._base_dict()
        vnffgr_dict["operational_status"] = operational_status
        return NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)

    def fetch_vnffgr(self):
        """ Get VNFFGR message to be published

        In INIT and TERMINATED state a status-only message is built locally.
        Otherwise the VNFFG manager is asked for the record; if that raises,
        the record is moved to FAILED and a 'failed' message is returned.
        """
        if self._vnffgr_state == VnffgRecordState.INIT:
            return self._status_msg('init')

        if self._vnffgr_state == VnffgRecordState.TERMINATED:
            return self._status_msg('terminated')

        try:
            return self._vnffgmgr.fetch_vnffgr(self._vnffgr_id)
        except Exception:
            self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id)
            self._vnffgr_state = VnffgRecordState.FAILED
            return self._status_msg('failed')

    @asyncio.coroutine
    def _fetch_running_vnfr(self, nsr_vnfr):
        """ Poll the VNFR behind nsr_vnfr until it is 'running' and return it

        Raises:
            NsrInstantiationFailed: the VNFR reported operational status 'failed'
        """
        vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
        self._log.debug(" Received VNFR is %s", vnfr)
        while vnfr.operational_status != 'running':
            self._log.info("Received vnf op status is %s; retrying", vnfr.operational_status)
            if vnfr.operational_status == 'failed':
                self._log.error("Fetching VNFR for %s failed", vnfr.id)
                # Report the id of the owning NSR (this used to print the VNFFGR id)
                raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
                                             (self._nsr.id, vnfr.id))
            yield from asyncio.sleep(self.VNFR_POLL_INTERVAL, loop=self._loop)
            vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
            self._log.debug("Received VNFR is %s", vnfr)
        return vnfr

    @asyncio.coroutine
    def _add_rsp_hop(self, vnffgr_rsp, rsp_cp_ref):
        """ Add one hop (VNFR connection point) to a rendered service path

        The hop is skipped, with an error log, when no VNFD of the NSR matches
        the reference or the VNFD has no service_function_type.
        """
        vnfds = [nsr_vnfr.vnfd for nsr_vnfr in self._nsr.vnfrs.values()
                 if nsr_vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref]
        self._log.debug("VNFD message during VNFFG instantiation is %s", vnfds)
        if not vnfds or not vnfds[0].has_field('service_function_type'):
            self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",
                            rsp_cp_ref.vnfd_id_ref)
            return
        self._log.debug("Service Function Type for VNFD ID %s is %s",
                        rsp_cp_ref.vnfd_id_ref, vnfds[0].service_function_type)

        vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add()
        vnfr_cp_ref.member_vnf_index_ref = rsp_cp_ref.member_vnf_index_ref
        vnfr_cp_ref.hop_number = rsp_cp_ref.order
        vnfr_cp_ref.vnfd_id_ref = rsp_cp_ref.vnfd_id_ref
        vnfr_cp_ref.service_function_type = vnfds[0].service_function_type

        for nsr_vnfr in self._nsr.vnfrs.values():
            if not (nsr_vnfr.vnfd.id == vnfr_cp_ref.vnfd_id_ref and
                    nsr_vnfr.member_vnf_index == vnfr_cp_ref.member_vnf_index_ref):
                continue

            vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id
            vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name
            vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref

            vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

            cp_params = vnfr_cp_ref.connection_point_params
            cp_params.mgmt_address = vnfr.mgmt_interface.ip_address
            for cp in vnfr.connection_point:
                if cp.name != vnfr_cp_ref.vnfr_connection_point_ref:
                    continue
                cp_params.port_id = cp.connection_point_id
                cp_params.name = self._nsr.name + '.' + cp.name
                for vdu in vnfr.vdur:
                    for ext_intf in vdu.external_interface:
                        if ext_intf.name == vnfr_cp_ref.vnfr_connection_point_ref:
                            cp_params.vm_id = vdu.vim_id
                            self._log.debug("VIM ID for CP %s in VNFR %s is %s", cp.name, nsr_vnfr.id,
                                            cp_params.vm_id)
                            # Only leaves the interface loop; remaining VDUs are still scanned
                            break

                cp_params.address = cp.ip_address
                cp_params.port = VnffgRecord.SFF_DP_PORT

    @asyncio.coroutine
    def _add_classifier(self, vnffgr, vnffgd_classifier):
        """ Add one classifier of the descriptor to the VNFFGR message

        The classifier is skipped, with an error log, when the RSP it refers
        to is not part of the message.
        """
        matching_rsps = [rsp for rsp in vnffgr.rsp
                         if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref]
        if not matching_rsps:
            self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",
                            vnffgd_classifier.rsp_id_ref, vnffgd_classifier.id)
            return

        vnffgr_classifier = vnffgr.classifier.add()
        vnffgr_classifier.id = vnffgd_classifier.id
        vnffgr_classifier.name = self._nsr.name + '.' + vnffgd_classifier.name
        matching_rsps[0].classifier_name = vnffgr_classifier.name
        vnffgr_classifier.rsp_id_ref = matching_rsps[0].id
        vnffgr_classifier.rsp_name = matching_rsps[0].name

        for nsr_vnfr in self._nsr.vnfrs.values():
            if not (nsr_vnfr.vnfd.id == vnffgd_classifier.vnfd_id_ref and
                    nsr_vnfr.member_vnf_index == vnffgd_classifier.member_vnf_index_ref):
                continue

            vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id
            vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name
            vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref

            if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER':
                vnffgr_classifier.sff_name = nsr_vnfr.name

            vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

            for cp in vnfr.connection_point:
                if cp.name != vnffgr_classifier.vnfr_connection_point_ref:
                    continue
                vnffgr_classifier.port_id = cp.connection_point_id
                vnffgr_classifier.ip_address = cp.ip_address
                for vdu in vnfr.vdur:
                    for ext_intf in vdu.external_interface:
                        if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref:
                            vnffgr_classifier.vm_id = vdu.vim_id
                            # Log the classifier's own vm_id. This used to read a
                            # variable of the RSP loop, which is unbound when no RSP
                            # hop was built and the wrong object otherwise.
                            self._log.debug("VIM ID for CP %s in VNFR %s is %s", cp.name, nsr_vnfr.id,
                                            vnffgr_classifier.vm_id)
                            # Only leaves the interface loop; remaining VDUs are still scanned
                            break

    @asyncio.coroutine
    def vnffgr_create_msg(self):
        """ Build the VNFFGR message used to create this VNFFG

        Adds one RSP per RSP of the descriptor (each gets a new uuid that is
        also remembered on this record) and one classifier per descriptor
        classifier. Waits for every referenced VNFR to be running.

        Raises:
            NsrInstantiationFailed: a referenced VNFR reported 'failed'
        """
        vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(self._base_dict())

        for rsp in self._vnffgd_msg.rsp:
            vnffgr_rsp = vnffgr.rsp.add()
            vnffgr_rsp.id = str(uuid.uuid4())
            vnffgr_rsp.name = self._nsr.name + '.' + rsp.name
            self._vnffgr_rsp_id.append(vnffgr_rsp.id)
            vnffgr_rsp.vnffgd_rsp_id_ref = rsp.id
            vnffgr_rsp.vnffgd_rsp_name_ref = rsp.name
            for rsp_cp_ref in rsp.vnfd_connection_point_ref:
                yield from self._add_rsp_hop(vnffgr_rsp, rsp_cp_ref)

        for vnffgd_classifier in self._vnffgd_msg.classifier:
            yield from self._add_classifier(vnffgr, vnffgd_classifier)

        self._log.info("VNFFGR msg to be sent is %s", vnffgr)
        return vnffgr

    @asyncio.coroutine
    def vnffgr_nsr_sff_list(self):
        """ Build the SFF list of the NSR, keyed by VNFD id

        One entry is created for every VNFR whose VNFD service_function_chain
        is 'CLASSIFIER' or 'SFF'; 'SFF' entries additionally list the names of
        all 'SF' VNFRs. Waits for each such VNFR to be running.

        Raises:
            NsrInstantiationFailed: a VNFR reported 'failed'
        """
        sff_list = {}
        sf_list = [nsr_vnfr.name for nsr_vnfr in self._nsr.vnfrs.values()
                   if nsr_vnfr.vnfd.service_function_chain == 'SF']

        for nsr_vnfr in self._nsr.vnfrs.values():
            chain_role = nsr_vnfr.vnfd.service_function_chain
            if not (chain_role == 'CLASSIFIER' or chain_role == 'SFF'):
                continue

            vnfr = yield from self._fetch_running_vnfr(nsr_vnfr)

            sff = RwsdnalYang.VNFFGSff()
            sff_list[nsr_vnfr.vnfd.id] = sff
            sff.name = nsr_vnfr.name
            sff.function_type = nsr_vnfr.vnfd.service_function_chain

            sff.mgmt_address = vnfr.mgmt_interface.ip_address
            sff.mgmt_port = VnffgRecord.SFF_MGMT_PORT
            for cp in vnfr.connection_point:
                sff_dp = sff.dp_endpoints.add()
                sff_dp.name = self._nsr.name + '.' + cp.name
                sff_dp.address = cp.ip_address
                sff_dp.port = VnffgRecord.SFF_DP_PORT
            if nsr_vnfr.vnfd.service_function_chain == 'SFF':
                for sf_name in sf_list:
                    _sf = sff.vnfr_list.add()
                    _sf.vnfr_name = sf_name

        return sff_list

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VNFFG

        Builds the request and SFF list, asks the VNFFG manager to create the
        VNFFGR, marks the record ACTIVE and triggers an NSR state update.

        Raises:
            NsrInstantiationFailed: the VNFFG manager raised, or a VNFR failed
        """
        self._log.info("Instaniating VNFFGR with vnffgd %s",
                       self._vnffgd_msg)

        vnffgr_request = yield from self.vnffgr_create_msg()
        vnffg_sff_list = yield from self.vnffgr_nsr_sff_list()

        try:
            vnffgr = self._vnffgmgr.create_vnffgr(vnffgr_request,
                                                  self._vnffgd_msg.classifier,
                                                  vnffg_sff_list)
        except Exception as e:
            self._log.exception("VNFFG instantiation failed: %s", str(e))
            self._vnffgr_state = VnffgRecordState.FAILED
            # Report the id of the owning NSR (this used to print the VNFFGR id twice)
            raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" %
                                         (self._nsr.id, vnffgr_request.id)) from e

        self._vnffgr_state = VnffgRecordState.INSTANTIATION_PENDING

        self._log.info("Instantiated VNFFGR :%s", vnffgr)
        self._vnffgr_state = VnffgRecordState.ACTIVE

        self._log.info("Invoking update_state to update NSR state for NSR ID: %s", self._nsr.id)
        yield from self._nsr.update_state()

    def vnffgr_in_vnffgrm(self):
        """ True when the record is ACTIVE, INSTANTIATION_PENDING or FAILED

        terminate() only acts on the VNFFG manager in these states.
        """
        return self._vnffgr_state in (VnffgRecordState.ACTIVE,
                                      VnffgRecordState.INSTANTIATION_PENDING,
                                      VnffgRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VNFFGR

        The request is ignored (with an error log) unless vnffgr_in_vnffgrm()
        is true.
        """
        if not self.vnffgr_in_vnffgrm():
            self._log.error("Ignoring terminate request for id %s in state %s",
                            self.id, self._vnffgr_state)
            return

        self._log.info("Terminating VNFFGR id:%s", self.id)
        self._vnffgr_state = VnffgRecordState.TERMINATE_PENDING

        self._vnffgmgr.terminate_vnffgr(self._vnffgr_id)

        self._vnffgr_state = VnffgRecordState.TERMINATED
        self._log.debug("Terminated VNFFGR id:%s", self.id)
449
450
class VirtualLinkRecord(object):
    """ Virtual Link Record (VLR) created for one VLD of a network service """
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    @staticmethod
    @asyncio.coroutine
    def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False):
        """Creates a new VLR object based on the given data.

        If restart mode is enabled, then we look for existing records in the
        DTS and create a VLR record using the existing data (ID).

        Returns:
            VirtualLinkRecord
        """
        vlr_obj = VirtualLinkRecord(
            dts,
            log,
            loop,
            nsr_name,
            vld_msg,
            cloud_account_name,
            om_datacenter,
            ip_profile,
            nsr_id,
        )

        if restart_mode:
            res_iter = yield from dts.query_read(
                VirtualLinkRecord.XPATH,
                rwdts.XactFlag.MERGE)

            for fut in res_iter:
                response = yield from fut
                vlr = response.result

                # Check if the record is already present, if so use the ID of
                # the existing record. Since the name of the record is uniquely
                # formed we can use it as a search key!
                if vlr.name == vlr_obj.name:
                    vlr_obj.reset_id(vlr.id)
                    break

        return vlr_obj

    def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id):
        """ Create the record in INIT state with a freshly generated id """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsr_name = nsr_name
        self._vld_msg = vld_msg
        self._cloud_account_name = cloud_account_name
        self._om_datacenter_name = om_datacenter
        self._assigned_subnet = None
        self._nsr_id = nsr_id
        self._ip_profile = ip_profile
        self._vlr_id = str(uuid.uuid4())
        self._state = VlRecordState.INIT
        self._prev_state = None
        self._create_time = int(time.time())

    @property
    def xpath(self):
        """ path for this object """
        return VirtualLinkRecord.vlr_xpath(self)

    @property
    def id(self):
        """ VLR id """
        return self._vlr_id

    @property
    def nsr_name(self):
        """ Get NSR name for this VL """
        # Read the backing attribute; returning self.nsr_name here recursed
        # until RecursionError.
        return self._nsr_name

    @property
    def vld_msg(self):
        """ Virtual Link Descriptor """
        return self._vld_msg

    @property
    def assigned_subnet(self):
        """ Subnet assigned to this VL (None until instantiate() succeeded) """
        return self._assigned_subnet

    @property
    def name(self):
        """
        Get the name for this VLR.

        The VIM network name of the VLD when it is set, the plain VLD name
        for a VLD called "multisite", otherwise "<nsr name>.<VLD name>".
        """
        if self.vld_msg.vim_network_name:
            return self.vld_msg.vim_network_name
        elif self.vld_msg.name == "multisite":
            # This is a temporary hack to identify manually provisioned inter-site network
            return self.vld_msg.name
        else:
            return self._nsr_name + "." + self.vld_msg.name

    @property
    def cloud_account_name(self):
        """ Cloud account that this VLR should be created in """
        return self._cloud_account_name

    @property
    def om_datacenter_name(self):
        """ Datacenter that this VLR should be created in """
        return self._om_datacenter_name

    @staticmethod
    def vlr_xpath(vlr):
        """ Get the VLR path from VLR (any object with an id attribute) """
        return (VirtualLinkRecord.XPATH + "[vlr:id = '{}']").format(vlr.id)

    @property
    def state(self):
        """ VLR state """
        return self._state

    @state.setter
    def state(self, value):
        """ VLR set state """
        self._state = value

    @property
    def prev_state(self):
        """ VLR previous state """
        return self._prev_state

    @prev_state.setter
    def prev_state(self, value):
        """ VLR set previous state """
        self._prev_state = value

    @property
    def vlr_msg(self):
        """ Virtual Link Record message for Creating VLR in VNS

        A new message is built on every access. Selected VLD fields are copied
        over the record's own fields, and the IP profile parameters are added
        when an IP profile carrying them was supplied.
        """
        vld_fields = ("short_name",
                      "vendor",
                      "description",
                      "version",
                      "type_yang",
                      "vim_network_name",
                      "provider_network")

        vld_copy_dict = {k: v for k, v in self.vld_msg.as_dict().items()
                         if k in vld_fields}

        vlr_dict = {"id": self._vlr_id,
                    "nsr_id_ref": self._nsr_id,
                    "vld_ref": self.vld_msg.id,
                    "name": self.name,
                    "create_time": self._create_time,
                    "cloud_account": self.cloud_account_name,
                    "om_datacenter": self.om_datacenter_name,
                    }

        if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
            vlr_dict['ip_profile_params'] = self._ip_profile.ip_profile_params.as_dict()

        vlr_dict.update(vld_copy_dict)
        return RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict)

    def reset_id(self, vlr_id):
        """ Replace the generated id, e.g. with the id of an existing record """
        self._vlr_id = vlr_id

    def create_nsr_vlr_msg(self, vnfrs):
        """ Build the NSR's VLR entry for this VL

        A connection point entry is added for every VLD connection point
        reference whose VNFD id, member index, cloud account and datacenter
        match one of the given vnfrs.
        """
        nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
        nsr_vlr.vlr_ref = self._vlr_id
        nsr_vlr.assigned_subnet = self.assigned_subnet
        nsr_vlr.cloud_account = self.cloud_account_name
        nsr_vlr.om_datacenter = self.om_datacenter_name

        for conn in self.vld_msg.vnfd_connection_point_ref:
            for vnfr in vnfrs:
                if (vnfr.vnfd.id == conn.vnfd_id_ref and
                        vnfr.member_vnf_index == conn.member_vnf_index_ref and
                        self.cloud_account_name == vnfr.cloud_account_name and
                        self.om_datacenter_name == vnfr.om_datacenter_name):
                    cp_entry = nsr_vlr.vnfr_connection_point_ref.add()
                    cp_entry.vnfr_id = vnfr.id
                    cp_entry.connection_point = conn.vnfd_connection_point_ref

        return nsr_vlr

    @asyncio.coroutine
    def instantiate(self):
        """ Instantiate this VL

        Issues a DTS create for the VLR and records the assigned subnet.

        Raises:
            NsrInstantiationFailed: no response was received, or the returned
                VLR has operational status 'failed'
        """
        self._log.debug("Instaniating VLR key %s, vld %s",
                        self.xpath, self._vld_msg)
        vlr = None
        self._state = VlRecordState.INSTANTIATION_PENDING

        # vlr_msg builds a new message on each access, so build it once here
        vlr_msg = self.vlr_msg
        self._log.debug("Executing VL create path:%s msg:%s",
                        self.xpath, vlr_msg)

        # NOTE(review): the extent of this `with` block was reconstructed from
        # an indentation-stripped listing; confirm against the repository.
        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_create(self.xpath, vlr_msg)
            res_iter = yield from block.execute(now=True)
            for ent in res_iter:
                res = yield from ent
                vlr = res.result

            if vlr is None:
                self._state = VlRecordState.FAILED
                # Report the NSR id (this used to print the VLR id)
                raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self._nsr_id)

        if vlr.operational_status == 'failed':
            self._log.debug("NS Id:%s VL creation failed for vlr id %s", self._nsr_id, vlr.id)
            self._state = VlRecordState.FAILED
            raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr.id, vlr.operational_status_details))

        self._log.info("Instantiated VL with xpath %s and vlr:%s",
                       self.xpath, vlr)
        self._state = VlRecordState.ACTIVE
        self._assigned_subnet = vlr.assigned_subnet

    def vlr_in_vns(self):
        """ True when the record is ACTIVE, INSTANTIATION_PENDING,
        TERMINATE_PENDING or FAILED; terminate() only acts in these states
        """
        return self._state in (VlRecordState.ACTIVE,
                               VlRecordState.INSTANTIATION_PENDING,
                               VlRecordState.TERMINATE_PENDING,
                               VlRecordState.FAILED)

    @asyncio.coroutine
    def terminate(self):
        """ Terminate this VL

        The request is ignored (with a debug log) unless vlr_in_vns() is true.
        """
        if not self.vlr_in_vns():
            self._log.debug("Ignoring terminate request for id %s in state %s",
                            self.id, self._state)
            return

        self._log.debug("Terminating VL id:%s", self.id)
        self._state = VlRecordState.TERMINATE_PENDING

        with self._dts.transaction(flags=0) as xact:
            block = xact.block_create()
            block.add_query_delete(self.xpath)
            yield from block.execute(flags=0, now=True)

        self._state = VlRecordState.TERMINATED
        self._log.debug("Terminated VL id:%s", self.id)
700
701
class VnfRecordState(Enum):
    """ Vnf Record State """
    # Same member names and values as VlRecordState and VnffgRecordState in
    # this module, but a separate enum type.
    INIT = 101
    INSTANTIATION_PENDING = 102
    ACTIVE = 103
    TERMINATE_PENDING = 104
    TERMINATED = 105
    FAILED = 106
710
711
712 class VirtualNetworkFunctionRecord(object):
713 """ Virtual Network Function Record class"""
714 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
715
@staticmethod
@asyncio.coroutine
def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name,
                  cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id,
                  placement_groups, restart_mode=False):
    """Build a VirtualNetworkFunctionRecord from the given data.

    In restart mode the VNFR catalog is read from DTS and, if a record with
    the same name already exists, its id is adopted instead of the freshly
    generated one.

    Returns:
        VirtualNetworkFunctionRecord
    """
    record = VirtualNetworkFunctionRecord(
        dts, log, loop,
        vnfd, const_vnfd_msg,
        nsd_id, nsr_name,
        cloud_account_name, om_datacenter_name,
        nsr_id,
        group_name, group_instance_id,
        placement_groups,
        restart_mode=restart_mode)

    if not restart_mode:
        return record

    pending_results = yield from dts.query_read(
        "D,/vnfr:vnfr-catalog/vnfr:vnfr",
        rwdts.XactFlag.MERGE)

    for pending in pending_results:
        reply = yield from pending
        existing = reply.result

        # The record name is the search key for an already existing VNFR
        if existing.name == record.name:
            record.reset_id(existing.id)
            break

    return record
759
760 def __init__(self,
761 dts,
762 log,
763 loop,
764 vnfd,
765 const_vnfd_msg,
766 nsd_id,
767 nsr_name,
768 cloud_account_name,
769 om_datacenter_name,
770 nsr_id,
771 group_name=None,
772 group_instance_id=None,
773 placement_groups = [],
774 restart_mode = False):
775 self._dts = dts
776 self._log = log
777 self._loop = loop
778 self._vnfd = vnfd
779 self._const_vnfd_msg = const_vnfd_msg
780 self._nsd_id = nsd_id
781 self._nsr_name = nsr_name
782 self._nsr_id = nsr_id
783 self._cloud_account_name = cloud_account_name
784 self._om_datacenter_name = om_datacenter_name
785 self._group_name = group_name
786 self._group_instance_id = group_instance_id
787 self._placement_groups = placement_groups
788 self._config_status = NsrYang.ConfigStates.INIT
789 self._create_time = int(time.time())
790
791 self._prev_state = VnfRecordState.INIT
792 self._state = VnfRecordState.INIT
793 self._state_failed_reason = None
794
795 self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
796 self.configure()
797
798 self._vnfr_id = str(uuid.uuid4())
799 self._name = None
800 self._vnfr_msg = self.create_vnfr_msg()
801 self._log.debug("Set VNFR {} config type to {}".
802 format(self.name, self.config_type))
803 self.restart_mode = restart_mode
804
805
806 if group_name is None and group_instance_id is not None:
807 raise ValueError("Group instance id must not be provided with an empty group name")
808
@property
def id(self):
    """ Identifier of this VNFR (generated at creation or set by reset_id) """
    vnfr_id = self._vnfr_id
    return vnfr_id
813
@property
def xpath(self):
    """ DTS path of this VNFR in the VNFR catalog, keyed by its id """
    path_template = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']"
    return path_template.format(self.id)
818
@property
def vnfr_msg(self):
    """ The stored VNFR message (built by create_vnfr_msg) """
    stored_msg = self._vnfr_msg
    return stored_msg
823
@property
def const_vnfr_msg(self):
    """ Constituent VNFR reference for the NSR: id, cloud account, datacenter """
    ref_fields = {
        "vnfr_id": self.id,
        "cloud_account": self.cloud_account_name,
        "om_datacenter": self._om_datacenter_name,
    }
    return RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(**ref_fields)
828
@property
def vnfd(self):
    """ The VNF descriptor this record was created from """
    descriptor = self._vnfd
    return descriptor
833
@property
def cloud_account_name(self):
    """ Name of the cloud account this VNF should be created in """
    account = self._cloud_account_name
    return account
838
@property
def om_datacenter_name(self):
    """ Name of the datacenter this VNF should be created in """
    datacenter = self._om_datacenter_name
    return datacenter
843
844
@property
def active(self):
    """ True when this VNF is in the ACTIVE state, False otherwise """
    if self._state == VnfRecordState.ACTIVE:
        return True
    return False
849
@property
def state(self):
    """ Current state of this VNF (as stored by set_state) """
    current = self._state
    return current
854
@property
def state_failed_reason(self):
    """ Error message recorded for a failed VNF (None when nothing was recorded) """
    reason = self._state_failed_reason
    return reason
859
@property
def member_vnf_index(self):
    """ Member VNF index taken from the constituent VNFD message """
    constituent = self._const_vnfd_msg
    return constituent.member_vnf_index
864
@property
def nsr_name(self):
    """ Name of the NSR this VNFR belongs to """
    owner_name = self._nsr_name
    return owner_name
869
@property
def name(self):
    """ Name of this VNFR, built on first access and cached afterwards

    The name joins, with "__": the NSR name, the group name and group
    instance id (each only when set), the VNFD name and the member index.
    """
    if self._name is None:
        group_tags = [] if self._group_name is None else [self._group_name]
        instance_tags = ([] if self._group_instance_id is None
                         else [str(self._group_instance_id)])
        vnf_tags = [self.vnfd.name, str(self.member_vnf_index)]
        self._name = "__".join([self._nsr_name] + group_tags + instance_tags + vnf_tags)

    return self._name
889
890 @staticmethod
891 def vnfr_xpath(vnfr):
892 """ Get the VNFR path from VNFR """
893 return (VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']").format(vnfr.id)
894
@property
def config_type(self):
    """ Configuration method of the VNFD: the first of 'netconf', 'juju',
    'script' present in its vnf_configuration, or 'none' when there is none
    """
    vnf_config = self._vnfd.vnf_configuration
    candidates = ('netconf', 'juju', 'script')
    return next((method for method in candidates if vnf_config.has_field(method)), 'none')
902
@property
def config_status(self):
    """ Config status of this VNFR as a YANG enum string

    'config_not_needed' when the VNFD has no configuration method, otherwise
    'configured' or 'failed' for those internal states and 'configuring' for
    every other state.
    """
    self._log.debug("Map VNFR {} config status {} ({})".
                    format(self.name, self._config_status, self.config_type))

    if self.config_type == 'none':
        return 'config_not_needed'

    if self._config_status == NsrYang.ConfigStates.CONFIGURED:
        return 'configured'

    if self._config_status == NsrYang.ConfigStates.FAILED:
        return 'failed'

    return 'configuring'
916
def set_state(self, state):
    """ Move to the given state, remembering the current one as previous """
    self._prev_state, self._state = self._state, state
921
def reset_id(self, vnfr_id):
    """ Adopt the given id and rebuild the stored VNFR message with it """
    self._vnfr_id = vnfr_id
    rebuilt_msg = self.create_vnfr_msg()
    self._vnfr_msg = rebuilt_msg
925
def configure(self):
    """ Merge this VNFD's configuration into the record's config store """
    merge_args = (self._nsd_id, self._vnfd, self.member_vnf_index)
    self.config_store.merge_vnfd_config(*merge_args)
932
def create_vnfr_msg(self):
    """ Build a new VNFR message for this record

    The message carries the record's own identification fields, a few
    fields copied from the VNFD, the VNFD itself, the member index, the VNF
    configuration, the management port (when the VNFD sets one) and the
    placement groups. monitoring_param is always set to an empty list.
    """
    copied_fields = (
        "short_name",
        "vendor",
        "description",
        "version",
        "type_yang",
    )
    vnfd_values = self._vnfd.as_dict()
    msg_dict = {
        "id": self.id,
        "nsr_id_ref": self._nsr_id,
        "name": self.name,
        "cloud_account": self._cloud_account_name,
        "om_datacenter": self._om_datacenter_name,
        "config_status": self.config_status
    }
    # VNFD values win over the record's own fields on a key clash
    msg_dict.update((key, value) for key, value in vnfd_values.items()
                    if key in copied_fields)

    vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(msg_dict)
    vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(
        self.vnfd.as_dict(),
        ignore_missing_keys=True)
    vnfr.member_vnf_index_ref = self.member_vnf_index
    vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict())

    mgmt_interface = self._vnfd.mgmt_interface
    if mgmt_interface.has_field("port"):
        vnfr.mgmt_interface.port = self._vnfd.mgmt_interface.port

    for placement_group in self._placement_groups:
        vnfr.placement_groups_info.add().from_dict(placement_group.as_dict())

    # UI expects the monitoring param field to exist
    vnfr.monitoring_param = []

    self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, vnfr))
    return vnfr
971
@asyncio.coroutine
def update_vnfm(self):
    """ Send the stored VNFR message as a DTS update on this record's xpath """
    outgoing = self.vnfr_msg
    self._log.debug("Send an update to VNFM for VNFR {} with {}".
                    format(self.name, outgoing))
    yield from self._dts.query_update(self.xpath, rwdts.XactFlag.TRACE, outgoing)
981
def get_config_status(self):
    """ Config status as the internal YANG enum value (not the string form) """
    current_status = self._config_status
    return current_status
985
@asyncio.coroutine
def set_config_status(self, status):
    """ Record a new config status and publish it to the VNFM

    Nothing changes when the record is already CONFIGURED (an error is
    logged) or when status equals the current one. Otherwise the status is
    stored, mirrored as a string into the VNFR message and, unless the new
    status is INIT, sent with update_vnfm(). Exceptions from the mirroring
    and from the update are logged, not raised.
    """
    self._log.debug("Update VNFR {} from {} ({}) to {}".
                    format(self.name, self._config_status,
                           self.config_type, status))

    if self._config_status == NsrYang.ConfigStates.CONFIGURED:
        self._log.error("Updating already configured VNFR {}".
                        format(self.name))
        return

    if self._config_status == status:
        return

    try:
        self._config_status = status
        # The VNFR message stores the status as its YANG enum string. The
        # lookup stays inside the try so that an unknown status is logged
        # instead of raised.
        self.vnfr_msg.config_status = {
            NsrYang.ConfigStates.INIT : 'init',
            NsrYang.ConfigStates.CONFIGURING : 'configuring',
            NsrYang.ConfigStates.CONFIG_NOT_NEEDED : 'config_not_needed',
            NsrYang.ConfigStates.CONFIGURED : 'configured',
            NsrYang.ConfigStates.FAILED : 'failed',
        }[status]
    except Exception as exc:
        self._log.error("Exception=%s", str(exc))

    self._log.debug("Updated VNFR {} status to {}".format(self.name, status))

    if self._config_status == NsrYang.ConfigStates.INIT:
        return

    try:
        # Publish only after VNFM has the VNFR created
        yield from self.update_vnfm()
    except Exception as exc:
        self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}".
                        format(status, self.name, exc))
        self._log.exception(exc)
1029
def is_configured(self):
    """Tell whether this VNFR needs no further configuration.

    True when the config type is 'none' or the config status has reached
    CONFIGURED; False otherwise.
    """
    nothing_to_configure = self.config_type == 'none'
    return bool(nothing_to_configure or
                self._config_status == NsrYang.ConfigStates.CONFIGURED)
1038
@asyncio.coroutine
def instantiate(self, nsr):
    """Create this VNFR in DTS, or update it when in restart mode.

    Each connection point of the VNFD that can be matched to one of the
    VLRs of ``nsr`` is added to the VNFR message first; connection points
    without a matching VLR are logged at debug level and skipped.

    Arguments:
        nsr - record whose ``vlrs`` are searched for matching VLRs
    """
    self._log.debug("Instaniating VNFR key %s, vnfd %s",
                    self.xpath, self._vnfd)
    self._log.debug("Create VNF with xpath %s and vnfr %s",
                    self.xpath, self.vnfr_msg)

    self.set_state(VnfRecordState.INSTANTIATION_PENDING)

    def vlr_serving(conn):
        """Return the VLR that references ``conn`` for this VNF, or None."""
        for candidate in nsr.vlrs:
            for ref in candidate.vld_msg.vnfd_connection_point_ref:
                is_match = (ref.vnfd_id_ref == self._vnfd.id and
                            ref.vnfd_connection_point_ref == conn.name and
                            ref.member_vnf_index_ref == self.member_vnf_index and
                            candidate.cloud_account_name == self.cloud_account_name)
                if is_match:
                    self._log.debug("Found VLR for cp_name:%s and vnf-index:%d",
                                    conn.name, self.member_vnf_index)
                    return candidate
        return None

    # Mirror every VNFD connection point that has a VLR into the VNFR
    for vnfd_cp in self._vnfd.connection_point:
        record_cp = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
        record_cp.name = vnfd_cp.name
        record_cp.type_yang = vnfd_cp.type_yang
        if vnfd_cp.has_field('port_security_enabled'):
            record_cp.port_security_enabled = vnfd_cp.port_security_enabled

        matched_vlr = vlr_serving(vnfd_cp)
        if matched_vlr is None:
            # Not treated as an error: the connection point is left out
            self._log.debug("%s", "Failed to find VLR for cp = %s" % vnfd_cp.name)
            continue

        record_cp.vlr_ref = matched_vlr.id
        self.vnfr_msg.connection_point.append(record_cp)
        self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
                        record_cp, self.vnfr_msg.id, self.vnfr_msg.vnfd.id)

    # A restarted tasklet updates the existing record instead of creating it
    if self.restart_mode:
        yield from self._dts.query_update(self.xpath, 0, self.vnfr_msg)
    else:
        yield from self._dts.query_create(self.xpath, 0, self.vnfr_msg)

    self._log.info("Created VNF with xpath %s and vnfr %s",
                   self.xpath, self.vnfr_msg)
    self._log.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
                   self.xpath, self._vnfd, self.vnfr_msg)
1098
@asyncio.coroutine
def update_state(self, vnfr_msg):
    """React to the operational status carried by ``vnfr_msg``.

    "running" marks this VNFR active, unless the message held by this
    record already reports "running".  "failed" records an instantiation
    failure with the reported details.  Any other status is ignored.
    """
    reported = vnfr_msg.operational_status
    if reported == "failed":
        yield from self.instantiation_failed(
            failed_reason=vnfr_msg.operational_status_details)
    elif reported == "running" and self.vnfr_msg.operational_status != "running":
        yield from self.is_active()
1107
@asyncio.coroutine
def is_active(self):
    """Mark this VNFR as ACTIVE."""
    vnfr_id = self._vnfr_id
    self._log.debug("VNFR %s is active", vnfr_id)
    self.set_state(VnfRecordState.ACTIVE)
1113
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """Mark this VNFR as FAILED and remember why.

    Arguments:
        failed_reason - optional description stored as the failure reason
    """
    vnfr_id = self._vnfr_id
    self._log.error("VNFR %s instantiation failed", vnfr_id)
    self.set_state(VnfRecordState.FAILED)
    self._state_failed_reason = failed_reason
1120
def vnfr_in_vnfm(self):
    """Tell whether this record is in a state where the VNFM holds a VNFR.

    True for ACTIVE, INSTANTIATION_PENDING and FAILED; False otherwise.
    """
    states_with_record = (
        VnfRecordState.ACTIVE,
        VnfRecordState.INSTANTIATION_PENDING,
        VnfRecordState.FAILED,
    )
    return self._state in states_with_record
1129
@asyncio.coroutine
def terminate(self):
    """Delete this VNFR from DTS and mark it TERMINATED.

    The request is ignored (and logged) when vnfr_in_vnfm() reports that
    there is no record to remove.
    """
    if not self.vnfr_in_vnfm():
        self._log.debug("Ignoring terminate request for id %s in state %s",
                        self.id, self._state)
        return

    self._log.debug("Terminating VNF id:%s", self.id)
    self.set_state(VnfRecordState.TERMINATE_PENDING)

    # Run the delete query inside its own DTS transaction
    with self._dts.transaction(flags=0) as xact:
        delete_block = xact.block_create()
        delete_block.add_query_delete(self.xpath)
        yield from delete_block.execute(flags=0)

    self.set_state(VnfRecordState.TERMINATED)
    self._log.debug("Terminated VNF id:%s", self.id)
1146
1147
class NetworkServiceStatus(object):
    """Operational state and recent event history of a network service.

    Holds the current NetworkServiceRecordState together with the most
    recent MAX_EVENTS_RECORDED events.  Every recorded event also
    schedules a DTS notification on the event loop.
    """
    # Upper bound on the number of events kept in the history
    MAX_EVENTS_RECORDED = 10

    # State enum member name -> yang enum string
    _STATE_TO_YANG_STR = {
        "INIT": "init",
        "VL_INIT_PHASE": "vl_init_phase",
        "VNF_INIT_PHASE": "vnf_init_phase",
        "VNFFG_INIT_PHASE": "vnffg_init_phase",
        "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
        "RUNNING": "running",
        "SCALING_OUT": "scaling_out",
        "SCALING_IN": "scaling_in",
        "TERMINATE_RCVD": "terminate_rcvd",
        "TERMINATE": "terminate",
        "VL_TERMINATE_PHASE": "vl_terminate_phase",
        "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
        "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
        "TERMINATED": "terminated",
        "FAILED": "failed",
        "VL_INSTANTIATE": "vl_instantiate",
        "VL_TERMINATE": "vl_terminate",
    }

    def __init__(self, dts, log, loop):
        """Start in the INIT state with an empty event history."""
        self._dts = dts
        self._log = log
        self._loop = loop

        self._state = NetworkServiceRecordState.INIT
        # Bounded history: the deque drops the oldest entry by itself
        self._events = deque(maxlen=NetworkServiceStatus.MAX_EVENTS_RECORDED)

    @asyncio.coroutine
    def create_notification(self, evt, evt_desc, evt_details):
        """Publish an nsm-notification for the given event through DTS."""
        xp = "N,/rw-nsr:nsm-notification"
        notif = RwNsrYang.YangNotif_RwNsr_NsmNotification()
        notif.event = evt
        notif.description = evt_desc
        notif.details = evt_details

        yield from self._dts.query_create(xp, rwdts.XactFlag.ADVISE, notif)
        self._log.info("Notification called by creating dts query: %s", notif)

    def record_event(self, evt, evt_desc, evt_details):
        """Append an event to the history and schedule its notification.

        The entry is stored as (timestamp, event, description, details),
        with the timestamp taken from time.time() in whole seconds.
        """
        self._log.debug("Recording event - evt %s, evt_descr %s len = %s",
                        evt, evt_desc, len(self._events))
        self._events.append((int(time.time()), evt, evt_desc, evt_details))

        self._loop.create_task(self.create_notification(evt, evt_desc, evt_details))

    def set_state(self, state):
        """Set the state of this status object."""
        self._state = state

    def yang_str(self):
        """Return the current state as a yang enum string.

        Raises KeyError when the state's name has no entry in the map.
        """
        return self._STATE_TO_YANG_STR[self._state.name]

    @property
    def state(self):
        """State of this status object."""
        return self._state

    @property
    def msg(self):
        """Event history as a list of operational-event messages.

        Events are numbered from 1 in the order they were recorded.
        """
        event_list = []
        for idx, entry in enumerate(self._events, start=1):
            event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
            event.id = idx
            event.timestamp, event.event, event.description, event.details = entry
            event_list.append(event)
        return event_list
1225
1226
1227 class NetworkServiceRecord(object):
1228 """ Network service record """
1229 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
1230
def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False,
             vlr_handler=None):
    """Build an empty record for the NS described by ``nsr_cfg_msg``.

    No child records are created here; the record only stores its
    collaborators, initialises its bookkeeping and enters the INIT state.
    """
    # Collaborators handed in by the caller
    self._dts = dts
    self._log = log
    self._loop = loop
    self._nsm = nsm
    self._nsm_plugin = nsm_plugin
    self._vlr_handler = vlr_handler

    # Configuration this record is created from
    self._nsr_cfg_msg = nsr_cfg_msg
    self._sdn_account_name = sdn_account_name
    self._key_pairs = key_pairs
    self.restart_mode = restart_mode

    # Not known at construction time
    self._nsd = None
    self._nsr_msg = None
    self._nsr_regh = None

    # Child records and per-NSR collections, all empty to begin with
    self._vlrs = []
    self._vnfrs = {}
    self._vnfds = {}
    self._vnffgrs = {}
    self._param_pools = {}
    self._scaling_groups = {}
    self.vlr_uptime_tasks = {}

    # Status bookkeeping
    self._create_time = int(time.time())
    self._op_status = NetworkServiceStatus(dts, log, loop)
    self._config_status = NsrYang.ConfigStates.CONFIGURING
    self._config_status_details = None
    self._job_id = 0
    self._debug_running = False
    self._is_active = False
    self._vl_phase_completed = False
    self._vnf_phase_completed = False

    self.config_store = rift.mano.config_data.config.ConfigStore(self._log)

    # Every record starts out in INIT; later transitions go through
    # set_state() as well
    self.set_state(NetworkServiceRecordState.INIT)

    self.substitute_input_parameters = InputParameterSubstitution(self._log)
1275
@property
def nsm_plugin(self):
    """The NSM plugin this record was created with."""
    plugin = self._nsm_plugin
    return plugin
1280
def set_state(self, state):
    """Record a state transition for this NSR.

    Leaving VL_INIT_PHASE or VNF_INIT_PHASE, whatever the next state is
    (a failure included), marks the corresponding phase as completed.
    The new state is then stored on the status object and forwarded to
    the NSM plugin.
    """
    self._log.debug("Setting state to %s", state)

    state_being_left = self.state
    if state_being_left == NetworkServiceRecordState.VL_INIT_PHASE:
        self._vl_phase_completed = True
    if state_being_left == NetworkServiceRecordState.VNF_INIT_PHASE:
        self._vnf_phase_completed = True

    self._op_status.set_state(state)
    self._nsm_plugin.set_state(self.id, state)
1294
@property
def id(self):
    """Identifier of this NSR, taken from its config message."""
    cfg = self._nsr_cfg_msg
    return cfg.id
1299
@property
def name(self):
    """Name of this NSR, taken from its config message."""
    cfg = self._nsr_cfg_msg
    return cfg.name
1304
@property
def cloud_account_name(self):
    """Cloud account named in the NSR config message."""
    cfg = self._nsr_cfg_msg
    return cfg.cloud_account
1308
@property
def om_datacenter_name(self):
    """OM datacenter from the NSR config message, or None when unset."""
    cfg = self._nsr_cfg_msg
    return cfg.om_datacenter if cfg.has_field('om_datacenter') else None
1314
@property
def state(self):
    """Current state, as held by the operational status object."""
    status = self._op_status
    return status.state
1319
@property
def active(self):
    """True exactly when the operational state is RUNNING."""
    is_running = self._op_status.state == NetworkServiceRecordState.RUNNING
    if is_running:
        return True
    return False
1324
@property
def vlrs(self):
    """List of VLRs held by this NSR."""
    records = self._vlrs
    return records
1329
@property
def vnfrs(self):
    """Mapping of VNFRs held by this NSR."""
    records = self._vnfrs
    return records
1334
@property
def vnffgrs(self):
    """Mapping of VNFFGRs held by this NSR."""
    records = self._vnffgrs
    return records
1339
@property
def scaling_groups(self):
    """Mapping of scaling groups held by this NSR."""
    groups = self._scaling_groups
    return groups
1344
@property
def param_pools(self):
    """Mapping of parameter value pools held by this NSR."""
    pools = self._param_pools
    return pools
1349
@property
def nsr_cfg_msg(self):
    """NSR config message this record is based on."""
    current = self._nsr_cfg_msg
    return current

@nsr_cfg_msg.setter
def nsr_cfg_msg(self, msg):
    """Replace the stored NSR config message with ``msg``."""
    self._nsr_cfg_msg = msg
1357
@property
def nsd_msg(self):
    """NSD message for this NSR.

    Read from the NSR config message on first access and cached.
    """
    if self._nsd is None:
        self._nsd = self._nsr_cfg_msg.nsd
    return self._nsd
1365
@property
def nsd_id(self):
    """Identifier of the NSD behind this NSR."""
    descriptor = self.nsd_msg
    return descriptor.id
1370
@property
def job_id(self):
    """Hand out a fresh job id for a config primitive.

    Each read increments the internal counter and returns the new value.
    """
    self._job_id = self._job_id + 1
    return self._job_id
1376
@property
def config_status(self):
    """Config status currently stored for this NSR."""
    current = self._config_status
    return current
1381
def resolve_placement_group_cloud_construct(self, input_group):
    """Build the placement-group info for ``input_group``.

    Looks for the first NSD placement group map in the NSR config whose
    placement_group_ref equals the group's name.  The result carries the
    fields of that map (minus the reference) with name, requirement and
    strategy taken from ``input_group``.  Returns None when no map matches.
    """
    taken_from_group = ('name', 'requirement', 'strategy')

    for mapping in self._nsr_cfg_msg.nsd_placement_group_maps:
        if mapping.placement_group_ref != input_group.name:
            continue

        resolved = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
        fields = dict(mapping.as_dict())
        fields.pop('placement_group_ref', None)
        for field in taken_from_group:
            fields[field] = getattr(input_group, field)
        resolved.from_dict(fields)
        return resolved

    return None
1398
1399
def __str__(self):
    """Short description with name, NSD id and cloud account."""
    template = "NSR(name={}, nsd_id={}, cloud_account={})"
    return template.format(self.name, self.nsd_id, self.cloud_account_name)
1404
def _get_vnfd(self, vnfd_id, config_xact):
    """Ask the NSM for the VNFD message with the given id."""
    manager = self._nsm
    return manager.get_vnfd(vnfd_id, config_xact)
1408
def _get_vnfd_cloud_account(self, vnfd_member_index):
    """Look up the (cloud account, OM datacenter) pair for a member VNF.

    The first entry of the NSR config's vnf_cloud_account_map whose
    member_vnf_index_ref equals ``vnfd_member_index`` wins.  Without such
    an entry the NSR-level cloud account and datacenter are returned.
    """
    account_map = self._nsr_cfg_msg.vnf_cloud_account_map
    if account_map:
        for entry in account_map:
            if vnfd_member_index == entry.member_vnf_index_ref:
                return (entry.cloud_account, entry.om_datacenter)
    return (self.cloud_account_name, self.om_datacenter_name)
1417
def _get_constituent_vnfd_msg(self, vnf_index):
    """Return the NSD constituent VNFD entry with the given member index.

    Raises ValueError when the NSD has no such entry.
    """
    not_found = object()
    candidates = (entry for entry in self.nsd_msg.constituent_vnfd
                  if entry.member_vnf_index == vnf_index)
    found = next(candidates, not_found)
    if found is not_found:
        raise ValueError("Constituent VNF index %s not found" % vnf_index)
    return found
1424
def record_event(self, evt, evt_desc, evt_details=None, state=None):
    """Record an event and optionally move to a new state.

    The event is passed on to the operational status object; when
    ``state`` is given the NSR is switched to it afterwards.
    """
    self._op_status.record_event(evt, evt_desc, evt_details)
    if state is None:
        return
    self.set_state(state)
1430
def scaling_trigger_str(self, trigger):
    """Return the display string for a scaling trigger.

    Triggers without a mapping are logged and reported as
    "Unknown trigger" instead of raising.
    """
    names_by_trigger = {
        NsdYang.ScalingTrigger.PRE_SCALE_IN: 'pre-scale-in',
        NsdYang.ScalingTrigger.POST_SCALE_IN: 'post-scale-in',
        NsdYang.ScalingTrigger.PRE_SCALE_OUT: 'pre-scale-out',
        NsdYang.ScalingTrigger.POST_SCALE_OUT: 'post-scale-out',
    }
    try:
        label = names_by_trigger[trigger]
    except Exception as e:
        self._log.error("Scaling trigger mapping error for {} : {}".format(trigger, e))
        self._log.exception(e)
        label = "Unknown trigger"
    return label
1445
@asyncio.coroutine
def instantiate_vls(self):
    """Instantiate every VLR of this network service, one after another.

    Each VLR is handed to the NSM plugin and marked ACTIVE once the
    plugin call returns.
    """
    records = self._vlrs
    self._log.debug("Instantiating %d VLs in NSD id %s", len(records), self.id)
    for record in records:
        yield from self.nsm_plugin.instantiate_vl(self, record)
        record.state = VlRecordState.ACTIVE
1456
1457
@asyncio.coroutine
def vlr_uptime_update(self, vlr):
    """Keep publishing the uptime of ``vlr`` until the task is cancelled.

    Every two seconds the uptime (seconds since the VLR's creation time)
    is written through the VLR handler.  On cancellation the entry is
    deleted through the same handler and the coroutine ends normally.

    Arguments:
        vlr - record whose ``id`` and ``_create_time`` are published
    """
    try:
        # Message carrying only the key and the uptime field
        vlr_ = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict({'id': vlr.id})
        while True:
            vlr_.uptime = int(time.time()) - vlr._create_time
            yield from self._vlr_handler.update(None, VirtualLinkRecord.vlr_xpath(vlr), vlr_)
            yield from asyncio.sleep(2, loop=self._loop)
    except asyncio.CancelledError:
        self._log.debug("Received cancellation request for vlr_uptime_update task")
        yield from self._vlr_handler.delete(None, VirtualLinkRecord.vlr_xpath(vlr))
1469
1470
@asyncio.coroutine
def create(self, config_xact):
    """Create all records that make up this network service.

    Order: virtual links, VNFs, VNFFGs, scaling groups, parameter pools.

    Arguments:
        config_xact - passed through to create_vnfs()
    """
    # The first two steps are coroutines
    yield from self.create_vls()
    yield from self.create_vnfs(config_xact)

    # The remaining builders are plain calls
    self.create_vnffgs()
    self.create_scaling_groups()
    self.create_param_pools()
1489
@asyncio.coroutine
def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None):
    """Run a user script that configures a scale group instance.

    The script receives one argument: the path of a YAML file holding
    the trigger, the scale group config, the VNFRs in the group, the
    VNFRs of the NSR and basic NSR data.

    Arguments:
        script - absolute path, or a name relative to $RIFT_INSTALL/usr/bin
        group - scaling group the config belongs to
        scale_instance - instance whose VNFRs are used when ``vnfrs`` is empty
        trigger - NsdYang.ScalingTrigger value that caused the call
        vnfrs - optional explicit list of VNFRs in the group

    Returns:
        True when the script exits with status 0.  False when no script
        is given, the script does not exist, or it exits non-zero.
    """
    # Local import: shlex is not among the module-level imports
    import shlex

    @asyncio.coroutine
    def add_vnfrs_data(vnfrs_list):
        """ Add as a dict each of the VNFRs data """
        vnfrs_data = []
        for vnfr in vnfrs_list:
            self._log.debug("Add VNFR {} data".format(vnfr))
            vnfr_data = dict()
            vnfr_data['name'] = vnfr.name
            if trigger in [NsdYang.ScalingTrigger.PRE_SCALE_IN, NsdYang.ScalingTrigger.POST_SCALE_OUT]:
                # Get VNF management and other IPs, etc
                opdata = yield from self.fetch_vnfr(vnfr.xpath)
                self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata))
                try:
                    vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address
                    vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port
                except Exception as e:
                    self._log.error("Unable to get management IP for vnfr {}:{}".
                                    format(vnfr.name, e))

                try:
                    vnfr_data['connection_points'] = []
                    for cp in opdata.connection_point:
                        con_pt = dict()
                        con_pt['name'] = cp.name
                        con_pt['ip_address'] = cp.ip_address
                        vnfr_data['connection_points'].append(con_pt)
                except Exception as e:
                    self._log.error("Exception getting connections points for VNFR {}: {}".
                                    format(vnfr.name, e))

            vnfrs_data.append(vnfr_data)
            self._log.debug("VNFRs data: {}".format(vnfrs_data))

        return vnfrs_data

    def add_nsr_data(nsr):
        """ Basic NSR data handed to the script """
        nsr_data = dict()
        nsr_data['name'] = nsr.name
        return nsr_data

    if script is None or len(script) == 0:
        self._log.error("Script not provided for scale group config: {}".format(group.name))
        return False

    if script[0] == '/':
        path = script
    else:
        path = os.path.join(os.environ['RIFT_INSTALL'], "usr/bin", script)
    if not os.path.exists(path):
        self._log.error("Config faled for scale group {}: Script does not exist at {}".
                        format(group.name, path))
        return False

    # Build a YAML file with all parameters for the script to execute
    # The data consists of 5 sections
    # 1. Trigger
    # 2. Scale group config
    # 3. VNFRs in the scale group
    # 4. VNFRs outside scale group
    # 5. NSR data
    data = dict()
    data['trigger'] = group.trigger_map(trigger)
    data['config'] = group.group_msg.as_dict()

    if vnfrs:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs)
    else:
        data["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance.vnfrs)

    data["vnfrs_others"] = yield from add_vnfrs_data(self.vnfrs.values())
    data["nsr"] = add_nsr_data(self)

    # delete=False: the file must outlive the handle so the script can
    # open it by name.
    # NOTE(review): the file is never removed afterwards - confirm whether
    # it is kept on purpose (e.g. for debugging).
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(yaml.dump(data, default_flow_style=True)
                       .encode("UTF-8"))

    self._log.debug("Creating a temp file: {} with input data: {}".
                    format(tmp_file.name, data))

    # The script path comes from the descriptor; quote both words so that
    # spaces or shell metacharacters cannot alter the command line.
    cmd = "{} {}".format(shlex.quote(path), shlex.quote(tmp_file.name))
    self._log.debug("Running the CMD: {}".format(cmd))
    proc = yield from asyncio.create_subprocess_shell(cmd, loop=self._loop)
    rc = yield from proc.wait()
    if rc:
        self._log.error("The script {} for scale group {} config returned: {}".
                        format(script, group.name, rc))
        return False

    # Success
    return True
1585
1586
@asyncio.coroutine
def apply_scaling_group_config(self, trigger, group, scale_instance, vnfrs=None):
    """Apply the scale group config registered for ``trigger``.

    Returns:
        True when there is nothing to apply for the trigger or the config
        script succeeded; False when group or instance is missing, no
        config primitive is referenced, or the script failed.

    Raises:
        ValueError - the referenced primitive is not in the NSD
        NotImplementedError - the primitive has no user defined script
    """
    if group is None or scale_instance is None:
        return False

    @asyncio.coroutine
    def report_outcome(success=True, err_msg=None):
        """Reflect the config outcome on the scale instance."""
        self._log.debug("Update %s config status to %r : %s",
                        scale_instance, success, err_msg)
        current = scale_instance.config_status

        if current == "failed":
            # A failed instance keeps its status
            return

        if current == "configured":
            # A configured instance can only move to failed
            if success:
                return
            scale_instance.config_status = "failed"
            scale_instance.config_err_msg = err_msg
            yield from self.update_state()
            return

        # Still configuring: only post scale out settles the status
        if trigger != NsdYang.ScalingTrigger.POST_SCALE_OUT:
            return
        if success:
            scale_instance.config_status = "configured"
        else:
            scale_instance.config_status = "failed"
            scale_instance.config_err_msg = err_msg
        yield from self.update_state()

    trigger_cfg = group.trigger_config(trigger)
    if trigger_cfg is None:
        return True

    self._log.debug("Scaling group {} config: {}".format(group.name, trigger_cfg))

    if not trigger_cfg.has_field("ns_config_primitive_name_ref"):
        reason = "Failed config for trigger {} as config primitive is not specified".format(
            self.scaling_trigger_str(trigger))
        yield from report_outcome(success=False, err_msg=reason)
        self._log.error("Config primitive not specified for config action in scale group %s" %
                        (group.name))
        return False

    primitive_name = trigger_cfg.ns_config_primitive_name_ref
    primitive = next((candidate for candidate in self.nsd_msg.service_primitive
                      if candidate.name == primitive_name), None)
    if primitive is None:
        raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (primitive_name, self.name))

    self._log.debug("Scaling group {} config primitive: {}".format(group.name, primitive))

    if not primitive.has_field("user_defined_script"):
        reason = "Failed config for trigger {} as config script is not specified".format(
            self.scaling_trigger_str(trigger))
        yield from report_outcome(success=False, err_msg=reason)
        raise NotImplementedError("Only script based config support for scale group for now: {}".
                                  format(group.name))

    rc = yield from self.apply_scale_group_config_script(primitive.user_defined_script,
                                                         group, scale_instance, trigger, vnfrs)
    reason = None
    if not rc:
        reason = "Failed config for trigger {} using config script '{}'".format(
            self.scaling_trigger_str(trigger), primitive.user_defined_script)
    yield from report_outcome(success=rc, err_msg=reason)
    return rc
1659
def create_scaling_groups(self):
    """Create a ScalingGroup for every scaling group descriptor in the NSD.

    The groups are stored in this record keyed by group name.
    """
    for descriptor in self.nsd_msg.scaling_group_descriptor:
        self._log.debug("Found scaling_group %s in nsr id %s",
                        descriptor.name, self.id)

        new_group = scale_group.ScalingGroup(self._log, descriptor)
        self._scaling_groups[new_group.name] = new_group
1674
@asyncio.coroutine
def create_scale_group_instance(self, group_name, index, config_xact, is_default=False):
    """Create and start one instance of a scaling group.

    VNF records are created for every member VNF of the group, the NSR
    is published, the pre-scale-out config is applied and, when that
    succeeds, the VNFs are instantiated.  A failed config or any
    exception while starting marks the instance as "failed".

    Arguments:
        group_name - name of a scaling group of this NSR
        index - instance index within the group
        config_xact - passed through when fetching VNFDs
        is_default - forwarded to the group's create_instance()

    Raises:
        KeyError - ``group_name`` is not a known scaling group
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.create_instance(index, is_default)

    self._log.debug("Creating %s VNFRS", scale_instance)
    # Log the scaling group name; the record itself was passed here before
    self._log.debug("Creating %u VNFs associated with NS id %s scaling group %s",
                    len(self.nsd_msg.constituent_vnfd), self.id, group_name)

    vnfrs = []
    for vnf_index, count in group.vnf_index_count_map.items():
        const_vnfd_msg = self._get_constituent_vnfd_msg(vnf_index)
        vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact)

        cloud_account_name, om_datacenter_name = self._get_vnfd_cloud_account(const_vnfd_msg.member_vnf_index)
        if cloud_account_name is None:
            cloud_account_name = self.cloud_account_name
        for _ in range(count):
            vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, om_datacenter_name, group_name, index)
            scale_instance.add_vnfr(vnfr)
            vnfrs.append(vnfr)

    yield from self.publish()

    self._log.debug("Instantiating %s VNFRS for %s", len(vnfrs), scale_instance)
    scale_instance.operational_status = "vnf_init_phase"
    yield from self.update_state()

    try:
        rc = yield from self.apply_scaling_group_config(NsdYang.ScalingTrigger.PRE_SCALE_OUT,
                                                        group, scale_instance, vnfrs)
        if not rc:
            self._log.error("Pre scale out config for scale group {} ({}) failed".
                            format(group.name, index))
            scale_instance.operational_status = "failed"
        else:
            yield from self.instantiate_vnfs(vnfrs, scaleout=True)

    except Exception as e:
        self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
                            format(group.name, e))
        self._log.exception(e)
        scale_instance.operational_status = "failed"

    yield from self.update_state()
1729
@asyncio.coroutine
def delete_scale_group_instance(self, group_name, index):
    """Terminate one instance of a scaling group.

    The pre-scale-in config is applied first; termination continues even
    when that config fails.

    Raises:
        ScalingOperationError - the instance is a default instance
    """
    group = self._scaling_groups[group_name]
    scale_instance = group.get_instance(index)
    if scale_instance.is_default:
        raise ScalingOperationError("Cannot terminate a default scaling group instance")

    scale_instance.operational_status = "terminate"
    yield from self.update_state()

    self._log.debug("Terminating %s VNFRS" % scale_instance)
    config_ok = yield from self.apply_scaling_group_config(
        NsdYang.ScalingTrigger.PRE_SCALE_IN, group, scale_instance)
    if not config_ok:
        self._log.error("Pre scale in config for scale group {} ({}) failed".
                        format(group.name, index))

    # Going ahead with terminate, even if there is an error in pre-scale-in config
    # as this could be result of scale out failure and we need to cleanup this group
    yield from self.terminate_vnfrs(scale_instance.vnfrs)
    group.delete_instance(index)

    scale_instance.operational_status = "vnf_terminate_phase"
    yield from self.update_state()
1758
@asyncio.coroutine
def _update_scale_group_instances_status(self):
    """Advance scale group instances according to their VNFR states.

    Instances in "vnf_init_phase" become "running" once all their VNFRs
    are ACTIVE (the post-scale-out config is then run as a separate
    task) or "failed" as soon as one VNFR is FAILED.  Instances in
    "vnf_terminate_phase" become "terminated" once all their VNFRs are
    TERMINATED, followed by the post-scale-in config.
    """
    @asyncio.coroutine
    def post_scale_out_task(group, instance):
        # Apply post scale out config once all VNFRs are active
        applied = yield from self.apply_scaling_group_config(
            NsdYang.ScalingTrigger.POST_SCALE_OUT, group, instance)
        instance.operational_status = "running"
        if applied:
            self._log.debug("Scale out for group {} and instance {} succeeded".
                            format(group.name, instance.instance_id))
        else:
            self._log.error("Post scale out config for scale group {} ({}) failed".
                            format(group.name, instance.instance_id))

        yield from self.update_state()

    # Snapshot the instances of every group before touching any of them
    instances_by_group = {grp: grp.instances for grp in self._scaling_groups.values()}
    for group, instances in instances_by_group.items():
        self._log.debug("Updating %s instance status", group)
        for instance in instances:
            vnfr_states = [vnfr.state for vnfr in instance.vnfrs]
            self._log.debug("Got vnfr instance states: %s", vnfr_states)

            phase = instance.operational_status
            if phase == "vnf_init_phase":
                if all(st == VnfRecordState.ACTIVE for st in vnfr_states):
                    instance.operational_status = "running"

                    # Create a task for post scale out to allow us to sleep before attempting
                    # to configure newly created VM's
                    self._loop.create_task(post_scale_out_task(group, instance))

                elif any(st == VnfRecordState.FAILED for st in vnfr_states):
                    self._log.debug("Scale out for group {} and instance {} failed".
                                    format(group.name, instance.instance_id))
                    instance.operational_status = "failed"

            elif phase == "vnf_terminate_phase":
                if all(st == VnfRecordState.TERMINATED for st in vnfr_states):
                    instance.operational_status = "terminated"
                    applied = yield from self.apply_scaling_group_config(
                        NsdYang.ScalingTrigger.POST_SCALE_IN, group, instance)
                    if applied:
                        self._log.debug("Scale in for group {} and instance {} succeeded".
                                        format(group.name, instance.instance_id))
                    else:
                        self._log.error("Post scale in config for scale group {} ({}) failed".
                                        format(group.name, instance.instance_id))
1806
def create_vnffgs(self):
    """Create a VnffgRecord for every VNFFG descriptor in the NSD.

    The records are stored in this NSR keyed by their id.
    """
    for descriptor in self.nsd_msg.vnffgd:
        self._log.debug("Found vnffgd %s in nsr id %s", descriptor, self.id)
        record = VnffgRecord(self._dts, self._log, self._loop,
                             self._nsm._vnffgmgr, self, self.name,
                             descriptor, self._sdn_account_name)
        self._vnffgrs[record.id] = record
1823
def resolve_vld_ip_profile(self, nsd_msg, vld):
    """Find the IP profile a VLD refers to.

    Returns the first profile in ``nsd_msg.ip_profiles`` whose name
    equals the VLD's ip_profile_ref, or None when the VLD has no such
    reference or no profile matches.
    """
    self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
    if not vld.has_field('ip_profile_ref'):
        return None

    for candidate in nsd_msg.ip_profiles:
        if candidate.name == vld.ip_profile_ref:
            return candidate
    return None
1830
@asyncio.coroutine
def _create_vls(self, vld, cloud_account,om_datacenter):
    """Create a VLR for ``vld`` in the given account / datacenter.

    Args:
        vld : VLD yang obj
        cloud_account : Cloud account name
        om_datacenter : OM datacenter name

    Returns:
        VirtualLinkRecord
    """
    ip_profile = self.resolve_vld_ip_profile(self.nsd_msg, vld)
    new_vlr = yield from VirtualLinkRecord.create_record(
        self._dts, self._log, self._loop,
        self.name,
        vld,
        cloud_account,
        om_datacenter,
        ip_profile,
        self.id,
        restart_mode=self.restart_mode)
    return new_vlr
1855
def _extract_cloud_accounts_for_vl(self, vld):
    """Collect the accounts in which a VL has to be created.

    Sources, all taken from the NSR config message:
        1. vnf_cloud_account_map: for every connection point of the VLD,
           the account of the referenced member VNF (the NSR default
           when that VNF has no entry)
        2. vl_cloud_account_map: every cloud account and every OM
           datacenter listed for this VLD
    When neither source yields anything the NSR default is used.

    Args:
        vld : VLD yang object

    Returns:
        set of (cloud_account, om_datacenter) tuples
    """
    cfg = self._nsr_cfg_msg
    default_account = (self.cloud_account_name, self.om_datacenter_name)
    cloud_account_list = []

    if cfg.vnf_cloud_account_map:
        # Entries naming neither an account nor a datacenter are ignored
        vnf_cloud_map = {
            vnf.member_vnf_index_ref: (vnf.cloud_account, vnf.om_datacenter)
            for vnf in cfg.vnf_cloud_account_map
            if vnf.cloud_account is not None or vnf.om_datacenter is not None
        }
        for vnfc in vld.vnfd_connection_point_ref:
            cloud_account_list.append(
                vnf_cloud_map.get(vnfc.member_vnf_index_ref, default_account))

    if cfg.vl_cloud_account_map:
        for vld_map in cfg.vl_cloud_account_map:
            if vld_map.vld_id_ref != vld.id:
                continue
            # Append whole pairs: extending with a tuple would add its two
            # members as separate list elements and break callers that
            # unpack (cloud_account, om_datacenter)
            for cloud_account in vld_map.cloud_accounts:
                cloud_account_list.append((cloud_account, None))
            for om_datacenter in vld_map.om_datacenters:
                cloud_account_list.append((None, om_datacenter))

    # If no config has been provided then fall-back to the default
    # account
    if not cloud_account_list:
        cloud_account_list = [default_account]

    self._log.debug("VL {} cloud accounts: {}".
                    format(vld.name, cloud_account_list))
    return set(cloud_account_list)
1900
@asyncio.coroutine
def create_vls(self):
    """Create a VLR for every VLD of the NSD in each applicable account.

    The new records are appended to this NSR's list of VLRs.
    """
    for descriptor in self.nsd_msg.vld:
        self._log.debug("Found vld %s in nsr id %s", descriptor, self.id)

        targets = self._extract_cloud_accounts_for_vl(descriptor)
        for account, datacenter in targets:
            new_vlr = yield from self._create_vls(descriptor, account, datacenter)
            self._vlrs.append(new_vlr)
1912
1913
@asyncio.coroutine
def create_vl_instance(self, vld):
    """Create (if needed) and instantiate the VL described by ``vld``.

    An existing VLR for the same VLD is reused only when it is in the
    TERMINATED state.  Instantiation errors are logged and leave the VLR
    in the FAILED state; they are not re-raised.

    Raises:
        NsrVlUpdateError - a VLR for this VLD exists and is not TERMINATED
    """
    self._log.debug("Create VL for {}: {}".format(self.id, vld.as_dict()))
    # Check if the VL is already present
    vlr = None
    for vl in self._vlrs:
        if vl.vld_msg.id == vld.id:
            self._log.debug("The VLD %s already in NSR %s as VLR %s with status %s",
                            vld.id, self.id, vl.id, vl.state)
            vlr = vl
            if vlr.state != VlRecordState.TERMINATED:
                # Format the text here; the trailing-comma form used
                # before produced a tuple, not a message
                err_msg = "VLR for VL %s in NSR %s already instantiated" % \
                          (vld, self.id)
                self._log.error(err_msg)
                raise NsrVlUpdateError(err_msg)
            break

    if vlr is None:
        cloud_account_list = self._extract_cloud_accounts_for_vl(vld)
        for account,om_datacenter in cloud_account_list:
            vlr = yield from self._create_vls(vld, account,om_datacenter)
            self._vlrs.append(vlr)

    # NOTE(review): when several accounts apply, only the VLR created last
    # is instantiated below - confirm whether that is intended
    vlr.state = VlRecordState.INSTANTIATION_PENDING
    yield from self.update_state()

    try:
        yield from self.nsm_plugin.instantiate_vl(self, vlr)
        vlr.state = VlRecordState.ACTIVE

    except Exception as e:
        err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
                  format(self.id, vld.id, e)
        self._log.error(err_msg)
        self._log.exception(e)
        vlr.state = VlRecordState.FAILED

    yield from self.update_state()
1952
@asyncio.coroutine
def delete_vl_instance(self, vld):
    """ Terminate the first VL record in this NSR that belongs to the VLD.

    Arguments:
        vld - the VLD message whose VL record should be terminated

    A failure while terminating is logged and leaves the record in the
    FAILED state; it is not propagated to the caller.
    """
    for candidate in self._vlrs:
        if candidate.vld_msg.id != vld.id:
            continue

        self._log.debug("Found VLR %s for VLD %s in NSR %s",
                        candidate.id, vld.id, self.id)
        candidate.state = VlRecordState.TERMINATE_PENDING
        yield from self.update_state()

        try:
            yield from self.nsm_plugin.terminate_vl(candidate)
            candidate.state = VlRecordState.TERMINATED
            self._vlrs.remove(candidate)

        except Exception as exc:
            failure = "Error terminating VL for NSR {} and VLD {}: {}".format(
                self.id, vld.id, exc)
            self._log.error(failure)
            self._log.exception(exc)
            candidate.state = VlRecordState.FAILED

        yield from self.update_state()
        # Only the first matching record is handled
        return
1976
@asyncio.coroutine
def create_vnfs(self, config_xact):
    """ Create a VNF record for each constituent VNFD that starts by default.

    Arguments:
        config_xact - the configuration transaction used to look up VNFDs

    Constituent VNFDs with start_by_default unset are skipped. When no
    cloud account is mapped for a VNF, the NSR's own cloud account is used.
    """
    constituents = self.nsd_msg.constituent_vnfd
    self._log.debug("Creating %u VNFs associated with this NS id %s",
                    len(constituents), self.id)

    for member in constituents:
        if member.start_by_default:
            vnfd = self._get_vnfd(member.vnfd_id_ref, config_xact)
            account, datacenter = self._get_vnfd_cloud_account(member.member_vnf_index)
            if account is None:
                account = self.cloud_account_name
            yield from self.create_vnf_record(vnfd, member, account, datacenter)
        else:
            self._log.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
                            member.member_vnf_index)
1997
1998
def get_placement_groups(self, vnfd_msg, const_vnfd):
    """ Collect the resolved placement groups that apply to one VNF.

    Arguments:
        vnfd_msg - the VNFD message of the VNF
        const_vnfd - the constituent VNFD entry (supplies member_vnf_index)

    Returns:
        A list with one resolved cloud-construct per matching member entry
        of the NSD placement groups. Groups that cannot be resolved are
        logged and left out.
    """
    resolved = []
    for group in self.nsd_msg.placement_groups:
        for member in group.member_vnfd:
            if member.vnfd_id_ref != vnfd_msg.id:
                continue
            if member.member_vnf_index_ref != const_vnfd.member_vnf_index:
                continue

            construct = self.resolve_placement_group_cloud_construct(group)
            if construct is not None:
                self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
                               str(construct),
                               vnfd_msg.name,
                               const_vnfd.member_vnf_index)
                resolved.append(construct)
            else:
                # Unresolvable groups are tolerated, not treated as errors
                self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
    return resolved
2016
@asyncio.coroutine
def create_vnf_record(self, vnfd_msg, const_vnfd, cloud_account_name, om_datacenter_name, group_name=None, group_instance_id=None):
    """ Create a VNF record and register it with this NSR and the NSM.

    Arguments:
        vnfd_msg - the VNFD message of the VNF
        const_vnfd - the constituent VNFD entry from the NSD
        cloud_account_name - cloud account the VNF is created in
        om_datacenter_name - OM datacenter passed through to the record
        group_name - optional scaling group name
        group_instance_id - optional scaling group instance id

    Raises:
        NetworkServiceRecordError if a record with the same id is already
        tracked by this NSR

    Returns:
        The newly created VNF record
    """
    placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd)
    self._log.info("Cloud Account for VNF %d is %s", const_vnfd.member_vnf_index, cloud_account_name)

    placement_group_names = [pg.name for pg in placement_groups]
    self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
                   vnfd_msg.name,
                   const_vnfd.member_vnf_index,
                   placement_group_names)

    record = yield from VirtualNetworkFunctionRecord.create_record(
        self._dts,
        self._log,
        self._loop,
        vnfd_msg,
        const_vnfd,
        self.nsd_id,
        self.name,
        cloud_account_name,
        om_datacenter_name,
        self.id,
        group_name,
        group_instance_id,
        placement_groups,
        restart_mode=self.restart_mode,
    )

    record_id = record.id
    if record_id in self._vnfrs:
        raise NetworkServiceRecordError(
            "VNF with VNFR id %s already in vnf list" % (record_id,))

    # Track the record both on this NSR and on the NSM
    self._vnfrs[record_id] = record
    self._nsm.vnfrs[record_id] = record

    yield from record.set_config_status(NsrYang.ConfigStates.INIT)

    self._log.debug("Added VNFR %s to NSM VNFR list with id %s",
                    record.name,
                    record.id)

    return record
2055
def create_param_pools(self):
    """ Create a ParameterValuePool for every parameter pool in the NSD.

    Pools are stored in self._param_pools keyed by pool name.

    Raises:
        NetworkServiceRecordError if a pool's end value is smaller than
        its start value
    """
    for param_pool in self.nsd_msg.parameter_pool:
        self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)

        start_value = param_pool.range.start_value
        end_value = param_pool.range.end_value
        if end_value < start_value:
            # The message previously mixed a "%s" placeholder with
            # str.format(), so the pool name was never filled in.
            raise NetworkServiceRecordError(
                "Parameter pool {} has invalid range (start: {}, end: {})".format(
                    param_pool.name, start_value, end_value
                )
            )

        # NOTE(review): range() excludes end_value itself - confirm the
        # pool is meant to be half-open.
        self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
            self._log,
            param_pool.name,
            range(start_value, end_value)
        )
2074
@asyncio.coroutine
def fetch_vnfr(self, vnfr_path):
    """ Read a VNFR from DTS.

    Arguments:
        vnfr_path - the path to query

    Returns:
        The result of the last response received for the query, or None
        when the query produced no responses
    """
    self._log.debug("Fetching VNFR with key %s while instantiating %s",
                    vnfr_path, self.id)
    responses = yield from self._dts.query_read(vnfr_path, rwdts.XactFlag.MERGE)

    latest = None
    for pending in responses:
        response = yield from pending
        latest = response.result

    return latest
2088
@asyncio.coroutine
def instantiate_vnfs(self, vnfrs, scaleout=False):
    """ Instantiate the given VNF records one after another.

    Arguments:
        vnfrs - a sized collection of VNF records to instantiate
        scaleout - passed through to the NSM plugin for each record
    """
    self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id)
    plugin = self.nsm_plugin
    for record in vnfrs:
        self._log.debug("Instantiating VNF: %s in NS %s", record, self.id)
        yield from plugin.instantiate_vnf(self, record, scaleout)
2098
@asyncio.coroutine
def instantiate_vnffgs(self):
    """ Instantiate every VNFFG record of this Network Service.

    Each VNF record is polled every 2 seconds until it leaves the
    INIT / INSTANTIATION_PENDING states. If any VNF ends up in a state
    other than ACTIVE, the VNFFG state is marked FAILED and nothing is
    instantiated. Otherwise the VNFFGs are instantiated after a fixed
    90 second wait.
    """
    self._log.debug("Instantiating %u VNFFGs in NS %s",
                    len(self.nsd_msg.vnffgd), self.id)

    pending_states = [VnfRecordState.INSTANTIATION_PENDING, VnfRecordState.INIT]
    for vnfr in self.vnfrs.values():
        while vnfr.state in pending_states:
            self._log.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr.name,vnfr.state)
            yield from asyncio.sleep(2, loop=self._loop)

        if vnfr.state != VnfRecordState.ACTIVE:
            self._log.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr.name,vnfr.state)
            self._vnffgr_state = VnffgRecordState.FAILED
            return

        self._log.debug("Received vnfr state for vnfr %s is %s ",vnfr.name,vnfr.state)

    self._log.info("Waiting for 90 seconds for VMs to come up")
    yield from asyncio.sleep(90, loop=self._loop)
    self._log.info("Starting VNFFG orchestration")
    for vnffgr in self._vnffgrs.values():
        self._log.debug("Instantiating VNFFG: %s in NS %s", vnffgr, self.id)
        yield from vnffgr.instantiate()
2124
@asyncio.coroutine
def instantiate_scaling_instances(self, config_xact):
    """ Instantiate the scaling instances of every scaling group.

    For each group, first the default instances (ids 0 up to
    min_instance_count - 1) are created, then the instances listed for
    that group in the NSR config are re-created.

    Arguments:
        config_xact - the configuration transaction passed to each creation
    """
    for group in self._scaling_groups.values():
        for index in range(group.min_instance_count):
            self._log.debug("Instantiating %s default scaling instance %s", group, index)
            yield from self.create_scale_group_instance(
                group.name, index, config_xact, is_default=True
            )

        for cfg_group in self._nsr_cfg_msg.scaling_group:
            if cfg_group.scaling_group_name_ref == group.name:
                for cfg_instance in cfg_group.instance:
                    self._log.debug("Reloading %s scaling instance %s", cfg_group, cfg_instance.id)
                    yield from self.create_scale_group_instance(
                        group.name, cfg_instance.id, config_xact, is_default=False
                    )
2144
def has_scaling_instances(self):
    """ Tell whether any scaling instance has to be instantiated.

    Returns:
        True if a scaling group has a positive min_instance_count or the
        NSR config lists at least one instance for a group, else False
    """
    has_default = any(
        group.min_instance_count > 0
        for group in self._scaling_groups.values()
    )
    return has_default or any(
        len(cfg_group.instance) > 0
        for cfg_group in self._nsr_cfg_msg.scaling_group
    )
2156
@asyncio.coroutine
def publish(self):
    """ Rebuild the NSR message and publish it through the NSR handler.

    The freshly built message is cached in self._nsr_msg. Once a publish
    happens while the operational state is RUNNING, self._debug_running
    is latched so later publishes emit an extra debug line.
    """
    self._nsr_msg = self.create_msg()
    xpath_key = self.nsr_xpath

    self._log.debug("Publishing the NSR with xpath %s and nsr %s",
                    xpath_key,
                    self._nsr_msg)

    if self._debug_running:
        self._log.debug("Publishing NSR in RUNNING state!")

    with self._dts.transaction() as xact:
        yield from self._nsm.nsr_handler.update(xact, xpath_key, self._nsr_msg)
        now_running = self._op_status.state == NetworkServiceRecordState.RUNNING
        if now_running:
            self._debug_running = True
2174
@asyncio.coroutine
def unpublish(self, xact):
    """ Delete this NSR's published record.

    Arguments:
        xact - the transaction in which the delete is issued
    """
    self._log.debug("Unpublishing Network service id %s", self.id)
    handler = self._nsm.nsr_handler
    yield from handler.delete(xact, self.nsr_xpath)
2180
@property
def nsr_xpath(self):
    """ The operational-data xpath of this NSR, keyed by self.id """
    template = "D,/nsr:ns-instance-opdata/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
    return template.format(self.id)
2188
@staticmethod
def xpath_from_nsr(nsr):
    """ Build the NSR op-data xpath for the given NSR from its id.

    Arguments:
        nsr - an object exposing the NSR id as `id`
    """
    template = NetworkServiceRecord.XPATH + "[nsr:ns-instance-config-ref = '{}']"
    return template.format(nsr.id)
2194
@property
def nsd_xpath(self):
    """ The NSD catalog config xpath for this NSR's NSD id """
    template = "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
    return template.format(self.nsd_id)
2201
@asyncio.coroutine
def instantiate(self, config_xact):
    """ Instantiates a NetworkServiceRecord.

    This function instantiates a Network service
    which involves the following steps,

    * Instantiate every VL in NSD by sending create VLR request to DTS.
    * Instantiate every VNF in NSD by sending create VNF request to DTS.
    * Publish the NSR details to DTS

    The record is created and published before this coroutine returns.
    The VL / VNF / VNFFG / scaling-group instantiation is then scheduled
    as a separate task on self._loop; a failure of that task is reported
    through instantiation_failed() rather than raised here.

    Arguments:
        config_xact: The configuration transaction which initiated the instantiation

    Raises:
        NetworkServiceRecordError if the NSR config does not carry an NSD

    Returns:
        No return value
    """

    self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)

    # Move the state to INIT
    self.set_state(NetworkServiceRecordState.INIT)

    event_descr = "Instantiation Request Received NSR Id:%s" % self.id
    self.record_event("instantiating", event_descr)

    # Find the NSD
    self._nsd = self._nsr_cfg_msg.nsd

    # Validate the NSD before anything below dereferences it. The check
    # used to come after the merge, where it could never trigger cleanly,
    # and the exception carried `self` instead of a message.
    if self._nsd is None:
        err_msg = "Failed to fetch NSD for nsr-id %s" % self.id
        self._log.error(err_msg)
        raise NetworkServiceRecordError(err_msg)

    # Merge any config and initial config primitive values
    self.config_store.merge_nsd_config(self.nsd_msg)
    self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))

    event_descr = "Fetched NSD with descriptor id %s" % self.nsd_id
    self.record_event("nsd-fetched", event_descr)

    self._log.debug("Got nsd result %s", self._nsd)

    # Substitute any input parameters
    self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)

    # Create the record
    yield from self.create(config_xact)

    # Publish the NSR to DTS
    yield from self.publish()

    @asyncio.coroutine
    def do_instantiate():
        """
        Instantiate network service
        """
        self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
                        self.id, self.nsd_id)

        # instantiate the VLs
        event_descr = ("Instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("begin-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)

        yield from self.instantiate_vls()

        # Publish the NSR to DTS
        yield from self.publish()

        event_descr = ("Finished instantiating %s external VLs for NSR id %s" %
                       (len(self.nsd_msg.vld), self.id))
        self.record_event("end-external-vls-instantiation", event_descr)

        self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)

        self._log.debug("Instantiating VNFs  ...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # instantiate the VNFs
        event_descr = ("Instantiating %s VNFS for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))

        self.record_event("begin-vnf-instantiation", event_descr)

        yield from self.instantiate_vnfs(self._vnfrs.values())

        self._log.debug(" Finished instantiating %d VNFs for NSR id %s",
                        len(self.nsd_msg.constituent_vnfd), self.id)

        event_descr = ("Finished instantiating %s VNFs for NSR id %s" %
                       (len(self.nsd_msg.constituent_vnfd), self.id))
        self.record_event("end-vnf-instantiation", event_descr)

        if len(self.vnffgrs) > 0:
            event_descr = ("Instantiating %s VNFFGS for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))

            self.record_event("begin-vnffg-instantiation", event_descr)

            yield from self.instantiate_vnffgs()

            event_descr = ("Finished instantiating %s VNFFGDs for NSR id %s" %
                           (len(self.nsd_msg.vnffgd), self.id))
            self.record_event("end-vnffg-instantiation", event_descr)

        if self.has_scaling_instances():
            event_descr = ("Instantiating %s Scaling Groups for NSR id %s" %
                           (len(self._scaling_groups), self.id))

            self.record_event("begin-scaling-group-instantiation", event_descr)
            yield from self.instantiate_scaling_instances(config_xact)
            self.record_event("end-scaling-group-instantiation", event_descr)

        # Give the plugin a chance to deploy the network service now that all
        # virtual links and vnfs are instantiated
        yield from self.nsm_plugin.deploy(self._nsr_msg)

        self._log.debug("Publishing  NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

        # Publish the NSR to DTS
        yield from self.publish()

        self._log.debug("Published  NSR...... nsr[%s], nsd[%s]",
                        self.id, self.nsd_id)

    def on_instantiate_done(fut):
        # Future.exception() raises CancelledError on a cancelled future,
        # so handle cancellation before asking for the exception.
        if fut.cancelled():
            self._log.debug("NSR instantiation task cancelled for NSR id %s", self.id)
            return

        # If the do_instantiate fails, then publish NSR with failed result
        e = fut.exception()
        if e is not None:
            import traceback
            print(traceback.format_exception(None, e, e.__traceback__), file=sys.stderr, flush=True)
            self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(e))
            self._loop.create_task(self.instantiation_failed(failed_reason=str(e)))

    instantiate_task = self._loop.create_task(do_instantiate())
    instantiate_task.add_done_callback(on_instantiate_done)
2347
@asyncio.coroutine
def set_config_status(self, status, status_details=None):
    """ Record a new config status for this NSR and publish it.

    Nothing happens when the status is unchanged. A change to FAILED
    additionally records a "config-failed" event.

    Arguments:
        status - the new config status
        status_details - optional details stored alongside the status
    """
    if self.config_status == status:
        return

    self._log.debug("Updating NSR {} status for {} to {}".
                    format(self.name, self.config_status, status))
    self._config_status = status
    self._config_status_details = status_details

    config_failed = self._config_status == NsrYang.ConfigStates.FAILED
    if config_failed:
        self.record_event("config-failed", "NS configuration failed",
                          evt_details=self._config_status_details)

    yield from self.publish()
2361
@asyncio.coroutine
def is_active(self):
    """ Move this NS to RUNNING; on the first call also record and publish.

    The state is always set to RUNNING. The "ns-running" event and the
    publish only happen the first time, guarded by self._is_active.
    """
    self.set_state(NetworkServiceRecordState.RUNNING)
    if not self._is_active:
        # Publish the NSR to DTS
        self._log.debug("Network service %s is active ", self.id)
        self._is_active = True

        running_descr = "NSR in running state for NSR id %s" % self.id
        self.record_event("ns-running", running_descr)

        yield from self.publish()
2377
@asyncio.coroutine
def instantiation_failed(self, failed_reason=None):
    """ Mark this NS as FAILED, record the failure event and publish.

    Arguments:
        failed_reason - optional details attached to the "ns-failed" event
    """
    self._log.error("Network service id:%s, name:%s instantiation failed",
                    self.id, self.name)
    self.set_state(NetworkServiceRecordState.FAILED)

    failure_descr = "Instantiation of NS %s failed" % self.id
    self.record_event("ns-failed", failure_descr, evt_details=failed_reason)

    # Make the failed state visible in DTS
    yield from self.publish()
2390
@asyncio.coroutine
def terminate_vnfrs(self, vnfrs):
    """ Terminate the given VNF records one after another.

    Arguments:
        vnfrs - an iterable of VNF records handed to the NSM plugin
    """
    self._log.debug("Terminating VNFs in network service %s", self.id)
    plugin = self.nsm_plugin
    for record in vnfrs:
        yield from plugin.terminate_vnf(record)
2397
@asyncio.coroutine
def terminate(self):
    """ Terminate a NetworkServiceRecord.

    Walks the NS through its terminate phases in order: VNFFGs, VNFs,
    VLs, then the NS itself at the plugin, recording an event for each
    phase and ending in the TERMINATED state.
    """
    ns_id = self.id
    self._log.debug("Terminating network service id %s", ns_id)

    # Move the state to TERMINATE
    self.set_state(NetworkServiceRecordState.TERMINATE)
    self.record_event("terminate", "Terminate being processed for NS Id:%s" % ns_id)

    # Move the state to VNFFG_TERMINATE_PHASE
    self._log.debug("Terminating VNFFGs in NS ID: %s", ns_id)
    self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
    self.record_event("terminating-vnffgss", "Terminating VNFFGS in NS Id:%s" % ns_id)
    self._log.debug("Terminating VNFFGRs in network service %s", ns_id)
    for vnffgr in self.vnffgrs.values():
        yield from vnffgr.terminate()

    # Move the state to VNF_TERMINATE_PHASE
    self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
    self.record_event("terminating-vnfs", "Terminating VNFS in NS Id:%s" % ns_id)
    yield from self.terminate_vnfrs(self.vnfrs.values())

    # Move the state to VL_TERMINATE_PHASE
    self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
    self.record_event("terminating-vls", "Terminating VLs in NS Id:%s" % ns_id)
    self._log.debug("Terminating VLs in network service %s", ns_id)
    for vlr in self.vlrs:
        yield from self.nsm_plugin.terminate_vl(vlr)
        vlr.state = VlRecordState.TERMINATED
        # Stop the uptime task tracked for this VL, if there is one
        if vlr.id in self.vlr_uptime_tasks:
            self.vlr_uptime_tasks[vlr.id].cancel()

    yield from self.nsm_plugin.terminate_ns(self)

    # Move the state to TERMINATED
    self.set_state(NetworkServiceRecordState.TERMINATED)
    self.record_event("terminated", "Terminated NS Id:%s" % ns_id)
2448
def enable(self):
    """ Enable a NetworkServiceRecord (currently a no-op). """
    return None
2452
def disable(self):
    """ Disable a NetworkServiceRecord (currently a no-op). """
    return None
2456
def map_config_status(self):
    """ Translate the internal config status into its published string.

    Returns:
        'configuring' or 'failed' for the matching config states, and
        'configured' for every other state
    """
    self._log.debug("Config status for ns {} is {}".
                    format(self.name, self._config_status))
    if self._config_status == NsrYang.ConfigStates.CONFIGURING:
        mapped = 'configuring'
    elif self._config_status == NsrYang.ConfigStates.FAILED:
        mapped = 'failed'
    else:
        mapped = 'configured'
    return mapped
2465
def vl_phase_completed(self):
    """ Return the flag telling whether the VL phase of this NS is done """
    completed = self._vl_phase_completed
    return completed
2469
def vnf_phase_completed(self):
    """ Return the flag telling whether the VNF phase of this NS is done """
    completed = self._vnf_phase_completed
    return completed
2473
def create_msg(self):
    """ Build the NSR operational-data message for this network service.

    Scalar fields are always filled in. VLR entries are only added once
    the VL phase is complete; constituent VNFR references, VNFFGRs and
    scaling group records only once the VNF phase is complete.

    Returns:
        The populated NSR op-data message
    """
    nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(
        {"ns_instance_config_ref": self.id})
    nsr.sdn_account = self._sdn_account_name
    nsr.name_ref = self.name
    nsr.nsd_ref = self.nsd_id
    nsr.nsd_name_ref = self.nsd_msg.name
    nsr.operational_events = self._op_status.msg
    nsr.operational_status = self._op_status.yang_str()
    nsr.config_status = self.map_config_status()
    nsr.config_status_details = self._config_status_details
    nsr.create_time = self._create_time
    nsr.uptime = int(time.time()) - self._create_time

    # Copy the NSD primitives into their NSR message types
    for nsd_prim in self.nsd_msg.service_primitive:
        nsr.service_primitive.append(
            NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict(
                nsd_prim.as_dict()))

    for nsd_init_cfg in self.nsd_msg.initial_config_primitive:
        nsr.initial_config_primitive.append(
            NsrYang.NsrInitialConfigPrimitive.from_dict(nsd_init_cfg.as_dict()))

    if self.vl_phase_completed():
        for vlr in self.vlrs:
            nsr.vlr.append(vlr.create_nsr_vlr_msg(self.vnfrs.values()))

    if self.vnf_phase_completed():
        for vnfr in self.vnfrs.values():
            nsr.constituent_vnfr_ref.append(vnfr.const_vnfr_msg)
        for vnffgr in self.vnffgrs.values():
            nsr.vnffgr.append(vnffgr.fetch_vnffgr())
        for group in self._scaling_groups.values():
            nsr.scaling_group_record.append(group.create_record_msg())

    return nsr
2513
def all_vnfs_active(self):
    """ Return True only if every VNF record's `active` is exactly True """
    return all(vnfr.active is True for vnfr in self.vnfrs.values())
2520
@asyncio.coroutine
def update_state(self):
    """ Re-evaluate this NS's state

    Derives the NS state from the states of its VNFRs, VLRs, VNFFGRs and
    scaling groups, applies a state change if there is one, and publishes
    the NSR. Nothing is done for an NS that is already TERMINATED.
    """
    curr_state = self._op_status.state

    if curr_state == NetworkServiceRecordState.TERMINATED:
        self._log.debug("NS (%s) in terminated state, not updating state", self.id)
        return

    # Assume RUNNING and let the checks below downgrade it
    new_state = NetworkServiceRecordState.RUNNING
    self._log.info("Received update_state for nsr: %s, curr-state: %s",
                   self.id, curr_state)

    # Check all the VNFRs are present
    for _, vnfr in self.vnfrs.items():
        if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]:
            pass
        elif vnfr.state == VnfRecordState.FAILED:
            if vnfr._prev_state != vnfr.state:
                # Record the failure event only on the transition to FAILED
                event_descr = "Instantiation of VNF %s failed" % vnfr.id
                event_error_details = vnfr.state_failed_reason
                self.record_event("vnf-failed", event_descr, evt_details=event_error_details)
                vnfr.set_state(VnfRecordState.FAILED)
            else:
                self._log.info("VNF state did not change, curr=%s, prev=%s",
                               vnfr.state, vnfr._prev_state)
            new_state = NetworkServiceRecordState.FAILED
            break
        else:
            self._log.info("VNF %s in NSR %s is still not active; current state is: %s",
                           vnfr.id, self.id, vnfr.state)
            new_state = curr_state

    # If new state is RUNNING; check all VLs
    if new_state == NetworkServiceRecordState.RUNNING:
        for vl in self.vlrs:

            if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]:
                pass
            elif vl.state == VlRecordState.FAILED:
                if vl.prev_state != vl.state:
                    event_descr = "Instantiation of VL %s failed" % vl.id
                    event_error_details = vl.state_failed_reason
                    self.record_event("vl-failed", event_descr, evt_details=event_error_details)
                    vl.prev_state = vl.state
                else:
                    # The placeholder previously had no argument and was
                    # logged literally.
                    self._log.debug("VL %s already in failed state", vl.id)
            else:
                if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]:
                    new_state = NetworkServiceRecordState.VL_INSTANTIATE
                    break

                if vl.state in [VlRecordState.TERMINATE_PENDING]:
                    new_state = NetworkServiceRecordState.VL_TERMINATE
                    break

    # If new state is RUNNING; check VNFFGRs are also active
    if new_state == NetworkServiceRecordState.RUNNING:
        for _, vnffgr in self.vnffgrs.items():
            self._log.info("Checking vnffgr state for nsr %s is: %s",
                           self.id, vnffgr.state)
            if vnffgr.state == VnffgRecordState.ACTIVE:
                pass
            elif vnffgr.state == VnffgRecordState.FAILED:
                event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id
                self.record_event("vnffg-failed", event_descr)
                new_state = NetworkServiceRecordState.FAILED
                break
            else:
                self._log.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
                               vnffgr.id, self.id, vnffgr.state)
                new_state = curr_state

    # Update all the scaling group instance operational status to
    # reflect the state of all VNFR within that instance
    yield from self._update_scale_group_instances_status()

    # An in-progress scaling operation overrides the state derived above
    for _, group in self._scaling_groups.items():
        if group.state == scale_group.ScaleGroupState.SCALING_OUT:
            new_state = NetworkServiceRecordState.SCALING_OUT
            break
        elif group.state == scale_group.ScaleGroupState.SCALING_IN:
            new_state = NetworkServiceRecordState.SCALING_IN
            break

    if new_state != curr_state:
        self._log.debug("Changing state of Network service %s from %s to %s",
                        self.id, curr_state, new_state)
        if new_state == NetworkServiceRecordState.RUNNING:
            yield from self.is_active()
        elif new_state == NetworkServiceRecordState.FAILED:
            # If the NS is already active and we entered scaling_in, scaling_out,
            # do not mark the NS as failing if scaling operation failed.
            if curr_state in [NetworkServiceRecordState.SCALING_OUT,
                              NetworkServiceRecordState.SCALING_IN] and self._is_active:
                new_state = NetworkServiceRecordState.RUNNING
                self.set_state(new_state)
            else:
                yield from self.instantiation_failed()
        else:
            self.set_state(new_state)

    yield from self.publish()
2624
2625
class InputParameterSubstitution(object):
    """
    Callable that applies the input parameters of an NSR config to an NSD.
    """

    def __init__(self, log):
        """Create an instance of InputParameterSubstitution

        Arguments:
            log - a logger for this object to use

        """
        self.log = log

    def __call__(self, nsd, nsr_config):
        """Substitutes input parameters from the NSR config into the NSD

        The NSD is modified in place. Only xpaths listed in the NSD's
        input_parameter_xpath may be set; any other parameter is logged as
        an error and skipped. A failure while setting one value is logged
        and does not stop the remaining parameters.

        Arguments:
            nsd - a GI NSD object
            nsr_config - a GI NSR config object

        """
        if nsd is None or nsr_config is None:
            return

        # The xpath elements that this descriptor allows to be modified
        allowed_xpaths = {entry.xpath for entry in nsd.input_parameter_xpath}

        requested = nsr_config.input_parameter
        if not requested:
            return

        # Apply the input parameters to the descriptor
        for requested_param in requested:
            if requested_param.xpath not in allowed_xpaths:
                rejection = "tried to set an invalid input parameter ({})"
                self.log.error(rejection.format(requested_param.xpath))
                continue

            self.log.debug(
                "input-parameter:{} = {}".format(
                    requested_param.xpath,
                    requested_param.value,
                )
            )

            try:
                xpath.setxattr(nsd, requested_param.xpath, requested_param.value)

            except Exception as exc:
                self.log.exception(exc)
2680
2681
class NetworkServiceDescriptor(object):
    """
    Network service descriptor class

    Thin wrapper that holds an NSD message together with the DTS handle,
    logger, event loop and NSM it was created with.
    """

    def __init__(self, dts, log, loop, nsd, nsm):
        """Create a NetworkServiceDescriptor

        Arguments:
            dts - the DTS handle
            log - a logger for this object to use
            loop - the event loop
            nsd - the NSD message wrapped by this object
            nsm - the owning NSM
        """
        self._dts = dts
        self._log = log
        self._loop = loop

        self._nsd = nsd
        self._nsm = nsm

    @property
    def id(self):
        """ Returns nsd id """
        return self._nsd.id

    @property
    def name(self):
        """ Returns name of nsd """
        return self._nsd.name

    @property
    def msg(self):
        """ Return the message associated with this NetworkServiceDescriptor"""
        return self._nsd

    @staticmethod
    def path_for_id(nsd_id):
        """ Return path for the passed nsd_id"""
        # The key predicate must be closed with "]"; it was missing before,
        # which produced a malformed xpath.
        return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']".format(nsd_id)

    def path(self):
        """ Return the path of the NSD wrapped by this NetworkServiceDescriptor"""
        return NetworkServiceDescriptor.path_for_id(self.id)

    def update(self, nsd):
        """ Update the NSD descriptor """
        self._nsd = nsd
2722
2723
class NsdDtsHandler(object):
    """ The network service descriptor DTS handler """
    XPATH = "C,/nsd:nsd-catalog/nsd:nsd"

    def __init__(self, dts, log, loop, nsm):
        """Create an NsdDtsHandler

        Arguments:
            dts - the DTS handle used to register for NSD config
            log - a logger for this object to use
            loop - the event loop
            nsm - the NSM that is told about NSD updates and deletes
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for Nsd create/update/delete/read requests from dts """

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
            self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
                            xact, action)
            # Create/Update an NSD record
            for cfg in self._regh.get_xact_elements(xact):
                # Only interested in those NSD cfgs whose ID was received in prepare callback
                if cfg.id in scratch.get('nsds', []) or is_recovery:
                    self._nsm.update_nsd(cfg)

            scratch.pop('nsds', None)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def delete_nsd_libs(nsd_id):
            """ Remove any files uploaded with the NSD and stored under
            $RIFT_ARTIFACTS/launchpad/libs/ for that NSD id.

            Best effort: any failure is logged and swallowed.
            """
            try:
                rift_artifacts_dir = os.environ['RIFT_ARTIFACTS']
                nsd_dir = os.path.join(rift_artifacts_dir, 'launchpad/libs', nsd_id)

                if os.path.exists(nsd_dir):
                    shutil.rmtree(nsd_dir, ignore_errors=True)
            except Exception as e:
                self._log.error("Exception in cleaning up NSD libs {}: {}".
                                format(nsd_id, e))
                # Was misspelled "excpetion", which raised AttributeError
                # from inside this handler.
                self._log.exception(e)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for NSD config """

            self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
                           msg.id, msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if fref.is_field_deleted():
                # Delete an NSD record
                self._log.debug("Deleting NSD with id %s", msg.id)
                yield from delete_nsd_libs(msg.id)
                self._nsm.delete_nsd(msg.id)
            else:
                # Add this NSD to scratch to create/update in apply callback
                nsds = scratch.setdefault('nsds', [])
                nsds.append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug(
            "Registering for NSD config using xpath: %s",
            NsdDtsHandler.XPATH,
            )

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._regh = acg.register(
                xpath=NsdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                on_prepare=on_prepare)
2810
2811
class VnfdDtsHandler(object):
    """ Subscribes to VNFD catalog config in DTS and relays changes to the NSM """
    XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd"

    def __init__(self, dts, log, loop, nsm):
        self._regh = None
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm

    @property
    def regh(self):
        """ The DTS registration handle (None until register() has run) """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Subscribe to VNFD configuration changes """

        @asyncio.coroutine
        def on_apply(dts, acg, xact, action, scratch):
            """ Apply callback: act on the VNFD ids that on_prepare stored in scratch """
            self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # Creates/updates: only VNFDs whose id was noted during prepare
            for vnfd_cfg in self._regh.get_xact_elements(xact):
                if cfg_id_listed(vnfd_cfg, 'vnfds'):
                    self._nsm.update_vnfd(vnfd_cfg)

            # Deletes: matched against the elements held by the registration
            for vnfd_cfg in self._regh.elements:
                if cfg_id_listed(vnfd_cfg, 'deleted_vnfds'):
                    yield from self._nsm.delete_vnfd(vnfd_cfg.id)

            # The scratch entries are consumed; drop them
            for consumed in ('vnfds', 'deleted_vnfds'):
                scratch.pop(consumed, None)

            return

        def cfg_id_listed(cfg, scratch_key, _scratch_of=None):
            """ Placeholder replaced below; see bound version inside on_apply """
            raise NotImplementedError

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback: record the VNFD id as deleted or as created/updated """
            self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
                            ks_path.to_xpath(RwNsmYang.get_schema()), xact_info.query_action, msg)

            field_ref = ProtobufC.FieldReference.alloc()
            field_ref.goto_whole_message(msg.to_pbcm())

            # Both kinds of change are only recorded here; on_apply acts on them
            if not field_ref.is_field_deleted():
                scratch.setdefault('vnfds', []).append(msg.id)
            else:
                self._log.debug("Adding msg to deleted field")
                scratch.setdefault('deleted_vnfds', []).append(msg.id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        # on_apply needs access to its own scratch argument, so it is redefined
        # here as a thin wrapper that binds the membership test per invocation.
        _apply_impl = None

        @asyncio.coroutine
        def on_apply(dts, acg, xact, action, scratch):
            """ Apply callback: act on the VNFD ids that on_prepare stored in scratch """
            self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            # Creates/updates: only VNFDs whose id was noted during prepare
            for vnfd_cfg in self._regh.get_xact_elements(xact):
                if vnfd_cfg.id not in scratch.get('vnfds', []):
                    continue
                self._nsm.update_vnfd(vnfd_cfg)

            # Deletes: matched against the elements held by the registration
            for vnfd_cfg in self._regh.elements:
                if vnfd_cfg.id not in scratch.get('deleted_vnfds', []):
                    continue
                yield from self._nsm.delete_vnfd(vnfd_cfg.id)

            # The scratch entries are consumed; drop them
            for consumed in ('vnfds', 'deleted_vnfds'):
                scratch.pop(consumed, None)

        self._log.debug(
            "Registering for VNFD config using xpath: %s",
            VnfdDtsHandler.XPATH,
            )
        apply_handler = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=apply_handler) as acg:
            self._regh = acg.register(
                xpath=VnfdDtsHandler.XPATH,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare)
2885
class NsrRpcDtsHandler(object):
    """ The network service instantiation RPC DTS handler

    Serves the start-network-service RPC by building an ns-instance-config
    entry from the RPC input and the referenced NSD, and POSTing it to the
    local REST endpoint.
    """
    EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service"
    EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service"
    NETCONF_IP_ADDRESS = "127.0.0.1"
    NETCONF_PORT = 2022
    RESTCONF_PORT = 8888
    NETCONF_USER = "admin"
    NETCONF_PW = "admin"
    # Built from the constants above instead of repeating the literals
    REST_BASE_V2_URL = 'https://{}:{}/v2/api/'.format(NETCONF_IP_ADDRESS, RESTCONF_PORT)

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm
        self._nsd = None

        self._ns_regh = None

        self._manager = None
        self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + 'config/ns-instance-config'

        self._model = RwYang.Model.create_libncx()
        self._model.load_schema_ypbc(RwNsrYang.get_schema())

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def wrap_netconf_config_xml(xml):
        """ Wrap an XML fragment in a netconf <config> element """
        xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
        return xml

    @asyncio.coroutine
    def _connect(self, timeout_secs=240):
        """ Open a netconf session to the launchpad, retrying every 5 seconds

        Arguments:
            timeout_secs - give up once this many seconds have elapsed

        Returns:
            The connected ncclient asyncio manager

        Raises:
            NsrInstantiationFailed if no connection was made in time
        """
        start_time = time.time()
        while (time.time() - start_time) < timeout_secs:

            try:
                self._log.debug("Attemping NsmTasklet netconf connection.")

                manager = yield from ncclient.asyncio_manager.asyncio_connect(
                        loop=self._loop,
                        host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS,
                        port=NsrRpcDtsHandler.NETCONF_PORT,
                        username=NsrRpcDtsHandler.NETCONF_USER,
                        password=NsrRpcDtsHandler.NETCONF_PW,
                        allow_agent=False,
                        look_for_keys=False,
                        hostkey_verify=False,
                        )

                return manager

            except ncclient.transport.errors.SSHError as e:
                self._log.warning("Netconf connection to launchpad %s failed: %s",
                                  NsrRpcDtsHandler.NETCONF_IP_ADDRESS, str(e))

            yield from asyncio.sleep(5, loop=self._loop)

        raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
                                     timeout_secs)

    def _apply_ns_instance_config(self, payload_dict):
        """ POST the serialized ns-instance-config to the REST endpoint

        This call blocks; register() runs it through the loop's executor.

        Arguments:
            payload_dict - request body (register() passes the to_json() output)

        Returns:
            The requests response object
        """
        req_hdr = {'accept': 'application/vnd.yang.data+json',
                   'content-type': 'application/vnd.yang.data+json'}
        # NOTE(review): certificate verification is disabled; the endpoint is
        # the loopback address - confirm this is acceptable for the deployment.
        response = requests.post(self._nsr_config_url,
                                 headers=req_hdr,
                                 auth=(NsrRpcDtsHandler.NETCONF_USER, NsrRpcDtsHandler.NETCONF_PW),
                                 data=payload_dict,
                                 verify=False)
        return response

    @asyncio.coroutine
    def register(self):
        """ Register for NS monitoring read from dts """
        @asyncio.coroutine
        def on_ns_config_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts start-network-service"""
            assert action == rwdts.QueryAction.RPC
            rpc_ip = msg
            rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({
                    "nsr_id": str(uuid.uuid4())
                })

            has_account = 'cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip
            if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and has_account):
                self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip))
                # Reject the RPC right away instead of continuing with
                # incomplete input and failing later on.
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
                return

            self._log.debug("start-network-service RPC input: {}".format(rpc_ip))

            try:
                self._log.debug("RPC output: {}".format(rpc_op))

                nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)

                self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
                                rpc_ip.name, rpc_ip.nsd_ref)

                # Start from the generated id, then copy over every RPC input
                # field that also exists on the ns-instance-config nsr message.
                ns_instance_config_dict = {"id": rpc_op.nsr_id, "admin_status": "ENABLED"}
                nsr_fields = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields
                ns_instance_config_copy_dict = {k: v for k, v in rpc_ip.as_dict().items()
                                                if k in nsr_fields}
                ns_instance_config_dict.update(ns_instance_config_copy_dict)

                ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict)
                ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
                ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())

                payload_dict = ns_instance_config.to_json(self._model)

                self._log.debug("Sending configure ns-instance-config json to %s: %s",
                                self._nsr_config_url, ns_instance_config)

                # requests is blocking, so keep it off the event loop thread
                response = yield from self._loop.run_in_executor(
                    None,
                    self._apply_ns_instance_config,
                    payload_dict
                    )
                response.raise_for_status()
                # Log the raw body: response.json() raises on an empty or
                # non-JSON body, which would NACK an RPC that succeeded.
                self._log.debug("Received edit config response: %s", response.text)

                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
                                        rpc_op)
            except Exception as e:
                self._log.error("Exception processing the "
                                "start-network-service: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)

        hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)

        with self._dts.group_create() as group:
            self._ns_regh = group.register(xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH,
                                           handler=hdl_ns,
                                           flags=rwdts.Flag.PUBLISHER,
                                           )
3037
3038
class NsrDtsHandler(object):
    """ The network service DTS handler

    Subscribes to ns-instance-config NSRs, their scaling group instances and
    the key pairs, and drives the NS manager from the prepare/apply callbacks.
    """
    NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
    SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
    KEY_PAIR_XPATH = "C,/nsr:key-pair"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        self._nsr_regh = None
        self._scale_regh = None
        self._key_pair_regh = None

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for Nsr create/update/delete/read requests from dts """

        def nsr_id_from_keyspec(ks):
            """ Extract the NSR id from a keyspec """
            nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
            nsr_id = nsr_path_entry.key00.id
            return nsr_id

        def group_name_from_keyspec(ks):
            """ Extract the scaling group name from a keyspec """
            group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
            group_name = group_path_entry.key00.scaling_group_name_ref
            return group_name

        def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
            """ Return boolean indicating if scaling group instance was already commited previously.

            By looking at the existing elements in this registration handle (elements not part
            of this current xact), we can tell if the instance was configured previously without
            keeping any application state.
            """
            # Scaling instances live under the scale registration; the NSR
            # registration yields NSR configs, whose keyspecs carry no group name.
            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                elem_group_name = group_name_from_keyspec(keyspec)

                if elem_nsr_id != nsr_id or group_name != elem_group_name:
                    continue

                if instance_cfg.id == instance_id:
                    return True

            return False

        def get_scale_group_instance_delta(nsr_id, group_name, xact):
            """ Compare the xact's scale instances with those outside the xact

            Returns:
                dict with "added" and "deleted" lists of instance ids
            """
            delta = {"added": [], "deleted": []}
            # Pass 1: everything in the xact is provisionally "added"
            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                if elem_nsr_id != nsr_id:
                    continue

                elem_group_name = group_name_from_keyspec(keyspec)
                if elem_group_name != group_name:
                    continue

                delta["added"].append(instance_cfg.id)

            # Pass 2: ids seen without the xact are unchanged if also in the
            # xact view, otherwise they were deleted
            for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
                elem_nsr_id = nsr_id_from_keyspec(keyspec)
                if elem_nsr_id != nsr_id:
                    continue

                elem_group_name = group_name_from_keyspec(keyspec)
                if elem_group_name != group_name:
                    continue

                if instance_cfg.id in delta["added"]:
                    delta["added"].remove(instance_cfg.id)
                else:
                    delta["deleted"].append(instance_cfg.id)

            return delta

        @asyncio.coroutine
        def update_nsr_nsd(nsr_id, xact, scratch):
            """ Instantiate/terminate the VLs that changed in the NSR's NSD """

            @asyncio.coroutine
            def get_nsr_vl_delta(nsr_id, xact, scratch):
                """ Same two-pass delta as for scale instances, over nsd.vld """
                delta = {"added": [], "deleted": []}
                for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
                    elem_nsr_id = nsr_id_from_keyspec(keyspec)
                    if elem_nsr_id != nsr_id:
                        continue

                    if 'vld' in instance_cfg.nsd:
                        for vld in instance_cfg.nsd.vld:
                            delta["added"].append(vld)

                for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
                    self._log.debug("NSR update: %s", instance_cfg)
                    elem_nsr_id = nsr_id_from_keyspec(keyspec)
                    if elem_nsr_id != nsr_id:
                        continue

                    if 'vld' in instance_cfg.nsd:
                        for vld in instance_cfg.nsd.vld:
                            if vld in delta["added"]:
                                delta["added"].remove(vld)
                            else:
                                delta["deleted"].append(vld)

                return delta

            vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
            self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)

            for vld in vl_delta["added"]:
                yield from self._nsm.nsr_instantiate_vl(nsr_id, vld)

            for vld in vl_delta["deleted"]:
                yield from self._nsm.nsr_terminate_vl(nsr_id, vld)

        def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch):
            """ Split a registration's configs into (added, deleted, updated) lists """
            # Unfortunately, it is currently difficult to figure out what has exactly
            # changed in this xact without Pbdelta support (RIFT-4916)
            # As a workaround, we can fetch the pre and post xact elements and
            # perform a comparison to figure out adds/deletes/updates
            xact_cfgs = list(dts_member_reg.get_xact_elements(xact))
            curr_cfgs = list(dts_member_reg.elements)

            xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs}
            curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs}

            # Find Adds
            added_keys = set(xact_key_map) - set(curr_key_map)
            added_cfgs = [xact_key_map[key] for key in added_keys]

            # Find Deletes
            deleted_keys = set(curr_key_map) - set(xact_key_map)
            deleted_cfgs = [curr_key_map[key] for key in deleted_keys]

            # Find Updates
            updated_keys = set(curr_key_map) & set(xact_key_map)
            updated_cfgs = [xact_key_map[key] for key in updated_keys
                            if xact_key_map[key] != curr_key_map[key]]

            return added_cfgs, deleted_cfgs, updated_cfgs

        def get_nsr_key_pairs(dts_member_reg, xact):
            """ Return the xact's key pair configs as a dict keyed by name """
            key_pairs = {}
            for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True):
                self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec))
                key_pairs[instance_cfg.name] = instance_cfg
            return key_pairs

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
                            xact, action, scratch)

            def handle_create_nsr(msg, key_pairs=None, restart_mode=False):
                """ Validate the config and create the NSR in the NS manager """
                if not msg.has_field("nsd"):
                    err = "NSD not provided"
                    self._log.error(err)
                    raise NetworkServiceRecordError(err)

                self._log.debug("Creating NetworkServiceRecord %s  from nsr config  %s",
                                msg.id, msg.as_dict())
                nsr = self.nsm.create_nsr(msg, key_pairs=key_pairs, restart_mode=restart_mode)
                return nsr

            def handle_delete_nsr(msg):
                """ Mark the NSR as terminating and schedule its termination """
                @asyncio.coroutine
                def delete_instantiation(ns_id):
                    """ Delete instantiation """
                    with self._dts.transaction() as xact:
                        yield from self._nsm.terminate_ns(ns_id, xact)

                # Handle delete NSR requests
                self._log.info("Delete req for  NSR Id: %s received", msg.id)
                # Terminate the NSR instance
                nsr = self._nsm.get_ns_by_nsr_id(msg.id)

                nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
                event_descr = "Terminate rcvd for NS Id:%s" % msg.id
                nsr.record_event("terminate-rcvd", event_descr)

                self._loop.create_task(delete_instantiation(msg.id))

            @asyncio.coroutine
            def begin_instantiation(nsr):
                """ Instantiate the NS under the apply transaction """
                self._log.info("Beginning NS instantiation: %s", nsr.id)
                yield from self._nsm.instantiate_ns(nsr.id, xact)

            # INSTALL with no xact id: re-create every stored NSR in restart mode
            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                # NOTE(review): key pairs are passed as a list here but as a
                # name-keyed dict on the create path below - confirm which
                # form create_nsr expects.
                key_pairs = list(self._key_pair_regh.elements)
                for element in self._nsr_regh.elements:
                    nsr = handle_create_nsr(element, key_pairs, restart_mode=True)
                    self._loop.create_task(begin_instantiation(nsr))

            (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh,
                                                                                  xact,
                                                                                  "id",
                                                                                  scratch)
            self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs,
                            deleted_msgs, updated_msgs)

            for msg in added_msgs:
                if msg.id not in self._nsm.nsrs:
                    self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
                    key_pairs = get_nsr_key_pairs(self._key_pair_regh, xact)
                    nsr = handle_create_nsr(msg, key_pairs)
                    self._loop.create_task(begin_instantiation(nsr))

            for msg in deleted_msgs:
                self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
                try:
                    handle_delete_nsr(msg)
                except Exception:
                    # One failed termination must not block the others
                    self._log.exception("Failed to terminate NS:%s", msg.id)

            for msg in updated_msgs:
                self._log.info("Update NSR received in on_apply: %s", msg)

                self._nsm.nsr_update_cfg(msg.id, msg)

                if 'nsd' in msg:
                    self._loop.create_task(update_nsr_nsd(msg.id, xact, scratch))

                for group in msg.scaling_group:
                    instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
                    self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)

                    for instance_id in instance_delta["added"]:
                        self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)

                    for instance_id in instance_delta["deleted"]:
                        self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare calllback from DTS for NSR

            Validates the request and raises on invalid input; actual
            create/update/delete work happens in on_apply.
            """

            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            action = xact_info.query_action
            self._log.debug(
                    "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
                    xact, action, xact_info, xpath, msg
                    )

            if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]:
                # if this is an NSR create
                if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs:
                    # Ensure the Cloud account/datacenter has been specified
                    if not msg.has_field("cloud_account") and not msg.has_field("om_datacenter"):
                        raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")

                    # Check if nsd is specified
                    if not msg.has_field("nsd"):
                        raise NsrInstantiationFailed("NSD not specified in NSR")

                else:
                    # NOTE(review): a DELETE for an id the NSM does not know
                    # raises KeyError here - confirm that is the intended way
                    # to reject it.
                    nsr = self._nsm.nsrs[msg.id]

                    if msg.has_field("nsd"):
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
                        if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
                            raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")

                    if msg.has_field("scaling_group"):
                        self._log.debug("ScaleMsg %s", msg)
                        self._log.debug("NSSCALINGSTATE %s", nsr.state)
                        if nsr.state != NetworkServiceRecordState.RUNNING:
                            raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")

                        if len(msg.scaling_group) > 1:
                            raise ScalingOperationError("Only a single scaling group can be configured at a time")

                        for group_msg in msg.scaling_group:
                            num_new_group_instances = len(group_msg.instance)
                            if num_new_group_instances > 1:
                                raise ScalingOperationError("Only a single scaling instance can be modified at a time")

                            elif num_new_group_instances == 1:
                                target_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
                                if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
                                    # >= so a group already over its limit is rejected too
                                    if len(target_group.instances) >= target_group.max_instance_count:
                                        raise ScalingOperationError("Max instances for %s reached" % target_group)

            acg.handle.prepare_complete_ok(xact_info.handle)

        self._log.debug("Registering for NSR config using xpath: %s",
                        NsrDtsHandler.NSR_XPATH)

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self._dts.appconf_group_create(handler=acg_hdl) as acg:
            self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH,
                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                                          on_prepare=on_prepare)

            # The scale instance and key pair registrations have no prepare
            # callback; their elements are read from on_apply and its helpers.
            self._scale_regh = acg.register(
                    xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH,
                    flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                    )

            self._key_pair_regh = acg.register(
                    xpath=NsrDtsHandler.KEY_PAIR_XPATH,
                    flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
                    )
3383
3384
class NsrOpDataDtsHandler(object):
    """ Publishes network service operational data records into DTS """
    XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"

    def __init__(self, dts, log, loop, nsm):
        self._regh = None
        self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm

    @property
    def regh(self):
        """ The publisher registration handle (None until register() has run) """
        return self._regh

    @property
    def nsm(self):
        """ The NS manager this handler was created with """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register as publisher for the NSR op data xpath """
        self._log.debug("Registering Nsr op data path %s as publisher",
                        NsrOpDataDtsHandler.XPATH)

        reg_handler = rift.tasklets.DTS.RegistrationHandler()
        group_handler = rift.tasklets.Group.Handler()
        with self._dts.group_create(handler=group_handler) as group:
            publisher_flags = (rwdts.Flag.PUBLISHER
                               | rwdts.Flag.NO_PREP_READ
                               | rwdts.Flag.DATASTORE)
            self._regh = group.register(
                xpath=NsrOpDataDtsHandler.XPATH,
                handler=reg_handler,
                flags=publisher_flags,
            )

    @asyncio.coroutine
    def create(self, path, msg):
        """
        Publish a new NS record (msg) at the given path
        """
        self._log.debug("Creating NSR %s:%s", path, msg)
        registration = self.regh
        registration.create_element(path, msg)
        self._log.debug("Created NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
        """
        Update the NS record at the given path with msg, using the xact flags
        """
        registration = self.regh
        self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, registration)
        registration.update_element(path, msg, flags)
        self._log.debug("Updated NSR, %s:%s", path, msg)

    @asyncio.coroutine
    def delete(self, path):
        """
        Delete the NS record at the given path
        """
        self._log.debug("Deleting NSR path:%s", path)
        registration = self.regh
        registration.delete_element(path)
        self._log.debug("Deleted NSR path:%s", path)
3445
3446
class VnfrDtsHandler(object):
    """ The virtual network service DTS handler

    Subscribes to the VNFR catalog and forwards create/update/delete
    notifications for known VNFRs to the NS manager.
    """
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    def __init__(self, dts, log, loop, nsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        self._regh = None

    @property
    def regh(self):
        """ Return registration handle """
        return self._regh

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @asyncio.coroutine
    def register(self):
        """ Register for vnfr create/update/delete/ advises from dts """

        def on_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Got vnfr commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts """
            xpath = ks_path.to_xpath(RwNsrYang.get_schema())
            self._log.debug(
                "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
                xact_info, action, ks_path, msg
                )

            schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            vnfr_id = path_entry.key00.id
            # VNFRs the NS manager does not track are answered with NA
            if vnfr_id not in self._nsm._vnfrs:
                self._log.error("%s request for non existent record path %s",
                                action, xpath)
                xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath)

                return

            # The "Deleting VNFR" message is only logged on the delete branch;
            # it used to be logged for every action.
            if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
                yield from self._nsm.update_vnfr(msg)
            elif action == rwdts.QueryAction.DELETE:
                self._log.debug("Deleting VNFR with id %s", vnfr_id)
                self._nsm.delete_vnfr(vnfr_id)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath)

        self._log.debug("Registering for VNFR using xpath: %s",
                        VnfrDtsHandler.XPATH,)

        hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit,
                                                    on_prepare=on_prepare,)
        with self._dts.group_create() as group:
            self._regh = group.register(xpath=VnfrDtsHandler.XPATH,
                                        handler=hdl,
                                        flags=(rwdts.Flag.SUBSCRIBER),)
3514
3515
3516 class NsManager(object):
3517 """ The Network Service Manager class"""
def __init__(self, dts, log, loop,
             nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector,
             vnffgmgr, vnfd_pub_handler, cloud_account_handler):
    """ Store the collaborators, set up REST settings and build the DTS handlers """
    # Core handles
    self._dts = dts
    self._log = log
    self._loop = loop

    # Publication handlers and helpers supplied by the caller
    self._nsr_handler = nsr_handler
    self._vnfr_pub_handler = vnfr_handler
    self._vlr_pub_handler = vlr_handler
    self._vnffgmgr = vnffgmgr
    self._vnfd_pub_handler = vnfd_pub_handler
    self._cloud_account_handler = cloud_account_handler
    self._ro_plugin_selector = ro_plugin_selector

    # Settings used by the REST based implementation of the scaling RPC
    self._headers = {"content-type":"application/json", "accept":"application/json"}
    # This will break when we have rbac in the rift code and the admin user
    # password is changed or the admin user itself is removed.
    self._user = 'admin'
    self._password = 'admin'
    self._ip = 'localhost'
    self._rport = 8008
    conf_url_template = "https://{ip}:{port}/api/config"
    self._conf_url = conf_url_template.format(ip=self._ip, port=self._rport)

    # Records and descriptors tracked by this manager
    self._nsrs = {}
    self._nsds = {}
    self._vnfds = {}
    self._vnfrs = {}

    self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self)

    # TODO: All these handlers should move to tasklet level.
    # Passing self is often an indication of bad design
    self._nsd_dts_handler = NsdDtsHandler(dts, log, loop, self)
    self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self)

    # Built in the same order as they appear in the handler list
    vnfr_dts_handler = VnfrDtsHandler(dts, log, loop, self)
    nsr_dts_handler = NsrDtsHandler(dts, log, loop, self)
    scaling_rpc_handler = ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback)
    nsr_rpc_handler = NsrRpcDtsHandler(dts, log, loop, self)

    # register() walks this list in order
    self._dts_handlers = [
        self._nsd_dts_handler,
        vnfr_dts_handler,
        nsr_dts_handler,
        scaling_rpc_handler,
        nsr_rpc_handler,
        self._vnfd_dts_handler,
        self.cfgmgr_obj,
    ]
3563
3564
@property
def log(self):
    """ Log handle passed to the constructor """
    return self._log
3569
@property
def loop(self):
    """ Event loop passed to the constructor """
    return self._loop
3574
@property
def dts(self):
    """ DTS handle passed to the constructor """
    return self._dts
3579
@property
def nsr_handler(self):
    """ NSR handler passed to the constructor (same object as nsr_pub_handler) """
    return self._nsr_handler
3584
@property
def so_obj(self):
    """ So Obj handler """
    # NOTE(review): _so_obj is not assigned in the visible __init__; reading
    # this raises AttributeError unless it is set elsewhere - confirm.
    return self._so_obj
3589
@property
def nsrs(self):
    """ Dict of the NSRs held by this NSM (looked up by NSR id) """
    return self._nsrs
3594
@property
def nsds(self):
    """ Dict of the NSDs held by this NSM """
    return self._nsds
3599
@property
def vnfds(self):
    """ Dict of the VNFDs held by this NSM """
    return self._vnfds
3604
@property
def vnfrs(self):
    """ Dict of the VNFRs held by this NSM """
    return self._vnfrs
3609
@property
def nsr_pub_handler(self):
    """ NSR publication handler (the nsr_handler passed to the constructor) """
    return self._nsr_handler
3614
@property
def vnfr_pub_handler(self):
    """ VNFR publication handler (the vnfr_handler passed to the constructor) """
    return self._vnfr_pub_handler
3619
@property
def vlr_pub_handler(self):
    """ VLR publication handler (the vlr_handler passed to the constructor) """
    return self._vlr_pub_handler
3624
@property
def vnfd_pub_handler(self):
    """ VNFD publication handler passed to the constructor """
    return self._vnfd_pub_handler
3628
@asyncio.coroutine
def register(self):
    """ Run register() on each static DTS handler, one after the other """
    # Sequential on purpose: each registration completes before the next starts
    static_handlers = self._dts_handlers
    for handler in static_handlers:
        yield from handler.register()
3634
3635
def get_ns_by_nsr_id(self, nsr_id):
    """ Look up an NSR by its id

    Raises:
        NetworkServiceRecordError if this manager holds no NSR with that id
    """
    known_nsrs = self._nsrs
    if nsr_id in known_nsrs:
        return known_nsrs[nsr_id]

    raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
3642
def scale_nsr_out(self, nsr_id, scale_group_name, instance_id, config_xact):
    """ Schedule creation of a scaling group instance on a running NSR

    Raises:
        ScalingOperationError if the NSR is not in the running state
    """
    self.log.debug(
        "Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
        nsr_id, scale_group_name, instance_id)

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    # The scale-out runs as its own task; this method returns immediately
    scale_out_coro = record.create_scale_group_instance(scale_group_name, instance_id, config_xact)
    self._loop.create_task(scale_out_coro)
3654
def scale_nsr_in(self, nsr_id, scale_group_name, instance_id):
    """ Schedule deletion of a scaling group instance on a running NSR

    Raises:
        ScalingOperationError if the NSR is not in the running state
    """
    self.log.debug(
        "Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
        nsr_id, scale_group_name, instance_id)

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")

    # The scale-in runs as its own task; this method returns immediately
    scale_in_coro = record.delete_scale_group_instance(scale_group_name, instance_id)
    self._loop.create_task(scale_in_coro)
3666
3667 def scale_rpc_callback(self, xact, msg, action):
3668 """Callback handler for RPC calls
3669 Args:
3670 xact : Transaction Handler
3671 msg : RPC input
3672 action : Scaling Action
3673 """
3674 def get_scaling_group_information():
3675 scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)
3676 output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False)
3677 if output.text == None or len(output.text) == 0:
3678 self.log.error("nsr id %s information not present", self._nsr_id)
3679 return None
3680 scaling_group_info = json.loads(output.text)
3681 return scaling_group_info
3682
3683 def config_scaling_group_information(scaling_group_info):
3684 data_str = json.dumps(scaling_group_info)
3685 self.log.debug("scaling group Info %s", data_str)
3686
3687 scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)
3688 response = requests.put(scale_out_url, data=data_str, verify=False, auth=(self._user, self._password), headers=self._headers)
3689 response.raise_for_status()
3690
3691 def scale_out():
3692 scaling_group_info = get_scaling_group_information()
3693 if scaling_group_info is None:
3694 return
3695
3696 scaling_group_present = False
3697 if "scaling-group" in scaling_group_info["nsr:nsr"]:
3698 scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"]
3699 for scaling_group in scaling_group_array:
3700 if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref:
3701 scaling_group_present = True
3702 if 'instance' not in scaling_group:
3703 scaling_group['instance'] = []
3704 for instance in scaling_group['instance']:
3705 if instance["id"] == int(msg.instance_id):
3706 self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id)
3707 return
3708 scaling_group["instance"].append({"id": int(msg.instance_id)})
3709
3710 if not scaling_group_present:
3711 scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, "instance": [{"id": msg.instance_id}]}]
3712
3713 config_scaling_group_information(scaling_group_info)
3714 return
3715
def scale_in():
    """Remove ``msg.instance_id`` from the requested scaling group and upload.

    Reads the current NSR config, drops the matching instance entry from the
    scaling group named by ``msg.scaling_group_name_ref`` and writes the
    config back. Logs an error and writes nothing when either the group or
    the instance id is not found.
    """
    scaling_group_info = get_scaling_group_information()
    if scaling_group_info is None:
        return

    # The "scaling-group" key may be absent (scale_out guards for that too).
    # This function is dispatched through run_in_executor with the returned
    # future discarded, so a KeyError here would never be reported; fall back
    # to an empty list so the "does not exist" error below is logged instead.
    scaling_group_array = scaling_group_info["nsr:nsr"].get("scaling-group", [])
    instance_id = int(msg.instance_id)

    scaling_group_present = False
    instance_id_present = False
    for scaling_group in scaling_group_array:
        if scaling_group["scaling-group-name-ref"] != msg.scaling_group_name_ref:
            continue
        scaling_group_present = True
        instance_array = scaling_group.get("instance", [])
        for index, instance in enumerate(instance_array):
            if instance["id"] == instance_id:
                # Mutating while iterating is safe: we leave the loop at once.
                instance_array.pop(index)
                instance_id_present = True
                break

    if not scaling_group_present:
        self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref)
        return

    if not instance_id_present:
        self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id)
        return

    config_scaling_group_information(scaling_group_info)
    return
3745
3746 if action == ScalingRpcHandler.ACTION.SCALE_OUT:
3747 self._loop.run_in_executor(None, scale_out)
3748 else:
3749 self._loop.run_in_executor(None, scale_in)
3750
def nsr_update_cfg(self, nsr_id, msg):
    """ Store msg as the config message of the NSR with the passed id

    Raises KeyError when nsr_id is not present in self._nsrs.
    """
    self._nsrs[nsr_id].nsr_cfg_msg = msg
3754
def nsr_instantiate_vl(self, nsr_id, vld):
    """ Create a VL instance for vld on the NSR with the passed id

    This is a generator and is meant to be driven with ``yield from``.
    Raises NsrVlUpdateError unless the NSR is in the RUNNING state.
    """
    debug_text = "NSR {} create VL {}".format(nsr_id, vld)
    self.log.debug(debug_text)

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")

    # Run inline: the caller already executes this in its own task
    yield from record.create_vl_instance(vld)
3763
def nsr_terminate_vl(self, nsr_id, vld):
    """ Delete the VL instance for vld from the NSR with the passed id

    This is a generator and is meant to be driven with ``yield from``.
    Raises NsrVlUpdateError unless the NSR is in the RUNNING state.
    """
    debug_text = "NSR {} delete VL {}".format(nsr_id, vld.id)
    self.log.debug(debug_text)

    record = self._nsrs[nsr_id]
    if record.state != NetworkServiceRecordState.RUNNING:
        raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")

    # Run inline: the caller already executes this in its own task
    yield from record.delete_vl_instance(vld)
3772
def create_nsr(self, nsr_msg, key_pairs=None, restart_mode=False):
    """ Build a NetworkServiceRecord for nsr_msg and track it by id

    The record is stored in self._nsrs and the RO plugin's create_nsr is
    invoked before the record is returned.
    Raises NetworkServiceRecordError if the NSR id is already tracked.
    """
    self._log.debug("NSRMSG %s", nsr_msg)
    if nsr_msg.id in self._nsrs:
        error_text = "NSR id %s already exists" % nsr_msg.id
        self._log.error(error_text)
        raise NetworkServiceRecordError(error_text)

    self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
                   nsr_msg.id,
                   nsr_msg.nsd.id)

    plugin = self._ro_plugin_selector.ro_plugin
    sdn_account = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.cloud_account)
    vlr_publisher = self._ro_plugin_selector._records_publisher._vlr_pub_hdlr

    record = NetworkServiceRecord(
        self._dts,
        self._log,
        self._loop,
        self,
        plugin,
        nsr_msg,
        sdn_account,
        key_pairs,
        restart_mode=restart_mode,
        vlr_handler=vlr_publisher,
    )
    # Track the record first, then let the plugin know about the new NSR
    self._nsrs[nsr_msg.id] = record
    plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs)

    return record
3803
def delete_nsr(self, nsr_id):
    """
    Stop tracking the NSR with the passed nsr id

    Raises KeyError when the id is not present in self._nsrs.
    """
    self._nsrs.pop(nsr_id)
3809
@asyncio.coroutine
def instantiate_ns(self, nsr_id, config_xact):
    """ Ask the NSR's plugin to instantiate the NS with the passed id

    Raises NetworkServiceRecordError when the id is not tracked.
    """
    self._log.debug("Instantiating Network service id %s", nsr_id)

    if nsr_id not in self._nsrs:
        failure = "NSR id %s not found " % nsr_id
        self._log.error(failure)
        raise NetworkServiceRecordError(failure)

    record = self._nsrs[nsr_id]
    yield from record.nsm_plugin.instantiate_ns(record, config_xact)
3821
@asyncio.coroutine
def update_vnfr(self, vnfr):
    """ Update the tracked VNFR from vnfr, then refresh its NSR's state

    Raises KeyError when vnfr.id is not present in self._vnfrs.
    """
    vnfr_record = self._vnfrs[vnfr.id]
    self._log.debug("Updating VNFR with state %s: vnfr %s", vnfr_record.state, vnfr)

    yield from vnfr_record.update_state(vnfr)

    nsr = self.find_nsr_for_vnfr(vnfr.id)
    if nsr is None:
        # find_nsr_for_vnfr returns None when no NSR holds this VNFR; the
        # previous code then failed with AttributeError on None.
        self._log.error("No NSR found for VNFR id %s, NSR state not updated", vnfr.id)
        return
    yield from nsr.update_state()
3832
def find_nsr_for_vnfr(self, vnfr_id):
    """ Find the NSR which has the passed vnfr id

    Returns the first NSR in self.nsrs holding a VNFR whose id equals
    vnfr_id, or None when there is no such NSR.
    """
    owners = (
        candidate
        for candidate in self.nsrs.values()
        if any(record.id == vnfr_id for record in candidate.vnfrs.values())
    )
    return next(owners, None)
3840
def delete_vnfr(self, vnfr_id):
    """ Stop tracking the VNFR with the passed id

    Raises KeyError when the id is not present in self._vnfrs.
    """
    self._vnfrs.pop(vnfr_id)
3844
@asyncio.coroutine
def get_nsr_config(self, nsd_id):
    """ Return the first ns-instance-config NSR entry using the passed nsd id

    Reads "C,/nsr:ns-instance-config" through DTS and scans every returned
    result; returns None when no NSR entry references nsd_id.
    """
    config_xpath = "C,/nsr:ns-instance-config"
    query_results = yield from self._dts.query_read(config_xpath, rwdts.XactFlag.MERGE)

    for pending in query_results:
        response = yield from pending
        for nsr_cfg in response.result.nsr:
            if nsr_cfg.nsd.id == nsd_id:
                return nsr_cfg

    return None
3859
def get_nsd(self, nsd_id):
    """ Get network service descriptor for the passed nsd_id

    Raises NetworkServiceDescriptorError when the id is not known.
    """
    if nsd_id not in self._nsds:
        self._log.error("Cannot find NSD id:%s", nsd_id)
        # Exceptions do not apply %-formatting to their arguments the way
        # logging calls do, so build the message before raising.
        raise NetworkServiceDescriptorError("Cannot find NSD id:%s" % nsd_id)

    return self._nsds[nsd_id]
3867
def create_nsd(self, nsd_msg):
    """ Create a network service descriptor and track it by id

    Returns the new NetworkServiceDescriptor.
    Raises NetworkServiceDescriptorError when the id is already tracked.
    """
    self._log.debug("Create network service descriptor - %s", nsd_msg)
    if nsd_msg.id in self._nsds:
        self._log.error("Cannot create NSD %s -NSD ID already exists", nsd_msg)
        # Exceptions do not apply %-formatting to their arguments the way
        # logging calls do, so build the message before raising.
        raise NetworkServiceDescriptorError("NSD already exists-%s" % nsd_msg.id)

    nsd = NetworkServiceDescriptor(
        self._dts,
        self._log,
        self._loop,
        nsd_msg,
        self
        )
    self._nsds[nsd_msg.id] = nsd

    return nsd
3885
def update_nsd(self, nsd):
    """ Update a tracked network service descriptor, creating it if unknown """
    self._log.debug("Update network service descriptor - %s", nsd)

    if nsd.id in self._nsds:
        self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd)
        self._nsds[nsd.id].update(nsd)
        return

    self._log.debug("No NSD found - creating NSD id = %s", nsd.id)
    self.create_nsd(nsd)
3895
def delete_nsd(self, nsd_id):
    """ Delete the Network service descriptor with the passed id

    Raises NetworkServiceDescriptorNotFound when the id is not tracked.
    """
    self._log.debug("Deleting the network service descriptor - %s", nsd_id)
    if nsd_id not in self._nsds:
        self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
        # Exceptions do not apply %-formatting to their arguments the way
        # logging calls do, so build the message before raising.
        raise NetworkServiceDescriptorNotFound("Cannot find %s" % nsd_id)
    del self._nsds[nsd_id]
3903
def get_vnfd_config(self, xact):
    """ Create every VNFD in the transaction that is not tracked yet

    Walks the elements of xact on the VNFD DTS registration handle and calls
    create_vnfd for each one whose id is missing from self._vnfds.
    """
    registration = self._vnfd_dts_handler.regh
    for vnfd_cfg in registration.get_xact_elements(xact):
        if vnfd_cfg.id in self._vnfds:
            continue
        self.create_vnfd(vnfd_cfg)
3909
def get_vnfd(self, vnfd_id, xact):
    """ Get virtual network function descriptor for the passed vnfd_id

    On a miss, the VNFDs carried by xact are loaded through get_vnfd_config
    before the lookup is retried.
    Raises VnfDescriptorError when the id is still unknown after that.
    """
    if vnfd_id not in self._vnfds:
        self._log.error("Cannot find VNFD id:%s", vnfd_id)
        self.get_vnfd_config(xact)

        if vnfd_id not in self._vnfds:
            self._log.error("Cannot find VNFD id:%s", vnfd_id)
            # Exceptions do not apply %-formatting to their arguments the
            # way logging calls do, so build the message before raising.
            raise VnfDescriptorError("Cannot find VNFD id:%s" % vnfd_id)

    return self._vnfds[vnfd_id]
3921
def create_vnfd(self, vnfd):
    """ Create a virtual network function descriptor

    Stores vnfd in self._vnfds keyed by vnfd.id and returns it.
    Raises VnfDescriptorError when the id is already tracked.
    """
    self._log.debug("Create virtual network function descriptor - %s", vnfd)
    if vnfd.id in self._vnfds:
        self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
        # Exceptions do not apply %-formatting to their arguments the way
        # logging calls do, so build the message before raising.
        raise VnfDescriptorError("VNFD already exists-%s" % vnfd.id)

    self._vnfds[vnfd.id] = vnfd
    return self._vnfds[vnfd.id]
3931
def update_vnfd(self, vnfd):
    """ Replace a tracked VNFD with vnfd, creating it when the id is unknown """
    self._log.debug("Update virtual network function descriptor- %s", vnfd)

    if vnfd.id in self._vnfds:
        self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
        self._vnfds[vnfd.id] = vnfd
        return

    self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
    self.create_vnfd(vnfd)
3943
@asyncio.coroutine
def delete_vnfd(self, vnfd_id):
    """ Delete the virtual network function descriptor with the passed id

    Raises VnfDescriptorError when the id is not tracked.
    """
    self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
    if vnfd_id not in self._vnfds:
        self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
        # Exceptions do not apply %-formatting to their arguments the way
        # logging calls do, so build the message before raising.
        raise VnfDescriptorError("Cannot find %s" % vnfd_id)

    del self._vnfds[vnfd_id]
3953
@asyncio.coroutine
def publish_nsr(self, xact, path, msg):
    """ Publish the NSR msg at path through the NSR handler's update() """
    self._log.debug("Publish NSR with path %s, msg %s", path, msg)
    handler = self.nsr_handler
    yield from handler.update(xact, path, msg)
3960
@asyncio.coroutine
def unpublish_nsr(self, xact, path):
    """ Un-publish the NSR at path through the NSR handler's delete() """
    self._log.debug("Publishing delete NSR with path %s", path)
    handler = self.nsr_handler
    # NOTE(review): delete() is called here as (path, xact), whereas
    # NsmRecordsPublisherProxy.unpublish_nsr calls delete(xact, path);
    # confirm which order the handler expects.
    yield from handler.delete(path, xact)
3966
def vnfr_is_ready(self, vnfr_id):
    """ Notify the VNFR with the passed id that it is ready

    Raises VirtualNetworkFunctionRecordError when no VNFR with this id is
    tracked.
    """
    self._log.debug("VNFR id %s ready", vnfr_id)
    # Check the VNFR table, which is the one indexed below. The previous
    # code tested self._vnfds, the table create_vnfd keys by descriptor id.
    if vnfr_id not in self._vnfrs:
        err = "Did not find VNFR ID with id %s" % vnfr_id
        # Log the message itself rather than the literal string "err"
        self._log.critical(err)
        raise VirtualNetworkFunctionRecordError(err)
    self._vnfrs[vnfr_id].is_ready()
3975
3976
@asyncio.coroutine
def terminate_ns(self, nsr_id, xact):
    """
    Terminate, unpublish and forget the network service with the given NSR Id

    A failure while terminating is logged and does not stop the unpublish
    and delete steps that follow.
    """

    # Step 1: tear down the instances/networks associated with this service
    self._log.debug("Terminating the network service %s", nsr_id)
    try:
        yield from self._nsrs[nsr_id].terminate()
    except Exception:
        self.log.exception("Failed to terminate NSR[id=%s]", nsr_id)

    # Step 2: withdraw the published NSR record
    self._log.debug("Unpublishing the network service %s", nsr_id)
    yield from self._nsrs[nsr_id].unpublish(xact)

    # Step 3: drop the NS instance from this NS Manager
    self._log.debug("Deletng the network service %s", nsr_id)
    self.delete_nsr(nsr_id)
3997
3998
class NsmRecordsPublisherProxy(object):
    """ Publisher facade handed to plugin objects so that they can publish
    and unpublish NSR, VNFR and VLR records without touching the individual
    publisher handlers directly"""

    def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr):
        self._dts, self._log, self._loop = dts, log, loop
        # One publisher handler per record type
        self._nsr_pub_hdlr = nsr_pub_hdlr
        self._vnfr_pub_hdlr = vnfr_pub_hdlr
        self._vlr_pub_hdlr = vlr_pub_hdlr

    @asyncio.coroutine
    def publish_nsr(self, xact, nsr):
        """ Publish nsr at the xpath derived from it """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.update(xact, nsr_path, nsr)
        return outcome

    @asyncio.coroutine
    def unpublish_nsr(self, xact, nsr):
        """ Remove the published nsr at the xpath derived from it """
        nsr_path = NetworkServiceRecord.xpath_from_nsr(nsr)
        outcome = yield from self._nsr_pub_hdlr.delete(xact, nsr_path)
        return outcome

    @asyncio.coroutine
    def publish_vnfr(self, xact, vnfr):
        """ Publish vnfr at the xpath derived from it """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.update(xact, vnfr_path, vnfr)
        return outcome

    @asyncio.coroutine
    def unpublish_vnfr(self, xact, vnfr):
        """ Remove the published vnfr at the xpath derived from it """
        vnfr_path = VirtualNetworkFunctionRecord.vnfr_xpath(vnfr)
        outcome = yield from self._vnfr_pub_hdlr.delete(xact, vnfr_path)
        return outcome

    @asyncio.coroutine
    def publish_vlr(self, xact, vlr):
        """ Publish vlr at the xpath derived from it """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.update(xact, vlr_path, vlr)
        return outcome

    @asyncio.coroutine
    def unpublish_vlr(self, xact, vlr):
        """ Remove the published vlr at the xpath derived from it """
        vlr_path = VirtualLinkRecord.vlr_xpath(vlr)
        outcome = yield from self._vlr_pub_hdlr.delete(xact, vlr_path)
        return outcome
4046
4047
class ScalingRpcHandler(mano_dts.DtsHandler):
    """ DTS handler for the exec-scale-in and exec-scale-out RPCs

    Each RPC is acknowledged with the instance id it applies to and then
    forwarded to the optional callback as (xact, msg, ACTION).
    """
    SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in"
    SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in"

    SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
    SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"

    ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')

    def __init__(self, log, dts, loop, callback=None):
        super().__init__(log, dts, loop)
        self.callback = callback
        # Last instance id generated per scaling group name; starts at 0
        self.last_instance_id = defaultdict(int)

    @asyncio.coroutine
    def register(self):
        """ Register the scale-in and scale-out RPC handlers with DTS """

        @asyncio.coroutine
        def on_scale_in_prepare(xact_info, action, ks_path, msg):
            """ ACK the scale-in RPC, then hand it to the callback """
            assert action == rwdts.QueryAction.RPC

            try:
                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
                      "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH,
                    rpc_op)

                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)
            except Exception as e:
                # NOTE(review): the ACK above may already have been sent
                # when the callback raises - confirm a later NACK is valid.
                self.log.exception(e)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.NACK,
                    self.__class__.SCALE_IN_OUTPUT_XPATH)

        @asyncio.coroutine
        def on_scale_out_prepare(xact_info, action, ks_path, msg):
            """ ACK the scale-out RPC, generating an instance id if needed """
            assert action == rwdts.QueryAction.RPC

            try:
                scaling_group = msg.scaling_group_name_ref
                if not msg.instance_id:
                    # Key the counter by the requested group name. The
                    # previous code used `scale_group`, which is the module
                    # imported at the top of the file, so every scaling
                    # group shared a single counter.
                    self.last_instance_id[scaling_group] += 1
                    msg.instance_id = self.last_instance_id[scaling_group]

                rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
                      "instance_id": msg.instance_id})

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    self.__class__.SCALE_OUT_OUTPUT_XPATH,
                    rpc_op)

                if self.callback:
                    self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)
            except Exception as e:
                # NOTE(review): the ACK above may already have been sent
                # when the callback raises - confirm a later NACK is valid.
                self.log.exception(e)
                xact_info.respond_xpath(
                      rwdts.XactRspCode.NACK,
                      self.__class__.SCALE_OUT_OUTPUT_XPATH)

        scale_in_hdl = rift.tasklets.DTS.RegistrationHandler(
              on_prepare=on_scale_in_prepare)
        scale_out_hdl = rift.tasklets.DTS.RegistrationHandler(
              on_prepare=on_scale_out_prepare)

        with self.dts.group_create() as group:
            group.register(
                  xpath=self.__class__.SCALE_IN_INPUT_XPATH,
                  handler=scale_in_hdl,
                  flags=rwdts.Flag.PUBLISHER)
            group.register(
                  xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
                  handler=scale_out_hdl,
                  flags=rwdts.Flag.PUBLISHER)
4128
4129
class NsmTasklet(rift.tasklets.Tasklet):
    """
    The network service manager tasklet

    Creates the DTS handle in start() and, when DTS reports the INIT state,
    builds the publisher handlers, the RO plugin selector, the cloud account
    subscriber, the VNFFG manager and finally the NsManager.
    """
    def __init__(self, *args, **kwargs):
        super(NsmTasklet, self).__init__(*args, **kwargs)
        self.rwlog.set_category("rw-mano-log")
        self.rwlog.set_subcategory("nsm")

        self._dts = None
        self._nsm = None

        self._ro_plugin_selector = None
        self._vnffgmgr = None

        # _nsr_handler is not assigned anywhere else in this class; it is
        # kept so that any external reader of the attribute keeps working.
        self._nsr_handler = None
        # Declared here so every attribute that init() populates exists
        # (as None) from construction onwards.
        self._nsr_pub_handler = None
        self._vnfr_pub_handler = None
        self._vlr_pub_handler = None
        self._vnfd_pub_handler = None
        self._scale_cfg_handler = None
        self._cloud_account_handler = None

        self._records_publisher_proxy = None

    def start(self):
        """ The task start callback """
        super(NsmTasklet, self).start()
        self.log.info("Starting NsmTasklet")

        self.log.debug("Registering with dts")
        self._dts = rift.tasklets.DTS(self.tasklet_info,
                                      RwNsmYang.get_schema(),
                                      self.loop,
                                      self.on_dts_state_change)

        self.log.debug("Created DTS Api GI Object: %s", self._dts)

    def stop(self):
        """ The task stop callback: de-initialise DTS, re-raising on failure """
        try:
            self._dts.deinit()
        except Exception:
            print("Caught Exception in NSM stop:", sys.exc_info()[0])
            raise

    def on_instance_started(self):
        """ Task instance started callback """
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def init(self):
        """ Task init callback

        Builds and registers, in order: the NSR/VNFR/VLR publishers, the
        VNFD publisher, the records publisher proxy, the RO plugin selector,
        the cloud account subscriber, the VNFFG manager and the NsManager.
        """
        # This message used to repeat the "instance started" text from
        # on_instance_started, which made the two callbacks look the same
        # in the log.
        self.log.debug("Got init callback")

        self.log.debug("creating config account handler")

        self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop)
        yield from self._nsr_pub_handler.register()

        self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vnfr_pub_handler.register()

        self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop)
        yield from self._vlr_pub_handler.register()

        manifest = self.tasklet_info.get_pb_manifest()
        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
        ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
        ssl_key = manifest.bootstrap_phase.rwsecurity.key

        self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop)

        self._records_publisher_proxy = NsmRecordsPublisherProxy(
                self._dts,
                self.log,
                self.loop,
                self._nsr_pub_handler,
                self._vnfr_pub_handler,
                self._vlr_pub_handler,
                )

        # Register the NSM to receive the nsm plugin
        # when cloud account is configured
        self._ro_plugin_selector = cloud.ROAccountPluginSelector(
                self._dts,
                self.log,
                self.loop,
                self._records_publisher_proxy,
                )
        yield from self._ro_plugin_selector.register()

        self._cloud_account_handler = cloud.CloudAccountConfigSubscriber(
                self._log,
                self._dts,
                self.log_hdl)

        yield from self._cloud_account_handler.register()

        self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop)
        yield from self._vnffgmgr.register()

        self._nsm = NsManager(
                self._dts,
                self.log,
                self.loop,
                self._nsr_pub_handler,
                self._vnfr_pub_handler,
                self._vlr_pub_handler,
                self._ro_plugin_selector,
                self._vnffgmgr,
                self._vnfd_pub_handler,
                self._cloud_account_handler
                )

        yield from self._nsm.register()

    @asyncio.coroutine
    def run(self):
        """ Task run callback """
        pass

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Take action according to current dts state to transition
        application into the corresponding application state

        Arguments
            state - current dts state
        """
        # DTS state to request once the handler for `state` has finished
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        # Application callback to run on entering `state`
        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.log.debug("Changing state to %s", next_state)
            self._dts.handle.set_state(next_state)