update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwnsmtasklet.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
18 import asyncio
19 import gi
20 import json
21 import ncclient
22 import ncclient.asyncio_manager
23 import os
24 import requests
25 import shutil
26 import sys
27 import tempfile
28 import time
29 import uuid
30 import yaml
31
32 from collections import defaultdict
33 from collections import deque
34 from enum import Enum
35 from urllib.parse import urlparse
36
37 # disable unsigned certificate warning
38 from requests.packages.urllib3.exceptions import InsecureRequestWarning
39 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
40
41 gi.require_version('RwYang', '1.0')
42 gi.require_version('NsdBaseYang', '1.0')
43 gi.require_version('ProjectNsdYang', '1.0')
44 gi.require_version('RwDts', '1.0')
45 gi.require_version('RwNsmYang', '1.0')
46 gi.require_version('RwNsrYang', '1.0')
47 gi.require_version('NsrYang', '1.0')
48 gi.require_version('RwTypes', '1.0')
49 gi.require_version('RwVlrYang', '1.0')
50 gi.require_version('RwVnfrYang', '1.0')
51 gi.require_version('VnfrYang', '1.0')
52 gi.require_version('ProjectVnfdYang', '1.0')
53 from gi.repository import (
54 RwYang,
55 RwNsrYang,
56 NsrYang,
57 NsdBaseYang,
58 ProjectNsdYang as NsdYang,
59 RwVlrYang,
60 VnfrYang,
61 RwVnfrYang,
62 RwNsmYang,
63 RwsdnalYang,
64 RwDts as rwdts,
65 RwTypes,
66 ProjectVnfdYang,
67 ProtobufC,
68 )
69 gi.require_version('RwKeyspec', '1.0')
70 from gi.repository.RwKeyspec import quoted_key
71
72 from rift.mano.utils.ssh_keys import ManoSshKey
73 import rift.mano.ncclient
74 import rift.mano.config_data.config
75 import rift.mano.dts as mano_dts
76 import rift.tasklets
77 from rift.mano.utils.project import (
78 ManoProject,
79 ProjectHandler,
80 get_add_delete_update_cfgs,
81 DEFAULT_PROJECT,
82 )
83
84 from . import rwnsm_conman as conman
85 from . import cloud
86 from . import publisher
87 from . import subscriber
88 from . import xpath
89 from . import config_value_pool
90 from . import rwvnffgmgr
91 from . import scale_group
92 from . import rwnsmplugin
93 from . import openmano_nsm
94 import functools
95 import collections
96
97 class NetworkServiceRecordState(Enum):
98 """ Network Service Record State """
99 INIT = 101
100 VL_INIT_PHASE = 102
101 VNF_INIT_PHASE = 103
102 VNFFG_INIT_PHASE = 104
103 RUNNING = 106
104 SCALING_OUT = 107
105 SCALING_IN = 108
106 TERMINATE = 109
107 TERMINATE_RCVD = 110
108 VL_TERMINATE_PHASE = 111
109 VNF_TERMINATE_PHASE = 112
110 VNFFG_TERMINATE_PHASE = 113
111 TERMINATED = 114
112 FAILED = 115
113 VL_INSTANTIATE = 116
114 VL_TERMINATE = 117
115
116
117 class NetworkServiceRecordError(Exception):
118 """ Network Service Record Error """
119 pass
120
121
122 class NetworkServiceDescriptorError(Exception):
123 """ Network Service Descriptor Error """
124 pass
125
126
127 class VirtualNetworkFunctionRecordError(Exception):
128 """ Virtual Network Function Record Error """
129 pass
130
131
132 class NetworkServiceDescriptorNotFound(Exception):
133 """ Cannot find Network Service Descriptor"""
134 pass
135
136
137 class NetworkServiceDescriptorNotFound(Exception):
138 """ Network Service Descriptor reference count exists """
139 pass
140
141 class NsrInstantiationFailed(Exception):
142 """ Failed to instantiate network service """
143 pass
144
145
146 class VnfInstantiationFailed(Exception):
147 """ Failed to instantiate virtual network function"""
148 pass
149
150
151 class VnffgInstantiationFailed(Exception):
152 """ Failed to instantiate virtual network function"""
153 pass
154
155
156 class VnfDescriptorError(Exception):
157 """Failed to instantiate virtual network function"""
158 pass
159
160
161 class ScalingOperationError(Exception):
162 pass
163
164
165 class ScaleGroupMissingError(Exception):
166 pass
167
168
169 class PlacementGroupError(Exception):
170 pass
171
172
173 class NsrNsdUpdateError(Exception):
174 pass
175
176
177 class NsrVlUpdateError(NsrNsdUpdateError):
178 pass
179
180 class VirtualLinkRecordError(Exception):
181 """ Virtual Links Record Error """
182 pass
183
184
185 class VlRecordState(Enum):
186 """ VL Record State """
187 INIT = 101
188 INSTANTIATION_PENDING = 102
189 ACTIVE = 103
190 TERMINATE_PENDING = 104
191 TERMINATED = 105
192 FAILED = 106
193
194
195 class VnffgRecordState(Enum):
196 """ VNFFG Record State """
197 INIT = 101
198 INSTANTIATION_PENDING = 102
199 ACTIVE = 103
200 TERMINATE_PENDING = 104
201 TERMINATED = 105
202 FAILED = 106
203
204
205 class VnffgRecord(object):
206 """ Vnffg Records class"""
207 SFF_DP_PORT = 4790
208 SFF_MGMT_PORT = 5000
209 def __init__(self, dts, log, loop, vnffgmgr, nsr, nsr_name, vnffgd_msg, sdn_account_name,cloud_account_name):
210
211 self._dts = dts
212 self._log = log
213 self._loop = loop
214 self._vnffgmgr = vnffgmgr
215 self._nsr = nsr
216 self._nsr_name = nsr_name
217 self._vnffgd_msg = vnffgd_msg
218 self._cloud_account_name = cloud_account_name
219 if sdn_account_name is None:
220 self._sdn_account_name = ''
221 else:
222 self._sdn_account_name = sdn_account_name
223
224 self._vnffgr_id = str(uuid.uuid4())
225 self._vnffgr_rsp_id = list()
226 self._vnffgr_state = VnffgRecordState.INIT
227
228 @property
229 def id(self):
230 """ VNFFGR id """
231 return self._vnffgr_id
232
233 @property
234 def state(self):
235 """ state of this VNF """
236 return self._vnffgr_state
237
238 def fetch_vnffgr(self):
239 """
240 Get VNFFGR message to be published
241 """
242
243 if self._vnffgr_state == VnffgRecordState.INIT:
244 vnffgr_dict = {"id": self._vnffgr_id,
245 "vnffgd_id_ref": self._vnffgd_msg.id,
246 "vnffgd_name_ref": self._vnffgd_msg.name,
247 "sdn_account": self._sdn_account_name,
248 "operational_status": 'init',
249 }
250 vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
251 elif self._vnffgr_state == VnffgRecordState.TERMINATED:
252 vnffgr_dict = {"id": self._vnffgr_id,
253 "vnffgd_id_ref": self._vnffgd_msg.id,
254 "vnffgd_name_ref": self._vnffgd_msg.name,
255 "sdn_account": self._sdn_account_name,
256 "operational_status": 'terminated',
257 }
258 vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
259 else:
260 try:
261 vnffgr = self._vnffgmgr.fetch_vnffgr(self._vnffgr_id)
262 except Exception:
263 self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id)
264 self._vnffgr_state = VnffgRecordState.FAILED
265 vnffgr_dict = {"id": self._vnffgr_id,
266 "vnffgd_id_ref": self._vnffgd_msg.id,
267 "vnffgd_name_ref": self._vnffgd_msg.name,
268 "sdn_account": self._sdn_account_name,
269 "operational_status": 'failed',
270 }
271 vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
272
273 return vnffgr
274
275 @asyncio.coroutine
276 def vnffgr_create_msg(self):
277 """ Virtual Link Record message for Creating VLR in VNS """
278 vnffgr_dict = {"id": self._vnffgr_id,
279 "vnffgd_id_ref": self._vnffgd_msg.id,
280 "vnffgd_name_ref": self._vnffgd_msg.name,
281 "sdn_account": self._sdn_account_name,
282 "cloud_account": self._cloud_account_name,
283 }
284 vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict)
285 for rsp in self._vnffgd_msg.rsp:
286 vnffgr_rsp = vnffgr.rsp.add()
287 vnffgr_rsp.id = str(uuid.uuid4())
288 vnffgr_rsp.name = self._nsr.name + '.' + rsp.name
289 self._vnffgr_rsp_id.append(vnffgr_rsp.id)
290 vnffgr_rsp.vnffgd_rsp_id_ref = rsp.id
291 vnffgr_rsp.vnffgd_rsp_name_ref = rsp.name
292 for rsp_cp_ref in rsp.vnfd_connection_point_ref:
293 vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref]
294 self._log.debug("VNFD message during VNFFG instantiation is %s",vnfd)
295 if len(vnfd) > 0 and vnfd[0].has_field('service_function_type'):
296 self._log.debug("Service Function Type for VNFD ID %s is %s",
297 rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type)
298 else:
299 self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",
300 rsp_cp_ref.vnfd_id_ref)
301 continue
302
303 vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add()
304 vnfr_cp_ref.member_vnf_index_ref = rsp_cp_ref.member_vnf_index_ref
305 vnfr_cp_ref.hop_number = rsp_cp_ref.order
306 vnfr_cp_ref.vnfd_id_ref =rsp_cp_ref.vnfd_id_ref
307 vnfr_cp_ref.service_function_type = vnfd[0].service_function_type
308 for nsr_vnfr in self._nsr.vnfrs.values():
309 if (nsr_vnfr.vnfd.id == vnfr_cp_ref.vnfd_id_ref and
310 nsr_vnfr.member_vnf_index == vnfr_cp_ref.member_vnf_index_ref):
311 vnfr_cp_ref.vnfr_id_ref = nsr_vnfr.id
312 vnfr_cp_ref.vnfr_name_ref = nsr_vnfr.name
313 vnfr_cp_ref.vnfr_connection_point_ref = rsp_cp_ref.vnfd_connection_point_ref
314
315 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
316 self._log.debug(" Received VNFR is %s", vnfr)
317 while vnfr.operational_status != 'running':
318 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
319 if vnfr.operational_status == 'failed':
320 self._log.error("Fetching VNFR for %s failed", vnfr.id)
321 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
322 (self.id, vnfr.id))
323 yield from asyncio.sleep(2, loop=self._loop)
324 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
325 self._log.debug("Received VNFR is %s", vnfr)
326
327 vnfr_cp_ref.connection_point_params.mgmt_address = vnfr.mgmt_interface.ip_address
328 for cp in vnfr.connection_point:
329 if cp.name == vnfr_cp_ref.vnfr_connection_point_ref:
330 vnfr_cp_ref.connection_point_params.port_id = cp.connection_point_id
331 vnfr_cp_ref.connection_point_params.name = self._nsr.name + '.' + cp.name
332 for vdu in vnfr.vdur:
333 for intf in vdu.interface:
334 if intf.type_yang == "EXTERNAL" and intf.external_connection_point_ref == vnfr_cp_ref.vnfr_connection_point_ref:
335 vnfr_cp_ref.connection_point_params.vm_id = vdu.vim_id
336 self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id,
337 vnfr_cp_ref.connection_point_params.vm_id)
338 break
339
340 vnfr_cp_ref.connection_point_params.address = cp.ip_address
341 vnfr_cp_ref.connection_point_params.port = VnffgRecord.SFF_DP_PORT
342
343 for vnffgd_classifier in self._vnffgd_msg.classifier:
344 _rsp = [rsp for rsp in vnffgr.rsp if rsp.vnffgd_rsp_id_ref == vnffgd_classifier.rsp_id_ref]
345 if len(_rsp) > 0:
346 rsp_id_ref = _rsp[0].id
347 rsp_name = _rsp[0].name
348 else:
349 self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",
350 vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id)
351 continue
352 vnffgr_classifier = vnffgr.classifier.add()
353 vnffgr_classifier.id = vnffgd_classifier.id
354 vnffgr_classifier.name = self._nsr.name + '.' + vnffgd_classifier.name
355 _rsp[0].classifier_name = vnffgr_classifier.name
356 vnffgr_classifier.rsp_id_ref = rsp_id_ref
357 vnffgr_classifier.rsp_name = rsp_name
358 for nsr_vnfr in self._nsr.vnfrs.values():
359 if (nsr_vnfr.vnfd.id == vnffgd_classifier.vnfd_id_ref and
360 nsr_vnfr.member_vnf_index == vnffgd_classifier.member_vnf_index_ref):
361 vnffgr_classifier.vnfr_id_ref = nsr_vnfr.id
362 vnffgr_classifier.vnfr_name_ref = nsr_vnfr.name
363 vnffgr_classifier.vnfr_connection_point_ref = vnffgd_classifier.vnfd_connection_point_ref
364
365 if nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER':
366 vnffgr_classifier.sff_name = nsr_vnfr.name
367
368 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
369 self._log.debug(" Received VNFR is %s", vnfr)
370 while vnfr.operational_status != 'running':
371 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
372 if vnfr.operational_status == 'failed':
373 self._log.error("Fetching VNFR for %s failed", vnfr.id)
374 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" %
375 (self.id, vnfr.id))
376 yield from asyncio.sleep(2, loop=self._loop)
377 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
378 self._log.debug("Received VNFR is %s", vnfr)
379
380 for cp in vnfr.connection_point:
381 if cp.name == vnffgr_classifier.vnfr_connection_point_ref:
382 vnffgr_classifier.port_id = cp.connection_point_id
383 vnffgr_classifier.ip_address = cp.ip_address
384 for vdu in vnfr.vdur:
385 for intf in vdu.interface:
386 if intf.type_yang == "EXTERNAL" and intf.external_connection_point_ref == vnffgr_classifier.vnfr_connection_point_ref:
387 vnffgr_classifier.vm_id = vdu.vim_id
388 self._log.debug("VIM ID for CP %s in VNFR %s is %s",
389 cp.name,nsr_vnfr.id,
390 vnfr_cp_ref.connection_point_params.vm_id)
391 break
392
393 self._log.info("VNFFGR msg to be sent is %s", vnffgr)
394 return vnffgr
395
396 @asyncio.coroutine
397 def vnffgr_nsr_sff_list(self):
398 """ SFF List for VNFR """
399 sff_list = {}
400 sf_list = [nsr_vnfr.name for nsr_vnfr in self._nsr.vnfrs.values() if nsr_vnfr.vnfd.service_function_chain == 'SF']
401
402 for nsr_vnfr in self._nsr.vnfrs.values():
403 if (nsr_vnfr.vnfd.service_function_chain == 'CLASSIFIER' or nsr_vnfr.vnfd.service_function_chain == 'SFF'):
404 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
405 self._log.debug(" Received VNFR is %s", vnfr)
406 while vnfr.operational_status != 'running':
407 self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status)
408 if vnfr.operational_status == 'failed':
409 self._log.error("Fetching VNFR for %s failed", vnfr.id)
410 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id))
411 yield from asyncio.sleep(2, loop=self._loop)
412 vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath)
413 self._log.debug("Received VNFR is %s", vnfr)
414
415 sff = RwsdnalYang.YangData_RwProject_Project_Vnffgs_VnffgChain_Sff()
416 sff_list[nsr_vnfr.vnfd.id] = sff
417 sff.name = nsr_vnfr.name
418 sff.function_type = nsr_vnfr.vnfd.service_function_chain
419
420 sff.mgmt_address = vnfr.mgmt_interface.ip_address
421 sff.mgmt_port = VnffgRecord.SFF_MGMT_PORT
422 for cp in vnfr.connection_point:
423 sff_dp = sff.dp_endpoints.add()
424 sff_dp.name = self._nsr.name + '.' + cp.name
425 sff_dp.address = cp.ip_address
426 sff_dp.port = VnffgRecord.SFF_DP_PORT
427 if nsr_vnfr.vnfd.service_function_chain == 'SFF':
428 for sf_name in sf_list:
429 _sf = sff.vnfr_list.add()
430 _sf.vnfr_name = sf_name
431
432 return sff_list
433
434 @asyncio.coroutine
435 def instantiate(self):
436 """ Instantiate this VNFFG """
437
438 self._log.info("Instaniating VNFFGR with vnffgd %s",
439 self._vnffgd_msg)
440
441
442 vnffgr_request = yield from self.vnffgr_create_msg()
443 vnffg_sff_list = yield from self.vnffgr_nsr_sff_list()
444
445 try:
446 vnffgr = self._vnffgmgr.create_vnffgr(vnffgr_request,self._vnffgd_msg.classifier,vnffg_sff_list)
447 except Exception as e:
448 self._log.exception("VNFFG instantiation failed: %s", str(e))
449 self._vnffgr_state = VnffgRecordState.FAILED
450 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self.id, vnffgr_request.id))
451
452 self._vnffgr_state = VnffgRecordState.INSTANTIATION_PENDING
453
454 self._log.info("Instantiated VNFFGR :%s", vnffgr)
455 self._vnffgr_state = VnffgRecordState.ACTIVE
456
457 self._log.info("Invoking update_state to update NSR state for NSR ID: %s", self._nsr.id)
458 yield from self._nsr.update_state()
459
460 def vnffgr_in_vnffgrm(self):
461 """ Is there a VNFR record in VNFM """
462 if (self._vnffgr_state == VnffgRecordState.ACTIVE or
463 self._vnffgr_state == VnffgRecordState.INSTANTIATION_PENDING or
464 self._vnffgr_state == VnffgRecordState.FAILED):
465 return True
466
467 return False
468
469 @asyncio.coroutine
470 def terminate(self):
471 """ Terminate this VNFFGR """
472 if not self.vnffgr_in_vnffgrm():
473 self._log.error("Ignoring terminate request for id %s in state %s",
474 self.id, self._vnffgr_state)
475 return
476
477 self._log.info("Terminating VNFFGR id:%s", self.id)
478 self._vnffgr_state = VnffgRecordState.TERMINATE_PENDING
479
480 self._vnffgmgr.terminate_vnffgr(self._vnffgr_id)
481
482 self._vnffgr_state = VnffgRecordState.TERMINATED
483 self._log.debug("Terminated VNFFGR id:%s", self.id)
484
485
486 class VirtualLinkRecord(object):
487 """ Virtual Link Records class"""
488 XPATH = "D,/vlr:vlr-catalog/vlr:vlr"
489 @staticmethod
490 @asyncio.coroutine
491 def create_record(dts, log, loop, project, nsr_name, vld_msg,
492 datacenter, ip_profile, nsr_id, restart_mode=False):
493 """Creates a new VLR object based on the given data.
494
495 If restart mode is enabled, then we look for existing records in the
496 DTS and create a VLR records using the exiting data(ID)
497
498 Returns:
499 VirtualLinkRecord
500 """
501 vlr_obj = VirtualLinkRecord(
502 dts,
503 log,
504 loop,
505 project,
506 nsr_name,
507 vld_msg,
508 datacenter,
509 ip_profile,
510 nsr_id,
511 )
512
513 if restart_mode:
514 res_iter = yield from dts.query_read(
515 project.add_project("D,/vlr:vlr-catalog/vlr:vlr"),
516 rwdts.XactFlag.MERGE)
517
518 for fut in res_iter:
519 response = yield from fut
520 vlr = response.result
521
522 # Check if the record is already present, if so use the ID of
523 # the existing record. Since the name of the record is uniquely
524 # formed we can use it as a search key!
525 if vlr.name == vlr_obj.name:
526 vlr_obj.reset_id(vlr.id)
527 break
528
529 return vlr_obj
530
531 def __init__(self, dts, log, loop, project, nsr_name, vld_msg,
532 datacenter, ip_profile, nsr_id):
533 self._dts = dts
534 self._log = log
535 self._loop = loop
536 self._project = project
537 self._nsr_name = nsr_name
538 self._vld_msg = vld_msg
539 self._datacenter_name = datacenter
540 self._assigned_subnet = None
541 self._nsr_id = nsr_id
542 self._ip_profile = ip_profile
543 self._vlr_id = str(uuid.uuid4())
544 self._state = VlRecordState.INIT
545 self._prev_state = None
546 self._create_time = int(time.time())
547 self.state_failed_reason = None
548
549 @property
550 def xpath(self):
551 """ path for this object """
552 return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id={}]".
553 format(quoted_key(self._vlr_id)))
554
555 @property
556 def id(self):
557 """ VLR id """
558 return self._vlr_id
559
560 @property
561 def nsr_name(self):
562 """ Get NSR name for this VL """
563 return self.nsr_name
564
565 @property
566 def vld_msg(self):
567 """ Virtual Link Desciptor """
568 return self._vld_msg
569
570 @property
571 def assigned_subnet(self):
572 """ Subnet assigned to this VL"""
573 return self._assigned_subnet
574
575 @property
576 def name(self):
577 """
578 Get the name for this VLR.
579 VLR name is "nsr name:VLD name"
580 """
581 if self.vld_msg.vim_network_name:
582 return self.vld_msg.vim_network_name
583 elif self.vld_msg.name == "multisite":
584 # This is a temporary hack to identify manually provisioned inter-site network
585 return self.vld_msg.name
586 else:
587 return self._project.name + "." +self._nsr_name + "." + self.vld_msg.name
588
589 @property
590 def datacenter_name(self):
591 """ Datacenter that this VLR should be created in """
592 return self._datacenter_name
593
594 @staticmethod
595 def vlr_xpath(vlr):
596 """ Get the VLR path from VLR """
597 return (VirtualLinkRecord.XPATH + "[vlr:id={}]").format(quoted_key(vlr.id))
598
599 @property
600 def state(self):
601 """ VLR state """
602 return self._state
603
604 @state.setter
605 def state(self, value):
606 """ VLR set state """
607 self._state = value
608
609 @property
610 def prev_state(self):
611 """ VLR previous state """
612 return self._prev_state
613
614 @prev_state.setter
615 def prev_state(self, value):
616 """ VLR set previous state """
617 self._prev_state = value
618
619 @property
620 def vlr_msg(self):
621 """ Virtual Link Record message for Creating VLR in VNS """
622 vld_fields = ["short_name",
623 "vendor",
624 "description",
625 "version",
626 "type_yang",
627 "vim_network_name",
628 "provider_network"]
629
630 vld_copy_dict = {k: v for k, v in self.vld_msg.as_dict().items()
631 if k in vld_fields}
632
633 vlr_dict = {"id": self._vlr_id,
634 "nsr_id_ref": self._nsr_id,
635 "vld_ref": self.vld_msg.id,
636 "name": self.name,
637 "create_time": self._create_time,
638 "datacenter": self._datacenter_name,
639 }
640
641 if self._ip_profile and self._ip_profile.has_field('ip_profile_params'):
642 vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict()
643
644
645 vlr_dict.update(vld_copy_dict)
646 vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict)
647
648 if self.vld_msg.has_field('virtual_connection_points'):
649 for cp in self.vld_msg.virtual_connection_points:
650 vcp = vlr.virtual_connection_points.add()
651 vcp.from_dict(cp.as_dict())
652 return vlr
653
654 def reset_id(self, vlr_id):
655 self._vlr_id = vlr_id
656
657 def create_nsr_vlr_msg(self, vnfrs):
658 """ The VLR message"""
659 nsr_vlr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vlr()
660 nsr_vlr.vlr_ref = self._vlr_id
661 nsr_vlr.assigned_subnet = self.assigned_subnet
662 nsr_vlr.datacenter = self._datacenter_name
663
664 for conn in self.vld_msg.vnfd_connection_point_ref:
665 for vnfr in vnfrs:
666 if (vnfr.vnfd.id == conn.vnfd_id_ref and
667 vnfr.member_vnf_index == conn.member_vnf_index_ref and
668 self._datacenter_name == vnfr._datacenter_name):
669 cp_entry = nsr_vlr.vnfr_connection_point_ref.add()
670 cp_entry.vnfr_id = vnfr.id
671 cp_entry.connection_point = conn.vnfd_connection_point_ref
672
673 return nsr_vlr
674
675 @asyncio.coroutine
676 def instantiate(self):
677 """ Instantiate this VL """
678 self._log.debug("Instaniating VLR key %s, vld %s",
679 self.xpath, self._vld_msg)
680 vlr = None
681 self._state = VlRecordState.INSTANTIATION_PENDING
682 self._log.debug("Executing VL create path:%s msg:%s",
683 self.xpath, self.vlr_msg)
684
685 with self._dts.transaction(flags=0) as xact:
686 block = xact.block_create()
687 block.add_query_create(self.xpath, self.vlr_msg)
688 self._log.debug("Executing VL create path:%s msg:%s",
689 self.xpath, self.vlr_msg)
690 res_iter = yield from block.execute(now=True)
691 for ent in res_iter:
692 res = yield from ent
693 vlr = res.result
694
695 if vlr is None:
696 self._state = VlRecordState.FAILED
697 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self.id)
698
699 if vlr.operational_status == 'failed':
700 self._log.debug("NS Id:%s VL creation failed for vlr id %s", self.id, vlr.id)
701 self._state = VlRecordState.FAILED
702 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr.id, vlr.operational_status_details))
703
704 self._log.info("Instantiated VL with xpath %s and vlr:%s",
705 self.xpath, vlr)
706 self._assigned_subnet = vlr.assigned_subnet
707
708 def vlr_in_vns(self):
709 """ Is there a VLR record in VNS """
710 if (self._state == VlRecordState.ACTIVE or
711 self._state == VlRecordState.INSTANTIATION_PENDING or
712 self._state == VlRecordState.TERMINATE_PENDING or
713 self._state == VlRecordState.FAILED):
714 return True
715
716 return False
717
718 @asyncio.coroutine
719 def terminate(self):
720 """ Terminate this VL """
721 if not self.vlr_in_vns():
722 self._log.debug("Ignoring terminate request for id %s in state %s",
723 self.id, self._state)
724 return
725
726 self._log.debug("Terminating VL id:%s", self.id)
727 self._state = VlRecordState.TERMINATE_PENDING
728
729 with self._dts.transaction(flags=0) as xact:
730 block = xact.block_create()
731 block.add_query_delete(self.xpath)
732 yield from block.execute(flags=0, now=True)
733
734 self._state = VlRecordState.TERMINATED
735 self._log.debug("Terminated VL id:%s", self.id)
736
737 def set_state_from_op_status(self, operational_status):
738 """ Set the state of this VL based on operational_status"""
739
740 self._log.debug("set_state_from_op_status called for vlr id %s with value %s", self.id, operational_status)
741 if operational_status == 'running':
742 self._state = VlRecordState.ACTIVE
743 elif operational_status == 'failed':
744 self._state = VlRecordState.FAILED
745 elif operational_status == 'vl_alloc_pending':
746 self._state = VlRecordState.INSTANTIATION_PENDING
747 else:
748 raise VirtualLinkRecordError("Unknown operational_status %s" % (operational_status))
749
750 class VnfRecordState(Enum):
751 """ Vnf Record State """
752 INIT = 101
753 INSTANTIATION_PENDING = 102
754 ACTIVE = 103
755 TERMINATE_PENDING = 104
756 TERMINATED = 105
757 FAILED = 106
758
759
760 class VirtualNetworkFunctionRecord(object):
761 """ Virtual Network Function Record class"""
762 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
763
764 @staticmethod
765 @asyncio.coroutine
766 def create_record(dts, log, loop, project, vnfd, nsr_config, const_vnfd_msg, nsd_id, nsr_name,
767 datacenter_name, nsr_id, group_name, group_instance_id,
768 placement_groups, cloud_config, restart_mode=False):
769 """Creates a new VNFR object based on the given data.
770
771 If restart mode is enabled, then we look for existing records in the
772 DTS and create a VNFR records using the exiting data(ID)
773
774 Returns:
775 VirtualNetworkFunctionRecord
776 """
777
778 vnfr_obj = VirtualNetworkFunctionRecord(
779 dts,
780 log,
781 loop,
782 project,
783 vnfd,
784 nsr_config,
785 const_vnfd_msg,
786 nsd_id,
787 nsr_name,
788 datacenter_name,
789 nsr_id,
790 group_name,
791 group_instance_id,
792 placement_groups,
793 cloud_config,
794 restart_mode=restart_mode)
795
796 if restart_mode:
797 res_iter = yield from dts.query_read(
798 project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr"),
799 rwdts.XactFlag.MERGE)
800
801 for fut in res_iter:
802 response = yield from fut
803 vnfr = response.result
804
805 if vnfr.name == vnfr_obj.name:
806 vnfr_obj.reset_id(vnfr.id)
807 break
808
809 return vnfr_obj
810
811 def __init__(self,
812 dts,
813 log,
814 loop,
815 project,
816 vnfd,
817 nsr_config,
818 const_vnfd_msg,
819 nsd_id,
820 nsr_name,
821 datacenter_name,
822 nsr_id,
823 group_name=None,
824 group_instance_id=None,
825 placement_groups = [],
826 cloud_config = None,
827 restart_mode = False):
828 self._dts = dts
829 self._log = log
830 self._loop = loop
831 self._project = project
832 self._vnfd = vnfd
833 self._nsr_config = nsr_config
834 self._const_vnfd_msg = const_vnfd_msg
835 self._nsd_id = nsd_id
836 self._nsr_name = nsr_name
837 self._nsr_id = nsr_id
838 self._datacenter_name = datacenter_name
839 self._group_name = group_name
840 self._group_instance_id = group_instance_id
841 self._placement_groups = placement_groups
842 self._cloud_config = cloud_config
843 self.restart_mode = restart_mode
844
845 self._config_status = NsrYang.ConfigStates.INIT
846 self._create_time = int(time.time())
847
848 self._prev_state = VnfRecordState.INIT
849 self._state = VnfRecordState.INIT
850 self._state_failed_reason = None
851
852 self._active_vdus = 0
853
854 self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
855 self.configure()
856
857 self._vnfr_id = str(uuid.uuid4())
858 self._name = None
859
860 self.substitute_vnf_input_parameters = VnfInputParameterSubstitution(self._log,
861 self._const_vnfd_msg,
862 self._project)
863 self._vnfr_msg = self.create_vnfr_msg()
864 self._log.debug("Set VNFR {} config type to {}".
865 format(self.name, self.config_type))
866
867
868 if group_name is None and group_instance_id is not None:
869 raise ValueError("Group instance id must not be provided with an empty group name")
870
871 @property
872 def id(self):
873 """ VNFR id """
874 return self._vnfr_id
875
876 @property
877 def xpath(self):
878 """ VNFR xpath """
879 return self._project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]"
880 .format(quoted_key(self.id)))
881
882 @property
883 def vnfr_msg(self):
884 """ VNFR message """
885 return self._vnfr_msg
886
887 @property
888 def const_vnfr_msg(self):
889 """ VNFR message """
890 return RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConstituentVnfrRef(
891 vnfr_id=self.id, datacenter=self._datacenter_name)
892
893 @property
894 def vnfd(self):
895 """ vnfd """
896 return self._vnfd
897
898 @property
899 def datacenter_name(self):
900 """ Datacenter that this VNF should be created in """
901 return self._datacenter_name
902
903
904 @property
905 def active(self):
906 """ Is this VNF actve """
907 return True if self._state == VnfRecordState.ACTIVE else False
908
909 @property
910 def state(self):
911 """ state of this VNF """
912 return self._state
913
914 @property
915 def state_failed_reason(self):
916 """ Error message in case this VNF is in failed state """
917 return self._state_failed_reason
918
919 @property
920 def member_vnf_index(self):
921 """ Member VNF index """
922 return self._const_vnfd_msg.member_vnf_index
923
924 @property
925 def nsr_name(self):
926 """ NSR name"""
927 return self._nsr_name
928
929 @property
930 def name(self):
931 """ Name of this VNFR """
932 if self._name is not None:
933 return self._name
934
935 name_tags = [self._project.name, self._nsr_name]
936
937 if self._group_name is not None:
938 name_tags.append(self._group_name)
939
940 if self._group_instance_id is not None:
941 name_tags.append(str(self._group_instance_id))
942
943 name_tags.extend([self.vnfd.name, str(self.member_vnf_index)])
944
945 self._name = "__".join(name_tags)
946
947 return self._name
948
949 @staticmethod
950 def vnfr_xpath(vnfr):
951 """ Get the VNFR path from VNFR """
952 return (VirtualNetworkFunctionRecord.XPATH +
953 "[vnfr:id={}]").format(quoted_key(vnfr.id))
954
955 @property
956 def config_type(self):
957 cfg_types = ['netconf', 'juju', 'script']
958 for method in cfg_types:
959 if self._vnfd.vnf_configuration.has_field(method):
960 return method
961 return 'none'
962
963 @property
964 def config_status(self):
965 """Return the config status as YANG ENUM string"""
966 self._log.debug("Map VNFR {} config status {} ({})".
967 format(self.name, self._config_status, self.config_type))
968 if self.config_type == 'none':
969 return 'config_not_needed'
970 elif self._config_status == NsrYang.ConfigStates.CONFIGURED:
971 return 'configured'
972 elif self._config_status == NsrYang.ConfigStates.FAILED:
973 return 'failed'
974
975 return 'configuring'
976
977 def set_state(self, state):
978 """ set the state of this object """
979 self._prev_state = self._state
980 self._state = state
981
982 def reset_id(self, vnfr_id):
983 self._vnfr_id = vnfr_id
984 self._vnfr_msg = self.create_vnfr_msg()
985
986 def configure(self):
987 self.config_store.merge_vnfd_config(
988 self._project.name,
989 self._nsd_id,
990 self._vnfd,
991 self.member_vnf_index,
992 )
993
994 def create_vnfr_msg(self):
995 """ VNFR message for this VNFR """
996 vnfd_fields = [
997 "short_name",
998 "vendor",
999 "description",
1000 "version",
1001 "type_yang",
1002 ]
1003 vnfd_copy_dict = {k: v for k, v in self._vnfd.as_dict().items() if k in vnfd_fields}
1004 vnfr_dict = {
1005 "id": self.id,
1006 "nsr_id_ref": self._nsr_id,
1007 "name": self.name,
1008 "datacenter": self._datacenter_name,
1009 "config_status": self.config_status
1010 }
1011 vnfr_dict.update(vnfd_copy_dict)
1012
1013 vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(vnfr_dict)
1014 vnfr.vnfd = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd. \
1015 from_dict(self.vnfd.as_dict())
1016 vnfr.member_vnf_index_ref = self.member_vnf_index
1017 vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict())
1018
1019 if self._vnfd.mgmt_interface.has_field("port"):
1020 vnfr.mgmt_interface.port = self._vnfd.mgmt_interface.port
1021
1022 for group_info in self._placement_groups:
1023 group = vnfr.placement_groups_info.add()
1024 group.from_dict(group_info.as_dict())
1025
1026 if self._cloud_config and len(self._cloud_config.as_dict()):
1027 self._log.debug("Cloud config during vnfr create is {}".format(self._cloud_config))
1028 vnfr.cloud_config = self._cloud_config
1029
1030 # UI expects the monitoring param field to exist
1031 vnfr.monitoring_param = []
1032
1033 self._log.debug("Get vnfr_msg for VNFR {} : {}".format(self.name, vnfr))
1034
1035 if self.restart_mode:
1036 vnfr.operational_status = 'init'
1037 else:
1038 # Set Operational Status as pre-init for Input Param Substitution
1039 vnfr.operational_status = 'pre_init'
1040
1041 return vnfr
1042
1043 @asyncio.coroutine
1044 def update_vnfm(self):
1045 self._log.debug("Send an update to VNFM for VNFR {} with {}".
1046 format(self.name, self.vnfr_msg))
1047 yield from self._dts.query_update(
1048 self.xpath,
1049 rwdts.XactFlag.REPLACE,
1050 self.vnfr_msg
1051 )
1052
1053 def get_config_status(self):
1054 """Return the config status as YANG ENUM"""
1055 return self._config_status
1056
1057 @asyncio.coroutine
1058 def set_config_status(self, status):
1059
1060 def status_to_string(status):
1061 status_dc = {
1062 NsrYang.ConfigStates.INIT : 'init',
1063 NsrYang.ConfigStates.CONFIGURING : 'configuring',
1064 NsrYang.ConfigStates.CONFIG_NOT_NEEDED : 'config_not_needed',
1065 NsrYang.ConfigStates.CONFIGURED : 'configured',
1066 NsrYang.ConfigStates.FAILED : 'failed',
1067 }
1068
1069 return status_dc[status]
1070
1071 self._log.debug("Update VNFR {} from {} ({}) to {}".
1072 format(self.name, self._config_status,
1073 self.config_type, status))
1074 if self._config_status == NsrYang.ConfigStates.CONFIGURED:
1075 self._log.warning("Updating already configured VNFR {}".
1076 format(self.name))
1077 return
1078
1079 if self._config_status != status:
1080 try:
1081 self._config_status = status
1082 # I don't think this is used. Original implementor can check.
1083 # Caused Exception, so corrected it by status_to_string
1084 # But not sure whats the use of this variable?
1085 self.vnfr_msg.config_status = status_to_string(status)
1086 except Exception as e:
1087 self._log.exception("Exception=%s", str(e))
1088
1089 self._log.debug("Updated VNFR {} status to {}".format(self.name, status))
1090
1091 if self._config_status != NsrYang.ConfigStates.INIT:
1092 try:
1093 # Publish only after VNFM has the VNFR created
1094 yield from self.update_vnfm()
1095 except Exception as e:
1096 self._log.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1097 format(status, self.name, e))
1098 self._log.exception(e)
1099
1100 def is_configured(self):
1101 if self.config_type == 'none':
1102 return True
1103
1104 if self._config_status == NsrYang.ConfigStates.CONFIGURED:
1105 return True
1106
1107 return False
1108
1109 @asyncio.coroutine
1110 def update_config_primitives(self, vnf_config, nsr):
1111 # Update only after we are configured
1112 if self._config_status == NsrYang.ConfigStates.INIT:
1113 return
1114
1115 if not vnf_config.as_dict():
1116 return
1117
1118 self._log.debug("Update VNFR {} config: {}".
1119 format(self.name, vnf_config.as_dict()))
1120
1121 # Update config primitive
1122 updated = False
1123 for prim in self._vnfd.vnf_configuration.config_primitive:
1124 for p in vnf_config.config_primitive:
1125 if prim.name == p.name:
1126 for param in prim.parameter:
1127 for pa in p.parameter:
1128 if pa.name == param.name:
1129 if pa.default_value and \
1130 (pa.default_value != param.default_value):
1131 param.default_value = pa.default_value
1132 param.read_only = pa.read_only
1133 updated = True
1134 break
1135 self._log.debug("Prim: {}".format(prim.as_dict()))
1136 break
1137
1138 if updated:
1139 self._log.debug("Updated VNFD {} config: {}".
1140 format(self._vnfd.name,
1141 self._vnfd.vnf_configuration))
1142 self._vnfr_msg = self.create_vnfr_msg()
1143
1144 try:
1145 yield from nsr.nsm_plugin.update_vnfr(self)
1146 except Exception as e:
1147 self._log.error("Exception updating VNFM with new config "
1148 "primitive for VNFR {}: {}".
1149 format(self.name, e))
1150 self._log.exception(e)
1151
1152 @asyncio.coroutine
1153 def instantiate(self, nsr):
1154 """ Instantiate this VNFR"""
1155
1156 self._log.debug("Instaniating VNFR key %s, vnfd %s",
1157 self.xpath, self._vnfd)
1158
1159 self._log.debug("Create VNF with xpath %s and vnfr %s",
1160 self.xpath, self.vnfr_msg)
1161
1162 self.set_state(VnfRecordState.INSTANTIATION_PENDING)
1163
1164 def find_vlr_for_cp(conn):
1165 """ Find VLR for the given connection point """
1166 for vlr_id, vlr in nsr.vlrs.items():
1167 for vnfd_cp in vlr.vld_msg.vnfd_connection_point_ref:
1168 if (vnfd_cp.vnfd_id_ref == self._vnfd.id and
1169 vnfd_cp.vnfd_connection_point_ref == conn.name and
1170 vnfd_cp.member_vnf_index_ref == self.member_vnf_index and
1171 vlr._datacenter_name == self._datacenter_name):
1172 self._log.debug("Found VLR for cp_name:%s and vnf-index:%d",
1173 conn.name, self.member_vnf_index)
1174 return vlr
1175 return None
1176
1177 # For every connection point in the VNFD fill in the identifier
1178 self._log.debug("Add connection point for VNF %s: %s",
1179 self.vnfr_msg.name, self._vnfd.connection_point)
1180 for conn_p in self._vnfd.connection_point:
1181 cpr = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint()
1182 cpr.name = conn_p.name
1183 cpr.type_yang = conn_p.type_yang
1184 if conn_p.has_field('port_security_enabled'):
1185 cpr.port_security_enabled = conn_p.port_security_enabled
1186
1187 vlr_ref = find_vlr_for_cp(conn_p)
1188 if vlr_ref is None:
1189 msg = "Failed to find VLR for cp = %s" % conn_p.name
1190 self._log.debug("%s", msg)
1191 # raise VirtualNetworkFunctionRecordError(msg)
1192 continue
1193
1194 cpr.vlr_ref = vlr_ref.id
1195
1196 self.vnfr_msg.connection_point.append(cpr)
1197 self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1198 cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id)
1199
1200 self._log.debug("VNFR {} restart mode {}".
1201 format(self.vnfr_msg.id, self.restart_mode))
1202 if not self.restart_mode:
1203 # Checking for NS Terminate.
1204 if nsr._ns_terminate_received == False:
1205 # Create with pre-init operational state publishes the vnfr for substitution.
1206 yield from self._dts.query_create(self.xpath, 0, self.vnfr_msg)
1207 # Call to substitute VNF Input Parameter
1208 self.substitute_vnf_input_parameters(self.vnfr_msg, self._nsr_config)
1209 # Calling Update with pre-init operational data after Param substitution to instatntiate vnfr
1210 yield from self._dts.query_update(self.xpath, 0, self.vnfr_msg)
1211
1212 else:
1213 yield from self._dts.query_update(self.xpath,
1214 0,
1215 self.vnfr_msg)
1216
1217 self._log.info("Created VNF with xpath %s and vnfr %s",
1218 self.xpath, self.vnfr_msg)
1219
1220 @asyncio.coroutine
1221 def update_state(self, vnfr_msg):
1222 """ Update this VNFR"""
1223 if vnfr_msg.operational_status == "running":
1224 if self.vnfr_msg.operational_status != "running":
1225 yield from self.is_active()
1226 elif vnfr_msg.operational_status == "failed":
1227 yield from self.instantiation_failed(failed_reason=vnfr_msg.operational_status_details)
1228
1229 @asyncio.coroutine
1230 def is_active(self):
1231 """ This VNFR is active """
1232 self._log.debug("VNFR %s is active", self._vnfr_id)
1233 self.set_state(VnfRecordState.ACTIVE)
1234
1235 @asyncio.coroutine
1236 def instantiation_failed(self, failed_reason=None):
1237 """ This VNFR instantiation failed"""
1238 self._log.debug("VNFR %s instantiation failed", self._vnfr_id)
1239 self.set_state(VnfRecordState.FAILED)
1240 self._state_failed_reason = failed_reason
1241
1242 def vnfr_in_vnfm(self):
1243 """ Is there a VNFR record in VNFM """
1244 if (self._state == VnfRecordState.ACTIVE or
1245 self._state == VnfRecordState.INSTANTIATION_PENDING or
1246 self._state == VnfRecordState.FAILED):
1247 return True
1248
1249 return False
1250
1251 @asyncio.coroutine
1252 def terminate(self):
1253 """ Terminate this VNF """
1254 if not self.vnfr_in_vnfm():
1255 self._log.debug("Ignoring terminate request for id %s in state %s",
1256 self.id, self._state)
1257 return
1258
1259 self._log.debug("Terminating VNF id:%s", self.id)
1260 self.set_state(VnfRecordState.TERMINATE_PENDING)
1261 with self._dts.transaction(flags=0) as xact:
1262 block = xact.block_create()
1263 block.add_query_delete(self.xpath)
1264 yield from block.execute(flags=0)
1265 self.set_state(VnfRecordState.TERMINATED)
1266 self._log.debug("Terminated VNF id:%s", self.id)
1267
1268
1269 class NetworkServiceStatus(object):
1270 """ A class representing the Network service's status """
1271 MAX_EVENTS_RECORDED = 10
1272 """ Network service Status class"""
1273 def __init__(self, dts, log, loop):
1274 self._dts = dts
1275 self._log = log
1276 self._loop = loop
1277
1278 self._state = NetworkServiceRecordState.INIT
1279 self._events = deque([])
1280
1281 @asyncio.coroutine
1282 def create_notification(self, evt, evt_desc, evt_details):
1283 xp = "N,/rw-nsr:nsm-notification"
1284 notif = RwNsrYang.YangNotif_RwNsr_NsmNotification()
1285 notif.event = evt
1286 notif.description = evt_desc
1287 notif.details = evt_details if evt_details is not None else None
1288
1289 yield from self._dts.query_create(xp, rwdts.XactFlag.ADVISE, notif)
1290 self._log.info("Notification called by creating dts query: %s", notif)
1291
1292 def record_event(self, evt, evt_desc, evt_details):
1293 """ Record an event """
1294 self._log.debug("Recording event - evt %s, evt_descr %s len = %s",
1295 evt, evt_desc, len(self._events))
1296 if len(self._events) >= NetworkServiceStatus.MAX_EVENTS_RECORDED:
1297 self._events.popleft()
1298 self._events.append((int(time.time()), evt, evt_desc,
1299 evt_details if evt_details is not None else None))
1300
1301 self._loop.create_task(self.create_notification(evt,evt_desc,evt_details))
1302
1303 def set_state(self, state):
1304 """ set the state of this status object """
1305 self._state = state
1306
1307 def yang_str(self):
1308 """ Return the state as a yang enum string """
1309 state_to_str_map = {"INIT": "init",
1310 "VL_INIT_PHASE": "vl_init_phase",
1311 "VNF_INIT_PHASE": "vnf_init_phase",
1312 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1313 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1314 "RUNNING": "running",
1315 "SCALING_OUT": "scaling_out",
1316 "SCALING_IN": "scaling_in",
1317 "TERMINATE_RCVD": "terminate_rcvd",
1318 "TERMINATE": "terminate",
1319 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1320 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1321 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1322 "TERMINATED": "terminated",
1323 "FAILED": "failed",
1324 "VL_INSTANTIATE": "vl_instantiate",
1325 "VL_TERMINATE": "vl_terminate",
1326 }
1327 return state_to_str_map[self._state.name]
1328
1329 @property
1330 def state(self):
1331 """ State of this status object """
1332 return self._state
1333
1334 @property
1335 def msg(self):
1336 """ Network Service Record as a message"""
1337 event_list = []
1338 idx = 1
1339 for entry in self._events:
1340 event = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_OperationalEvents()
1341 event.id = idx
1342 idx += 1
1343 event.timestamp, event.event, event.description, event.details = entry
1344 event_list.append(event)
1345 return event_list
1346
1347
1348 class NetworkServiceRecord(object):
1349 """ Network service record """
1350 XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr"
1351
1352 def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg,
1353 sdn_account_name, key_pairs, project, restart_mode=False,
1354 vlr_handler=None):
1355 self._dts = dts
1356 self._log = log
1357 self._loop = loop
1358 self._nsm = nsm
1359 self._nsr_cfg_msg = nsr_cfg_msg
1360 self._nsm_plugin = nsm_plugin
1361 self._sdn_account_name = sdn_account_name
1362 self._vlr_handler = vlr_handler
1363 self._project = project
1364
1365 self._nsd = None
1366 self._nsr_msg = None
1367 self._nsr_regh = None
1368 self._key_pairs = key_pairs
1369 self._ssh_key_file = None
1370 self._ssh_pub_key = None
1371 self._vlrs = {}
1372 self._vnfrs = {}
1373 self._vnfds = {}
1374 self._vnffgrs = {}
1375 self._param_pools = {}
1376 self._scaling_groups = {}
1377 self._create_time = int(time.time())
1378 self._op_status = NetworkServiceStatus(dts, log, loop)
1379 self._config_status = NsrYang.ConfigStates.CONFIGURING
1380 self._config_status_details = None
1381 self._job_id = 0
1382 self.restart_mode = restart_mode
1383 self.config_store = rift.mano.config_data.config.ConfigStore(self._log)
1384 self._debug_running = False
1385 self._is_active = False
1386 self._vl_phase_completed = False
1387 self._vnf_phase_completed = False
1388 self.instantiated = set()
1389
1390 # Used for orchestration_progress
1391 self._active_vms = 0
1392 self._active_networks = 0
1393
1394 # A flag to indicate if the NS has failed, currently it is recorded in
1395 # operational status, but at the time of termination this field is
1396 # over-written making it difficult to identify the failure.
1397 self._is_failed = False
1398
1399 # Initalise the state to init
1400 # The NSR moves through the following transitions
1401 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1402 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1403 # 3. VNFS_READY - READY when the NSR is published
1404
1405 self.set_state(NetworkServiceRecordState.INIT)
1406
1407 self.substitute_input_parameters = InputParameterSubstitution(self._log, self._project)
1408
1409 # Create an asyncio loop to know when the virtual links are ready
1410 self._vls_ready = asyncio.Event(loop=self._loop)
1411
1412 # This variable stores all the terminate events received per NS. This is then used to prevent any
1413 # further nsr non-terminate updates received in case of terminate being called bedore ns in in running state.
1414 self._ns_terminate_received = False
1415
1416 @property
1417 def nsm_plugin(self):
1418 """ NSM Plugin """
1419 return self._nsm_plugin
1420
1421 def set_state(self, state):
1422 """ Set state for this NSR"""
1423 # We are in init phase and is moving to the next state
1424 # The new state could be a FAILED state or VNF_INIIT_PHASE
1425 if self.state == NetworkServiceRecordState.VL_INIT_PHASE:
1426 self._vl_phase_completed = True
1427
1428 if self.state == NetworkServiceRecordState.VNF_INIT_PHASE:
1429 self._vnf_phase_completed = True
1430
1431 self._op_status.set_state(state)
1432
1433 self._nsm_plugin.set_state(self.id, state)
1434
1435 @property
1436 def id(self):
1437 """ Get id for this NSR"""
1438 return self._nsr_cfg_msg.id
1439
1440 @property
1441 def name(self):
1442 """ Name of this network service record """
1443 return self._nsr_cfg_msg.name
1444
1445 @property
1446 def _datacenter_name(self):
1447 if self._nsr_cfg_msg.has_field('datacenter'):
1448 return self._nsr_cfg_msg.datacenter
1449 return None
1450
1451 @property
1452 def state(self):
1453 """State of this NetworkServiceRecord"""
1454 return self._op_status.state
1455
1456 @property
1457 def active(self):
1458 """ Is this NSR active ?"""
1459 return True if self._op_status.state == NetworkServiceRecordState.RUNNING else False
1460
1461 @property
1462 def vlrs(self):
1463 """ VLRs associated with this NSR"""
1464 return self._vlrs
1465
1466 @property
1467 def vnfrs(self):
1468 """ VNFRs associated with this NSR"""
1469 return self._vnfrs
1470
1471 @property
1472 def vnffgrs(self):
1473 """ VNFFGRs associated with this NSR"""
1474 return self._vnffgrs
1475
1476 @property
1477 def scaling_groups(self):
1478 """ Scaling groups associated with this NSR """
1479 return self._scaling_groups
1480
1481 @property
1482 def param_pools(self):
1483 """ Parameter value pools associated with this NSR"""
1484 return self._param_pools
1485
1486 @property
1487 def nsr_cfg_msg(self):
1488 return self._nsr_cfg_msg
1489
1490 @nsr_cfg_msg.setter
1491 def nsr_cfg_msg(self, msg):
1492 self._nsr_cfg_msg = msg
1493
1494 @property
1495 def nsd_msg(self):
1496 """ NSD Protobuf for this NSR """
1497 if self._nsd is not None:
1498 return self._nsd
1499 self._nsd = self._nsr_cfg_msg.nsd
1500 return self._nsd
1501
1502 @property
1503 def nsd_id(self):
1504 """ NSD ID for this NSR """
1505 return self.nsd_msg.id
1506
1507 @property
1508 def job_id(self):
1509 ''' Get a new job id for config primitive'''
1510 self._job_id += 1
1511 return self._job_id
1512
1513 @property
1514 def config_status(self):
1515 """ Config status for NSR """
1516 return self._config_status
1517
1518 @property
1519 def nsm(self):
1520 """NS Manager"""
1521 return self._nsm
1522
1523 @property
1524 def is_failed(self):
1525 return self._is_failed
1526
1527 @property
1528 def public_key(self):
1529 return self._ssh_pub_key
1530
1531 @property
1532 def private_key(self):
1533 return self._ssh_key_file
1534
1535 def resolve_placement_group_cloud_construct(self, input_group):
1536 """
1537 Returns the cloud specific construct for placement group
1538 """
1539 copy_dict = ['name', 'requirement', 'strategy']
1540
1541 for group_info in self._nsr_cfg_msg.nsd_placement_group_maps:
1542 if group_info.placement_group_ref == input_group.name:
1543 group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1544 group_dict = {k:v for k,v in
1545 group_info.as_dict().items() if k != 'placement_group_ref'}
1546 for param in copy_dict:
1547 group_dict.update({param: getattr(input_group, param)})
1548 group.from_dict(group_dict)
1549 return group
1550 return None
1551
1552
1553 def __str__(self):
1554 return "NSR(name={}, nsd_id={}, data center={})".format(
1555 self.name, self.nsd_id, self._datacenter_name
1556 )
1557
1558 def _get_vnfd(self, vnfd_id, config_xact):
1559 """ Fetch vnfd msg for the passed vnfd id """
1560 return self._nsm.get_vnfd(vnfd_id, config_xact)
1561
1562 def _get_vnfd_datacenter(self, vnfd_member_index):
1563 """ Fetch datacenter for the passed vnfd id """
1564 if self._nsr_cfg_msg.vnf_datacenter_map:
1565 vim_accounts = [vnf.datacenter for vnf in self._nsr_cfg_msg.vnf_datacenter_map \
1566 if str(vnfd_member_index) == str(vnf.member_vnf_index_ref)]
1567 if vim_accounts and vim_accounts[0]:
1568 return vim_accounts[0]
1569 return self._datacenter_name
1570
1571 def _get_constituent_vnfd_msg(self, vnf_index):
1572 for const_vnfd in self.nsd_msg.constituent_vnfd:
1573 if const_vnfd.member_vnf_index == vnf_index:
1574 return const_vnfd
1575
1576 raise ValueError("Constituent VNF index %s not found" % vnf_index)
1577
1578 def record_event(self, evt, evt_desc, evt_details=None, state=None):
1579 """ Record an event """
1580 self._op_status.record_event(evt, evt_desc, evt_details)
1581 if state is not None:
1582 self.set_state(state)
1583
1584 def scaling_trigger_str(self, trigger):
1585 SCALING_TRIGGER_STRS = {
1586 NsdBaseYang.ScalingTrigger.PRE_SCALE_IN : 'pre-scale-in',
1587 NsdBaseYang.ScalingTrigger.POST_SCALE_IN : 'post-scale-in',
1588 NsdBaseYang.ScalingTrigger.PRE_SCALE_OUT : 'pre-scale-out',
1589 NsdBaseYang.ScalingTrigger.POST_SCALE_OUT : 'post-scale-out',
1590 }
1591 try:
1592 return SCALING_TRIGGER_STRS[trigger]
1593 except Exception as e:
1594 self._log.error("Scaling trigger mapping error for {} : {}".
1595 format(trigger, e))
1596 self._log.exception(e)
1597 return "Unknown trigger"
1598
1599 def generate_ssh_key_pair(self, config_xact):
1600 '''Generate a ssh key pair if required'''
1601 if self._ssh_key_file:
1602 self._log.debug("Key pair already generated")
1603 return
1604
1605 gen_key = False
1606 for cv in self.nsd_msg.constituent_vnfd:
1607 vnfd = self._get_vnfd(cv.vnfd_id_ref, config_xact)
1608 if vnfd and vnfd.mgmt_interface.ssh_key:
1609 gen_key = True
1610 break
1611
1612 if not gen_key:
1613 return
1614
1615 try:
1616 key = ManoSshKey(self._log)
1617 path = tempfile.mkdtemp()
1618 key.write_to_disk(name=self.id, directory=path)
1619 self._ssh_key_file = "file://{}".format(key.private_key_file)
1620 self._ssh_pub_key = key.public_key
1621 except Exception as e:
1622 self._log.exception("Error generating ssh key for {}: {}".
1623 format(self.nsr_cfg_msg.name, e))
1624
1625 @asyncio.coroutine
1626 def instantiate_vls(self):
1627 """
1628 This function instantiates VLs for every VL in this Network Service
1629 """
1630 self._log.debug("Instantiating %d VLs in NSD id %s", len(self._vlrs),
1631 self.id)
1632 for vlr_id, vlr in self._vlrs.items():
1633 yield from self.nsm_plugin.instantiate_vl(self, vlr)
1634
1635 if not isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin):
1636 self._vls_ready.set()
1637
1638 # Wait for the VLs to be ready before yielding control out
1639 self._log.debug("Waitng for %d VLs in NSR id %s to be active",
1640 len(self._vlrs), self.id)
1641 if self._vlrs:
1642 self._log.debug("NSR id:%s, name:%s - Waiting for %d VLs to be ready",
1643 self.id, self.name, len(self._vlrs))
1644 yield from self._vls_ready.wait()
1645 else:
1646 self._log.debug("NSR id:%s, name:%s, No virtual links found",
1647 self.id, self.name)
1648 self._vls_ready.set()
1649
1650 self._log.info("All %d VLs in NSR id %s are active, start the VNFs",
1651 len(self._vlrs), self.id)
1652 @asyncio.coroutine
1653 def create(self, config_xact):
1654 """ Create this network service"""
1655 self._log.debug("Create NS {} for {}".format(self.name, self._project.name))
1656 # Create virtual links for all the external vnf
1657 # connection points in this NS
1658 yield from self.create_vls()
1659
1660 # Create VNFs in this network service
1661 yield from self.create_vnfs(config_xact)
1662
1663 # Create VNFFG for network service
1664 self.create_vnffgs()
1665
1666 # Create Scaling Groups for each scaling group in NSD
1667 self.create_scaling_groups()
1668
1669 # Create Parameter Pools
1670 self.create_param_pools()
1671
1672 @asyncio.coroutine
1673 def apply_scale_group_config_script(self, script, group, scale_instance, trigger, vnfrs=None):
1674 """ Apply config based on script for scale group """
1675 rift_var_root_dir = os.environ['RIFT_VAR_ROOT']
1676
1677 @asyncio.coroutine
1678 def add_vnfrs_data(vnfrs_list):
1679 """ Add as a dict each of the VNFRs data """
1680 vnfrs_data = []
1681
1682 for vnfr in vnfrs_list:
1683 self._log.debug("Add VNFR {} data".format(vnfr))
1684 vnfr_data = dict()
1685 vnfr_data['name'] = vnfr.name
1686 if trigger in [NsdBaseYang.ScalingTrigger.PRE_SCALE_IN,
1687 NsdBaseYang.ScalingTrigger.POST_SCALE_OUT]:
1688 # Get VNF management and other IPs, etc
1689 opdata = yield from self.fetch_vnfr(vnfr.xpath)
1690 self._log.debug("VNFR {} op data: {}".format(vnfr.name, opdata))
1691 try:
1692 vnfr_data['rw_mgmt_ip'] = opdata.mgmt_interface.ip_address
1693 vnfr_data['rw_mgmt_port'] = opdata.mgmt_interface.port
1694 vnfr_data['member_vnf_index_ref'] = opdata.member_vnf_index_ref
1695 vnfr_data['vdur_data'] = []
1696 for vdur in opdata.vdur:
1697 vdur_data = dict()
1698 vdur_data['vm_name'] = vdur.name
1699 vdur_data['vm_mgmt_ip'] = vdur.vm_management_ip
1700 vnfr_data['vdur_data'].append(vdur_data)
1701 except Exception as e:
1702 self._log.error("Unable to get management IP for vnfr {}:{}".
1703 format(vnfr.name, e))
1704
1705 try:
1706 vnfr_data['connection_points'] = []
1707 for cp in opdata.connection_point:
1708 con_pt = dict()
1709 con_pt['name'] = cp.name
1710 con_pt['ip_address'] = cp.ip_address
1711 vnfr_data['connection_points'].append(con_pt)
1712 except Exception as e:
1713 self._log.error("Exception getting connections points for VNFR {}: {}".
1714 format(vnfr.name, e))
1715
1716 vnfrs_data.append(vnfr_data)
1717 self._log.debug("VNFRs data: {}".format(vnfrs_data))
1718
1719 return vnfrs_data
1720
1721 def add_nsr_data(nsr):
1722 nsr_data = dict()
1723 nsr_data['name'] = nsr.name
1724 return nsr_data
1725
1726 if script is None or len(script) == 0:
1727 self._log.error("Script not provided for scale group config: {}".format(group.name))
1728 return False
1729
1730 if script[0] == '/':
1731 path = script
1732 else:
1733 path = os.path.join(rift_var_root_dir,
1734 'launchpad/packages/nsd',
1735 self._project.name,
1736 self.nsd_id, 'scripts',
1737 script)
1738
1739 if not os.path.exists(path):
1740 self._log.error("Config failed for scale group {}: Script does not exist at {}".
1741 format(group.name, path))
1742 return False
1743
1744 # Build a YAML file with all parameters for the script to execute
1745 # The data consists of 5 sections
1746 # 1. Trigger
1747 # 2. Scale group config
1748 # 3. VNFRs in the scale group
1749 # 4. VNFRs outside scale group
1750 # 5. NSR data
1751 data = dict()
1752 data['trigger'] = group.trigger_map(trigger)
1753 data['config'] = group.group_msg.as_dict()
1754
1755 if vnfrs:
1756 data["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs)
1757 else:
1758 data["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance.vnfrs)
1759
1760 data["vnfrs_others"] = yield from add_vnfrs_data(self.vnfrs.values())
1761 data["nsr"] = add_nsr_data(self)
1762
1763 tmp_file = None
1764 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
1765 tmp_file.write(yaml.dump(data, default_flow_style=True)
1766 .encode("UTF-8"))
1767
1768 self._log.debug("Creating a temp file: {} with input data: {}".
1769 format(tmp_file.name, data))
1770
1771 cmd = "{} {}".format(path, tmp_file.name)
1772 self._log.debug("Running the CMD: {}".format(cmd))
1773 proc = yield from asyncio.create_subprocess_shell(cmd, loop=self._loop)
1774 rc = yield from proc.wait()
1775 if rc:
1776 self._log.error("The script {} for scale group {} config returned: {}".
1777 format(script, group.name, rc))
1778 return False
1779
1780 # Success
1781 return True
1782
1783
1784 @asyncio.coroutine
1785 def apply_scaling_group_config(self, trigger, group, scale_instance, vnfrs=None):
1786 """ Apply the config for the scaling group based on trigger """
1787 if group is None or scale_instance is None:
1788 return False
1789
1790 @asyncio.coroutine
1791 def update_config_status(success=True, err_msg=None):
1792 """ This is ugly!!!
1793 We are trying to determine the scaling instance's config status
1794 as a collation of the config status associated with 4 different triggers
1795 """
1796 self._log.debug("Update %s scaling config status to %r : %s",
1797 scale_instance, success, err_msg)
1798 if (scale_instance.config_status == "failed"):
1799 # Do not update the config status if it is already in failed state
1800 return
1801
1802 if scale_instance.config_status == "configured":
1803 # Update only to failed state an already configured scale instance
1804 if not success:
1805 scale_instance.config_status = "failed"
1806 scale_instance.config_err_msg = err_msg
1807 yield from self.update_state()
1808 else:
1809 # We are in configuring state
1810 # Only after post scale out mark instance as configured
1811 if trigger == NsdBaseYang.ScalingTrigger.POST_SCALE_OUT:
1812 if success:
1813 scale_instance.config_status = "configured"
1814 for vnfr in scale_instance.vnfrs:
1815 if vnfr.config_status == "configuring":
1816 vnfr.vnfr_msg.config_status = "configured"
1817 yield from vnfr.update_vnfm()
1818 else:
1819 scale_instance.config_status = "failed"
1820 scale_instance.config_err_msg = err_msg
1821
1822 yield from self.update_state()
1823 # Publish config state as update_state seems to care only operational status
1824 yield from self.publish()
1825
1826 config = group.trigger_config(trigger)
1827 if config is None:
1828 if trigger == NsdBaseYang.ScalingTrigger.POST_SCALE_OUT:
1829 self._log.debug("No config needed, update %s scaling config status to configured",
1830 scale_instance)
1831 scale_instance.config_status = "configured"
1832 return True
1833
1834 self._log.debug("Scaling group {} config: {}".format(group.name, config))
1835 if config.has_field("ns_service_primitive_name_ref"):
1836 config_name = config.ns_service_primitive_name_ref
1837 nsd_msg = self.nsd_msg
1838 config_primitive = None
1839 for ns_cfg_prim in nsd_msg.service_primitive:
1840 if ns_cfg_prim.name == config_name:
1841 config_primitive = ns_cfg_prim
1842 break
1843
1844 if config_primitive is None:
1845 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name, self.name))
1846
1847 self._log.debug("Scaling group {} config primitive: {}".format(group.name, config_primitive))
1848 if config_primitive.has_field("user_defined_script"):
1849 script_path = '/'.join(["launchpad/packages/nsd", self._project.name, nsd_msg.id, "scripts", config_primitive.user_defined_script])
1850 rc = yield from self.apply_scale_group_config_script(script_path,
1851 group, scale_instance, trigger, vnfrs)
1852 err_msg = None
1853 if not rc:
1854 err_msg = "Failed config for trigger {} using config script '{}'". \
1855 format(self.scaling_trigger_str(trigger),
1856 config_primitive.user_defined_script)
1857 yield from update_config_status(success=rc, err_msg=err_msg)
1858 return rc
1859 else:
1860 err_msg = "Failed config for trigger {} as config script is not specified". \
1861 format(self.scaling_trigger_str(trigger))
1862 yield from update_config_status(success=False, err_msg=err_msg)
1863 raise NotImplementedError("Only script based config support for scale group for now: {}".
1864 format(group.name))
1865 else:
1866 err_msg = "Failed config for trigger {} as config primitive is not specified".\
1867 format(self.scaling_trigger_str(trigger))
1868 yield from update_config_status(success=False, err_msg=err_msg)
1869 self._log.error("Config primitive not specified for config action in scale group %s" %
1870 (group.name))
1871 return False
1872
1873 def create_scaling_groups(self):
1874 """ This function creates a NSScalingGroup for every scaling
1875 group defined in he NSD"""
1876
1877 for scaling_group_msg in self.nsd_msg.scaling_group_descriptor:
1878 self._log.debug("Found scaling_group %s in nsr id %s",
1879 scaling_group_msg.name, self.id)
1880
1881 group_record = scale_group.ScalingGroup(
1882 self._log,
1883 scaling_group_msg
1884 )
1885
1886 self._scaling_groups[group_record.name] = group_record
1887
1888 @asyncio.coroutine
1889 def create_scale_group_instance(self, group_name, index, config_xact, is_default=False):
1890 group = self._scaling_groups[group_name]
1891 scale_instance = group.create_instance(index, is_default)
1892
1893 @asyncio.coroutine
1894 def create_vnfs():
1895 self._log.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1896 len(self.nsd_msg.constituent_vnfd), self.id, self)
1897
1898 vnfrs = []
1899 for vnf_index, count in group.vnf_index_count_map.items():
1900 const_vnfd_msg = self._get_constituent_vnfd_msg(vnf_index)
1901 vnfd_msg = self._get_vnfd(const_vnfd_msg.vnfd_id_ref, config_xact)
1902
1903 datacenter_name = self._get_vnfd_datacenter(const_vnfd_msg.member_vnf_index)
1904 if datacenter_name is None:
1905 datacenter_name = self._datacenter_name
1906 for _ in range(count):
1907 vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, datacenter_name, group_name, index)
1908 scale_instance.add_vnfr(vnfr)
1909 vnfrs.append(vnfr)
1910 return vnfrs
1911
1912 @asyncio.coroutine
1913 def instantiate_instance():
1914 self._log.debug("Creating %s VNFRS", scale_instance)
1915 vnfrs = yield from create_vnfs()
1916 yield from self.publish()
1917
1918 self._log.debug("Instantiating %s VNFRS for %s", len(vnfrs), scale_instance)
1919 scale_instance.operational_status = "vnf_init_phase"
1920 yield from self.update_state()
1921
1922 try:
1923 rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.PRE_SCALE_OUT,
1924 group, scale_instance, vnfrs)
1925 if not rc:
1926 self._log.error("Pre scale out config for scale group {} ({}) failed".
1927 format(group.name, index))
1928 scale_instance.operational_status = "failed"
1929 else:
1930 yield from self.instantiate_vnfs(vnfrs, scaleout=True)
1931
1932
1933 except Exception as e:
1934 self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1935 format(group.name, e))
1936 self._log.exception(e)
1937 scale_instance.operational_status = "failed"
1938
1939 yield from self.update_state()
1940
1941 yield from instantiate_instance()
1942
1943 @asyncio.coroutine
1944 def delete_scale_group_instance(self, group_name, index):
1945 group = self._scaling_groups[group_name]
1946 scale_instance = group.get_instance(index)
1947 if scale_instance.is_default:
1948 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1949
1950 scale_instance.operational_status = "terminate"
1951 yield from self.update_state()
1952
1953 @asyncio.coroutine
1954 def terminate_instance():
1955 self._log.debug("Terminating scaling instance %s VNFRS" % scale_instance)
1956 rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.PRE_SCALE_IN,
1957 group, scale_instance)
1958 if not rc:
1959 self._log.error("Pre scale in config for scale group {} ({}) failed".
1960 format(group.name, index))
1961
1962 # Going ahead with terminate, even if there is an error in pre-scale-in config
1963 # as this could be result of scale out failure and we need to cleanup this group
1964 yield from self.terminate_vnfrs(scale_instance.vnfrs, scalein=True)
1965 group.delete_instance(index)
1966
1967 scale_instance.operational_status = "vnf_terminate_phase"
1968 yield from self.update_state()
1969
1970 yield from terminate_instance()
1971
1972 @asyncio.coroutine
1973 def _update_scale_group_instances_status(self):
1974 @asyncio.coroutine
1975 def post_scale_out_task(group, instance):
1976 # Apply post scale out config once all VNFRs are active
1977 rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.POST_SCALE_OUT,
1978 group, instance)
1979 instance.operational_status = "running"
1980 if rc:
1981 self._log.debug("Scale out for group {} and instance {} succeeded".
1982 format(group.name, instance.instance_id))
1983 else:
1984 self._log.error("Post scale out config for scale group {} ({}) failed".
1985 format(group.name, instance.instance_id))
1986
1987 yield from self.update_state()
1988
1989 group_instances = {group: group.instances for group in self._scaling_groups.values()}
1990 for group, instances in group_instances.items():
1991 self._log.debug("Updating %s instance status", group)
1992 for instance in instances:
1993 instance_vnf_state_list = [vnfr.state for vnfr in instance.vnfrs]
1994 self._log.debug("Got vnfr instance states: %s", instance_vnf_state_list)
1995 if instance.operational_status == "vnf_init_phase":
1996 if all([state == VnfRecordState.ACTIVE for state in instance_vnf_state_list]):
1997 instance.operational_status = "running"
1998
1999 # Create a task for post scale out to allow us to sleep before attempting
2000 # to configure newly created VM's
2001 self._loop.create_task(post_scale_out_task(group, instance))
2002
2003 elif any([state == VnfRecordState.FAILED for state in instance_vnf_state_list]):
2004 self._log.debug("Scale out for group {} and instance {} failed".
2005 format(group.name, instance.instance_id))
2006 instance.operational_status = "failed"
2007
2008 elif instance.operational_status == "vnf_terminate_phase":
2009 if all([state == VnfRecordState.TERMINATED for state in instance_vnf_state_list]):
2010 instance.operational_status = "terminated"
2011 rc = yield from self.apply_scaling_group_config(NsdBaseYang.ScalingTrigger.POST_SCALE_IN,
2012 group, instance)
2013 if rc:
2014 self._log.debug("Scale in for group {} and instance {} succeeded".
2015 format(group.name, instance.instance_id))
2016 else:
2017 self._log.error("Post scale in config for scale group {} ({}) failed".
2018 format(group.name, instance.instance_id))
2019
2020 def create_vnffgs(self):
2021 """ This function creates VNFFGs for every VNFFG in the NSD
2022 associated with this NSR"""
2023
2024 for vnffgd in self.nsd_msg.vnffgd:
2025 self._log.debug("Found vnffgd %s in nsr id %s", vnffgd, self.id)
2026 vnffgr = VnffgRecord(self._dts,
2027 self._log,
2028 self._loop,
2029 self._nsm._vnffgmgr,
2030 self,
2031 self.name,
2032 vnffgd,
2033 self._sdn_account_name,
2034 self._datacenter_name
2035 )
2036 self._vnffgrs[vnffgr.id] = vnffgr
2037
2038 def resolve_vld_ip_profile(self, nsd_msg, vld):
2039 self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref)
2040 if not vld.has_field('ip_profile_ref'):
2041 return None
2042 profile = [profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref]
2043 return profile[0] if profile else None
2044
2045 @asyncio.coroutine
2046 def _create_vls(self, vld, datacenter):
2047 """Create a VLR in the cloud account specified using the given VLD
2048
2049 Args:
2050 vld : VLD yang obj
2051 datacenter : Cloud account name
2052
2053 Returns:
2054 VirtualLinkRecord
2055 """
2056 vlr = yield from VirtualLinkRecord.create_record(
2057 self._dts,
2058 self._log,
2059 self._loop,
2060 self._project,
2061 self.name,
2062 vld,
2063 datacenter,
2064 self.resolve_vld_ip_profile(self.nsd_msg, vld),
2065 self.id,
2066 restart_mode=self.restart_mode)
2067
2068 return vlr
2069
2070 def _extract_datacenters_for_vl(self, vld):
2071 """
2072 Extracts the list of cloud accounts from the NS Config obj
2073
2074 Rules:
2075 1. Cloud accounts based connection point (vnf_datacenter_map)
2076 Args:
2077 vld : VLD yang object
2078
2079 Returns:
2080 TYPE: Description
2081 """
2082 datacenter_list = []
2083
2084 if self._nsr_cfg_msg.vnf_datacenter_map:
2085 # Handle case where datacenter is None
2086 vnf_datacenter_map = {}
2087 for vnf in self._nsr_cfg_msg.vnf_datacenter_map:
2088 if vnf.datacenter is not None or vnf.datacenter is not None:
2089 vnf_datacenter_map[vnf.member_vnf_index_ref] = \
2090 vnf.datacenter
2091
2092 for vnfc in vld.vnfd_connection_point_ref:
2093 datacenter = vnf_datacenter_map.get(
2094 vnfc.member_vnf_index_ref, self._datacenter_name)
2095
2096 datacenter_list.append(datacenter)
2097
2098 if self._nsr_cfg_msg.vl_datacenter_map:
2099 for vld_map in self._nsr_cfg_msg.vl_datacenter_map:
2100 if vld_map.vld_id_ref == vld.id:
2101 for datacenter in vld_map.datacenters:
2102 datacenter_list.append(datacenter)
2103
2104 # If no config has been provided then fall-back to the default
2105 # account
2106 if not datacenter_list:
2107 datacenter_list.append(self._datacenter_name)
2108
2109 self._log.debug("VL {} data center list: {}".
2110 format(vld.name, datacenter_list))
2111 return set(datacenter_list)
2112
2113 @asyncio.coroutine
2114 def create_vls(self):
2115 """ This function creates VLs for every VLD in the NSD
2116 associated with this NSR"""
2117 for vld in self.nsd_msg.vld:
2118
2119 self._log.debug("Found vld %s in nsr id %s", vld, self.id)
2120 datacenter_list = self._extract_datacenters_for_vl(vld)
2121 for datacenter in datacenter_list:
2122 vlr = yield from self._create_vls(vld, datacenter)
2123 self._vlrs[vlr.id] = vlr
2124 self._nsm.add_vlr_id_nsr_map(vlr.id, self)
2125
2126 @asyncio.coroutine
2127 def create_vl_instance(self, vld):
2128 self._log.error("Create VL for {}: {}".format(self.id, vld.as_dict()))
2129 # Check if the VL is already present
2130 vlr = None
2131 for vl_id, vl in self._vlrs.items():
2132 if vl.vld_msg.id == vld.id:
2133 self._log.error("The VLD %s already in NSR %s as VLR %s with status %s",
2134 vld.id, self.id, vl.id, vl.state)
2135 vlr = vl
2136 if vlr.state != VlRecordState.TERMINATED:
2137 err_msg = "VLR for VL {} in NSR {} already instantiated". \
2138 format(vld, self.id)
2139 self._log.error(err_msg)
2140 raise NsrVlUpdateError(err_msg)
2141 break
2142
2143 if vlr is None:
2144 datacenter_list = self._extract_datacenters_for_vl(vld)
2145 for datacenter in datacenter_list:
2146 vlr = yield from self._create_vls(vld, account, datacenter)
2147 self._vlrs[vlr.id] = vlr
2148 self._nsm.add_vlr_id_nsr_map(vlr.id, self)
2149
2150 vlr.state = VlRecordState.INSTANTIATION_PENDING
2151 yield from self.update_state()
2152
2153 try:
2154 yield from self.nsm_plugin.instantiate_vl(self, vlr)
2155
2156 except Exception as e:
2157 err_msg = "Error instantiating VL for NSR {} and VLD {}: {}". \
2158 format(self.id, vld.id, e)
2159 self._log.error(err_msg)
2160 self._log.exception(e)
2161 vlr.state = VlRecordState.FAILED
2162
2163 yield from self.update_state()
2164
2165 @asyncio.coroutine
2166 def delete_vl_instance(self, vld):
2167 for vlr_id, vlr in self._vlrs.items():
2168 if vlr.vld_msg.id == vld.id:
2169 self._log.debug("Found VLR %s for VLD %s in NSR %s",
2170 vlr.id, vld.id, self.id)
2171 vlr.state = VlRecordState.TERMINATE_PENDING
2172 yield from self.update_state()
2173
2174 try:
2175 yield from self.nsm_plugin.terminate_vl(vlr)
2176 vlr.state = VlRecordState.TERMINATED
2177 del self._vlrs[vlr]
2178 self.remove_vlr_id_nsr_map(vlr.id)
2179
2180 except Exception as e:
2181 err_msg = "Error terminating VL for NSR {} and VLD {}: {}". \
2182 format(self.id, vld.id, e)
2183 self._log.error(err_msg)
2184 self._log.exception(e)
2185 vlr.state = VlRecordState.FAILED
2186
2187 yield from self.update_state()
2188 break
2189
2190 @asyncio.coroutine
2191 def create_vnfs(self, config_xact):
2192 """
2193 This function creates VNFs for every VNF in the NSD
2194 associated with this NSR
2195 """
2196 self._log.debug("Creating %u VNFs associated with this NS id %s",
2197 len(self.nsd_msg.constituent_vnfd), self.id)
2198
2199 for const_vnfd in self.nsd_msg.constituent_vnfd:
2200 if not const_vnfd.start_by_default:
2201 self._log.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
2202 const_vnfd.member_vnf_index)
2203 continue
2204
2205 vnfd_msg = self._get_vnfd(const_vnfd.vnfd_id_ref, config_xact)
2206 datacenter_name = self._get_vnfd_datacenter(const_vnfd.member_vnf_index)
2207 if datacenter_name is None:
2208 datacenter_name = self._datacenter_name
2209 yield from self.create_vnf_record(vnfd_msg, const_vnfd, datacenter_name)
2210
2211 def get_placement_groups(self, vnfd_msg, const_vnfd):
2212 placement_groups = []
2213 for group in self.nsd_msg.placement_groups:
2214 for member_vnfd in group.member_vnfd:
2215 if (member_vnfd.vnfd_id_ref == vnfd_msg.id) and \
2216 (member_vnfd.member_vnf_index_ref == str(const_vnfd.member_vnf_index)):
2217 group_info = self.resolve_placement_group_cloud_construct(group)
2218 if group_info is None:
2219 self._log.info("Could not resolve cloud-construct for placement group: %s", group.name)
2220 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
2221 else:
2222 self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
2223 str(group_info),
2224 vnfd_msg.name,
2225 const_vnfd.member_vnf_index)
2226 placement_groups.append(group_info)
2227 return placement_groups
2228
2229 def get_cloud_config(self):
2230 cloud_config = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_CloudConfig()
2231 self._log.debug("Received key pair is {}".format(self._key_pairs))
2232
2233 for authorized_key in self.nsr_cfg_msg.ssh_authorized_key:
2234 if authorized_key.key_pair_ref in self._key_pairs:
2235 key_pair = cloud_config.key_pair.add()
2236 key_pair.from_dict(self._key_pairs[authorized_key.key_pair_ref].as_dict())
2237 for nsd_key_pair in self.nsd_msg.key_pair:
2238 key_pair = cloud_config.key_pair.add()
2239 key_pair.from_dict(key_pair.as_dict())
2240 for nsr_cfg_user in self.nsr_cfg_msg.user:
2241 user = cloud_config.user.add()
2242 user.name = nsr_cfg_user.name
2243 user.user_info = nsr_cfg_user.user_info
2244 for ssh_key in nsr_cfg_user.ssh_authorized_key:
2245 if ssh_key.key_pair_ref in self._key_pairs:
2246 key_pair = user.key_pair.add()
2247 key_pair.from_dict(self._key_pairs[ssh_key.key_pair_ref].as_dict())
2248 for nsd_user in self.nsd_msg.user:
2249 user = cloud_config.user.add()
2250 user.from_dict(nsd_user.as_dict())
2251
2252 self._log.debug("Formed cloud-config msg is {}".format(cloud_config))
2253 return cloud_config
2254
2255 @asyncio.coroutine
2256 def create_vnf_record(self, vnfd_msg, const_vnfd, datacenter_name, group_name=None, group_instance_id=None):
2257 # Fetch the VNFD associated with this VNF
2258 placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd)
2259 cloud_config = self.get_cloud_config()
2260 self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,datacenter_name)
2261 self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
2262 vnfd_msg.name,
2263 const_vnfd.member_vnf_index,
2264 [ group.name for group in placement_groups])
2265
2266 vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts,
2267 self._log,
2268 self._loop,
2269 self._project,
2270 vnfd_msg,
2271 self._nsr_cfg_msg,
2272 const_vnfd,
2273 self.nsd_id,
2274 self.name,
2275 datacenter_name,
2276 self.id,
2277 group_name,
2278 group_instance_id,
2279 placement_groups,
2280 cloud_config,
2281 restart_mode=self.restart_mode,
2282 )
2283 if vnfr.id in self._vnfrs:
2284 err = "VNF with VNFR id %s already in vnf list" % (vnfr.id,)
2285 raise NetworkServiceRecordError(err)
2286
2287 self._vnfrs[vnfr.id] = vnfr
2288 self._nsm.vnfrs[vnfr.id] = vnfr
2289
2290 yield from vnfr.set_config_status(NsrYang.ConfigStates.INIT)
2291
2292 self._log.debug("Added VNFR %s to NSM VNFR list with id %s",
2293 vnfr.name,
2294 vnfr.id)
2295
2296 return vnfr
2297
2298 def create_param_pools(self):
2299 for param_pool in self.nsd_msg.parameter_pool:
2300 self._log.debug("Found parameter pool %s in nsr id %s", param_pool, self.id)
2301
2302 start_value = param_pool.range.start_value
2303 end_value = param_pool.range.end_value
2304 if end_value < start_value:
2305 raise NetworkServiceRecordError(
2306 "Parameter pool %s has invalid range (start: {}, end: {})".format(
2307 start_value, end_value
2308 )
2309 )
2310
2311 self._param_pools[param_pool.name] = config_value_pool.ParameterValuePool(
2312 self._log,
2313 param_pool.name,
2314 range(start_value, end_value)
2315 )
2316
2317 @asyncio.coroutine
2318 def fetch_vnfr(self, vnfr_path):
2319 """ Fetch VNFR record """
2320 vnfr = None
2321 self._log.debug("Fetching VNFR with key %s while instantiating %s",
2322 vnfr_path, self.id)
2323 res_iter = yield from self._dts.query_read(vnfr_path, rwdts.XactFlag.MERGE)
2324
2325 for ent in res_iter:
2326 res = yield from ent
2327 vnfr = res.result
2328
2329 return vnfr
2330
2331 @asyncio.coroutine
2332 def instantiate_vnfs(self, vnfrs, scaleout=False):
2333 """
2334 This function instantiates VNFs for every VNF in this Network Service
2335 """
2336 @asyncio.coroutine
2337 def instantiate_vnf(vnf):
2338 self._log.debug("Instantiating VNF: %s in NS %s", vnf, self.id)
2339 vnfd_id = vnf.vnfr_msg.vnfd.id
2340 for dependency_vnf in dependencies[vnfd_id]:
2341 while dependency_vnf not in self.instantiated:
2342 yield from asyncio.sleep(1, loop=self._loop)
2343
2344 yield from self.nsm_plugin.instantiate_vnf(self, vnf,scaleout)
2345 self.instantiated.add(vnfd_id)
2346
2347 self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id)
2348 dependencies = collections.defaultdict(list)
2349 for dependency_vnf in self._nsr_cfg_msg.nsd.vnf_dependency:
2350 dependencies[dependency_vnf.vnf_source_ref].append(dependency_vnf.vnf_depends_on_ref)
2351
2352 # The dictionary copy is to ensure that if a terminate is initiated right after instantiation, the
2353 # Runtime error for "dictionary changed size during iteration" does not occur.
2354 # vnfrs - 'dict_values' object
2355 # vnfrs_copy - list object
2356 vnfrs_copy = list(vnfrs)
2357 tasks = []
2358 for vnf in vnfrs_copy:
2359 vnf_task = self._loop.create_task(instantiate_vnf(vnf))
2360 tasks.append(vnf_task)
2361
2362 if len(tasks) > 0:
2363 self._log.debug("Waiting for %s instantiate_vnf tasks to complete", len(tasks))
2364 done, pending = yield from asyncio.wait(tasks, loop=self._loop, timeout=30)
2365 if pending:
2366 self._log.error("The Instantiate vnf task timed out after 30 seconds.")
2367 raise VirtualNetworkFunctionRecordError("Task tied out : ", pending)
2368
2369 @asyncio.coroutine
2370 def instantiate_vnffgs(self):
2371 """
2372 This function instantiates VNFFGs for every VNFFG in this Network Service
2373 """
2374 self._log.debug("Instantiating %u VNFFGs in NS %s",
2375 len(self.nsd_msg.vnffgd), self.id)
2376 for _, vnfr in self.vnfrs.items():
2377 while vnfr.state in [VnfRecordState.INSTANTIATION_PENDING, VnfRecordState.INIT]:
2378 self._log.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr.name,vnfr.state)
2379 yield from asyncio.sleep(2, loop=self._loop)
2380 if vnfr.state == VnfRecordState.ACTIVE:
2381 self._log.debug("Received vnfr state for vnfr %s is %s ",vnfr.name,vnfr.state)
2382 continue
2383 else:
2384 self._log.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr.name,vnfr.state)
2385 self._vnffgr_state = VnffgRecordState.FAILED
2386 return
2387
2388 self._log.info("Waiting for 90 seconds for VMs to come up")
2389 yield from asyncio.sleep(90, loop=self._loop)
2390 self._log.info("Starting VNFFG orchestration")
2391 for vnffg in self._vnffgrs.values():
2392 self._log.debug("Instantiating VNFFG: %s in NS %s", vnffg, self.id)
2393 yield from vnffg.instantiate()
2394
2395 @asyncio.coroutine
2396 def instantiate_scaling_instances(self, config_xact):
2397 """ Instantiate any default scaling instances in this Network Service """
2398 for group in self._scaling_groups.values():
2399 for i in range(group.min_instance_count):
2400 self._log.debug("Instantiating %s default scaling instance %s", group, i)
2401 yield from self.create_scale_group_instance(
2402 group.name, i, config_xact, is_default=True
2403 )
2404
2405 for group_msg in self._nsr_cfg_msg.scaling_group:
2406 if group_msg.scaling_group_name_ref != group.name:
2407 continue
2408
2409 for instance in group_msg.instance:
2410 self._log.debug("Reloading %s scaling instance %s", group_msg, instance.id)
2411 yield from self.create_scale_group_instance(
2412 group.name, instance.id, config_xact, is_default=False
2413 )
2414
2415 def has_scaling_instances(self):
2416 """ Return boolean indicating if the network service has default scaling groups """
2417 for group in self._scaling_groups.values():
2418 if group.min_instance_count > 0:
2419 return True
2420
2421 for group_msg in self._nsr_cfg_msg.scaling_group:
2422 if len(group_msg.instance) > 0:
2423 return True
2424
2425 return False
2426
2427 @asyncio.coroutine
2428 def publish(self):
2429 """ This function publishes this NSR """
2430
2431 self._nsr_msg = self.create_msg()
2432
2433 self._log.debug("Publishing the NSR with xpath %s and nsr %s",
2434 self.nsr_xpath,
2435 self._nsr_msg)
2436
2437 if self._debug_running:
2438 self._log.debug("Publishing NSR in RUNNING state!")
2439 #raise()
2440
2441 yield from self._nsm.nsr_handler.update(None, self.nsr_xpath, self._nsr_msg)
2442 if self._op_status.state == NetworkServiceRecordState.RUNNING:
2443 self._debug_running = True
2444
2445 @asyncio.coroutine
2446 def unpublish(self, xact=None):
2447 """ Unpublish this NSR object """
2448 self._log.debug("Unpublishing Network service id %s", self.id)
2449
2450 yield from self._nsm.nsr_handler.delete(xact, self.nsr_xpath)
2451
2452 @property
2453 def nsr_xpath(self):
2454 """ Returns the xpath associated with this NSR """
2455 return self._project.add_project((
2456 "D,/nsr:ns-instance-opdata" +
2457 "/nsr:nsr[nsr:ns-instance-config-ref={}]"
2458 ).format(quoted_key(self.id)))
2459
2460 @staticmethod
2461 def xpath_from_nsr(nsr):
2462 """ Returns the xpath associated with this NSR op data"""
2463 return (NetworkServiceRecord.XPATH +
2464 "[nsr:ns-instance-config-ref={}]").format(quoted_key(nsr.id))
2465
2466 @property
2467 def nsd_xpath(self):
2468 """ Return NSD config xpath."""
2469 return self._project.add_project((
2470 "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id={}]"
2471 ).format(quoted_key(self.nsd_id)))
2472
2473 @asyncio.coroutine
2474 def instantiate(self, config_xact):
2475 """"Instantiates a NetworkServiceRecord.
2476
2477 This function instantiates a Network service
2478 which involves the following steps,
2479
2480 * Instantiate every VL in NSD by sending create VLR request to DTS.
2481 * Instantiate every VNF in NSD by sending create VNF reuqest to DTS.
2482 * Publish the NSR details to DTS
2483
2484 Arguments:
2485 nsr: The NSR configuration request containing nsr-id and nsd
2486 config_xact: The configuration transaction which initiated the instatiation
2487
2488 Raises:
2489 NetworkServiceRecordError if the NSR creation fails
2490
2491 Returns:
2492 No return value
2493 """
2494
2495 self._log.debug("Instantiating NS - %s xact - %s", self, config_xact)
2496
2497 # Move the state to INIITALIZING
2498 self.set_state(NetworkServiceRecordState.INIT)
2499
2500 event_descr = "Instantiation Request Received NSR Id: %s, NS Name: %s" % (self.id, self.name)
2501 self.record_event("instantiating", event_descr)
2502
2503 # Find the NSD
2504 self._nsd = self._nsr_cfg_msg.nsd
2505
2506 # Merge any config and initial config primitive values
2507 self.config_store.merge_nsd_config(self.nsd_msg, self._project.name)
2508 self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict()))
2509
2510 event_descr = "Fetched NSD with descriptor id %s, NS Name: %s" % (self.nsd_id, self.name)
2511 self.record_event("nsd-fetched", event_descr)
2512
2513 if self._nsd is None:
2514 msg = "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
2515 self._log.debug(msg, self.nsd_id, self.id)
2516 raise NetworkServiceRecordError(self)
2517
2518 self._log.debug("Got nsd result %s", self._nsd)
2519
2520 # Substitute any input parameters
2521 self.substitute_input_parameters(self._nsd, self._nsr_cfg_msg)
2522
2523 # Create the record
2524 yield from self.create(config_xact)
2525
2526 # Publish the NSR to DTS
2527 yield from self.publish()
2528
2529 @asyncio.coroutine
2530 def do_instantiate():
2531 """
2532 Instantiate network service
2533 """
2534 self._log.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
2535 self.id, self.nsd_id)
2536
2537 # instantiate the VLs
2538 event_descr = ("Instantiating %s external VLs for NSR id: %s, NS Name: %s " %
2539 (len(self.nsd_msg.vld), self.id, self.name))
2540 self.record_event("begin-external-vls-instantiation", event_descr)
2541
2542 self.set_state(NetworkServiceRecordState.VL_INIT_PHASE)
2543
2544 # Publish the NSR to DTS
2545 yield from self.publish()
2546
2547 if self._ns_terminate_received:
2548 self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-external-vls-instantiation.")
2549 # Setting this flag as False again as this is a state where neither VL or VNF have been instantiated.
2550 self._ns_terminate_received = False
2551 # At this stage only ns-instance opdata is published. Cleaning up the record.
2552 yield from self.unpublish()
2553 return
2554
2555 yield from self.instantiate_vls()
2556
2557 event_descr = ("Finished instantiating %s external VLs for NSR id: %s, NS Name: %s " %
2558 (len(self.nsd_msg.vld), self.id, self.name))
2559 self.record_event("end-external-vls-instantiation", event_descr)
2560
2561 self.set_state(NetworkServiceRecordState.VNF_INIT_PHASE)
2562
2563 # Publish the NSR to DTS
2564 yield from self.publish()
2565
2566 self._log.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
2567 self.id, self.nsd_id)
2568
2569 # instantiate the VNFs
2570 event_descr = ("Instantiating %s VNFS for NSR id: %s, NS Name: %s " %
2571 (len(self.nsd_msg.constituent_vnfd), self.id, self.name))
2572
2573 self.record_event("begin-vnf-instantiation", event_descr)
2574
2575 if self._ns_terminate_received:
2576 self._log.debug("Terminate Received. Interrupting Instantiation at event : end-external-vls-instantiation.")
2577 return
2578
2579 yield from self.instantiate_vnfs(self._vnfrs.values())
2580
2581 self._log.debug(" Finished instantiating %d VNFs for NSR id: %s, NS Name: %s",
2582 len(self.nsd_msg.constituent_vnfd), self.id, self.name)
2583
2584 event_descr = ("Finished instantiating %s VNFs for NSR id: %s, NS Name: %s" %
2585 (len(self.nsd_msg.constituent_vnfd), self.id, self.name))
2586 self.record_event("end-vnf-instantiation", event_descr)
2587
2588 # Publish the NSR to DTS
2589 yield from self.publish()
2590
2591 if len(self.vnffgrs) > 0:
2592 #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
2593 event_descr = ("Instantiating %s VNFFGS for NSR id: %s, NS Name: %s" %
2594 (len(self.nsd_msg.vnffgd), self.id, self.name))
2595
2596 self.record_event("begin-vnffg-instantiation", event_descr)
2597
2598 if self._ns_terminate_received:
2599 self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-vnffg-instantiation.")
2600 return
2601
2602 yield from self.instantiate_vnffgs()
2603
2604 event_descr = ("Finished instantiating %s VNFFGDs for NSR id: %s, NS Name: %s" %
2605 (len(self.nsd_msg.vnffgd), self.id, self.name))
2606 self.record_event("end-vnffg-instantiation", event_descr)
2607
2608 if self.has_scaling_instances():
2609 event_descr = ("Instantiating %s Scaling Groups for NSR id: %s, NS Name: %s" %
2610 (len(self._scaling_groups), self.id, self.name))
2611
2612 self.record_event("begin-scaling-group-instantiation", event_descr)
2613
2614 if self._ns_terminate_received:
2615 self._log.debug("Terminate Received. Interrupting Instantiation at event : begin-scaling-group-instantiation.")
2616 return
2617
2618 yield from self.instantiate_scaling_instances(config_xact)
2619 self.record_event("end-scaling-group-instantiation", event_descr)
2620
2621 # Give the plugin a chance to deploy the network service now that all
2622 # virtual links and vnfs are instantiated
2623 yield from self.nsm_plugin.deploy(self._nsr_msg)
2624
2625 self._log.debug("Publishing NSR...... nsr[%s], nsd[%s], for NS[%s]",
2626 self.id, self.nsd_id, self.name)
2627
2628 # Publish the NSR to DTS
2629 yield from self.publish()
2630
2631 self._log.debug("Published NSR...... nsr[%s], nsd[%s], for NS[%s]",
2632 self.id, self.nsd_id, self.name)
2633
2634 def on_instantiate_done(fut):
2635 # If the do_instantiate fails, then publish NSR with failed result
2636 e = fut.exception()
2637 if e is not None:
2638 import traceback, sys
2639 print(traceback.format_exception(None,e, e.__traceback__), file=sys.stderr, flush=True)
2640 self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(e))
2641 self._loop.create_task(self.instantiation_failed(failed_reason=str(e)))
2642
2643 instantiate_task = self._loop.create_task(do_instantiate())
2644 instantiate_task.add_done_callback(on_instantiate_done)
2645
2646 @asyncio.coroutine
2647 def set_config_status(self, status, status_details=None):
2648 if self.config_status != status:
2649 self._log.debug("Updating NSR {} status for {} to {}".
2650 format(self.name, self.config_status, status))
2651 self._config_status = status
2652 self._config_status_details = status_details
2653
2654 if self._config_status == NsrYang.ConfigStates.FAILED:
2655 self.record_event("config-failed", "NS configuration failed",
2656 evt_details=self._config_status_details)
2657
2658 yield from self.publish()
2659
2660 if status == NsrYang.ConfigStates.TERMINATE:
2661 yield from self.terminate_ns_cont()
2662
2663 @asyncio.coroutine
2664 def is_active(self):
2665 """ This NS is active """
2666 self.set_state(NetworkServiceRecordState.RUNNING)
2667 if self._is_active:
2668 return
2669
2670 # Publish the NSR to DTS
2671 self._log.debug("Network service %s is active ", self.id)
2672 self._is_active = True
2673
2674 event_descr = "NSR in running state for NSR id: %s, NS Name: %s" % (self.id, self.name)
2675 self.record_event("ns-running", event_descr)
2676
2677 yield from self.publish()
2678
2679 @asyncio.coroutine
2680 def instantiation_failed(self, failed_reason=None):
2681 """ The NS instantiation failed"""
2682 self._log.error("Network service id:%s, name:%s instantiation failed",
2683 self.id, self.name)
2684 self.set_state(NetworkServiceRecordState.FAILED)
2685 self._is_failed = True
2686
2687 event_descr = "Instantiation of NS %s - %s failed" % (self.id, self.name)
2688 self.record_event("ns-failed", event_descr, evt_details=failed_reason)
2689
2690 # Publish the NSR to DTS
2691 yield from self.publish()
2692
2693 @asyncio.coroutine
2694 def terminate_vnfrs(self, vnfrs, scalein=False):
2695 """ Terminate VNFRS in this network service """
2696 self._log.debug("Terminating VNFs in network service %s - %s", self.id, self.name)
2697 vnfr_ids = []
2698 for vnfr in list(vnfrs):
2699 self._log.debug("Terminating VNFs in network service %s %s", vnfr.id, self.id)
2700 yield from self.nsm_plugin.terminate_vnf(self, vnfr, scalein=scalein)
2701 vnfr_ids.append(vnfr.id)
2702
2703 for vnfr_id in vnfr_ids:
2704 self._vnfrs.pop(vnfr_id, None)
2705
2706 @asyncio.coroutine
2707 def terminate(self):
2708 """Start terminate of a NetworkServiceRecord."""
2709 # Move the state to TERMINATE
2710 self.set_state(NetworkServiceRecordState.TERMINATE)
2711 event_descr = "Terminate being processed for NS Id: %s, NS Name: %s" % (self.id, self.name)
2712 self.record_event("terminate", event_descr)
2713 self._log.debug("Terminating network service id: %s, NS Name: %s", self.id, self.name)
2714
2715 # Adding the NSR ID on terminate Evet. This will be checked to halt the instantiation if not already finished.
2716 self._ns_terminate_received = True
2717
2718 yield from self.publish()
2719
2720 if self._is_failed:
2721 # IN case the instantiation failed, then trigger a cleanup immediately
2722 # don't wait for Cfg manager, as it will have no idea of this NSR.
2723 # Due to the failure
2724 yield from self.terminate_ns_cont()
2725
2726
2727 @asyncio.coroutine
2728 def terminate_ns_cont(self):
2729 """Config script related to terminate finished, continue termination"""
2730 def terminate_vnffgrs():
2731 """ Terminate VNFFGRS in this network service """
2732 self._log.debug("Terminating VNFFGRs in network service %s - %s", self.id, self.name)
2733 for vnffgr in self.vnffgrs.values():
2734 yield from vnffgr.terminate()
2735
2736 def terminate_vlrs():
2737 """ Terminate VLRs in this netork service """
2738 self._log.debug("Terminating VLs in network service %s - %s", self.id, self.name)
2739 for vlr_id, vlr in self.vlrs.items():
2740 yield from self.nsm_plugin.terminate_vl(vlr)
2741 vlr.state = VlRecordState.TERMINATED
2742
2743 # Move the state to VNF_TERMINATE_PHASE
2744 self._log.debug("Terminating VNFFGs in NS ID: %s, NS Name: %s", self.id, self.name)
2745 self.set_state(NetworkServiceRecordState.VNFFG_TERMINATE_PHASE)
2746 event_descr = "Terminating VNFFGS in NS Id: %s, NS Name: %s" % (self.id, self.name)
2747 self.record_event("terminating-vnffgss", event_descr)
2748 yield from terminate_vnffgrs()
2749
2750 # Move the state to VNF_TERMINATE_PHASE
2751 self.set_state(NetworkServiceRecordState.VNF_TERMINATE_PHASE)
2752 event_descr = "Terminating VNFS in NS Id: %s, NS Name: %s" % (self.id, self.name)
2753 self.record_event("terminating-vnfs", event_descr)
2754 yield from self.terminate_vnfrs(self.vnfrs.values())
2755
2756 # Move the state to VL_TERMINATE_PHASE
2757 self.set_state(NetworkServiceRecordState.VL_TERMINATE_PHASE)
2758 event_descr = "Terminating VLs in NS Id: %s, NS Name: %s" % (self.id, self.name)
2759 self.record_event("terminating-vls", event_descr)
2760 yield from terminate_vlrs()
2761 yield from self.nsm_plugin.terminate_ns(self)
2762 # Remove the generated SSH key
2763 if self._ssh_key_file:
2764 p = urlparse(self._ssh_key_file)
2765 if p[0] == 'file':
2766 path = os.path.dirname(p[2])
2767 self._log.debug("NSR {}: Removing keys in {}".format(self.name,
2768 path))
2769 shutil.rmtree(path, ignore_errors=True)
2770
2771 # Move the state to TERMINATED
2772 self.set_state(NetworkServiceRecordState.TERMINATED)
2773 event_descr = "Terminated NS Id: %s, NS Name: %s" % (self.id, self.name)
2774 self.record_event("terminated", event_descr)
2775
2776 # Unpublish the NSR record
2777 self._log.debug("Unpublishing the network service %s - %s", self.id, self.name)
2778 yield from self.unpublish()
2779
2780 # Finaly delete the NS instance from this NS Manager
2781 self._log.debug("Deleting the network service %s - %s", self.id, self.name)
2782 self.nsm.delete_nsr(self.id)
2783
2784 def enable(self):
2785 """"Enable a NetworkServiceRecord."""
2786 pass
2787
2788 def disable(self):
2789 """"Disable a NetworkServiceRecord."""
2790 pass
2791
2792 def map_config_status(self):
2793 self._log.debug("Config status for ns {} is {}".
2794 format(self.name, self._config_status))
2795 if self._config_status == NsrYang.ConfigStates.CONFIGURING:
2796 return 'configuring'
2797 if self._config_status == NsrYang.ConfigStates.FAILED:
2798 return 'failed'
2799 return 'configured'
2800
2801 def vl_phase_completed(self):
2802 """ Are VLs created in this NS?"""
2803 return self._vl_phase_completed
2804
2805 def vnf_phase_completed(self):
2806 """ Are VLs created in this NS?"""
2807 return self._vnf_phase_completed
2808
2809 def create_msg(self):
2810 """ The network serice record as a message """
2811 nsr_dict = {"ns_instance_config_ref": self.id}
2812 nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict(nsr_dict)
2813 #nsr.datacenter = self.cloud_account_name
2814 nsr.sdn_account = self._sdn_account_name
2815 nsr.name_ref = self.name
2816 nsr.nsd_ref = self.nsd_id
2817 nsr.nsd_name_ref = self.nsd_msg.name
2818 nsr.operational_events = self._op_status.msg
2819 nsr.operational_status = self._op_status.yang_str()
2820 nsr.config_status = self.map_config_status()
2821 nsr.config_status_details = self._config_status_details
2822 nsr.create_time = self._create_time
2823 nsr.uptime = int(time.time()) - self._create_time
2824
2825 # Added for OpenMano
2826
2827 nsr.orchestration_progress.networks.total = len(self.nsd_msg.vld)
2828 if isinstance(self.nsm_plugin, openmano_nsm.OpenmanoNsPlugin):
2829 # Taking the last update by OpenMano
2830 nsr.orchestration_progress.networks.active = self.nsm_plugin._openmano_nsrs[self.id]._active_nets
2831 else:
2832 nsr.orchestration_progress.networks.active = self._active_networks
2833 no_of_vdus = 0
2834 for vnfr_id, vnfr in self._vnfrs.items():
2835 no_of_vdus += len(vnfr.vnfd.vdu)
2836
2837 nsr.orchestration_progress.vms.total = no_of_vdus
2838 if isinstance(self.nsm_plugin, openmano_nsm.OpenmanoNsPlugin):
2839 # Taking the last update by OpenMano
2840 nsr.orchestration_progress.vms.active = self.nsm_plugin._openmano_nsrs[self.id]._active_vms
2841 else:
2842 nsr.orchestration_progress.vms.active = self._active_vms
2843
2844 # Generated SSH key
2845 if self._ssh_pub_key:
2846 nsr.ssh_key_generated.private_key_file = self._ssh_key_file
2847 nsr.ssh_key_generated.public_key = self._ssh_pub_key
2848
2849 for cfg_prim in self.nsd_msg.service_primitive:
2850 cfg_prim = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict(
2851 cfg_prim.as_dict())
2852 nsr.service_primitive.append(cfg_prim)
2853
2854 for init_cfg in self.nsd_msg.initial_service_primitive:
2855 prim = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_InitialServicePrimitive.from_dict(
2856 init_cfg.as_dict())
2857 nsr.initial_service_primitive.append(prim)
2858
2859 for term_cfg in self.nsd_msg.terminate_service_primitive:
2860 prim = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_TerminateServicePrimitive.from_dict(
2861 term_cfg.as_dict())
2862 nsr.terminate_service_primitive.append(prim)
2863
2864 if self.vl_phase_completed():
2865 for vlr_id, vlr in self.vlrs.items():
2866 nsr.vlr.append(vlr.create_nsr_vlr_msg(self.vnfrs.values()))
2867
2868 if self.vnf_phase_completed():
2869 for vnfr_id in self.vnfrs:
2870 nsr.constituent_vnfr_ref.append(self.vnfrs[vnfr_id].const_vnfr_msg)
2871 for vnffgr in self.vnffgrs.values():
2872 nsr.vnffgr.append(vnffgr.fetch_vnffgr())
2873 for scaling_group in self._scaling_groups.values():
2874 nsr.scaling_group_record.append(scaling_group.create_record_msg())
2875
2876 return nsr
2877
2878 def all_vnfs_active(self):
2879 """ Are all VNFS in this NS active? """
2880 for _, vnfr in self.vnfrs.items():
2881 if vnfr.active is not True:
2882 return False
2883 return True
2884
2885 @asyncio.coroutine
2886 def update_state(self):
2887 """ Re-evaluate this NS's state """
2888 curr_state = self._op_status.state
2889
2890 # This means that the terminate has been fired before the NS was UP.
2891 if self._ns_terminate_received:
2892 # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call.
2893 self._ns_terminate_received = False
2894 yield from self.terminate_ns_cont()
2895 else:
2896 if curr_state == NetworkServiceRecordState.TERMINATED:
2897 self._log.debug("NS (%s - %s) in terminated state, not updating state", self.id, self.name)
2898 return
2899
2900 new_state = NetworkServiceRecordState.RUNNING
2901 self._log.debug("Received update_state for nsr: %s, curr-state: %s",
2902 self.id, curr_state)
2903
2904 # check all VLs
2905 if (isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin)):
2906 for vlr_id, vl in self.vlrs.items():
2907 self._log.debug("VLR %s state %s", vlr_id, vl.state)
2908 if vl.state in [VlRecordState.ACTIVE, VlRecordState.TERMINATED]:
2909 continue
2910 elif vl.state == VlRecordState.FAILED:
2911 if vl.prev_state != vl.state:
2912 event_descr = "Instantiation of VL %s failed" % vl.id
2913 event_error_details = vl.state_failed_reason
2914 self.record_event("vl-failed", event_descr, evt_details=event_error_details)
2915 vl.prev_state = vl.state
2916 new_state = NetworkServiceRecordState.FAILED
2917 break
2918 else:
2919 self._log.debug("VL already in failed state")
2920 else:
2921 if vl.state in [VlRecordState.INSTANTIATION_PENDING, VlRecordState.INIT]:
2922 new_state = NetworkServiceRecordState.VL_INSTANTIATE
2923 break
2924
2925 if vl.state in [VlRecordState.TERMINATE_PENDING]:
2926 new_state = NetworkServiceRecordState.VL_TERMINATE
2927 break
2928
2929 # Check all the VNFRs are present
2930 if new_state == NetworkServiceRecordState.RUNNING:
2931 for _, vnfr in self.vnfrs.items():
2932 self._log.debug("VNFR state %s", vnfr.state)
2933 if vnfr.state in [VnfRecordState.ACTIVE, VnfRecordState.TERMINATED]:
2934 active_vdus = 0
2935 for vnfr in self.vnfrs:
2936 active_vdus += self.nsm._vnfrs[vnfr]._active_vdus
2937
2938 if self._active_vms != active_vdus:
2939 self._active_vms = active_vdus
2940 yield from self.publish()
2941
2942 continue
2943
2944 elif vnfr.state == VnfRecordState.FAILED:
2945 if vnfr._prev_state != vnfr.state:
2946 event_descr = "Instantiation of VNF %s for NS: %s failed" % (vnfr.id, self.name)
2947 event_error_details = vnfr.state_failed_reason
2948 self.record_event("vnf-failed", event_descr, evt_details=event_error_details)
2949 vnfr.set_state(VnfRecordState.FAILED)
2950 else:
2951 self._log.info("VNF state did not change, curr=%s, prev=%s",
2952 vnfr.state, vnfr._prev_state)
2953 new_state = NetworkServiceRecordState.FAILED
2954 break
2955 else:
2956 self._log.debug("VNF %s in NSR %s - %s is still not active; current state is: %s",
2957 vnfr.id, self.id, self.name, vnfr.state)
2958 new_state = curr_state
2959
2960 # If new state is RUNNING; check VNFFGRs are also active
2961 if new_state == NetworkServiceRecordState.RUNNING:
2962 for _, vnffgr in self.vnffgrs.items():
2963 self._log.debug("Checking vnffgr state for nsr %s is: %s",
2964 self.id, vnffgr.state)
2965 if vnffgr.state == VnffgRecordState.ACTIVE:
2966 continue
2967 elif vnffgr.state == VnffgRecordState.FAILED:
2968 event_descr = "Instantiation of VNFFGR %s failed" % vnffgr.id
2969 self.record_event("vnffg-failed", event_descr)
2970 new_state = NetworkServiceRecordState.FAILED
2971 break
2972 else:
2973 self._log.info("VNFFGR %s in NSR %s - %s is still not active; current state is: %s",
2974 vnffgr.id, self.id, self.name, vnffgr.state)
2975 new_state = curr_state
2976
2977 # Update all the scaling group instance operational status to
2978 # reflect the state of all VNFR within that instance
2979 yield from self._update_scale_group_instances_status()
2980
2981 for _, group in self._scaling_groups.items():
2982 if group.state == scale_group.ScaleGroupState.SCALING_OUT:
2983 new_state = NetworkServiceRecordState.SCALING_OUT
2984 break
2985 elif group.state == scale_group.ScaleGroupState.SCALING_IN:
2986 new_state = NetworkServiceRecordState.SCALING_IN
2987 break
2988
2989 if new_state != curr_state:
2990 self._log.debug("Changing state of Network service %s - %s from %s to %s",
2991 self.id, self.name, curr_state, new_state)
2992 if new_state == NetworkServiceRecordState.RUNNING:
2993 yield from self.is_active()
2994 elif new_state == NetworkServiceRecordState.FAILED:
2995 # If the NS is already active and we entered scaling_in, scaling_out,
2996 # do not mark the NS as failing if scaling operation failed.
2997 if curr_state in [NetworkServiceRecordState.SCALING_OUT,
2998 NetworkServiceRecordState.SCALING_IN] and self._is_active:
2999 new_state = NetworkServiceRecordState.RUNNING
3000 self.set_state(new_state)
3001 else:
3002 yield from self.instantiation_failed()
3003 else:
3004 self.set_state(new_state)
3005
3006 yield from self.publish()
3007
3008 def vl_instantiation_state(self):
3009 """ Check if all VLs in this NS are active """
3010 for vl_id, vlr in self.vlrs.items():
3011 if vlr.state == VlRecordState.ACTIVE:
3012 continue
3013 elif vlr.state == VlRecordState.FAILED:
3014 return VlRecordState.FAILED
3015 elif vlr.state == VlRecordState.TERMINATED:
3016 return VlRecordState.TERMINATED
3017 elif vlr.state == VlRecordState.INSTANTIATION_PENDING:
3018 return VlRecordState.INSTANTIATION_PENDING
3019 else:
3020 self._log.error("vlr %s still in state %s", vlr, vlr.state)
3021 raise VirtualLinkRecordError("Invalid state %s" %(vlr.state))
3022 return VlRecordState.ACTIVE
3023
3024 def vl_instantiation_successful(self):
3025 """ Mark that all VLs in this NS are active """
3026 if self._vls_ready.is_set():
3027 self._log.error("NSR id %s, vls_ready is already set", self.id)
3028
3029 if self.vl_instantiation_state() == VlRecordState.ACTIVE:
3030 self._log.debug("NSR id %s, All %d vlrs are in active state %s",
3031 self.id, len(self.vlrs), self.vl_instantiation_state)
3032 self._vls_ready.set()
3033
3034 def vlr_event(self, vlr, action):
3035 self._log.debug("Received VLR %s with action:%s", vlr, action)
3036
3037 if vlr.id not in self.vlrs:
3038 self._log.error("VLR %s:%s received for unknown id, state:%s",
3039 vlr.id, vlr.name, vlr.operational_status)
3040 return
3041
3042 vlr_local = self.vlrs[vlr.id]
3043
3044 if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
3045 if vlr.operational_status == 'running':
3046 vlr_local.set_state_from_op_status(vlr.operational_status)
3047 self._active_networks += 1
3048 self._log.info("VLR %s:%s moving to active state",
3049 vlr.id,vlr.name)
3050 elif vlr.operational_status == 'failed':
3051 vlr_local.set_state_from_op_status(vlr.operational_status)
3052 vlr_local.state_failed_reason = vlr.operational_status_details
3053 asyncio.ensure_future(self.update_state(), loop=self._loop)
3054 self._log.info("VLR %s:%s moving to failed state",
3055 vlr.id,vlr.name)
3056 else:
3057 self._log.warning("VLR %s:%s received state:%s",
3058 vlr.id, vlr.name, vlr.operational_status)
3059
3060 if isinstance(self.nsm_plugin, rwnsmplugin.RwNsPlugin):
3061 self.vl_instantiation_successful()
3062
3063 # self.update_state() is responsible for publishing the NSR state. Its being called by vlr_event and update_vnfr.
3064 # The call from vlr_event occurs only if vlr reaches a failed state. Hence implementing the check here to handle
3065 # ns terminate received after other vlr states as vl-alloc-pending, vl-init, running.
3066 if self._ns_terminate_received:
3067 # Resetting this flag so that terminate ns is not called via subsequent DTS Handlers after the intial call.
3068 if vlr.operational_status in ['running', 'failed']:
3069 self._ns_terminate_received = False
3070 asyncio.ensure_future(self.terminate_ns_cont(), loop=self._loop)
3071
3072
3073 class InputParameterSubstitution(object):
3074 """
3075 This class is responsible for substituting input parameters into an NSD.
3076 """
3077
3078 def __init__(self, log, project):
3079 """Create an instance of InputParameterSubstitution
3080
3081 Arguments:
3082 log - a logger for this object to use
3083
3084 """
3085 self.log = log
3086 self.project = project
3087
3088 def _fix_xpath(self, xpath):
3089 # Fix the parameter.xpath to include project and correct namespace
3090 self.log.debug("Provided xpath: {}".format(xpath))
3091 #Split the xpath at the /
3092 attrs = xpath.split('/')
3093 new_xp = attrs[0]
3094 for attr in attrs[1:]:
3095 new_ns = 'project-nsd'
3096 name = attr
3097 if ':' in attr:
3098 # Includes namespace
3099 ns, name = attr.split(':', 2)
3100 if ns == "rw-nsd":
3101 ns = "rw-project-nsd"
3102
3103 new_xp = new_xp + '/' + new_ns + ':' + name
3104
3105 updated_xpath = self.project.add_project(new_xp)
3106
3107 self.log.error("Updated xpath: {}".format(updated_xpath))
3108 return updated_xpath
3109
3110 def __call__(self, nsd, nsr_config):
3111 """Substitutes input parameters from the NSR config into the NSD
3112
3113 This call modifies the provided NSD with the input parameters that are
3114 contained in the NSR config.
3115
3116 Arguments:
3117 nsd - a GI NSD object
3118 nsr_config - a GI NSR config object
3119
3120 """
3121 if nsd is None or nsr_config is None:
3122 return
3123
3124 # Create a lookup of the xpath elements that this descriptor allows
3125 # to be modified
3126 optional_input_parameters = set()
3127 for input_parameter in nsd.input_parameter_xpath:
3128 optional_input_parameters.add(input_parameter.xpath)
3129
3130 # Apply the input parameters to the descriptor
3131 if nsr_config.input_parameter:
3132 for param in nsr_config.input_parameter:
3133 if param.xpath not in optional_input_parameters:
3134 msg = "tried to set an invalid input parameter ({})"
3135 self.log.error(msg.format(param.xpath))
3136 continue
3137
3138 self.log.debug(
3139 "input-parameter:{} = {}".format(
3140 param.xpath,
3141 param.value,
3142 )
3143 )
3144
3145 try:
3146 xp = self._fix_xpath(param.xpath)
3147 xpath.setxattr(nsd, xp, param.value)
3148
3149 except Exception as e:
3150 self.log.exception(e)
3151
3152
3153 class VnfInputParameterSubstitution(object):
3154 """
3155 This class is responsible for substituting input parameters into a VNFD.
3156 """
3157
3158 def __init__(self, log, const_vnfd, project):
3159 """Create an instance of VnfInputParameterSubstitution
3160
3161 Arguments:
3162 log - a logger for this object to use
3163 const_vnfd - id refs for vnfs in a ns
3164 project - project for the VNFs
3165 """
3166
3167 self.log = log
3168 self.member_vnf_index = const_vnfd.member_vnf_index
3169 self.vnfd_id_ref = const_vnfd.vnfd_id_ref
3170 self.project = project
3171
3172 def __call__(self, vnfr, nsr_config):
3173 """Substitutes vnf input parameters from the NSR config into the VNFD
3174
3175 This call modifies the provided VNFD with the input parameters that are
3176 contained in the NSR config.
3177
3178 Arguments:
3179 vnfr - a GI VNFR object
3180 nsr_config - a GI NSR Config object
3181
3182 """
3183
3184 def compose_xpath(xpath, id):
3185 prefix = "/rw-project:project[rw-project:name={}]".format(quoted_key(self.project.name)) + \
3186 "/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vnfd/".format(quoted_key(id))
3187
3188 suffix = '/'.join(xpath.split('/')[3:]).replace('vnfd', 'vnfr')
3189 return prefix + suffix
3190
3191 def substitute_xpath(ip_xpath, substitute_value, vnfr):
3192 vnfr_xpath = compose_xpath(ip_xpath, vnfr.id)
3193
3194 try:
3195 verify_xpath_wildcarded = xpath.getxattr(vnfr, vnfr_xpath)
3196
3197 self.log.debug(
3198 "vnf-input-parameter:{} = {}, for VNF : [member-vnf-index : {}, vnfd-id-ref : {}]".format(
3199 ip_xpath,
3200 substitute_value,
3201 self.member_vnf_index,
3202 self.vnfd_id_ref
3203 )
3204 )
3205 try:
3206 xpath.setxattr(vnfr, vnfr_xpath, substitute_value)
3207
3208 except Exception as e:
3209 self.log.exception(e)
3210
3211 except Exception as e:
3212 self.log.exception("Wildcarded xpath {} is listy in nature. Can not update. Exception => {}"
3213 .format(ip_xpath, e))
3214
3215 if vnfr is None or nsr_config is None:
3216 return
3217
3218 optional_input_parameters = set()
3219 for input_parameter in nsr_config.nsd.input_parameter_xpath:
3220 optional_input_parameters.add(input_parameter.xpath)
3221
3222 # Apply the input parameters to the vnfr
3223 if nsr_config.vnf_input_parameter:
3224 for param in nsr_config.vnf_input_parameter:
3225 if (param.member_vnf_index_ref == self.member_vnf_index and param.vnfd_id_ref == self.vnfd_id_ref):
3226 if param.input_parameter:
3227 for ip in param.input_parameter:
3228 if ip.xpath not in optional_input_parameters:
3229 msg = "Substitution Failed. Tried to set an invalid vnf input parameter ({}) for vnf [member-vnf-index : {}, vnfd-id-ref : {}]"
3230 self.log.error(msg.format(ip.xpath, self.member_vnf_index, self.vnfd_id_ref))
3231 continue
3232
3233 try:
3234 substitute_xpath(ip.xpath, ip.value, vnfr)
3235 except Exception as e:
3236 self.log.exception(e)
3237 else:
3238 self.log.debug("Substituting Xpaths with default Values")
3239 for input_parameter in nsr_config.nsd.input_parameter_xpath:
3240 if input_parameter.default_value is not None:
3241 try:
3242 if "vnfd-catalog" in input_parameter.xpath:
3243 substitute_xpath(input_parameter.xpath, input_parameter.default_value, vnfr)
3244 except Exception as e:
3245 self.log.exception(e)
3246
3247
3248 class NetworkServiceDescriptor(object):
3249 """
3250 Network service descriptor class
3251 """
3252
3253 def __init__(self, dts, log, loop, nsd, nsm):
3254 self._dts = dts
3255 self._log = log
3256 self._loop = loop
3257
3258 self._nsd = nsd
3259 self._nsm = nsm
3260
3261 @property
3262 def id(self):
3263 """ Returns nsd id """
3264 return self._nsd.id
3265
3266 @property
3267 def name(self):
3268 """ Returns name of nsd """
3269 return self._nsd.name
3270
3271 @property
3272 def msg(self):
3273 """ Return the message associated with this NetworkServiceDescriptor"""
3274 return self._nsd
3275
3276 @staticmethod
3277 def path_for_id(nsd_id):
3278 """ Return path for the passed nsd_id"""
3279 return self._nsm._project.add_project(
3280 "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id = '{}'".
3281 format(nsd_id))
3282
3283 def path(self):
3284 """ Return the message associated with this NetworkServiceDescriptor"""
3285 return NetworkServiceDescriptor.path_for_id(self.id)
3286
3287 def update(self, nsd):
3288 """ Update the NSD descriptor """
3289 self._nsd = nsd
3290
3291
3292 class NsdDtsHandler(object):
3293 """ The network service descriptor DTS handler """
3294 XPATH = "C,/project-nsd:nsd-catalog/project-nsd:nsd"
3295
3296 def __init__(self, dts, log, loop, nsm):
3297 self._dts = dts
3298 self._log = log
3299 self._loop = loop
3300 self._nsm = nsm
3301
3302 self._regh = None
3303 self._project = nsm._project
3304
3305 @property
3306 def regh(self):
3307 """ Return registration handle """
3308 return self._regh
3309
3310 @asyncio.coroutine
3311 def register(self):
3312 """ Register for Nsd create/update/delete/read requests from dts """
3313
3314 if self._regh:
3315 self._log.warning("DTS handler already registered for project {}".
3316 format(self._project.name))
3317 return
3318
3319 def on_apply(dts, acg, xact, action, scratch):
3320 """Apply the configuration"""
3321 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
3322 self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)",
3323 xact, action)
3324
3325 if self._regh:
3326 # Create/Update an NSD record
3327 for cfg in self._regh.get_xact_elements(xact):
3328 # Only interested in those NSD cfgs whose ID was received in prepare callback
3329 if cfg.id in scratch.get('nsds', []) or is_recovery:
3330 self._nsm.update_nsd(cfg)
3331
3332 else:
3333 # This can happen if we do the deregister
3334 # during project delete before this is called
3335 self._log.debug("No reg handle for {} for project {}".
3336 format(self.__class__, self._project.name))
3337
3338 scratch.pop('nsds', None)
3339
3340 return RwTypes.RwStatus.SUCCESS
3341
3342 @asyncio.coroutine
3343 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
3344 """ Prepare callback from DTS for NSD config """
3345
3346 self._log.info("Got nsd prepare - config received nsd id %s, msg %s",
3347 msg.id, msg)
3348
3349 fref = ProtobufC.FieldReference.alloc()
3350 fref.goto_whole_message(msg.to_pbcm())
3351
3352 if fref.is_field_deleted():
3353 # Delete an NSD record
3354 self._log.debug("Deleting NSD with id %s", msg.id)
3355 self._nsm.delete_nsd(msg.id)
3356 else:
3357 # Add this NSD to scratch to create/update in apply callback
3358 nsds = scratch.setdefault('nsds', [])
3359 nsds.append(msg.id)
3360 # acg._scratch['nsds'].append(msg.id)
3361
3362 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
3363
3364 xpath = self._project.add_project(NsdDtsHandler.XPATH)
3365 self._log.debug(
3366 "Registering for NSD config using xpath: %s",
3367 xpath,
3368 )
3369
3370 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
3371 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
3372 # Need a list in scratch to store NSDs to create/update later
3373 # acg._scratch['nsds'] = list()
3374 self._regh = acg.register(
3375 xpath=xpath,
3376 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
3377 on_prepare=on_prepare)
3378
3379 def deregister(self):
3380 self._log.debug("De-register NSD handler for project {}".
3381 format(self._project.name))
3382 if self._regh:
3383 self._regh.deregister()
3384 self._regh = None
3385
3386
3387 class VnfdDtsHandler(object):
3388 """ DTS handler for VNFD config changes """
3389 XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd"
3390
3391 def __init__(self, dts, log, loop, nsm):
3392 self._dts = dts
3393 self._log = log
3394 self._loop = loop
3395 self._nsm = nsm
3396 self._regh = None
3397 self._project = nsm._project
3398
3399 @property
3400 def regh(self):
3401 """ DTS registration handle """
3402 return self._regh
3403
3404 @asyncio.coroutine
3405 def register(self):
3406 """ Register for VNFD configuration"""
3407
3408 if self._regh:
3409 self._log.warning("DTS handler already registered for project {}".
3410 format(self._project.name))
3411 return
3412
3413 @asyncio.coroutine
3414 def on_apply(dts, acg, xact, action, scratch):
3415 """Apply the configuration"""
3416 self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
3417 xact, action, scratch)
3418
3419 is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL
3420
3421 if self._regh:
3422 # Create/Update a VNFD record
3423 for cfg in self._regh.get_xact_elements(xact):
3424 # Only interested in those VNFD cfgs whose ID was received in prepare callback
3425 if cfg.id in scratch.get('vnfds', []) or is_recovery:
3426 self._nsm.update_vnfd(cfg)
3427
3428 for cfg in self._regh.elements:
3429 if cfg.id in scratch.get('deleted_vnfds', []):
3430 yield from self._nsm.delete_vnfd(cfg.id)
3431
3432 else:
3433 self._log.warning("Reg handle none for {} in project {}".
3434 format(self.__class__, self._project))
3435
3436 scratch.pop('vnfds', None)
3437 scratch.pop('deleted_vnfds', None)
3438
3439 @asyncio.coroutine
3440 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
3441 """ on prepare callback """
3442 xpath = ks_path.to_xpath(NsdYang.get_schema())
3443 self._log.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
3444 xpath, xact_info.query_action, msg)
3445
3446 fref = ProtobufC.FieldReference.alloc()
3447 fref.goto_whole_message(msg.to_pbcm())
3448
3449 # Handle deletes in prepare_callback, but adds/updates in apply_callback
3450 if fref.is_field_deleted():
3451 self._log.debug("Adding msg to deleted field")
3452 deleted_vnfds = scratch.setdefault('deleted_vnfds', [])
3453 deleted_vnfds.append(msg.id)
3454 else:
3455 # Add this VNFD to scratch to create/update in apply callback
3456 vnfds = scratch.setdefault('vnfds', [])
3457 vnfds.append(msg.id)
3458
3459 try:
3460 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
3461 except rift.tasklets.dts.ResponseError as e:
3462 self._log.warning(
3463 "VnfdDtsHandler in project {} with path {} for action {} failed: {}".
3464 format(self._project, xpath, xact_info.query_action, e))
3465
3466
3467 xpath = self._project.add_project(VnfdDtsHandler.XPATH)
3468 self._log.debug(
3469 "Registering for VNFD config using xpath {} for project {}"
3470 .format(xpath, self._project))
3471 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
3472 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
3473 # Need a list in scratch to store VNFDs to create/update later
3474 # acg._scratch['vnfds'] = list()
3475 # acg._scratch['deleted_vnfds'] = list()
3476 self._regh = acg.register(
3477 xpath=xpath,
3478 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
3479 on_prepare=on_prepare)
3480
3481 def deregister(self):
3482 self._log.debug("De-register VNFD handler for project {}".
3483 format(self._project.name))
3484 if self._regh:
3485 self._regh.deregister()
3486 self._regh = None
3487
3488
3489 class NsrRpcDtsHandler(object):
3490 """ The network service instantiation RPC DTS handler """
3491 EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service"
3492 EXEC_NSR_CONF_O_XPATH = "O,/nsr:start-network-service"
3493 NETCONF_IP_ADDRESS = "127.0.0.1"
3494 NETCONF_PORT = 2022
3495 RESTCONF_PORT = 8008
3496 NETCONF_USER = "@rift"
3497 NETCONF_PW = "rift"
3498 REST_BASE_V2_URL = 'https://{}:{}/v2/api/'.format("127.0.0.1",
3499 RESTCONF_PORT)
3500
3501 def __init__(self, dts, log, loop, nsm):
3502 self._dts = dts
3503 self._log = log
3504 self._loop = loop
3505 self._nsm = nsm
3506 self._project = nsm._project
3507 self._nsd = None
3508
3509 self._ns_regh = None
3510
3511 self._manager = None
3512 self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + \
3513 'project/{}/'.format(self._project) + \
3514 'config/ns-instance-config'
3515
3516 self._model = RwYang.Model.create_libncx()
3517 self._model.load_schema_ypbc(RwNsrYang.get_schema())
3518
3519 @property
3520 def nsm(self):
3521 """ Return the NS manager instance """
3522 return self._nsm
3523
3524 @staticmethod
3525 def wrap_netconf_config_xml(xml):
3526 xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml)
3527 return xml
3528
3529 @asyncio.coroutine
3530 def _connect(self, timeout_secs=240):
3531
3532 start_time = time.time()
3533 while (time.time() - start_time) < timeout_secs:
3534
3535 try:
3536 self._log.debug("Attemping NsmTasklet netconf connection.")
3537
3538 manager = yield from ncclient.asyncio_manager.asyncio_connect(
3539 loop=self._loop,
3540 host=NsrRpcDtsHandler.NETCONF_IP_ADDRESS,
3541 port=NsrRpcDtsHandler.NETCONF_PORT,
3542 username=NsrRpcDtsHandler.NETCONF_USER,
3543 password=NsrRpcDtsHandler.NETCONF_PW,
3544 allow_agent=False,
3545 look_for_keys=False,
3546 hostkey_verify=False,
3547 )
3548
3549 return manager
3550
3551 except ncclient.transport.errors.SSHError as e:
3552 self._log.warning("Netconf connection to launchpad %s failed: %s",
3553 NsrRpcDtsHandler.NETCONF_IP_ADDRESS, str(e))
3554
3555 yield from asyncio.sleep(5, loop=self._loop)
3556
3557 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
3558 timeout_secs)
3559
3560 def _apply_ns_instance_config(self,payload_dict):
3561 req_hdr= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'}
3562 response=requests.post(self._nsr_config_url,
3563 headers=req_hdr,
3564 auth=(NsrRpcDtsHandler.NETCONF_USER, NsrRpcDtsHandler.NETCONF_PW),
3565 data=payload_dict,
3566 verify=False)
3567 return response
3568
3569 @asyncio.coroutine
3570 def register(self):
3571 """ Register for NS monitoring read from dts """
3572
3573 @asyncio.coroutine
3574 def on_ns_config_prepare(xact_info, action, ks_path, msg):
3575 """ prepare callback from dts start-network-service"""
3576 assert action == rwdts.QueryAction.RPC
3577
3578 if not self._project.rpc_check(msg, xact_info):
3579 return
3580
3581 rpc_ip = msg
3582 rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({
3583 "nsr_id":str(uuid.uuid4())
3584 })
3585
3586 if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and
3587 ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)):
3588 errmsg = (
3589 "Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".
3590 format(rpc_ip))
3591 self._log.error(errmsg)
3592 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
3593 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
3594 errmsg)
3595 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
3596 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
3597 return
3598
3599 self._log.debug("start-network-service RPC input: {}".format(rpc_ip))
3600
3601 try:
3602 # Add used value to the pool
3603 self._log.debug("RPC output: {}".format(rpc_op))
3604
3605 nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref)
3606
3607 self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
3608 rpc_ip.name, rpc_ip.nsd_ref)
3609
3610 ns_instance_config_dict = {"id":rpc_op.nsr_id, "admin_status":"ENABLED"}
3611 ns_instance_config_copy_dict = {k:v for k, v in rpc_ip.as_dict().items()
3612 if k in RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr().fields}
3613 ns_instance_config_dict.update(ns_instance_config_copy_dict)
3614
3615 ns_instance_config = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict)
3616 ns_instance_config.nsd = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_Nsd()
3617 ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict())
3618
3619 payload_dict = ns_instance_config.to_json(self._model)
3620
3621 self._log.debug("Sending configure ns-instance-config json to %s: %s",
3622 self._nsr_config_url,ns_instance_config)
3623
3624 response = yield from self._loop.run_in_executor(
3625 None,
3626 self._apply_ns_instance_config,
3627 payload_dict
3628 )
3629 response.raise_for_status()
3630 self._log.debug("Received edit config response: %s", response.json())
3631
3632 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
3633 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
3634 rpc_op)
3635 except Exception as e:
3636 errmsg = ("Exception processing the "
3637 "start-network-service: {}".format(e))
3638 self._log.exception(errmsg)
3639 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
3640 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH,
3641 errmsg)
3642 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
3643 NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH)
3644
3645 self._ns_regh = yield from self._dts.register(
3646 xpath=NsrRpcDtsHandler.EXEC_NSR_CONF_XPATH,
3647 handler=rift.tasklets.DTS.RegistrationHandler(
3648 on_prepare=on_ns_config_prepare),
3649 flags=rwdts.Flag.PUBLISHER,
3650 )
3651
3652 def deregister(self):
3653 if self._ns_regh:
3654 self._ns_regh.deregister()
3655 self._ns_regh = None
3656
3657
3658 class NsrDtsHandler(object):
3659 """ The network service DTS handler """
3660 NSR_XPATH = "C,/nsr:ns-instance-config/nsr:nsr"
3661 SCALE_INSTANCE_XPATH = "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3662 KEY_PAIR_XPATH = "C,/nsr:key-pair"
3663
3664 def __init__(self, dts, log, loop, nsm):
3665 self._dts = dts
3666 self._log = log
3667 self._loop = loop
3668 self._nsm = nsm
3669 self._project = self._nsm._project
3670
3671 self._nsr_regh = None
3672 self._scale_regh = None
3673 self._key_pair_regh = None
3674
3675 @property
3676 def nsm(self):
3677 """ Return the NS manager instance """
3678 return self._nsm
3679
3680 @asyncio.coroutine
3681 def register(self):
3682 """ Register for Nsr create/update/delete/read requests from dts """
3683
3684 if self._nsr_regh:
3685 self._log.warning("DTS handler already registered for project {}".
3686 format(self._project.name))
3687 return
3688
3689 def nsr_id_from_keyspec(ks):
3690 nsr_path_entry = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks)
3691 nsr_id = nsr_path_entry.key00.id
3692 return nsr_id
3693
3694 def group_name_from_keyspec(ks):
3695 group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks)
3696 group_name = group_path_entry.key00.scaling_group_name_ref
3697 return group_name
3698
3699 def is_instance_in_reg_elements(nsr_id, group_name, instance_id):
3700 """ Return boolean indicating if scaling group instance was already commited previously.
3701
3702 By looking at the existing elements in this registration handle (elements not part
3703 of this current xact), we can tell if the instance was configured previously without
3704 keeping any application state.
3705 """
3706 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
3707 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3708 elem_group_name = group_name_from_keyspec(keyspec)
3709
3710 if elem_nsr_id != nsr_id or group_name != elem_group_name:
3711 continue
3712
3713 if instance_cfg.id == instance_id:
3714 return True
3715
3716 return False
3717
3718 def get_scale_group_instance_delta(nsr_id, group_name, xact):
3719 delta = {"added": [], "deleted": []}
3720 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(xact, include_keyspec=True):
3721 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3722 if elem_nsr_id != nsr_id:
3723 continue
3724
3725 elem_group_name = group_name_from_keyspec(keyspec)
3726 if elem_group_name != group_name:
3727 continue
3728
3729 delta["added"].append(instance_cfg.id)
3730
3731 for instance_cfg, keyspec in self._scale_regh.get_xact_elements(include_keyspec=True):
3732 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3733 if elem_nsr_id != nsr_id:
3734 continue
3735
3736 elem_group_name = group_name_from_keyspec(keyspec)
3737 if elem_group_name != group_name:
3738 continue
3739
3740 if instance_cfg.id in delta["added"]:
3741 delta["added"].remove(instance_cfg.id)
3742 else:
3743 delta["deleted"].append(instance_cfg.id)
3744
3745 return delta
3746
3747 @asyncio.coroutine
3748 def update_nsr_nsd(nsr_id, xact, scratch):
3749
3750 @asyncio.coroutine
3751 def get_nsr_vl_delta(nsr_id, xact, scratch):
3752 delta = {"added": [], "deleted": []}
3753 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(xact, include_keyspec=True):
3754 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3755 if elem_nsr_id != nsr_id:
3756 continue
3757
3758 if 'vld' in instance_cfg.nsd:
3759 for vld in instance_cfg.nsd.vld:
3760 delta["added"].append(vld)
3761
3762 for instance_cfg, keyspec in self._nsr_regh.get_xact_elements(include_keyspec=True):
3763 self._log.debug("NSR update: %s", instance_cfg)
3764 elem_nsr_id = nsr_id_from_keyspec(keyspec)
3765 if elem_nsr_id != nsr_id:
3766 continue
3767
3768 if 'vld' in instance_cfg.nsd:
3769 for vld in instance_cfg.nsd.vld:
3770 if vld in delta["added"]:
3771 delta["added"].remove(vld)
3772 else:
3773 delta["deleted"].append(vld)
3774
3775 return delta
3776
3777 vl_delta = yield from get_nsr_vl_delta(nsr_id, xact, scratch)
3778 self._log.debug("Got NSR:%s VL instance delta: %s", nsr_id, vl_delta)
3779
3780 for vld in vl_delta["added"]:
3781 yield from self._nsm.nsr_instantiate_vl(nsr_id, vld)
3782
3783 for vld in vl_delta["deleted"]:
3784 yield from self._nsm.nsr_terminate_vl(nsr_id, vld)
3785
3786 def get_nsr_key_pairs(dts_member_reg, xact):
3787 key_pairs = {}
3788 for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True):
3789 self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec))
3790 xpath = keyspec.to_xpath(RwNsrYang.get_schema())
3791 key_pairs[instance_cfg.name] = instance_cfg
3792 return key_pairs
3793
3794 def on_apply(dts, acg, xact, action, scratch):
3795 """Apply the configuration"""
3796 self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3797 xact, action, scratch)
3798
3799 @asyncio.coroutine
3800 def handle_create_nsr(msg, key_pairs=None, restart_mode=False):
3801 # Handle create nsr requests """
3802 # Do some validations
3803 if not msg.has_field("nsd"):
3804 err = "NSD not provided"
3805 self._log.error(err)
3806 raise NetworkServiceRecordError(err)
3807
3808 self._log.debug("Creating NetworkServiceRecord %s from nsr config %s",
3809 msg.id, msg.as_dict())
3810 nsr = yield from self.nsm.create_nsr(msg,
3811 xact,
3812 key_pairs=key_pairs,
3813 restart_mode=restart_mode)
3814 return nsr
3815
3816 def handle_delete_nsr(msg):
3817 @asyncio.coroutine
3818 def delete_instantiation(ns_id):
3819 """ Delete instantiation """
3820 yield from self._nsm.terminate_ns(ns_id, None)
3821
3822 # Handle delete NSR requests
3823 self._log.info("Delete req for NSR Id: %s received", msg.id)
3824 # Terminate the NSR instance
3825 nsr = self._nsm.get_ns_by_nsr_id(msg.id)
3826
3827 nsr.set_state(NetworkServiceRecordState.TERMINATE_RCVD)
3828 event_descr = "Terminate rcvd for NS Id: %s, NS Name: %s" % (msg.id, msg.name)
3829 nsr.record_event("terminate-rcvd", event_descr)
3830
3831 self._loop.create_task(delete_instantiation(msg.id))
3832
3833 @asyncio.coroutine
3834 def begin_instantiation(nsr):
3835 # Begin instantiation
3836 self._log.info("Beginning NS instantiation: %s", nsr.id)
3837 try:
3838 yield from self._nsm.instantiate_ns(nsr.id, xact)
3839 except Exception as e:
3840 self._log.exception(e)
3841 raise e
3842
3843 @asyncio.coroutine
3844 def instantiate_ns(msg, key_pairs, restart_mode=False):
3845 nsr = yield from handle_create_nsr(msg, key_pairs, restart_mode=restart_mode)
3846 yield from begin_instantiation(nsr)
3847
3848 def on_instantiate_done(fut, msg):
3849 # If the do_instantiate fails, then publish NSR with failed result
3850 e = fut.exception()
3851 if e is not None:
3852 import traceback
3853 print(traceback.format_exception(None, e, e.__traceback__), file=sys.stderr, flush=True)
3854 self._log.error("NSR instantiation failed for NSR id %s: %s", msg.id, str(e))
3855 failed_nsr = self._nsm.nsrs[msg.id]
3856 self._loop.create_task(failed_nsr.instantiation_failed(failed_reason=str(e)))
3857
3858
3859 self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3860 xact, action, scratch)
3861
3862 if action == rwdts.AppconfAction.INSTALL and xact.id is None:
3863 key_pairs = []
3864 if self._key_pair_regh:
3865 for element in self._key_pair_regh.elements:
3866 key_pairs.append(element)
3867 else:
3868 self._log.error("Reg handle none for key pair in project {}".
3869 format(self._project))
3870
3871 if self._nsr_regh:
3872 for element in self._nsr_regh.elements:
3873 if element.id not in self.nsm._nsrs:
3874 instantiate_task = self._loop.create_task(instantiate_ns(element, key_pairs,
3875 restart_mode=True))
3876 instantiate_task.add_done_callback(functools.partial(on_instantiate_done, msg=element))
3877 else:
3878 self._log.error("Reg handle none for NSR in project {}".
3879 format(self._project))
3880
3881 return RwTypes.RwStatus.SUCCESS
3882
3883 (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh,
3884 xact,
3885 "id")
3886 self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs,
3887 deleted_msgs, updated_msgs)
3888
3889 for msg in added_msgs:
3890 if msg.id not in self._nsm.nsrs:
3891 self._log.info("Create NSR received in on_apply to instantiate NS:%s", msg.id)
3892 key_pairs = get_nsr_key_pairs(self._key_pair_regh, xact)
3893 instantiate_task = self._loop.create_task(instantiate_ns(msg,key_pairs))
3894 instantiate_task.add_done_callback(functools.partial(on_instantiate_done, msg=msg))
3895
3896 for msg in deleted_msgs:
3897 self._log.info("Delete NSR received in on_apply to terminate NS:%s", msg.id)
3898 try:
3899 handle_delete_nsr(msg)
3900 except Exception:
3901 self._log.exception("Failed to terminate NS:%s", msg.id)
3902
3903 for msg in updated_msgs:
3904 self._log.info("Update NSR received in on_apply: %s", msg)
3905 self._nsm.nsr_update_cfg(msg.id, msg)
3906
3907 if 'nsd' in msg:
3908 self._loop.create_task(update_nsr_nsd(msg.id, xact, scratch))
3909
3910 for group in msg.scaling_group:
3911 instance_delta = get_scale_group_instance_delta(msg.id, group.scaling_group_name_ref, xact)
3912 self._log.debug("Got NSR:%s scale group instance delta: %s", msg.id, instance_delta)
3913
3914 for instance_id in instance_delta["added"]:
3915 self._nsm.scale_nsr_out(msg.id, group.scaling_group_name_ref, instance_id, xact)
3916
3917 for instance_id in instance_delta["deleted"]:
3918 self._nsm.scale_nsr_in(msg.id, group.scaling_group_name_ref, instance_id)
3919
3920
3921 return RwTypes.RwStatus.SUCCESS
3922
3923 @asyncio.coroutine
3924 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
3925 """ Prepare calllback from DTS for NSR """
3926
3927 xpath = ks_path.to_xpath(RwNsrYang.get_schema())
3928 action = xact_info.query_action
3929 self._log.debug(
3930 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3931 xact, action, xact_info, xpath, msg
3932 )
3933
3934 fref = ProtobufC.FieldReference.alloc()
3935 fref.goto_whole_message(msg.to_pbcm())
3936
3937 def send_err_msg(err_msg):
3938 self._log.error(errmsg)
3939 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
3940 xpath,
3941 errmsg)
3942 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
3943
3944
3945 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]:
3946 # if this is an NSR create
3947 if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs:
3948 # Ensure the Cloud account/datacenter has been specified
3949 if not msg.has_field("datacenter") and not msg.has_field("datacenter"):
3950 errmsg = ("Cloud account or datacenter not specified in NS {}".
3951 format(msg.name))
3952 send_err_msg(errmsg)
3953 return
3954
3955 # Check if nsd is specified
3956 if not msg.has_field("nsd"):
3957 errmsg = ("NSD not specified in NS {}".
3958 format(msg.name))
3959 send_err_msg(errmsg)
3960 return
3961
3962 else:
3963 nsr = self._nsm.nsrs[msg.id]
3964 if msg.has_field("nsd"):
3965 if nsr.state != NetworkServiceRecordState.RUNNING:
3966 errmsg = ("Unable to update VL when NS {} not in running state".
3967 format(msg.name))
3968 send_err_msg(errmsg)
3969 return
3970
3971 if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0:
3972 errmsg = ("NS config {} NSD should have atleast 1 VLD".
3973 format(msg.name))
3974 send_err_msg(errmsg)
3975 return
3976
3977 if msg.has_field("scaling_group"):
3978 self._log.debug("ScaleMsg %s", msg)
3979 self._log.debug("NSSCALINGSTATE %s", nsr.state)
3980 if nsr.state != NetworkServiceRecordState.RUNNING:
3981 errmsg = ("Unable to perform scaling action when NS {} not in running state".
3982 format(msg.name))
3983 send_err_msg(errmsg)
3984 return
3985
3986 if len(msg.scaling_group) > 1:
3987 errmsg = ("Only a single scaling group can be configured at a time for NS {}".
3988 format(msg.name))
3989 send_err_msg(errmsg)
3990 return
3991
3992 for group_msg in msg.scaling_group:
3993 num_new_group_instances = len(group_msg.instance)
3994 if num_new_group_instances > 1:
3995 errmsg = ("Only a single scaling instance can be modified at a time for NS {}".
3996 format(msg.name))
3997 send_err_msg(errmsg)
3998 return
3999
4000 elif num_new_group_instances == 1:
4001 scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref]
4002 if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
4003 if len(scale_group.instances) == scale_group.max_instance_count:
4004 errmsg = (" Max instances for {} reached for NS {}".
4005 format(str(scale_group), msg.name))
4006 send_err_msg(errmsg)
4007 return
4008
4009 acg.handle.prepare_complete_ok(xact_info.handle)
4010
4011
4012 xpath = self._project.add_project(NsrDtsHandler.NSR_XPATH)
4013 self._log.debug("Registering for NSR config using xpath: {}".
4014 format(xpath))
4015
4016 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
4017 with self._dts.appconf_group_create(handler=acg_hdl) as acg:
4018 self._nsr_regh = acg.register(
4019 xpath=xpath,
4020 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
4021 on_prepare=on_prepare
4022 )
4023
4024 self._scale_regh = acg.register(
4025 xpath=self._project.add_project(NsrDtsHandler.SCALE_INSTANCE_XPATH),
4026 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE,
4027 )
4028
4029 self._key_pair_regh = acg.register(
4030 xpath=self._project.add_project(NsrDtsHandler.KEY_PAIR_XPATH),
4031 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE,
4032 )
4033
4034 def deregister(self):
4035 self._log.debug("De-register NSR config for project {}".
4036 format(self._project.name))
4037 if self._nsr_regh:
4038 self._nsr_regh.deregister()
4039 self._nsr_regh = None
4040 if self._scale_regh:
4041 self._scale_regh.deregister()
4042 self._scale_regh = None
4043 if self._key_pair_regh:
4044 self._key_pair_regh.deregister()
4045 self._key_pair_regh = None
4046
4047
4048 class VnfrDtsHandler(object):
4049 """ The virtual network service DTS handler """
4050 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
4051
4052 def __init__(self, dts, log, loop, nsm):
4053 self._dts = dts
4054 self._log = log
4055 self._loop = loop
4056 self._nsm = nsm
4057
4058 self._regh = None
4059
4060 @property
4061 def regh(self):
4062 """ Return registration handle """
4063 return self._regh
4064
4065 @property
4066 def nsm(self):
4067 """ Return the NS manager instance """
4068 return self._nsm
4069
4070 @asyncio.coroutine
4071 def register(self):
4072 """ Register for vnfr create/update/delete/ advises from dts """
4073 if self._regh:
4074 self._log.warning("VNFR DTS handler already registered for project {}".
4075 format(self._project.name))
4076 return
4077
4078 @asyncio.coroutine
4079 def on_prepare(xact_info, action, ks_path, msg):
4080 """ prepare callback from dts """
4081 xpath = ks_path.to_xpath(RwNsrYang.get_schema())
4082 self._log.debug(
4083 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
4084 xact_info, action, ks_path, msg
4085 )
4086
4087 schema = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema()
4088 path_entry = schema.keyspec_to_entry(ks_path)
4089 if not path_entry or (path_entry.key00.id not in self._nsm._vnfrs):
4090 # This can happen when using external RO or after delete with monitoring params
4091 self._log.debug("%s request for non existent record path %s",
4092 action, xpath)
4093 xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath)
4094
4095 return
4096
4097 if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE:
4098 yield from self._nsm.update_vnfr(msg)
4099 elif action == rwdts.QueryAction.DELETE:
4100 self._log.debug("Deleting VNFR with id %s", path_entry.key00.id)
4101
4102 self._nsm.delete_vnfr(path_entry.key00.id)
4103
4104 xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath)
4105
4106 self._log.debug("Registering for VNFR using xpath: %s",
4107 VnfrDtsHandler.XPATH)
4108
4109 hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
4110 with self._dts.group_create() as group:
4111 self._regh = group.register(xpath=self._nsm._project.add_project(
4112 VnfrDtsHandler.XPATH),
4113 handler=hdl,
4114 flags=(rwdts.Flag.SUBSCRIBER),)
4115
4116 def deregister(self):
4117 self._log.debug("De-register VNFR for project {}".
4118 format(self._nsm._project.name))
4119 if self._regh:
4120 self._regh.deregister()
4121 self._regh = None
4122
4123 class NsManager(object):
4124 """ The Network Service Manager class"""
4125 def __init__(self, dts, log, loop, project,
4126 nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector,
4127 vnffgmgr, vnfd_pub_handler, cloud_account_handler):
4128 self._dts = dts
4129 self._log = log
4130 self._loop = loop
4131 self._project = project
4132 self._nsr_handler = nsr_handler
4133 self._vnfr_pub_handler = vnfr_handler
4134 self._vlr_pub_handler = vlr_handler
4135 self._vnffgmgr = vnffgmgr
4136 self._vnfd_pub_handler = vnfd_pub_handler
4137 self._cloud_account_handler = cloud_account_handler
4138
4139 self._ro_plugin_selector = ro_plugin_selector
4140
4141 # Intialize the set of variables for implementing Scaling RPC using REST.
4142 self._headers = {"content-type":"application/json", "accept":"application/json"}
4143 self._user = '@rift'
4144 self._password = 'rift'
4145 self._ip = 'localhost'
4146 self._rport = 8008
4147 self._conf_url = "https://{ip}:{port}/api/config/project/{project}". \
4148 format(ip=self._ip,
4149 port=self._rport,
4150 project=self._project.name)
4151
4152 self._nsrs = {}
4153 self._nsds = {}
4154 self._vnfds = {}
4155 self._vnfrs = {}
4156 self._nsr_for_vlr = {}
4157
4158 self.cfgmgr_obj = conman.ROConfigManager(log, loop, dts, self)
4159
4160 # TODO: All these handlers should move to tasklet level.
4161 # Passing self is often an indication of bad design
4162 self._nsd_dts_handler = NsdDtsHandler(dts, log, loop, self)
4163 self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self)
4164 self._dts_handlers = [self._nsd_dts_handler,
4165 VnfrDtsHandler(dts, log, loop, self),
4166 NsrDtsHandler(dts, log, loop, self),
4167 ScalingRpcHandler(log, dts, loop, self, self.scale_rpc_callback),
4168 # NsrRpcDtsHandler(dts, log, loop, self),
4169 self._vnfd_dts_handler,
4170 self.cfgmgr_obj,
4171 ]
4172
4173
4174 @property
4175 def log(self):
4176 """ Log handle """
4177 return self._log
4178
4179 @property
4180 def loop(self):
4181 """ Loop """
4182 return self._loop
4183
4184 @property
4185 def dts(self):
4186 """ DTS handle """
4187 return self._dts
4188
4189 @property
4190 def nsr_handler(self):
4191 """" NSR handler """
4192 return self._nsr_handler
4193
4194 @property
4195 def so_obj(self):
4196 """" So Obj handler """
4197 return self._so_obj
4198
4199 @property
4200 def nsrs(self):
4201 """ NSRs in this NSM"""
4202 return self._nsrs
4203
4204 @property
4205 def nsds(self):
4206 """ NSDs in this NSM"""
4207 return self._nsds
4208
4209 @property
4210 def vnfds(self):
4211 """ VNFDs in this NSM"""
4212 return self._vnfds
4213
4214 @property
4215 def vnfrs(self):
4216 """ VNFRs in this NSM"""
4217 return self._vnfrs
4218
4219 @property
4220 def nsr_pub_handler(self):
4221 """ NSR publication handler """
4222 return self._nsr_handler
4223
4224 @property
4225 def vnfr_pub_handler(self):
4226 """ VNFR publication handler """
4227 return self._vnfr_pub_handler
4228
4229 @property
4230 def vlr_pub_handler(self):
4231 """ VLR publication handler """
4232 return self._vlr_pub_handler
4233
4234 @property
4235 def vnfd_pub_handler(self):
4236 return self._vnfd_pub_handler
4237
4238 @asyncio.coroutine
4239 def register(self):
4240 """ Register all static DTS handlers """
4241 self._log.debug("Register DTS handlers for project {}".format(self._project))
4242 for dts_handle in self._dts_handlers:
4243 if asyncio.iscoroutinefunction(dts_handle.register):
4244 yield from dts_handle.register()
4245 else:
4246 dts_handle.register()
4247
4248 def deregister(self):
4249 """ Register all static DTS handlers """
4250 for dts_handle in self._dts_handlers:
4251 dts_handle.deregister()
4252
4253
4254 def get_ns_by_nsr_id(self, nsr_id):
4255 """ get NSR by nsr id """
4256 if nsr_id not in self._nsrs:
4257 raise NetworkServiceRecordError("NSR id %s not found" % nsr_id)
4258
4259 return self._nsrs[nsr_id]
4260
4261 def scale_nsr_out(self, nsr_id, scale_group_name, instance_id, config_xact):
4262 self.log.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
4263 nsr_id,
4264 scale_group_name,
4265 instance_id
4266 )
4267 nsr = self._nsrs[nsr_id]
4268 if nsr.state != NetworkServiceRecordState.RUNNING:
4269 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
4270
4271 self._loop.create_task(nsr.create_scale_group_instance(scale_group_name, instance_id, config_xact))
4272
4273 def scale_nsr_in(self, nsr_id, scale_group_name, instance_id):
4274 self.log.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
4275 nsr_id,
4276 scale_group_name,
4277 instance_id,
4278 )
4279 nsr = self._nsrs[nsr_id]
4280 if nsr.state != NetworkServiceRecordState.RUNNING:
4281 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
4282
4283 self._loop.create_task(nsr.delete_scale_group_instance(scale_group_name, instance_id))
4284
4285 def scale_rpc_callback(self, xact, msg, action):
4286 """Callback handler for RPC calls
4287 Args:
4288 xact : Transaction Handler
4289 msg : RPC input
4290 action : Scaling Action
4291 """
4292 def get_scaling_group_information():
4293 scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)
4294 output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False)
4295 if output.text is None or len(output.text) == 0:
4296 self.log.error("nsr id %s information not present", self._nsr_id)
4297 return None
4298 scaling_group_info = json.loads(output.text)
4299 return scaling_group_info
4300
4301 def config_scaling_group_information(scaling_group_info):
4302 data_str = json.dumps(scaling_group_info)
4303
4304 scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref)
4305 response = requests.put(scale_out_url, data=data_str, verify=False,
4306 auth=(self._user, self._password), headers=self._headers)
4307 response.raise_for_status()
4308
4309 def scale_out():
4310 scaling_group_info = get_scaling_group_information()
4311 self._log.debug("Scale out info: {}".format(scaling_group_info))
4312 if scaling_group_info is None:
4313 return
4314
4315 scaling_group_present = False
4316 if "scaling-group" in scaling_group_info["nsr:nsr"]:
4317 scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"]
4318 for scaling_group in scaling_group_array:
4319 if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref:
4320 scaling_group_present = True
4321 if 'instance' not in scaling_group:
4322 scaling_group['instance'] = []
4323 for instance in scaling_group['instance']:
4324 if instance["id"] == int(msg.instance_id):
4325 self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id)
4326 return
4327 scaling_group["instance"].append({"id": int(msg.instance_id)})
4328
4329 if not scaling_group_present:
4330 scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref,
4331 "instance": [{"id": msg.instance_id}]}]
4332
4333 config_scaling_group_information(scaling_group_info)
4334 return
4335
4336 def scale_in():
4337 scaling_group_info = get_scaling_group_information()
4338 if scaling_group_info is None:
4339 return
4340
4341 scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"]
4342 scaling_group_present = False
4343 instance_id_present = False
4344 for scaling_group in scaling_group_array:
4345 if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref:
4346 scaling_group_present = True
4347 if 'instance' in scaling_group:
4348 instance_array = scaling_group["instance"];
4349 for index in range(len(instance_array)):
4350 if instance_array[index]["id"] == int(msg.instance_id):
4351 instance_array.pop(index)
4352 instance_id_present = True
4353 break
4354
4355 if not scaling_group_present:
4356 self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref)
4357 return
4358
4359 if not instance_id_present:
4360 self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id)
4361 return
4362
4363 config_scaling_group_information(scaling_group_info)
4364 return
4365
4366 if action == ScalingRpcHandler.ACTION.SCALE_OUT:
4367 self._loop.run_in_executor(None, scale_out)
4368 else:
4369 self._loop.run_in_executor(None, scale_in)
4370
4371 def nsr_update_cfg(self, nsr_id, msg):
4372 nsr = self._nsrs[nsr_id]
4373 nsr.nsr_cfg_msg= msg
4374
4375 def nsr_instantiate_vl(self, nsr_id, vld):
4376 self.log.error("NSR {} create VL {}".format(nsr_id, vld))
4377 nsr = self._nsrs[nsr_id]
4378 if nsr.state != NetworkServiceRecordState.RUNNING:
4379 raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")
4380
4381 # Not calling in a separate task as this is called from a separate task
4382 yield from nsr.create_vl_instance(vld)
4383
4384 def nsr_terminate_vl(self, nsr_id, vld):
4385 self.log.debug("NSR {} delete VL {}".format(nsr_id, vld.id))
4386 nsr = self._nsrs[nsr_id]
4387 if nsr.state != NetworkServiceRecordState.RUNNING:
4388 raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")
4389
4390 # Not calling in a separate task as this is called from a separate task
4391 yield from nsr.delete_vl_instance(vld)
4392
4393 @asyncio.coroutine
4394 def create_nsr(self, nsr_msg, config_xact, key_pairs=None,restart_mode=False):
4395 """ Create an NSR instance """
4396 self._log.debug("NSRMSG %s", nsr_msg)
4397 if nsr_msg.id in self._nsrs:
4398 msg = "NSR id %s already exists" % nsr_msg.id
4399 self._log.error(msg)
4400 raise NetworkServiceRecordError(msg)
4401
4402 self._log.debug("Create NetworkServiceRecord nsr id %s from nsd_id %s",
4403 nsr_msg.id,
4404 nsr_msg.nsd.id)
4405
4406 nsm_plugin = self._ro_plugin_selector.get_ro_plugin(nsr_msg.resource_orchestrator)
4407 #Work Around - openmano expects datacenter id instead of datacenter name
4408 if isinstance(nsm_plugin, openmano_nsm.OpenmanoNsPlugin):
4409 for uuid, name in nsm_plugin._cli_api.datacenter_list():
4410 if name == nsr_msg.datacenter:
4411 nsr_msg.datacenter = uuid
4412
4413 sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.datacenter)
4414
4415 nsr = NetworkServiceRecord(self._dts,
4416 self._log,
4417 self._loop,
4418 self,
4419 nsm_plugin,
4420 nsr_msg,
4421 sdn_account_name,
4422 key_pairs,
4423 self._project,
4424 restart_mode=restart_mode,
4425 vlr_handler=self._vlr_pub_handler
4426 )
4427 self._nsrs[nsr_msg.id] = nsr
4428
4429 try:
4430 # Generate ssh key pair if required
4431 nsr.generate_ssh_key_pair(config_xact)
4432 except Exception as e:
4433 self._log.exception("SSH key: {}".format(e))
4434
4435 self._log.debug("NSR {}: SSh key generated: {}".format(nsr_msg.name,
4436 nsr.public_key))
4437
4438 ssh_key = {'private_key': nsr.private_key,
4439 'public_key': nsr.public_key
4440 }
4441
4442 nsm_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs, ssh_key=ssh_key)
4443
4444 return nsr
4445
4446 def delete_nsr(self, nsr_id):
4447 """
4448 Delete NSR with the passed nsr id
4449 """
4450 del self._nsrs[nsr_id]
4451
4452 @asyncio.coroutine
4453 def instantiate_ns(self, nsr_id, config_xact):
4454 """ Instantiate an NS instance """
4455 self._log.debug("Instantiating Network service id %s", nsr_id)
4456 if nsr_id not in self._nsrs:
4457 err = "NSR id %s not found " % nsr_id
4458 self._log.error(err)
4459 raise NetworkServiceRecordError(err)
4460
4461 nsr = self._nsrs[nsr_id]
4462 try:
4463 yield from nsr.nsm_plugin.instantiate_ns(nsr, config_xact)
4464 except Exception as e:
4465 self._log.exception("NS instantiate: {}".format(e))
4466 raise e
4467
4468 @asyncio.coroutine
4469 def update_vnfr(self, vnfr):
4470 """Create/Update an VNFR """
4471
4472 vnfr_state = self._vnfrs[vnfr.id].state
4473 self._log.debug("Updating VNFR with state %s: vnfr %s", vnfr_state, vnfr)
4474
4475 no_of_active_vms = 0
4476 for vdur in vnfr.vdur:
4477 if vdur.operational_status == 'running':
4478 no_of_active_vms += 1
4479
4480 self._vnfrs[vnfr.id]._active_vdus = no_of_active_vms
4481 yield from self._vnfrs[vnfr.id].update_state(vnfr)
4482 nsr = self.find_nsr_for_vnfr(vnfr.id)
4483 if nsr is not None:
4484 nsr._vnf_inst_started = False
4485 yield from nsr.update_state()
4486
4487 def find_nsr_for_vnfr(self, vnfr_id):
4488 """ Find the NSR which )has the passed vnfr id"""
4489 for nsr in list(self.nsrs.values()):
4490 for vnfr in list(nsr.vnfrs.values()):
4491 if vnfr.id == vnfr_id:
4492 return nsr
4493 return None
4494
4495 def delete_vnfr(self, vnfr_id):
4496 """ Delete VNFR with the passed id"""
4497 del self._vnfrs[vnfr_id]
4498
4499 @asyncio.coroutine
4500 def get_nsr_config(self, nsd_id):
4501 xpath = self._project.add_project("C,/nsr:ns-instance-config")
4502 results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE)
4503
4504 for result in results:
4505 entry = yield from result
4506 ns_instance_config = entry.result
4507
4508 for nsr in ns_instance_config.nsr:
4509 if nsr.nsd.id == nsd_id:
4510 return nsr
4511
4512 return None
4513
4514 def get_nsd(self, nsd_id):
4515 """ Get network service descriptor for the passed nsd_id"""
4516 if nsd_id not in self._nsds:
4517 self._log.error("Cannot find NSD id:%s", nsd_id)
4518 raise NetworkServiceDescriptorError("Cannot find NSD id:%s", nsd_id)
4519
4520 return self._nsds[nsd_id]
4521
4522 def create_nsd(self, nsd_msg):
4523 """ Create a network service descriptor """
4524 self._log.debug("Create network service descriptor - %s", nsd_msg)
4525 if nsd_msg.id in self._nsds:
4526 self._log.error("Cannot create NSD %s -NSD ID already exists", nsd_msg)
4527 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg.id)
4528
4529 nsd = NetworkServiceDescriptor(
4530 self._dts,
4531 self._log,
4532 self._loop,
4533 nsd_msg,
4534 self
4535 )
4536 self._nsds[nsd_msg.id] = nsd
4537
4538 return nsd
4539
4540 def update_nsd(self, nsd):
4541 """ update the Network service descriptor """
4542 self._log.debug("Update network service descriptor - %s", nsd)
4543 if nsd.id not in self._nsds:
4544 self._log.debug("No NSD found - creating NSD id = %s", nsd.id)
4545 self.create_nsd(nsd)
4546 else:
4547 self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd)
4548 self._nsds[nsd.id].update(nsd)
4549
4550 def delete_nsd(self, nsd_id):
4551 """ Delete the Network service descriptor with the passed id """
4552 self._log.debug("Deleting the network service descriptor - %s", nsd_id)
4553 if nsd_id not in self._nsds:
4554 self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id)
4555 raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id)
4556 del self._nsds[nsd_id]
4557
4558 def get_vnfd_config(self, xact):
4559 vnfd_dts_reg = self._vnfd_dts_handler.regh
4560 for cfg in vnfd_dts_reg.get_xact_elements(xact):
4561 if cfg.id not in self._vnfds:
4562 self.create_vnfd(cfg)
4563
4564 def get_vnfd(self, vnfd_id, xact):
4565 """ Get virtual network function descriptor for the passed vnfd_id"""
4566 if vnfd_id not in self._vnfds:
4567 self._log.error("Cannot find VNFD id:%s", vnfd_id)
4568 self.get_vnfd_config(xact)
4569
4570 if vnfd_id not in self._vnfds:
4571 self._log.error("Cannot find VNFD id:%s", vnfd_id)
4572 raise VnfDescriptorError("Cannot find VNFD id:%s", vnfd_id)
4573
4574 return self._vnfds[vnfd_id]
4575
4576 def create_vnfd(self, vnfd):
4577 """ Create a virtual network function descriptor """
4578 self._log.debug("Create virtual network function descriptor - %s", vnfd)
4579 if vnfd.id in self._vnfds:
4580 self._log.error("Cannot create VNFD %s -VNFD ID already exists", vnfd)
4581 raise VnfDescriptorError("VNFD already exists-%s", vnfd.id)
4582
4583 self._vnfds[vnfd.id] = vnfd
4584 return self._vnfds[vnfd.id]
4585
4586 def update_vnfd(self, vnfd):
4587 """ Update the virtual network function descriptor """
4588 self._log.debug("Update virtual network function descriptor- %s", vnfd)
4589
4590
4591 if vnfd.id not in self._vnfds:
4592 self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id)
4593 self.create_vnfd(vnfd)
4594 else:
4595 self._log.debug("Updating VNFD id = %s, vnfd = %s", vnfd.id, vnfd)
4596 self._vnfds[vnfd.id] = vnfd
4597
4598 @asyncio.coroutine
4599 def delete_vnfd(self, vnfd_id):
4600 """ Delete the virtual network function descriptor with the passed id """
4601 self._log.debug("Deleting the virtual network function descriptor - %s", vnfd_id)
4602 if vnfd_id not in self._vnfds:
4603 self._log.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id)
4604 raise VnfDescriptorError("Cannot find %s", vnfd_id)
4605
4606 del self._vnfds[vnfd_id]
4607
4608 @asyncio.coroutine
4609 def publish_nsr(self, xact, path, msg):
4610 """ Publish a NSR """
4611 self._log.debug("Publish NSR with path %s, msg %s",
4612 path, msg)
4613 yield from self.nsr_handler.update(xact, path, msg)
4614
4615 @asyncio.coroutine
4616 def unpublish_nsr(self, xact, path):
4617 """ Un Publish an NSR """
4618 self._log.debug("Publishing delete NSR with path %s", path)
4619 yield from self.nsr_handler.delete(path, xact)
4620
4621 def vnfr_is_ready(self, vnfr_id):
4622 """ VNFR with the id is ready """
4623 self._log.debug("VNFR id %s ready", vnfr_id)
4624 if vnfr_id not in self._vnfds:
4625 err = "Did not find VNFR ID with id %s" % vnfr_id
4626 self._log.critical("err")
4627 raise VirtualNetworkFunctionRecordError(err)
4628 self._vnfrs[vnfr_id].is_ready()
4629
4630
4631 @asyncio.coroutine
4632 def terminate_ns(self, nsr_id, xact):
4633 """
4634 Terminate network service for the given NSR Id
4635 """
4636
4637 if nsr_id not in self._nsrs:
4638 return
4639
4640 # Terminate the instances/networks assocaited with this nw service
4641 self._log.debug("Terminating the network service %s", nsr_id)
4642 try :
4643 yield from self._nsrs[nsr_id].terminate()
4644 except Exception as e:
4645 self.log.exception("Failed to terminate NSR[id=%s]", nsr_id)
4646
4647 def vlr_event(self, vlr, action):
4648 self._log.debug("Received VLR %s with action:%s", vlr, action)
4649 # Find the NS and see if we can proceed
4650 nsr = self.find_nsr_for_vlr_id(vlr.id)
4651 if nsr is None:
4652 self._log.error("VLR %s:%s received for NSR, state:%s",
4653 vlr.id, vlr.name, vlr.operational_status)
4654 return
4655 nsr.vlr_event(vlr, action)
4656
4657 def add_vlr_id_nsr_map(self, vlr_id, nsr):
4658 """ Add a mapping for vlr_id into NSR """
4659 self._nsr_for_vlr[vlr_id] = nsr
4660
4661 def remove_vlr_id_nsr_map(self, vlr_id):
4662 """ Remove a mapping for vlr_id into NSR """
4663 if vlr_id in self._nsr_for_vlr:
4664 del self._nsr_for_vlr[vlr_id]
4665
4666 def find_nsr_for_vlr_id(self, vlr_id):
4667 """ Find NSR for VLR id """
4668 nsr = None
4669 if vlr_id in self._nsr_for_vlr:
4670 nsr = self._nsr_for_vlr[vlr_id]
4671 return nsr
4672
4673
4674 class NsmRecordsPublisherProxy(object):
4675 """ This class provides a publisher interface that allows plugin objects
4676 to publish NSR/VNFR/VLR"""
4677
4678 def __init__(self, dts, log, loop, project, nsr_pub_hdlr,
4679 vnfr_pub_hdlr, vlr_pub_hdlr,):
4680 self._dts = dts
4681 self._log = log
4682 self._loop = loop
4683 self._project = project
4684 self._nsr_pub_hdlr = nsr_pub_hdlr
4685 self._vlr_pub_hdlr = vlr_pub_hdlr
4686 self._vnfr_pub_hdlr = vnfr_pub_hdlr
4687
4688 @asyncio.coroutine
4689 def publish_nsr_opdata(self, xact, nsr):
4690 """ Publish an NSR """
4691 path = ("D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref={}]"
4692 ).format(quoted_key(nsr.ns_instance_config_ref))
4693 return (yield from self._nsr_pub_hdlr.update(xact, path, nsr))
4694
4695 @asyncio.coroutine
4696 def publish_nsr(self, xact, nsr):
4697 """ Publish an NSR """
4698 path = self._project.add_project(NetworkServiceRecord.xpath_from_nsr(nsr))
4699 return (yield from self._nsr_pub_hdlr.update(xact, path, nsr))
4700
4701 @asyncio.coroutine
4702 def unpublish_nsr(self, xact, nsr):
4703 """ Unpublish an NSR """
4704 path = self._project.add_project(NetworkServiceRecord.xpath_from_nsr(nsr))
4705 return (yield from self._nsr_pub_hdlr.delete(xact, path))
4706
4707 @asyncio.coroutine
4708 def publish_vnfr(self, xact, vnfr):
4709 """ Publish an VNFR """
4710 path = self._project.add_project(VirtualNetworkFunctionRecord.vnfr_xpath(vnfr))
4711 return (yield from self._vnfr_pub_hdlr.update(xact, path, vnfr))
4712
4713 @asyncio.coroutine
4714 def unpublish_vnfr(self, xact, vnfr):
4715 """ Unpublish a VNFR """
4716 path = self._project.add_project(VirtualNetworkFunctionRecord.vnfr_xpath(vnfr))
4717 yield from self._vnfr_pub_hdlr.delete(xact, path)
4718 # NOTE: The regh delete does not send the on_prepare to VNFM tasklet as well
4719 # as remove all the VNFR elements. So need to send this additional delete block.
4720 with self._dts.transaction(flags = 0) as xact:
4721 block = xact.block_create()
4722 block.add_query_delete(path)
4723 yield from block.execute(flags=0, now=True)
4724
4725 @asyncio.coroutine
4726 def publish_vlr(self, xact, vlr):
4727 """ Publish a VLR """
4728 path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr))
4729 return (yield from self._vlr_pub_hdlr.update(xact, path, vlr))
4730
4731 @asyncio.coroutine
4732 def unpublish_vlr(self, xact, vlr):
4733 """ Unpublish a VLR """
4734 path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr))
4735 return (yield from self._vlr_pub_hdlr.delete(xact, path))
4736
4737 class ScalingRpcHandler(mano_dts.DtsHandler):
4738 """ The Network service Monitor DTS handler """
4739 SCALE_IN_INPUT_XPATH = "I,/nsr:exec-scale-in"
4740 SCALE_IN_OUTPUT_XPATH = "O,/nsr:exec-scale-in"
4741
4742 SCALE_OUT_INPUT_XPATH = "I,/nsr:exec-scale-out"
4743 SCALE_OUT_OUTPUT_XPATH = "O,/nsr:exec-scale-out"
4744
4745 ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT')
4746
4747 def __init__(self, log, dts, loop, nsm, callback=None):
4748 super().__init__(log, dts, loop, nsm._project)
4749 self._nsm = nsm
4750 self.callback = callback
4751 self.last_instance_id = defaultdict(int)
4752
4753 self._reg_in = None
4754 self._reg_out = None
4755
4756 @asyncio.coroutine
4757 def register(self):
4758
4759 def send_err_msg(err_msg, xact_info, ks_path, e=False):
4760 xpath = ks_path.to_xpath(NsrYang.get_schema())
4761 if e:
4762 self._log.exception(err_msg)
4763 else:
4764 self._log.error(err_msg)
4765 xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE,
4766 xpath,
4767 err_msg)
4768 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
4769
4770 @asyncio.coroutine
4771 def on_scale_in_prepare(xact_info, action, ks_path, msg):
4772 assert action == rwdts.QueryAction.RPC
4773
4774 self._log.debug("Scale in called: {}".format(msg.as_dict()))
4775 if not self.project.rpc_check(msg, xact_info):
4776 return
4777
4778 try:
4779 rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({
4780 "instance_id": msg.instance_id})
4781
4782 nsr = self._nsm.nsrs[msg.nsr_id_ref]
4783 if nsr.state != NetworkServiceRecordState.RUNNING:
4784 errmsg = ("Unable to perform scaling action when NS {}({}) not in running state".
4785 format(nsr.name, nsr.id))
4786 send_err_msg(errmsg, xact_info, ks_path)
4787 return
4788
4789 xact_info.respond_xpath(
4790 rwdts.XactRspCode.ACK,
4791 self.__class__.SCALE_IN_OUTPUT_XPATH,
4792 rpc_op)
4793
4794 if self.callback:
4795 self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN)
4796
4797 except Exception as e:
4798 errmsg = ("Exception doing scale in using {}: {}".
4799 format(msg, e))
4800 send_err_msg(errmsg, xact_info, ks_path, e=True)
4801
4802 @asyncio.coroutine
4803 def on_scale_out_prepare(xact_info, action, ks_path, msg):
4804 assert action == rwdts.QueryAction.RPC
4805
4806 self._log.debug("Scale out called: {}".format(msg.as_dict()))
4807 if not self.project.rpc_check(msg, xact_info):
4808 return
4809
4810 try:
4811 scaling_group = msg.scaling_group_name_ref
4812 if not msg.instance_id:
4813 last_instance_id = self.last_instance_id[scale_group]
4814 msg.instance_id = last_instance_id + 1
4815 self.last_instance_id[scale_group] += 1
4816
4817 nsr = self._nsm.nsrs[msg.nsr_id_ref]
4818 if nsr.state != NetworkServiceRecordState.RUNNING:
4819 errmsg = ("Unable to perform scaling action when NS {}({}) not in running state".
4820 format(nsr.name, nsr.id))
4821 send_err_msg(errmsg, xact_info, ks_path)
4822 return
4823
4824 rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({
4825 "instance_id": msg.instance_id})
4826
4827 xact_info.respond_xpath(
4828 rwdts.XactRspCode.ACK,
4829 self.__class__.SCALE_OUT_OUTPUT_XPATH,
4830 rpc_op)
4831
4832 if self.callback:
4833 self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT)
4834
4835 except Exception as e:
4836 errmsg = ("Exception doing scale in using {}: {}".
4837 format(msg, e))
4838 send_err_msg(errmsg, xact_info, ks_path, e=True)
4839
4840 self._reg_in = yield from self.dts.register(
4841 xpath=self.__class__.SCALE_IN_INPUT_XPATH,
4842 handler=rift.tasklets.DTS.RegistrationHandler(
4843 on_prepare=on_scale_in_prepare),
4844 flags=rwdts.Flag.PUBLISHER)
4845
4846 self._reg_out = yield from self.dts.register(
4847 xpath=self.__class__.SCALE_OUT_INPUT_XPATH,
4848 handler=rift.tasklets.DTS.RegistrationHandler(
4849 on_prepare=on_scale_out_prepare),
4850 flags=rwdts.Flag.PUBLISHER)
4851
4852 def deregister(self):
4853 if self._reg_in:
4854 self._reg_in.deregister()
4855 self._reg_in = None
4856
4857 if self._reg_out:
4858 self._reg_out.deregister()
4859 self._reg_out = None
4860
4861
4862 class NsmProject(ManoProject):
4863
4864 def __init__(self, name, tasklet, **kw):
4865 super(NsmProject, self).__init__(tasklet.log, name)
4866 self.update(tasklet)
4867 self._nsm = None
4868
4869 self._ro_plugin_selector = None
4870 self._vnffgmgr = None
4871
4872 self._nsr_pub_handler = None
4873 self._vnfr_pub_handler = None
4874 self._vlr_pub_handler = None
4875 self._vnfd_pub_handler = None
4876 self._scale_cfg_handler = None
4877
4878 self._records_publisher_proxy = None
4879
4880 def vlr_event(self, vlr, action):
4881 """ VLR Event callback """
4882 self.log.debug("VLR Event received for VLR %s with action %s", vlr, action)
4883 self._nsm.vlr_event(vlr, action)
4884
4885 @asyncio.coroutine
4886 def register(self):
4887 self.log.debug("Register NsmProject for {}".format(self.name))
4888
4889 self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(
4890 self._dts, self.log, self.loop, self)
4891 yield from self._nsr_pub_handler.register()
4892
4893 self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(
4894 self._dts, self.log, self.loop, self)
4895 yield from self._vnfr_pub_handler.register()
4896
4897 self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(
4898 self._dts, self.log, self.loop, self)
4899 yield from self._vlr_pub_handler.register()
4900
4901 self._vlr_sub_handler = subscriber.VlrSubscriberDtsHandler(self.log,
4902 self._dts,
4903 self.loop,
4904 self,
4905 self.vlr_event,
4906 )
4907 yield from self._vlr_sub_handler.register()
4908
4909 manifest = self._tasklet.tasklet_info.get_pb_manifest()
4910 use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
4911 ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
4912 ssl_key = manifest.bootstrap_phase.rwsecurity.key
4913
4914 self._vnfd_pub_handler = publisher.VnfdPublisher(
4915 use_ssl, ssl_cert, ssl_key, self.loop, self)
4916
4917 self._records_publisher_proxy = NsmRecordsPublisherProxy(
4918 self._dts,
4919 self.log,
4920 self.loop,
4921 self,
4922 self._nsr_pub_handler,
4923 self._vnfr_pub_handler,
4924 self._vlr_pub_handler,
4925 )
4926
4927 # Register the NSM to receive the nsm plugin
4928 # when cloud account is configured
4929 self._ro_plugin_selector = cloud.ROAccountConfigSubscriber(
4930 self._dts,
4931 self.log,
4932 self.loop,
4933 self,
4934 self._records_publisher_proxy
4935 )
4936 yield from self._ro_plugin_selector.register()
4937
4938 self._cloud_account_handler = cloud.CloudAccountConfigSubscriber(
4939 self._log,
4940 self._dts,
4941 self.log_hdl,
4942 self,
4943 )
4944
4945 yield from self._cloud_account_handler.register()
4946
4947 self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop,
4948 self, self._cloud_account_handler)
4949 yield from self._vnffgmgr.register()
4950
4951 self._nsm = NsManager(
4952 self._dts,
4953 self.log,
4954 self.loop,
4955 self,
4956 self._nsr_pub_handler,
4957 self._vnfr_pub_handler,
4958 self._vlr_pub_handler,
4959 self._ro_plugin_selector,
4960 self._vnffgmgr,
4961 self._vnfd_pub_handler,
4962 self._cloud_account_handler,
4963 )
4964
4965 yield from self._nsm.register()
4966 self.log.debug("Register NsmProject for {} complete".format(self.name))
4967
4968 def deregister(self):
4969 self._log.debug("Project {} de-register".format(self.name))
4970 self._nsm.deregister()
4971 self._vnffgmgr.deregister()
4972 self._cloud_account_handler.deregister()
4973 self._ro_plugin_selector.deregister()
4974 self._nsr_pub_handler.deregister()
4975 self._vnfr_pub_handler.deregister()
4976 self._vlr_pub_handler.deregister()
4977 self._vlr_sub_handler.deregister()
4978 self._nsm = None
4979
4980 @asyncio.coroutine
4981 def delete_prepare(self):
4982 if self._nsm and self._nsm._nsrs:
4983 delete_msg = "Project has NSR associated with it. Delete all Project NSR and try again."
4984 return False, delete_msg
4985 return True, "True"
4986
4987
4988 class NsmTasklet(rift.tasklets.Tasklet):
4989 """
4990 The network service manager tasklet
4991 """
4992 def __init__(self, *args, **kwargs):
4993 super(NsmTasklet, self).__init__(*args, **kwargs)
4994 self.rwlog.set_category("rw-mano-log")
4995 self.rwlog.set_subcategory("nsm")
4996
4997 self._dts = None
4998 self.project_handler = None
4999 self.projects = {}
5000
5001 @property
5002 def dts(self):
5003 return self._dts
5004
5005 def start(self):
5006 """ The task start callback """
5007 super(NsmTasklet, self).start()
5008 self.log.info("Starting NsmTasklet")
5009
5010 self.log.debug("Registering with dts")
5011 self._dts = rift.tasklets.DTS(self.tasklet_info,
5012 RwNsmYang.get_schema(),
5013 self.loop,
5014 self.on_dts_state_change)
5015
5016 self.log.debug("Created DTS Api GI Object: %s", self._dts)
5017
5018 def stop(self):
5019 try:
5020 self._dts.deinit()
5021 except Exception:
5022 print("Caught Exception in NSM stop:", sys.exc_info()[0])
5023 raise
5024
5025 def on_instance_started(self):
5026 """ Task instance started callback """
5027 self.log.debug("Got instance started callback")
5028
5029 @asyncio.coroutine
5030 def init(self):
5031 """ Task init callback """
5032 self.log.debug("Got instance started callback")
5033
5034 self.log.debug("creating project handler")
5035 self.project_handler = ProjectHandler(self, NsmProject)
5036 self.project_handler.register()
5037
5038
5039
5040 @asyncio.coroutine
5041 def run(self):
5042 """ Task run callback """
5043 pass
5044
5045 @asyncio.coroutine
5046 def on_dts_state_change(self, state):
5047 """Take action according to current dts state to transition
5048 application into the corresponding application state
5049
5050 Arguments
5051 state - current dts state
5052 """
5053 switch = {
5054 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
5055 rwdts.State.CONFIG: rwdts.State.RUN,
5056 }
5057
5058 handlers = {
5059 rwdts.State.INIT: self.init,
5060 rwdts.State.RUN: self.run,
5061 }
5062
5063 # Transition application to next state
5064 handler = handlers.get(state, None)
5065 if handler is not None:
5066 yield from handler()
5067
5068 # Transition dts to next state
5069 next_state = switch.get(state, None)
5070 if next_state is not None:
5071 self.log.debug("Changing state to %s", next_state)
5072 self._dts.handle.set_state(next_state)