fix/feat(relations): external connection point ref now works with multiple KDU
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
63 class NsLcm(LcmBase):
64 timeout_vca_on_error = 5 * 60 # Time since a charm first reaches blocked or error status until it is marked as failed
65 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
66 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
67 timeout_charm_delete = 10 * 60
68 timeout_primitive = 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
70
71 SUBOPERATION_STATUS_NOT_FOUND = -1
72 SUBOPERATION_STATUS_NEW = -2
73 SUBOPERATION_STATUS_SKIP = -3
74 task_name_deploy_vca = "Deploying VCA"
75
76 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
77 """
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
80 :return: None
81 """
82 super().__init__(
83 msg=msg,
84 logger=logging.getLogger('lcm.ns')
85 )
86
87 self.db = Database().instance.db
88 self.fs = Filesystem().instance.fs
89 self.loop = loop
90 self.lcm_tasks = lcm_tasks
91 self.timeout = config["timeout"]
92 self.ro_config = config["ro_config"]
93 self.ng_ro = config["ro_config"].get("ng")
94 self.vca_config = config["VCA"].copy()
95
96 # create N2VC connector
97 self.n2vc = N2VCJujuConnector(
98 log=self.logger,
99 loop=self.loop,
100 on_update_db=self._on_update_n2vc_db,
101 fs=self.fs,
102 db=self.db
103 )
104
105 self.conn_helm_ee = LCMHelmConn(
106 log=self.logger,
107 loop=self.loop,
108 vca_config=self.vca_config,
109 on_update_db=self._on_update_n2vc_db
110 )
111
112 self.k8sclusterhelm2 = K8sHelmConnector(
113 kubectl_command=self.vca_config.get("kubectlpath"),
114 helm_command=self.vca_config.get("helmpath"),
115 log=self.logger,
116 on_update_db=None,
117 fs=self.fs,
118 db=self.db
119 )
120
121 self.k8sclusterhelm3 = K8sHelm3Connector(
122 kubectl_command=self.vca_config.get("kubectlpath"),
123 helm_command=self.vca_config.get("helm3path"),
124 fs=self.fs,
125 log=self.logger,
126 db=self.db,
127 on_update_db=None,
128 )
129
130 self.k8sclusterjuju = K8sJujuConnector(
131 kubectl_command=self.vca_config.get("kubectlpath"),
132 juju_command=self.vca_config.get("jujupath"),
133 log=self.logger,
134 loop=self.loop,
135 on_update_db=self._on_update_k8s_db,
136 fs=self.fs,
137 db=self.db
138 )
139
140 self.k8scluster_map = {
141 "helm-chart": self.k8sclusterhelm2,
142 "helm-chart-v3": self.k8sclusterhelm3,
143 "chart": self.k8sclusterhelm3,
144 "juju-bundle": self.k8sclusterjuju,
145 "juju": self.k8sclusterjuju,
146 }
147
148 self.vca_map = {
149 "lxc_proxy_charm": self.n2vc,
150 "native_charm": self.n2vc,
151 "k8s_proxy_charm": self.n2vc,
152 "helm": self.conn_helm_ee,
153 "helm-v3": self.conn_helm_ee
154 }
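# Note: both maps above act as dispatch tables keyed by the deployment type declared in the
# descriptors, so the rest of this module stays agnostic of the concrete connector.
# Illustrative lookups (keys taken from the maps above, arguments omitted):
#   await self.k8scluster_map["helm-chart-v3"].status_kdu(...)
#   await self.vca_map["native_charm"].exec_primitive(...)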
155
156 self.prometheus = prometheus
157
158 # create RO client
159 self.RO = NgRoClient(self.loop, **self.ro_config)
160
161 @staticmethod
162 def increment_ip_mac(ip_mac, vm_index=1):
163 if not isinstance(ip_mac, str):
164 return ip_mac
165 try:
166 # try with IPv4: look for the last dot
167 i = ip_mac.rfind(".")
168 if i > 0:
169 i += 1
170 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
171 # try with IPv6 or MAC: look for the last colon and operate in hex
172 i = ip_mac.rfind(":")
173 if i > 0:
174 i += 1
175 # format in hex, len can be 2 for mac or 4 for ipv6
176 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
177 except Exception:
178 pass
179 return None
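# Illustrative behaviour of increment_ip_mac (hypothetical addresses; doctest-style sketch):
#   >>> NsLcm.increment_ip_mac("192.168.1.10", vm_index=2)
#   '192.168.1.12'
#   >>> NsLcm.increment_ip_mac("fd00::a", vm_index=1)
#   'fd00::b'
#   >>> NsLcm.increment_ip_mac("52:54:00:aa:bb:0a", vm_index=3)
#   '52:54:00:aa:bb:0d'
# A non-string input is returned unchanged; a string that cannot be parsed returns None.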
180
181 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
182
183 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
184
185 try:
186 # TODO filter RO descriptor fields...
187
188 # write to database
189 db_dict = dict()
190 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
191 db_dict['deploymentStatus'] = ro_descriptor
192 self.update_db_2("nsrs", nsrs_id, db_dict)
193
194 except Exception as e:
195 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
196
197 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
198
199 # remove last dot from path (if exists)
200 if path.endswith('.'):
201 path = path[:-1]
202
203 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
204 # .format(table, filter, path, updated_data))
205 try:
206
207 nsr_id = filter.get('_id')
208
209 # read ns record from database
210 nsr = self.db.get_one(table='nsrs', q_filter=filter)
211 current_ns_status = nsr.get('nsState')
212
213 # get vca status for NS
214 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False, vca_id=vca_id)
215
216 # vcaStatus
217 db_dict = dict()
218 db_dict['vcaStatus'] = status_dict
219 await self.n2vc.update_vca_status(db_dict['vcaStatus'], vca_id=vca_id)
220
221 # update configurationStatus for this VCA
222 try:
223 vca_index = int(path[path.rfind(".")+1:])
224
225 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
226 vca_status = vca_list[vca_index].get('status')
227
228 configuration_status_list = nsr.get('configurationStatus')
229 config_status = configuration_status_list[vca_index].get('status')
230
231 if config_status == 'BROKEN' and vca_status != 'failed':
232 db_dict['configurationStatus'][vca_index] = 'READY'
233 elif config_status != 'BROKEN' and vca_status == 'failed':
234 db_dict['configurationStatus'][vca_index] = 'BROKEN'
235 except Exception as e:
236 # do not update configurationStatus
237 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
238
239 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
240 # if nsState = 'DEGRADED' check if all is OK
241 is_degraded = False
242 if current_ns_status in ('READY', 'DEGRADED'):
243 error_description = ''
244 # check machines
245 if status_dict.get('machines'):
246 for machine_id in status_dict.get('machines'):
247 machine = status_dict.get('machines').get(machine_id)
248 # check machine agent-status
249 if machine.get('agent-status'):
250 s = machine.get('agent-status').get('status')
251 if s != 'started':
252 is_degraded = True
253 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
254 # check machine instance status
255 if machine.get('instance-status'):
256 s = machine.get('instance-status').get('status')
257 if s != 'running':
258 is_degraded = True
259 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
260 # check applications
261 if status_dict.get('applications'):
262 for app_id in status_dict.get('applications'):
263 app = status_dict.get('applications').get(app_id)
264 # check application status
265 if app.get('status'):
266 s = app.get('status').get('status')
267 if s != 'active':
268 is_degraded = True
269 error_description += 'application {} status={} ; '.format(app_id, s)
270
271 if error_description:
272 db_dict['errorDescription'] = error_description
273 if current_ns_status == 'READY' and is_degraded:
274 db_dict['nsState'] = 'DEGRADED'
275 if current_ns_status == 'DEGRADED' and not is_degraded:
276 db_dict['nsState'] = 'READY'
277
278 # write to database
279 self.update_db_2("nsrs", nsr_id, db_dict)
280
281 except (asyncio.CancelledError, asyncio.TimeoutError):
282 raise
283 except Exception as e:
284 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
285
286 async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None, vca_id=None):
287 """
288 Updating vca status in NSR record
289 :param cluster_uuid: UUID of a k8s cluster
290 :param kdu_instance: The unique name of the KDU instance
291 :param filter: To get nsr_id
292 :return: none
293 """
294
295 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
296 # .format(cluster_uuid, kdu_instance, filter))
297
298 try:
299 nsr_id = filter.get('_id')
300
301 # get vca status for NS
302 vca_status = await self.k8sclusterjuju.status_kdu(
303 cluster_uuid,
304 kdu_instance,
305 complete_status=True,
306 yaml_format=False,
307 vca_id=vca_id,
308 )
309 # vcaStatus
310 db_dict = dict()
311 db_dict['vcaStatus'] = {nsr_id: vca_status}
312
313 await self.k8sclusterjuju.update_vca_status(
314 db_dict['vcaStatus'],
315 kdu_instance,
316 vca_id=vca_id,
317 )
318
319 # write to database
320 self.update_db_2("nsrs", nsr_id, db_dict)
321
322 except (asyncio.CancelledError, asyncio.TimeoutError):
323 raise
324 except Exception as e:
325 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
326
327 @staticmethod
328 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
329 try:
330 env = Environment(undefined=StrictUndefined)
331 template = env.from_string(cloud_init_text)
332 return template.render(additional_params or {})
333 except UndefinedError as e:
334 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
335 "file, must be provided in the instantiation parameters inside the "
336 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
337 except (TemplateError, TemplateNotFound) as e:
338 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
339 format(vnfd_id, vdu_id, e))
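# Minimal sketch of the Jinja2 rendering performed above (hypothetical template and parameters):
#   >>> NsLcm._parse_cloud_init("#cloud-config\nhostname: {{ host_name }}",
#   ...                         {"host_name": "vdu-0"}, "my-vnfd", "my-vdu")
#   '#cloud-config\nhostname: vdu-0'
# With StrictUndefined, a template variable missing from additional_params raises LcmException,
# pointing the user at the 'additionalParamsForVnf/Vdu' instantiation block.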
340
341 def _get_vdu_cloud_init_content(self, vdu, vnfd):
342 cloud_init_content = cloud_init_file = None
343 try:
344 if vdu.get("cloud-init-file"):
345 base_folder = vnfd["_admin"]["storage"]
346 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
347 vdu["cloud-init-file"])
348 with self.fs.file_open(cloud_init_file, "r") as ci_file:
349 cloud_init_content = ci_file.read()
350 elif vdu.get("cloud-init"):
351 cloud_init_content = vdu["cloud-init"]
352
353 return cloud_init_content
354 except FsException as e:
355 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
356 format(vnfd["id"], vdu["id"], cloud_init_file, e))
357
358 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
359 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
360 additional_params = vdur.get("additionalParams")
361 return parse_yaml_strings(additional_params)
362
363 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
364 """
365 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
366 :param vnfd: input vnfd
367 :param new_id: overrides vnf id if provided
368 :param additionalParams: Instantiation params for VNFs provided
369 :param nsrId: Id of the NSR
370 :return: copy of vnfd
371 """
372 vnfd_RO = deepcopy(vnfd)
373 # remove keys not used by RO: configuration, monitoring, scaling and internal keys
374 vnfd_RO.pop("_id", None)
375 vnfd_RO.pop("_admin", None)
376 vnfd_RO.pop("monitoring-param", None)
377 vnfd_RO.pop("scaling-group-descriptor", None)
378 vnfd_RO.pop("kdu", None)
379 vnfd_RO.pop("k8s-cluster", None)
380 if new_id:
381 vnfd_RO["id"] = new_id
382
383 # cloud-init and cloud-init-file are not passed to RO; remove them from the copy
384 for vdu in get_iterable(vnfd_RO, "vdu"):
385 vdu.pop("cloud-init-file", None)
386 vdu.pop("cloud-init", None)
387 return vnfd_RO
388
389 @staticmethod
390 def ip_profile_2_RO(ip_profile):
391 RO_ip_profile = deepcopy(ip_profile)
392 if "dns-server" in RO_ip_profile:
393 if isinstance(RO_ip_profile["dns-server"], list):
394 RO_ip_profile["dns-address"] = []
395 for ds in RO_ip_profile.pop("dns-server"):
396 RO_ip_profile["dns-address"].append(ds['address'])
397 else:
398 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
399 if RO_ip_profile.get("ip-version") == "ipv4":
400 RO_ip_profile["ip-version"] = "IPv4"
401 if RO_ip_profile.get("ip-version") == "ipv6":
402 RO_ip_profile["ip-version"] = "IPv6"
403 if "dhcp-params" in RO_ip_profile:
404 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
405 return RO_ip_profile
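# Illustrative translation performed by ip_profile_2_RO (hypothetical values; doctest-style sketch):
#   >>> NsLcm.ip_profile_2_RO({"ip-version": "ipv4",
#   ...                        "dns-server": [{"address": "8.8.8.8"}],
#   ...                        "dhcp-params": {"enabled": True}})
#   {'ip-version': 'IPv4', 'dns-address': ['8.8.8.8'], 'dhcp': {'enabled': True}}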
406
407 def _get_ro_vim_id_for_vim_account(self, vim_account):
408 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
409 if db_vim["_admin"]["operationalState"] != "ENABLED":
410 raise LcmException("VIM={} is not available. operationalState={}".format(
411 vim_account, db_vim["_admin"]["operationalState"]))
412 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
413 return RO_vim_id
414
415 def get_ro_wim_id_for_wim_account(self, wim_account):
416 if isinstance(wim_account, str):
417 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
418 if db_wim["_admin"]["operationalState"] != "ENABLED":
419 raise LcmException("WIM={} is not available. operationalState={}".format(
420 wim_account, db_wim["_admin"]["operationalState"]))
421 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
422 return RO_wim_id
423 else:
424 return wim_account
425
426 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
427
428 db_vdu_push_list = []
429 db_update = {"_admin.modified": time()}
430 if vdu_create:
431 for vdu_id, vdu_count in vdu_create.items():
432 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
433 if not vdur:
434 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
435 format(vdu_id))
436
437 for count in range(vdu_count):
438 vdur_copy = deepcopy(vdur)
439 vdur_copy["status"] = "BUILD"
440 vdur_copy["status-detailed"] = None
441 vdur_copy["ip-address"] = None
442 vdur_copy["_id"] = str(uuid4())
443 vdur_copy["count-index"] += count + 1
444 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
445 vdur_copy.pop("vim_info", None)
446 for iface in vdur_copy["interfaces"]:
447 if iface.get("fixed-ip"):
448 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
449 else:
450 iface.pop("ip-address", None)
451 if iface.get("fixed-mac"):
452 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
453 else:
454 iface.pop("mac-address", None)
455 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
456 db_vdu_push_list.append(vdur_copy)
457 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
458 if vdu_delete:
459 for vdu_id, vdu_count in vdu_delete.items():
460 if mark_delete:
461 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
462 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
463 else:
464 # they must be deleted one by one because common.db does not allow otherwise
465 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
466 for vdu in vdus_to_delete[:vdu_count]:
467 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
468 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
469 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
470 # modify passed dictionary db_vnfr
471 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
472 db_vnfr["vdur"] = db_vnfr_["vdur"]
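# Illustrative call (hypothetical vdu ids and counts): add two instances of "dataVM" and mark
# one instance of "mgmtVM" as DELETING without pulling it from the record yet:
#   self.scale_vnfr(db_vnfr, vdu_create={"dataVM": 2}, vdu_delete={"mgmtVM": 1}, mark_delete=True)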
473
474 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
475 """
476 Updates database nsr with the RO info for the created vld
477 :param ns_update_nsr: dictionary to be filled with the updated info
478 :param db_nsr: content of db_nsr. This is also modified
479 :param nsr_desc_RO: nsr descriptor from RO
480 :return: Nothing, LcmException is raised on errors
481 """
482
483 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
484 for net_RO in get_iterable(nsr_desc_RO, "nets"):
485 if vld["id"] != net_RO.get("ns_net_osm_id"):
486 continue
487 vld["vim-id"] = net_RO.get("vim_net_id")
488 vld["name"] = net_RO.get("vim_name")
489 vld["status"] = net_RO.get("status")
490 vld["status-detailed"] = net_RO.get("error_msg")
491 ns_update_nsr["vld.{}".format(vld_index)] = vld
492 break
493 else:
494 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
495
496 def set_vnfr_at_error(self, db_vnfrs, error_text):
497 try:
498 for db_vnfr in db_vnfrs.values():
499 vnfr_update = {"status": "ERROR"}
500 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
501 if "status" not in vdur:
502 vdur["status"] = "ERROR"
503 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
504 if error_text:
505 vdur["status-detailed"] = str(error_text)
506 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
507 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
508 except DbException as e:
509 self.logger.error("Cannot update vnf. {}".format(e))
510
511 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
512 """
513 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
514 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
515 :param nsr_desc_RO: nsr descriptor from RO
516 :return: Nothing, LcmException is raised on errors
517 """
518 for vnf_index, db_vnfr in db_vnfrs.items():
519 for vnf_RO in nsr_desc_RO["vnfs"]:
520 if vnf_RO["member_vnf_index"] != vnf_index:
521 continue
522 vnfr_update = {}
523 if vnf_RO.get("ip_address"):
524 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
525 elif not db_vnfr.get("ip-address"):
526 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
527 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
528
529 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
530 vdur_RO_count_index = 0
531 if vdur.get("pdu-type"):
532 continue
533 for vdur_RO in get_iterable(vnf_RO, "vms"):
534 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
535 continue
536 if vdur["count-index"] != vdur_RO_count_index:
537 vdur_RO_count_index += 1
538 continue
539 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
540 if vdur_RO.get("ip_address"):
541 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
542 else:
543 vdur["ip-address"] = None
544 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
545 vdur["name"] = vdur_RO.get("vim_name")
546 vdur["status"] = vdur_RO.get("status")
547 vdur["status-detailed"] = vdur_RO.get("error_msg")
548 for ifacer in get_iterable(vdur, "interfaces"):
549 for interface_RO in get_iterable(vdur_RO, "interfaces"):
550 if ifacer["name"] == interface_RO.get("internal_name"):
551 ifacer["ip-address"] = interface_RO.get("ip_address")
552 ifacer["mac-address"] = interface_RO.get("mac_address")
553 break
554 else:
555 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
556 "from VIM info"
557 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
558 vnfr_update["vdur.{}".format(vdu_index)] = vdur
559 break
560 else:
561 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
562 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
563
564 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
565 for net_RO in get_iterable(nsr_desc_RO, "nets"):
566 if vld["id"] != net_RO.get("vnf_net_osm_id"):
567 continue
568 vld["vim-id"] = net_RO.get("vim_net_id")
569 vld["name"] = net_RO.get("vim_name")
570 vld["status"] = net_RO.get("status")
571 vld["status-detailed"] = net_RO.get("error_msg")
572 vnfr_update["vld.{}".format(vld_index)] = vld
573 break
574 else:
575 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
576 vnf_index, vld["id"]))
577
578 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
579 break
580
581 else:
582 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
583
584 def _get_ns_config_info(self, nsr_id):
585 """
586 Generates a mapping between vnf,vdu elements and the N2VC id
587 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
588 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
589 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
590 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
591 """
592 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
593 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
594 mapping = {}
595 ns_config_info = {"osm-config-mapping": mapping}
596 for vca in vca_deployed_list:
597 if not vca["member-vnf-index"]:
598 continue
599 if not vca["vdu_id"]:
600 mapping[vca["member-vnf-index"]] = vca["application"]
601 else:
602 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
603 vca["application"]
604 return ns_config_info
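# Shape of the returned structure (hypothetical names), matching the docstring above:
#   {"osm-config-mapping": {
#       "1": "app-vnf-1",                # vnf-level charm: "<member-vnf-index>"
#       "1.dataVM.0": "app-vnf-1-vdu"    # vdu-level charm: "<member-vnf-index>.<vdu_id>.<count>"
#   }}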
605
606 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
607 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
608
609 db_vims = {}
610
611 def get_vim_account(vim_account_id):
612 nonlocal db_vims
613 if vim_account_id in db_vims:
614 return db_vims[vim_account_id]
615 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
616 db_vims[vim_account_id] = db_vim
617 return db_vim
618
619 # modify target_vld info with instantiation parameters
620 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
621 if vld_params.get("ip-profile"):
622 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
623 if vld_params.get("provider-network"):
624 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
625 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
626 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
627 if vld_params.get("wimAccountId"):
628 target_wim = "wim:{}".format(vld_params["wimAccountId"])
629 target_vld["vim_info"][target_wim] = {}
630 for param in ("vim-network-name", "vim-network-id"):
631 if vld_params.get(param):
632 if isinstance(vld_params[param], dict):
633 for vim, vim_net in vld_params[param].items():
634 other_target_vim = "vim:" + vim
635 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
636 else: # isinstance str
637 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
638 if vld_params.get("common_id"):
639 target_vld["common_id"] = vld_params.get("common_id")
640
641 nslcmop_id = db_nslcmop["_id"]
642 target = {
643 "name": db_nsr["name"],
644 "ns": {"vld": []},
645 "vnf": [],
646 "image": deepcopy(db_nsr["image"]),
647 "flavor": deepcopy(db_nsr["flavor"]),
648 "action_id": nslcmop_id,
649 "cloud_init_content": {},
650 }
651 for image in target["image"]:
652 image["vim_info"] = {}
653 for flavor in target["flavor"]:
654 flavor["vim_info"] = {}
655
656 if db_nslcmop.get("lcmOperationType") != "instantiate":
657 # get parameters of instantiation:
658 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
659 "lcmOperationType": "instantiate"})[-1]
660 ns_params = db_nslcmop_instantiate.get("operationParams")
661 else:
662 ns_params = db_nslcmop.get("operationParams")
663 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
664 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
665
666 cp2target = {}
667 for vld_index, vld in enumerate(db_nsr.get("vld")):
668 target_vim = "vim:{}".format(ns_params["vimAccountId"])
669 target_vld = {
670 "id": vld["id"],
671 "name": vld["name"],
672 "mgmt-network": vld.get("mgmt-network", False),
673 "type": vld.get("type"),
674 "vim_info": {
675 target_vim: {
676 "vim_network_name": vld.get("vim-network-name"),
677 "vim_account_id": ns_params["vimAccountId"]
678 }
679 }
680 }
681 # check if this network needs SDN assist
682 if vld.get("pci-interfaces"):
683 db_vim = get_vim_account(ns_params["vimAccountId"])
684 sdnc_id = db_vim["config"].get("sdn-controller")
685 if sdnc_id:
686 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
687 target_sdn = "sdn:{}".format(sdnc_id)
688 target_vld["vim_info"][target_sdn] = {
689 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
690
691 nsd_vnf_profiles = get_vnf_profiles(nsd)
692 for nsd_vnf_profile in nsd_vnf_profiles:
693 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
694 if cp["virtual-link-profile-id"] == vld["id"]:
695 cp2target["member_vnf:{}.{}".format(
696 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
697 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
698 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
699
700 # check at nsd descriptor, if there is an ip-profile
701 vld_params = {}
702 nsd_vlp = find_in_list(
703 get_virtual_link_profiles(nsd),
704 lambda a_link_profile: a_link_profile["virtual-link-desc-id"] == vld["id"])
705 if nsd_vlp and nsd_vlp.get("virtual-link-protocol-data") and \
706 nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
707 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
708 ip_profile_dest_data = {}
709 if "ip-version" in ip_profile_source_data:
710 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
711 if "cidr" in ip_profile_source_data:
712 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
713 if "gateway-ip" in ip_profile_source_data:
714 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
715 if "dhcp-enabled" in ip_profile_source_data:
716 ip_profile_dest_data["dhcp-params"] = {
717 "enabled": ip_profile_source_data["dhcp-enabled"]
718 }
719 vld_params["ip-profile"] = ip_profile_dest_data
720
721 # update vld_params with instantiation params
722 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
723 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
724 if vld_instantiation_params:
725 vld_params.update(vld_instantiation_params)
726 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
727 target["ns"]["vld"].append(target_vld)
728
729 for vnfr in db_vnfrs.values():
730 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
731 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
732 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
733 target_vnf = deepcopy(vnfr)
734 target_vim = "vim:{}".format(vnfr["vim-account-id"])
735 for vld in target_vnf.get("vld", ()):
736 # check if connected to a ns.vld, to fill target
737 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
738 lambda cpd: cpd.get("id") == vld["id"])
739 if vnf_cp:
740 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
741 if cp2target.get(ns_cp):
742 vld["target"] = cp2target[ns_cp]
743
744 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
745 # check if this network needs SDN assist
746 target_sdn = None
747 if vld.get("pci-interfaces"):
748 db_vim = get_vim_account(vnfr["vim-account-id"])
749 sdnc_id = db_vim["config"].get("sdn-controller")
750 if sdnc_id:
751 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
752 target_sdn = "sdn:{}".format(sdnc_id)
753 vld["vim_info"][target_sdn] = {
754 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
755
756 # check at vnfd descriptor, if there is an ip-profile
757 vld_params = {}
758 vnfd_vlp = find_in_list(
759 get_virtual_link_profiles(vnfd),
760 lambda a_link_profile: a_link_profile["id"] == vld["id"]
761 )
762 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
763 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
764 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
765 ip_profile_dest_data = {}
766 if "ip-version" in ip_profile_source_data:
767 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
768 if "cidr" in ip_profile_source_data:
769 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
770 if "gateway-ip" in ip_profile_source_data:
771 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
772 if "dhcp-enabled" in ip_profile_source_data:
773 ip_profile_dest_data["dhcp-params"] = {
774 "enabled": ip_profile_source_data["dhcp-enabled"]
775 }
776
777 vld_params["ip-profile"] = ip_profile_dest_data
778 # update vld_params with instantiation params
779 if vnf_params:
780 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
781 lambda i_vld: i_vld["name"] == vld["id"])
782 if vld_instantiation_params:
783 vld_params.update(vld_instantiation_params)
784 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
785
786 vdur_list = []
787 for vdur in target_vnf.get("vdur", ()):
788 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
789 continue # This vdu must not be created
790 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
791
792 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
793
794 if ssh_keys_all:
795 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
796 vnf_configuration = get_configuration(vnfd, vnfd["id"])
797 if vdu_configuration and vdu_configuration.get("config-access") and \
798 vdu_configuration.get("config-access").get("ssh-access"):
799 vdur["ssh-keys"] = ssh_keys_all
800 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
801 elif vnf_configuration and vnf_configuration.get("config-access") and \
802 vnf_configuration.get("config-access").get("ssh-access") and \
803 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
804 vdur["ssh-keys"] = ssh_keys_all
805 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
806 elif ssh_keys_instantiation and \
807 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
808 vdur["ssh-keys"] = ssh_keys_instantiation
809
810 self.logger.debug("NS > vdur > {}".format(vdur))
811
812 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
813 # cloud-init
814 if vdud.get("cloud-init-file"):
815 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
816 # read the file and put its content at target.cloud_init_content, so that ng_ro does not need the shared package system
817 if vdur["cloud-init"] not in target["cloud_init_content"]:
818 base_folder = vnfd["_admin"]["storage"]
819 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
820 vdud.get("cloud-init-file"))
821 with self.fs.file_open(cloud_init_file, "r") as ci_file:
822 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
823 elif vdud.get("cloud-init"):
824 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
825 # put the content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
826 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
827 vdur["additionalParams"] = vdur.get("additionalParams") or {}
828 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
829 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
830 vdur["additionalParams"] = deploy_params_vdu
831
832 # flavor
833 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
834 if target_vim not in ns_flavor["vim_info"]:
835 ns_flavor["vim_info"][target_vim] = {}
836
837 # deal with images
838 # in case alternative images are provided, we must check whether one applies to this vim_type
839 # and, if so, use that alternative image id instead of the default one
840 ns_image_id = int(vdur["ns-image-id"])
841 if vdur.get("alt-image-ids"):
842 db_vim = get_vim_account(vnfr["vim-account-id"])
843 vim_type = db_vim["vim_type"]
844 for alt_image_id in vdur.get("alt-image-ids"):
845 ns_alt_image = target["image"][int(alt_image_id)]
846 if vim_type == ns_alt_image.get("vim-type"):
847 # must use alternative image
848 self.logger.debug("use alternative image id: {}".format(alt_image_id))
849 ns_image_id = alt_image_id
850 vdur["ns-image-id"] = ns_image_id
851 break
852 ns_image = target["image"][int(ns_image_id)]
853 if target_vim not in ns_image["vim_info"]:
854 ns_image["vim_info"][target_vim] = {}
855
856 vdur["vim_info"] = {target_vim: {}}
857 # instantiation parameters
858 # if vnf_params:
859 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
860 # vdud["id"]), None)
861 vdur_list.append(vdur)
862 target_vnf["vdur"] = vdur_list
863 target["vnf"].append(target_vnf)
864
865 desc = await self.RO.deploy(nsr_id, target)
866 self.logger.debug("RO return > {}".format(desc))
867 action_id = desc["action_id"]
868 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
869
870 # Updating NSR
871 db_nsr_update = {
872 "_admin.deployed.RO.operational-status": "running",
873 "detailed-status": " ".join(stage)
874 }
875 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
876 self.update_db_2("nsrs", nsr_id, db_nsr_update)
877 self._write_op_status(nslcmop_id, stage)
878 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
879 return
880
881 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
882 detailed_status_old = None
883 db_nsr_update = {}
884 start_time = start_time or time()
885 while time() <= start_time + timeout:
886 desc_status = await self.RO.status(nsr_id, action_id)
887 self.logger.debug("Wait NG RO > {}".format(desc_status))
888 if desc_status["status"] == "FAILED":
889 raise NgRoException(desc_status["details"])
890 elif desc_status["status"] == "BUILD":
891 if stage:
892 stage[2] = "VIM: ({})".format(desc_status["details"])
893 elif desc_status["status"] == "DONE":
894 if stage:
895 stage[2] = "Deployed at VIM"
896 break
897 else:
898 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
899 if stage and nslcmop_id and stage[2] != detailed_status_old:
900 detailed_status_old = stage[2]
901 db_nsr_update["detailed-status"] = " ".join(stage)
902 self.update_db_2("nsrs", nsr_id, db_nsr_update)
903 self._write_op_status(nslcmop_id, stage)
904 await asyncio.sleep(15, loop=self.loop)
905 else: # timeout_ns_deploy
906 raise NgRoException("Timeout waiting ns to deploy")
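# The polled NG-RO response is only required to carry the fields used above (sketch):
#   {"status": "BUILD" | "DONE" | "FAILED", "details": "<free-text progress or error message>"}
# BUILD keeps polling every 15 seconds, DONE stops, FAILED raises NgRoException(details).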
907
908 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
909 db_nsr_update = {}
910 failed_detail = []
911 action_id = None
912 start_deploy = time()
913 try:
914 target = {
915 "ns": {"vld": []},
916 "vnf": [],
917 "image": [],
918 "flavor": [],
919 "action_id": nslcmop_id
920 }
921 desc = await self.RO.deploy(nsr_id, target)
922 action_id = desc["action_id"]
923 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
924 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
925 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
926
927 # wait until done
928 delete_timeout = 20 * 60 # 20 minutes
929 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
930
931 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
932 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
933 # delete all nsr
934 await self.RO.delete(nsr_id)
935 except Exception as e:
936 if isinstance(e, NgRoException) and e.http_code == 404: # not found
937 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
938 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
939 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
940 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
941 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
942 failed_detail.append("delete conflict: {}".format(e))
943 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
944 else:
945 failed_detail.append("delete error: {}".format(e))
946 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
947
948 if failed_detail:
949 stage[2] = "Error deleting from VIM"
950 else:
951 stage[2] = "Deleted from VIM"
952 db_nsr_update["detailed-status"] = " ".join(stage)
953 self.update_db_2("nsrs", nsr_id, db_nsr_update)
954 self._write_op_status(nslcmop_id, stage)
955
956 if failed_detail:
957 raise LcmException("; ".join(failed_detail))
958 return
959
960 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
961 n2vc_key_list, stage):
962 """
963 Instantiate at RO
964 :param logging_text: prefix text to use at logging
965 :param nsr_id: nsr identity
966 :param nsd: database content of ns descriptor
967 :param db_nsr: database content of ns record
968 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
969 :param db_vnfrs:
970 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
971 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
972 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
973 :return: None or exception
974 """
975 try:
976 start_deploy = time()
977 ns_params = db_nslcmop.get("operationParams")
978 if ns_params and ns_params.get("timeout_ns_deploy"):
979 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
980 else:
981 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
982
983 # Check for and optionally request placement optimization. Database will be updated if placement activated
984 stage[2] = "Waiting for Placement."
985 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
986 # in case of placement, change ns_params["vimAccountId"] if it is not present in any vnfr
987 for vnfr in db_vnfrs.values():
988 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
989 break
990 else:
991 ns_params["vimAccountId"] = vnfr["vim-account-id"]
992
993 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
994 db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
995 except Exception as e:
996 stage[2] = "ERROR deploying at VIM"
997 self.set_vnfr_at_error(db_vnfrs, str(e))
998 self.logger.error("Error deploying at VIM {}".format(e),
999 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
1000 NgRoException)))
1001 raise
1002
1003 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1004 """
1005 Wait for kdu to be up, get ip address
1006 :param logging_text: prefix used for logging
1007 :param nsr_id:
1008 :param vnfr_id:
1009 :param kdu_name:
1010 :return: IP address
1011 """
1012
1013 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1014 nb_tries = 0
1015
1016 while nb_tries < 360:
1017 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1018 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
1019 if not kdur:
1020 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
1021 if kdur.get("status"):
1022 if kdur["status"] in ("READY", "ENABLED"):
1023 return kdur.get("ip-address")
1024 else:
1025 raise LcmException("target KDU={} is in error state".format(kdu_name))
1026
1027 await asyncio.sleep(10, loop=self.loop)
1028 nb_tries += 1
1029 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
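# Typical use (see instantiate_N2VC below): obtain the KDU management address once the
# KDU reaches READY/ENABLED, e.g.
#   rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)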
1030
1031 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
1032 """
1033 Wait for the IP address at RO and, optionally, insert a public key in the virtual machine
1034 :param logging_text: prefix used for logging
1035 :param nsr_id:
1036 :param vnfr_id:
1037 :param vdu_id:
1038 :param vdu_index:
1039 :param pub_key: public ssh key to inject, None to skip
1040 :param user: user to apply the public ssh key
1041 :return: IP address
1042 """
1043
1044 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1045 ro_nsr_id = None
1046 ip_address = None
1047 nb_tries = 0
1048 target_vdu_id = None
1049 ro_retries = 0
1050
1051 while True:
1052
1053 ro_retries += 1
1054 if ro_retries >= 360: # 1 hour
1055 raise LcmException("Timeout of 1 hour reached waiting for the VM to be up and get an IP address; nsr_id: {}".format(nsr_id))
1056
1057 await asyncio.sleep(10, loop=self.loop)
1058
1059 # get ip address
1060 if not target_vdu_id:
1061 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1062
1063 if not vdu_id: # for the VNF case
1064 if db_vnfr.get("status") == "ERROR":
1065 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1066 ip_address = db_vnfr.get("ip-address")
1067 if not ip_address:
1068 continue
1069 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1070 else: # VDU case
1071 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1072 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1073
1074 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1075 vdur = db_vnfr["vdur"][0]
1076 if not vdur:
1077 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1078 vdu_index))
1079 # New generation RO stores information at "vim_info"
1080 ng_ro_status = None
1081 target_vim = None
1082 if vdur.get("vim_info"):
1083 target_vim = next(t for t in vdur["vim_info"]) # there should be only one key
1084 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1085 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
1086 ip_address = vdur.get("ip-address")
1087 if not ip_address:
1088 continue
1089 target_vdu_id = vdur["vdu-id-ref"]
1090 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1091 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1092
1093 if not target_vdu_id:
1094 continue
1095
1096 # inject public key into machine
1097 if pub_key and user:
1098 self.logger.debug(logging_text + "Inserting RO key")
1099 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1100 if vdur.get("pdu-type"):
1101 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1102 return ip_address
1103 try:
1104 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1105 if self.ng_ro:
1106 target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
1107 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1108 }
1109 desc = await self.RO.deploy(nsr_id, target)
1110 action_id = desc["action_id"]
1111 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1112 break
1113 else:
1114 # wait until NS is deployed at RO
1115 if not ro_nsr_id:
1116 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1117 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1118 if not ro_nsr_id:
1119 continue
1120 result_dict = await self.RO.create_action(
1121 item="ns",
1122 item_id_name=ro_nsr_id,
1123 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1124 )
1125 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1126 if not result_dict or not isinstance(result_dict, dict):
1127 raise LcmException("Unknown response from RO when injecting key")
1128 for result in result_dict.values():
1129 if result.get("vim_result") == 200:
1130 break
1131 else:
1132 raise ROclient.ROClientException("error injecting key: {}".format(
1133 result.get("description")))
1134 break
1135 except NgRoException as e:
1136 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1137 except ROclient.ROClientException as e:
1138 if not nb_tries:
1139 self.logger.debug(logging_text + "error injecting key: {}. Retrying for up to {} seconds".
1140 format(e, 20*10))
1141 nb_tries += 1
1142 if nb_tries >= 20:
1143 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1144 else:
1145 break
1146
1147 return ip_address
1148
1149 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1150 """
1151 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1152 """
1153 my_vca = vca_deployed_list[vca_index]
1154 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1155 # vdu or kdu: no dependencies
1156 return
1157 timeout = 300
1158 while timeout >= 0:
1159 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1160 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1161 configuration_status_list = db_nsr["configurationStatus"]
1162 for index, vca_deployed in enumerate(configuration_status_list):
1163 if index == vca_index:
1164 # myself
1165 continue
1166 if not my_vca.get("member-vnf-index") or \
1167 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1168 internal_status = configuration_status_list[index].get("status")
1169 if internal_status == 'READY':
1170 continue
1171 elif internal_status == 'BROKEN':
1172 raise LcmException("Configuration aborted because dependent charm/s has failed")
1173 else:
1174 break
1175 else:
1176 # no dependencies, return
1177 return
1178 await asyncio.sleep(10)
1179 timeout -= 1
1180
1181 raise LcmException("Configuration aborted because dependent charm/s timeout")
1182
1183 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1184 return (
1185 deep_get(db_vnfr, ("vca-id",)) or
1186 deep_get(db_nsr, ("instantiate_params", "vcaId"))
1187 )
1188
1189 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1190 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1191 ee_config_descriptor):
1192 nsr_id = db_nsr["_id"]
1193 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1194 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1195 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1196 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1197 db_dict = {
1198 'collection': 'nsrs',
1199 'filter': {'_id': nsr_id},
1200 'path': db_update_entry
1201 }
1202 step = ""
1203 try:
1204
1205 element_type = 'NS'
1206 element_under_configuration = nsr_id
1207
1208 vnfr_id = None
1209 if db_vnfr:
1210 vnfr_id = db_vnfr["_id"]
1211 osm_config["osm"]["vnf_id"] = vnfr_id
1212
1213 namespace = "{nsi}.{ns}".format(
1214 nsi=nsi_id if nsi_id else "",
1215 ns=nsr_id)
1216
1217 if vnfr_id:
1218 element_type = 'VNF'
1219 element_under_configuration = vnfr_id
1220 namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
1221 if vdu_id:
1222 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1223 element_type = 'VDU'
1224 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1225 osm_config["osm"]["vdu_id"] = vdu_id
1226 elif kdu_name:
1227 namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
1228 element_type = 'KDU'
1229 element_under_configuration = kdu_name
1230 osm_config["osm"]["kdu_name"] = kdu_name
1231
1232 # Get artifact path
1233 artifact_path = "{}/{}/{}/{}".format(
1234 base_folder["folder"],
1235 base_folder["pkg-dir"],
1236 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1237 vca_name
1238 )
1239
1240 self.logger.debug("Artifact path > {}".format(artifact_path))
1241
1242 # get initial_config_primitive_list that applies to this element
1243 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1244
1245 self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))
1246
1247 # add config if not present for NS charm
1248 ee_descriptor_id = ee_config_descriptor.get("id")
1249 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1250 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
1251 vca_deployed, ee_descriptor_id)
1252
1253 self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
1254 # n2vc_redesign STEP 3.1
1255 # find old ee_id if exists
1256 ee_id = vca_deployed.get("ee_id")
1257
1258 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1259 # create or register execution environment in VCA
1260 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1261
1262 self._write_configuration_status(
1263 nsr_id=nsr_id,
1264 vca_index=vca_index,
1265 status='CREATING',
1266 element_under_configuration=element_under_configuration,
1267 element_type=element_type
1268 )
1269
1270 step = "create execution environment"
1271 self.logger.debug(logging_text + step)
1272
1273 ee_id = None
1274 credentials = None
1275 if vca_type == "k8s_proxy_charm":
1276 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1277 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
1278 namespace=namespace,
1279 artifact_path=artifact_path,
1280 db_dict=db_dict,
1281 vca_id=vca_id,
1282 )
1283 elif vca_type == "helm" or vca_type == "helm-v3":
1284 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1285 namespace=namespace,
1286 reuse_ee_id=ee_id,
1287 db_dict=db_dict,
1288 config=osm_config,
1289 artifact_path=artifact_path,
1290 vca_type=vca_type
1291 )
1292 else:
1293 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1294 namespace=namespace,
1295 reuse_ee_id=ee_id,
1296 db_dict=db_dict,
1297 vca_id=vca_id,
1298 )
1299
1300 elif vca_type == "native_charm":
1301 step = "Waiting to VM being up and getting IP address"
1302 self.logger.debug(logging_text + step)
1303 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1304 user=None, pub_key=None)
1305 credentials = {"hostname": rw_mgmt_ip}
1306 # get username
1307 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1308 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1309 # merged. Meanwhile, get the username from initial-config-primitive
1310 if not username and initial_config_primitive_list:
1311 for config_primitive in initial_config_primitive_list:
1312 for param in config_primitive.get("parameter", ()):
1313 if param["name"] == "ssh-username":
1314 username = param["value"]
1315 break
1316 if not username:
1317 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1318 "'config-access.ssh-access.default-user'")
1319 credentials["username"] = username
1320 # n2vc_redesign STEP 3.2
1321
1322 self._write_configuration_status(
1323 nsr_id=nsr_id,
1324 vca_index=vca_index,
1325 status='REGISTERING',
1326 element_under_configuration=element_under_configuration,
1327 element_type=element_type
1328 )
1329
1330 step = "register execution environment {}".format(credentials)
1331 self.logger.debug(logging_text + step)
1332 ee_id = await self.vca_map[vca_type].register_execution_environment(
1333 credentials=credentials,
1334 namespace=namespace,
1335 db_dict=db_dict,
1336 vca_id=vca_id,
1337 )
1338
1339 # for compatibility with MON/POL modules, they need the model and application name in the database
1340 # TODO ask MON/POL whether this is still needed, so the "model_name.application_name" format no longer has to be assumed
1341 ee_id_parts = ee_id.split('.')
1342 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1343 if len(ee_id_parts) >= 2:
1344 model_name = ee_id_parts[0]
1345 application_name = ee_id_parts[1]
1346 db_nsr_update[db_update_entry + "model"] = model_name
1347 db_nsr_update[db_update_entry + "application"] = application_name
1348
1349 # n2vc_redesign STEP 3.3
1350 step = "Install configuration Software"
1351
1352 self._write_configuration_status(
1353 nsr_id=nsr_id,
1354 vca_index=vca_index,
1355 status='INSTALLING SW',
1356 element_under_configuration=element_under_configuration,
1357 element_type=element_type,
1358 other_update=db_nsr_update
1359 )
1360
1361 # TODO check if already done
1362 self.logger.debug(logging_text + step)
1363 config = None
1364 if vca_type == "native_charm":
1365 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1366 if config_primitive:
1367 config = self._map_primitive_params(
1368 config_primitive,
1369 {},
1370 deploy_params
1371 )
1372 num_units = 1
1373 if vca_type == "lxc_proxy_charm":
1374 if element_type == "NS":
1375 num_units = db_nsr.get("config-units") or 1
1376 elif element_type == "VNF":
1377 num_units = db_vnfr.get("config-units") or 1
1378 elif element_type == "VDU":
1379 for v in db_vnfr["vdur"]:
1380 if vdu_id == v["vdu-id-ref"]:
1381 num_units = v.get("config-units") or 1
1382 break
1383 if vca_type != "k8s_proxy_charm":
1384 await self.vca_map[vca_type].install_configuration_sw(
1385 ee_id=ee_id,
1386 artifact_path=artifact_path,
1387 db_dict=db_dict,
1388 config=config,
1389 num_units=num_units,
1390 vca_id=vca_id,
1391 )
1392
1393 # write in db flag of configuration_sw already installed
1394 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1395
1396 # add relations for this VCA (wait for other peers related with this VCA)
1397 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1398 vca_index=vca_index, vca_id=vca_id, vca_type=vca_type)
1399
1400 # if SSH access is required, then get the execution environment SSH public key
1401 # for native charms, we have already waited for the VM to be up
1402 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1403 pub_key = None
1404 user = None
1405 # self.logger.debug("get ssh key block")
1406 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1407 # self.logger.debug("ssh key needed")
1408 # Needed to inject a ssh key
1409 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1410 step = "Install configuration Software, getting public ssh key"
1411 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1412 ee_id=ee_id,
1413 db_dict=db_dict,
1414 vca_id=vca_id
1415 )
1416
1417 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1418 else:
1419 # self.logger.debug("no need to get ssh key")
1420 step = "Waiting to VM being up and getting IP address"
1421 self.logger.debug(logging_text + step)
1422
1423 # n2vc_redesign STEP 5.1
1424 # wait for RO (ip-address) and insert pub_key into the VM
1425 if vnfr_id:
1426 if kdu_name:
1427 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1428 else:
1429 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1430 vdu_index, user=user, pub_key=pub_key)
1431 else:
1432 rw_mgmt_ip = None # This is for a NS configuration
1433
1434 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1435
1436 # store rw_mgmt_ip in deploy params for later replacement
1437 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1438
1439 # n2vc_redesign STEP 6 Execute initial config primitive
1440 step = 'execute initial config primitive'
1441
1442 # wait for dependent primitives execution (NS -> VNF -> VDU)
1443 if initial_config_primitive_list:
1444 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1445
1446 # stage, depending on the element type: vdu, kdu, vnf or ns
1447 my_vca = vca_deployed_list[vca_index]
1448 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1449 # VDU or KDU
1450 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1451 elif my_vca.get("member-vnf-index"):
1452 # VNF
1453 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1454 else:
1455 # NS
1456 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1457
1458 self._write_configuration_status(
1459 nsr_id=nsr_id,
1460 vca_index=vca_index,
1461 status='EXECUTING PRIMITIVE'
1462 )
1463
1464 self._write_op_status(
1465 op_id=nslcmop_id,
1466 stage=stage
1467 )
1468
1469 check_if_terminated_needed = True
1470 for initial_config_primitive in initial_config_primitive_list:
1471 # add ns_config_info to the deploy params if this is a NS execution environment
1472 if not vca_deployed["member-vnf-index"]:
1473 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1474 # TODO check if already done
1475 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1476
1477 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1478 self.logger.debug(logging_text + step)
1479 await self.vca_map[vca_type].exec_primitive(
1480 ee_id=ee_id,
1481 primitive_name=initial_config_primitive["name"],
1482 params_dict=primitive_params_,
1483 db_dict=db_dict,
1484 vca_id=vca_id,
1485 )
1486 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
1487 if check_if_terminated_needed:
1488 if config_descriptor.get('terminate-config-primitive'):
1489 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1490 check_if_terminated_needed = False
1491
1492 # TODO register in database that primitive is done
1493
1494 # STEP 7 Configure metrics
1495 if vca_type == "helm" or vca_type == "helm-v3":
1496 prometheus_jobs = await self.add_prometheus_metrics(
1497 ee_id=ee_id,
1498 artifact_path=artifact_path,
1499 ee_config_descriptor=ee_config_descriptor,
1500 vnfr_id=vnfr_id,
1501 nsr_id=nsr_id,
1502 target_ip=rw_mgmt_ip,
1503 )
1504 if prometheus_jobs:
1505 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1506
1507 step = "instantiated at VCA"
1508 self.logger.debug(logging_text + step)
1509
1510 self._write_configuration_status(
1511 nsr_id=nsr_id,
1512 vca_index=vca_index,
1513 status='READY'
1514 )
1515
1516 except Exception as e: # TODO not use Exception but N2VC exception
1517 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1518 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1519 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1520 self._write_configuration_status(
1521 nsr_id=nsr_id,
1522 vca_index=vca_index,
1523 status='BROKEN'
1524 )
1525 raise LcmException("{} {}".format(step, e)) from e
1526
1527 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1528 error_description: str = None, error_detail: str = None, other_update: dict = None):
1529 """
1530 Update db_nsr fields.
1531 :param nsr_id:
1532 :param ns_state:
1533 :param current_operation:
1534 :param current_operation_id:
1535 :param error_description:
1536 :param error_detail:
1537 :param other_update: Other required changes at database if provided, will be cleared
1538 :return:
1539 """
1540 try:
1541 db_dict = other_update or {}
1542 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1543 db_dict["_admin.current-operation"] = current_operation_id
1544 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1545 db_dict["currentOperation"] = current_operation
1546 db_dict["currentOperationID"] = current_operation_id
1547 db_dict["errorDescription"] = error_description
1548 db_dict["errorDetail"] = error_detail
1549
1550 if ns_state:
1551 db_dict["nsState"] = ns_state
1552 self.update_db_2("nsrs", nsr_id, db_dict)
1553 except DbException as e:
1554 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1555
1556 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1557 operation_state: str = None, other_update: dict = None):
1558 try:
1559 db_dict = other_update or {}
1560 db_dict['queuePosition'] = queuePosition
1561 if isinstance(stage, list):
1562 db_dict['stage'] = stage[0]
1563 db_dict['detailed-status'] = " ".join(stage)
1564 elif stage is not None:
1565 db_dict['stage'] = str(stage)
1566
1567 if error_message is not None:
1568 db_dict['errorMessage'] = error_message
1569 if operation_state is not None:
1570 db_dict['operationState'] = operation_state
1571 db_dict["statusEnteredTime"] = time()
1572 self.update_db_2("nslcmops", op_id, db_dict)
1573 except DbException as e:
1574 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1575
1576 def _write_all_config_status(self, db_nsr: dict, status: str):
1577 try:
1578 nsr_id = db_nsr["_id"]
1579 # configurationStatus
1580 config_status = db_nsr.get('configurationStatus')
1581 if config_status:
1582 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1583 enumerate(config_status) if v}
1584 # update status
1585 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1586
1587 except DbException as e:
1588 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1589
1590 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1591 element_under_configuration: str = None, element_type: str = None,
1592 other_update: dict = None):
1593
1594 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1595 # .format(vca_index, status))
1596
1597 try:
1598 db_path = 'configurationStatus.{}.'.format(vca_index)
1599 db_dict = other_update or {}
1600 if status:
1601 db_dict[db_path + 'status'] = status
1602 if element_under_configuration:
1603 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1604 if element_type:
1605 db_dict[db_path + 'elementType'] = element_type
1606 self.update_db_2("nsrs", nsr_id, db_dict)
1607 except DbException as e:
1608 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1609 .format(status, nsr_id, vca_index, e))
1610
1611 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1612 """
1613 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
1614 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
1615 Database is used because the result can be obtained from a different LCM worker in case of HA.
1616 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1617 :param db_nslcmop: database content of nslcmop
1618 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1619 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
1620 computed 'vim-account-id'
1621 """
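# Minimal sketch of the structure this method expects at nslcmops _admin.pla (index and
# account values are hypothetical):
#   {"vnf": [{"member-vnf-index": "1", "vimAccountId": "<vim account _id>"},
#            {"member-vnf-index": "2", "vimAccountId": "<vim account _id>"}]}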
1622 modified = False
1623 nslcmop_id = db_nslcmop['_id']
1624 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1625 if placement_engine == "PLA":
1626 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1627 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1628 db_poll_interval = 5
1629 wait = db_poll_interval * 10
1630 pla_result = None
1631 while not pla_result and wait >= 0:
1632 await asyncio.sleep(db_poll_interval)
1633 wait -= db_poll_interval
1634 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1635 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1636
1637 if not pla_result:
1638 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1639
1640 for pla_vnf in pla_result['vnf']:
1641 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1642 if not pla_vnf.get('vimAccountId') or not vnfr:
1643 continue
1644 modified = True
1645 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1646 # Modifies db_vnfrs
1647 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1648 return modified
1649
1650 def update_nsrs_with_pla_result(self, params):
1651 try:
1652 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1653 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1654 except Exception as e:
1655 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1656
1657 async def instantiate(self, nsr_id, nslcmop_id):
1658 """
1659 Instantiate an NS: deploy its KDUs, VMs (through RO) and execution environments (VCA), and run Day-1 primitives.
1660 :param nsr_id: ns instance to deploy
1661 :param nslcmop_id: operation to run
1662 :return:
1663 """
1664
1665 # Try to lock HA task here
1666 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1667 if not task_is_locked_by_me:
1668 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1669 return
1670
1671 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1672 self.logger.debug(logging_text + "Enter")
1673
1674 # get all needed from database
1675
1676 # database nsrs record
1677 db_nsr = None
1678
1679 # database nslcmops record
1680 db_nslcmop = None
1681
1682 # update operation on nsrs
1683 db_nsr_update = {}
1684 # update operation on nslcmops
1685 db_nslcmop_update = {}
1686
1687 nslcmop_operation_state = None
1688 db_vnfrs = {} # vnf's info indexed by member-index
1689 # n2vc_info = {}
1690 tasks_dict_info = {} # from task to info text
1691 exc = None
1692 error_list = []
1693 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1694 # ^ stage, step, VIM progress
1695 try:
1696 # wait for any previous tasks in progress
1697 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1698
1699 stage[1] = "Sync filesystem from database."
1700 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
1701
1702 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1703 stage[1] = "Reading from database."
1704 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1705 db_nsr_update["detailed-status"] = "creating"
1706 db_nsr_update["operational-status"] = "init"
1707 self._write_ns_status(
1708 nsr_id=nsr_id,
1709 ns_state="BUILDING",
1710 current_operation="INSTANTIATING",
1711 current_operation_id=nslcmop_id,
1712 other_update=db_nsr_update
1713 )
1714 self._write_op_status(
1715 op_id=nslcmop_id,
1716 stage=stage,
1717 queuePosition=0
1718 )
1719
1720 # read from db: operation
1721 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
1722 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1723 ns_params = db_nslcmop.get("operationParams")
1724 if ns_params and ns_params.get("timeout_ns_deploy"):
1725 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1726 else:
1727 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1728
1729 # read from db: ns
1730 stage[1] = "Getting nsr={} from db.".format(nsr_id)
1731 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1732 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
1733 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1734 db_nsr["nsd"] = nsd
1735 # nsr_name = db_nsr["name"] # TODO short-name??
1736
1737 # read from db: vnf's of this ns
1738 stage[1] = "Getting vnfrs from db."
1739 self.logger.debug(logging_text + stage[1])
1740 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1741
1742 # read from db: vnfd's for every vnf
1743 db_vnfds = [] # every vnfd data
1744
1745 # for each vnf in ns, read vnfd
1746 for vnfr in db_vnfrs_list:
1747 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
1748 vnfd_id = vnfr["vnfd-id"]
1749 vnfd_ref = vnfr["vnfd-ref"]
1750
1751 # if we do not have this vnfd yet, read it from db
1752 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
1753 # read from db
1754 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
1755 self.logger.debug(logging_text + stage[1])
1756 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1757
1758 # store vnfd
1759 db_vnfds.append(vnfd)
1760
1761 # Get or generate the _admin.deployed.VCA list
1762 vca_deployed_list = None
1763 if db_nsr["_admin"].get("deployed"):
1764 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1765 if vca_deployed_list is None:
1766 vca_deployed_list = []
1767 configuration_status_list = []
1768 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1769 db_nsr_update["configurationStatus"] = configuration_status_list
1770 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1771 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1772 elif isinstance(vca_deployed_list, dict):
1773 # maintain backward compatibility. Change a dict to list at database
1774 vca_deployed_list = list(vca_deployed_list.values())
1775 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1776 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1777
1778 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1779 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1780 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1781
1782 # set state to INSTANTIATED. When instantiated, NBI will not delete it directly
1783 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1784 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1785 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
1786
1787 # n2vc_redesign STEP 2 Deploy Network Scenario
1788 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1789 self._write_op_status(
1790 op_id=nslcmop_id,
1791 stage=stage
1792 )
1793
1794 stage[1] = "Deploying KDUs."
1795 # self.logger.debug(logging_text + "Before deploy_kdus")
1796 # Call deploy_kdus in case the "vdu:kdu" param exists
1797 await self.deploy_kdus(
1798 logging_text=logging_text,
1799 nsr_id=nsr_id,
1800 nslcmop_id=nslcmop_id,
1801 db_vnfrs=db_vnfrs,
1802 db_vnfds=db_vnfds,
1803 task_instantiation_info=tasks_dict_info,
1804 )
1805
1806 stage[1] = "Getting VCA public key."
1807 # n2vc_redesign STEP 1 Get VCA public ssh-key
1808 # feature 1429. Add n2vc public key to needed VMs
1809 n2vc_key = self.n2vc.get_public_key()
1810 n2vc_key_list = [n2vc_key]
1811 if self.vca_config.get("public_key"):
1812 n2vc_key_list.append(self.vca_config["public_key"])
1813
1814 stage[1] = "Deploying NS at VIM."
1815 task_ro = asyncio.ensure_future(
1816 self.instantiate_RO(
1817 logging_text=logging_text,
1818 nsr_id=nsr_id,
1819 nsd=nsd,
1820 db_nsr=db_nsr,
1821 db_nslcmop=db_nslcmop,
1822 db_vnfrs=db_vnfrs,
1823 db_vnfds=db_vnfds,
1824 n2vc_key_list=n2vc_key_list,
1825 stage=stage
1826 )
1827 )
1828 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1829 tasks_dict_info[task_ro] = "Deploying at VIM"
1830
1831 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1832 stage[1] = "Deploying Execution Environments."
1833 self.logger.debug(logging_text + stage[1])
1834
1835 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1836 for vnf_profile in get_vnf_profiles(nsd):
1837 vnfd_id = vnf_profile["vnfd-id"]
1838 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
1839 member_vnf_index = str(vnf_profile["id"])
1840 db_vnfr = db_vnfrs[member_vnf_index]
1841 base_folder = vnfd["_admin"]["storage"]
1842 vdu_id = None
1843 vdu_index = 0
1844 vdu_name = None
1845 kdu_name = None
1846
1847 # Get additional parameters
1848 deploy_params = {"OSM": get_osm_params(db_vnfr)}
1849 if db_vnfr.get("additionalParamsForVnf"):
1850 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
1851
1852 descriptor_config = get_configuration(vnfd, vnfd["id"])
1853 if descriptor_config:
1854 self._deploy_n2vc(
1855 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1856 db_nsr=db_nsr,
1857 db_vnfr=db_vnfr,
1858 nslcmop_id=nslcmop_id,
1859 nsr_id=nsr_id,
1860 nsi_id=nsi_id,
1861 vnfd_id=vnfd_id,
1862 vdu_id=vdu_id,
1863 kdu_name=kdu_name,
1864 member_vnf_index=member_vnf_index,
1865 vdu_index=vdu_index,
1866 vdu_name=vdu_name,
1867 deploy_params=deploy_params,
1868 descriptor_config=descriptor_config,
1869 base_folder=base_folder,
1870 task_instantiation_info=tasks_dict_info,
1871 stage=stage
1872 )
1873
1874 # Deploy charms for each VDU that supports one.
1875 for vdud in get_vdu_list(vnfd):
1876 vdu_id = vdud["id"]
1877 descriptor_config = get_configuration(vnfd, vdu_id)
1878 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
1879
1880 if vdur.get("additionalParams"):
1881 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
1882 else:
1883 deploy_params_vdu = deploy_params
1884 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
1885 vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)
1886
1887 self.logger.debug("VDUD > {}".format(vdud))
1888 self.logger.debug("Descriptor config > {}".format(descriptor_config))
1889 if descriptor_config:
1890 vdu_name = None
1891 kdu_name = None
1892 for vdu_index in range(vdud_count):
1893 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1894 self._deploy_n2vc(
1895 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1896 member_vnf_index, vdu_id, vdu_index),
1897 db_nsr=db_nsr,
1898 db_vnfr=db_vnfr,
1899 nslcmop_id=nslcmop_id,
1900 nsr_id=nsr_id,
1901 nsi_id=nsi_id,
1902 vnfd_id=vnfd_id,
1903 vdu_id=vdu_id,
1904 kdu_name=kdu_name,
1905 member_vnf_index=member_vnf_index,
1906 vdu_index=vdu_index,
1907 vdu_name=vdu_name,
1908 deploy_params=deploy_params_vdu,
1909 descriptor_config=descriptor_config,
1910 base_folder=base_folder,
1911 task_instantiation_info=tasks_dict_info,
1912 stage=stage
1913 )
1914 for kdud in get_kdu_list(vnfd):
1915 kdu_name = kdud["name"]
1916 descriptor_config = get_configuration(vnfd, kdu_name)
1917 if descriptor_config:
1918 vdu_id = None
1919 vdu_index = 0
1920 vdu_name = None
1921 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
1922 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
1923 if kdur.get("additionalParams"):
1924 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
1925
1926 self._deploy_n2vc(
1927 logging_text=logging_text,
1928 db_nsr=db_nsr,
1929 db_vnfr=db_vnfr,
1930 nslcmop_id=nslcmop_id,
1931 nsr_id=nsr_id,
1932 nsi_id=nsi_id,
1933 vnfd_id=vnfd_id,
1934 vdu_id=vdu_id,
1935 kdu_name=kdu_name,
1936 member_vnf_index=member_vnf_index,
1937 vdu_index=vdu_index,
1938 vdu_name=vdu_name,
1939 deploy_params=deploy_params_kdu,
1940 descriptor_config=descriptor_config,
1941 base_folder=base_folder,
1942 task_instantiation_info=tasks_dict_info,
1943 stage=stage
1944 )
1945
1946 # Check if this NS has a charm configuration
1947 descriptor_config = nsd.get("ns-configuration")
1948 if descriptor_config and descriptor_config.get("juju"):
1949 vnfd_id = None
1950 db_vnfr = None
1951 member_vnf_index = None
1952 vdu_id = None
1953 kdu_name = None
1954 vdu_index = 0
1955 vdu_name = None
1956
1957 # Get additional parameters
1958 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
1959 if db_nsr.get("additionalParamsForNs"):
1960 deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
1961 base_folder = nsd["_admin"]["storage"]
1962 self._deploy_n2vc(
1963 logging_text=logging_text,
1964 db_nsr=db_nsr,
1965 db_vnfr=db_vnfr,
1966 nslcmop_id=nslcmop_id,
1967 nsr_id=nsr_id,
1968 nsi_id=nsi_id,
1969 vnfd_id=vnfd_id,
1970 vdu_id=vdu_id,
1971 kdu_name=kdu_name,
1972 member_vnf_index=member_vnf_index,
1973 vdu_index=vdu_index,
1974 vdu_name=vdu_name,
1975 deploy_params=deploy_params,
1976 descriptor_config=descriptor_config,
1977 base_folder=base_folder,
1978 task_instantiation_info=tasks_dict_info,
1979 stage=stage
1980 )
1981
1982 # the rest of the steps will be done in the finally block
1983
1984 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1985 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1986 exc = e
1987 except asyncio.CancelledError:
1988 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1989 exc = "Operation was cancelled"
1990 except Exception as e:
1991 exc = traceback.format_exc()
1992 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1993 finally:
1994 if exc:
1995 error_list.append(str(exc))
1996 try:
1997 # wait for pending tasks
1998 if tasks_dict_info:
1999 stage[1] = "Waiting for instantiate pending tasks."
2000 self.logger.debug(logging_text + stage[1])
2001 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
2002 stage, nslcmop_id, nsr_id=nsr_id)
2003 stage[1] = stage[2] = ""
2004 except asyncio.CancelledError:
2005 error_list.append("Cancelled")
2006 # TODO cancel all tasks
2007 except Exception as exc:
2008 error_list.append(str(exc))
2009
2010 # update operation-status
2011 db_nsr_update["operational-status"] = "running"
2012 # let's begin with VCA 'configured' status (later we can change it)
2013 db_nsr_update["config-status"] = "configured"
2014 for task, task_name in tasks_dict_info.items():
2015 if not task.done() or task.cancelled() or task.exception():
2016 if task_name.startswith(self.task_name_deploy_vca):
2017 # A N2VC task is pending
2018 db_nsr_update["config-status"] = "failed"
2019 else:
2020 # RO or KDU task is pending
2021 db_nsr_update["operational-status"] = "failed"
2022
2023 # update status at database
2024 if error_list:
2025 error_detail = ". ".join(error_list)
2026 self.logger.error(logging_text + error_detail)
2027 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
2028 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
2029
2030 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2031 db_nslcmop_update["detailed-status"] = error_detail
2032 nslcmop_operation_state = "FAILED"
2033 ns_state = "BROKEN"
2034 else:
2035 error_detail = None
2036 error_description_nsr = error_description_nslcmop = None
2037 ns_state = "READY"
2038 db_nsr_update["detailed-status"] = "Done"
2039 db_nslcmop_update["detailed-status"] = "Done"
2040 nslcmop_operation_state = "COMPLETED"
2041
2042 if db_nsr:
2043 self._write_ns_status(
2044 nsr_id=nsr_id,
2045 ns_state=ns_state,
2046 current_operation="IDLE",
2047 current_operation_id=None,
2048 error_description=error_description_nsr,
2049 error_detail=error_detail,
2050 other_update=db_nsr_update
2051 )
2052 self._write_op_status(
2053 op_id=nslcmop_id,
2054 stage="",
2055 error_message=error_description_nslcmop,
2056 operation_state=nslcmop_operation_state,
2057 other_update=db_nslcmop_update,
2058 )
2059
2060 if nslcmop_operation_state:
2061 try:
2062 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2063 "operationState": nslcmop_operation_state},
2064 loop=self.loop)
2065 except Exception as e:
2066 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2067
2068 self.logger.debug(logging_text + "Exit")
2069 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2070
2071 async def _add_vca_relations(
2072 self,
2073 logging_text,
2074 nsr_id,
2075 vca_index: int,
2076 timeout: int = 3600,
2077 vca_type: str = None,
2078 vca_id: str = None,
2079 ) -> bool:
2080
2081 # steps:
2082 # 1. find all relations for this VCA
2083 # 2. wait for the related peers to be ready
2084 # 3. add relations
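# A relation entry, as read from the NS or VNF configuration, is expected to look roughly like
# this (endpoint names are hypothetical):
#   {"name": "db-relation",
#    "entities": [{"id": "<member-vnf-index | vdu_id | vnfd_id>", "endpoint": "mysql"},
#                 {"id": "...", "endpoint": "mysql"}]}
# only "entities[].id" and "entities[].endpoint" are used below.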
2085
2086 try:
2087 vca_type = vca_type or "lxc_proxy_charm"
2088
2089 # STEP 1: find all relations for this VCA
2090
2091 # read nsr record
2092 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2093 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2094
2095 # this VCA data
2096 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2097
2098 # read all ns-configuration relations
2099 ns_relations = list()
2100 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2101 if db_ns_relations:
2102 for r in db_ns_relations:
2103 # check if this VCA is in the relation
2104 if my_vca.get('member-vnf-index') in\
2105 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2106 ns_relations.append(r)
2107
2108 # read all vnf-configuration relations
2109 vnf_relations = list()
2110 db_vnfd_list = db_nsr.get('vnfd-id')
2111 if db_vnfd_list:
2112 for vnfd in db_vnfd_list:
2113 db_vnf_relations = None
2114 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2115 db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
2116 if db_vnf_configuration:
2117 db_vnf_relations = db_vnf_configuration.get("relation", [])
2118 if db_vnf_relations:
2119 for r in db_vnf_relations:
2120 # check if this VCA is in the relation
2121 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2122 vnf_relations.append(r)
2123
2124 # if there are no relations, nothing to do
2125 if not ns_relations and not vnf_relations:
2126 self.logger.debug(logging_text + ' No relations')
2127 return True
2128
2129 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2130
2131 # add all relations
2132 start = time()
2133 while True:
2134 # check timeout
2135 now = time()
2136 if now - start >= timeout:
2137 self.logger.error(logging_text + ' : timeout adding relations')
2138 return False
2139
2140 # reload nsr from database (we need an updated record: _admin.deployed.VCA)
2141 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2142
2143 # for each defined NS relation, find the related VCAs
2144 for r in ns_relations.copy():
2145 from_vca_ee_id = None
2146 to_vca_ee_id = None
2147 from_vca_endpoint = None
2148 to_vca_endpoint = None
2149 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2150 for vca in vca_list:
2151 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2152 and vca.get('config_sw_installed'):
2153 from_vca_ee_id = vca.get('ee_id')
2154 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2155 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2156 and vca.get('config_sw_installed'):
2157 to_vca_ee_id = vca.get('ee_id')
2158 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2159 if from_vca_ee_id and to_vca_ee_id:
2160 # add relation
2161 await self.vca_map[vca_type].add_relation(
2162 ee_id_1=from_vca_ee_id,
2163 ee_id_2=to_vca_ee_id,
2164 endpoint_1=from_vca_endpoint,
2165 endpoint_2=to_vca_endpoint,
2166 vca_id=vca_id,
2167 )
2168 # remove entry from relations list
2169 ns_relations.remove(r)
2170 else:
2171 # check failed peers
2172 try:
2173 vca_status_list = db_nsr.get('configurationStatus')
2174 if vca_status_list:
2175 for i in range(len(vca_list)):
2176 vca = vca_list[i]
2177 vca_status = vca_status_list[i]
2178 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2179 if vca_status.get('status') == 'BROKEN':
2180 # peer broken: remove relation from list
2181 ns_relations.remove(r)
2182 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2183 if vca_status.get('status') == 'BROKEN':
2184 # peer broken: remove relation from list
2185 ns_relations.remove(r)
2186 except Exception:
2187 # ignore
2188 pass
2189
2190 # for each defined VNF relation, find the VCA's related
2191 for r in vnf_relations.copy():
2192 from_vca_ee_id = None
2193 to_vca_ee_id = None
2194 from_vca_endpoint = None
2195 to_vca_endpoint = None
2196 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2197 for vca in vca_list:
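# charms deployed at VNF/KDU level have no vdu_id; in that case the relation
# entity id is matched against the vnfd_id instead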
2198 key_to_check = "vdu_id"
2199 if vca.get("vdu_id") is None:
2200 key_to_check = "vnfd_id"
2201 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2202 from_vca_ee_id = vca.get('ee_id')
2203 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2204 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2205 to_vca_ee_id = vca.get('ee_id')
2206 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2207 if from_vca_ee_id and to_vca_ee_id:
2208 # add relation
2209 await self.vca_map[vca_type].add_relation(
2210 ee_id_1=from_vca_ee_id,
2211 ee_id_2=to_vca_ee_id,
2212 endpoint_1=from_vca_endpoint,
2213 endpoint_2=to_vca_endpoint,
2214 vca_id=vca_id,
2215 )
2216 # remove entry from relations list
2217 vnf_relations.remove(r)
2218 else:
2219 # check failed peers
2220 try:
2221 vca_status_list = db_nsr.get('configurationStatus')
2222 if vca_status_list:
2223 for i in range(len(vca_list)):
2224 vca = vca_list[i]
2225 vca_status = vca_status_list[i]
2226 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2227 if vca_status.get('status') == 'BROKEN':
2228 # peer broken: remove relation from list
2229 vnf_relations.remove(r)
2230 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2231 if vca_status.get('status') == 'BROKEN':
2232 # peer broken: remove relation from list
2233 vnf_relations.remove(r)
2234 except Exception:
2235 # ignore
2236 pass
2237
2238 # wait for next try
2239 await asyncio.sleep(5.0)
2240
2241 if not ns_relations and not vnf_relations:
2242 self.logger.debug('Relations added')
2243 break
2244
2245 return True
2246
2247 except Exception as e:
2248 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2249 return False
2250
2251 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2252 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600,
2253 vca_id: str = None):
2254
2255 try:
2256 k8sclustertype = k8s_instance_info["k8scluster-type"]
2257 # Instantiate kdu
2258 db_dict_install = {"collection": "nsrs",
2259 "filter": {"_id": nsr_id},
2260 "path": nsr_db_path}
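# db_dict_install tells the K8s connector where to write progress/status updates:
# the "nsrs" document of this NS, under the nsr_db_path (_admin.deployed.K8s.<index>) path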
2261
2262 kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name(
2263 db_dict=db_dict_install,
2264 kdu_model=k8s_instance_info["kdu-model"],
2265 kdu_name=k8s_instance_info["kdu-name"],
2266 )
2267 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2268 await self.k8scluster_map[k8sclustertype].install(
2269 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2270 kdu_model=k8s_instance_info["kdu-model"],
2271 atomic=True,
2272 params=k8params,
2273 db_dict=db_dict_install,
2274 timeout=timeout,
2275 kdu_name=k8s_instance_info["kdu-name"],
2276 namespace=k8s_instance_info["namespace"],
2277 kdu_instance=kdu_instance,
2278 vca_id=vca_id,
2279 )
2280 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2281
2282 # Obtain the deployed services in order to get the management service ip
2283 services = await self.k8scluster_map[k8sclustertype].get_services(
2284 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2285 kdu_instance=kdu_instance,
2286 namespace=k8s_instance_info["namespace"])
2287
2288 # Obtain management service info (if exists)
2289 vnfr_update_dict = {}
2290 kdu_config = get_configuration(vnfd, kdud["name"])
2291 if kdu_config:
2292 target_ee_list = kdu_config.get("execution-environment-list", [])
2293 else:
2294 target_ee_list = []
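# target_ee_list holds the execution environments declared for this KDU; it is used below to
# decide, by matching "external-connection-point-ref", whether this management service IP must
# also be stored at kdur.<index>.ip-address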
2295
2296 if services:
2297 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2298 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2299 for mgmt_service in mgmt_services:
2300 for service in services:
2301 if service["name"].startswith(mgmt_service["name"]):
2302 # Mgmt service found, Obtain service ip
2303 ip = service.get("external_ip", service.get("cluster_ip"))
2304 if isinstance(ip, list) and len(ip) == 1:
2305 ip = ip[0]
2306
2307 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2308
2309 # Check whether the mgmt ip at the vnf must also be updated
2310 service_external_cp = mgmt_service.get("external-connection-point-ref")
2311 if service_external_cp:
2312 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2313 vnfr_update_dict["ip-address"] = ip
2314
2315 if find_in_list(
2316 target_ee_list,
2317 lambda ee: ee.get("external-connection-point-ref", "") == service_external_cp
2318 ):
2319 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2320 break
2321 else:
2322 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2323
2324 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2325 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2326
2327 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
2328 if kdu_config and kdu_config.get("initial-config-primitive") and \
2329 get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
2330 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2331 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2332
2333 for initial_config_primitive in initial_config_primitive_list:
2334 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2335
2336 await asyncio.wait_for(
2337 self.k8scluster_map[k8sclustertype].exec_primitive(
2338 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2339 kdu_instance=kdu_instance,
2340 primitive_name=initial_config_primitive["name"],
2341 params=primitive_params_, db_dict=db_dict_install,
2342 vca_id=vca_id,
2343 ),
2344 timeout=timeout
2345 )
2346
2347 except Exception as e:
2348 # Prepare update db with error and raise exception
2349 try:
2350 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2351 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2352 except Exception:
2353 # ignore to keep original exception
2354 pass
2355 # reraise original error
2356 raise
2357
2358 return kdu_instance
2359
2360 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2361 # Launch kdus if present in the descriptor
2362
2363 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
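# per cluster type, cache of K8s cluster database _id -> cluster id registered at the
# corresponding connector, so each cluster is resolved/initialized only once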
2364
2365 async def _get_cluster_id(cluster_id, cluster_type):
2366 nonlocal k8scluster_id_2_uuic
2367 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2368 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2369
2370 # check if the K8s cluster is being created and wait for any previous tasks in progress
2371 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2372 if task_dependency:
2373 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2374 self.logger.debug(logging_text + text)
2375 await asyncio.wait(task_dependency, timeout=3600)
2376
2377 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2378 if not db_k8scluster:
2379 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2380
2381 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2382 if not k8s_id:
2383 if cluster_type == "helm-chart-v3":
2384 try:
2385 # backward compatibility for existing clusters that have not been initialized for helm v3
2386 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2387 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2388 reuse_cluster_uuid=cluster_id)
2389 db_k8scluster_update = {}
2390 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2391 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2392 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2393 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2394 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2395 except Exception as e:
2396 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2397 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2398 cluster_type))
2399 else:
2400 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2401 format(cluster_id, cluster_type))
2402 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2403 return k8s_id
2404
2405 logging_text += "Deploy kdus: "
2406 step = ""
2407 try:
2408 db_nsr_update = {"_admin.deployed.K8s": []}
2409 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2410
2411 index = 0
2412 updated_cluster_list = []
2413 updated_v3_cluster_list = []
2414
2415 for vnfr_data in db_vnfrs.values():
2416 vca_id = self.get_vca_id(vnfr_data, {})
2417 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2418 # Step 0: Prepare and set parameters
2419 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
2420 vnfd_id = vnfr_data.get('vnfd-id')
2421 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2422 kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
2423 namespace = kdur.get("k8s-namespace")
2424 if kdur.get("helm-chart"):
2425 kdumodel = kdur["helm-chart"]
2426 # Default version: helm3; if helm-version is v2, use the helm-chart (v2) connector
2427 k8sclustertype = "helm-chart-v3"
2428 self.logger.debug("kdur: {}".format(kdur))
2429 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2430 k8sclustertype = "helm-chart"
2431 elif kdur.get("juju-bundle"):
2432 kdumodel = kdur["juju-bundle"]
2433 k8sclustertype = "juju-bundle"
2434 else:
2435 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2436 "juju-bundle. Maybe an old NBI version is running".
2437 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2438 # check if kdumodel is a file and exists
2439 try:
2440 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2441 storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
2442 if storage and storage.get('pkg-dir'):  # may not be present if the vnfd has no artifacts
2443 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
2444 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2445 kdumodel)
2446 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2447 kdumodel = self.fs.path + filename
2448 except (asyncio.TimeoutError, asyncio.CancelledError):
2449 raise
2450 except Exception: # it is not a file
2451 pass
2452
2453 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2454 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2455 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2456
2457 # Synchronize repos
2458 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2459 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2460 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2461 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2462 if del_repo_list or added_repo_dict:
2463 if k8sclustertype == "helm-chart":
2464 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2465 updated = {'_admin.helm_charts_added.' +
2466 item: name for item, name in added_repo_dict.items()}
2467 updated_cluster_list.append(cluster_uuid)
2468 elif k8sclustertype == "helm-chart-v3":
2469 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2470 updated = {'_admin.helm_charts_v3_added.' +
2471 item: name for item, name in added_repo_dict.items()}
2472 updated_v3_cluster_list.append(cluster_uuid)
2473 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2474 "'{}' to_delete: {}, to_add: {}".
2475 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2476 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2477
2478 # Instantiate kdu
2479 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2480 kdur["kdu-name"], k8s_cluster_id)
2481 k8s_instance_info = {"kdu-instance": None,
2482 "k8scluster-uuid": cluster_uuid,
2483 "k8scluster-type": k8sclustertype,
2484 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2485 "kdu-name": kdur["kdu-name"],
2486 "kdu-model": kdumodel,
2487 "namespace": namespace}
2488 db_path = "_admin.deployed.K8s.{}".format(index)
2489 db_nsr_update[db_path] = k8s_instance_info
2490 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2491 vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
2492 task = asyncio.ensure_future(
2493 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
2494 k8s_instance_info, k8params=desc_params, timeout=600, vca_id=vca_id))
2495 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2496 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2497
2498 index += 1
2499
2500 except (LcmException, asyncio.CancelledError):
2501 raise
2502 except Exception as e:
2503 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2504 if isinstance(e, (N2VCException, DbException)):
2505 self.logger.error(logging_text + msg)
2506 else:
2507 self.logger.critical(logging_text + msg, exc_info=True)
2508 raise LcmException(msg)
2509 finally:
2510 if db_nsr_update:
2511 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2512
2513 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2514 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2515 base_folder, task_instantiation_info, stage):
2516 # launch instantiate_N2VC in an asyncio task and register the task object
2517 # Look up where the information of this charm is stored at database <nsrs>._admin.deployed.VCA
2518 # if not found, create one entry and update the database
2519 # fill db_nsr._admin.deployed.VCA.<index>
2520
2521 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2522 if "execution-environment-list" in descriptor_config:
2523 ee_list = descriptor_config.get("execution-environment-list", [])
2524 else: # other types as script are not supported
2525 ee_list = []
2526
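# Minimal sketches of the execution-environment-list entries handled below (other fields may exist):
#   {"id": "<ee-descriptor-id>", "juju": {"charm": "<charm-name>", "cloud": "k8s", "proxy": False}}
#   {"id": "<ee-descriptor-id>", "helm-chart": "<chart>", "helm-version": "v2"}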
2527 for ee_item in ee_list:
2528 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2529 ee_item.get("helm-chart")))
2530 ee_descriptor_id = ee_item.get("id")
2531 if ee_item.get("juju"):
2532 vca_name = ee_item['juju'].get('charm')
2533 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2534 if ee_item['juju'].get('cloud') == "k8s":
2535 vca_type = "k8s_proxy_charm"
2536 elif ee_item['juju'].get('proxy') is False:
2537 vca_type = "native_charm"
2538 elif ee_item.get("helm-chart"):
2539 vca_name = ee_item['helm-chart']
2540 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
2541 vca_type = "helm"
2542 else:
2543 vca_type = "helm-v3"
2544 else:
2545 self.logger.debug(logging_text + "skipping execution environment which is neither juju nor helm")
2546 continue
2547
2548 vca_index = -1
2549 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2550 if not vca_deployed:
2551 continue
2552 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2553 vca_deployed.get("vdu_id") == vdu_id and \
2554 vca_deployed.get("kdu_name") == kdu_name and \
2555 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2556 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2557 break
2558 else:
2559 # not found, create one.
2560 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2561 if vdu_id:
2562 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2563 elif kdu_name:
2564 target += "/kdu/{}".format(kdu_name)
2565 vca_deployed = {
2566 "target_element": target,
2567 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2568 "member-vnf-index": member_vnf_index,
2569 "vdu_id": vdu_id,
2570 "kdu_name": kdu_name,
2571 "vdu_count_index": vdu_index,
2572 "operational-status": "init", # TODO revise
2573 "detailed-status": "", # TODO revise
2574 "step": "initial-deploy", # TODO revise
2575 "vnfd_id": vnfd_id,
2576 "vdu_name": vdu_name,
2577 "type": vca_type,
2578 "ee_descriptor_id": ee_descriptor_id
2579 }
2580 vca_index += 1
2581
2582 # create VCA and configurationStatus in db
2583 db_dict = {
2584 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2585 "configurationStatus.{}".format(vca_index): dict()
2586 }
2587 self.update_db_2("nsrs", nsr_id, db_dict)
2588
2589 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2590
2591 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
2592 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
2593 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
2594
2595 # Launch task
2596 task_n2vc = asyncio.ensure_future(
2597 self.instantiate_N2VC(
2598 logging_text=logging_text,
2599 vca_index=vca_index,
2600 nsi_id=nsi_id,
2601 db_nsr=db_nsr,
2602 db_vnfr=db_vnfr,
2603 vdu_id=vdu_id,
2604 kdu_name=kdu_name,
2605 vdu_index=vdu_index,
2606 deploy_params=deploy_params,
2607 config_descriptor=descriptor_config,
2608 base_folder=base_folder,
2609 nslcmop_id=nslcmop_id,
2610 stage=stage,
2611 vca_type=vca_type,
2612 vca_name=vca_name,
2613 ee_config_descriptor=ee_item
2614 )
2615 )
2616 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2617 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2618 member_vnf_index or "", vdu_id or "")
2619
2620 @staticmethod
2621 def _create_nslcmop(nsr_id, operation, params):
2622 """
2623 Creates an ns-lcm-op content to be stored at database.
2624 :param nsr_id: internal id of the instance
2625 :param operation: instantiate, terminate, scale, action, ...
2626 :param params: user parameters for the operation
2627 :return: dictionary following SOL005 format
2628 """
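# Illustrative usage (parameter values are hypothetical): the returned dict is meant to be
# inserted into the "nslcmops" collection, e.g.
#   nslcmop = NsLcm._create_nslcmop(nsr_id, "action",
#       {"member_vnf_index": "1", "primitive": "touch", "primitive_params": {}})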
2629 # Raise exception if invalid arguments
2630 if not (nsr_id and operation and params):
2631 raise LcmException(
2632 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2633 now = time()
2634 _id = str(uuid4())
2635 nslcmop = {
2636 "id": _id,
2637 "_id": _id,
2638 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2639 "operationState": "PROCESSING",
2640 "statusEnteredTime": now,
2641 "nsInstanceId": nsr_id,
2642 "lcmOperationType": operation,
2643 "startTime": now,
2644 "isAutomaticInvocation": False,
2645 "operationParams": params,
2646 "isCancelPending": False,
2647 "links": {
2648 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2649 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2650 }
2651 }
2652 return nslcmop
2653
2654 def _format_additional_params(self, params):
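# Values prefixed with "!!yaml " are parsed as YAML; illustrative example:
#   {"replicas": "!!yaml 3", "ports": "!!yaml [80, 443]"}  ->  {"replicas": 3, "ports": [80, 443]}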
2655 params = params or {}
2656 for key, value in params.items():
2657 if str(value).startswith("!!yaml "):
2658 params[key] = yaml.safe_load(value[7:])
2659 return params
2660
2661 def _get_terminate_primitive_params(self, seq, vnf_index):
2662 primitive = seq.get('name')
2663 primitive_params = {}
2664 params = {
2665 "member_vnf_index": vnf_index,
2666 "primitive": primitive,
2667 "primitive_params": primitive_params,
2668 }
2669 desc_params = {}
2670 return self._map_primitive_params(seq, params, desc_params)
2671
2672 # sub-operations
2673
2674 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2675 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2676 if op.get('operationState') == 'COMPLETED':
2677 # b. Skip sub-operation
2678 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2679 return self.SUBOPERATION_STATUS_SKIP
2680 else:
2681 # c. retry executing sub-operation
2682 # The sub-operation exists, and operationState != 'COMPLETED'
2683 # Update operationState = 'PROCESSING' to indicate a retry.
2684 operationState = 'PROCESSING'
2685 detailed_status = 'In progress'
2686 self._update_suboperation_status(
2687 db_nslcmop, op_index, operationState, detailed_status)
2688 # Return the sub-operation index
2689 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2690 # with arguments extracted from the sub-operation
2691 return op_index
2692
2693 # Find a sub-operation where all keys in a matching dictionary must match
2694 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2695 def _find_suboperation(self, db_nslcmop, match):
2696 if db_nslcmop and match:
2697 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2698 for i, op in enumerate(op_list):
2699 if all(op.get(k) == match[k] for k in match):
2700 return i
2701 return self.SUBOPERATION_STATUS_NOT_FOUND
2702
2703 # Update status for a sub-operation given its index
2704 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2705 # Update DB for HA tasks
2706 q_filter = {'_id': db_nslcmop['_id']}
2707 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2708 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2709 self.db.set_one("nslcmops",
2710 q_filter=q_filter,
2711 update_dict=update_dict,
2712 fail_on_empty=False)
2713
2714 # Add sub-operation, return the index of the added sub-operation
2715 # Optionally, set operationState, detailed-status, and operationType
2716 # Status and type are currently set for 'scale' sub-operations:
2717 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2718 # 'detailed-status' : status message
2719 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2720 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2721 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2722 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2723 RO_nsr_id=None, RO_scaling_info=None):
2724 if not db_nslcmop:
2725 return self.SUBOPERATION_STATUS_NOT_FOUND
2726 # Get the "_admin.operations" list, if it exists
2727 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2728 op_list = db_nslcmop_admin.get('operations')
2729 # Create or append to the "_admin.operations" list
2730 new_op = {'member_vnf_index': vnf_index,
2731 'vdu_id': vdu_id,
2732 'vdu_count_index': vdu_count_index,
2733 'primitive': primitive,
2734 'primitive_params': mapped_primitive_params}
2735 if operationState:
2736 new_op['operationState'] = operationState
2737 if detailed_status:
2738 new_op['detailed-status'] = detailed_status
2739 if operationType:
2740 new_op['lcmOperationType'] = operationType
2741 if RO_nsr_id:
2742 new_op['RO_nsr_id'] = RO_nsr_id
2743 if RO_scaling_info:
2744 new_op['RO_scaling_info'] = RO_scaling_info
2745 if not op_list:
2746 # No existing operations, create key 'operations' with current operation as first list element
2747 db_nslcmop_admin.update({'operations': [new_op]})
2748 op_list = db_nslcmop_admin.get('operations')
2749 else:
2750 # Existing operations, append operation to list
2751 op_list.append(new_op)
2752
2753 db_nslcmop_update = {'_admin.operations': op_list}
2754 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2755 op_index = len(op_list) - 1
2756 return op_index
2757
2758 # Helper methods for scale() sub-operations
2759
2760 # pre-scale/post-scale:
2761 # Check for 3 different cases:
2762 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2763 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2764 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2765 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2766 operationType, RO_nsr_id=None, RO_scaling_info=None):
2767 # Find this sub-operation
2768 if RO_nsr_id and RO_scaling_info:
2769 operationType = 'SCALE-RO'
2770 match = {
2771 'member_vnf_index': vnf_index,
2772 'RO_nsr_id': RO_nsr_id,
2773 'RO_scaling_info': RO_scaling_info,
2774 }
2775 else:
2776 match = {
2777 'member_vnf_index': vnf_index,
2778 'primitive': vnf_config_primitive,
2779 'primitive_params': primitive_params,
2780 'lcmOperationType': operationType
2781 }
2782 op_index = self._find_suboperation(db_nslcmop, match)
2783 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2784 # a. New sub-operation
2785 # The sub-operation does not exist, add it.
2786 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2787 # The following parameters are set to None for all kind of scaling:
2788 vdu_id = None
2789 vdu_count_index = None
2790 vdu_name = None
2791 if RO_nsr_id and RO_scaling_info:
2792 vnf_config_primitive = None
2793 primitive_params = None
2794 else:
2795 RO_nsr_id = None
2796 RO_scaling_info = None
2797 # Initial status for sub-operation
2798 operationState = 'PROCESSING'
2799 detailed_status = 'In progress'
2800 # Add sub-operation for pre/post-scaling (zero or more operations)
2801 self._add_suboperation(db_nslcmop,
2802 vnf_index,
2803 vdu_id,
2804 vdu_count_index,
2805 vdu_name,
2806 vnf_config_primitive,
2807 primitive_params,
2808 operationState,
2809 detailed_status,
2810 operationType,
2811 RO_nsr_id,
2812 RO_scaling_info)
2813 return self.SUBOPERATION_STATUS_NEW
2814 else:
2815 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2816 # or op_index (operationState != 'COMPLETED')
2817 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2818
2819 # Function to return execution_environment id
2820
2821 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2822 # TODO vdu_index_count
2823 for vca in vca_deployed_list:
2824 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2825 return vca["ee_id"]
2826
2827 async def destroy_N2VC(
2828 self,
2829 logging_text,
2830 db_nslcmop,
2831 vca_deployed,
2832 config_descriptor,
2833 vca_index,
2834 destroy_ee=True,
2835 exec_primitives=True,
2836 scaling_in=False,
2837 vca_id: str = None,
2838 ):
2839 """
2840 Execute the terminate primitives and, if destroy_ee=True, destroy the execution environment
2841 :param logging_text:
2842 :param db_nslcmop:
2843 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
2844 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2845 :param vca_index: index in the database _admin.deployed.VCA
2846 :param destroy_ee: False to not destroy it now, because all of them will be destroyed at once later
2847 :param exec_primitives: False to skip executing terminate primitives, because the configuration was not
2848 completed or did not execute properly
2849 :param scaling_in: True destroys the application, False destroys the model
2850 :return: None or exception
2851 """
2852
2853 self.logger.debug(
2854 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2855 vca_index, vca_deployed, config_descriptor, destroy_ee
2856 )
2857 )
2858
2859 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
2860
2861 # execute terminate_primitives
2862 if exec_primitives:
2863 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
2864 config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
2865 vdu_id = vca_deployed.get("vdu_id")
2866 vdu_count_index = vca_deployed.get("vdu_count_index")
2867 vdu_name = vca_deployed.get("vdu_name")
2868 vnf_index = vca_deployed.get("member-vnf-index")
2869 if terminate_primitives and vca_deployed.get("needed_terminate"):
2870 for seq in terminate_primitives:
2871 # For each sequence in list, get primitive and call _ns_execute_primitive()
2872 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2873 vnf_index, seq.get("name"))
2874 self.logger.debug(logging_text + step)
2875 # Create the primitive for each sequence, i.e. "primitive": "touch"
2876 primitive = seq.get('name')
2877 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2878
2879 # Add sub-operation
2880 self._add_suboperation(db_nslcmop,
2881 vnf_index,
2882 vdu_id,
2883 vdu_count_index,
2884 vdu_name,
2885 primitive,
2886 mapped_primitive_params)
2887 # Sub-operations: Call _ns_execute_primitive() instead of action()
2888 try:
2889 result, result_detail = await self._ns_execute_primitive(
2890 vca_deployed["ee_id"], primitive,
2891 mapped_primitive_params,
2892 vca_type=vca_type,
2893 vca_id=vca_id,
2894 )
2895 except LcmException:
2896 # this happens when the VCA is not deployed. In this case there is nothing to terminate
2897 continue
2898 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2899 if result not in result_ok:
2900 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2901 "error {}".format(seq.get("name"), vnf_index, result_detail))
2902 # mark that this VCA no longer needs to be terminated
2903 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2904 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2905
2906 if vca_deployed.get("prometheus_jobs") and self.prometheus:
2907 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
2908
2909 if destroy_ee:
2910 await self.vca_map[vca_type].delete_execution_environment(
2911 vca_deployed["ee_id"],
2912 scaling_in=scaling_in,
2913 vca_id=vca_id,
2914 )
2915
2916 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
2917 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2918 namespace = "." + db_nsr["_id"]
2919 try:
2920 await self.n2vc.delete_namespace(
2921 namespace=namespace,
2922 total_timeout=self.timeout_charm_delete,
2923 vca_id=vca_id,
2924 )
2925 except N2VCNotFound: # already deleted. Skip
2926 pass
2927 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2928
2929 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2930 """
2931 Terminates a deployment from RO
2932 :param logging_text:
2933 :param nsr_deployed: db_nsr._admin.deployed
2934 :param nsr_id:
2935 :param nslcmop_id:
2936 :param stage: list of strings with the content to write on db_nslcmop.detailed-status.
2937 this method updates only index 2, but it writes the concatenated content of the whole list to the database
2938 :return:
2939 """
2940 db_nsr_update = {}
2941 failed_detail = []
2942 ro_nsr_id = ro_delete_action = None
2943 if nsr_deployed and nsr_deployed.get("RO"):
2944 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2945 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2946 try:
2947 if ro_nsr_id:
2948 stage[2] = "Deleting ns from VIM."
2949 db_nsr_update["detailed-status"] = " ".join(stage)
2950 self._write_op_status(nslcmop_id, stage)
2951 self.logger.debug(logging_text + stage[2])
2952 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2953 self._write_op_status(nslcmop_id, stage)
2954 desc = await self.RO.delete("ns", ro_nsr_id)
2955 ro_delete_action = desc["action_id"]
2956 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2957 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2958 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2959 if ro_delete_action:
2960 # wait until NS is deleted from VIM
2961 stage[2] = "Waiting for ns to be deleted from VIM."
2962 detailed_status_old = None
2963 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2964 ro_delete_action))
2965 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2966 self._write_op_status(nslcmop_id, stage)
2967
2968 delete_timeout = 20 * 60 # 20 minutes
2969 while delete_timeout > 0:
2970 desc = await self.RO.show(
2971 "ns",
2972 item_id_name=ro_nsr_id,
2973 extra_item="action",
2974 extra_item_id=ro_delete_action)
2975
2976 # deploymentStatus
2977 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2978
2979 ns_status, ns_status_info = self.RO.check_action_status(desc)
2980 if ns_status == "ERROR":
2981 raise ROclient.ROClientException(ns_status_info)
2982 elif ns_status == "BUILD":
2983 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2984 elif ns_status == "ACTIVE":
2985 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2986 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2987 break
2988 else:
2989 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2990 if stage[2] != detailed_status_old:
2991 detailed_status_old = stage[2]
2992 db_nsr_update["detailed-status"] = " ".join(stage)
2993 self._write_op_status(nslcmop_id, stage)
2994 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2995 await asyncio.sleep(5, loop=self.loop)
2996 delete_timeout -= 5
2997 else: # delete_timeout <= 0:
2998 raise ROclient.ROClientException("Timeout waiting for ns to be deleted from VIM")
2999
3000 except Exception as e:
3001 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3002 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3003 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3004 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3005 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3006 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
3007 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3008 failed_detail.append("delete conflict: {}".format(e))
3009 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
3010 else:
3011 failed_detail.append("delete error: {}".format(e))
3012 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
3013
3014 # Delete nsd
3015 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3016 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3017 try:
3018 stage[2] = "Deleting nsd from RO."
3019 db_nsr_update["detailed-status"] = " ".join(stage)
3020 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3021 self._write_op_status(nslcmop_id, stage)
3022 await self.RO.delete("nsd", ro_nsd_id)
3023 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3024 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3025 except Exception as e:
3026 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3027 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3028 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3029 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3030 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3031 self.logger.debug(logging_text + failed_detail[-1])
3032 else:
3033 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3034 self.logger.error(logging_text + failed_detail[-1])
3035
3036 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3037 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3038 if not vnf_deployed or not vnf_deployed["id"]:
3039 continue
3040 try:
3041 ro_vnfd_id = vnf_deployed["id"]
3042 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3043 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3044 db_nsr_update["detailed-status"] = " ".join(stage)
3045 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3046 self._write_op_status(nslcmop_id, stage)
3047 await self.RO.delete("vnfd", ro_vnfd_id)
3048 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3049 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3050 except Exception as e:
3051 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3052 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3053 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3054 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3055 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3056 self.logger.debug(logging_text + failed_detail[-1])
3057 else:
3058 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3059 self.logger.error(logging_text + failed_detail[-1])
3060
3061 if failed_detail:
3062 stage[2] = "Error deleting from VIM"
3063 else:
3064 stage[2] = "Deleted from VIM"
3065 db_nsr_update["detailed-status"] = " ".join(stage)
3066 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3067 self._write_op_status(nslcmop_id, stage)
3068
3069 if failed_detail:
3070 raise LcmException("; ".join(failed_detail))
3071
3072 async def terminate(self, nsr_id, nslcmop_id):
3073 # Try to lock HA task here
3074 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3075 if not task_is_locked_by_me:
3076 return
3077
3078 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3079 self.logger.debug(logging_text + "Enter")
3080 timeout_ns_terminate = self.timeout_ns_terminate
3081 db_nsr = None
3082 db_nslcmop = None
3083 operation_params = None
3084 exc = None
3085 error_list = [] # collects all error messages
3086 db_nslcmop_update = {}
3087 autoremove = False # autoremove after terminated
3088 tasks_dict_info = {}
3089 db_nsr_update = {}
3090 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3091 # ^ contains [stage, step, VIM-status]
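# (descriptive) the three entries are space-joined when persisted, e.g. _terminate_RO writes
# " ".join(stage) into db_nsr "detailed-status" while stage[2] tracks the VIM progress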
3092 try:
3093 # wait for any previous tasks in process
3094 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3095
3096 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3097 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3098 operation_params = db_nslcmop.get("operationParams") or {}
3099 if operation_params.get("timeout_ns_terminate"):
3100 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3101 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3102 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3103
3104 db_nsr_update["operational-status"] = "terminating"
3105 db_nsr_update["config-status"] = "terminating"
3106 self._write_ns_status(
3107 nsr_id=nsr_id,
3108 ns_state="TERMINATING",
3109 current_operation="TERMINATING",
3110 current_operation_id=nslcmop_id,
3111 other_update=db_nsr_update
3112 )
3113 self._write_op_status(
3114 op_id=nslcmop_id,
3115 queuePosition=0,
3116 stage=stage
3117 )
3118 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3119 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3120 return
3121
3122 stage[1] = "Getting vnf descriptors from db."
3123 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3124 db_vnfrs_dict = {db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list}
3125 db_vnfds_from_id = {}
3126 db_vnfds_from_member_index = {}
3127 # Loop over VNFRs
3128 for vnfr in db_vnfrs_list:
3129 vnfd_id = vnfr["vnfd-id"]
3130 if vnfd_id not in db_vnfds_from_id:
3131 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3132 db_vnfds_from_id[vnfd_id] = vnfd
3133 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3134
3135 # Destroy individual execution environments when there are terminating primitives.
3136 # The rest of the EEs will be deleted at once
3137 # TODO - check before calling _destroy_N2VC
3138 # if not operation_params.get("skip_terminate_primitives"):#
3139 # or not vca.get("needed_terminate"):
3140 stage[0] = "Stage 2/3: executing terminate primitives."
3141 self.logger.debug(logging_text + stage[0])
3142 stage[1] = "Looking for execution environments that need terminate primitives."
3143 self.logger.debug(logging_text + stage[1])
3144
3145 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3146 config_descriptor = None
3147
3148 vca_id = self.get_vca_id(db_vnfrs_dict[vca["member-vnf-index"]], db_nsr)
3149 if not vca or not vca.get("ee_id"):
3150 continue
3151 if not vca.get("member-vnf-index"):
3152 # ns
3153 config_descriptor = db_nsr.get("ns-configuration")
3154 elif vca.get("vdu_id"):
3155 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3156 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3157 elif vca.get("kdu_name"):
3158 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3159 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3160 else:
3161 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3162 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3163 vca_type = vca.get("type")
3164 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3165 vca.get("needed_terminate"))
3166 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3167 # pending native charms
3168 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
3169 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3170 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3171 task = asyncio.ensure_future(
3172 self.destroy_N2VC(
3173 logging_text,
3174 db_nslcmop,
3175 vca,
3176 config_descriptor,
3177 vca_index,
3178 destroy_ee,
3179 exec_terminate_primitives,
3180 vca_id=vca_id,
3181 )
3182 )
3183 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3184
3185 # wait for pending tasks of terminate primitives
3186 if tasks_dict_info:
3187 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3188 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3189 min(self.timeout_charm_delete, timeout_ns_terminate),
3190 stage, nslcmop_id)
3191 tasks_dict_info.clear()
3192 if error_list:
3193 return # raise LcmException("; ".join(error_list))
3194
3195 # remove All execution environments at once
3196 stage[0] = "Stage 3/3: deleting all."
3197
3198 if nsr_deployed.get("VCA"):
3199 stage[1] = "Deleting all execution environments."
3200 self.logger.debug(logging_text + stage[1])
3201 vca_id = self.get_vca_id({}, db_nsr)
3202 task_delete_ee = asyncio.ensure_future(
3203 asyncio.wait_for(
3204 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
3205 timeout=self.timeout_charm_delete
3206 )
3207 )
3208 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3209 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3210
3211 # Delete from k8scluster
3212 stage[1] = "Deleting KDUs."
3213 self.logger.debug(logging_text + stage[1])
3214 # print(nsr_deployed)
3215 for kdu in get_iterable(nsr_deployed, "K8s"):
3216 if not kdu or not kdu.get("kdu-instance"):
3217 continue
3218 kdu_instance = kdu.get("kdu-instance")
3219 if kdu.get("k8scluster-type") in self.k8scluster_map:
3220 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
3221 vca_id = self.get_vca_id({}, db_nsr)
3222 task_delete_kdu_instance = asyncio.ensure_future(
3223 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3224 cluster_uuid=kdu.get("k8scluster-uuid"),
3225 kdu_instance=kdu_instance,
3226 vca_id=vca_id,
3227 )
3228 )
3229 else:
3230 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3231 format(kdu.get("k8scluster-type")))
3232 continue
3233 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3234
3235 # remove from RO
3236 stage[1] = "Deleting ns from VIM."
3237 if self.ng_ro:
3238 task_delete_ro = asyncio.ensure_future(
3239 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3240 else:
3241 task_delete_ro = asyncio.ensure_future(
3242 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3243 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3244
3245 # the rest of the cleanup will be done in the finally block
3246
3247 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3248 self.logger.error(logging_text + "Exit Exception {}".format(e))
3249 exc = e
3250 except asyncio.CancelledError:
3251 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3252 exc = "Operation was cancelled"
3253 except Exception as e:
3254 exc = traceback.format_exc()
3255 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3256 finally:
3257 if exc:
3258 error_list.append(str(exc))
3259 try:
3260 # wait for pending tasks
3261 if tasks_dict_info:
3262 stage[1] = "Waiting for pending terminate tasks."
3263 self.logger.debug(logging_text + stage[1])
3264 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3265 stage, nslcmop_id)
3266 stage[1] = stage[2] = ""
3267 except asyncio.CancelledError:
3268 error_list.append("Cancelled")
3269 # TODO cancel all tasks
3270 except Exception as exc:
3271 error_list.append(str(exc))
3272 # update status at database
3273 if error_list:
3274 error_detail = "; ".join(error_list)
3275 # self.logger.error(logging_text + error_detail)
3276 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3277 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3278
3279 db_nsr_update["operational-status"] = "failed"
3280 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3281 db_nslcmop_update["detailed-status"] = error_detail
3282 nslcmop_operation_state = "FAILED"
3283 ns_state = "BROKEN"
3284 else:
3285 error_detail = None
3286 error_description_nsr = error_description_nslcmop = None
3287 ns_state = "NOT_INSTANTIATED"
3288 db_nsr_update["operational-status"] = "terminated"
3289 db_nsr_update["detailed-status"] = "Done"
3290 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3291 db_nslcmop_update["detailed-status"] = "Done"
3292 nslcmop_operation_state = "COMPLETED"
3293
3294 if db_nsr:
3295 self._write_ns_status(
3296 nsr_id=nsr_id,
3297 ns_state=ns_state,
3298 current_operation="IDLE",
3299 current_operation_id=None,
3300 error_description=error_description_nsr,
3301 error_detail=error_detail,
3302 other_update=db_nsr_update
3303 )
3304 self._write_op_status(
3305 op_id=nslcmop_id,
3306 stage="",
3307 error_message=error_description_nslcmop,
3308 operation_state=nslcmop_operation_state,
3309 other_update=db_nslcmop_update,
3310 )
3311 if ns_state == "NOT_INSTANTIATED":
3312 try:
3313 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3314 except DbException as e:
3315 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3316 format(nsr_id, e))
3317 if operation_params:
3318 autoremove = operation_params.get("autoremove", False)
3319 if nslcmop_operation_state:
3320 try:
3321 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3322 "operationState": nslcmop_operation_state,
3323 "autoremove": autoremove},
3324 loop=self.loop)
3325 except Exception as e:
3326 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3327
3328 self.logger.debug(logging_text + "Exit")
3329 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3330
3331 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3332 time_start = time()
3333 error_detail_list = []
3334 error_list = []
3335 pending_tasks = list(created_tasks_info.keys())
3336 num_tasks = len(pending_tasks)
3337 num_done = 0
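# (descriptive) stage[1] holds a "done/total" progress counter that is refreshed after each
# asyncio.wait round (FIRST_COMPLETED) and pushed to the operation status via _write_op_status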
3338 stage[1] = "{}/{}.".format(num_done, num_tasks)
3339 self._write_op_status(nslcmop_id, stage)
3340 while pending_tasks:
3341 new_error = None
3342 _timeout = timeout + time_start - time()
3343 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3344 return_when=asyncio.FIRST_COMPLETED)
3345 num_done += len(done)
3346 if not done: # Timeout
3347 for task in pending_tasks:
3348 new_error = created_tasks_info[task] + ": Timeout"
3349 error_detail_list.append(new_error)
3350 error_list.append(new_error)
3351 break
3352 for task in done:
3353 if task.cancelled():
3354 exc = "Cancelled"
3355 else:
3356 exc = task.exception()
3357 if exc:
3358 if isinstance(exc, asyncio.TimeoutError):
3359 exc = "Timeout"
3360 new_error = created_tasks_info[task] + ": {}".format(exc)
3361 error_list.append(created_tasks_info[task])
3362 error_detail_list.append(new_error)
3363 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3364 K8sException, NgRoException)):
3365 self.logger.error(logging_text + new_error)
3366 else:
3367 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3368 self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
3369 else:
3370 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3371 stage[1] = "{}/{}.".format(num_done, num_tasks)
3372 if new_error:
3373 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3374 if nsr_id: # update also nsr
3375 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3376 "errorDetail": ". ".join(error_detail_list)})
3377 self._write_op_status(nslcmop_id, stage)
3378 return error_detail_list
3379
3380 @staticmethod
3381 def _map_primitive_params(primitive_desc, params, instantiation_params):
3382 """
3383 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a parameter,
3384 the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
3385 :param primitive_desc: portion of VNFD/NSD that describes primitive
3386 :param params: Params provided by user
3387 :param instantiation_params: Instantiation params provided by user
3388 :return: a dictionary with the calculated params
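Illustrative example (hypothetical descriptor and values):
    primitive_desc = {"name": "touch", "parameter": [{"name": "filename", "default-value": "<touch_filename>"}]}
    params = {}  (the default-value is used)
    instantiation_params = {"touch_filename": "/home/ubuntu/first-touch"}
    result: {"filename": "/home/ubuntu/first-touch"}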
3389 """
3390 calculated_params = {}
3391 for parameter in primitive_desc.get("parameter", ()):
3392 param_name = parameter["name"]
3393 if param_name in params:
3394 calculated_params[param_name] = params[param_name]
3395 elif "default-value" in parameter or "value" in parameter:
3396 if "value" in parameter:
3397 calculated_params[param_name] = parameter["value"]
3398 else:
3399 calculated_params[param_name] = parameter["default-value"]
3400 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3401 and calculated_params[param_name].endswith(">"):
3402 if calculated_params[param_name][1:-1] in instantiation_params:
3403 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3404 else:
3405 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3406 format(calculated_params[param_name], primitive_desc["name"]))
3407 else:
3408 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3409 format(param_name, primitive_desc["name"]))
3410
3411 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3412 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
3413 default_flow_style=True, width=256)
3414 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3415 calculated_params[param_name] = calculated_params[param_name][7:]
3416 if parameter.get("data-type") == "INTEGER":
3417 try:
3418 calculated_params[param_name] = int(calculated_params[param_name])
3419 except ValueError: # error converting string to int
3420 raise LcmException(
3421 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3422 elif parameter.get("data-type") == "BOOLEAN":
3423 calculated_params[param_name] = str(calculated_params[param_name]).lower() != 'false'
3424
3425 # always add ns_config_info if the primitive name is config
3426 if primitive_desc["name"] == "config":
3427 if "ns_config_info" in instantiation_params:
3428 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3429 return calculated_params
3430
3431 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3432 ee_descriptor_id=None):
3433 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
3434 for vca in deployed_vca:
3435 if not vca:
3436 continue
3437 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3438 continue
3439 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3440 continue
3441 if kdu_name and kdu_name != vca["kdu_name"]:
3442 continue
3443 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3444 continue
3445 break
3446 else:
3447 # vca_deployed not found
3448 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3449 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3450 ee_descriptor_id))
3451 # get ee_id
3452 ee_id = vca.get("ee_id")
3453 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3454 if not ee_id:
3455 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3456 "execution environment"
3457 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3458 return ee_id, vca_type
3459
3460 async def _ns_execute_primitive(
3461 self,
3462 ee_id,
3463 primitive,
3464 primitive_params,
3465 retries=0,
3466 retries_interval=30,
3467 timeout=None,
3468 vca_type=None,
3469 db_dict=None,
3470 vca_id: str = None,
3471 ) -> (str, str):
3472 try:
3473 if primitive == "config":
3474 primitive_params = {"params": primitive_params}
3475
3476 vca_type = vca_type or "lxc_proxy_charm"
3477
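# (descriptive) retries=0 means a single attempt; on failure (including asyncio.TimeoutError) retries is
# decremented and, while it stays >= 0, the call is repeated after retries_interval seconds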
3478 while retries >= 0:
3479 try:
3480 output = await asyncio.wait_for(
3481 self.vca_map[vca_type].exec_primitive(
3482 ee_id=ee_id,
3483 primitive_name=primitive,
3484 params_dict=primitive_params,
3485 progress_timeout=self.timeout_progress_primitive,
3486 total_timeout=self.timeout_primitive,
3487 db_dict=db_dict,
3488 vca_id=vca_id,
3489 ),
3490 timeout=timeout or self.timeout_primitive)
3491 # execution was OK
3492 break
3493 except asyncio.CancelledError:
3494 raise
3495 except Exception as e: # asyncio.TimeoutError
3496 if isinstance(e, asyncio.TimeoutError):
3497 e = "Timeout"
3498 retries -= 1
3499 if retries >= 0:
3500 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3501 # wait and retry
3502 await asyncio.sleep(retries_interval, loop=self.loop)
3503 else:
3504 return 'FAILED', str(e)
3505
3506 return 'COMPLETED', output
3507
3508 except (LcmException, asyncio.CancelledError):
3509 raise
3510 except Exception as e:
3511 return 'FAILED', 'Error executing action {}: {}'.format(primitive, e)
3512
3513 async def vca_status_refresh(self, nsr_id, nslcmop_id):
3514 """
3515 Updating the vca_status with latest juju information in nsrs record
3516 :param: nsr_id: Id of the nsr
3517 :param: nslcmop_id: Id of the nslcmop
3518 :return: None
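If the NS has K8s deployments, the status is refreshed per KDU instance through _on_update_k8s_db;
otherwise every entry of _admin.deployed.VCA is refreshed through _on_update_n2vc_db.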
3519 """
3520
3521 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
3522 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3523 vca_id = self.get_vca_id({}, db_nsr)
3524 if db_nsr['_admin']['deployed']['K8s']:
3525 for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']):
3526 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
3527 await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id}, vca_id=vca_id)
3528 else:
3529 for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
3530 table, filter = "nsrs", {"_id": nsr_id}
3531 path = "_admin.deployed.VCA.{}.".format(vca_index)
3532 await self._on_update_n2vc_db(table, filter, path, {})
3533
3534 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
3535 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
3536
3537 async def action(self, nsr_id, nslcmop_id):
3538 # Try to lock HA task here
3539 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3540 if not task_is_locked_by_me:
3541 return
3542
3543 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3544 self.logger.debug(logging_text + "Enter")
3545 # get all needed from database
3546 db_nsr = None
3547 db_nslcmop = None
3548 db_nsr_update = {}
3549 db_nslcmop_update = {}
3550 nslcmop_operation_state = None
3551 error_description_nslcmop = None
3552 exc = None
3553 try:
3554 # wait for any previous tasks in process
3555 step = "Waiting for previous operations to terminate"
3556 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3557
3558 self._write_ns_status(
3559 nsr_id=nsr_id,
3560 ns_state=None,
3561 current_operation="RUNNING ACTION",
3562 current_operation_id=nslcmop_id
3563 )
3564
3565 step = "Getting information from database"
3566 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3567 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3568
3569 nsr_deployed = db_nsr["_admin"].get("deployed")
3570 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3571 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3572 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3573 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3574 primitive = db_nslcmop["operationParams"]["primitive"]
3575 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3576 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3577
3578 if vnf_index:
3579 step = "Getting vnfr from database"
3580 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3581 step = "Getting vnfd from database"
3582 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3583 else:
3584 step = "Getting nsd from database"
3585 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3586
3587 vca_id = self.get_vca_id(db_vnfr, db_nsr)
3588 # for backward compatibility
3589 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3590 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3591 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3592 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3593
3594 # look for primitive
3595 config_primitive_desc = descriptor_configuration = None
3596 if vdu_id:
3597 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
3598 elif kdu_name:
3599 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
3600 elif vnf_index:
3601 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
3602 else:
3603 descriptor_configuration = db_nsd.get("ns-configuration")
3604
3605 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3606 for config_primitive in descriptor_configuration["config-primitive"]:
3607 if config_primitive["name"] == primitive:
3608 config_primitive_desc = config_primitive
3609 break
3610
3611 if not config_primitive_desc:
3612 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3613 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3614 format(primitive))
3615 primitive_name = primitive
3616 ee_descriptor_id = None
3617 else:
3618 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3619 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3620
3621 if vnf_index:
3622 if vdu_id:
3623 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3624 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
3625 elif kdu_name:
3626 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3627 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3628 else:
3629 desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
3630 else:
3631 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
3632 if kdu_name and get_configuration(db_vnfd, kdu_name):
3633 kdu_configuration = get_configuration(db_vnfd, kdu_name)
3634 actions = set()
3635 for primitive in kdu_configuration.get("initial-config-primitive", []):
3636 actions.add(primitive["name"])
3637 for primitive in kdu_configuration.get("config-primitive", []):
3638 actions.add(primitive["name"])
3639 kdu_action = primitive_name in actions
3640
3641 # TODO check if ns is in a proper status
3642 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3643 # kdur and desc_params already set from before
3644 if primitive_params:
3645 desc_params.update(primitive_params)
3646 # TODO Check if we will need something at vnf level
3647 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3648 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3649 break
3650 else:
3651 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3652
3653 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3654 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3655 raise LcmException(msg)
3656
3657 db_dict = {"collection": "nsrs",
3658 "filter": {"_id": nsr_id},
3659 "path": "_admin.deployed.K8s.{}".format(index)}
3660 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3661 step = "Executing kdu {}".format(primitive_name)
3662 if primitive_name == "upgrade":
3663 if desc_params.get("kdu_model"):
3664 kdu_model = desc_params.get("kdu_model")
3665 del desc_params["kdu_model"]
3666 else:
3667 kdu_model = kdu.get("kdu-model")
3668 parts = kdu_model.split(sep=":")
3669 if len(parts) == 2:
3670 kdu_model = parts[0]
3671
3672 detailed_status = await asyncio.wait_for(
3673 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3674 cluster_uuid=kdu.get("k8scluster-uuid"),
3675 kdu_instance=kdu.get("kdu-instance"),
3676 atomic=True, kdu_model=kdu_model,
3677 params=desc_params, db_dict=db_dict,
3678 timeout=timeout_ns_action),
3679 timeout=timeout_ns_action + 10)
3680 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3681 elif primitive_name == "rollback":
3682 detailed_status = await asyncio.wait_for(
3683 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3684 cluster_uuid=kdu.get("k8scluster-uuid"),
3685 kdu_instance=kdu.get("kdu-instance"),
3686 db_dict=db_dict),
3687 timeout=timeout_ns_action)
3688 elif primitive_name == "status":
3689 detailed_status = await asyncio.wait_for(
3690 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3691 cluster_uuid=kdu.get("k8scluster-uuid"),
3692 kdu_instance=kdu.get("kdu-instance"),
3693 vca_id=vca_id,
3694 ),
3695 timeout=timeout_ns_action
3696 )
3697 else:
3698 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3699 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3700
3701 detailed_status = await asyncio.wait_for(
3702 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3703 cluster_uuid=kdu.get("k8scluster-uuid"),
3704 kdu_instance=kdu_instance,
3705 primitive_name=primitive_name,
3706 params=params, db_dict=db_dict,
3707 timeout=timeout_ns_action,
3708 vca_id=vca_id,
3709 ),
3710 timeout=timeout_ns_action
3711 )
3712
3713 if detailed_status:
3714 nslcmop_operation_state = 'COMPLETED'
3715 else:
3716 detailed_status = ''
3717 nslcmop_operation_state = 'FAILED'
3718 else:
3719 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
3720 vdu_id=vdu_id, vdu_count_index=vdu_count_index,
3721 ee_descriptor_id=ee_descriptor_id)
3722 for vca_index, vca_deployed in enumerate(db_nsr['_admin']['deployed']['VCA']):
3723 if vca_deployed.get("member-vnf-index") == vnf_index:
3724 db_dict = {"collection": "nsrs",
3725 "filter": {"_id": nsr_id},
3726 "path": "_admin.deployed.VCA.{}.".format(vca_index)}
3727 break
3728 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3729 ee_id,
3730 primitive=primitive_name,
3731 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3732 timeout=timeout_ns_action,
3733 vca_type=vca_type,
3734 db_dict=db_dict,
3735 vca_id=vca_id,
3736 )
3737
3738 db_nslcmop_update["detailed-status"] = detailed_status
3739 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3740 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3741 detailed_status))
3742 return # database update is called inside finally
3743
3744 except (DbException, LcmException, N2VCException, K8sException) as e:
3745 self.logger.error(logging_text + "Exit Exception {}".format(e))
3746 exc = e
3747 except asyncio.CancelledError:
3748 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3749 exc = "Operation was cancelled"
3750 except asyncio.TimeoutError:
3751 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3752 exc = "Timeout"
3753 except Exception as e:
3754 exc = traceback.format_exc()
3755 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3756 finally:
3757 if exc:
3758 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3759 "FAILED {}: {}".format(step, exc)
3760 nslcmop_operation_state = "FAILED"
3761 if db_nsr:
3762 self._write_ns_status(
3763 nsr_id=nsr_id,
3764 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3765 current_operation="IDLE",
3766 current_operation_id=None,
3767 # error_description=error_description_nsr,
3768 # error_detail=error_detail,
3769 other_update=db_nsr_update
3770 )
3771
3772 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
3773 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
3774
3775 if nslcmop_operation_state:
3776 try:
3777 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3778 "operationState": nslcmop_operation_state},
3779 loop=self.loop)
3780 except Exception as e:
3781 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3782 self.logger.debug(logging_text + "Exit")
3783 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3784 return nslcmop_operation_state, detailed_status
3785
3786 async def scale(self, nsr_id, nslcmop_id):
3787 # Try to lock HA task here
3788 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3789 if not task_is_locked_by_me:
3790 return
3791
3792 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3793 stage = ['', '', '']
3794 tasks_dict_info = {}
3795 # ^ stage, step, VIM progress
3796 self.logger.debug(logging_text + "Enter")
3797 # get all needed from database
3798 db_nsr = None
3799 db_nslcmop_update = {}
3800 db_nsr_update = {}
3801 exc = None
3802 # in case of error, indicates which part of the scale operation failed, to put the nsr at error status
3803 scale_process = None
3804 old_operational_status = ""
3805 old_config_status = ""
3806 nsi_id = None
3807 try:
3808 # wait for any previous tasks in process
3809 step = "Waiting for previous operations to terminate"
3810 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3811 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3812 current_operation="SCALING", current_operation_id=nslcmop_id)
3813
3814 step = "Getting nslcmop from database"
3815 self.logger.debug(step + " after having waited for previous tasks to be completed")
3816 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3817
3818 step = "Getting nsr from database"
3819 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3820 old_operational_status = db_nsr["operational-status"]
3821 old_config_status = db_nsr["config-status"]
3822
3823 step = "Parsing scaling parameters"
3824 db_nsr_update["operational-status"] = "scaling"
3825 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3826 nsr_deployed = db_nsr["_admin"].get("deployed")
3827
3828 #######
3829 nsr_deployed = db_nsr["_admin"].get("deployed")
3830 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3831 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3832 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3833 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3834 #######
3835
3836 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3837 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3838 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3839 # for backward compatibility
3840 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3841 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3842 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3843 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3844
3845 step = "Getting vnfr from database"
3846 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3847
3848 vca_id = self.get_vca_id(db_vnfr, db_nsr)
3849
3850 step = "Getting vnfd from database"
3851 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3852
3853 base_folder = db_vnfd["_admin"]["storage"]
3854
3855 step = "Getting scaling-group-descriptor"
3856 scaling_descriptor = find_in_list(
3857 get_scaling_aspect(
3858 db_vnfd
3859 ),
3860 lambda scale_desc: scale_desc["name"] == scaling_group
3861 )
3862 if not scaling_descriptor:
3863 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3864 "at vnfd:scaling-group-descriptor".format(scaling_group))
3865
3866 step = "Sending scale order to VIM"
3867 # TODO check if ns is in a proper status
3868 nb_scale_op = 0
3869 if not db_nsr["_admin"].get("scaling-group"):
3870 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3871 admin_scale_index = 0
3872 else:
3873 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3874 if admin_scale_info["name"] == scaling_group:
3875 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3876 break
3877 else: # not found: set index to one past the last element and add a new entry with the name
3878 admin_scale_index += 1
3879 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3880 RO_scaling_info = []
3881 VCA_scaling_info = []
3882 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3883 if scaling_type == "SCALE_OUT":
3884 if "aspect-delta-details" not in scaling_descriptor:
3885 raise LcmException(
3886 "Aspect delta details not fount in scaling descriptor {}".format(
3887 scaling_descriptor["name"]
3888 )
3889 )
3890 # check whether max-instance-count would be exceeded
3891 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3892
3893 vdu_scaling_info["scaling_direction"] = "OUT"
3894 vdu_scaling_info["vdu-create"] = {}
3895 for delta in deltas:
3896 for vdu_delta in delta["vdu-delta"]:
3897 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3898 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3899 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3900 if cloud_init_text:
3901 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3902 cloud_init_list = []
3903
3904 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3905 max_instance_count = 10
3906 if vdu_profile and "max-number-of-instances" in vdu_profile:
3907 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3908
3909 default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3910
3911 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3912
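# illustrative check with hypothetical numbers: default_instance_num=1, max_instance_count=2 and a
# previous scale-out of one instance give nb_scale_op=2 after this delta, so 2 + 1 > 2 raises below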
3913 if nb_scale_op + default_instance_num > max_instance_count:
3914 raise LcmException(
3915 "reached the limit of {} (max-instance-count) "
3916 "scaling-out operations for the "
3917 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3918 )
3919 for x in range(vdu_delta.get("number-of-instances", 1)):
3920 if cloud_init_text:
3921 # TODO Information about its own IP is not available because db_vnfr is not updated.
3922 additional_params["OSM"] = get_osm_params(
3923 db_vnfr,
3924 vdu_delta["id"],
3925 vdu_index + x
3926 )
3927 cloud_init_list.append(
3928 self._parse_cloud_init(
3929 cloud_init_text,
3930 additional_params,
3931 db_vnfd["id"],
3932 vdud["id"]
3933 )
3934 )
3935 VCA_scaling_info.append(
3936 {
3937 "osm_vdu_id": vdu_delta["id"],
3938 "member-vnf-index": vnf_index,
3939 "type": "create",
3940 "vdu_index": vdu_index + x
3941 }
3942 )
3943 RO_scaling_info.append(
3944 {
3945 "osm_vdu_id": vdu_delta["id"],
3946 "member-vnf-index": vnf_index,
3947 "type": "create",
3948 "count": vdu_delta.get("number-of-instances", 1)
3949 }
3950 )
3951 if cloud_init_list:
3952 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3953 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3954
3955 elif scaling_type == "SCALE_IN":
3956 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3957 min_instance_count = int(scaling_descriptor["min-instance-count"])
3958
3959 vdu_scaling_info["scaling_direction"] = "IN"
3960 vdu_scaling_info["vdu-delete"] = {}
3961 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3962 for delta in deltas:
3963 for vdu_delta in delta["vdu-delta"]:
3964 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3965 min_instance_count = 0
3966 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3967 if vdu_profile and "min-number-of-instances" in vdu_profile:
3968 min_instance_count = vdu_profile["min-number-of-instances"]
3969
3970 default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3971
3972 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3973 if nb_scale_op + default_instance_num < min_instance_count:
3974 raise LcmException(
3975 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3976 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3977 )
3978 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3979 "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
3980 "vdu_index": vdu_index - 1})
3981 for x in range(vdu_delta.get("number-of-instances", 1)):
3982 VCA_scaling_info.append(
3983 {
3984 "osm_vdu_id": vdu_delta["id"],
3985 "member-vnf-index": vnf_index,
3986 "type": "delete",
3987 "vdu_index": vdu_index - 1 - x
3988 }
3989 )
3990 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3991
3992 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3993 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3994 if vdu_scaling_info["scaling_direction"] == "IN":
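# (descriptive) vdur entries are walked in reverse, so the last entries of db_vnfr["vdur"] for each
# vdu-id (presumably the most recently created instances) are the ones whose name/interface
# data is recorded for deletion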
3995 for vdur in reversed(db_vnfr["vdur"]):
3996 if vdu_delete.get(vdur["vdu-id-ref"]):
3997 vdu_delete[vdur["vdu-id-ref"]] -= 1
3998 vdu_scaling_info["vdu"].append({
3999 "name": vdur.get("name") or vdur.get("vdu-name"),
4000 "vdu_id": vdur["vdu-id-ref"],
4001 "interface": []
4002 })
4003 for interface in vdur["interfaces"]:
4004 vdu_scaling_info["vdu"][-1]["interface"].append({
4005 "name": interface["name"],
4006 "ip_address": interface["ip-address"],
4007 "mac_address": interface.get("mac-address"),
4008 })
4009 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
4010
4011 # PRE-SCALE BEGIN
4012 step = "Executing pre-scale vnf-config-primitive"
4013 if scaling_descriptor.get("scaling-config-action"):
4014 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4015 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
4016 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
4017 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4018 step = db_nslcmop_update["detailed-status"] = \
4019 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
4020
4021 # look for primitive
4022 for config_primitive in (get_configuration(
4023 db_vnfd, db_vnfd["id"]
4024 ) or {}).get("config-primitive", ()):
4025 if config_primitive["name"] == vnf_config_primitive:
4026 break
4027 else:
4028 raise LcmException(
4029 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4030 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4031 "primitive".format(scaling_group, vnf_config_primitive))
4032
4033 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4034 if db_vnfr.get("additionalParamsForVnf"):
4035 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4036
4037 scale_process = "VCA"
4038 db_nsr_update["config-status"] = "configuring pre-scaling"
4039 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4040
4041 # Pre-scale retry check: Check if this sub-operation has been executed before
4042 op_index = self._check_or_add_scale_suboperation(
4043 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
4044 if op_index == self.SUBOPERATION_STATUS_SKIP:
4045 # Skip sub-operation
4046 result = 'COMPLETED'
4047 result_detail = 'Done'
4048 self.logger.debug(logging_text +
4049 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4050 vnf_config_primitive, result, result_detail))
4051 else:
4052 if op_index == self.SUBOPERATION_STATUS_NEW:
4053 # New sub-operation: Get index of this sub-operation
4054 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4055 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4056 format(vnf_config_primitive))
4057 else:
4058 # retry: Get registered params for this existing sub-operation
4059 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4060 vnf_index = op.get('member_vnf_index')
4061 vnf_config_primitive = op.get('primitive')
4062 primitive_params = op.get('primitive_params')
4063 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4064 format(vnf_config_primitive))
4065 # Execute the primitive, either with new (first-time) or registered (retry) args
4066 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4067 primitive_name = config_primitive.get("execution-environment-primitive",
4068 vnf_config_primitive)
4069 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4070 member_vnf_index=vnf_index,
4071 vdu_id=None,
4072 vdu_count_index=None,
4073 ee_descriptor_id=ee_descriptor_id)
4074 result, result_detail = await self._ns_execute_primitive(
4075 ee_id, primitive_name,
4076 primitive_params,
4077 vca_type=vca_type,
4078 vca_id=vca_id,
4079 )
4080 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4081 vnf_config_primitive, result, result_detail))
4082 # Update operationState = COMPLETED | FAILED
4083 self._update_suboperation_status(
4084 db_nslcmop, op_index, result, result_detail)
4085
4086 if result == "FAILED":
4087 raise LcmException(result_detail)
4088 db_nsr_update["config-status"] = old_config_status
4089 scale_process = None
4090 # PRE-SCALE END
4091
4092 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
4093 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
4094
4095 # SCALE-IN VCA - BEGIN
4096 if VCA_scaling_info:
4097 step = db_nslcmop_update["detailed-status"] = \
4098 "Deleting the execution environments"
4099 scale_process = "VCA"
4100 for vdu_info in VCA_scaling_info:
4101 if vdu_info["type"] == "delete":
4102 member_vnf_index = str(vdu_info["member-vnf-index"])
4103 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4104 vdu_id = vdu_info["osm_vdu_id"]
4105 vdu_index = int(vdu_info["vdu_index"])
4106 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4107 member_vnf_index, vdu_id, vdu_index)
4108 stage[2] = step = "Scaling in VCA"
4109 self._write_op_status(
4110 op_id=nslcmop_id,
4111 stage=stage
4112 )
4113 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
4114 config_update = db_nsr["configurationStatus"]
4115 for vca_index, vca in enumerate(vca_update):
4116 if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \
4117 vca["vdu_count_index"] == vdu_index:
4118 if vca.get("vdu_id"):
4119 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4120 elif vca.get("kdu_name"):
4121 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4122 else:
4123 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4124 operation_params = db_nslcmop.get("operationParams") or {}
4125 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
4126 vca.get("needed_terminate"))
4127 task = asyncio.ensure_future(
4128 asyncio.wait_for(
4129 self.destroy_N2VC(
4130 logging_text,
4131 db_nslcmop,
4132 vca,
4133 config_descriptor,
4134 vca_index,
4135 destroy_ee=True,
4136 exec_primitives=exec_terminate_primitives,
4137 scaling_in=True,
4138 vca_id=vca_id,
4139 ),
4140 timeout=self.timeout_charm_delete
4141 )
4142 )
4143 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4144 del vca_update[vca_index]
4145 del config_update[vca_index]
4146 # wait for pending tasks of terminate primitives
4147 if tasks_dict_info:
4148 self.logger.debug(logging_text +
4149 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
4150 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
4151 min(self.timeout_charm_delete,
4152 self.timeout_ns_terminate),
4153 stage, nslcmop_id)
4154 tasks_dict_info.clear()
4155 if error_list:
4156 raise LcmException("; ".join(error_list))
4157
4158 db_vca_and_config_update = {
4159 "_admin.deployed.VCA": vca_update,
4160 "configurationStatus": config_update
4161 }
4162 self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
4163 scale_process = None
4164 # SCALE-IN VCA - END
4165
4166 # SCALE RO - BEGIN
4167 if RO_scaling_info:
4168 scale_process = "RO"
4169 if self.ro_config.get("ng"):
4170 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
4171 vdu_scaling_info.pop("vdu-create", None)
4172 vdu_scaling_info.pop("vdu-delete", None)
4173
4174 scale_process = None
4175 if db_nsr_update:
4176 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4177 # SCALE RO - END
4178
4179 # SCALE-UP VCA - BEGIN
4180 if VCA_scaling_info:
4181 step = db_nslcmop_update["detailed-status"] = \
4182 "Creating new execution environments"
4183 scale_process = "VCA"
4184 for vdu_info in VCA_scaling_info:
4185 if vdu_info["type"] == "create":
4186 member_vnf_index = str(vdu_info["member-vnf-index"])
4187 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4188 vnfd_id = db_vnfr["vnfd-ref"]
4189 vdu_index = int(vdu_info["vdu_index"])
4190 deploy_params = {"OSM": get_osm_params(db_vnfr)}
4191 if db_vnfr.get("additionalParamsForVnf"):
4192 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
4193 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
4194 if descriptor_config:
4195 vdu_id = None
4196 vdu_name = None
4197 kdu_name = None
4198 self._deploy_n2vc(
4199 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
4200 db_nsr=db_nsr,
4201 db_vnfr=db_vnfr,
4202 nslcmop_id=nslcmop_id,
4203 nsr_id=nsr_id,
4204 nsi_id=nsi_id,
4205 vnfd_id=vnfd_id,
4206 vdu_id=vdu_id,
4207 kdu_name=kdu_name,
4208 member_vnf_index=member_vnf_index,
4209 vdu_index=vdu_index,
4210 vdu_name=vdu_name,
4211 deploy_params=deploy_params,
4212 descriptor_config=descriptor_config,
4213 base_folder=base_folder,
4214 task_instantiation_info=tasks_dict_info,
4215 stage=stage
4216 )
4217 vdu_id = vdu_info["osm_vdu_id"]
4218 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
4219 descriptor_config = get_configuration(db_vnfd, vdu_id)
4220 if vdur.get("additionalParams"):
4221 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
4222 else:
4223 deploy_params_vdu = deploy_params
4224 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
4225 if descriptor_config:
4226 vdu_name = None
4227 kdu_name = None
4228 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4229 member_vnf_index, vdu_id, vdu_index)
4230 stage[2] = step = "Scaling out VCA"
4231 self._write_op_status(
4232 op_id=nslcmop_id,
4233 stage=stage
4234 )
4235 self._deploy_n2vc(
4236 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4237 member_vnf_index, vdu_id, vdu_index),
4238 db_nsr=db_nsr,
4239 db_vnfr=db_vnfr,
4240 nslcmop_id=nslcmop_id,
4241 nsr_id=nsr_id,
4242 nsi_id=nsi_id,
4243 vnfd_id=vnfd_id,
4244 vdu_id=vdu_id,
4245 kdu_name=kdu_name,
4246 member_vnf_index=member_vnf_index,
4247 vdu_index=vdu_index,
4248 vdu_name=vdu_name,
4249 deploy_params=deploy_params_vdu,
4250 descriptor_config=descriptor_config,
4251 base_folder=base_folder,
4252 task_instantiation_info=tasks_dict_info,
4253 stage=stage
4254 )
4255 # SCALE-UP VCA - END
4256 scale_process = None
4257
4258 # POST-SCALE BEGIN
4259 # execute the POST-SCALING vnf-config-primitive, if the scaling descriptor defines one
4260 step = "Executing post-scale vnf-config-primitive"
4261 if scaling_descriptor.get("scaling-config-action"):
4262 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4263 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4264 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4265 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4266 step = db_nslcmop_update["detailed-status"] = \
4267 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4268
4269 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4270 if db_vnfr.get("additionalParamsForVnf"):
4271 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4272
4273 # look for primitive
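# (for/else: the else branch only runs when no config-primitive matched the referenced name, and raises)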
4274 for config_primitive in (
4275 get_configuration(db_vnfd, db_vnfd["id"]) or {}
4276 ).get("config-primitive", ()):
4277 if config_primitive["name"] == vnf_config_primitive:
4278 break
4279 else:
4280 raise LcmException(
4281 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4282 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4283 "config-primitive".format(scaling_group, vnf_config_primitive))
4284 scale_process = "VCA"
4285 db_nsr_update["config-status"] = "configuring post-scaling"
4286 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4287
4288 # Post-scale retry check: determine whether this sub-operation has already been executed
4289 op_index = self._check_or_add_scale_suboperation(
4290 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4291 if op_index == self.SUBOPERATION_STATUS_SKIP:
4292 # Skip sub-operation
4293 result = 'COMPLETED'
4294 result_detail = 'Done'
4295 self.logger.debug(logging_text +
4296 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4297 format(vnf_config_primitive, result, result_detail))
4298 else:
4299 if op_index == self.SUBOPERATION_STATUS_NEW:
4300 # New sub-operation: Get index of this sub-operation
4301 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4302 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4303 format(vnf_config_primitive))
4304 else:
4305 # retry: Get registered params for this existing sub-operation
4306 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4307 vnf_index = op.get('member_vnf_index')
4308 vnf_config_primitive = op.get('primitive')
4309 primitive_params = op.get('primitive_params')
4310 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4311 format(vnf_config_primitive))
4312 # Execute the primitive, either with new (first-time) or registered (retry) args
4313 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4314 primitive_name = config_primitive.get("execution-environment-primitive",
4315 vnf_config_primitive)
4316 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4317 member_vnf_index=vnf_index,
4318 vdu_id=None,
4319 vdu_count_index=None,
4320 ee_descriptor_id=ee_descriptor_id)
4321 result, result_detail = await self._ns_execute_primitive(
4322 ee_id,
4323 primitive_name,
4324 primitive_params,
4325 vca_type=vca_type,
4326 vca_id=vca_id,
4327 )
4328 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4329 vnf_config_primitive, result, result_detail))
4330 # Update operationState = COMPLETED | FAILED
4331 self._update_suboperation_status(
4332 db_nslcmop, op_index, result, result_detail)
4333
4334 if result == "FAILED":
4335 raise LcmException(result_detail)
4336 db_nsr_update["config-status"] = old_config_status
4337 scale_process = None
4338 # POST-SCALE END
4339
4340 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4341 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4342 else old_operational_status
4343 db_nsr_update["config-status"] = old_config_status
4344 return
4345 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4346 self.logger.error(logging_text + "Exit Exception {}".format(e))
4347 exc = e
4348 except asyncio.CancelledError:
4349 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4350 exc = "Operation was cancelled"
4351 except Exception as e:
4352 exc = traceback.format_exc()
4353 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4354 finally:
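# regardless of the outcome: wait for pending tasks, restore/record the nsr status and report the operation result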
4355 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
4356 if tasks_dict_info:
4357 stage[1] = "Waiting for pending tasks."
4358 self.logger.debug(logging_text + stage[1])
4359 exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
4360 stage, nslcmop_id, nsr_id=nsr_id)
4361 if exc:
4362 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4363 nslcmop_operation_state = "FAILED"
4364 if db_nsr:
4365 db_nsr_update["operational-status"] = old_operational_status
4366 db_nsr_update["config-status"] = old_config_status
4367 db_nsr_update["detailed-status"] = ""
4368 if scale_process:
4369 if "VCA" in scale_process:
4370 db_nsr_update["config-status"] = "failed"
4371 if "RO" in scale_process:
4372 db_nsr_update["operational-status"] = "failed"
4373 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4374 exc)
4375 else:
4376 error_description_nslcmop = None
4377 nslcmop_operation_state = "COMPLETED"
4378 db_nslcmop_update["detailed-status"] = "Done"
4379
4380 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4381 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4382 if db_nsr:
4383 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4384 current_operation_id=None, other_update=db_nsr_update)
4385
4386 if nslcmop_operation_state:
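# notify through the kafka bus that the scale operation has finished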
4387 try:
4388 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4389 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4390 except Exception as e:
4391 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4392 self.logger.debug(logging_text + "Exit")
4393 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4394
4395 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
4396 nsr_id = db_nslcmop["nsInstanceId"]
4397 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4398 db_vnfrs = {}
4399
4400 # read from db: the vnfd of every vnf in the ns
4401 db_vnfds = []
4402
4403 # for each vnf in ns, read vnfd
4404 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4405 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4406 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4407 # if this vnfd has not been read yet, fetch it from the db
4408 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
4409 # read from db
4410 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4411 db_vnfds.append(vnfd)
4412 n2vc_key = self.n2vc.get_public_key()
4413 n2vc_key_list = [n2vc_key]
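# add the new VDUs to the VNFR and mark the ones to delete (mark_delete=True); after NG-RO applies the change, the marked vdur entries are dropped below (mark_delete=False)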
4414 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4415 mark_delete=True)
4416 # db_vnfr has been updated, update db_vnfrs to use it
4417 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4418 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4419 db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
4420 timeout_ns_deploy=self.timeout_ns_deploy)
4421 if vdu_scaling_info.get("vdu-delete"):
4422 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4423
4424 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
4425 if not self.prometheus:
4426 return
4427 # look for a file named 'prometheus*.j2' and use it as the Jinja2 template for the Prometheus job
4428 artifact_content = self.fs.dir_ls(artifact_path)
4429 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4430 if not job_file:
4431 return
4432 with self.fs.file_open((artifact_path, job_file), "r") as f:
4433 job_data = f.read()
4434
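# Illustrative shape of the rendered job (assumed; the actual 'prometheus*.j2' template ships inside the VNF package artifacts):
#   - job_name: {{ JOB_NAME }}
#     static_configs:
#       - targets: ['{{ EXPORTER_POD_IP }}:{{ EXPORTER_POD_PORT }}']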
4435 # TODO get_service
4436 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4437 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4438 host_port = "80"
4439 vnfr_id = vnfr_id.replace("-", "")
4440 variables = {
4441 "JOB_NAME": vnfr_id,
4442 "TARGET_IP": target_ip,
4443 "EXPORTER_POD_IP": host_name,
4444 "EXPORTER_POD_PORT": host_port,
4445 }
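# render the Jinja2 template with the variables above and register the resulting job(s) with Prometheus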
4446 job_list = self.prometheus.parse_job(job_data, variables)
4447 # ensure the job_name contains the vnfr_id, and add the nsr_id as metadata
4448 for job in job_list:
4449 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4450 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4451 job["nsr_id"] = nsr_id
4452 job_dict = {jl["job_name"]: jl for jl in job_list}
4453 if await self.prometheus.update(job_dict):
4454 return list(job_dict.keys())
4455
4456 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4457 """
4458 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4459
4460 :param vim_account_id: VIM Account ID
4461
4462 :return: (cloud_name, cloud_credential)
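
Example (illustrative):
    cloud, credential = self.get_vca_cloud_and_credentials(vim_account_id)
Both values are None when the VIM account config does not define them.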
4463 """
4464 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4465 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4466
4467 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4468 """
4469 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4470
4471 :param vim_account_id: VIM Account ID
4472
4473 :return: (cloud_name, cloud_credential)
4474 """
4475 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4476 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")