fix(FS): sync problems fixed syncing only the particular NSD and VNFD that the instan...
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
class NsLcm(LcmBase):
    """NS lifecycle manager: deploys, configures, scales and terminates network services."""

    timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60  # timeout (s) for deleting a charm
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

    # status codes for sub-operation lookups (consumers are outside this chunk)
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
75
76 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
77 """
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
80 :return: None
81 """
82 super().__init__(
83 msg=msg,
84 logger=logging.getLogger('lcm.ns')
85 )
86
87 self.db = Database().instance.db
88 self.fs = Filesystem().instance.fs
89 self.loop = loop
90 self.lcm_tasks = lcm_tasks
91 self.timeout = config["timeout"]
92 self.ro_config = config["ro_config"]
93 self.ng_ro = config["ro_config"].get("ng")
94 self.vca_config = config["VCA"].copy()
95
96 # create N2VC connector
97 self.n2vc = N2VCJujuConnector(
98 log=self.logger,
99 loop=self.loop,
100 on_update_db=self._on_update_n2vc_db,
101 fs=self.fs,
102 db=self.db
103 )
104
105 self.conn_helm_ee = LCMHelmConn(
106 log=self.logger,
107 loop=self.loop,
108 vca_config=self.vca_config,
109 on_update_db=self._on_update_n2vc_db
110 )
111
112 self.k8sclusterhelm2 = K8sHelmConnector(
113 kubectl_command=self.vca_config.get("kubectlpath"),
114 helm_command=self.vca_config.get("helmpath"),
115 log=self.logger,
116 on_update_db=None,
117 fs=self.fs,
118 db=self.db
119 )
120
121 self.k8sclusterhelm3 = K8sHelm3Connector(
122 kubectl_command=self.vca_config.get("kubectlpath"),
123 helm_command=self.vca_config.get("helm3path"),
124 fs=self.fs,
125 log=self.logger,
126 db=self.db,
127 on_update_db=None,
128 )
129
130 self.k8sclusterjuju = K8sJujuConnector(
131 kubectl_command=self.vca_config.get("kubectlpath"),
132 juju_command=self.vca_config.get("jujupath"),
133 log=self.logger,
134 loop=self.loop,
135 on_update_db=self._on_update_k8s_db,
136 fs=self.fs,
137 db=self.db
138 )
139
140 self.k8scluster_map = {
141 "helm-chart": self.k8sclusterhelm2,
142 "helm-chart-v3": self.k8sclusterhelm3,
143 "chart": self.k8sclusterhelm3,
144 "juju-bundle": self.k8sclusterjuju,
145 "juju": self.k8sclusterjuju,
146 }
147
148 self.vca_map = {
149 "lxc_proxy_charm": self.n2vc,
150 "native_charm": self.n2vc,
151 "k8s_proxy_charm": self.n2vc,
152 "helm": self.conn_helm_ee,
153 "helm-v3": self.conn_helm_ee
154 }
155
156 self.prometheus = prometheus
157
158 # create RO client
159 self.RO = NgRoClient(self.loop, **self.ro_config)
160
161 @staticmethod
162 def increment_ip_mac(ip_mac, vm_index=1):
163 if not isinstance(ip_mac, str):
164 return ip_mac
165 try:
166 # try with ipv4 look for last dot
167 i = ip_mac.rfind(".")
168 if i > 0:
169 i += 1
170 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
171 # try with ipv6 or mac look for last colon. Operate in hex
172 i = ip_mac.rfind(":")
173 if i > 0:
174 i += 1
175 # format in hex, len can be 2 for mac or 4 for ipv6
176 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
177 except Exception:
178 pass
179 return None
180
181 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
182
183 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
184
185 try:
186 # TODO filter RO descriptor fields...
187
188 # write to database
189 db_dict = dict()
190 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
191 db_dict['deploymentStatus'] = ro_descriptor
192 self.update_db_2("nsrs", nsrs_id, db_dict)
193
194 except Exception as e:
195 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
196
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """Callback invoked by N2VC on juju model changes: refreshes vcaStatus,
        configurationStatus and (possibly) nsState of the affected nsr record.

        :param table: name of the changed database table (not used; "nsrs" is always read)
        :param filter: database filter of the changed record; its "_id" is the nsr id
        :param path: dotted path of the changed field; its last component is the VCA index
        :param updated_data: changed data (not used directly; full status is re-read from juju)
        :param vca_id: id of the VCA involved, forwarded to n2vc calls
        """

        # remove last dot from path (if exists)
        if path.endswith('.'):
            path = path[:-1]

        # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
        #                   .format(table, filter, path, updated_data))
        try:

            nsr_id = filter.get('_id')

            # read ns record from database
            nsr = self.db.get_one(table='nsrs', q_filter=filter)
            current_ns_status = nsr.get('nsState')

            # get vca status for NS
            status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False, vca_id=vca_id)

            # vcaStatus
            db_dict = dict()
            db_dict['vcaStatus'] = status_dict
            await self.n2vc.update_vca_status(db_dict['vcaStatus'], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # the VCA index is encoded as the last component of path
                vca_index = int(path[path.rfind(".")+1:])

                vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
                vca_status = vca_list[vca_index].get('status')

                configuration_status_list = nsr.get('configurationStatus')
                config_status = configuration_status_list[vca_index].get('status')

                # reconcile the stored config status with the live juju status.
                # NOTE(review): db_dict has no 'configurationStatus' key at this point, so these
                # assignments raise KeyError and are absorbed by the except below — confirm intent
                if config_status == 'BROKEN' and vca_status != 'failed':
                    db_dict['configurationStatus'][vca_index] = 'READY'
                elif config_status != 'BROKEN' and vca_status == 'failed':
                    db_dict['configurationStatus'][vca_index] = 'BROKEN'
            except Exception as e:
                # not update configurationStatus
                self.logger.debug('Error updating vca_index (ignore): {}'.format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ('READY', 'DEGRADED'):
                error_description = ''
                # check machines
                if status_dict.get('machines'):
                    for machine_id in status_dict.get('machines'):
                        machine = status_dict.get('machines').get(machine_id)
                        # check machine agent-status
                        if machine.get('agent-status'):
                            s = machine.get('agent-status').get('status')
                            if s != 'started':
                                is_degraded = True
                                error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
                        # check machine instance status
                        if machine.get('instance-status'):
                            s = machine.get('instance-status').get('status')
                            if s != 'running':
                                is_degraded = True
                                error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
                # check applications
                if status_dict.get('applications'):
                    for app_id in status_dict.get('applications'):
                        app = status_dict.get('applications').get(app_id)
                        # check application status
                        if app.get('status'):
                            s = app.get('status').get('status')
                            if s != 'active':
                                is_degraded = True
                                error_description += 'application {} status={} ; '.format(app_id, s)

                if error_description:
                    db_dict['errorDescription'] = error_description
                if current_ns_status == 'READY' and is_degraded:
                    db_dict['nsState'] = 'DEGRADED'
                if current_ns_status == 'DEGRADED' and not is_degraded:
                    db_dict['nsState'] = 'READY'

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            # task cancellation/timeout must propagate to the caller
            raise
        except Exception as e:
            self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
285
    async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None, vca_id=None):
        """
        Updating vca status in NSR record
        :param cluster_uuid: UUID of a k8s cluster
        :param kdu_instance: The unique name of the KDU instance
        :param filter: To get nsr_id
        :param vca_id: id of the VCA involved, forwarded to the juju k8s connector
        :return: none
        """

        # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
        #                   .format(cluster_uuid, kdu_instance, filter))

        try:
            # NOTE(review): if filter is None (the declared default) this raises AttributeError
            # and the warning below references nsr_id before assignment — confirm callers
            # always provide filter
            nsr_id = filter.get('_id')

            # get vca status for NS (full juju status of the kdu)
            vca_status = await self.k8sclusterjuju.status_kdu(
                cluster_uuid,
                kdu_instance,
                complete_status=True,
                yaml_format=False,
                vca_id=vca_id,
            )
            # vcaStatus
            db_dict = dict()
            db_dict['vcaStatus'] = {nsr_id: vca_status}

            await self.k8sclusterjuju.update_vca_status(
                db_dict['vcaStatus'],
                kdu_instance,
                vca_id=vca_id,
            )

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            # task cancellation/timeout must propagate to the caller
            raise
        except Exception as e:
            self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
326
327 @staticmethod
328 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
329 try:
330 env = Environment(undefined=StrictUndefined)
331 template = env.from_string(cloud_init_text)
332 return template.render(additional_params or {})
333 except UndefinedError as e:
334 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
335 "file, must be provided in the instantiation parameters inside the "
336 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
337 except (TemplateError, TemplateNotFound) as e:
338 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
339 format(vnfd_id, vdu_id, e))
340
341 def _get_vdu_cloud_init_content(self, vdu, vnfd):
342 cloud_init_content = cloud_init_file = None
343 try:
344 if vdu.get("cloud-init-file"):
345 base_folder = vnfd["_admin"]["storage"]
346 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
347 vdu["cloud-init-file"])
348 with self.fs.file_open(cloud_init_file, "r") as ci_file:
349 cloud_init_content = ci_file.read()
350 elif vdu.get("cloud-init"):
351 cloud_init_content = vdu["cloud-init"]
352
353 return cloud_init_content
354 except FsException as e:
355 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
356 format(vnfd["id"], vdu["id"], cloud_init_file, e))
357
358 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
359 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
360 additional_params = vdur.get("additionalParams")
361 return parse_yaml_strings(additional_params)
362
363 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
364 """
365 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
366 :param vnfd: input vnfd
367 :param new_id: overrides vnf id if provided
368 :param additionalParams: Instantiation params for VNFs provided
369 :param nsrId: Id of the NSR
370 :return: copy of vnfd
371 """
372 vnfd_RO = deepcopy(vnfd)
373 # remove unused by RO configuration, monitoring, scaling and internal keys
374 vnfd_RO.pop("_id", None)
375 vnfd_RO.pop("_admin", None)
376 vnfd_RO.pop("monitoring-param", None)
377 vnfd_RO.pop("scaling-group-descriptor", None)
378 vnfd_RO.pop("kdu", None)
379 vnfd_RO.pop("k8s-cluster", None)
380 if new_id:
381 vnfd_RO["id"] = new_id
382
383 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
384 for vdu in get_iterable(vnfd_RO, "vdu"):
385 vdu.pop("cloud-init-file", None)
386 vdu.pop("cloud-init", None)
387 return vnfd_RO
388
389 @staticmethod
390 def ip_profile_2_RO(ip_profile):
391 RO_ip_profile = deepcopy(ip_profile)
392 if "dns-server" in RO_ip_profile:
393 if isinstance(RO_ip_profile["dns-server"], list):
394 RO_ip_profile["dns-address"] = []
395 for ds in RO_ip_profile.pop("dns-server"):
396 RO_ip_profile["dns-address"].append(ds['address'])
397 else:
398 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
399 if RO_ip_profile.get("ip-version") == "ipv4":
400 RO_ip_profile["ip-version"] = "IPv4"
401 if RO_ip_profile.get("ip-version") == "ipv6":
402 RO_ip_profile["ip-version"] = "IPv6"
403 if "dhcp-params" in RO_ip_profile:
404 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
405 return RO_ip_profile
406
407 def _get_ro_vim_id_for_vim_account(self, vim_account):
408 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
409 if db_vim["_admin"]["operationalState"] != "ENABLED":
410 raise LcmException("VIM={} is not available. operationalState={}".format(
411 vim_account, db_vim["_admin"]["operationalState"]))
412 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
413 return RO_vim_id
414
415 def get_ro_wim_id_for_wim_account(self, wim_account):
416 if isinstance(wim_account, str):
417 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
418 if db_wim["_admin"]["operationalState"] != "ENABLED":
419 raise LcmException("WIM={} is not available. operationalState={}".format(
420 wim_account, db_wim["_admin"]["operationalState"]))
421 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
422 return RO_wim_id
423 else:
424 return wim_account
425
426 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
427
428 db_vdu_push_list = []
429 db_update = {"_admin.modified": time()}
430 if vdu_create:
431 for vdu_id, vdu_count in vdu_create.items():
432 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
433 if not vdur:
434 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
435 format(vdu_id))
436
437 for count in range(vdu_count):
438 vdur_copy = deepcopy(vdur)
439 vdur_copy["status"] = "BUILD"
440 vdur_copy["status-detailed"] = None
441 vdur_copy["ip-address"]: None
442 vdur_copy["_id"] = str(uuid4())
443 vdur_copy["count-index"] += count + 1
444 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
445 vdur_copy.pop("vim_info", None)
446 for iface in vdur_copy["interfaces"]:
447 if iface.get("fixed-ip"):
448 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
449 else:
450 iface.pop("ip-address", None)
451 if iface.get("fixed-mac"):
452 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
453 else:
454 iface.pop("mac-address", None)
455 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
456 db_vdu_push_list.append(vdur_copy)
457 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
458 if vdu_delete:
459 for vdu_id, vdu_count in vdu_delete.items():
460 if mark_delete:
461 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
462 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
463 else:
464 # it must be deleted one by one because common.db does not allow otherwise
465 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
466 for vdu in vdus_to_delete[:vdu_count]:
467 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
468 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
469 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
470 # modify passed dictionary db_vnfr
471 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
472 db_vnfr["vdur"] = db_vnfr_["vdur"]
473
474 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
475 """
476 Updates database nsr with the RO info for the created vld
477 :param ns_update_nsr: dictionary to be filled with the updated info
478 :param db_nsr: content of db_nsr. This is also modified
479 :param nsr_desc_RO: nsr descriptor from RO
480 :return: Nothing, LcmException is raised on errors
481 """
482
483 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
484 for net_RO in get_iterable(nsr_desc_RO, "nets"):
485 if vld["id"] != net_RO.get("ns_net_osm_id"):
486 continue
487 vld["vim-id"] = net_RO.get("vim_net_id")
488 vld["name"] = net_RO.get("vim_name")
489 vld["status"] = net_RO.get("status")
490 vld["status-detailed"] = net_RO.get("error_msg")
491 ns_update_nsr["vld.{}".format(vld_index)] = vld
492 break
493 else:
494 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
495
    def set_vnfr_at_error(self, db_vnfrs, error_text):
        """Mark every vnfr (and each of its vdurs without a status) as ERROR in the database.

        :param db_vnfrs: dict member-vnf-index -> vnfr content; vdur entries are modified in place
        :param error_text: detail stored in the in-memory vdur "status-detailed"
        """
        try:
            for db_vnfr in db_vnfrs.values():
                vnfr_update = {"status": "ERROR"}
                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    if "status" not in vdur:
                        vdur["status"] = "ERROR"
                        vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                        if error_text:
                            vdur["status-detailed"] = str(error_text)
                            # NOTE(review): the database receives the literal "ERROR" while the
                            # in-memory vdur gets error_text — confirm this asymmetry is intended
                            vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
        except DbException as e:
            self.logger.error("Cannot update vnf. {}".format(e))
510
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may return several addresses separated by ';': keep the first one
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # PDUs are not deployed by RO: nothing to update
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        if vdur["count-index"] != vdur_RO_count_index:
                            # advance until the RO vm matching this replica index is reached
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        # copy interface addresses, matching by RO internal_name
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get("ip_address")
                                    ifacer["mac-address"] = interface_RO.get("mac_address")
                                    break
                            else:
                                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                                   "from VIM info"
                                                   .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                           "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

                # update the vnf internal vlds with the RO network info
                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                            vnf_index, vld["id"]))

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
583
584 def _get_ns_config_info(self, nsr_id):
585 """
586 Generates a mapping between vnf,vdu elements and the N2VC id
587 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
588 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
589 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
590 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
591 """
592 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
593 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
594 mapping = {}
595 ns_config_info = {"osm-config-mapping": mapping}
596 for vca in vca_deployed_list:
597 if not vca["member-vnf-index"]:
598 continue
599 if not vca["vdu_id"]:
600 mapping[vca["member-vnf-index"]] = vca["application"]
601 else:
602 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
603 vca["application"]
604 return ns_config_info
605
606 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
607 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
608
609 db_vims = {}
610
611 def get_vim_account(vim_account_id):
612 nonlocal db_vims
613 if vim_account_id in db_vims:
614 return db_vims[vim_account_id]
615 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
616 db_vims[vim_account_id] = db_vim
617 return db_vim
618
619 # modify target_vld info with instantiation parameters
620 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
621 if vld_params.get("ip-profile"):
622 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
623 if vld_params.get("provider-network"):
624 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
625 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
626 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
627 if vld_params.get("wimAccountId"):
628 target_wim = "wim:{}".format(vld_params["wimAccountId"])
629 target_vld["vim_info"][target_wim] = {}
630 for param in ("vim-network-name", "vim-network-id"):
631 if vld_params.get(param):
632 if isinstance(vld_params[param], dict):
633 for vim, vim_net in vld_params[param].items():
634 other_target_vim = "vim:" + vim
635 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
636 else: # isinstance str
637 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
638 if vld_params.get("common_id"):
639 target_vld["common_id"] = vld_params.get("common_id")
640
641 nslcmop_id = db_nslcmop["_id"]
642 target = {
643 "name": db_nsr["name"],
644 "ns": {"vld": []},
645 "vnf": [],
646 "image": deepcopy(db_nsr["image"]),
647 "flavor": deepcopy(db_nsr["flavor"]),
648 "action_id": nslcmop_id,
649 "cloud_init_content": {},
650 }
651 for image in target["image"]:
652 image["vim_info"] = {}
653 for flavor in target["flavor"]:
654 flavor["vim_info"] = {}
655
656 if db_nslcmop.get("lcmOperationType") != "instantiate":
657 # get parameters of instantiation:
658 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
659 "lcmOperationType": "instantiate"})[-1]
660 ns_params = db_nslcmop_instantiate.get("operationParams")
661 else:
662 ns_params = db_nslcmop.get("operationParams")
663 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
664 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
665
666 cp2target = {}
667 for vld_index, vld in enumerate(db_nsr.get("vld")):
668 target_vim = "vim:{}".format(ns_params["vimAccountId"])
669 target_vld = {
670 "id": vld["id"],
671 "name": vld["name"],
672 "mgmt-network": vld.get("mgmt-network", False),
673 "type": vld.get("type"),
674 "vim_info": {
675 target_vim: {
676 "vim_network_name": vld.get("vim-network-name"),
677 "vim_account_id": ns_params["vimAccountId"]
678 }
679 }
680 }
681 # check if this network needs SDN assist
682 if vld.get("pci-interfaces"):
683 db_vim = get_vim_account(ns_params["vimAccountId"])
684 sdnc_id = db_vim["config"].get("sdn-controller")
685 if sdnc_id:
686 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
687 target_sdn = "sdn:{}".format(sdnc_id)
688 target_vld["vim_info"][target_sdn] = {
689 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
690
691 nsd_vnf_profiles = get_vnf_profiles(nsd)
692 for nsd_vnf_profile in nsd_vnf_profiles:
693 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
694 if cp["virtual-link-profile-id"] == vld["id"]:
695 cp2target["member_vnf:{}.{}".format(
696 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
697 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
698 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
699
700 # check at nsd descriptor, if there is an ip-profile
701 vld_params = {}
702 nsd_vlp = find_in_list(
703 get_virtual_link_profiles(nsd),
704 lambda a_link_profile: a_link_profile["virtual-link-desc-id"] == vld["id"])
705 if nsd_vlp and nsd_vlp.get("virtual-link-protocol-data") and \
706 nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
707 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
708 ip_profile_dest_data = {}
709 if "ip-version" in ip_profile_source_data:
710 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
711 if "cidr" in ip_profile_source_data:
712 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
713 if "gateway-ip" in ip_profile_source_data:
714 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
715 if "dhcp-enabled" in ip_profile_source_data:
716 ip_profile_dest_data["dhcp-params"] = {
717 "enabled": ip_profile_source_data["dhcp-enabled"]
718 }
719 vld_params["ip-profile"] = ip_profile_dest_data
720
721 # update vld_params with instantiation params
722 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
723 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
724 if vld_instantiation_params:
725 vld_params.update(vld_instantiation_params)
726 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
727 target["ns"]["vld"].append(target_vld)
728
729 for vnfr in db_vnfrs.values():
730 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
731 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
732 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
733 target_vnf = deepcopy(vnfr)
734 target_vim = "vim:{}".format(vnfr["vim-account-id"])
735 for vld in target_vnf.get("vld", ()):
736 # check if connected to a ns.vld, to fill target'
737 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
738 lambda cpd: cpd.get("id") == vld["id"])
739 if vnf_cp:
740 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
741 if cp2target.get(ns_cp):
742 vld["target"] = cp2target[ns_cp]
743
744 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
745 # check if this network needs SDN assist
746 target_sdn = None
747 if vld.get("pci-interfaces"):
748 db_vim = get_vim_account(vnfr["vim-account-id"])
749 sdnc_id = db_vim["config"].get("sdn-controller")
750 if sdnc_id:
751 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
752 target_sdn = "sdn:{}".format(sdnc_id)
753 vld["vim_info"][target_sdn] = {
754 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
755
756 # check at vnfd descriptor, if there is an ip-profile
757 vld_params = {}
758 vnfd_vlp = find_in_list(
759 get_virtual_link_profiles(vnfd),
760 lambda a_link_profile: a_link_profile["id"] == vld["id"]
761 )
762 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
763 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
764 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
765 ip_profile_dest_data = {}
766 if "ip-version" in ip_profile_source_data:
767 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
768 if "cidr" in ip_profile_source_data:
769 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
770 if "gateway-ip" in ip_profile_source_data:
771 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
772 if "dhcp-enabled" in ip_profile_source_data:
773 ip_profile_dest_data["dhcp-params"] = {
774 "enabled": ip_profile_source_data["dhcp-enabled"]
775 }
776
777 vld_params["ip-profile"] = ip_profile_dest_data
778 # update vld_params with instantiation params
779 if vnf_params:
780 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
781 lambda i_vld: i_vld["name"] == vld["id"])
782 if vld_instantiation_params:
783 vld_params.update(vld_instantiation_params)
784 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
785
786 vdur_list = []
787 for vdur in target_vnf.get("vdur", ()):
788 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
789 continue # This vdu must not be created
790 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
791
792 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
793
794 if ssh_keys_all:
795 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
796 vnf_configuration = get_configuration(vnfd, vnfd["id"])
797 if vdu_configuration and vdu_configuration.get("config-access") and \
798 vdu_configuration.get("config-access").get("ssh-access"):
799 vdur["ssh-keys"] = ssh_keys_all
800 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
801 elif vnf_configuration and vnf_configuration.get("config-access") and \
802 vnf_configuration.get("config-access").get("ssh-access") and \
803 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
804 vdur["ssh-keys"] = ssh_keys_all
805 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
806 elif ssh_keys_instantiation and \
807 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
808 vdur["ssh-keys"] = ssh_keys_instantiation
809
810 self.logger.debug("NS > vdur > {}".format(vdur))
811
812 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
813 # cloud-init
814 if vdud.get("cloud-init-file"):
815 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
816 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
817 if vdur["cloud-init"] not in target["cloud_init_content"]:
818 base_folder = vnfd["_admin"]["storage"]
819 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
820 vdud.get("cloud-init-file"))
821 with self.fs.file_open(cloud_init_file, "r") as ci_file:
822 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
823 elif vdud.get("cloud-init"):
824 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
825 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
826 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
827 vdur["additionalParams"] = vdur.get("additionalParams") or {}
828 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
829 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
830 vdur["additionalParams"] = deploy_params_vdu
831
832 # flavor
833 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
834 if target_vim not in ns_flavor["vim_info"]:
835 ns_flavor["vim_info"][target_vim] = {}
836
837 # deal with images
838 # in case alternative images are provided we must check if they should be applied
839 # for the vim_type, modify the vim_type taking into account
840 ns_image_id = int(vdur["ns-image-id"])
841 if vdur.get("alt-image-ids"):
842 db_vim = get_vim_account(vnfr["vim-account-id"])
843 vim_type = db_vim["vim_type"]
844 for alt_image_id in vdur.get("alt-image-ids"):
845 ns_alt_image = target["image"][int(alt_image_id)]
846 if vim_type == ns_alt_image.get("vim-type"):
847 # must use alternative image
848 self.logger.debug("use alternative image id: {}".format(alt_image_id))
849 ns_image_id = alt_image_id
850 vdur["ns-image-id"] = ns_image_id
851 break
852 ns_image = target["image"][int(ns_image_id)]
853 if target_vim not in ns_image["vim_info"]:
854 ns_image["vim_info"][target_vim] = {}
855
856 vdur["vim_info"] = {target_vim: {}}
857 # instantiation parameters
858 # if vnf_params:
859 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
860 # vdud["id"]), None)
861 vdur_list.append(vdur)
862 target_vnf["vdur"] = vdur_list
863 target["vnf"].append(target_vnf)
864
865 desc = await self.RO.deploy(nsr_id, target)
866 self.logger.debug("RO return > {}".format(desc))
867 action_id = desc["action_id"]
868 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
869
870 # Updating NSR
871 db_nsr_update = {
872 "_admin.deployed.RO.operational-status": "running",
873 "detailed-status": " ".join(stage)
874 }
875 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
876 self.update_db_2("nsrs", nsr_id, db_nsr_update)
877 self._write_op_status(nslcmop_id, stage)
878 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
879 return
880
881 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
882 detailed_status_old = None
883 db_nsr_update = {}
884 start_time = start_time or time()
885 while time() <= start_time + timeout:
886 desc_status = await self.RO.status(nsr_id, action_id)
887 self.logger.debug("Wait NG RO > {}".format(desc_status))
888 if desc_status["status"] == "FAILED":
889 raise NgRoException(desc_status["details"])
890 elif desc_status["status"] == "BUILD":
891 if stage:
892 stage[2] = "VIM: ({})".format(desc_status["details"])
893 elif desc_status["status"] == "DONE":
894 if stage:
895 stage[2] = "Deployed at VIM"
896 break
897 else:
898 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
899 if stage and nslcmop_id and stage[2] != detailed_status_old:
900 detailed_status_old = stage[2]
901 db_nsr_update["detailed-status"] = " ".join(stage)
902 self.update_db_2("nsrs", nsr_id, db_nsr_update)
903 self._write_op_status(nslcmop_id, stage)
904 await asyncio.sleep(15, loop=self.loop)
905 else: # timeout_ns_deploy
906 raise NgRoException("Timeout waiting ns to deploy")
907
    async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
        """
        Remove an NS deployment from NG-RO: deploy an "empty" target (no vld/vnf/image/flavor),
        wait for the resulting action to complete, then delete the nsr record at RO.
        :param logging_text: prefix for all log messages
        :param nsr_deployed: nsr "_admin.deployed" content; not read in this body -- presumably kept
            for interface symmetry with the old-RO terminate path (TODO confirm callers)
        :param nsr_id: ns record identifier
        :param nslcmop_id: operation identifier; also sent to RO as "action_id"
        :param stage: 3-item status list; stage[2] is overwritten with the VIM deletion result
        :raises LcmException: when RO reports a conflict (409) or any unexpected deletion error
        """
        db_nsr_update = {}
        failed_detail = []
        action_id = None
        start_deploy = time()
        try:
            # An empty target instructs NG-RO to undeploy everything previously created for this NS
            target = {
                "ns": {"vld": []},
                "vnf": [],
                "image": [],
                "flavor": [],
                "action_id": nslcmop_id
            }
            desc = await self.RO.deploy(nsr_id, target)
            action_id = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
            self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))

            # wait until done
            delete_timeout = 20 * 60  # 20 minutes
            await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)

            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # delete all nsr
            await self.RO.delete(nsr_id)
        except Exception as e:
            if isinstance(e, NgRoException) and e.http_code == 404:  # not found
                # already gone at RO: treat as a successful deletion
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
            elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))

        # status is persisted whether deletion succeeded or not; the exception (if any) is re-raised after
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
        return
959
960 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
961 n2vc_key_list, stage):
962 """
963 Instantiate at RO
964 :param logging_text: preffix text to use at logging
965 :param nsr_id: nsr identity
966 :param nsd: database content of ns descriptor
967 :param db_nsr: database content of ns record
968 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
969 :param db_vnfrs:
970 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
971 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
972 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
973 :return: None or exception
974 """
975 try:
976 start_deploy = time()
977 ns_params = db_nslcmop.get("operationParams")
978 if ns_params and ns_params.get("timeout_ns_deploy"):
979 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
980 else:
981 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
982
983 # Check for and optionally request placement optimization. Database will be updated if placement activated
984 stage[2] = "Waiting for Placement."
985 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
986 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
987 for vnfr in db_vnfrs.values():
988 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
989 break
990 else:
991 ns_params["vimAccountId"] == vnfr["vim-account-id"]
992
993 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
994 db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
995 except Exception as e:
996 stage[2] = "ERROR deploying at VIM"
997 self.set_vnfr_at_error(db_vnfrs, str(e))
998 self.logger.error("Error deploying at VIM {}".format(e),
999 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
1000 NgRoException)))
1001 raise
1002
1003 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1004 """
1005 Wait for kdu to be up, get ip address
1006 :param logging_text: prefix use for logging
1007 :param nsr_id:
1008 :param vnfr_id:
1009 :param kdu_name:
1010 :return: IP address
1011 """
1012
1013 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1014 nb_tries = 0
1015
1016 while nb_tries < 360:
1017 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1018 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
1019 if not kdur:
1020 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
1021 if kdur.get("status"):
1022 if kdur["status"] in ("READY", "ENABLED"):
1023 return kdur.get("ip-address")
1024 else:
1025 raise LcmException("target KDU={} is in error state".format(kdu_name))
1026
1027 await asyncio.sleep(10, loop=self.loop)
1028 nb_tries += 1
1029 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1030
    async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
        """
        Wait for ip addres at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id: ns record identity
        :param vnfr_id: vnfr record to poll from database
        :param vdu_id: target vdu ("vdu-id-ref"); None means the VNF management VM
        :param vdu_index: count-index of the target vdu (used together with vdu_id)
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        :raises LcmException: on VNF/VM error state, target not found, or retries exhausted
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0          # counts only key-injection retries (old-RO path)
        target_vdu_id = None  # set once the target VM is ACTIVE with an ip-address
        ro_retries = 0        # counts every polling iteration

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # resolve which vdur owns the VNF management ip
                    vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
                else:  # VDU case
                    vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                                 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

                if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                              vdu_index))
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(t for t in vdur["vim_info"])  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # PDUs are considered up by definition; otherwise require ACTIVE from old or new RO status
                if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VM is in error state")

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # key injection is not possible for physical deployment units
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                    if self.ng_ro:
                        # new-generation RO: injection is done as a deploy "action"
                        target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
                                  "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                                  }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException("Unknown response from RO when injecting key")
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                            else:
                                raise ROclient.ROClientException("error injecting key: {}".format(
                                    result.get("description")))
                        break
                except NgRoException as e:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
                except ROclient.ROClientException as e:
                    # old-RO injection may fail transiently while the VM boots: retry up to 20 times
                    if not nb_tries:
                        self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                          format(e, 20*10))
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
            else:
                # no key to inject: the ip address alone is the result
                break

        return ip_address
1148
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
        :param nsr_id: ns record id; "configurationStatus" is re-read from database on each poll
        :param vca_deployed_list: deployed VCA list; the entry at vca_index is the one waiting
        :param vca_index: index of this VCA (skipped when scanning dependencies)
        :raises LcmException: when a dependency goes BROKEN or the wait times out
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        # NOTE: "timeout" counts iterations, not seconds; each iteration sleeps 10s,
        # so the real bound is roughly 300 * 10s = ~50 minutes
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # an NS-level VCA (no member-vnf-index) depends on everything;
                # a VNF-level VCA depends only on entries of the same member-vnf-index
                if not my_vca.get("member-vnf-index") or \
                        (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == 'READY':
                        continue
                    elif internal_status == 'BROKEN':
                        raise LcmException("Configuration aborted because dependent charm/s has failed")
                    else:
                        # some dependency still in progress: abort the scan and sleep
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1

        raise LcmException("Configuration aborted because dependent charm/s timeout")
1182
1183 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1184 return (
1185 deep_get(db_vnfr, ("vca-id",)) or
1186 deep_get(db_nsr, ("instantiate_params", "vcaId"))
1187 )
1188
    async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                               config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
                               ee_config_descriptor):
        """
        Deploy one VCA (execution environment) for a NS, VNF, VDU or KDU element and run its Day-1
        configuration: create/register the execution environment, install the configuration software,
        wait for the target VM/KDU to be reachable (optionally injecting an ssh key) and execute the
        initial config primitives. Progress is written to the "nsrs"/"nslcmops" records throughout.
        :param logging_text: prefix for all log messages
        :param vca_index: position of this VCA inside db_nsr "_admin.deployed.VCA"
        :param nsi_id: network slice instance id (may be None); used to build the juju model namespace
        :param db_nsr: database content of the ns record
        :param db_vnfr: database content of the vnf record; None for a NS-level VCA
        :param vdu_id: target vdu id for a VDU-level VCA, else None
        :param kdu_name: target kdu name for a KDU-level VCA, else None
        :param vdu_index: count-index of the vdu (0 used when None)
        :param config_descriptor: descriptor section holding config-access/initial-config-primitive
        :param deploy_params: parameters for primitive substitution; "rw_mgmt_ip" is added here
        :param base_folder: vnfd/nsd "_admin.storage" dict ("folder", "pkg-dir") to locate artifacts
        :param nslcmop_id: current operation id (for status reporting)
        :param stage: 3-item status list; stage[0] is overwritten with the Day-1 stage text
        :param vca_type: one of native_charm/lxc_proxy_charm/k8s_proxy_charm/helm/helm-v3
        :param vca_name: charm or helm-chart name inside the package
        :param ee_config_descriptor: execution-environment descriptor (its "id" selects primitives)
        :raises LcmException: wrapping any failure, prefixed with the step being executed
        """
        nsr_id = db_nsr["_id"]
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        db_dict = {
            'collection': 'nsrs',
            'filter': {'_id': nsr_id},
            'path': db_update_entry
        }
        # "step" tracks the current action so the except clause can report where the failure happened
        step = ""
        try:

            element_type = 'NS'
            element_under_configuration = nsr_id

            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id

            namespace = "{nsi}.{ns}".format(
                nsi=nsi_id if nsi_id else "",
                ns=nsr_id)

            # refine element type/namespace from NS down to VNF, VDU or KDU
            if vnfr_id:
                element_type = 'VNF'
                element_under_configuration = vnfr_id
                namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                    element_type = 'VDU'
                    element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
                    element_type = 'KDU'
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name

            # Get artifact path
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
                vca_name
            )

            self.logger.debug("Artifact path > {}".format(artifact_path))

            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

            self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))

            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
            initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
                                                                                        vca_deployed, ee_descriptor_id)

            self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
            # n2vc_redesign STEP 3.1
            # find old ee_id if exists
            ee_id = vca_deployed.get("ee_id")

            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # create or register execution environment in VCA
            if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status='CREATING',
                    element_under_configuration=element_under_configuration,
                    element_type=element_type
                )

                step = "create execution environment"
                self.logger.debug(logging_text + step)

                ee_id = None
                credentials = None
                if vca_type == "k8s_proxy_charm":
                    ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                        charm_name=artifact_path[artifact_path.rfind("/") + 1:],
                        namespace=namespace,
                        artifact_path=artifact_path,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )
                elif vca_type == "helm" or vca_type == "helm-v3":
                    ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        config=osm_config,
                        artifact_path=artifact_path,
                        vca_type=vca_type
                    )
                else:
                    ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )

            elif vca_type == "native_charm":
                # native charm runs inside the workload VM itself: wait for the VM first
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=None, pub_key=None)
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                # merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
                                       "'config-access.ssh-access.default-user'")
                credentials["username"] = username
                # n2vc_redesign STEP 3.2

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status='REGISTERING',
                    element_under_configuration=element_under_configuration,
                    element_type=element_type
                )

                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

            # for compatibility with MON/POL modules, the need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            ee_id_parts = ee_id.split('.')
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name

            # n2vc_redesign STEP 3.3
            step = "Install configuration Software"

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='INSTALLING SW',
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                other_update=db_nsr_update
            )

            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                # the juju "config" primitive (if present) is applied at install time
                config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive,
                        {},
                        deploy_params
                    )
            num_units = 1
            if vca_type == "lxc_proxy_charm":
                # number of charm units follows the "config-units" of the configured element
                if element_type == "NS":
                    num_units = db_nsr.get("config-units") or 1
                elif element_type == "VNF":
                    num_units = db_vnfr.get("config-units") or 1
                elif element_type == "VDU":
                    for v in db_vnfr["vdur"]:
                        if vdu_id == v["vdu-id-ref"]:
                            num_units = v.get("config-units") or 1
                            break
            if vca_type != "k8s_proxy_charm":
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=num_units,
                    vca_id=vca_id,
                )

            # write in db flag of configuration_sw already installed
            self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

            # add relations for this VCA (wait for other peers related with this VCA)
            await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
                                          vca_index=vca_index, vca_id=vca_id, vca_type=vca_type)

            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                    step = "Install configuration Software, getting public ssh key"
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                        ee_id=ee_id,
                        db_dict=db_dict,
                        vca_id=vca_id
                    )

                    step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)

                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
                    else:
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
                                                                         vdu_index, user=user, pub_key=pub_key)
                else:
                    rw_mgmt_ip = None  # This is for a NS configuration

                self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

            # n2vc_redesign STEP 6 Execute initial config primitive
            step = 'execute initial config primitive'

            # wait for dependent primitives execution (NS -> VNF -> VDU)
            if initial_config_primitive_list:
                await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

            # stage, in function of element type: vdu, kdu, vnf or ns
            my_vca = vca_deployed_list[vca_index]
            if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                # VDU or KDU
                stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
            elif my_vca.get("member-vnf-index"):
                # VNF
                stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
            else:
                # NS
                stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='EXECUTING PRIMITIVE'
            )

            self._write_op_status(
                op_id=nslcmop_id,
                stage=stage
            )

            check_if_terminated_needed = True
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

                step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
                self.logger.debug(logging_text + step)
                await self.vca_map[vca_type].exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
                # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                if check_if_terminated_needed:
                    if config_descriptor.get('terminate-config-primitive'):
                        self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                    check_if_terminated_needed = False

                # TODO register in database that primitive is done

            # STEP 7 Configure metrics
            if vca_type == "helm" or vca_type == "helm-v3":
                prometheus_jobs = await self.add_prometheus_metrics(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                )
                if prometheus_jobs:
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})

            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='READY'
            )

        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
                self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='BROKEN'
            )
            raise LcmException("{} {}".format(step, e)) from e
1526
1527 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1528 error_description: str = None, error_detail: str = None, other_update: dict = None):
1529 """
1530 Update db_nsr fields.
1531 :param nsr_id:
1532 :param ns_state:
1533 :param current_operation:
1534 :param current_operation_id:
1535 :param error_description:
1536 :param error_detail:
1537 :param other_update: Other required changes at database if provided, will be cleared
1538 :return:
1539 """
1540 try:
1541 db_dict = other_update or {}
1542 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1543 db_dict["_admin.current-operation"] = current_operation_id
1544 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1545 db_dict["currentOperation"] = current_operation
1546 db_dict["currentOperationID"] = current_operation_id
1547 db_dict["errorDescription"] = error_description
1548 db_dict["errorDetail"] = error_detail
1549
1550 if ns_state:
1551 db_dict["nsState"] = ns_state
1552 self.update_db_2("nsrs", nsr_id, db_dict)
1553 except DbException as e:
1554 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1555
1556 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1557 operation_state: str = None, other_update: dict = None):
1558 try:
1559 db_dict = other_update or {}
1560 db_dict['queuePosition'] = queuePosition
1561 if isinstance(stage, list):
1562 db_dict['stage'] = stage[0]
1563 db_dict['detailed-status'] = " ".join(stage)
1564 elif stage is not None:
1565 db_dict['stage'] = str(stage)
1566
1567 if error_message is not None:
1568 db_dict['errorMessage'] = error_message
1569 if operation_state is not None:
1570 db_dict['operationState'] = operation_state
1571 db_dict["statusEnteredTime"] = time()
1572 self.update_db_2("nslcmops", op_id, db_dict)
1573 except DbException as e:
1574 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1575
1576 def _write_all_config_status(self, db_nsr: dict, status: str):
1577 try:
1578 nsr_id = db_nsr["_id"]
1579 # configurationStatus
1580 config_status = db_nsr.get('configurationStatus')
1581 if config_status:
1582 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1583 enumerate(config_status) if v}
1584 # update status
1585 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1586
1587 except DbException as e:
1588 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1589
1590 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1591 element_under_configuration: str = None, element_type: str = None,
1592 other_update: dict = None):
1593
1594 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1595 # .format(vca_index, status))
1596
1597 try:
1598 db_path = 'configurationStatus.{}.'.format(vca_index)
1599 db_dict = other_update or {}
1600 if status:
1601 db_dict[db_path + 'status'] = status
1602 if element_under_configuration:
1603 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1604 if element_type:
1605 db_dict[db_path + 'elementType'] = element_type
1606 self.update_db_2("nsrs", nsr_id, db_dict)
1607 except DbException as e:
1608 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1609 .format(status, nsr_id, vca_index, e))
1610
1611 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1612 """
1613 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1614 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1615 Database is used because the result can be obtained from a different LCM worker in case of HA.
1616 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1617 :param db_nslcmop: database content of nslcmop
1618 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1619 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1620 computed 'vim-account-id'
1621 """
1622 modified = False
1623 nslcmop_id = db_nslcmop['_id']
1624 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1625 if placement_engine == "PLA":
1626 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1627 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1628 db_poll_interval = 5
1629 wait = db_poll_interval * 10
1630 pla_result = None
1631 while not pla_result and wait >= 0:
1632 await asyncio.sleep(db_poll_interval)
1633 wait -= db_poll_interval
1634 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1635 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1636
1637 if not pla_result:
1638 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1639
1640 for pla_vnf in pla_result['vnf']:
1641 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1642 if not pla_vnf.get('vimAccountId') or not vnfr:
1643 continue
1644 modified = True
1645 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1646 # Modifies db_vnfrs
1647 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1648 return modified
1649
1650 def update_nsrs_with_pla_result(self, params):
1651 try:
1652 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1653 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1654 except Exception as e:
1655 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1656
    async def instantiate(self, nsr_id: str, nslcmop_id: str):
        """Deploy a NS instance: KDUs, VIM resources (via RO) and execution environments (via N2VC).

        All progress and the final outcome are written to the "nsrs" and "nslcmops"
        database records; exceptions are captured and reflected in the operation
        status rather than propagated.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM worker (HA deployment) owns this operation
            self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update
            )
            self._write_op_status(
                op_id=nslcmop_id,
                stage=stage,
                queuePosition=0
            )

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            ns_params = db_nslcmop.get("operationParams")
            # operation-level timeout overrides the configured/default one
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            # sync from common storage only the NSD artifacts used by this instance
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"]   # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                # sync from common storage only this particular VNFD's artifacts
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds vnfd dicts, so 'vnfd_id not in db_vnfds'
                # never matches a plain id string and duplicates may be appended — confirm
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
            self._write_op_status(
                op_id=nslcmop_id,
                stage=stage
            )

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            # VIM deployment runs concurrently as a registered task; its outcome is
            # collected in the finally block together with the other tasks
            stage[1] = "Deploying NS at VIM."
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))

                # VNF-level charm, if the descriptor defines one
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
                    vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug("Descriptor config > {}".format(descriptor_config))
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one execution environment per VDU instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage
                            )
                # KDU-level charms, if any
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage
                )

            # rest of staff will be done at finally

        except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
            self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    # timeout_ns_deploy is always bound here: tasks are only
                    # registered after it was assigned inside the try block
                    error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
                                                             stage, nslcmop_id, nsr_id=nsr_id)
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            # a still-pending/cancelled/failed task downgrades the corresponding status
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
                error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])

                db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            # notify the final state through kafka (best effort)
            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                                   "operationState": nslcmop_operation_state},
                                            loop=self.loop)
                except Exception as e:
                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2069
    async def _add_vca_relations(
        self,
        logging_text,
        nsr_id,
        vca_index: int,
        timeout: int = 3600,
        vca_type: str = None,
        vca_id: str = None,
    ) -> bool:
        """Add the juju relations involving the VCA at position vca_index of ns nsr_id.

        Collects the NS-level and VNF-level relations that mention this VCA, then
        polls the database until both relation peers have their software installed,
        adding each relation as soon as both ends are ready. Relations whose peer
        is reported BROKEN are dropped.

        :param logging_text: prefix for log messages
        :param nsr_id: ns instance id
        :param vca_index: index of this VCA inside _admin.deployed.VCA
        :param timeout: maximum seconds to wait for all peers
        :param vca_type: VCA flavour key into self.vca_map (defaults to lxc_proxy_charm)
        :param vca_id: VCA account id forwarded to add_relation
        :return: True when all applicable relations were added (or none exist),
            False on timeout or on any unexpected error
        """

        # steps:
        # 1. find all relations for this VCA
        # 2. wait for other peers related
        # 3. add relations

        try:
            vca_type = vca_type or "lxc_proxy_charm"

            # STEP 1: find all relations for this VCA

            # read nsr record
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # this VCA data
            my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

            # read all ns-configuration relations
            ns_relations = list()
            db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
            if db_ns_relations:
                for r in db_ns_relations:
                    # check if this VCA is in the relation
                    if my_vca.get('member-vnf-index') in\
                            (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                        ns_relations.append(r)

            # read all vnf-configuration relations
            vnf_relations = list()
            db_vnfd_list = db_nsr.get('vnfd-id')
            if db_vnfd_list:
                for vnfd in db_vnfd_list:
                    db_vnf_relations = None
                    db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
                    db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
                    if db_vnf_configuration:
                        db_vnf_relations = db_vnf_configuration.get("relation", [])
                    if db_vnf_relations:
                        for r in db_vnf_relations:
                            # check if this VCA is in the relation
                            if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                                vnf_relations.append(r)

            # if no relations, terminate
            if not ns_relations and not vnf_relations:
                self.logger.debug(logging_text + ' No relations')
                return True

            self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

            # add all relations
            start = time()
            while True:
                # check timeout
                now = time()
                if now - start >= timeout:
                    self.logger.error(logging_text + ' : timeout adding relations')
                    return False

                # reload nsr from database (we need to update record: _admin.deloyed.VCA)
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

                # for each defined NS relation, find the VCA's related
                # (iterate over a copy because entries are removed while iterating)
                for r in ns_relations.copy():
                    from_vca_ee_id = None
                    to_vca_ee_id = None
                    from_vca_endpoint = None
                    to_vca_endpoint = None
                    vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
                    for vca in vca_list:
                        if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
                                and vca.get('config_sw_installed'):
                            from_vca_ee_id = vca.get('ee_id')
                            from_vca_endpoint = r.get('entities')[0].get('endpoint')
                        if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
                                and vca.get('config_sw_installed'):
                            to_vca_ee_id = vca.get('ee_id')
                            to_vca_endpoint = r.get('entities')[1].get('endpoint')
                    if from_vca_ee_id and to_vca_ee_id:
                        # add relation
                        await self.vca_map[vca_type].add_relation(
                            ee_id_1=from_vca_ee_id,
                            ee_id_2=to_vca_ee_id,
                            endpoint_1=from_vca_endpoint,
                            endpoint_2=to_vca_endpoint,
                            vca_id=vca_id,
                        )
                        # remove entry from relations list
                        ns_relations.remove(r)
                    else:
                        # check failed peers
                        try:
                            vca_status_list = db_nsr.get('configurationStatus')
                            if vca_status_list:
                                for i in range(len(vca_list)):
                                    vca = vca_list[i]
                                    vca_status = vca_status_list[i]
                                    if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
                                        if vca_status.get('status') == 'BROKEN':
                                            # peer broken: remove relation from list
                                            ns_relations.remove(r)
                                    if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
                                        if vca_status.get('status') == 'BROKEN':
                                            # peer broken: remove relation from list
                                            ns_relations.remove(r)
                        except Exception:
                            # ignore
                            pass

                # for each defined VNF relation, find the VCA's related
                for r in vnf_relations.copy():
                    from_vca_ee_id = None
                    to_vca_ee_id = None
                    from_vca_endpoint = None
                    to_vca_endpoint = None
                    vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
                    for vca in vca_list:
                        # VNF-level charms match on vnfd_id, VDU-level ones on vdu_id
                        key_to_check = "vdu_id"
                        if vca.get("vdu_id") is None:
                            key_to_check = "vnfd_id"
                        if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
                            from_vca_ee_id = vca.get('ee_id')
                            from_vca_endpoint = r.get('entities')[0].get('endpoint')
                        if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
                            to_vca_ee_id = vca.get('ee_id')
                            to_vca_endpoint = r.get('entities')[1].get('endpoint')
                    if from_vca_ee_id and to_vca_ee_id:
                        # add relation
                        await self.vca_map[vca_type].add_relation(
                            ee_id_1=from_vca_ee_id,
                            ee_id_2=to_vca_ee_id,
                            endpoint_1=from_vca_endpoint,
                            endpoint_2=to_vca_endpoint,
                            vca_id=vca_id,
                        )
                        # remove entry from relations list
                        vnf_relations.remove(r)
                    else:
                        # check failed peers
                        try:
                            vca_status_list = db_nsr.get('configurationStatus')
                            if vca_status_list:
                                for i in range(len(vca_list)):
                                    vca = vca_list[i]
                                    vca_status = vca_status_list[i]
                                    if vca.get('vdu_id') == r.get('entities')[0].get('id'):
                                        if vca_status.get('status') == 'BROKEN':
                                            # peer broken: remove relation from list
                                            vnf_relations.remove(r)
                                    if vca.get('vdu_id') == r.get('entities')[1].get('id'):
                                        if vca_status.get('status') == 'BROKEN':
                                            # peer broken: remove relation from list
                                            vnf_relations.remove(r)
                        except Exception:
                            # ignore
                            pass

                # wait for next try
                # NOTE: the sleep happens even on the pass that empties both lists,
                # so completion is detected one poll interval late
                await asyncio.sleep(5.0)

                if not ns_relations and not vnf_relations:
                    self.logger.debug('Relations added')
                    break

            return True

        except Exception as e:
            self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
            return False
2249
    async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                           vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600,
                           vca_id: str = None):
        """Install one KDU on its k8s cluster and record the outcome in the database.

        Installs the helm chart / juju bundle, stores the generated kdu-instance
        name, derives the management IP from the deployed k8s services and runs the
        KDU's initial-config-primitives (when they are not delegated to a juju
        execution environment). On any failure the error is written to the nsr and
        vnfr records and the original exception is re-raised.

        :param nsr_id: ns instance id
        :param nsr_db_path: dotted path inside the nsr record for this KDU's status
        :param vnfr_data: vnfr record content of the owning VNF
        :param kdu_index: index of this KDU inside the vnfr 'kdur' list
        :param kdud: KDU descriptor from the VNFD
        :param vnfd: VNFD content
        :param k8s_instance_info: dict with k8scluster-type/-uuid, kdu-name, kdu-model, namespace
        :param k8params: instantiation parameters for the chart/bundle
        :param timeout: seconds allowed for install and for each primitive
        :param vca_id: VCA account id forwarded to the k8s connector
        :return: the generated kdu_instance name
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {"collection": "nsrs",
                               "filter": {"_id": nsr_id},
                               "path": nsr_db_path}

            kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name(
                db_dict=db_dict_install,
                kdu_model=k8s_instance_info["kdu-model"],
                kdu_name=k8s_instance_info["kdu-name"],
            )
            # persist the instance name before installing so it can be cleaned up on failure
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"])

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
                for mgmt_service in mgmt_services:
                    for service in services:
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get("external-connection-point-ref")
                            if service_external_cp:
                                if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get("external-connection-point-ref", "") == service_external_cp
                                ):
                                    vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
                            break
                    else:
                        # for-else: no deployed service name matched this mgmt service
                        self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # run initial-config-primitives directly on the cluster only when no
            # juju execution environment is declared to handle them
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if kdu_config and kdu_config.get("initial-config-primitive") and \
                    get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
                initial_config_primitive_list = kdu_config.get("initial-config-primitive")
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_, db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
                self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
2358
2359 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2360 # Launch kdus if present in the descriptor
2361
2362 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2363
2364 async def _get_cluster_id(cluster_id, cluster_type):
2365 nonlocal k8scluster_id_2_uuic
2366 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2367 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2368
2369 # check if K8scluster is creating and wait look if previous tasks in process
2370 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2371 if task_dependency:
2372 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2373 self.logger.debug(logging_text + text)
2374 await asyncio.wait(task_dependency, timeout=3600)
2375
2376 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2377 if not db_k8scluster:
2378 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2379
2380 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2381 if not k8s_id:
2382 if cluster_type == "helm-chart-v3":
2383 try:
2384 # backward compatibility for existing clusters that have not been initialized for helm v3
2385 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2386 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2387 reuse_cluster_uuid=cluster_id)
2388 db_k8scluster_update = {}
2389 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2390 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2391 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2392 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2393 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2394 except Exception as e:
2395 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2396 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2397 cluster_type))
2398 else:
2399 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2400 format(cluster_id, cluster_type))
2401 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2402 return k8s_id
2403
2404 logging_text += "Deploy kdus: "
2405 step = ""
2406 try:
2407 db_nsr_update = {"_admin.deployed.K8s": []}
2408 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2409
2410 index = 0
2411 updated_cluster_list = []
2412 updated_v3_cluster_list = []
2413
2414 for vnfr_data in db_vnfrs.values():
2415 vca_id = self.get_vca_id(vnfr_data, {})
2416 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2417 # Step 0: Prepare and set parameters
2418 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
2419 vnfd_id = vnfr_data.get('vnfd-id')
2420 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2421 kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
2422 namespace = kdur.get("k8s-namespace")
2423 if kdur.get("helm-chart"):
2424 kdumodel = kdur["helm-chart"]
2425 # Default version: helm3, if helm-version is v2 assign v2
2426 k8sclustertype = "helm-chart-v3"
2427 self.logger.debug("kdur: {}".format(kdur))
2428 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2429 k8sclustertype = "helm-chart"
2430 elif kdur.get("juju-bundle"):
2431 kdumodel = kdur["juju-bundle"]
2432 k8sclustertype = "juju-bundle"
2433 else:
2434 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2435 "juju-bundle. Maybe an old NBI version is running".
2436 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2437 # check if kdumodel is a file and exists
2438 try:
2439 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2440 storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
2441 if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
2442 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2443 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2444 kdumodel)
2445 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2446 kdumodel = self.fs.path + filename
2447 except (asyncio.TimeoutError, asyncio.CancelledError):
2448 raise
2449 except Exception: # it is not a file
2450 pass
2451
2452 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2453 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2454 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2455
2456 # Synchronize repos
2457 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2458 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2459 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2460 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2461 if del_repo_list or added_repo_dict:
2462 if k8sclustertype == "helm-chart":
2463 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2464 updated = {'_admin.helm_charts_added.' +
2465 item: name for item, name in added_repo_dict.items()}
2466 updated_cluster_list.append(cluster_uuid)
2467 elif k8sclustertype == "helm-chart-v3":
2468 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2469 updated = {'_admin.helm_charts_v3_added.' +
2470 item: name for item, name in added_repo_dict.items()}
2471 updated_v3_cluster_list.append(cluster_uuid)
2472 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2473 "'{}' to_delete: {}, to_add: {}".
2474 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2475 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2476
2477 # Instantiate kdu
2478 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2479 kdur["kdu-name"], k8s_cluster_id)
2480 k8s_instance_info = {"kdu-instance": None,
2481 "k8scluster-uuid": cluster_uuid,
2482 "k8scluster-type": k8sclustertype,
2483 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2484 "kdu-name": kdur["kdu-name"],
2485 "kdu-model": kdumodel,
2486 "namespace": namespace}
2487 db_path = "_admin.deployed.K8s.{}".format(index)
2488 db_nsr_update[db_path] = k8s_instance_info
2489 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2490 vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
2491 task = asyncio.ensure_future(
2492 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
2493 k8s_instance_info, k8params=desc_params, timeout=600, vca_id=vca_id))
2494 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2495 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2496
2497 index += 1
2498
2499 except (LcmException, asyncio.CancelledError):
2500 raise
2501 except Exception as e:
2502 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2503 if isinstance(e, (N2VCException, DbException)):
2504 self.logger.error(logging_text + msg)
2505 else:
2506 self.logger.critical(logging_text + msg, exc_info=True)
2507 raise LcmException(msg)
2508 finally:
2509 if db_nsr_update:
2510 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2511
    def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                     kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                     base_folder, task_instantiation_info, stage):
        """
        Launch one instantiate_N2VC asyncio task per execution environment declared in
        descriptor_config, registering every task in the LCM task list and in
        task_instantiation_info.

        For each execution environment the charm entry is looked up at database
        <nsrs>._admin.deployed.VCA; when no matching entry exists a new one is created at the
        next index and written both to the database and to the in-memory db_nsr copy
        (filling db_nsr._admin.deployed.VCA.<index>).

        :param logging_text: prefix for log messages
        :param db_nsr: nsr database document; its _admin.deployed.VCA list is read and extended
        :param db_vnfr: vnfr database document, passed through to instantiate_N2VC
        :param nslcmop_id: current operation id, used when registering the created tasks
        :param nsr_id: ns instance id
        :param nsi_id: netslice instance id (if any), passed through to instantiate_N2VC
        :param vnfd_id: id of the descriptor owning this configuration; stored in the VCA entry
        :param vdu_id: vdu id when the charm targets a VDU, else None
        :param kdu_name: kdu name when the charm targets a KDU, else None
        :param member_vnf_index: member-vnf-index of the target VNF, or None/empty for ns level
        :param vdu_index: vdu count index of the target VDU instance
        :param vdu_name: vdu name, stored in the VCA entry
        :param deploy_params: parameters passed through to instantiate_N2VC
        :param descriptor_config: configuration descriptor; only its
            "execution-environment-list" is inspected here
        :param base_folder: descriptor storage folder, passed through to instantiate_N2VC
        :param task_instantiation_info: dict where each created task is annotated with a
            human-readable description
        :param stage: stage information, passed through to instantiate_N2VC
        """

        self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
                                                                                            ee_item.get("helm-chart")))
            ee_descriptor_id = ee_item.get("id")
            if ee_item.get("juju"):
                # juju charm: proxy charm by default; native when there is no charm or proxy is
                # explicitly disabled; k8s proxy when the declared cloud is "k8s"
                vca_name = ee_item['juju'].get('charm')
                vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
                if ee_item['juju'].get('cloud') == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item['juju'].get('proxy') is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                # helm execution environment: helm v3 unless the descriptor pins helm-version v2
                vca_name = ee_item['helm-chart']
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(logging_text + "skipping non juju neither charm configuration")
                continue

            vca_index = -1
            # look for an already existing VCA entry for this same target and execution
            # environment (e.g. a previous retried deployment); reuse its index if found
            for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
                if not vca_deployed:
                    continue
                if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                        vca_deployed.get("vdu_id") == vdu_id and \
                        vca_deployed.get("kdu_name") == kdu_name and \
                        vca_deployed.get("vdu_count_index", 0) == vdu_index and \
                        vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
                    break
            else:
                # for-else: not found, create one at index vca_index + 1.
                target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict()
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                # keep the in-memory copy aligned with what was just written to the database
                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
            self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
            self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
            task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or "")
2618
2619 @staticmethod
2620 def _create_nslcmop(nsr_id, operation, params):
2621 """
2622 Creates a ns-lcm-opp content to be stored at database.
2623 :param nsr_id: internal id of the instance
2624 :param operation: instantiate, terminate, scale, action, ...
2625 :param params: user parameters for the operation
2626 :return: dictionary following SOL005 format
2627 """
2628 # Raise exception if invalid arguments
2629 if not (nsr_id and operation and params):
2630 raise LcmException(
2631 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2632 now = time()
2633 _id = str(uuid4())
2634 nslcmop = {
2635 "id": _id,
2636 "_id": _id,
2637 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2638 "operationState": "PROCESSING",
2639 "statusEnteredTime": now,
2640 "nsInstanceId": nsr_id,
2641 "lcmOperationType": operation,
2642 "startTime": now,
2643 "isAutomaticInvocation": False,
2644 "operationParams": params,
2645 "isCancelPending": False,
2646 "links": {
2647 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2648 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2649 }
2650 }
2651 return nslcmop
2652
2653 def _format_additional_params(self, params):
2654 params = params or {}
2655 for key, value in params.items():
2656 if str(value).startswith("!!yaml "):
2657 params[key] = yaml.safe_load(value[7:])
2658 return params
2659
2660 def _get_terminate_primitive_params(self, seq, vnf_index):
2661 primitive = seq.get('name')
2662 primitive_params = {}
2663 params = {
2664 "member_vnf_index": vnf_index,
2665 "primitive": primitive,
2666 "primitive_params": primitive_params,
2667 }
2668 desc_params = {}
2669 return self._map_primitive_params(seq, params, desc_params)
2670
2671 # sub-operations
2672
2673 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2674 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2675 if op.get('operationState') == 'COMPLETED':
2676 # b. Skip sub-operation
2677 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2678 return self.SUBOPERATION_STATUS_SKIP
2679 else:
2680 # c. retry executing sub-operation
2681 # The sub-operation exists, and operationState != 'COMPLETED'
2682 # Update operationState = 'PROCESSING' to indicate a retry.
2683 operationState = 'PROCESSING'
2684 detailed_status = 'In progress'
2685 self._update_suboperation_status(
2686 db_nslcmop, op_index, operationState, detailed_status)
2687 # Return the sub-operation index
2688 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2689 # with arguments extracted from the sub-operation
2690 return op_index
2691
2692 # Find a sub-operation where all keys in a matching dictionary must match
2693 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2694 def _find_suboperation(self, db_nslcmop, match):
2695 if db_nslcmop and match:
2696 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2697 for i, op in enumerate(op_list):
2698 if all(op.get(k) == match[k] for k in match):
2699 return i
2700 return self.SUBOPERATION_STATUS_NOT_FOUND
2701
2702 # Update status for a sub-operation given its index
2703 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2704 # Update DB for HA tasks
2705 q_filter = {'_id': db_nslcmop['_id']}
2706 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2707 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2708 self.db.set_one("nslcmops",
2709 q_filter=q_filter,
2710 update_dict=update_dict,
2711 fail_on_empty=False)
2712
2713 # Add sub-operation, return the index of the added sub-operation
2714 # Optionally, set operationState, detailed-status, and operationType
2715 # Status and type are currently set for 'scale' sub-operations:
2716 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2717 # 'detailed-status' : status message
2718 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2719 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2720 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2721 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2722 RO_nsr_id=None, RO_scaling_info=None):
2723 if not db_nslcmop:
2724 return self.SUBOPERATION_STATUS_NOT_FOUND
2725 # Get the "_admin.operations" list, if it exists
2726 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2727 op_list = db_nslcmop_admin.get('operations')
2728 # Create or append to the "_admin.operations" list
2729 new_op = {'member_vnf_index': vnf_index,
2730 'vdu_id': vdu_id,
2731 'vdu_count_index': vdu_count_index,
2732 'primitive': primitive,
2733 'primitive_params': mapped_primitive_params}
2734 if operationState:
2735 new_op['operationState'] = operationState
2736 if detailed_status:
2737 new_op['detailed-status'] = detailed_status
2738 if operationType:
2739 new_op['lcmOperationType'] = operationType
2740 if RO_nsr_id:
2741 new_op['RO_nsr_id'] = RO_nsr_id
2742 if RO_scaling_info:
2743 new_op['RO_scaling_info'] = RO_scaling_info
2744 if not op_list:
2745 # No existing operations, create key 'operations' with current operation as first list element
2746 db_nslcmop_admin.update({'operations': [new_op]})
2747 op_list = db_nslcmop_admin.get('operations')
2748 else:
2749 # Existing operations, append operation to list
2750 op_list.append(new_op)
2751
2752 db_nslcmop_update = {'_admin.operations': op_list}
2753 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2754 op_index = len(op_list) - 1
2755 return op_index
2756
2757 # Helper methods for scale() sub-operations
2758
2759 # pre-scale/post-scale:
2760 # Check for 3 different cases:
2761 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2762 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2763 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2764 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2765 operationType, RO_nsr_id=None, RO_scaling_info=None):
2766 # Find this sub-operation
2767 if RO_nsr_id and RO_scaling_info:
2768 operationType = 'SCALE-RO'
2769 match = {
2770 'member_vnf_index': vnf_index,
2771 'RO_nsr_id': RO_nsr_id,
2772 'RO_scaling_info': RO_scaling_info,
2773 }
2774 else:
2775 match = {
2776 'member_vnf_index': vnf_index,
2777 'primitive': vnf_config_primitive,
2778 'primitive_params': primitive_params,
2779 'lcmOperationType': operationType
2780 }
2781 op_index = self._find_suboperation(db_nslcmop, match)
2782 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2783 # a. New sub-operation
2784 # The sub-operation does not exist, add it.
2785 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2786 # The following parameters are set to None for all kind of scaling:
2787 vdu_id = None
2788 vdu_count_index = None
2789 vdu_name = None
2790 if RO_nsr_id and RO_scaling_info:
2791 vnf_config_primitive = None
2792 primitive_params = None
2793 else:
2794 RO_nsr_id = None
2795 RO_scaling_info = None
2796 # Initial status for sub-operation
2797 operationState = 'PROCESSING'
2798 detailed_status = 'In progress'
2799 # Add sub-operation for pre/post-scaling (zero or more operations)
2800 self._add_suboperation(db_nslcmop,
2801 vnf_index,
2802 vdu_id,
2803 vdu_count_index,
2804 vdu_name,
2805 vnf_config_primitive,
2806 primitive_params,
2807 operationState,
2808 detailed_status,
2809 operationType,
2810 RO_nsr_id,
2811 RO_scaling_info)
2812 return self.SUBOPERATION_STATUS_NEW
2813 else:
2814 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2815 # or op_index (operationState != 'COMPLETED')
2816 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2817
2818 # Function to return execution_environment id
2819
2820 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2821 # TODO vdu_index_count
2822 for vca in vca_deployed_list:
2823 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2824 return vca["ee_id"]
2825
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True).

        :param logging_text: prefix for log messages
        :param db_nslcmop: nslcmop database document; terminate sub-operations are appended to it
        :param vca_deployed: dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to not destroy, because all of them will be destroyed at once later
        :param exec_primitives: False to not execute terminate primitives, because the config is not
            completed or has not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: optional VCA id to address a concrete VCA
        :return: None or exception
        """

        self.logger.debug(
            logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            # only run the primitives when this VCA was flagged as needing termination
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name"))
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get('name')
                    mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                    # Add sub-operation
                    self._add_suboperation(db_nslcmop,
                                           vnf_index,
                                           vdu_id,
                                           vdu_count_index,
                                           vdu_name,
                                           primitive,
                                           mapped_primitive_params)
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"], primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                    if result not in result_ok:
                        raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                           "error {}".format(seq.get("name"), vnf_index, result_detail))
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
                self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

        # remove the prometheus jobs associated to this VCA, if any were registered
        if vca_deployed.get("prometheus_jobs") and self.prometheus:
            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_id=vca_id,
            )
2914
2915 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
2916 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2917 namespace = "." + db_nsr["_id"]
2918 try:
2919 await self.n2vc.delete_namespace(
2920 namespace=namespace,
2921 total_timeout=self.timeout_charm_delete,
2922 vca_id=vca_id,
2923 )
2924 except N2VCNotFound: # already deleted. Skip
2925 pass
2926 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2927
    async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
        """
        Terminates a deployment from RO: first the ns at the VIM, then the on-boarded nsd,
        then each on-boarded vnfd. Each phase runs only if the previous one did not fail.

        :param logging_text: prefix for log messages
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id: ns instance id
        :param nslcmop_id: current operation id, used to write operation status
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
            this method will update only the index 2, but it will write on database the
            concatenated content of the list
        :return: None
        :raises LcmException: with the accumulated failure details when any deletion fails
        """
        db_nsr_update = {}
        failed_detail = []
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                # request ns deletion from the VIM through RO
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                   ro_delete_action))
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                # poll RO every 5 seconds until the delete action finishes or the timeout expires
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action)

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        # deletion still in progress at the VIM
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # delete action finished successfully
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write status to database only when it changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                # a missing ns at RO counts as already deleted
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

        # Delete nsd
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    # a missing nsd at RO counts as already deleted
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete the vnfds that were on-boarded at RO for this ns
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id)
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                        # a missing vnfd at RO counts as already deleted
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                    elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                        failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
3070
3071 async def terminate(self, nsr_id, nslcmop_id):
3072 # Try to lock HA task here
3073 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3074 if not task_is_locked_by_me:
3075 return
3076
3077 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3078 self.logger.debug(logging_text + "Enter")
3079 timeout_ns_terminate = self.timeout_ns_terminate
3080 db_nsr = None
3081 db_nslcmop = None
3082 operation_params = None
3083 exc = None
3084 error_list = [] # annotates all failed error messages
3085 db_nslcmop_update = {}
3086 autoremove = False # autoremove after terminated
3087 tasks_dict_info = {}
3088 db_nsr_update = {}
3089 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3090 # ^ contains [stage, step, VIM-status]
3091 try:
3092 # wait for any previous tasks in process
3093 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3094
3095 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3096 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3097 operation_params = db_nslcmop.get("operationParams") or {}
3098 if operation_params.get("timeout_ns_terminate"):
3099 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3100 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3101 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3102
3103 db_nsr_update["operational-status"] = "terminating"
3104 db_nsr_update["config-status"] = "terminating"
3105 self._write_ns_status(
3106 nsr_id=nsr_id,
3107 ns_state="TERMINATING",
3108 current_operation="TERMINATING",
3109 current_operation_id=nslcmop_id,
3110 other_update=db_nsr_update
3111 )
3112 self._write_op_status(
3113 op_id=nslcmop_id,
3114 queuePosition=0,
3115 stage=stage
3116 )
3117 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3118 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3119 return
3120
3121 stage[1] = "Getting vnf descriptors from db."
3122 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3123 db_vnfrs_dict = {db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list}
3124 db_vnfds_from_id = {}
3125 db_vnfds_from_member_index = {}
3126 # Loop over VNFRs
3127 for vnfr in db_vnfrs_list:
3128 vnfd_id = vnfr["vnfd-id"]
3129 if vnfd_id not in db_vnfds_from_id:
3130 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3131 db_vnfds_from_id[vnfd_id] = vnfd
3132 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3133
3134 # Destroy individual execution environments when there are terminating primitives.
3135 # Rest of EE will be deleted at once
3136 # TODO - check before calling _destroy_N2VC
3137 # if not operation_params.get("skip_terminate_primitives"):#
3138 # or not vca.get("needed_terminate"):
3139 stage[0] = "Stage 2/3 execute terminating primitives."
3140 self.logger.debug(logging_text + stage[0])
3141 stage[1] = "Looking execution environment that needs terminate."
3142 self.logger.debug(logging_text + stage[1])
3143
3144 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3145 config_descriptor = None
3146
3147 vca_id = self.get_vca_id(db_vnfrs_dict[vca["member-vnf-index"]], db_nsr)
3148 if not vca or not vca.get("ee_id"):
3149 continue
3150 if not vca.get("member-vnf-index"):
3151 # ns
3152 config_descriptor = db_nsr.get("ns-configuration")
3153 elif vca.get("vdu_id"):
3154 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3155 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3156 elif vca.get("kdu_name"):
3157 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3158 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3159 else:
3160 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3161 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3162 vca_type = vca.get("type")
3163 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3164 vca.get("needed_terminate"))
3165 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3166 # pending native charms
3167 destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
3168 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3169 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3170 task = asyncio.ensure_future(
3171 self.destroy_N2VC(
3172 logging_text,
3173 db_nslcmop,
3174 vca,
3175 config_descriptor,
3176 vca_index,
3177 destroy_ee,
3178 exec_terminate_primitives,
3179 vca_id=vca_id,
3180 )
3181 )
3182 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3183
3184 # wait for pending tasks of terminate primitives
3185 if tasks_dict_info:
3186 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3187 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3188 min(self.timeout_charm_delete, timeout_ns_terminate),
3189 stage, nslcmop_id)
3190 tasks_dict_info.clear()
3191 if error_list:
3192 return # raise LcmException("; ".join(error_list))
3193
3194 # remove All execution environments at once
3195 stage[0] = "Stage 3/3 delete all."
3196
3197 if nsr_deployed.get("VCA"):
3198 stage[1] = "Deleting all execution environments."
3199 self.logger.debug(logging_text + stage[1])
3200 vca_id = self.get_vca_id({}, db_nsr)
3201 task_delete_ee = asyncio.ensure_future(
3202 asyncio.wait_for(
3203 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
3204 timeout=self.timeout_charm_delete
3205 )
3206 )
3207 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3208 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3209
3210 # Delete from k8scluster
3211 stage[1] = "Deleting KDUs."
3212 self.logger.debug(logging_text + stage[1])
3213 # print(nsr_deployed)
3214 for kdu in get_iterable(nsr_deployed, "K8s"):
3215 if not kdu or not kdu.get("kdu-instance"):
3216 continue
3217 kdu_instance = kdu.get("kdu-instance")
3218 if kdu.get("k8scluster-type") in self.k8scluster_map:
3219 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
3220 vca_id = self.get_vca_id({}, db_nsr)
3221 task_delete_kdu_instance = asyncio.ensure_future(
3222 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3223 cluster_uuid=kdu.get("k8scluster-uuid"),
3224 kdu_instance=kdu_instance,
3225 vca_id=vca_id,
3226 )
3227 )
3228 else:
3229 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3230 format(kdu.get("k8scluster-type")))
3231 continue
3232 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3233
3234 # remove from RO
3235 stage[1] = "Deleting ns from VIM."
3236 if self.ng_ro:
3237 task_delete_ro = asyncio.ensure_future(
3238 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3239 else:
3240 task_delete_ro = asyncio.ensure_future(
3241 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3242 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3243
3244 # rest of staff will be done at finally
3245
3246 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3247 self.logger.error(logging_text + "Exit Exception {}".format(e))
3248 exc = e
3249 except asyncio.CancelledError:
3250 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3251 exc = "Operation was cancelled"
3252 except Exception as e:
3253 exc = traceback.format_exc()
3254 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3255 finally:
3256 if exc:
3257 error_list.append(str(exc))
3258 try:
3259 # wait for pending tasks
3260 if tasks_dict_info:
3261 stage[1] = "Waiting for terminate pending tasks."
3262 self.logger.debug(logging_text + stage[1])
3263 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3264 stage, nslcmop_id)
3265 stage[1] = stage[2] = ""
3266 except asyncio.CancelledError:
3267 error_list.append("Cancelled")
3268 # TODO cancell all tasks
3269 except Exception as exc:
3270 error_list.append(str(exc))
3271 # update status at database
3272 if error_list:
3273 error_detail = "; ".join(error_list)
3274 # self.logger.error(logging_text + error_detail)
3275 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3276 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3277
3278 db_nsr_update["operational-status"] = "failed"
3279 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3280 db_nslcmop_update["detailed-status"] = error_detail
3281 nslcmop_operation_state = "FAILED"
3282 ns_state = "BROKEN"
3283 else:
3284 error_detail = None
3285 error_description_nsr = error_description_nslcmop = None
3286 ns_state = "NOT_INSTANTIATED"
3287 db_nsr_update["operational-status"] = "terminated"
3288 db_nsr_update["detailed-status"] = "Done"
3289 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3290 db_nslcmop_update["detailed-status"] = "Done"
3291 nslcmop_operation_state = "COMPLETED"
3292
3293 if db_nsr:
3294 self._write_ns_status(
3295 nsr_id=nsr_id,
3296 ns_state=ns_state,
3297 current_operation="IDLE",
3298 current_operation_id=None,
3299 error_description=error_description_nsr,
3300 error_detail=error_detail,
3301 other_update=db_nsr_update
3302 )
3303 self._write_op_status(
3304 op_id=nslcmop_id,
3305 stage="",
3306 error_message=error_description_nslcmop,
3307 operation_state=nslcmop_operation_state,
3308 other_update=db_nslcmop_update,
3309 )
3310 if ns_state == "NOT_INSTANTIATED":
3311 try:
3312 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3313 except DbException as e:
3314 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3315 format(nsr_id, e))
3316 if operation_params:
3317 autoremove = operation_params.get("autoremove", False)
3318 if nslcmop_operation_state:
3319 try:
3320 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3321 "operationState": nslcmop_operation_state,
3322 "autoremove": autoremove},
3323 loop=self.loop)
3324 except Exception as e:
3325 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3326
3327 self.logger.debug(logging_text + "Exit")
3328 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3329
    async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
        """
        Wait for a set of asyncio tasks to finish, aggregating their errors and reporting progress.

        :param logging_text: prefix prepended to every log line
        :param created_tasks_info: dict mapping asyncio task -> human-readable description of what it does
        :param timeout: overall budget in seconds shared by all tasks (not per task)
        :param stage: 3-element list of progress strings; stage[1] is rewritten in place with "done/total" counters
        :param nslcmop_id: operation id whose status record is refreshed after each completion batch
        :param nsr_id: if provided, the nsr record is also updated with the accumulated error summary
        :return: list of error detail strings (empty when every task succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining shared budget; if it is already <= 0, asyncio.wait returns at once with nothing done
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
                                                     return_when=asyncio.FIRST_COMPLETED)
            num_done += len(done)
            if not done:  # Timeout
                # NOTE(review): tasks that hit the global timeout are reported but not cancelled here —
                # confirm callers cancel or await them to avoid "task was never retrieved" warnings
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/domain errors get a short log line; anything else gets the full traceback
                    if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
                                        K8sException, NgRoException)):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
                        self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
                else:
                    self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
                                                      "errorDetail": ". ".join(error_detail_list)})
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
3378
3379 @staticmethod
3380 def _map_primitive_params(primitive_desc, params, instantiation_params):
3381 """
3382 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3383 The default-value is used. If it is between < > it look for a value at instantiation_params
3384 :param primitive_desc: portion of VNFD/NSD that describes primitive
3385 :param params: Params provided by user
3386 :param instantiation_params: Instantiation params provided by user
3387 :return: a dictionary with the calculated params
3388 """
3389 calculated_params = {}
3390 for parameter in primitive_desc.get("parameter", ()):
3391 param_name = parameter["name"]
3392 if param_name in params:
3393 calculated_params[param_name] = params[param_name]
3394 elif "default-value" in parameter or "value" in parameter:
3395 if "value" in parameter:
3396 calculated_params[param_name] = parameter["value"]
3397 else:
3398 calculated_params[param_name] = parameter["default-value"]
3399 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3400 and calculated_params[param_name].endswith(">"):
3401 if calculated_params[param_name][1:-1] in instantiation_params:
3402 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3403 else:
3404 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3405 format(calculated_params[param_name], primitive_desc["name"]))
3406 else:
3407 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3408 format(param_name, primitive_desc["name"]))
3409
3410 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3411 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
3412 default_flow_style=True, width=256)
3413 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3414 calculated_params[param_name] = calculated_params[param_name][7:]
3415 if parameter.get("data-type") == "INTEGER":
3416 try:
3417 calculated_params[param_name] = int(calculated_params[param_name])
3418 except ValueError: # error converting string to int
3419 raise LcmException(
3420 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3421 elif parameter.get("data-type") == "BOOLEAN":
3422 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3423
3424 # add always ns_config_info if primitive name is config
3425 if primitive_desc["name"] == "config":
3426 if "ns_config_info" in instantiation_params:
3427 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3428 return calculated_params
3429
3430 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3431 ee_descriptor_id=None):
3432 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3433 for vca in deployed_vca:
3434 if not vca:
3435 continue
3436 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3437 continue
3438 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3439 continue
3440 if kdu_name and kdu_name != vca["kdu_name"]:
3441 continue
3442 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3443 continue
3444 break
3445 else:
3446 # vca_deployed not found
3447 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3448 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3449 ee_descriptor_id))
3450 # get ee_id
3451 ee_id = vca.get("ee_id")
3452 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3453 if not ee_id:
3454 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3455 "execution environment"
3456 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3457 return ee_id, vca_type
3458
3459 async def _ns_execute_primitive(
3460 self,
3461 ee_id,
3462 primitive,
3463 primitive_params,
3464 retries=0,
3465 retries_interval=30,
3466 timeout=None,
3467 vca_type=None,
3468 db_dict=None,
3469 vca_id: str = None,
3470 ) -> (str, str):
3471 try:
3472 if primitive == "config":
3473 primitive_params = {"params": primitive_params}
3474
3475 vca_type = vca_type or "lxc_proxy_charm"
3476
3477 while retries >= 0:
3478 try:
3479 output = await asyncio.wait_for(
3480 self.vca_map[vca_type].exec_primitive(
3481 ee_id=ee_id,
3482 primitive_name=primitive,
3483 params_dict=primitive_params,
3484 progress_timeout=self.timeout_progress_primitive,
3485 total_timeout=self.timeout_primitive,
3486 db_dict=db_dict,
3487 vca_id=vca_id,
3488 ),
3489 timeout=timeout or self.timeout_primitive)
3490 # execution was OK
3491 break
3492 except asyncio.CancelledError:
3493 raise
3494 except Exception as e: # asyncio.TimeoutError
3495 if isinstance(e, asyncio.TimeoutError):
3496 e = "Timeout"
3497 retries -= 1
3498 if retries >= 0:
3499 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3500 # wait and retry
3501 await asyncio.sleep(retries_interval, loop=self.loop)
3502 else:
3503 return 'FAILED', str(e)
3504
3505 return 'COMPLETED', output
3506
3507 except (LcmException, asyncio.CancelledError):
3508 raise
3509 except Exception as e:
3510 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3511
3512 async def vca_status_refresh(self, nsr_id, nslcmop_id):
3513 """
3514 Updating the vca_status with latest juju information in nsrs record
3515 :param: nsr_id: Id of the nsr
3516 :param: nslcmop_id: Id of the nslcmop
3517 :return: None
3518 """
3519
3520 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
3521 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3522 vca_id = self.get_vca_id({}, db_nsr)
3523 if db_nsr['_admin']['deployed']['K8s']:
3524 for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']):
3525 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
3526 await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id}, vca_id=vca_id)
3527 else:
3528 for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
3529 table, filter = "nsrs", {"_id": nsr_id}
3530 path = "_admin.deployed.VCA.{}.".format(vca_index)
3531 await self._on_update_n2vc_db(table, filter, path, {})
3532
3533 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
3534 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
3535
3536 async def action(self, nsr_id, nslcmop_id):
3537 # Try to lock HA task here
3538 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3539 if not task_is_locked_by_me:
3540 return
3541
3542 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3543 self.logger.debug(logging_text + "Enter")
3544 # get all needed from database
3545 db_nsr = None
3546 db_nslcmop = None
3547 db_nsr_update = {}
3548 db_nslcmop_update = {}
3549 nslcmop_operation_state = None
3550 error_description_nslcmop = None
3551 exc = None
3552 try:
3553 # wait for any previous tasks in process
3554 step = "Waiting for previous operations to terminate"
3555 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3556
3557 self._write_ns_status(
3558 nsr_id=nsr_id,
3559 ns_state=None,
3560 current_operation="RUNNING ACTION",
3561 current_operation_id=nslcmop_id
3562 )
3563
3564 step = "Getting information from database"
3565 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3566 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3567
3568 nsr_deployed = db_nsr["_admin"].get("deployed")
3569 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3570 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3571 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3572 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3573 primitive = db_nslcmop["operationParams"]["primitive"]
3574 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3575 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3576
3577 if vnf_index:
3578 step = "Getting vnfr from database"
3579 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3580 step = "Getting vnfd from database"
3581 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3582 else:
3583 step = "Getting nsd from database"
3584 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3585
3586 vca_id = self.get_vca_id(db_vnfr, db_nsr)
3587 # for backward compatibility
3588 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3589 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3590 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3591 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3592
3593 # look for primitive
3594 config_primitive_desc = descriptor_configuration = None
3595 if vdu_id:
3596 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
3597 elif kdu_name:
3598 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
3599 elif vnf_index:
3600 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
3601 else:
3602 descriptor_configuration = db_nsd.get("ns-configuration")
3603
3604 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3605 for config_primitive in descriptor_configuration["config-primitive"]:
3606 if config_primitive["name"] == primitive:
3607 config_primitive_desc = config_primitive
3608 break
3609
3610 if not config_primitive_desc:
3611 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3612 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3613 format(primitive))
3614 primitive_name = primitive
3615 ee_descriptor_id = None
3616 else:
3617 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3618 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3619
3620 if vnf_index:
3621 if vdu_id:
3622 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3623 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
3624 elif kdu_name:
3625 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3626 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3627 else:
3628 desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
3629 else:
3630 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
3631 if kdu_name and get_configuration(db_vnfd, kdu_name):
3632 kdu_configuration = get_configuration(db_vnfd, kdu_name)
3633 actions = set()
3634 for primitive in kdu_configuration.get("initial-config-primitive", []):
3635 actions.add(primitive["name"])
3636 for primitive in kdu_configuration.get("config-primitive", []):
3637 actions.add(primitive["name"])
3638 kdu_action = True if primitive_name in actions else False
3639
3640 # TODO check if ns is in a proper status
3641 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3642 # kdur and desc_params already set from before
3643 if primitive_params:
3644 desc_params.update(primitive_params)
3645 # TODO Check if we will need something at vnf level
3646 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3647 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3648 break
3649 else:
3650 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3651
3652 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3653 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3654 raise LcmException(msg)
3655
3656 db_dict = {"collection": "nsrs",
3657 "filter": {"_id": nsr_id},
3658 "path": "_admin.deployed.K8s.{}".format(index)}
3659 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3660 step = "Executing kdu {}".format(primitive_name)
3661 if primitive_name == "upgrade":
3662 if desc_params.get("kdu_model"):
3663 kdu_model = desc_params.get("kdu_model")
3664 del desc_params["kdu_model"]
3665 else:
3666 kdu_model = kdu.get("kdu-model")
3667 parts = kdu_model.split(sep=":")
3668 if len(parts) == 2:
3669 kdu_model = parts[0]
3670
3671 detailed_status = await asyncio.wait_for(
3672 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3673 cluster_uuid=kdu.get("k8scluster-uuid"),
3674 kdu_instance=kdu.get("kdu-instance"),
3675 atomic=True, kdu_model=kdu_model,
3676 params=desc_params, db_dict=db_dict,
3677 timeout=timeout_ns_action),
3678 timeout=timeout_ns_action + 10)
3679 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3680 elif primitive_name == "rollback":
3681 detailed_status = await asyncio.wait_for(
3682 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3683 cluster_uuid=kdu.get("k8scluster-uuid"),
3684 kdu_instance=kdu.get("kdu-instance"),
3685 db_dict=db_dict),
3686 timeout=timeout_ns_action)
3687 elif primitive_name == "status":
3688 detailed_status = await asyncio.wait_for(
3689 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3690 cluster_uuid=kdu.get("k8scluster-uuid"),
3691 kdu_instance=kdu.get("kdu-instance"),
3692 vca_id=vca_id,
3693 ),
3694 timeout=timeout_ns_action
3695 )
3696 else:
3697 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3698 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3699
3700 detailed_status = await asyncio.wait_for(
3701 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3702 cluster_uuid=kdu.get("k8scluster-uuid"),
3703 kdu_instance=kdu_instance,
3704 primitive_name=primitive_name,
3705 params=params, db_dict=db_dict,
3706 timeout=timeout_ns_action,
3707 vca_id=vca_id,
3708 ),
3709 timeout=timeout_ns_action
3710 )
3711
3712 if detailed_status:
3713 nslcmop_operation_state = 'COMPLETED'
3714 else:
3715 detailed_status = ''
3716 nslcmop_operation_state = 'FAILED'
3717 else:
3718 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
3719 vdu_id=vdu_id, vdu_count_index=vdu_count_index,
3720 ee_descriptor_id=ee_descriptor_id)
3721 for vca_index, vca_deployed in enumerate(db_nsr['_admin']['deployed']['VCA']):
3722 if vca_deployed.get("member-vnf-index") == vnf_index:
3723 db_dict = {"collection": "nsrs",
3724 "filter": {"_id": nsr_id},
3725 "path": "_admin.deployed.VCA.{}.".format(vca_index)}
3726 break
3727 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3728 ee_id,
3729 primitive=primitive_name,
3730 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3731 timeout=timeout_ns_action,
3732 vca_type=vca_type,
3733 db_dict=db_dict,
3734 vca_id=vca_id,
3735 )
3736
3737 db_nslcmop_update["detailed-status"] = detailed_status
3738 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3739 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3740 detailed_status))
3741 return # database update is called inside finally
3742
3743 except (DbException, LcmException, N2VCException, K8sException) as e:
3744 self.logger.error(logging_text + "Exit Exception {}".format(e))
3745 exc = e
3746 except asyncio.CancelledError:
3747 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3748 exc = "Operation was cancelled"
3749 except asyncio.TimeoutError:
3750 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3751 exc = "Timeout"
3752 except Exception as e:
3753 exc = traceback.format_exc()
3754 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3755 finally:
3756 if exc:
3757 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3758 "FAILED {}: {}".format(step, exc)
3759 nslcmop_operation_state = "FAILED"
3760 if db_nsr:
3761 self._write_ns_status(
3762 nsr_id=nsr_id,
3763 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3764 current_operation="IDLE",
3765 current_operation_id=None,
3766 # error_description=error_description_nsr,
3767 # error_detail=error_detail,
3768 other_update=db_nsr_update
3769 )
3770
3771 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
3772 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
3773
3774 if nslcmop_operation_state:
3775 try:
3776 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3777 "operationState": nslcmop_operation_state},
3778 loop=self.loop)
3779 except Exception as e:
3780 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3781 self.logger.debug(logging_text + "Exit")
3782 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3783 return nslcmop_operation_state, detailed_status
3784
3785 async def scale(self, nsr_id, nslcmop_id):
3786 # Try to lock HA task here
3787 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3788 if not task_is_locked_by_me:
3789 return
3790
3791 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3792 stage = ['', '', '']
3793 tasks_dict_info = {}
3794 # ^ stage, step, VIM progress
3795 self.logger.debug(logging_text + "Enter")
3796 # get all needed from database
3797 db_nsr = None
3798 db_nslcmop_update = {}
3799 db_nsr_update = {}
3800 exc = None
3801 # in case of error, indicates what part of scale was failed to put nsr at error status
3802 scale_process = None
3803 old_operational_status = ""
3804 old_config_status = ""
3805 nsi_id = None
3806 try:
3807 # wait for any previous tasks in process
3808 step = "Waiting for previous operations to terminate"
3809 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3810 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3811 current_operation="SCALING", current_operation_id=nslcmop_id)
3812
3813 step = "Getting nslcmop from database"
3814 self.logger.debug(step + " after having waited for previous tasks to be completed")
3815 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3816
3817 step = "Getting nsr from database"
3818 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3819 old_operational_status = db_nsr["operational-status"]
3820 old_config_status = db_nsr["config-status"]
3821
3822 step = "Parsing scaling parameters"
3823 db_nsr_update["operational-status"] = "scaling"
3824 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3825 nsr_deployed = db_nsr["_admin"].get("deployed")
3826
3827 #######
3828 nsr_deployed = db_nsr["_admin"].get("deployed")
3829 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3830 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3831 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3832 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3833 #######
3834
3835 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3836 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3837 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3838 # for backward compatibility
3839 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3840 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3841 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3842 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3843
3844 step = "Getting vnfr from database"
3845 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3846
3847 vca_id = self.get_vca_id(db_vnfr, db_nsr)
3848
3849 step = "Getting vnfd from database"
3850 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3851
3852 base_folder = db_vnfd["_admin"]["storage"]
3853
3854 step = "Getting scaling-group-descriptor"
3855 scaling_descriptor = find_in_list(
3856 get_scaling_aspect(
3857 db_vnfd
3858 ),
3859 lambda scale_desc: scale_desc["name"] == scaling_group
3860 )
3861 if not scaling_descriptor:
3862 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3863 "at vnfd:scaling-group-descriptor".format(scaling_group))
3864
3865 step = "Sending scale order to VIM"
3866 # TODO check if ns is in a proper status
3867 nb_scale_op = 0
3868 if not db_nsr["_admin"].get("scaling-group"):
3869 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3870 admin_scale_index = 0
3871 else:
3872 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3873 if admin_scale_info["name"] == scaling_group:
3874 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3875 break
3876 else: # not found, set index one plus last element and add new entry with the name
3877 admin_scale_index += 1
3878 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3879 RO_scaling_info = []
3880 VCA_scaling_info = []
3881 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3882 if scaling_type == "SCALE_OUT":
3883 if "aspect-delta-details" not in scaling_descriptor:
3884 raise LcmException(
3885 "Aspect delta details not fount in scaling descriptor {}".format(
3886 scaling_descriptor["name"]
3887 )
3888 )
3889 # count if max-instance-count is reached
3890 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3891
3892 vdu_scaling_info["scaling_direction"] = "OUT"
3893 vdu_scaling_info["vdu-create"] = {}
3894 for delta in deltas:
3895 for vdu_delta in delta["vdu-delta"]:
3896 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3897 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3898 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3899 if cloud_init_text:
3900 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3901 cloud_init_list = []
3902
3903 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3904 max_instance_count = 10
3905 if vdu_profile and "max-number-of-instances" in vdu_profile:
3906 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3907
3908 default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3909
3910 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3911
3912 if nb_scale_op + default_instance_num > max_instance_count:
3913 raise LcmException(
3914 "reached the limit of {} (max-instance-count) "
3915 "scaling-out operations for the "
3916 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3917 )
3918 for x in range(vdu_delta.get("number-of-instances", 1)):
3919 if cloud_init_text:
3920 # TODO Information of its own ip is not available because db_vnfr is not updated.
3921 additional_params["OSM"] = get_osm_params(
3922 db_vnfr,
3923 vdu_delta["id"],
3924 vdu_index + x
3925 )
3926 cloud_init_list.append(
3927 self._parse_cloud_init(
3928 cloud_init_text,
3929 additional_params,
3930 db_vnfd["id"],
3931 vdud["id"]
3932 )
3933 )
3934 VCA_scaling_info.append(
3935 {
3936 "osm_vdu_id": vdu_delta["id"],
3937 "member-vnf-index": vnf_index,
3938 "type": "create",
3939 "vdu_index": vdu_index + x
3940 }
3941 )
3942 RO_scaling_info.append(
3943 {
3944 "osm_vdu_id": vdu_delta["id"],
3945 "member-vnf-index": vnf_index,
3946 "type": "create",
3947 "count": vdu_delta.get("number-of-instances", 1)
3948 }
3949 )
3950 if cloud_init_list:
3951 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3952 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3953
3954 elif scaling_type == "SCALE_IN":
3955 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3956 min_instance_count = int(scaling_descriptor["min-instance-count"])
3957
3958 vdu_scaling_info["scaling_direction"] = "IN"
3959 vdu_scaling_info["vdu-delete"] = {}
3960 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3961 for delta in deltas:
3962 for vdu_delta in delta["vdu-delta"]:
3963 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3964 min_instance_count = 0
3965 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3966 if vdu_profile and "min-number-of-instances" in vdu_profile:
3967 min_instance_count = vdu_profile["min-number-of-instances"]
3968
3969 default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3970
3971 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3972 if nb_scale_op + default_instance_num < min_instance_count:
3973 raise LcmException(
3974 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3975 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3976 )
3977 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3978 "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
3979 "vdu_index": vdu_index - 1})
3980 for x in range(vdu_delta.get("number-of-instances", 1)):
3981 VCA_scaling_info.append(
3982 {
3983 "osm_vdu_id": vdu_delta["id"],
3984 "member-vnf-index": vnf_index,
3985 "type": "delete",
3986 "vdu_index": vdu_index - 1 - x
3987 }
3988 )
3989 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3990
3991 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3992 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3993 if vdu_scaling_info["scaling_direction"] == "IN":
3994 for vdur in reversed(db_vnfr["vdur"]):
3995 if vdu_delete.get(vdur["vdu-id-ref"]):
3996 vdu_delete[vdur["vdu-id-ref"]] -= 1
3997 vdu_scaling_info["vdu"].append({
3998 "name": vdur.get("name") or vdur.get("vdu-name"),
3999 "vdu_id": vdur["vdu-id-ref"],
4000 "interface": []
4001 })
4002 for interface in vdur["interfaces"]:
4003 vdu_scaling_info["vdu"][-1]["interface"].append({
4004 "name": interface["name"],
4005 "ip_address": interface["ip-address"],
4006 "mac_address": interface.get("mac-address"),
4007 })
4008 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
4009
4010 # PRE-SCALE BEGIN
4011 step = "Executing pre-scale vnf-config-primitive"
4012 if scaling_descriptor.get("scaling-config-action"):
4013 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4014 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
4015 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
4016 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4017 step = db_nslcmop_update["detailed-status"] = \
4018 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
4019
4020 # look for primitive
4021 for config_primitive in (get_configuration(
4022 db_vnfd, db_vnfd["id"]
4023 ) or {}).get("config-primitive", ()):
4024 if config_primitive["name"] == vnf_config_primitive:
4025 break
4026 else:
4027 raise LcmException(
4028 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4029 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4030 "primitive".format(scaling_group, vnf_config_primitive))
4031
4032 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4033 if db_vnfr.get("additionalParamsForVnf"):
4034 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4035
4036 scale_process = "VCA"
4037 db_nsr_update["config-status"] = "configuring pre-scaling"
4038 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4039
4040 # Pre-scale retry check: Check if this sub-operation has been executed before
4041 op_index = self._check_or_add_scale_suboperation(
4042 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
4043 if op_index == self.SUBOPERATION_STATUS_SKIP:
4044 # Skip sub-operation
4045 result = 'COMPLETED'
4046 result_detail = 'Done'
4047 self.logger.debug(logging_text +
4048 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4049 vnf_config_primitive, result, result_detail))
4050 else:
4051 if op_index == self.SUBOPERATION_STATUS_NEW:
4052 # New sub-operation: Get index of this sub-operation
4053 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4054 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4055 format(vnf_config_primitive))
4056 else:
4057 # retry: Get registered params for this existing sub-operation
4058 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4059 vnf_index = op.get('member_vnf_index')
4060 vnf_config_primitive = op.get('primitive')
4061 primitive_params = op.get('primitive_params')
4062 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4063 format(vnf_config_primitive))
4064 # Execute the primitive, either with new (first-time) or registered (reintent) args
4065 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4066 primitive_name = config_primitive.get("execution-environment-primitive",
4067 vnf_config_primitive)
4068 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4069 member_vnf_index=vnf_index,
4070 vdu_id=None,
4071 vdu_count_index=None,
4072 ee_descriptor_id=ee_descriptor_id)
4073 result, result_detail = await self._ns_execute_primitive(
4074 ee_id, primitive_name,
4075 primitive_params,
4076 vca_type=vca_type,
4077 vca_id=vca_id,
4078 )
4079 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4080 vnf_config_primitive, result, result_detail))
4081 # Update operationState = COMPLETED | FAILED
4082 self._update_suboperation_status(
4083 db_nslcmop, op_index, result, result_detail)
4084
4085 if result == "FAILED":
4086 raise LcmException(result_detail)
4087 db_nsr_update["config-status"] = old_config_status
4088 scale_process = None
4089 # PRE-SCALE END
4090
4091 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
4092 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
4093
4094 # SCALE-IN VCA - BEGIN
4095 if VCA_scaling_info:
4096 step = db_nslcmop_update["detailed-status"] = \
4097 "Deleting the execution environments"
4098 scale_process = "VCA"
4099 for vdu_info in VCA_scaling_info:
4100 if vdu_info["type"] == "delete":
4101 member_vnf_index = str(vdu_info["member-vnf-index"])
4102 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4103 vdu_id = vdu_info["osm_vdu_id"]
4104 vdu_index = int(vdu_info["vdu_index"])
4105 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4106 member_vnf_index, vdu_id, vdu_index)
4107 stage[2] = step = "Scaling in VCA"
4108 self._write_op_status(
4109 op_id=nslcmop_id,
4110 stage=stage
4111 )
4112 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
4113 config_update = db_nsr["configurationStatus"]
4114 for vca_index, vca in enumerate(vca_update):
4115 if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \
4116 vca["vdu_count_index"] == vdu_index:
4117 if vca.get("vdu_id"):
4118 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4119 elif vca.get("kdu_name"):
4120 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4121 else:
4122 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4123 operation_params = db_nslcmop.get("operationParams") or {}
4124 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
4125 vca.get("needed_terminate"))
4126 task = asyncio.ensure_future(
4127 asyncio.wait_for(
4128 self.destroy_N2VC(
4129 logging_text,
4130 db_nslcmop,
4131 vca,
4132 config_descriptor,
4133 vca_index,
4134 destroy_ee=True,
4135 exec_primitives=exec_terminate_primitives,
4136 scaling_in=True,
4137 vca_id=vca_id,
4138 ),
4139 timeout=self.timeout_charm_delete
4140 )
4141 )
4142 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4143 del vca_update[vca_index]
4144 del config_update[vca_index]
4145 # wait for pending tasks of terminate primitives
4146 if tasks_dict_info:
4147 self.logger.debug(logging_text +
4148 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
4149 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
4150 min(self.timeout_charm_delete,
4151 self.timeout_ns_terminate),
4152 stage, nslcmop_id)
4153 tasks_dict_info.clear()
4154 if error_list:
4155 raise LcmException("; ".join(error_list))
4156
4157 db_vca_and_config_update = {
4158 "_admin.deployed.VCA": vca_update,
4159 "configurationStatus": config_update
4160 }
4161 self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
4162 scale_process = None
4163 # SCALE-IN VCA - END
4164
4165 # SCALE RO - BEGIN
4166 if RO_scaling_info:
4167 scale_process = "RO"
4168 if self.ro_config.get("ng"):
4169 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
4170 vdu_scaling_info.pop("vdu-create", None)
4171 vdu_scaling_info.pop("vdu-delete", None)
4172
4173 scale_process = None
4174 if db_nsr_update:
4175 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4176 # SCALE RO - END
4177
4178 # SCALE-UP VCA - BEGIN
4179 if VCA_scaling_info:
4180 step = db_nslcmop_update["detailed-status"] = \
4181 "Creating new execution environments"
4182 scale_process = "VCA"
4183 for vdu_info in VCA_scaling_info:
4184 if vdu_info["type"] == "create":
4185 member_vnf_index = str(vdu_info["member-vnf-index"])
4186 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4187 vnfd_id = db_vnfr["vnfd-ref"]
4188 vdu_index = int(vdu_info["vdu_index"])
4189 deploy_params = {"OSM": get_osm_params(db_vnfr)}
4190 if db_vnfr.get("additionalParamsForVnf"):
4191 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
4192 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
4193 if descriptor_config:
4194 vdu_id = None
4195 vdu_name = None
4196 kdu_name = None
4197 self._deploy_n2vc(
4198 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
4199 db_nsr=db_nsr,
4200 db_vnfr=db_vnfr,
4201 nslcmop_id=nslcmop_id,
4202 nsr_id=nsr_id,
4203 nsi_id=nsi_id,
4204 vnfd_id=vnfd_id,
4205 vdu_id=vdu_id,
4206 kdu_name=kdu_name,
4207 member_vnf_index=member_vnf_index,
4208 vdu_index=vdu_index,
4209 vdu_name=vdu_name,
4210 deploy_params=deploy_params,
4211 descriptor_config=descriptor_config,
4212 base_folder=base_folder,
4213 task_instantiation_info=tasks_dict_info,
4214 stage=stage
4215 )
4216 vdu_id = vdu_info["osm_vdu_id"]
4217 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
4218 descriptor_config = get_configuration(db_vnfd, vdu_id)
4219 if vdur.get("additionalParams"):
4220 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
4221 else:
4222 deploy_params_vdu = deploy_params
4223 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
4224 if descriptor_config:
4225 vdu_name = None
4226 kdu_name = None
4227 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4228 member_vnf_index, vdu_id, vdu_index)
4229 stage[2] = step = "Scaling out VCA"
4230 self._write_op_status(
4231 op_id=nslcmop_id,
4232 stage=stage
4233 )
4234 self._deploy_n2vc(
4235 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4236 member_vnf_index, vdu_id, vdu_index),
4237 db_nsr=db_nsr,
4238 db_vnfr=db_vnfr,
4239 nslcmop_id=nslcmop_id,
4240 nsr_id=nsr_id,
4241 nsi_id=nsi_id,
4242 vnfd_id=vnfd_id,
4243 vdu_id=vdu_id,
4244 kdu_name=kdu_name,
4245 member_vnf_index=member_vnf_index,
4246 vdu_index=vdu_index,
4247 vdu_name=vdu_name,
4248 deploy_params=deploy_params_vdu,
4249 descriptor_config=descriptor_config,
4250 base_folder=base_folder,
4251 task_instantiation_info=tasks_dict_info,
4252 stage=stage
4253 )
4254 # SCALE-UP VCA - END
4255 scale_process = None
4256
4257 # POST-SCALE BEGIN
4258 # execute primitive service POST-SCALING
4259 step = "Executing post-scale vnf-config-primitive"
4260 if scaling_descriptor.get("scaling-config-action"):
4261 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4262 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4263 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4264 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4265 step = db_nslcmop_update["detailed-status"] = \
4266 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4267
4268 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4269 if db_vnfr.get("additionalParamsForVnf"):
4270 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4271
4272 # look for primitive
4273 for config_primitive in (
4274 get_configuration(db_vnfd, db_vnfd["id"]) or {}
4275 ).get("config-primitive", ()):
4276 if config_primitive["name"] == vnf_config_primitive:
4277 break
4278 else:
4279 raise LcmException(
4280 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4281 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4282 "config-primitive".format(scaling_group, vnf_config_primitive))
4283 scale_process = "VCA"
4284 db_nsr_update["config-status"] = "configuring post-scaling"
4285 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4286
4287 # Post-scale retry check: Check if this sub-operation has been executed before
4288 op_index = self._check_or_add_scale_suboperation(
4289 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4290 if op_index == self.SUBOPERATION_STATUS_SKIP:
4291 # Skip sub-operation
4292 result = 'COMPLETED'
4293 result_detail = 'Done'
4294 self.logger.debug(logging_text +
4295 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4296 format(vnf_config_primitive, result, result_detail))
4297 else:
4298 if op_index == self.SUBOPERATION_STATUS_NEW:
4299 # New sub-operation: Get index of this sub-operation
4300 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4301 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4302 format(vnf_config_primitive))
4303 else:
4304 # retry: Get registered params for this existing sub-operation
4305 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4306 vnf_index = op.get('member_vnf_index')
4307 vnf_config_primitive = op.get('primitive')
4308 primitive_params = op.get('primitive_params')
4309 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4310 format(vnf_config_primitive))
4311 # Execute the primitive, either with new (first-time) or registered (reintent) args
4312 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4313 primitive_name = config_primitive.get("execution-environment-primitive",
4314 vnf_config_primitive)
4315 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4316 member_vnf_index=vnf_index,
4317 vdu_id=None,
4318 vdu_count_index=None,
4319 ee_descriptor_id=ee_descriptor_id)
4320 result, result_detail = await self._ns_execute_primitive(
4321 ee_id,
4322 primitive_name,
4323 primitive_params,
4324 vca_type=vca_type,
4325 vca_id=vca_id,
4326 )
4327 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4328 vnf_config_primitive, result, result_detail))
4329 # Update operationState = COMPLETED | FAILED
4330 self._update_suboperation_status(
4331 db_nslcmop, op_index, result, result_detail)
4332
4333 if result == "FAILED":
4334 raise LcmException(result_detail)
4335 db_nsr_update["config-status"] = old_config_status
4336 scale_process = None
4337 # POST-SCALE END
4338
4339 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4340 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4341 else old_operational_status
4342 db_nsr_update["config-status"] = old_config_status
4343 return
4344 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4345 self.logger.error(logging_text + "Exit Exception {}".format(e))
4346 exc = e
4347 except asyncio.CancelledError:
4348 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4349 exc = "Operation was cancelled"
4350 except Exception as e:
4351 exc = traceback.format_exc()
4352 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4353 finally:
4354 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
4355 if tasks_dict_info:
4356 stage[1] = "Waiting for instantiate pending tasks."
4357 self.logger.debug(logging_text + stage[1])
4358 exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
4359 stage, nslcmop_id, nsr_id=nsr_id)
4360 if exc:
4361 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4362 nslcmop_operation_state = "FAILED"
4363 if db_nsr:
4364 db_nsr_update["operational-status"] = old_operational_status
4365 db_nsr_update["config-status"] = old_config_status
4366 db_nsr_update["detailed-status"] = ""
4367 if scale_process:
4368 if "VCA" in scale_process:
4369 db_nsr_update["config-status"] = "failed"
4370 if "RO" in scale_process:
4371 db_nsr_update["operational-status"] = "failed"
4372 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4373 exc)
4374 else:
4375 error_description_nslcmop = None
4376 nslcmop_operation_state = "COMPLETED"
4377 db_nslcmop_update["detailed-status"] = "Done"
4378
4379 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4380 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4381 if db_nsr:
4382 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4383 current_operation_id=None, other_update=db_nsr_update)
4384
4385 if nslcmop_operation_state:
4386 try:
4387 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4388 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4389 except Exception as e:
4390 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4391 self.logger.debug(logging_text + "Exit")
4392 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4393
4394 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
4395 nsr_id = db_nslcmop["nsInstanceId"]
4396 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4397 db_vnfrs = {}
4398
4399 # read from db: vnfd's for every vnf
4400 db_vnfds = []
4401
4402 # for each vnf in ns, read vnfd
4403 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4404 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4405 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4406 # if we haven't this vnfd, read it from db
4407 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
4408 # read from db
4409 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4410 db_vnfds.append(vnfd)
4411 n2vc_key = self.n2vc.get_public_key()
4412 n2vc_key_list = [n2vc_key]
4413 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4414 mark_delete=True)
4415 # db_vnfr has been updated, update db_vnfrs to use it
4416 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4417 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4418 db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
4419 timeout_ns_deploy=self.timeout_ns_deploy)
4420 if vdu_scaling_info.get("vdu-delete"):
4421 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4422
4423 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
4424 if not self.prometheus:
4425 return
4426 # look if exist a file called 'prometheus*.j2' and
4427 artifact_content = self.fs.dir_ls(artifact_path)
4428 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4429 if not job_file:
4430 return
4431 with self.fs.file_open((artifact_path, job_file), "r") as f:
4432 job_data = f.read()
4433
4434 # TODO get_service
4435 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4436 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4437 host_port = "80"
4438 vnfr_id = vnfr_id.replace("-", "")
4439 variables = {
4440 "JOB_NAME": vnfr_id,
4441 "TARGET_IP": target_ip,
4442 "EXPORTER_POD_IP": host_name,
4443 "EXPORTER_POD_PORT": host_port,
4444 }
4445 job_list = self.prometheus.parse_job(job_data, variables)
4446 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4447 for job in job_list:
4448 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4449 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4450 job["nsr_id"] = nsr_id
4451 job_dict = {jl["job_name"]: jl for jl in job_list}
4452 if await self.prometheus.update(job_dict):
4453 return list(job_dict.keys())
4454
4455 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4456 """
4457 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4458
4459 :param: vim_account_id: VIM Account ID
4460
4461 :return: (cloud_name, cloud_credential)
4462 """
4463 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4464 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4465
4466 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4467 """
4468 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4469
4470 :param: vim_account_id: VIM Account ID
4471
4472 :return: (cloud_name, cloud_credential)
4473 """
4474 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4475 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")