fix/feat(relations): external connection point ref now works with multiple KDU
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
63 class NsLcm(LcmBase):
# Class-level defaults. The timeouts are plain numbers written as minute/hour arithmetic;
# NOTE(review): presumably expressed in seconds -- confirm against the code that consumes them.
timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 30 * 60  # timeout for primitive execution
timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

# Negative marker values for sub-operation handling.
# NOTE(review): presumably used in place of a real (non-negative) sub-operation index -- confirm at the callers.
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
# Task name string for the VCA deployment step
task_name_deploy_vca = "Deploying VCA"
75
def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
    """
    Build the NS lifecycle manager: attach database/filesystem handles and create the N2VC, helm
    execution-environment, K8s (helm v2, helm v3, juju) connectors and the RO client.
    :param msg: forwarded to the LcmBase constructor
    :param lcm_tasks: kept as self.lcm_tasks
    :param config: dictionary with configuration. The keys read here are 'timeout', 'ro_config' and 'VCA'
    :param loop: event loop handed to the connectors and to the RO client
    :param prometheus: optional object kept as self.prometheus
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(msg=msg, logger=ns_logger)

    self.db = Database().instance.db
    self.fs = Filesystem().instance.fs
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    self.ng_ro = self.ro_config.get("ng")
    # work on a private copy of the VCA section
    vca_cfg = config["VCA"].copy()
    self.vca_config = vca_cfg

    # create N2VC connector
    juju_url = '{}:{}'.format(vca_cfg['host'], vca_cfg['port'])
    self.n2vc = N2VCJujuConnector(
        log=self.logger,
        loop=self.loop,
        url=juju_url,
        username=vca_cfg.get('user', None),
        vca_config=vca_cfg,
        on_update_db=self._on_update_n2vc_db,
        fs=self.fs,
        db=self.db
    )

    # connector for helm based execution environments
    self.conn_helm_ee = LCMHelmConn(
        log=self.logger,
        loop=self.loop,
        url=None,
        username=None,
        vca_config=vca_cfg,
        on_update_db=self._on_update_n2vc_db
    )

    # the three K8s connectors share the same kubectl binary setting
    kubectl_path = vca_cfg.get("kubectlpath")

    self.k8sclusterhelm2 = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=vca_cfg.get("helmpath"),
        log=self.logger,
        on_update_db=None,
        fs=self.fs,
        db=self.db
    )

    self.k8sclusterhelm3 = K8sHelm3Connector(
        kubectl_command=kubectl_path,
        helm_command=vca_cfg.get("helm3path"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=vca_cfg.get("jujupath"),
        log=self.logger,
        loop=self.loop,
        on_update_db=None,
        vca_config=vca_cfg,
        fs=self.fs,
        db=self.db
    )

    # KDU deployment type -> K8s connector
    self.k8scluster_map = {
        "helm-chart": self.k8sclusterhelm2,
        "helm-chart-v3": self.k8sclusterhelm3,
        "chart": self.k8sclusterhelm3,
        "juju-bundle": self.k8sclusterjuju,
        "juju": self.k8sclusterjuju,
    }

    # VCA type -> connector
    self.vca_map = {
        "lxc_proxy_charm": self.n2vc,
        "native_charm": self.n2vc,
        "k8s_proxy_charm": self.n2vc,
        "helm": self.conn_helm_ee,
        "helm-v3": self.conn_helm_ee
    }

    self.prometheus = prometheus

    # create RO client
    self.RO = NgRoClient(self.loop, **self.ro_config)
166
167 @staticmethod
168 def increment_ip_mac(ip_mac, vm_index=1):
169 if not isinstance(ip_mac, str):
170 return ip_mac
171 try:
172 # try with ipv4 look for last dot
173 i = ip_mac.rfind(".")
174 if i > 0:
175 i += 1
176 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
177 # try with ipv6 or mac look for last colon. Operate in hex
178 i = ip_mac.rfind(":")
179 if i > 0:
180 i += 1
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
183 except Exception:
184 pass
185 return None
186
187 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
188
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
190
191 try:
192 # TODO filter RO descriptor fields...
193
194 # write to database
195 db_dict = dict()
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict['deploymentStatus'] = ro_descriptor
198 self.update_db_2("nsrs", nsrs_id, db_dict)
199
200 except Exception as e:
201 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
202
203 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
204
205 # remove last dot from path (if exists)
206 if path.endswith('.'):
207 path = path[:-1]
208
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
211
212 try:
213
214 nsr_id = filter.get('_id')
215
216 # read ns record from database
217 nsr = self.db.get_one(table='nsrs', q_filter=filter)
218 current_ns_status = nsr.get('nsState')
219
220 # get vca status for NS
221 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
222
223 # vcaStatus
224 db_dict = dict()
225 db_dict['vcaStatus'] = status_dict
226
227 # update configurationStatus for this VCA
228 try:
229 vca_index = int(path[path.rfind(".")+1:])
230
231 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
232 vca_status = vca_list[vca_index].get('status')
233
234 configuration_status_list = nsr.get('configurationStatus')
235 config_status = configuration_status_list[vca_index].get('status')
236
237 if config_status == 'BROKEN' and vca_status != 'failed':
238 db_dict['configurationStatus'][vca_index] = 'READY'
239 elif config_status != 'BROKEN' and vca_status == 'failed':
240 db_dict['configurationStatus'][vca_index] = 'BROKEN'
241 except Exception as e:
242 # not update configurationStatus
243 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
244
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
247 is_degraded = False
248 if current_ns_status in ('READY', 'DEGRADED'):
249 error_description = ''
250 # check machines
251 if status_dict.get('machines'):
252 for machine_id in status_dict.get('machines'):
253 machine = status_dict.get('machines').get(machine_id)
254 # check machine agent-status
255 if machine.get('agent-status'):
256 s = machine.get('agent-status').get('status')
257 if s != 'started':
258 is_degraded = True
259 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
260 # check machine instance status
261 if machine.get('instance-status'):
262 s = machine.get('instance-status').get('status')
263 if s != 'running':
264 is_degraded = True
265 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
266 # check applications
267 if status_dict.get('applications'):
268 for app_id in status_dict.get('applications'):
269 app = status_dict.get('applications').get(app_id)
270 # check application status
271 if app.get('status'):
272 s = app.get('status').get('status')
273 if s != 'active':
274 is_degraded = True
275 error_description += 'application {} status={} ; '.format(app_id, s)
276
277 if error_description:
278 db_dict['errorDescription'] = error_description
279 if current_ns_status == 'READY' and is_degraded:
280 db_dict['nsState'] = 'DEGRADED'
281 if current_ns_status == 'DEGRADED' and not is_degraded:
282 db_dict['nsState'] = 'READY'
283
284 # write to database
285 self.update_db_2("nsrs", nsr_id, db_dict)
286
287 except (asyncio.CancelledError, asyncio.TimeoutError):
288 raise
289 except Exception as e:
290 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
291
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template in strict mode (undefined variables are errors).
    :param cloud_init_text: template content
    :param additional_params: dictionary with the variables for the template; can be None
    :param vnfd_id: vnfd id, only used to compose error messages
    :param vdu_id: vdu id, only used to compose error messages
    :return: rendered text. LcmException is raised on undefined variables or template errors
    """
    render_vars = additional_params if additional_params else {}
    try:
        strict_env = Environment(undefined=StrictUndefined)
        return strict_env.from_string(cloud_init_text).render(render_vars)
    except UndefinedError as missing_var:
        # must be tested before TemplateError, which is listed in the generic handler below
        raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                           "file, must be provided in the instantiation parameters inside the "
                           "'additionalParamsForVnf/Vdu' block".format(missing_var, vnfd_id, vdu_id))
    except (TemplateError, TemplateNotFound) as template_error:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd_id, vdu_id, template_error))
305
306 def _get_vdu_cloud_init_content(self, vdu, vnfd):
307 cloud_init_content = cloud_init_file = None
308 try:
309 if vdu.get("cloud-init-file"):
310 base_folder = vnfd["_admin"]["storage"]
311 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
312 vdu["cloud-init-file"])
313 with self.fs.file_open(cloud_init_file, "r") as ci_file:
314 cloud_init_content = ci_file.read()
315 elif vdu.get("cloud-init"):
316 cloud_init_content = vdu["cloud-init"]
317
318 return cloud_init_content
319 except FsException as e:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd["id"], vdu["id"], cloud_init_file, e))
322
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Get the 'additionalParams' of the first vdur of a vnfr that references the given vdu.
    :param db_vnfr: vnfr content
    :param vdu_id: vdu id, compared against the 'vdu-id-ref' of every vdur
    :return: what parse_yaml_strings returns for the 'additionalParams' of that vdur.
        LcmException is raised if no vdur references vdu_id
    """
    # A default is given to next() so a missing vdur does not leak StopIteration (which turns into
    # an unrelated RuntimeError when it crosses a coroutine or generator frame)
    vdur = next((vdur for vdur in (db_vnfr.get("vdur") or ()) if vdu_id == vdur["vdu-id-ref"]), None)
    if vdur is None:
        raise LcmException("Not found any vdur with vdu-id-ref={} at vnfr={}".format(
            vdu_id, db_vnfr.get("_id")))
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
327
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The input is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: not used by this method
    :param nsrId: not used by this method
    :return: deep copy of vnfd without the keys RO does not use and without cloud-init at the vdus
    """
    ro_vnfd = deepcopy(vnfd)

    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in ("_id", "_admin", "monitoring-param", "scaling-group-descriptor", "kdu", "k8s-cluster"):
        ro_vnfd.pop(unused_key, None)

    if new_id:
        ro_vnfd["id"] = new_id

    # cloud-init and cloud-init-file are dropped from every vdu
    for vdu in get_iterable(ro_vnfd, "vdu"):
        for cloud_init_key in ("cloud-init-file", "cloud-init"):
            vdu.pop(cloud_init_key, None)
    return ro_vnfd
353
354 @staticmethod
355 def ip_profile_2_RO(ip_profile):
356 RO_ip_profile = deepcopy(ip_profile)
357 if "dns-server" in RO_ip_profile:
358 if isinstance(RO_ip_profile["dns-server"], list):
359 RO_ip_profile["dns-address"] = []
360 for ds in RO_ip_profile.pop("dns-server"):
361 RO_ip_profile["dns-address"].append(ds['address'])
362 else:
363 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
364 if RO_ip_profile.get("ip-version") == "ipv4":
365 RO_ip_profile["ip-version"] = "IPv4"
366 if RO_ip_profile.get("ip-version") == "ipv6":
367 RO_ip_profile["ip-version"] = "IPv6"
368 if "dhcp-params" in RO_ip_profile:
369 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
370 return RO_ip_profile
371
372 def _get_ro_vim_id_for_vim_account(self, vim_account):
373 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
374 if db_vim["_admin"]["operationalState"] != "ENABLED":
375 raise LcmException("VIM={} is not available. operationalState={}".format(
376 vim_account, db_vim["_admin"]["operationalState"]))
377 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
378 return RO_vim_id
379
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Look up at the "wim_accounts" collection the RO identifier of a WIM account.
    :param wim_account: _id of the wim account. Any value that is not a string is returned as it is
    :return: content of '_admin.deployed.RO-account'. LcmException if operationalState is not "ENABLED"
    """
    if not isinstance(wim_account, str):
        return wim_account

    wim_admin = self.db.get_one("wim_accounts", {"_id": wim_account})["_admin"]
    operational_state = wim_admin["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException("WIM={} is not available. operationalState={}".format(
            wim_account, operational_state))
    return wim_admin["deployed"]["RO-account"]
390
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """
    Scale the vdur list of a vnfr at the database and reload it into the passed db_vnfr.
    :param db_vnfr: vnfr content. Its "vdur" entry is replaced at the end by the database content
    :param vdu_create: dictionary {vdu-id: number of vdur to add}. Every new vdur is a copy of the
        last existing vdur of that vdu, with status BUILD, new _id/id, increased count-index and
        addresses cleared (or incremented for fixed-ip/fixed-mac interfaces)
    :param vdu_delete: dictionary {vdu-id: number of vdur to remove}, taken from the end of the list
    :param mark_delete: if True the vdur are only marked with status "DELETING"; otherwise they
        are pulled from the database
    :return: None. LcmException is raised if vdu_create references a vdu without any vdur
    """
    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}
    if vdu_create:
        for vdu_id, vdu_count in vdu_create.items():
            # template: last vdur of this vdu
            vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
            if not vdur:
                raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
                                   format(vdu_id))

            for count in range(vdu_count):
                vdur_copy = deepcopy(vdur)
                vdur_copy["status"] = "BUILD"
                vdur_copy["status-detailed"] = None
                # was written as an annotation ("...: None"), which assigns nothing and left the
                # address of the template vdur in the copy
                vdur_copy["ip-address"] = None
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += count + 1
                vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
                vdur_copy.pop("vim_info", None)
                for iface in vdur_copy["interfaces"]:
                    if iface.get("fixed-ip"):
                        iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
                    else:
                        iface.pop("ip-address", None)
                    if iface.get("fixed-mac"):
                        iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
                    else:
                        iface.pop("mac-address", None)
                    iface.pop("mgmt_vnf", None)  # only first vdu can be managment of vnf
                db_vdu_push_list.append(vdur_copy)
                # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
    if vdu_delete:
        for vdu_id, vdu_count in vdu_delete.items():
            if mark_delete:
                indexes_to_delete = [index for index, vdur in enumerate(db_vnfr["vdur"])
                                     if vdur["vdu-id-ref"] == vdu_id]
                # a [-0:] slice is the whole list, so a count of 0 must be handled apart
                selected = indexes_to_delete[-vdu_count:] if vdu_count else []
                db_update.update({"vdur.{}.status".format(i): "DELETING" for i in selected})
            else:
                # it must be deleted one by one because common.db does not allow otherwise
                vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
                for vdu in vdus_to_delete[:vdu_count]:
                    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
438
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Fills the database update of a nsr with the RO information of every ns vld
    :param ns_update_nsr: dictionary to be filled with one "vld.<index>" entry per vld
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO; its "nets" are matched by "ns_net_osm_id"
    :return: Nothing, LcmException is raised when a vld is not found among the RO nets
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that belongs to this vld
        matching_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if vld["id"] == net.get("ns_net_osm_id")),
            None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld["vim-id"] = matching_net.get("vim_net_id")
        vld["name"] = matching_net.get("vim_name")
        vld["status"] = matching_net.get("status")
        vld["status-detailed"] = matching_net.get("error_msg")
        ns_update_nsr["vld.{}".format(vld_index)] = vld
460
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Marks every vnfr as "ERROR", both at the passed content and at the "vnfrs" collection.
    A vdur is only touched when it has no "status" yet.
    :param db_vnfrs: dictionary whose values are the vnfr contents
    :param error_text: if not empty, it is stored as 'status-detailed' of the vdur marked as error
    :return: None. A DbException is logged and not propagated
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # the database must get the same detail as the in-memory record; it used
                        # to get the literal "ERROR", losing the real error text
                        status_detailed = str(error_text)
                        vdur["status-detailed"] = status_detailed
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
475
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" (each with "vms") and "nets" are read
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when a vnf
        with vdur has no ip address neither at RO nor at the vnfr
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf of this member index; the "else" of this loop raises if none matches
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # only the first of the ';' separated addresses is stored
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms of the same vdu skipped so far; used to select the vm whose
                # position among the vms of that vdu equals the vdur count-index
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    # vdur with pdu-type are not looked up at the RO vms
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # every vdur interface must be found at RO, matched by name == internal_name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    # the whole vdur is written back at its position of the vdur list
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm for this vdu and count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds of the vnf are matched against the RO nets by "vnf_net_osm_id"
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            # one database write per vnfr with everything collected above
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
548
549 def _get_ns_config_info(self, nsr_id):
550 """
551 Generates a mapping between vnf,vdu elements and the N2VC id
552 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
553 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
554 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
555 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
556 """
557 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
558 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
559 mapping = {}
560 ns_config_info = {"osm-config-mapping": mapping}
561 for vca in vca_deployed_list:
562 if not vca["member-vnf-index"]:
563 continue
564 if not vca["vdu_id"]:
565 mapping[vca["member-vnf-index"]] = vca["application"]
566 else:
567 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
568 vca["application"]
569 return ns_config_info
570
571 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
572 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
573
574 db_vims = {}
575
576 def get_vim_account(vim_account_id):
577 nonlocal db_vims
578 if vim_account_id in db_vims:
579 return db_vims[vim_account_id]
580 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
581 db_vims[vim_account_id] = db_vim
582 return db_vim
583
584 # modify target_vld info with instantiation parameters
585 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
586 if vld_params.get("ip-profile"):
587 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
588 if vld_params.get("provider-network"):
589 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
590 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
591 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
592 if vld_params.get("wimAccountId"):
593 target_wim = "wim:{}".format(vld_params["wimAccountId"])
594 target_vld["vim_info"][target_wim] = {}
595 for param in ("vim-network-name", "vim-network-id"):
596 if vld_params.get(param):
597 if isinstance(vld_params[param], dict):
598 for vim, vim_net in vld_params[param].items():
599 other_target_vim = "vim:" + vim
600 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
601 else: # isinstance str
602 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
603 if vld_params.get("common_id"):
604 target_vld["common_id"] = vld_params.get("common_id")
605
606 nslcmop_id = db_nslcmop["_id"]
607 target = {
608 "name": db_nsr["name"],
609 "ns": {"vld": []},
610 "vnf": [],
611 "image": deepcopy(db_nsr["image"]),
612 "flavor": deepcopy(db_nsr["flavor"]),
613 "action_id": nslcmop_id,
614 "cloud_init_content": {},
615 }
616 for image in target["image"]:
617 image["vim_info"] = {}
618 for flavor in target["flavor"]:
619 flavor["vim_info"] = {}
620
621 if db_nslcmop.get("lcmOperationType") != "instantiate":
622 # get parameters of instantiation:
623 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
624 "lcmOperationType": "instantiate"})[-1]
625 ns_params = db_nslcmop_instantiate.get("operationParams")
626 else:
627 ns_params = db_nslcmop.get("operationParams")
628 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
629 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
630
631 cp2target = {}
632 for vld_index, vld in enumerate(db_nsr.get("vld")):
633 target_vim = "vim:{}".format(ns_params["vimAccountId"])
634 target_vld = {
635 "id": vld["id"],
636 "name": vld["name"],
637 "mgmt-network": vld.get("mgmt-network", False),
638 "type": vld.get("type"),
639 "vim_info": {
640 target_vim: {
641 "vim_network_name": vld.get("vim-network-name"),
642 "vim_account_id": ns_params["vimAccountId"]
643 }
644 }
645 }
646 # check if this network needs SDN assist
647 if vld.get("pci-interfaces"):
648 db_vim = get_vim_account(ns_params["vimAccountId"])
649 sdnc_id = db_vim["config"].get("sdn-controller")
650 if sdnc_id:
651 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
652 target_sdn = "sdn:{}".format(sdnc_id)
653 target_vld["vim_info"][target_sdn] = {
654 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
655
656 nsd_vnf_profiles = get_vnf_profiles(nsd)
657 for nsd_vnf_profile in nsd_vnf_profiles:
658 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
659 if cp["virtual-link-profile-id"] == vld["id"]:
660 cp2target["member_vnf:{}.{}".format(
661 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
662 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
663 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
664
665 # check at nsd descriptor, if there is an ip-profile
666 vld_params = {}
667 nsd_vlp = find_in_list(
668 get_virtual_link_profiles(nsd),
669 lambda a_link_profile: a_link_profile["virtual-link-desc-id"] == vld["id"])
670 if nsd_vlp and nsd_vlp.get("virtual-link-protocol-data") and \
671 nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
672 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
673 ip_profile_dest_data = {}
674 if "ip-version" in ip_profile_source_data:
675 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
676 if "cidr" in ip_profile_source_data:
677 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
678 if "gateway-ip" in ip_profile_source_data:
679 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
680 if "dhcp-enabled" in ip_profile_source_data:
681 ip_profile_dest_data["dhcp-params"] = {
682 "enabled": ip_profile_source_data["dhcp-enabled"]
683 }
684 vld_params["ip-profile"] = ip_profile_dest_data
685
686 # update vld_params with instantiation params
687 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
688 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
689 if vld_instantiation_params:
690 vld_params.update(vld_instantiation_params)
691 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
692 target["ns"]["vld"].append(target_vld)
693
694 for vnfr in db_vnfrs.values():
695 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
696 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
697 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
698 target_vnf = deepcopy(vnfr)
699 target_vim = "vim:{}".format(vnfr["vim-account-id"])
700 for vld in target_vnf.get("vld", ()):
701 # check if connected to a ns.vld, to fill target'
702 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
703 lambda cpd: cpd.get("id") == vld["id"])
704 if vnf_cp:
705 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
706 if cp2target.get(ns_cp):
707 vld["target"] = cp2target[ns_cp]
708
709 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
710 # check if this network needs SDN assist
711 target_sdn = None
712 if vld.get("pci-interfaces"):
713 db_vim = get_vim_account(vnfr["vim-account-id"])
714 sdnc_id = db_vim["config"].get("sdn-controller")
715 if sdnc_id:
716 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
717 target_sdn = "sdn:{}".format(sdnc_id)
718 vld["vim_info"][target_sdn] = {
719 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
720
721 # check at vnfd descriptor, if there is an ip-profile
722 vld_params = {}
723 vnfd_vlp = find_in_list(
724 get_virtual_link_profiles(vnfd),
725 lambda a_link_profile: a_link_profile["id"] == vld["id"]
726 )
727 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
728 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
729 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
730 ip_profile_dest_data = {}
731 if "ip-version" in ip_profile_source_data:
732 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
733 if "cidr" in ip_profile_source_data:
734 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
735 if "gateway-ip" in ip_profile_source_data:
736 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
737 if "dhcp-enabled" in ip_profile_source_data:
738 ip_profile_dest_data["dhcp-params"] = {
739 "enabled": ip_profile_source_data["dhcp-enabled"]
740 }
741
742 vld_params["ip-profile"] = ip_profile_dest_data
743 # update vld_params with instantiation params
744 if vnf_params:
745 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
746 lambda i_vld: i_vld["name"] == vld["id"])
747 if vld_instantiation_params:
748 vld_params.update(vld_instantiation_params)
749 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
750
751 vdur_list = []
752 for vdur in target_vnf.get("vdur", ()):
753 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
754 continue # This vdu must not be created
755 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
756
757 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
758
759 if ssh_keys_all:
760 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
761 vnf_configuration = get_configuration(vnfd, vnfd["id"])
762 if vdu_configuration and vdu_configuration.get("config-access") and \
763 vdu_configuration.get("config-access").get("ssh-access"):
764 vdur["ssh-keys"] = ssh_keys_all
765 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
766 elif vnf_configuration and vnf_configuration.get("config-access") and \
767 vnf_configuration.get("config-access").get("ssh-access") and \
768 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
769 vdur["ssh-keys"] = ssh_keys_all
770 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
771 elif ssh_keys_instantiation and \
772 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
773 vdur["ssh-keys"] = ssh_keys_instantiation
774
775 self.logger.debug("NS > vdur > {}".format(vdur))
776
777 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
778 # cloud-init
779 if vdud.get("cloud-init-file"):
780 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
781 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
782 if vdur["cloud-init"] not in target["cloud_init_content"]:
783 base_folder = vnfd["_admin"]["storage"]
784 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
785 vdud.get("cloud-init-file"))
786 with self.fs.file_open(cloud_init_file, "r") as ci_file:
787 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
788 elif vdud.get("cloud-init"):
789 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
790 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
791 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
792 vdur["additionalParams"] = vdur.get("additionalParams") or {}
793 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
794 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
795 vdur["additionalParams"] = deploy_params_vdu
796
797 # flavor
798 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
799 if target_vim not in ns_flavor["vim_info"]:
800 ns_flavor["vim_info"][target_vim] = {}
801
802 # deal with images
803 # in case alternative images are provided we must check if they should be applied
804 # for the vim_type, modify the vim_type taking into account
805 ns_image_id = int(vdur["ns-image-id"])
806 if vdur.get("alt-image-ids"):
807 db_vim = get_vim_account(vnfr["vim-account-id"])
808 vim_type = db_vim["vim_type"]
809 for alt_image_id in vdur.get("alt-image-ids"):
810 ns_alt_image = target["image"][int(alt_image_id)]
811 if vim_type == ns_alt_image.get("vim-type"):
812 # must use alternative image
813 self.logger.debug("use alternative image id: {}".format(alt_image_id))
814 ns_image_id = alt_image_id
815 vdur["ns-image-id"] = ns_image_id
816 break
817 ns_image = target["image"][int(ns_image_id)]
818 if target_vim not in ns_image["vim_info"]:
819 ns_image["vim_info"][target_vim] = {}
820
821 vdur["vim_info"] = {target_vim: {}}
822 # instantiation parameters
823 # if vnf_params:
824 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
825 # vdud["id"]), None)
826 vdur_list.append(vdur)
827 target_vnf["vdur"] = vdur_list
828 target["vnf"].append(target_vnf)
829
830 desc = await self.RO.deploy(nsr_id, target)
831 self.logger.debug("RO return > {}".format(desc))
832 action_id = desc["action_id"]
833 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
834
835 # Updating NSR
836 db_nsr_update = {
837 "_admin.deployed.RO.operational-status": "running",
838 "detailed-status": " ".join(stage)
839 }
840 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
841 self.update_db_2("nsrs", nsr_id, db_nsr_update)
842 self._write_op_status(nslcmop_id, stage)
843 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
844 return
845
async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
    """
    Poll NG-RO until the given action is done, has failed, or the timeout expires
    :param nsr_id: ns record identity, passed to self.RO.status
    :param action_id: RO action identity to poll
    :param nslcmop_id: operation identity. Progress is written at database only if this and 'stage' are provided
    :param start_time: epoch seconds taken as the origin of the timeout. Current time if not provided
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items. Item 2 is overwritten with the progress reported by RO. Can be None
    :return: None when RO reports "DONE". Raises NgRoException if RO reports "FAILED" or an unknown status, or
        if the timeout expires
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        ro_status = desc_status["status"]
        if ro_status == "FAILED":
            raise NgRoException(desc_status["details"])
        elif ro_status == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif ro_status == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # raise explicitly instead of 'assert False': asserts are stripped when running with -O, and
            # then an unknown status would be silently polled until the timeout
            raise NgRoException("ROclient.check_ns_status returns unknown {}".format(ro_status))
        # write at database only when the text has changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # 'loop' argument is not passed: it is deprecated since python 3.8 and removed in 3.10
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
872
async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Ask NG-RO to remove everything deployed for this ns, wait until done and delete the ns at RO
    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: ns record identity
    :param nslcmop_id: operation identity. It is sent to RO as the "action_id" of the request
    :param stage: list with 3 items. Item 2 is overwritten with the result
    :return: None. Raises LcmException if the deletion fails. A 404 from RO is taken as already deleted
    """
    nsr_changes = {}
    failure = None  # at most one error text is generated
    ro_action_id = None
    started_at = time()
    try:
        # deploying an empty target makes RO to remove everything
        empty_target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id
        }
        ro_answer = await self.RO.deploy(nsr_id, empty_target)
        ro_action_id = ro_answer["action_id"]
        nsr_changes.update({
            "_admin.deployed.RO.nsr_delete_action_id": ro_action_id,
            "_admin.deployed.RO.nsr_status": "DELETING",
        })
        self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(ro_action_id))

        # wait until done, 20 minutes as maximum
        await self._wait_ng_ro(nsr_id, ro_action_id, nslcmop_id, started_at, 20 * 60, stage)

        nsr_changes.update({
            "_admin.deployed.RO.nsr_delete_action_id": None,
            "_admin.deployed.RO.nsr_status": "DELETED",
        })
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as e:
        ro_http_code = e.http_code if isinstance(e, NgRoException) else None
        if ro_http_code == 404:  # not found
            nsr_changes.update({
                "_admin.deployed.RO.nsr_id": None,
                "_admin.deployed.RO.nsr_status": "DELETED",
                "_admin.deployed.RO.nsr_delete_action_id": None,
            })
            self.logger.debug(logging_text + "RO_action_id={} already deleted".format(ro_action_id))
        elif ro_http_code == 409:  # conflict
            failure = "delete conflict: {}".format(e)
            self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(ro_action_id, e))
        else:
            failure = "delete error: {}".format(e)
            self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(ro_action_id, e))

    stage[2] = "Error deleting from VIM" if failure else "Deleted from VIM"
    nsr_changes["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, nsr_changes)
    self._write_op_status(nslcmop_id, stage)

    if failure:
        raise LcmException(failure)
    return
924
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
                         n2vc_key_list, stage):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs. Only its values are read here ("vim-account-id" of each one)
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: the result of self._instantiate_ng_ro. Any exception is logged, vnfrs are set at error and it is
        raised again
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            vnfr = None  # avoids an unbound name at the 'else' clause when db_vnfrs is empty
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                if vnfr is not None:
                    # this was a comparison ('=='), so vimAccountId was never updated
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
                                             db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is logged only for exceptions that are not one of the known types
        self.logger.error("Error deploying at VIM {}".format(e),
                          exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
                                                      NgRoException)))
        raise
967
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging. Not used currently by this method
    :param nsr_id: not used currently by this method
    :param vnfr_id: identity of the vnfr that is read from database at each try
    :param kdu_name: "kdu-name" of the entry to look for at the vnfr "kdur" list
    :return: content of "ip-address" of the kdur (it can be None) once its status is "READY" or "ENABLED".
        Raises LcmException if the kdur is not found, if it has any other non-empty status or after 360 tries
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException("target KDU={} is in error state".format(kdu_name))

        # status is still empty: try again later.
        # 'loop' argument is not passed: it is deprecated since python 3.8 and removed in 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
995
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity
    :param vnfr_id: identity of the vnfr that contains the target vdur
    :param vdu_id: "vdu-id-ref" of the target vdur. If empty, the vdur is the one with the vnfr "ip-address"
    :param vdu_index: "count-index" of the target vdur. Used only when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address. Raises LcmException if the target is at error state, it is not found, or the maximum
        number of tries is reached
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # 'loop' argument is not passed: it is deprecated since python 3.8 and removed in 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(t for t in vdur["vim_info"])  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                if self.ng_ro:
                    target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
                              "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                              }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException("Unknown response from RO when injecting key")
                    # only the first result decides: 200 is success, anything else is retried below
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException("error injecting key: {}".format(
                                result.get("description")))
                    break
            except NgRoException as e:
                # with new generation RO there are no retries
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e
            except ROclient.ROClientException as e:
                # retried at next iteration of the main loop, up to 20 times
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e
        else:
            # nothing to inject, the ip address is enough
            break

    return ip_address
1113
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Wait until the configurations this VCA depends on are at 'READY' status
    :param nsr_id: ns record identity, it is read from database at each try
    :param vca_deployed_list: list of deployed VCA. Only the item at vca_index is used
    :param vca_index: position of this VCA at the list
    :return: None. Raises LcmException if some dependent configuration is 'BROKEN' or when tries are exhausted
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    tries_left = 300
    while tries_left >= 0:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # NOTE(review): this list is read but not used below; the loop goes over configurationStatus entries
        # instead. Confirm whether the loop was meant to go over this list
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        status_entries = db_nsr["configurationStatus"]

        pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            own_vnf_index = own_vca.get("member-vnf-index")
            if own_vnf_index and entry.get("member-vnf-index") != own_vnf_index:
                # it belongs to other vnf: it is not a dependency
                continue
            peer_status = entry.get("status")
            if peer_status == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            if peer_status != 'READY':
                pending = True
                break

        if not pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        tries_left -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1147
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
                           ee_config_descriptor):
    """
    Deploy and configure one VCA item: create or register its execution environment, install the configuration
    software, add relations, wait for the ip address of the managed element (injecting the ssh key when
    required), execute the initial config primitives and, for helm types, add prometheus metrics.
    Progress is written at database with _write_configuration_status ('CREATING' or 'REGISTERING',
    'INSTALLING SW', 'EXECUTING PRIMITIVE', 'READY'; 'BROKEN' on any exception)
    :param logging_text: prefix text to use at logging
    :param vca_index: position of this item at db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: network slice instance identity, used as first part of the namespace. Can be empty
    :param db_nsr: database content of ns record
    :param db_vnfr: database content of vnf record. Empty for a NS level configuration
    :param vdu_id: vdu identity for a VDU level configuration, empty otherwise
    :param kdu_name: kdu name for a KDU level configuration, empty otherwise
    :param vdu_index: index appended to the namespace items. 0 is used if empty
    :param config_descriptor: dictionary where 'initial-config-primitive', 'config-access' and
        'terminate-config-primitive' are read
    :param deploy_params: dictionary used to map primitive parameters. It is modified: "rw_mgmt_ip" and, for
        primitives of an item without "member-vnf-index", "ns_config_info" are written
    :param base_folder: dictionary with "folder" and "pkg-dir", used to compose the artifact path
    :param nslcmop_id: operation identity, used to write the operation status
    :param stage: list. Item 0 is overwritten with the Day-1 stage text
    :param vca_type: key of self.vca_map: "lxc_proxy_charm", "k8s_proxy_charm", "native_charm", "helm" or
        "helm-v3"
    :param vca_name: last item of the artifact path
    :param ee_config_descriptor: dictionary of the execution environment. Its "id" is read here
    :return: None. Any exception is raised as LcmException, with the current step as prefix of the text
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    # passed to every vca_map call of this method
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    # 'step' is the prefix of the text of the exception raised at the end
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
                element_type = 'KDU'
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        artifact_path = "{}/{}/{}/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
            vca_name
        )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
                                                                                    vca_deployed, ee_descriptor_id)

        self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vim_account_id = (
            deep_get(db_vnfr, ("vim-account-id",)) or
            deep_get(deploy_params, ("OSM", "vim_account_id"))
        )
        vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
        vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            # NOTE(review): the ee_id obtained above from vca_deployed is cleared here, so 'reuse_ee_id' is
            # always None at the calls below. Confirm this is intended
            ee_id = None
            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1:],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    cloud_name=vca_k8s_cloud,
                    credential_name=vca_k8s_cloud_credential,
                )
            elif vca_type == "helm" or vca_type == "helm-v3":
                ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    vca_type=vca_type
                )
            else:
                ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    cloud_name=vca_cloud,
                    credential_name=vca_cloud_credential,
                )

        elif vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            # no key is injected here, it only waits for the ip address
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                cloud_name=vca_cloud,
                credential_name=vca_cloud_credential,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            # for native charms the primitive named "config" is passed when installing the software
            config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive,
                    {},
                    deploy_params
                )
        # "config-units" of the ns, vnf or vdu record is used only for lxc proxy charms
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
                                      vca_index=vca_index, vca_type=vca_type)

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
                else:
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
                                                                     vdu_index, user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        # NOTE(review): rw_mgmt_ip is assigned only at the branches for the five vca_type values handled above
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6 Execute initial config primitive
        step = 'execute initial config primitive'

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        # "needed_terminate" is evaluated only once, after the first primitive has been executed
        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type == "helm" or vca_type == "helm-v3":
            prometheus_jobs = await self.add_prometheus_metrics(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
            )
            if prometheus_jobs:
                self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # traceback is logged only for exceptions that are not one of the known types
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise LcmException("{} {}".format(step, e)) from e
1487
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Update db_nsr fields.
    :param nsr_id: identity of the ns record to update
    :param ns_state: written at "nsState". It is not modified if empty
    :param current_operation: written at "currentOperation". It is also written at "_admin.operation-type",
        where "IDLE" is stored as None
    :param current_operation_id: written at "currentOperationID", "_admin.current-operation" and "_admin.nslcmop"
    :param error_description: written at "errorDescription", also when it is None
    :param error_detail: written at "errorDetail", also when it is None
    :param other_update: Other required changes at database if provided, will be cleared. The same dictionary
        object is completed with the fields above and sent to database
    :return: None. A DbException is logged as a warning and it is not raised
    """
    try:
        db_dict = other_update or {}
        db_dict["_admin.nslcmop"] = current_operation_id     # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # 'warning' is used because Logger.warn is a deprecated alias
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1516
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Update fields of the operation at 'nslcmops'.
    :param op_id: identity of the operation to update
    :param stage: if it is a list, item 0 is written at "stage" and all the items, joined by spaces, at
        "detailed-status". Any other value that is not None is written as text at "stage"
    :param error_message: written at "errorMessage" if it is not None
    :param queuePosition: written always at "queuePosition"
    :param operation_state: if it is not None, written at "operationState" and "statusEnteredTime" is set to
        the current time
    :param other_update: other changes to write. The same dictionary object is completed with the fields
        above and sent to database
    :return: None. A DbException is logged as a warning and it is not raised
    """
    try:
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # 'warning' is used because Logger.warn is a deprecated alias
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1536
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Set the same status at every non-empty item of the ns 'configurationStatus' list.
    :param db_nsr: database content of ns record. "_id" and "configurationStatus" are read
    :param status: text to write at "configurationStatus.<index>.status"
    :return: None. Nothing is written if 'configurationStatus' is empty. A DbException is logged as a warning
        and it is not raised
    """
    # obtained out of the 'try' because it is needed by the exception handler
    nsr_id = db_nsr["_id"]
    try:
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
                             enumerate(config_status) if v}
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # 'warning' is used because Logger.warn is a deprecated alias
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1550
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None,
                                other_update: dict = None):
    """
    Update the item of the ns 'configurationStatus' list that corresponds to one VCA.
    :param nsr_id: identity of the ns record to update
    :param vca_index: position of the item at 'configurationStatus'
    :param status: written at "status" of the item if not empty
    :param element_under_configuration: written at "elementUnderConfiguration" of the item if not empty
    :param element_type: written at "elementType" of the item if not empty
    :param other_update: other changes to write. The same dictionary object is completed with the fields
        above and sent to database
    :return: None. A DbException is logged as a warning and it is not raised
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + 'status'] = status
        if element_under_configuration:
            db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
        if element_type:
            db_dict[db_path + 'elementType'] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # 'warning' is used because Logger.warn is a deprecated alias
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1571
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Compute the placement (vim account where to deploy each vnf) when the operation requests the "PLA" engine.
    The request is sent via the message bus and the result is polled at database (nslcmops _admin.pla), so it
    can be written by a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'. Raises LcmException if no result is obtained in time
    """
    nslcmop_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # no placement engine requested: nothing changes
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    # poll database until the result is there or the waiting budget is consumed
    poll_seconds = 5
    remaining = poll_seconds * 10
    placement = None
    while not placement and remaining >= 0:
        await asyncio.sleep(poll_seconds)
        remaining -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for placed_vnf in placement['vnf']:
        target_vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        if placed_vnf.get('vimAccountId') and target_vnfr:
            changed = True
            new_vim_account = {"vim-account-id": placed_vnf['vimAccountId']}
            self.db.set_one("vnfrs", {"_id": target_vnfr["_id"]}, new_vim_account)
            # Modifies db_vnfrs
            target_vnfr["vim-account-id"] = placed_vnf['vimAccountId']
    return changed
1610
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at the '_admin.pla' field of its nslcmops record.

    :param params: dictionary whose 'placement' entry is the result to store; the target operation is
        taken from its 'nslcmopId' key
    :return: None. Any exception is not propagated, it is only logged as a warning
    """
    # bound before the 'try' so that the error message can always be composed
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1617
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Run the 'instantiate' operation of a network service.

    Reads the nslcmop, nsr, nsd, vnfrs and vnfds from database, launches the deployment of the KDUs, of the
    NS at the VIM (instantiate_RO) and of the execution environment of every VNF, VDU, KDU and of the NS
    itself that declares a configuration. Then it waits for the launched tasks, writes the final status at
    the nsrs and nslcmops records and sends the 'instantiated' message to the 'ns' topic.

    Nothing is done when lock_HA reports that the operation is not locked by this instance.
    Errors raised while deploying are not propagated; they are stored as the error detail of the operation.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}     # vnf's info indexed by member-index
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage,
            queuePosition=0
        )

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds = []  # every vnfd data, one entry per different vnfd
        # "_id" of the vnfds already stored at db_vnfds. A membership test against db_vnfds itself would
        # compare the id with whole vnfd dictionaries and never match, reading the same vnfd several times
        read_vnfd_ids = set()

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
            vnfd_id = vnfr["vnfd-id"]
            vnfd_ref = vnfr["vnfd-ref"]
            self.fs.sync(vnfd_id)

            # if we haven't this vnfd, read it from db
            if vnfd_id not in read_vnfd_ids:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds.append(vnfd)
                read_vnfd_ids.add(vnfd_id)

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        stage[1] = "Deploying KDUs."
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        stage[1] = "Deploying NS at VIM."
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                n2vc_key_list=n2vc_key_list,
                stage=stage
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        for vnf_profile in get_vnf_profiles(nsd):
            vnfd_id = vnf_profile["vnfd-id"]
            vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
            member_vnf_index = str(vnf_profile["id"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))

            descriptor_config = get_configuration(vnfd, vnfd["id"])
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_vdu_list(vnfd):
                vdu_id = vdud["id"]
                descriptor_config = get_configuration(vnfd, vdu_id)
                vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)

                if vdur.get("additionalParams"):
                    deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                else:
                    # work on a copy: the "OSM" entry set below is specific of this VDU and must not
                    # overwrite the one of the VNF level parameters nor the one of other VDUs
                    deploy_params_vdu = dict(deploy_params)
                deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
                vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)

                self.logger.debug("VDUD > {}".format(vdud))
                self.logger.debug("Descriptor config > {}".format(descriptor_config))
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(vdud_count):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage
                        )
            for kdud in get_kdu_list(vnfd):
                kdu_name = kdud["name"]
                descriptor_config = get_configuration(vnfd, kdu_name)
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
                    deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        # merge, as done for the VNF level parameters, so that "OSM" is not lost
                        deploy_params_kdu.update(parse_yaml_strings(kdur["additionalParams"]))

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage
            )

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
                                                         stage, nslcmop_id, nsr_id=nsr_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as wait_exc:
            # a different name is used to not rebind (and implicitly delete) the outer 'exc'
            error_list.append(str(wait_exc))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])

            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # the nsr is only updated if it could be read from database
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2030
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
                             timeout: int = 3600, vca_type: str = None) -> bool:
    """
    Add the relations, declared at the nsd and vnfds, in which the VCA at position vca_index takes part.

    Steps:
        1. find all relations for this VCA
        2. wait for the other peers (their VCA must have 'config_sw_installed' set)
        3. add the relations
    A relation is discarded, without adding it, when the configuration status of one of its peers is 'BROKEN'.

    :param logging_text: prefix for logging
    :param nsr_id: _id of the nsr record
    :param vca_index: index of this VCA at the '_admin.deployed.VCA' list of the nsr
    :param timeout: maximum time, in seconds, waiting for the peers
    :param vca_type: key of self.vca_map used to add the relations; "lxc_proxy_charm" when not provided
    :return: True when there is nothing pending (no relations, or all of them added or discarded).
        False on timeout or when any exception is raised; the exception is only logged
    """

    def _ns_peer_id(vca):
        # NS level relations reference their peers by member-vnf-index
        return vca.get('member-vnf-index')

    def _vnf_peer_id(vca):
        # VNF level relations reference their peers by vdu id or, for a VCA without vdu, by vnfd id
        peer_id = vca.get("vdu_id")
        return peer_id if peer_id is not None else vca.get("vnfd_id")

    async def _process_pending(pending, vca_list, vca_status_list, get_peer_id):
        # Adds every pending relation whose two peers are ready and discards those with a broken peer.
        # 'pending' is modified in place: the entries processed are removed
        for relation in pending.copy():
            from_entity = relation.get('entities')[0]
            to_entity = relation.get('entities')[1]
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                if not vca.get('config_sw_installed'):
                    continue
                peer_id = get_peer_id(vca)
                if peer_id == from_entity.get('id'):
                    from_vca_ee_id = vca.get('ee_id')
                if peer_id == to_entity.get('id'):
                    to_vca_ee_id = vca.get('ee_id')

            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.vca_map[vca_type].add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=from_entity.get('endpoint'),
                    endpoint_2=to_entity.get('endpoint'))
                # remove entry from relations list
                pending.remove(relation)
                continue

            # check failed peers. The status list is expected to be parallel to the VCA list; entries
            # without status are skipped
            peer_ids = (from_entity.get('id'), to_entity.get('id'))
            peer_broken = any(
                vca_status and vca_status.get('status') == 'BROKEN' and get_peer_id(vca) in peer_ids
                for vca, vca_status in zip(vca_list, vca_status_list or ())
            )
            if peer_broken:
                # peer broken: remove relation from list (only once, even if both peers are broken)
                pending.remove(relation)

    try:
        vca_type = vca_type or "lxc_proxy_charm"

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations
        ns_relations = []
        for r in deep_get(nsd, ('ns-configuration', 'relation')) or ():
            # check if this VCA is in the relation
            if my_vca.get('member-vnf-index') in \
                    (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                ns_relations.append(r)

        # read all vnf-configuration relations
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
            if not db_vnf_configuration:
                continue
            for r in db_vnf_configuration.get("relation", []) or ():
                # check if this VCA is in the relation
                if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                    vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need to update record: _admin.deloyed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
            vca_status_list = db_nsr.get('configurationStatus')

            # for each defined NS and VNF relation, find the VCA's related
            await _process_pending(ns_relations, vca_list, vca_status_list, _ns_peer_id)
            await _process_pending(vnf_relations, vca_list, vca_status_list, _vnf_peer_id)

            # checked before sleeping, to not wait when there is nothing left to do
            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

            # wait for next try
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2199
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                       vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
    """
    Install a KDU in its k8s cluster and store the result at database.

    After the installation the services of the KDU are obtained to fill the 'services', 'ip-address' and
    'status' of the kdur (and the 'ip-address' of the vnfr when a management service references the
    management connection point of the vnfd). Finally the initial config primitives of the KDU are executed,
    in 'seq' order, when get_juju_ee_ref() returns None for this KDU.

    :param nsr_id: _id of the nsr record
    :param nsr_db_path: path inside the nsr record where the information of this KDU is stored
    :param vnfr_data: vnfr record that contains the kdur
    :param kdu_index: index of the kdur at the 'kdur' list of the vnfr
    :param kdud: kdu descriptor
    :param vnfd: vnf descriptor that contains the kdu
    :param k8s_instance_info: dictionary with 'k8scluster-type', 'k8scluster-uuid', 'kdu-model', 'kdu-name'
        and 'namespace'
    :param k8params: parameters passed to the installation
    :param timeout: timeout, in seconds, applied to the installation and to each initial primitive
    :return: the kdu instance name
    :raises: any exception of the process is re-raised after trying to write the error at database
    """

    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        k8s_connector = self.k8scluster_map[k8sclustertype]
        # Instantiate kdu
        db_dict_install = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": nsr_db_path}

        kdu_instance = k8s_connector.generate_kdu_instance_name(
            db_dict=db_dict_install,
            kdu_model=k8s_instance_info["kdu-model"],
            kdu_name=k8s_instance_info["kdu-name"],
        )
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
        await k8s_connector.install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
        )
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

        # Obtain services to obtain management service ip
        services = await k8s_connector.get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"])

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        kdu_config = get_configuration(vnfd, kdud["name"])
        if kdu_config:
            target_ee_list = kdu_config.get("execution-environment-list", [])
        else:
            target_ee_list = []

        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get("external-connection-point-ref")
                        if service_external_cp:
                            if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                                vnfr_update_dict["ip-address"] = ip

                            # NOTE(review): this writes the same key and value already set above
                            if find_in_list(
                                target_ee_list,
                                lambda ee: ee.get("external-connection-point-ref", "") == service_external_cp
                            ):
                                vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
                        break
                else:
                    # no deployed service name starts with the name of this management service
                    self.logger.warning("Mgmt service name: {} not found".format(mgmt_service["name"]))

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        if kdu_config and kdu_config.get("initial-config-primitive") and \
                get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
            # sorted() builds a new list, so the order of the list stored at the descriptor is not altered
            initial_config_primitive_list = sorted(kdu_config.get("initial-config-primitive"),
                                                   key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                await asyncio.wait_for(
                    k8s_connector.exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_, db_dict={}),
                    timeout=timeout)

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
            self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
2303
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch the installation of every KDU present at the vnfrs.

    For each kdur the k8s cluster is resolved (initializing it for helm v3 when needed), the helm repos of
    the cluster are synchronized once, the KDU information is stored at '_admin.deployed.K8s.<index>' of the
    nsr and a task running _install_kdu is created and registered. This method does not wait for these tasks.

    :param logging_text: prefix for logging
    :param nsr_id: _id of the nsr record
    :param nslcmop_id: _id of the operation, used to register the tasks
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds: list with the content of the vnfds
    :param task_instantiation_info: dictionary where the created tasks are added, from task to info text
    :return: None
    :raises LcmException: on any error. Other exceptions, except asyncio.CancelledError, are converted to it
    """

    # cache of internal cluster ids, indexed by cluster type and then by k8scluster _id
    k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}

    async def _get_cluster_id(cluster_id, cluster_type):
        # Returns the id stored at '_admin.<cluster_type>.id' of the k8scluster, waiting for related tasks
        # on that cluster and initializing it for helm v3 when that id is missing
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            if cluster_type != "helm-chart-v3":
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
                                   format(cluster_id, cluster_type))
            try:
                # backward compatibility for existing clusters that have not been initialized for helm v3
                k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
                k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
                                                                           reuse_cluster_uuid=cluster_id)
                db_k8scluster_update = {}
                db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
                db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
                self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
            except Exception as e:
                self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(
                    cluster_id, cluster_type)) from e
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        # clusters whose repos have already been synchronized by this method, for helm v2 and helm v3
        updated_cluster_list = []
        updated_v3_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
                kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
                namespace = kdur.get("k8s-namespace")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    # Default version: helm3, if helm-version is v2 assign v2
                    k8sclustertype = "helm-chart-v3"
                    self.logger.debug("kdur: {}".format(kdur))
                    if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
                        k8sclustertype = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos
                if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
                        or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
                    # the cluster is marked as synchronized whatever the result is, so that a cluster
                    # without changes is not synchronized again for the following KDUs
                    if k8sclustertype == "helm-chart":
                        updated_cluster_list.append(cluster_uuid)
                        repo_db_prefix = '_admin.helm_charts_added.'
                    else:
                        updated_v3_cluster_list.append(cluster_uuid)
                        repo_db_prefix = '_admin.helm_charts_v3_added.'
                    if del_repo_list or added_repo_dict:
                        unset = {repo_db_prefix + item: None for item in del_repo_list}
                        updated = {repo_db_prefix + item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster "
                                                         "'{}' to_delete: {}, to_add: {}".
                                          format(k8s_cluster_id, del_repo_list, added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel,
                                     "namespace": namespace}
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # the installation runs as an independent task; it is not awaited here
                task = asyncio.ensure_future(
                    self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
                                      k8s_instance_info, k8params=desc_params, timeout=600))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg) from e
    finally:
        # write whatever is still pending to be stored at the nsr
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2455
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Launch one instantiate_N2VC asyncio task for each supported item found at
    descriptor_config["execution-environment-list"] (juju or helm-chart based).
    For every item, its entry at db_nsr._admin.deployed.VCA is looked up; when it does not exist a new one is
    created, written to database and appended to the in-memory db_nsr.
    :param logging_text: prefix added to every log line
    :param db_nsr: nsr content. Its _admin.deployed.VCA list is read and can be extended by this method
    :param db_vnfr: passed as it is to instantiate_N2VC
    :param nslcmop_id: operation id, used to register the task and passed to instantiate_N2VC
    :param nsr_id: id of the nsr updated at database and used to register the task
    :param nsi_id: passed as it is to instantiate_N2VC
    :param vnfd_id: stored at the new VCA entry as "vnfd_id"
    :param vdu_id: used to match/create the VCA entry and to build its "target_element"
    :param kdu_name: used to match/create the VCA entry and to build its "target_element"
    :param member_vnf_index: used to match/create the VCA entry. A false value produces an "ns" target
    :param vdu_index: compared against "vdu_count_index" of the VCA entry (0 is assumed when not stored)
    :param vdu_name: stored at the new VCA entry as "vdu_name"
    :param deploy_params: passed as it is to instantiate_N2VC
    :param descriptor_config: configuration descriptor. Only "execution-environment-list" is read here
    :param base_folder: passed as it is to instantiate_N2VC
    :param task_instantiation_info: dictionary filled with task -> description for every launched task
    :param stage: passed as it is to instantiate_N2VC
    :return: None
    """
    # launch instantiate_N2VC in a asyncio task and register task object
    # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
    # if not found, create one entry and update database
    # fill db_nsr._admin.deployed.VCA.<index>

    self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
    if "execution-environment-list" in descriptor_config:
        ee_list = descriptor_config.get("execution-environment-list", [])
    else:  # other types as script are not supported
        ee_list = []

    for ee_item in ee_list:
        self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
                                                                                      ee_item.get("helm-chart")))
        ee_descriptor_id = ee_item.get("id")
        # obtain vca_name and vca_type from the kind of execution environment
        if ee_item.get("juju"):
            vca_name = ee_item['juju'].get('charm')
            # a juju item without charm is considered a native charm
            vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
            # cloud "k8s" takes precedence over the 'proxy: False' flag
            if ee_item['juju'].get('cloud') == "k8s":
                vca_type = "k8s_proxy_charm"
            elif ee_item['juju'].get('proxy') is False:
                vca_type = "native_charm"
        elif ee_item.get("helm-chart"):
            vca_name = ee_item['helm-chart']
            # helm v3 is used unless "helm-version" is explicitly "v2"
            if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                vca_type = "helm"
            else:
                vca_type = "helm-v3"
        else:
            self.logger.debug(logging_text + "skipping non juju neither charm configuration")
            continue

        # vca_index starts at -1 so that it becomes 0 after the 'vca_index += 1' below when the list is empty
        vca_index = -1
        for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            if not vca_deployed:
                continue
            if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                    vca_deployed.get("vdu_id") == vdu_id and \
                    vca_deployed.get("kdu_name") == kdu_name and \
                    vca_deployed.get("vdu_count_index", 0) == vdu_index and \
                    vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
                # found: vca_index and vca_deployed keep pointing to the existing entry
                break
        else:
            # not found, create one.
            target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
            if vdu_id:
                target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                target += "/kdu/{}".format(kdu_name)
            vca_deployed = {
                "target_element": target,
                # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                "member-vnf-index": member_vnf_index,
                "vdu_id": vdu_id,
                "kdu_name": kdu_name,
                "vdu_count_index": vdu_index,
                "operational-status": "init",  # TODO revise
                "detailed-status": "",  # TODO revise
                "step": "initial-deploy",  # TODO revise
                "vnfd_id": vnfd_id,
                "vdu_name": vdu_name,
                "type": vca_type,
                "ee_descriptor_id": ee_descriptor_id
            }
            # the loop ended at the last position of the list, so this is the index of the appended entry
            vca_index += 1

            # create VCA and configurationStatus in db
            db_dict = {
                "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                "configurationStatus.{}".format(vca_index): dict()
            }
            self.update_db_2("nsrs", nsr_id, db_dict)

            # keep the in-memory copy aligned with the content written at database
            db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

        self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
        self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
        self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

        # Launch task
        task_n2vc = asyncio.ensure_future(
            self.instantiate_N2VC(
                logging_text=logging_text,
                vca_index=vca_index,
                nsi_id=nsi_id,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                vdu_index=vdu_index,
                deploy_params=deploy_params,
                config_descriptor=descriptor_config,
                base_folder=base_folder,
                nslcmop_id=nslcmop_id,
                stage=stage,
                vca_type=vca_type,
                vca_name=vca_name,
                ee_config_descriptor=ee_item
            )
        )
        # register the task and annotate a description of it for the caller
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
        task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
            member_vnf_index or "", vdu_id or "")
2562
2563 @staticmethod
2564 def _create_nslcmop(nsr_id, operation, params):
2565 """
2566 Creates a ns-lcm-opp content to be stored at database.
2567 :param nsr_id: internal id of the instance
2568 :param operation: instantiate, terminate, scale, action, ...
2569 :param params: user parameters for the operation
2570 :return: dictionary following SOL005 format
2571 """
2572 # Raise exception if invalid arguments
2573 if not (nsr_id and operation and params):
2574 raise LcmException(
2575 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2576 now = time()
2577 _id = str(uuid4())
2578 nslcmop = {
2579 "id": _id,
2580 "_id": _id,
2581 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2582 "operationState": "PROCESSING",
2583 "statusEnteredTime": now,
2584 "nsInstanceId": nsr_id,
2585 "lcmOperationType": operation,
2586 "startTime": now,
2587 "isAutomaticInvocation": False,
2588 "operationParams": params,
2589 "isCancelPending": False,
2590 "links": {
2591 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2592 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2593 }
2594 }
2595 return nslcmop
2596
2597 def _format_additional_params(self, params):
2598 params = params or {}
2599 for key, value in params.items():
2600 if str(value).startswith("!!yaml "):
2601 params[key] = yaml.safe_load(value[7:])
2602 return params
2603
2604 def _get_terminate_primitive_params(self, seq, vnf_index):
2605 primitive = seq.get('name')
2606 primitive_params = {}
2607 params = {
2608 "member_vnf_index": vnf_index,
2609 "primitive": primitive,
2610 "primitive_params": primitive_params,
2611 }
2612 desc_params = {}
2613 return self._map_primitive_params(seq, params, desc_params)
2614
2615 # sub-operations
2616
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decides what to do with an already registered sub-operation.
    :param db_nslcmop: nslcmop content, the sub-operation is read from _admin.operations
    :param op_index: index of the sub-operation at _admin.operations
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is COMPLETED; otherwise op_index, after setting
        the sub-operation as 'PROCESSING' to indicate a retry
    """
    registered_ops = deep_get(db_nslcmop, ('_admin', 'operations'), [])
    if registered_ops[op_index].get('operationState') == 'COMPLETED':
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
2635
2636 # Find a sub-operation where all keys in a matching dictionary must match
2637 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2638 def _find_suboperation(self, db_nslcmop, match):
2639 if db_nslcmop and match:
2640 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2641 for i, op in enumerate(op_list):
2642 if all(op.get(k) == match[k] for k in match):
2643 return i
2644 return self.SUBOPERATION_STATUS_NOT_FOUND
2645
2646 # Update status for a sub-operation given its index
2647 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2648 # Update DB for HA tasks
2649 q_filter = {'_id': db_nslcmop['_id']}
2650 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2651 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2652 self.db.set_one("nslcmops",
2653 q_filter=q_filter,
2654 update_dict=update_dict,
2655 fail_on_empty=False)
2656
2657 # Add sub-operation, return the index of the added sub-operation
2658 # Optionally, set operationState, detailed-status, and operationType
2659 # Status and type are currently set for 'scale' sub-operations:
2660 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2661 # 'detailed-status' : status message
2662 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2663 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2664 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2665 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2666 RO_nsr_id=None, RO_scaling_info=None):
2667 if not db_nslcmop:
2668 return self.SUBOPERATION_STATUS_NOT_FOUND
2669 # Get the "_admin.operations" list, if it exists
2670 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2671 op_list = db_nslcmop_admin.get('operations')
2672 # Create or append to the "_admin.operations" list
2673 new_op = {'member_vnf_index': vnf_index,
2674 'vdu_id': vdu_id,
2675 'vdu_count_index': vdu_count_index,
2676 'primitive': primitive,
2677 'primitive_params': mapped_primitive_params}
2678 if operationState:
2679 new_op['operationState'] = operationState
2680 if detailed_status:
2681 new_op['detailed-status'] = detailed_status
2682 if operationType:
2683 new_op['lcmOperationType'] = operationType
2684 if RO_nsr_id:
2685 new_op['RO_nsr_id'] = RO_nsr_id
2686 if RO_scaling_info:
2687 new_op['RO_scaling_info'] = RO_scaling_info
2688 if not op_list:
2689 # No existing operations, create key 'operations' with current operation as first list element
2690 db_nslcmop_admin.update({'operations': [new_op]})
2691 op_list = db_nslcmop_admin.get('operations')
2692 else:
2693 # Existing operations, append operation to list
2694 op_list.append(new_op)
2695
2696 db_nslcmop_update = {'_admin.operations': op_list}
2697 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2698 op_index = len(op_list) - 1
2699 return op_index
2700
2701 # Helper methods for scale() sub-operations
2702
2703 # pre-scale/post-scale:
2704 # Check for 3 different cases:
2705 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2706 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2707 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2708 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2709 operationType, RO_nsr_id=None, RO_scaling_info=None):
2710 # Find this sub-operation
2711 if RO_nsr_id and RO_scaling_info:
2712 operationType = 'SCALE-RO'
2713 match = {
2714 'member_vnf_index': vnf_index,
2715 'RO_nsr_id': RO_nsr_id,
2716 'RO_scaling_info': RO_scaling_info,
2717 }
2718 else:
2719 match = {
2720 'member_vnf_index': vnf_index,
2721 'primitive': vnf_config_primitive,
2722 'primitive_params': primitive_params,
2723 'lcmOperationType': operationType
2724 }
2725 op_index = self._find_suboperation(db_nslcmop, match)
2726 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2727 # a. New sub-operation
2728 # The sub-operation does not exist, add it.
2729 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2730 # The following parameters are set to None for all kind of scaling:
2731 vdu_id = None
2732 vdu_count_index = None
2733 vdu_name = None
2734 if RO_nsr_id and RO_scaling_info:
2735 vnf_config_primitive = None
2736 primitive_params = None
2737 else:
2738 RO_nsr_id = None
2739 RO_scaling_info = None
2740 # Initial status for sub-operation
2741 operationState = 'PROCESSING'
2742 detailed_status = 'In progress'
2743 # Add sub-operation for pre/post-scaling (zero or more operations)
2744 self._add_suboperation(db_nslcmop,
2745 vnf_index,
2746 vdu_id,
2747 vdu_count_index,
2748 vdu_name,
2749 vnf_config_primitive,
2750 primitive_params,
2751 operationState,
2752 detailed_status,
2753 operationType,
2754 RO_nsr_id,
2755 RO_scaling_info)
2756 return self.SUBOPERATION_STATUS_NEW
2757 else:
2758 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2759 # or op_index (operationState != 'COMPLETED')
2760 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2761
2762 # Function to return execution_environment id
2763
2764 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2765 # TODO vdu_index_count
2766 for vca in vca_deployed_list:
2767 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2768 return vca["ee_id"]
2769
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
    """
    Execute the terminate primitives and destroy the execution environment (unless destroy_ee=False)
    :param logging_text: prefix added to every log line
    :param db_nslcmop: nslcmop content. Sub-operations are registered on it and its "nsInstanceId" is used
        to update the nsr at database
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :return: None or exception
    :raises LcmException: if a terminate primitive ends with a result different from COMPLETED or
        PARTIALLY_COMPLETED
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # entries without "type" are treated as lxc_proxy_charm
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        # primitives are only executed while the VCA entry is flagged as "needed_terminate"
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name"))
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop,
                                       vnf_index,
                                       vdu_id,
                                       vdu_count_index,
                                       vdu_name,
                                       primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                             mapped_primitive_params,
                                                                             vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                if result not in result_ok:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    # remove the prometheus jobs annotated at this VCA, only if prometheus is available
    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
2841
2842 async def _delete_all_N2VC(self, db_nsr: dict):
2843 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2844 namespace = "." + db_nsr["_id"]
2845 try:
2846 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2847 except N2VCNotFound: # already deleted. Skip
2848 pass
2849 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2850
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO: deletes the ns (waiting until it is removed from VIM, up to 20
    minutes), then the nsd and finally every vnfd annotated at nsr_deployed["RO"].
    A 'not found' (404) answer from RO is considered as already deleted. Any other error is annotated,
    and the deletion of nsd and vnfds is skipped when a previous error has been annotated.
    :param logging_text: prefix added to every log line
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr updated at database
    :param nslcmop_id: id of the operation whose status is updated at database
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with all the annotated errors, if any deletion has failed
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # an explicit exception is raised instead of an 'assert', because asserts are removed
                    # when python runs optimized and the unknown status would be polled until timeout
                    raise ROclient.ROClientException(
                        "ROclient.check_action_status returns unknown {}".format(ns_status))
                # database is only written when the status text changes
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # 'loop' argument is not provided: it was removed from asyncio.sleep at python 3.10,
                # and the running loop is used by default
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds. Errors of one vnfd are annotated and the loop continues with the next one
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
2993
2994 async def terminate(self, nsr_id, nslcmop_id):
2995 # Try to lock HA task here
2996 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2997 if not task_is_locked_by_me:
2998 return
2999
3000 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3001 self.logger.debug(logging_text + "Enter")
3002 timeout_ns_terminate = self.timeout_ns_terminate
3003 db_nsr = None
3004 db_nslcmop = None
3005 operation_params = None
3006 exc = None
3007 error_list = [] # annotates all failed error messages
3008 db_nslcmop_update = {}
3009 autoremove = False # autoremove after terminated
3010 tasks_dict_info = {}
3011 db_nsr_update = {}
3012 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3013 # ^ contains [stage, step, VIM-status]
3014 try:
3015 # wait for any previous tasks in process
3016 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3017
3018 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3019 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3020 operation_params = db_nslcmop.get("operationParams") or {}
3021 if operation_params.get("timeout_ns_terminate"):
3022 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3023 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3024 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3025
3026 db_nsr_update["operational-status"] = "terminating"
3027 db_nsr_update["config-status"] = "terminating"
3028 self._write_ns_status(
3029 nsr_id=nsr_id,
3030 ns_state="TERMINATING",
3031 current_operation="TERMINATING",
3032 current_operation_id=nslcmop_id,
3033 other_update=db_nsr_update
3034 )
3035 self._write_op_status(
3036 op_id=nslcmop_id,
3037 queuePosition=0,
3038 stage=stage
3039 )
3040 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3041 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3042 return
3043
3044 stage[1] = "Getting vnf descriptors from db."
3045 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3046 db_vnfds_from_id = {}
3047 db_vnfds_from_member_index = {}
3048 # Loop over VNFRs
3049 for vnfr in db_vnfrs_list:
3050 vnfd_id = vnfr["vnfd-id"]
3051 if vnfd_id not in db_vnfds_from_id:
3052 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3053 db_vnfds_from_id[vnfd_id] = vnfd
3054 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3055
3056 # Destroy individual execution environments when there are terminating primitives.
3057 # Rest of EE will be deleted at once
3058 # TODO - check before calling _destroy_N2VC
3059 # if not operation_params.get("skip_terminate_primitives"):#
3060 # or not vca.get("needed_terminate"):
3061 stage[0] = "Stage 2/3 execute terminating primitives."
3062 self.logger.debug(logging_text + stage[0])
3063 stage[1] = "Looking execution environment that needs terminate."
3064 self.logger.debug(logging_text + stage[1])
3065
3066 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3067 config_descriptor = None
3068 if not vca or not vca.get("ee_id"):
3069 continue
3070 if not vca.get("member-vnf-index"):
3071 # ns
3072 config_descriptor = db_nsr.get("ns-configuration")
3073 elif vca.get("vdu_id"):
3074 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3075 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3076 elif vca.get("kdu_name"):
3077 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3078 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3079 else:
3080 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3081 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3082 vca_type = vca.get("type")
3083 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3084 vca.get("needed_terminate"))
3085 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3086 # pending native charms
3087 destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
3088 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3089 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3090 task = asyncio.ensure_future(
3091 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3092 destroy_ee, exec_terminate_primitives))
3093 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3094
3095 # wait for pending tasks of terminate primitives
3096 if tasks_dict_info:
3097 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3098 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3099 min(self.timeout_charm_delete, timeout_ns_terminate),
3100 stage, nslcmop_id)
3101 tasks_dict_info.clear()
3102 if error_list:
3103 return # raise LcmException("; ".join(error_list))
3104
3105 # remove All execution environments at once
3106 stage[0] = "Stage 3/3 delete all."
3107
3108 if nsr_deployed.get("VCA"):
3109 stage[1] = "Deleting all execution environments."
3110 self.logger.debug(logging_text + stage[1])
3111 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3112 timeout=self.timeout_charm_delete))
3113 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3114 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3115
3116 # Delete from k8scluster
3117 stage[1] = "Deleting KDUs."
3118 self.logger.debug(logging_text + stage[1])
3119 # print(nsr_deployed)
3120 for kdu in get_iterable(nsr_deployed, "K8s"):
3121 if not kdu or not kdu.get("kdu-instance"):
3122 continue
3123 kdu_instance = kdu.get("kdu-instance")
3124 if kdu.get("k8scluster-type") in self.k8scluster_map:
3125 task_delete_kdu_instance = asyncio.ensure_future(
3126 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3127 cluster_uuid=kdu.get("k8scluster-uuid"),
3128 kdu_instance=kdu_instance))
3129 else:
3130 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3131 format(kdu.get("k8scluster-type")))
3132 continue
3133 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3134
3135 # remove from RO
3136 stage[1] = "Deleting ns from VIM."
3137 if self.ng_ro:
3138 task_delete_ro = asyncio.ensure_future(
3139 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3140 else:
3141 task_delete_ro = asyncio.ensure_future(
3142 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3143 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3144
3145 # rest of staff will be done at finally
3146
3147 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3148 self.logger.error(logging_text + "Exit Exception {}".format(e))
3149 exc = e
3150 except asyncio.CancelledError:
3151 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3152 exc = "Operation was cancelled"
3153 except Exception as e:
3154 exc = traceback.format_exc()
3155 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3156 finally:
3157 if exc:
3158 error_list.append(str(exc))
3159 try:
3160 # wait for pending tasks
3161 if tasks_dict_info:
3162 stage[1] = "Waiting for terminate pending tasks."
3163 self.logger.debug(logging_text + stage[1])
3164 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3165 stage, nslcmop_id)
3166 stage[1] = stage[2] = ""
3167 except asyncio.CancelledError:
3168 error_list.append("Cancelled")
3169 # TODO cancell all tasks
3170 except Exception as exc:
3171 error_list.append(str(exc))
3172 # update status at database
3173 if error_list:
3174 error_detail = "; ".join(error_list)
3175 # self.logger.error(logging_text + error_detail)
3176 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3177 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3178
3179 db_nsr_update["operational-status"] = "failed"
3180 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3181 db_nslcmop_update["detailed-status"] = error_detail
3182 nslcmop_operation_state = "FAILED"
3183 ns_state = "BROKEN"
3184 else:
3185 error_detail = None
3186 error_description_nsr = error_description_nslcmop = None
3187 ns_state = "NOT_INSTANTIATED"
3188 db_nsr_update["operational-status"] = "terminated"
3189 db_nsr_update["detailed-status"] = "Done"
3190 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3191 db_nslcmop_update["detailed-status"] = "Done"
3192 nslcmop_operation_state = "COMPLETED"
3193
3194 if db_nsr:
3195 self._write_ns_status(
3196 nsr_id=nsr_id,
3197 ns_state=ns_state,
3198 current_operation="IDLE",
3199 current_operation_id=None,
3200 error_description=error_description_nsr,
3201 error_detail=error_detail,
3202 other_update=db_nsr_update
3203 )
3204 self._write_op_status(
3205 op_id=nslcmop_id,
3206 stage="",
3207 error_message=error_description_nslcmop,
3208 operation_state=nslcmop_operation_state,
3209 other_update=db_nslcmop_update,
3210 )
3211 if ns_state == "NOT_INSTANTIATED":
3212 try:
3213 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3214 except DbException as e:
3215 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3216 format(nsr_id, e))
3217 if operation_params:
3218 autoremove = operation_params.get("autoremove", False)
3219 if nslcmop_operation_state:
3220 try:
3221 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3222 "operationState": nslcmop_operation_state,
3223 "autoremove": autoremove},
3224 loop=self.loop)
3225 except Exception as e:
3226 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3227
3228 self.logger.debug(logging_text + "Exit")
3229 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3230
async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
    """
    Wait until every task of created_tasks_info ends or the global timeout expires, reporting progress
    at stage[1] and at the operation status each time something finishes.
    :param logging_text: prefix prepended to every log line
    :param created_tasks_info: dictionary {task: short text describing the task}
    :param timeout: total time budget in seconds, counted from the moment this method is entered
    :param stage: list; item 1 is overwritten with "<done>/<total>." followed by the errors found so far
    :param nslcmop_id: operation passed to _write_op_status on every progress update
    :param nsr_id: when provided, errorDescription and errorDetail are also written at the "nsrs" record
    :return: list with one "<task description>: <error>" text per failed, cancelled or timed-out task
    """
    started_at = time()
    detailed_errors = []
    failed_names = []
    remaining_tasks = list(created_tasks_info.keys())
    total = len(remaining_tasks)
    finished = 0
    stage[1] = "{}/{}.".format(finished, total)
    self._write_op_status(nslcmop_id, stage)
    while remaining_tasks:
        latest_error = None
        time_left = timeout + started_at - time()
        completed, remaining_tasks = await asyncio.wait(remaining_tasks, timeout=time_left,
                                                        return_when=asyncio.FIRST_COMPLETED)
        finished += len(completed)
        if not completed:
            # nothing ended within the remaining time: every task still pending is reported as timed out
            timed_out = [created_tasks_info[task] + ": Timeout" for task in remaining_tasks]
            detailed_errors.extend(timed_out)
            failed_names.extend(timed_out)
            break
        for task in completed:
            task_name = created_tasks_info[task]
            failure = "Cancelled" if task.cancelled() else task.exception()
            if not failure:
                self.logger.debug(logging_text + task_name + ": Done")
                continue
            if isinstance(failure, asyncio.TimeoutError):
                failure = "Timeout"
            latest_error = task_name + ": {}".format(failure)
            failed_names.append(task_name)
            detailed_errors.append(latest_error)
            if isinstance(failure, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
                                    K8sException, NgRoException)):
                # expected kind of failure: the message is enough
                self.logger.error(logging_text + latest_error)
            else:
                # unexpected exception: log the complete traceback
                failure_traceback = "".join(traceback.format_exception(None, failure, failure.__traceback__))
                self.logger.error(logging_text + task_name + " " + failure_traceback)
        stage[1] = "{}/{}.".format(finished, total)
        if latest_error:
            stage[1] += " Errors: " + ". ".join(detailed_errors) + "."
            if nsr_id:  # update also nsr
                self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(failed_names),
                                                  "errorDetail": ". ".join(detailed_errors)})
        self._write_op_status(nslcmop_id, stage)
    return detailed_errors
3279
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the descriptor "value" (or, if absent, "default-value") is used. If that text is between < > it is replaced by
    the value stored under that name at instantiation_params.
    Dictionaries, lists and tuples are serialized to a yaml text; a leading "!!yaml " mark is removed from texts.
    Values declared with data-type INTEGER are converted to int, and BOOLEAN to bool (only the text "false",
    in any letter case, becomes False).
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as no params
    :param instantiation_params: Instantiation params provided by user. None is treated as no params
    :return: a dictionary with the calculated params
    :raises LcmException: if a needed parameter is not provided or an INTEGER one cannot be converted
    """
    # callers may hand over None when the user has not supplied any parameter
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # "value" takes precedence over "default-value"
            value = parameter["value"] if "value" in parameter else parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                if value[1:-1] not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
                value = instantiation_params[value[1:-1]]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            value = value[7:]

        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except (ValueError, TypeError):  # text is not a number, or value is None
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
        elif data_type == "BOOLEAN":
            value = str(value).lower() != 'false'
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config":
        if "ns_config_info" in instantiation_params:
            calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
3330
def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
                           ee_descriptor_id=None):
    """
    Find the first deployed VCA record that matches the provided identifiers.
    :param deployed_vca: list of deployed VCA records; empty/None entries are ignored
    :param member_vnf_index: must be equal to the "member-vnf-index" of the record
    :param vdu_id: must be equal to the "vdu_id" of the record
    :param vdu_count_index: if not None, must be equal to the "vdu_count_index" of the record
    :param kdu_name: if provided, must be equal to the "kdu_name" of the record
    :param ee_descriptor_id: if provided, must be equal to the "ee_descriptor_id" of the record
    :return: tuple (ee_id, vca_type). vca_type is "lxc_proxy_charm" when the record does not contain "type"
    :raises LcmException: if no record matches or the matching record has not an ee_id
    """
    def is_requested(candidate):
        # same checks, in the same order, as a sequential filter over the record fields
        if not candidate:
            return False
        if member_vnf_index != candidate["member-vnf-index"] or vdu_id != candidate["vdu_id"]:
            return False
        if vdu_count_index is not None and vdu_count_index != candidate["vdu_count_index"]:
            return False
        if kdu_name and kdu_name != candidate["kdu_name"]:
            return False
        if ee_descriptor_id and ee_descriptor_id != candidate["ee_descriptor_id"]:
            return False
        return True

    record = next((entry for entry in deployed_vca if is_requested(entry)), None)
    if record is None:
        # vca_deployed not found
        raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
                           " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
                                                     ee_descriptor_id))

    execution_environment_id = record.get("ee_id")
    # default value for backward compatibility - proxy charm
    execution_environment_type = record.get("type", "lxc_proxy_charm")
    if not execution_environment_id:
        raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
                           "execution environment"
                           .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
    return execution_environment_id, execution_environment_type
3359
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
                                timeout=None, vca_type=None, db_dict=None) -> (str, str):
    """
    Execute a primitive at an execution environment, retrying when it fails.
    :param ee_id: execution environment where the primitive is run
    :param primitive: name of the primitive. For "config", the params are wrapped as {"params": primitive_params}
    :param primitive_params: parameters passed to the primitive
    :param retries: number of extra attempts done after a failed one
    :param retries_interval: seconds to wait before each new attempt
    :param timeout: maximum seconds for each attempt. self.timeout_primitive is used when not provided
    :param vca_type: key of self.vca_map selecting the connector. "lxc_proxy_charm" is used when not provided
    :param db_dict: passed untouched to the connector
    :return: tuple (state, detail): ("COMPLETED", primitive output) or ("FAILED", error text)
    :raises LcmException, asyncio.CancelledError: these are propagated, not converted into a FAILED result
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict),
                    timeout=timeout or self.timeout_primitive)
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                if isinstance(e, asyncio.TimeoutError):
                    e = "Timeout"
                retries -= 1
                if retries >= 0:
                    self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                    # wait and retry. The "loop" argument is not passed: it was removed from asyncio.sleep in
                    # python 3.10 and the running loop is used anyway
                    await asyncio.sleep(retries_interval)
                else:
                    return 'FAILED', str(e)

        return 'COMPLETED', output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # must be "FAILED": callers compare against this exact text and store it as operation state
        return 'FAILED', 'Error executing action {}: {}'.format(primitive, e)
3400
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an "action" operation over a ns, vnf, vdu or kdu.
    The target and the primitive are read from the operationParams of the nslcmop record. For a kdu, the
    primitives "upgrade", "rollback" and "status", and the ones declared at the kdu configuration, are sent to
    the k8s connector; anything else is executed at the deployed execution environment.
    The result is written at database and notified by the message bus with the topic "ns", key "actioned".
    :param nsr_id: ns instance identifier
    :param nslcmop_id: operation identifier
    :return: tuple (operation state, detailed status), or None if the HA lock of the operation is not obtained
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    # bound from the beginning, because both are read on paths where they might not have been assigned
    detailed_status = None
    kdu_action = False
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get("config-primitive"):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only the built-in kdu primitives are allowed without a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                   format(primitive))
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
            ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    raise LcmException("vdu_id '{}' not found at vnfr of member_vnf_index '{}'".
                                       format(vdu_id, vnf_index))
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                if kdur is None:
                    raise LcmException("kdu_name '{}' not found at vnfr of member_vnf_index '{}'".
                                       format(kdu_name, vnf_index))
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        if kdu_name:
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            if kdu_configuration:
                # names of all the primitives declared for this kdu. The loop variable must not be called
                # "primitive", which holds the name of the requested primitive
                kdu_primitive_names = set()
                for kdu_primitive in kdu_configuration.get("initial-config-primitive", []):
                    kdu_primitive_names.add(kdu_primitive["name"])
                for kdu_primitive in kdu_configuration.get("config-primitive", []):
                    kdu_primitive_names.add(kdu_primitive["name"])
                kdu_action = primitive_name in kdu_primitive_names

        # TODO check if ns is in a proper status
        if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                raise LcmException(msg)

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    # a model stored as "<name>:<something>" is upgraded using only the first part
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance")),
                    timeout=timeout_ns_action)
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action)

            # an empty answer from the k8s connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id, vdu_count_index=vdu_count_index,
                                                          ee_descriptor_id=ee_descriptor_id)
            db_nslcmop_notif = {"collection": "nslcmops",
                                "filter": {"_id": nslcmop_id},
                                "path": "admin.VCA"}
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_nslcmop_notif)

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                               detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
                              operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return is also the result of the "return" placed inside the try block
        return nslcmop_operation_state, detailed_status
3637
3638 async def scale(self, nsr_id, nslcmop_id):
3639 # Try to lock HA task here
3640 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3641 if not task_is_locked_by_me:
3642 return
3643
3644 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3645 stage = ['', '', '']
3646 tasks_dict_info = {}
3647 # ^ stage, step, VIM progress
3648 self.logger.debug(logging_text + "Enter")
3649 # get all needed from database
3650 db_nsr = None
3651 db_nslcmop_update = {}
3652 db_nsr_update = {}
3653 exc = None
3654 # in case of error, indicates what part of scale was failed to put nsr at error status
3655 scale_process = None
3656 old_operational_status = ""
3657 old_config_status = ""
3658 nsi_id = None
3659 try:
3660 # wait for any previous tasks in process
3661 step = "Waiting for previous operations to terminate"
3662 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3663 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3664 current_operation="SCALING", current_operation_id=nslcmop_id)
3665
3666 step = "Getting nslcmop from database"
3667 self.logger.debug(step + " after having waited for previous tasks to be completed")
3668 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3669
3670 step = "Getting nsr from database"
3671 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3672 old_operational_status = db_nsr["operational-status"]
3673 old_config_status = db_nsr["config-status"]
3674
3675 step = "Parsing scaling parameters"
3676 db_nsr_update["operational-status"] = "scaling"
3677 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3678 nsr_deployed = db_nsr["_admin"].get("deployed")
3679
3680 #######
3681 nsr_deployed = db_nsr["_admin"].get("deployed")
3682 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3683 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3684 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3685 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3686 #######
3687
3688 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3689 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3690 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3691 # for backward compatibility
3692 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3693 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3694 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3695 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3696
3697 step = "Getting vnfr from database"
3698 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3699
3700 step = "Getting vnfd from database"
3701 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3702
3703 base_folder = db_vnfd["_admin"]["storage"]
3704
3705 step = "Getting scaling-group-descriptor"
3706 scaling_descriptor = find_in_list(
3707 get_scaling_aspect(
3708 db_vnfd
3709 ),
3710 lambda scale_desc: scale_desc["name"] == scaling_group
3711 )
3712 if not scaling_descriptor:
3713 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3714 "at vnfd:scaling-group-descriptor".format(scaling_group))
3715
3716 step = "Sending scale order to VIM"
3717 # TODO check if ns is in a proper status
3718 nb_scale_op = 0
3719 if not db_nsr["_admin"].get("scaling-group"):
3720 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3721 admin_scale_index = 0
3722 else:
3723 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3724 if admin_scale_info["name"] == scaling_group:
3725 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3726 break
3727 else: # not found, set index one plus last element and add new entry with the name
3728 admin_scale_index += 1
3729 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3730 RO_scaling_info = []
3731 VCA_scaling_info = []
3732 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3733 if scaling_type == "SCALE_OUT":
3734 if "aspect-delta-details" not in scaling_descriptor:
3735 raise LcmException(
3736 "Aspect delta details not fount in scaling descriptor {}".format(
3737 scaling_descriptor["name"]
3738 )
3739 )
3740 # count if max-instance-count is reached
3741 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3742
3743 vdu_scaling_info["scaling_direction"] = "OUT"
3744 vdu_scaling_info["vdu-create"] = {}
3745 for delta in deltas:
3746 for vdu_delta in delta["vdu-delta"]:
3747 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3748 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3749 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3750 if cloud_init_text:
3751 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3752 cloud_init_list = []
3753
3754 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3755 max_instance_count = 10
3756 if vdu_profile and "max-number-of-instances" in vdu_profile:
3757 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3758
3759 default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3760
3761 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3762
3763 if nb_scale_op + default_instance_num > max_instance_count:
3764 raise LcmException(
3765 "reached the limit of {} (max-instance-count) "
3766 "scaling-out operations for the "
3767 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3768 )
3769 for x in range(vdu_delta.get("number-of-instances", 1)):
3770 if cloud_init_text:
3771 # TODO Information of its own ip is not available because db_vnfr is not updated.
3772 additional_params["OSM"] = get_osm_params(
3773 db_vnfr,
3774 vdu_delta["id"],
3775 vdu_index + x
3776 )
3777 cloud_init_list.append(
3778 self._parse_cloud_init(
3779 cloud_init_text,
3780 additional_params,
3781 db_vnfd["id"],
3782 vdud["id"]
3783 )
3784 )
3785 VCA_scaling_info.append(
3786 {
3787 "osm_vdu_id": vdu_delta["id"],
3788 "member-vnf-index": vnf_index,
3789 "type": "create",
3790 "vdu_index": vdu_index + x
3791 }
3792 )
3793 RO_scaling_info.append(
3794 {
3795 "osm_vdu_id": vdu_delta["id"],
3796 "member-vnf-index": vnf_index,
3797 "type": "create",
3798 "count": vdu_delta.get("number-of-instances", 1)
3799 }
3800 )
3801 if cloud_init_list:
3802 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3803 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3804
3805 elif scaling_type == "SCALE_IN":
3806 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3807 min_instance_count = int(scaling_descriptor["min-instance-count"])
3808
3809 vdu_scaling_info["scaling_direction"] = "IN"
3810 vdu_scaling_info["vdu-delete"] = {}
3811 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3812 for delta in deltas:
3813 for vdu_delta in delta["vdu-delta"]:
3814 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3815 min_instance_count = 0
3816 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3817 if vdu_profile and "min-number-of-instances" in vdu_profile:
3818 min_instance_count = vdu_profile["min-number-of-instances"]
3819
3820 default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3821
3822 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3823 if nb_scale_op + default_instance_num < min_instance_count:
3824 raise LcmException(
3825 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3826 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3827 )
3828 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3829 "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
3830 "vdu_index": vdu_index - 1})
3831 for x in range(vdu_delta.get("number-of-instances", 1)):
3832 VCA_scaling_info.append(
3833 {
3834 "osm_vdu_id": vdu_delta["id"],
3835 "member-vnf-index": vnf_index,
3836 "type": "delete",
3837 "vdu_index": vdu_index - 1 - x
3838 }
3839 )
3840 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3841
3842 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3843 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3844 if vdu_scaling_info["scaling_direction"] == "IN":
3845 for vdur in reversed(db_vnfr["vdur"]):
3846 if vdu_delete.get(vdur["vdu-id-ref"]):
3847 vdu_delete[vdur["vdu-id-ref"]] -= 1
3848 vdu_scaling_info["vdu"].append({
3849 "name": vdur.get("name") or vdur.get("vdu-name"),
3850 "vdu_id": vdur["vdu-id-ref"],
3851 "interface": []
3852 })
3853 for interface in vdur["interfaces"]:
3854 vdu_scaling_info["vdu"][-1]["interface"].append({
3855 "name": interface["name"],
3856 "ip_address": interface["ip-address"],
3857 "mac_address": interface.get("mac-address"),
3858 })
3859 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3860
3861 # PRE-SCALE BEGIN
3862 step = "Executing pre-scale vnf-config-primitive"
3863 if scaling_descriptor.get("scaling-config-action"):
3864 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3865 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3866 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3867 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3868 step = db_nslcmop_update["detailed-status"] = \
3869 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3870
3871 # look for primitive
3872 for config_primitive in (get_configuration(
3873 db_vnfd, db_vnfd["id"]
3874 ) or {}).get("config-primitive", ()):
3875 if config_primitive["name"] == vnf_config_primitive:
3876 break
3877 else:
3878 raise LcmException(
3879 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3880 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3881 "primitive".format(scaling_group, vnf_config_primitive))
3882
3883 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3884 if db_vnfr.get("additionalParamsForVnf"):
3885 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3886
3887 scale_process = "VCA"
3888 db_nsr_update["config-status"] = "configuring pre-scaling"
3889 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3890
3891 # Pre-scale retry check: Check if this sub-operation has been executed before
3892 op_index = self._check_or_add_scale_suboperation(
3893 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3894 if op_index == self.SUBOPERATION_STATUS_SKIP:
3895 # Skip sub-operation
3896 result = 'COMPLETED'
3897 result_detail = 'Done'
3898 self.logger.debug(logging_text +
3899 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3900 vnf_config_primitive, result, result_detail))
3901 else:
3902 if op_index == self.SUBOPERATION_STATUS_NEW:
3903 # New sub-operation: Get index of this sub-operation
3904 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3905 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3906 format(vnf_config_primitive))
3907 else:
3908 # retry: Get registered params for this existing sub-operation
3909 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3910 vnf_index = op.get('member_vnf_index')
3911 vnf_config_primitive = op.get('primitive')
3912 primitive_params = op.get('primitive_params')
3913 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3914 format(vnf_config_primitive))
3915 # Execute the primitive, either with new (first-time) or registered (reintent) args
3916 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3917 primitive_name = config_primitive.get("execution-environment-primitive",
3918 vnf_config_primitive)
3919 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3920 member_vnf_index=vnf_index,
3921 vdu_id=None,
3922 vdu_count_index=None,
3923 ee_descriptor_id=ee_descriptor_id)
3924 result, result_detail = await self._ns_execute_primitive(
3925 ee_id, primitive_name, primitive_params, vca_type=vca_type)
3926 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3927 vnf_config_primitive, result, result_detail))
3928 # Update operationState = COMPLETED | FAILED
3929 self._update_suboperation_status(
3930 db_nslcmop, op_index, result, result_detail)
3931
3932 if result == "FAILED":
3933 raise LcmException(result_detail)
3934 db_nsr_update["config-status"] = old_config_status
3935 scale_process = None
3936 # PRE-SCALE END
3937
3938 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3939 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3940
3941 # SCALE-IN VCA - BEGIN
3942 if VCA_scaling_info:
3943 step = db_nslcmop_update["detailed-status"] = \
3944 "Deleting the execution environments"
3945 scale_process = "VCA"
3946 for vdu_info in VCA_scaling_info:
3947 if vdu_info["type"] == "delete":
3948 member_vnf_index = str(vdu_info["member-vnf-index"])
3949 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
3950 vdu_id = vdu_info["osm_vdu_id"]
3951 vdu_index = int(vdu_info["vdu_index"])
3952 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
3953 member_vnf_index, vdu_id, vdu_index)
3954 stage[2] = step = "Scaling in VCA"
3955 self._write_op_status(
3956 op_id=nslcmop_id,
3957 stage=stage
3958 )
3959 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
3960 config_update = db_nsr["configurationStatus"]
3961 for vca_index, vca in enumerate(vca_update):
3962 if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \
3963 vca["vdu_count_index"] == vdu_index:
3964 if vca.get("vdu_id"):
3965 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3966 elif vca.get("kdu_name"):
3967 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3968 else:
3969 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3970 operation_params = db_nslcmop.get("operationParams") or {}
3971 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3972 vca.get("needed_terminate"))
3973 task = asyncio.ensure_future(asyncio.wait_for(
3974 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
3975 vca_index, destroy_ee=True,
3976 exec_primitives=exec_terminate_primitives,
3977 scaling_in=True), timeout=self.timeout_charm_delete))
3978 # wait before next removal
3979 await asyncio.sleep(30)
3980 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3981 del vca_update[vca_index]
3982 del config_update[vca_index]
3983 # wait for pending tasks of terminate primitives
3984 if tasks_dict_info:
3985 self.logger.debug(logging_text +
3986 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3987 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3988 min(self.timeout_charm_delete,
3989 self.timeout_ns_terminate),
3990 stage, nslcmop_id)
3991 tasks_dict_info.clear()
3992 if error_list:
3993 raise LcmException("; ".join(error_list))
3994
3995 db_vca_and_config_update = {
3996 "_admin.deployed.VCA": vca_update,
3997 "configurationStatus": config_update
3998 }
3999 self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
4000 scale_process = None
4001 # SCALE-IN VCA - END
4002
4003 # SCALE RO - BEGIN
4004 if RO_scaling_info:
4005 scale_process = "RO"
4006 if self.ro_config.get("ng"):
4007 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
4008 vdu_scaling_info.pop("vdu-create", None)
4009 vdu_scaling_info.pop("vdu-delete", None)
4010
4011 scale_process = None
4012 if db_nsr_update:
4013 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4014 # SCALE RO - END
4015
4016 # SCALE-UP VCA - BEGIN
4017 if VCA_scaling_info:
4018 step = db_nslcmop_update["detailed-status"] = \
4019 "Creating new execution environments"
4020 scale_process = "VCA"
4021 for vdu_info in VCA_scaling_info:
4022 if vdu_info["type"] == "create":
4023 member_vnf_index = str(vdu_info["member-vnf-index"])
4024 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4025 vnfd_id = db_vnfr["vnfd-ref"]
4026 vdu_index = int(vdu_info["vdu_index"])
4027 deploy_params = {"OSM": get_osm_params(db_vnfr)}
4028 if db_vnfr.get("additionalParamsForVnf"):
4029 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
4030 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
4031 if descriptor_config:
4032 vdu_id = None
4033 vdu_name = None
4034 kdu_name = None
4035 self._deploy_n2vc(
4036 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
4037 db_nsr=db_nsr,
4038 db_vnfr=db_vnfr,
4039 nslcmop_id=nslcmop_id,
4040 nsr_id=nsr_id,
4041 nsi_id=nsi_id,
4042 vnfd_id=vnfd_id,
4043 vdu_id=vdu_id,
4044 kdu_name=kdu_name,
4045 member_vnf_index=member_vnf_index,
4046 vdu_index=vdu_index,
4047 vdu_name=vdu_name,
4048 deploy_params=deploy_params,
4049 descriptor_config=descriptor_config,
4050 base_folder=base_folder,
4051 task_instantiation_info=tasks_dict_info,
4052 stage=stage
4053 )
4054 vdu_id = vdu_info["osm_vdu_id"]
4055 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
4056 descriptor_config = get_configuration(db_vnfd, vdu_id)
4057 if vdur.get("additionalParams"):
4058 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
4059 else:
4060 deploy_params_vdu = deploy_params
4061 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
4062 if descriptor_config:
4063 vdu_name = None
4064 kdu_name = None
4065 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4066 member_vnf_index, vdu_id, vdu_index)
4067 stage[2] = step = "Scaling out VCA"
4068 self._write_op_status(
4069 op_id=nslcmop_id,
4070 stage=stage
4071 )
4072 self._deploy_n2vc(
4073 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4074 member_vnf_index, vdu_id, vdu_index),
4075 db_nsr=db_nsr,
4076 db_vnfr=db_vnfr,
4077 nslcmop_id=nslcmop_id,
4078 nsr_id=nsr_id,
4079 nsi_id=nsi_id,
4080 vnfd_id=vnfd_id,
4081 vdu_id=vdu_id,
4082 kdu_name=kdu_name,
4083 member_vnf_index=member_vnf_index,
4084 vdu_index=vdu_index,
4085 vdu_name=vdu_name,
4086 deploy_params=deploy_params_vdu,
4087 descriptor_config=descriptor_config,
4088 base_folder=base_folder,
4089 task_instantiation_info=tasks_dict_info,
4090 stage=stage
4091 )
4092 # TODO: scaling for kdu is not implemented yet.
4093 kdu_name = vdu_info["osm_vdu_id"]
4094 descriptor_config = get_configuration(db_vnfd, kdu_name)
4095 if descriptor_config:
4096 vdu_id = None
4097 vdu_index = vdu_index
4098 vdu_name = None
4099 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
4100 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
4101 if kdur.get("additionalParams"):
4102 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
4103
4104 self._deploy_n2vc(
4105 logging_text=logging_text,
4106 db_nsr=db_nsr,
4107 db_vnfr=db_vnfr,
4108 nslcmop_id=nslcmop_id,
4109 nsr_id=nsr_id,
4110 nsi_id=nsi_id,
4111 vnfd_id=vnfd_id,
4112 vdu_id=vdu_id,
4113 kdu_name=kdu_name,
4114 member_vnf_index=member_vnf_index,
4115 vdu_index=vdu_index,
4116 vdu_name=vdu_name,
4117 deploy_params=deploy_params_kdu,
4118 descriptor_config=descriptor_config,
4119 base_folder=base_folder,
4120 task_instantiation_info=tasks_dict_info,
4121 stage=stage
4122 )
4123 # SCALE-UP VCA - END
4124 scale_process = None
4125
4126 # POST-SCALE BEGIN
4127 # execute primitive service POST-SCALING
4128 step = "Executing post-scale vnf-config-primitive"
4129 if scaling_descriptor.get("scaling-config-action"):
4130 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4131 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4132 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4133 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4134 step = db_nslcmop_update["detailed-status"] = \
4135 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4136
4137 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4138 if db_vnfr.get("additionalParamsForVnf"):
4139 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4140
4141 # look for primitive
4142 for config_primitive in (
4143 get_configuration(db_vnfd, db_vnfd["id"]) or {}
4144 ).get("config-primitive", ()):
4145 if config_primitive["name"] == vnf_config_primitive:
4146 break
4147 else:
4148 raise LcmException(
4149 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4150 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4151 "config-primitive".format(scaling_group, vnf_config_primitive))
4152 scale_process = "VCA"
4153 db_nsr_update["config-status"] = "configuring post-scaling"
4154 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4155
4156 # Post-scale retry check: Check if this sub-operation has been executed before
4157 op_index = self._check_or_add_scale_suboperation(
4158 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4159 if op_index == self.SUBOPERATION_STATUS_SKIP:
4160 # Skip sub-operation
4161 result = 'COMPLETED'
4162 result_detail = 'Done'
4163 self.logger.debug(logging_text +
4164 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4165 format(vnf_config_primitive, result, result_detail))
4166 else:
4167 if op_index == self.SUBOPERATION_STATUS_NEW:
4168 # New sub-operation: Get index of this sub-operation
4169 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4170 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4171 format(vnf_config_primitive))
4172 else:
4173 # retry: Get registered params for this existing sub-operation
4174 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4175 vnf_index = op.get('member_vnf_index')
4176 vnf_config_primitive = op.get('primitive')
4177 primitive_params = op.get('primitive_params')
4178 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4179 format(vnf_config_primitive))
4180 # Execute the primitive, either with new (first-time) or registered (reintent) args
4181 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4182 primitive_name = config_primitive.get("execution-environment-primitive",
4183 vnf_config_primitive)
4184 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4185 member_vnf_index=vnf_index,
4186 vdu_id=None,
4187 vdu_count_index=None,
4188 ee_descriptor_id=ee_descriptor_id)
4189 result, result_detail = await self._ns_execute_primitive(
4190 ee_id, primitive_name, primitive_params, vca_type=vca_type)
4191 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4192 vnf_config_primitive, result, result_detail))
4193 # Update operationState = COMPLETED | FAILED
4194 self._update_suboperation_status(
4195 db_nslcmop, op_index, result, result_detail)
4196
4197 if result == "FAILED":
4198 raise LcmException(result_detail)
4199 db_nsr_update["config-status"] = old_config_status
4200 scale_process = None
4201 # POST-SCALE END
4202
4203 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4204 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4205 else old_operational_status
4206 db_nsr_update["config-status"] = old_config_status
4207 return
4208 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4209 self.logger.error(logging_text + "Exit Exception {}".format(e))
4210 exc = e
4211 except asyncio.CancelledError:
4212 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4213 exc = "Operation was cancelled"
4214 except Exception as e:
4215 exc = traceback.format_exc()
4216 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4217 finally:
4218 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
4219 if tasks_dict_info:
4220 stage[1] = "Waiting for instantiate pending tasks."
4221 self.logger.debug(logging_text + stage[1])
4222 exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
4223 stage, nslcmop_id, nsr_id=nsr_id)
4224 if exc:
4225 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4226 nslcmop_operation_state = "FAILED"
4227 if db_nsr:
4228 db_nsr_update["operational-status"] = old_operational_status
4229 db_nsr_update["config-status"] = old_config_status
4230 db_nsr_update["detailed-status"] = ""
4231 if scale_process:
4232 if "VCA" in scale_process:
4233 db_nsr_update["config-status"] = "failed"
4234 if "RO" in scale_process:
4235 db_nsr_update["operational-status"] = "failed"
4236 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4237 exc)
4238 else:
4239 error_description_nslcmop = None
4240 nslcmop_operation_state = "COMPLETED"
4241 db_nslcmop_update["detailed-status"] = "Done"
4242
4243 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4244 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4245 if db_nsr:
4246 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4247 current_operation_id=None, other_update=db_nsr_update)
4248
4249 if nslcmop_operation_state:
4250 try:
4251 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4252 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4253 except Exception as e:
4254 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4255 self.logger.debug(logging_text + "Exit")
4256 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4257
async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
    """
    Apply the VDU scaling described in vdu_scaling_info through NG-RO.

    Loads the nsd, every vnfr of the NS and one copy of each vnfd they use, applies the scaling to
    db_vnfr with self.scale_vnfr(mark_delete=True), calls self._instantiate_ng_ro with the whole set and,
    when vdu_scaling_info has "vdu-delete" content, calls self.scale_vnfr again with mark_delete=False.

    :param logging_text: prefix for log messages, forwarded to _instantiate_ng_ro
    :param db_nsr: nsr database content; its "nsd-id" is used to read the nsd
    :param db_nslcmop: operation database content; its "nsInstanceId" identifies the NS
    :param db_vnfr: vnfr being scaled; it replaces the copy read from the database for the same
        "member-vnf-index-ref"
    :param vdu_scaling_info: dictionary with optional "vdu-create" and "vdu-delete" entries
    :param stage: operation stage, forwarded to _instantiate_ng_ro
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

    db_vnfrs = {}
    db_vnfds = []
    # uuids ("_id") of the vnfds already appended to db_vnfds. vnfr["vnfd-id"] holds the vnfd uuid, so the
    # lookup must be done by uuid and not by the descriptor "id"; otherwise the same vnfd is read and
    # appended once per vnfr.
    loaded_vnfd_ids = set()

    # for each vnf in ns, read its vnfd (only once per vnfd)
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        if vnfd_id not in loaded_vnfd_ids:
            db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))
            loaded_vnfd_ids.add(vnfd_id)

    n2vc_key_list = [self.n2vc.get_public_key()]

    self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
                    mark_delete=True)
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
                                  db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
                                  timeout_ns_deploy=self.timeout_ns_deploy)
    if vdu_scaling_info.get("vdu-delete"):
        # second pass over the deleted vdus, this time with mark_delete=False
        self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4286
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
    """
    Register in prometheus the jobs described by a 'prometheus*.j2' template of the artifact folder.

    Nothing is done when self.prometheus is not set or when the folder contains no file whose name
    starts with "prometheus" and ends with ".j2".

    :param ee_id: execution environment id; the text after the first "." is used as service name
    :param artifact_path: folder, inside self.fs, where the template is looked for
    :param ee_config_descriptor: its "metric-service" entry is appended to the service name
    :param vnfr_id: vnfr id; used without "-" characters as job name
    :param nsr_id: stored in every job under the key "nsr_id"
    :param target_ip: value for the template variable TARGET_IP
    :return: list of job names when self.prometheus.update reports success, otherwise None
    """
    if not self.prometheus:
        return

    # pick the first 'prometheus*.j2' file of the artifact folder
    template_name = None
    for entry in self.fs.dir_ls(artifact_path):
        if entry.startswith("prometheus") and entry.endswith(".j2"):
            template_name = entry
            break
    if not template_name:
        return

    with self.fs.file_open((artifact_path, template_name), "r") as template_file:
        job_template = template_file.read()

    # TODO get_service
    service = ee_id.partition(".")[2]  # remove prefix "namespace."
    exporter_host = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    vnfr_id = vnfr_id.replace("-", "")
    parsed_jobs = self.prometheus.parse_job(job_template, {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": exporter_host,
        "EXPORTER_POD_PORT": "80",
    })

    # every job name must contain the vnfr_id; every job carries the nsr_id as metadata
    jobs_by_name = {}
    for job in parsed_jobs:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in current_name):
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
        jobs_by_name[job["job_name"]] = job

    if await self.prometheus.update(jobs_by_name):
        return list(jobs_by_name.keys())
4318
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud and the VCA cloud credential configured for a VIM account

    Both values come from the "config" section of the VIM account ("vca_cloud" and
    "vca_cloud_credential"); a value that is not configured is returned as None.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
4329
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud and the VCA K8s cloud credential configured for a VIM account

    Both values come from the "config" section of the VIM account ("vca_k8s_cloud" and
    "vca_k8s_cloud_credential"); a value that is not configured is returned as None.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential