Bug 1951 fixed to update the deploy_params_kdu dict instead of overwriting it
[osm/LCM.git] osm_lcm/ns.py
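The fix follows the pattern named in the title: merge new values into the existing deploy_params_kdu dict with dict.update() instead of re-assigning it, so previously collected parameters are kept. A minimal sketch of the pattern (extra_params and the example entry are illustrative, not taken from this change):

    deploy_params_kdu = {"kdu_name": "example"}  # params already collected (illustrative)
    # deploy_params_kdu = extra_params           # re-assignment would discard them
    deploy_params_kdu.update(extra_params)       # merge new params, keeping existing keys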
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.data_utils.nsr import get_deployed_kdu
35 from osm_lcm.ng_ro import NgRoClient, NgRoException
36 from osm_lcm.lcm_utils import (
37 LcmException,
38 LcmExceptionNoMgmtIP,
39 LcmBase,
40 deep_get,
41 get_iterable,
42 populate_dict,
43 )
44 from osm_lcm.data_utils.nsd import get_vnf_profiles
45 from osm_lcm.data_utils.vnfd import (
46 get_vdu_list,
47 get_vdu_profile,
48 get_ee_sorted_initial_config_primitive_list,
49 get_ee_sorted_terminate_config_primitive_list,
50 get_kdu_list,
51 get_virtual_link_profiles,
52 get_vdu,
53 get_configuration,
54 get_vdu_index,
55 get_scaling_aspect,
56 get_number_of_instances,
57 get_juju_ee_ref,
58 get_kdu_profile,
59 )
60 from osm_lcm.data_utils.list_utils import find_in_list
61 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
62 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
63 from osm_lcm.data_utils.database.vim_account import VimAccountDB
64 from n2vc.k8s_helm_conn import K8sHelmConnector
65 from n2vc.k8s_helm3_conn import K8sHelm3Connector
66 from n2vc.k8s_juju_conn import K8sJujuConnector
67
68 from osm_common.dbbase import DbException
69 from osm_common.fsbase import FsException
70
71 from osm_lcm.data_utils.database.database import Database
72 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
73
74 from n2vc.n2vc_juju_conn import N2VCJujuConnector
75 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
76
77 from osm_lcm.lcm_helm_conn import LCMHelmConn
78
79 from copy import copy, deepcopy
80 from time import time
81 from uuid import uuid4
82
83 from random import randint
84
85 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
86
87
88 class NsLcm(LcmBase):
89 timeout_vca_on_error = (
90 5 * 60
91 ) # Time for charm from first time at blocked,error status to mark as failed
92 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying a ns
93 timeout_ns_terminate = 1800 # default global timeout for undeploying a ns
94 timeout_charm_delete = 10 * 60
95 timeout_primitive = 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive = (
97 10 * 60
98 ) # timeout for some progress in a primitive execution
99
100 SUBOPERATION_STATUS_NOT_FOUND = -1
101 SUBOPERATION_STATUS_NEW = -2
102 SUBOPERATION_STATUS_SKIP = -3
103 task_name_deploy_vca = "Deploying VCA"
104
105 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
106 """
107 Init: connect to database, filesystem storage and messaging
108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
109 :return: None
110 """
111 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
112
113 self.db = Database().instance.db
114 self.fs = Filesystem().instance.fs
115 self.loop = loop
116 self.lcm_tasks = lcm_tasks
117 self.timeout = config["timeout"]
118 self.ro_config = config["ro_config"]
119 self.ng_ro = config["ro_config"].get("ng")
120 self.vca_config = config["VCA"].copy()
121
122 # create N2VC connector
123 self.n2vc = N2VCJujuConnector(
124 log=self.logger,
125 loop=self.loop,
126 on_update_db=self._on_update_n2vc_db,
127 fs=self.fs,
128 db=self.db,
129 )
130
131 self.conn_helm_ee = LCMHelmConn(
132 log=self.logger,
133 loop=self.loop,
134 vca_config=self.vca_config,
135 on_update_db=self._on_update_n2vc_db,
136 )
137
138 self.k8sclusterhelm2 = K8sHelmConnector(
139 kubectl_command=self.vca_config.get("kubectlpath"),
140 helm_command=self.vca_config.get("helmpath"),
141 log=self.logger,
142 on_update_db=None,
143 fs=self.fs,
144 db=self.db,
145 )
146
147 self.k8sclusterhelm3 = K8sHelm3Connector(
148 kubectl_command=self.vca_config.get("kubectlpath"),
149 helm_command=self.vca_config.get("helm3path"),
150 fs=self.fs,
151 log=self.logger,
152 db=self.db,
153 on_update_db=None,
154 )
155
156 self.k8sclusterjuju = K8sJujuConnector(
157 kubectl_command=self.vca_config.get("kubectlpath"),
158 juju_command=self.vca_config.get("jujupath"),
159 log=self.logger,
160 loop=self.loop,
161 on_update_db=self._on_update_k8s_db,
162 fs=self.fs,
163 db=self.db,
164 )
165
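# route each KDU deployment type to its k8s connector (helm v2, helm v3 or juju bundles)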
166 self.k8scluster_map = {
167 "helm-chart": self.k8sclusterhelm2,
168 "helm-chart-v3": self.k8sclusterhelm3,
169 "chart": self.k8sclusterhelm3,
170 "juju-bundle": self.k8sclusterjuju,
171 "juju": self.k8sclusterjuju,
172 }
173
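# route each VCA/charm type to its connector: proxy and native charms go through N2VC (juju),
# helm-based execution environments through the LCM helm connector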
174 self.vca_map = {
175 "lxc_proxy_charm": self.n2vc,
176 "native_charm": self.n2vc,
177 "k8s_proxy_charm": self.n2vc,
178 "helm": self.conn_helm_ee,
179 "helm-v3": self.conn_helm_ee,
180 }
181
182 self.prometheus = prometheus
183
184 # create RO client
185 self.RO = NgRoClient(self.loop, **self.ro_config)
186
187 @staticmethod
188 def increment_ip_mac(ip_mac, vm_index=1):
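# Return ip_mac with its last IPv4 octet (or last IPv6/MAC hex group) incremented by vm_index;
# non-string input is returned unchanged, and None is returned if it cannot be parsed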
189 if not isinstance(ip_mac, str):
190 return ip_mac
191 try:
192 # try with ipv4: look for the last dot
193 i = ip_mac.rfind(".")
194 if i > 0:
195 i += 1
196 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
197 # try with ipv6 or mac: look for the last colon. Operate in hex
198 i = ip_mac.rfind(":")
199 if i > 0:
200 i += 1
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
203 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
204 )
205 except Exception:
206 pass
207 return None
208
209 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
210
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
212
213 try:
214 # TODO filter RO descriptor fields...
215
216 # write to database
217 db_dict = dict()
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict["deploymentStatus"] = ro_descriptor
220 self.update_db_2("nsrs", nsrs_id, db_dict)
221
222 except Exception as e:
223 self.logger.warn(
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
225 )
226
227 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
228
229 # remove last dot from path (if exists)
230 if path.endswith("."):
231 path = path[:-1]
232
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
235 try:
236
237 nsr_id = filter.get("_id")
238
239 # read ns record from database
240 nsr = self.db.get_one(table="nsrs", q_filter=filter)
241 current_ns_status = nsr.get("nsState")
242
243 # get vca status for NS
244 status_dict = await self.n2vc.get_status(
245 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
246 )
247
248 # vcaStatus
249 db_dict = dict()
250 db_dict["vcaStatus"] = status_dict
251 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
252
253 # update configurationStatus for this VCA
254 try:
255 vca_index = int(path[path.rfind(".") + 1 :])
256
257 vca_list = deep_get(
258 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
259 )
260 vca_status = vca_list[vca_index].get("status")
261
262 configuration_status_list = nsr.get("configurationStatus")
263 config_status = configuration_status_list[vca_index].get("status")
264
265 if config_status == "BROKEN" and vca_status != "failed":
266 db_dict["configurationStatus"][vca_index] = "READY"
267 elif config_status != "BROKEN" and vca_status == "failed":
268 db_dict["configurationStatus"][vca_index] = "BROKEN"
269 except Exception as e:
270 # not update configurationStatus
271 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
272
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
275 is_degraded = False
276 if current_ns_status in ("READY", "DEGRADED"):
277 error_description = ""
278 # check machines
279 if status_dict.get("machines"):
280 for machine_id in status_dict.get("machines"):
281 machine = status_dict.get("machines").get(machine_id)
282 # check machine agent-status
283 if machine.get("agent-status"):
284 s = machine.get("agent-status").get("status")
285 if s != "started":
286 is_degraded = True
287 error_description += (
288 "machine {} agent-status={} ; ".format(
289 machine_id, s
290 )
291 )
292 # check machine instance status
293 if machine.get("instance-status"):
294 s = machine.get("instance-status").get("status")
295 if s != "running":
296 is_degraded = True
297 error_description += (
298 "machine {} instance-status={} ; ".format(
299 machine_id, s
300 )
301 )
302 # check applications
303 if status_dict.get("applications"):
304 for app_id in status_dict.get("applications"):
305 app = status_dict.get("applications").get(app_id)
306 # check application status
307 if app.get("status"):
308 s = app.get("status").get("status")
309 if s != "active":
310 is_degraded = True
311 error_description += (
312 "application {} status={} ; ".format(app_id, s)
313 )
314
315 if error_description:
316 db_dict["errorDescription"] = error_description
317 if current_ns_status == "READY" and is_degraded:
318 db_dict["nsState"] = "DEGRADED"
319 if current_ns_status == "DEGRADED" and not is_degraded:
320 db_dict["nsState"] = "READY"
321
322 # write to database
323 self.update_db_2("nsrs", nsr_id, db_dict)
324
325 except (asyncio.CancelledError, asyncio.TimeoutError):
326 raise
327 except Exception as e:
328 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
329
330 async def _on_update_k8s_db(
331 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
332 ):
333 """
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
338 :return: none
339 """
340
341 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
342 # .format(cluster_uuid, kdu_instance, filter))
343
344 try:
345 nsr_id = filter.get("_id")
346
347 # get vca status for NS
348 vca_status = await self.k8sclusterjuju.status_kdu(
349 cluster_uuid,
350 kdu_instance,
351 complete_status=True,
352 yaml_format=False,
353 vca_id=vca_id,
354 )
355 # vcaStatus
356 db_dict = dict()
357 db_dict["vcaStatus"] = {nsr_id: vca_status}
358
359 await self.k8sclusterjuju.update_vca_status(
360 db_dict["vcaStatus"],
361 kdu_instance,
362 vca_id=vca_id,
363 )
364
365 # write to database
366 self.update_db_2("nsrs", nsr_id, db_dict)
367
368 except (asyncio.CancelledError, asyncio.TimeoutError):
369 raise
370 except Exception as e:
371 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
372
373 @staticmethod
374 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
375 try:
376 env = Environment(undefined=StrictUndefined)
377 template = env.from_string(cloud_init_text)
378 return template.render(additional_params or {})
379 except UndefinedError as e:
380 raise LcmException(
381 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
382 "file, must be provided in the instantiation parameters inside the "
383 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
384 )
385 except (TemplateError, TemplateNotFound) as e:
386 raise LcmException(
387 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
388 vnfd_id, vdu_id, e
389 )
390 )
391
392 def _get_vdu_cloud_init_content(self, vdu, vnfd):
393 cloud_init_content = cloud_init_file = None
394 try:
395 if vdu.get("cloud-init-file"):
396 base_folder = vnfd["_admin"]["storage"]
397 cloud_init_file = "{}/{}/cloud_init/{}".format(
398 base_folder["folder"],
399 base_folder["pkg-dir"],
400 vdu["cloud-init-file"],
401 )
402 with self.fs.file_open(cloud_init_file, "r") as ci_file:
403 cloud_init_content = ci_file.read()
404 elif vdu.get("cloud-init"):
405 cloud_init_content = vdu["cloud-init"]
406
407 return cloud_init_content
408 except FsException as e:
409 raise LcmException(
410 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
411 vnfd["id"], vdu["id"], cloud_init_file, e
412 )
413 )
414
415 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
416 vdur = next(
417 vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
418 )
419 additional_params = vdur.get("additionalParams")
420 return parse_yaml_strings(additional_params)
421
422 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
423 """
424 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
425 :param vnfd: input vnfd
426 :param new_id: overrides vnf id if provided
427 :param additionalParams: Instantiation params for VNFs provided
428 :param nsrId: Id of the NSR
429 :return: copy of vnfd
430 """
431 vnfd_RO = deepcopy(vnfd)
432 # remove unused by RO configuration, monitoring, scaling and internal keys
433 vnfd_RO.pop("_id", None)
434 vnfd_RO.pop("_admin", None)
435 vnfd_RO.pop("monitoring-param", None)
436 vnfd_RO.pop("scaling-group-descriptor", None)
437 vnfd_RO.pop("kdu", None)
438 vnfd_RO.pop("k8s-cluster", None)
439 if new_id:
440 vnfd_RO["id"] = new_id
441
442 # cloud-init and cloud-init-file are parsed by LCM with Jinja2 elsewhere, so remove them from the RO descriptor
443 for vdu in get_iterable(vnfd_RO, "vdu"):
444 vdu.pop("cloud-init-file", None)
445 vdu.pop("cloud-init", None)
446 return vnfd_RO
447
448 @staticmethod
449 def ip_profile_2_RO(ip_profile):
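# Convert an OSM ip-profile into the RO format: dns-server becomes dns-address,
# ip-version values are capitalized and dhcp-params is renamed to dhcp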
450 RO_ip_profile = deepcopy(ip_profile)
451 if "dns-server" in RO_ip_profile:
452 if isinstance(RO_ip_profile["dns-server"], list):
453 RO_ip_profile["dns-address"] = []
454 for ds in RO_ip_profile.pop("dns-server"):
455 RO_ip_profile["dns-address"].append(ds["address"])
456 else:
457 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
458 if RO_ip_profile.get("ip-version") == "ipv4":
459 RO_ip_profile["ip-version"] = "IPv4"
460 if RO_ip_profile.get("ip-version") == "ipv6":
461 RO_ip_profile["ip-version"] = "IPv6"
462 if "dhcp-params" in RO_ip_profile:
463 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
464 return RO_ip_profile
465
466 def _get_ro_vim_id_for_vim_account(self, vim_account):
467 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
468 if db_vim["_admin"]["operationalState"] != "ENABLED":
469 raise LcmException(
470 "VIM={} is not available. operationalState={}".format(
471 vim_account, db_vim["_admin"]["operationalState"]
472 )
473 )
474 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
475 return RO_vim_id
476
477 def get_ro_wim_id_for_wim_account(self, wim_account):
478 if isinstance(wim_account, str):
479 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
480 if db_wim["_admin"]["operationalState"] != "ENABLED":
481 raise LcmException(
482 "WIM={} is not available. operationalState={}".format(
483 wim_account, db_wim["_admin"]["operationalState"]
484 )
485 )
486 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
487 return RO_wim_id
488 else:
489 return wim_account
490
491 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
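# Scale the vnfr record: clone existing vdur entries (with fresh ids and incremented addresses) for vdu_create,
# and mark as DELETING or remove vdur entries for vdu_delete; db_vnfr is refreshed from the database afterwards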
492
493 db_vdu_push_list = []
494 db_update = {"_admin.modified": time()}
495 if vdu_create:
496 for vdu_id, vdu_count in vdu_create.items():
497 vdur = next(
498 (
499 vdur
500 for vdur in reversed(db_vnfr["vdur"])
501 if vdur["vdu-id-ref"] == vdu_id
502 ),
503 None,
504 )
505 if not vdur:
506 raise LcmException(
507 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
508 vdu_id
509 )
510 )
511
512 for count in range(vdu_count):
513 vdur_copy = deepcopy(vdur)
514 vdur_copy["status"] = "BUILD"
515 vdur_copy["status-detailed"] = None
516 vdur_copy["ip-address"] = None
517 vdur_copy["_id"] = str(uuid4())
518 vdur_copy["count-index"] += count + 1
519 vdur_copy["id"] = "{}-{}".format(
520 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
521 )
522 vdur_copy.pop("vim_info", None)
523 for iface in vdur_copy["interfaces"]:
524 if iface.get("fixed-ip"):
525 iface["ip-address"] = self.increment_ip_mac(
526 iface["ip-address"], count + 1
527 )
528 else:
529 iface.pop("ip-address", None)
530 if iface.get("fixed-mac"):
531 iface["mac-address"] = self.increment_ip_mac(
532 iface["mac-address"], count + 1
533 )
534 else:
535 iface.pop("mac-address", None)
536 iface.pop(
537 "mgmt_vnf", None
538 ) # only the first vdu can be management of the vnf
539 db_vdu_push_list.append(vdur_copy)
540 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
541 if vdu_delete:
542 for vdu_id, vdu_count in vdu_delete.items():
543 if mark_delete:
544 indexes_to_delete = [
545 iv[0]
546 for iv in enumerate(db_vnfr["vdur"])
547 if iv[1]["vdu-id-ref"] == vdu_id
548 ]
549 db_update.update(
550 {
551 "vdur.{}.status".format(i): "DELETING"
552 for i in indexes_to_delete[-vdu_count:]
553 }
554 )
555 else:
556 # they must be deleted one by one because common.db does not allow otherwise
557 vdus_to_delete = [
558 v
559 for v in reversed(db_vnfr["vdur"])
560 if v["vdu-id-ref"] == vdu_id
561 ]
562 for vdu in vdus_to_delete[:vdu_count]:
563 self.db.set_one(
564 "vnfrs",
565 {"_id": db_vnfr["_id"]},
566 None,
567 pull={"vdur": {"_id": vdu["_id"]}},
568 )
569 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
570 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
571 # modify passed dictionary db_vnfr
572 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
573 db_vnfr["vdur"] = db_vnfr_["vdur"]
574
575 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
576 """
577 Updates database nsr with the RO info for the created vld
578 :param ns_update_nsr: dictionary to be filled with the updated info
579 :param db_nsr: content of db_nsr. This is also modified
580 :param nsr_desc_RO: nsr descriptor from RO
581 :return: Nothing, LcmException is raised on errors
582 """
583
584 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
585 for net_RO in get_iterable(nsr_desc_RO, "nets"):
586 if vld["id"] != net_RO.get("ns_net_osm_id"):
587 continue
588 vld["vim-id"] = net_RO.get("vim_net_id")
589 vld["name"] = net_RO.get("vim_name")
590 vld["status"] = net_RO.get("status")
591 vld["status-detailed"] = net_RO.get("error_msg")
592 ns_update_nsr["vld.{}".format(vld_index)] = vld
593 break
594 else:
595 raise LcmException(
596 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
597 )
598
599 def set_vnfr_at_error(self, db_vnfrs, error_text):
600 try:
601 for db_vnfr in db_vnfrs.values():
602 vnfr_update = {"status": "ERROR"}
603 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
604 if "status" not in vdur:
605 vdur["status"] = "ERROR"
606 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
607 if error_text:
608 vdur["status-detailed"] = str(error_text)
609 vnfr_update[
610 "vdur.{}.status-detailed".format(vdu_index)
611 ] = "ERROR"
612 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
613 except DbException as e:
614 self.logger.error("Cannot update vnf. {}".format(e))
615
616 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
617 """
618 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
619 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
620 :param nsr_desc_RO: nsr descriptor from RO
621 :return: Nothing, LcmException is raised on errors
622 """
623 for vnf_index, db_vnfr in db_vnfrs.items():
624 for vnf_RO in nsr_desc_RO["vnfs"]:
625 if vnf_RO["member_vnf_index"] != vnf_index:
626 continue
627 vnfr_update = {}
628 if vnf_RO.get("ip_address"):
629 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
630 "ip_address"
631 ].split(";")[0]
632 elif not db_vnfr.get("ip-address"):
633 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
634 raise LcmExceptionNoMgmtIP(
635 "ns member_vnf_index '{}' has no IP address".format(
636 vnf_index
637 )
638 )
639
640 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
641 vdur_RO_count_index = 0
642 if vdur.get("pdu-type"):
643 continue
644 for vdur_RO in get_iterable(vnf_RO, "vms"):
645 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
646 continue
647 if vdur["count-index"] != vdur_RO_count_index:
648 vdur_RO_count_index += 1
649 continue
650 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
651 if vdur_RO.get("ip_address"):
652 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
653 else:
654 vdur["ip-address"] = None
655 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
656 vdur["name"] = vdur_RO.get("vim_name")
657 vdur["status"] = vdur_RO.get("status")
658 vdur["status-detailed"] = vdur_RO.get("error_msg")
659 for ifacer in get_iterable(vdur, "interfaces"):
660 for interface_RO in get_iterable(vdur_RO, "interfaces"):
661 if ifacer["name"] == interface_RO.get("internal_name"):
662 ifacer["ip-address"] = interface_RO.get(
663 "ip_address"
664 )
665 ifacer["mac-address"] = interface_RO.get(
666 "mac_address"
667 )
668 break
669 else:
670 raise LcmException(
671 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
672 "from VIM info".format(
673 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
674 )
675 )
676 vnfr_update["vdur.{}".format(vdu_index)] = vdur
677 break
678 else:
679 raise LcmException(
680 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
681 "VIM info".format(
682 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
683 )
684 )
685
686 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
687 for net_RO in get_iterable(nsr_desc_RO, "nets"):
688 if vld["id"] != net_RO.get("vnf_net_osm_id"):
689 continue
690 vld["vim-id"] = net_RO.get("vim_net_id")
691 vld["name"] = net_RO.get("vim_name")
692 vld["status"] = net_RO.get("status")
693 vld["status-detailed"] = net_RO.get("error_msg")
694 vnfr_update["vld.{}".format(vld_index)] = vld
695 break
696 else:
697 raise LcmException(
698 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
699 vnf_index, vld["id"]
700 )
701 )
702
703 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
704 break
705
706 else:
707 raise LcmException(
708 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
709 vnf_index
710 )
711 )
712
713 def _get_ns_config_info(self, nsr_id):
714 """
715 Generates a mapping between vnf,vdu elements and the N2VC id
716 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
717 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
718 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
719 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
720 """
721 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
722 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
723 mapping = {}
724 ns_config_info = {"osm-config-mapping": mapping}
725 for vca in vca_deployed_list:
726 if not vca["member-vnf-index"]:
727 continue
728 if not vca["vdu_id"]:
729 mapping[vca["member-vnf-index"]] = vca["application"]
730 else:
731 mapping[
732 "{}.{}.{}".format(
733 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
734 )
735 ] = vca["application"]
736 return ns_config_info
737
738 async def _instantiate_ng_ro(
739 self,
740 logging_text,
741 nsr_id,
742 nsd,
743 db_nsr,
744 db_nslcmop,
745 db_vnfrs,
746 db_vnfds,
747 n2vc_key_list,
748 stage,
749 start_deploy,
750 timeout_ns_deploy,
751 ):
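# Build the NG-RO deployment "target" (ns vlds, per-vnf vdur/vld info, images, flavors and cloud-init content)
# from the NS/VNF records and instantiation parameters, send it to RO and wait for completion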
752
753 db_vims = {}
754
755 def get_vim_account(vim_account_id):
756 nonlocal db_vims
757 if vim_account_id in db_vims:
758 return db_vims[vim_account_id]
759 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
760 db_vims[vim_account_id] = db_vim
761 return db_vim
762
763 # modify target_vld info with instantiation parameters
764 def parse_vld_instantiation_params(
765 target_vim, target_vld, vld_params, target_sdn
766 ):
767 if vld_params.get("ip-profile"):
768 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
769 "ip-profile"
770 ]
771 if vld_params.get("provider-network"):
772 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
773 "provider-network"
774 ]
775 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
776 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
777 "provider-network"
778 ]["sdn-ports"]
779 if vld_params.get("wimAccountId"):
780 target_wim = "wim:{}".format(vld_params["wimAccountId"])
781 target_vld["vim_info"][target_wim] = {}
782 for param in ("vim-network-name", "vim-network-id"):
783 if vld_params.get(param):
784 if isinstance(vld_params[param], dict):
785 for vim, vim_net in vld_params[param].items():
786 other_target_vim = "vim:" + vim
787 populate_dict(
788 target_vld["vim_info"],
789 (other_target_vim, param.replace("-", "_")),
790 vim_net,
791 )
792 else: # isinstance str
793 target_vld["vim_info"][target_vim][
794 param.replace("-", "_")
795 ] = vld_params[param]
796 if vld_params.get("common_id"):
797 target_vld["common_id"] = vld_params.get("common_id")
798
799 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
800 def update_ns_vld_target(target, ns_params):
801 for vnf_params in ns_params.get("vnf", ()):
802 if vnf_params.get("vimAccountId"):
803 target_vnf = next(
804 (
805 vnfr
806 for vnfr in db_vnfrs.values()
807 if vnf_params["member-vnf-index"]
808 == vnfr["member-vnf-index-ref"]
809 ),
810 None,
811 )
812 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
813 for a_index, a_vld in enumerate(target["ns"]["vld"]):
814 target_vld = find_in_list(
815 get_iterable(vdur, "interfaces"),
816 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
817 )
818 if target_vld:
819 if vnf_params.get("vimAccountId") not in a_vld.get(
820 "vim_info", {}
821 ):
822 target["ns"]["vld"][a_index].get("vim_info").update(
823 {
824 "vim:{}".format(vnf_params["vimAccountId"]): {
825 "vim_network_name": ""
826 }
827 }
828 )
829
830 nslcmop_id = db_nslcmop["_id"]
831 target = {
832 "name": db_nsr["name"],
833 "ns": {"vld": []},
834 "vnf": [],
835 "image": deepcopy(db_nsr["image"]),
836 "flavor": deepcopy(db_nsr["flavor"]),
837 "action_id": nslcmop_id,
838 "cloud_init_content": {},
839 }
840 for image in target["image"]:
841 image["vim_info"] = {}
842 for flavor in target["flavor"]:
843 flavor["vim_info"] = {}
844
845 if db_nslcmop.get("lcmOperationType") != "instantiate":
846 # get parameters of instantiation:
847 db_nslcmop_instantiate = self.db.get_list(
848 "nslcmops",
849 {
850 "nsInstanceId": db_nslcmop["nsInstanceId"],
851 "lcmOperationType": "instantiate",
852 },
853 )[-1]
854 ns_params = db_nslcmop_instantiate.get("operationParams")
855 else:
856 ns_params = db_nslcmop.get("operationParams")
857 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
858 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
859
860 cp2target = {}
861 for vld_index, vld in enumerate(db_nsr.get("vld")):
862 target_vim = "vim:{}".format(ns_params["vimAccountId"])
863 target_vld = {
864 "id": vld["id"],
865 "name": vld["name"],
866 "mgmt-network": vld.get("mgmt-network", False),
867 "type": vld.get("type"),
868 "vim_info": {
869 target_vim: {
870 "vim_network_name": vld.get("vim-network-name"),
871 "vim_account_id": ns_params["vimAccountId"],
872 }
873 },
874 }
875 # check if this network needs SDN assist
876 if vld.get("pci-interfaces"):
877 db_vim = get_vim_account(ns_params["vimAccountId"])
878 sdnc_id = db_vim["config"].get("sdn-controller")
879 if sdnc_id:
880 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
881 target_sdn = "sdn:{}".format(sdnc_id)
882 target_vld["vim_info"][target_sdn] = {
883 "sdn": True,
884 "target_vim": target_vim,
885 "vlds": [sdn_vld],
886 "type": vld.get("type"),
887 }
888
889 nsd_vnf_profiles = get_vnf_profiles(nsd)
890 for nsd_vnf_profile in nsd_vnf_profiles:
891 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
892 if cp["virtual-link-profile-id"] == vld["id"]:
893 cp2target[
894 "member_vnf:{}.{}".format(
895 cp["constituent-cpd-id"][0][
896 "constituent-base-element-id"
897 ],
898 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
899 )
900 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
901
902 # check at nsd descriptor, if there is an ip-profile
903 vld_params = {}
904 nsd_vlp = find_in_list(
905 get_virtual_link_profiles(nsd),
906 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
907 == vld["id"],
908 )
909 if (
910 nsd_vlp
911 and nsd_vlp.get("virtual-link-protocol-data")
912 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
913 ):
914 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
915 "l3-protocol-data"
916 ]
917 ip_profile_dest_data = {}
918 if "ip-version" in ip_profile_source_data:
919 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
920 "ip-version"
921 ]
922 if "cidr" in ip_profile_source_data:
923 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
924 "cidr"
925 ]
926 if "gateway-ip" in ip_profile_source_data:
927 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
928 "gateway-ip"
929 ]
930 if "dhcp-enabled" in ip_profile_source_data:
931 ip_profile_dest_data["dhcp-params"] = {
932 "enabled": ip_profile_source_data["dhcp-enabled"]
933 }
934 vld_params["ip-profile"] = ip_profile_dest_data
935
936 # update vld_params with instantiation params
937 vld_instantiation_params = find_in_list(
938 get_iterable(ns_params, "vld"),
939 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
940 )
941 if vld_instantiation_params:
942 vld_params.update(vld_instantiation_params)
943 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
944 target["ns"]["vld"].append(target_vld)
945 # Update the target ns_vld if the vnf vim_account is overridden by instantiation params
946 update_ns_vld_target(target, ns_params)
947
948 for vnfr in db_vnfrs.values():
949 vnfd = find_in_list(
950 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
951 )
952 vnf_params = find_in_list(
953 get_iterable(ns_params, "vnf"),
954 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
955 )
956 target_vnf = deepcopy(vnfr)
957 target_vim = "vim:{}".format(vnfr["vim-account-id"])
958 for vld in target_vnf.get("vld", ()):
959 # check if connected to a ns.vld, to fill target
960 vnf_cp = find_in_list(
961 vnfd.get("int-virtual-link-desc", ()),
962 lambda cpd: cpd.get("id") == vld["id"],
963 )
964 if vnf_cp:
965 ns_cp = "member_vnf:{}.{}".format(
966 vnfr["member-vnf-index-ref"], vnf_cp["id"]
967 )
968 if cp2target.get(ns_cp):
969 vld["target"] = cp2target[ns_cp]
970
971 vld["vim_info"] = {
972 target_vim: {"vim_network_name": vld.get("vim-network-name")}
973 }
974 # check if this network needs SDN assist
975 target_sdn = None
976 if vld.get("pci-interfaces"):
977 db_vim = get_vim_account(vnfr["vim-account-id"])
978 sdnc_id = db_vim["config"].get("sdn-controller")
979 if sdnc_id:
980 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
981 target_sdn = "sdn:{}".format(sdnc_id)
982 vld["vim_info"][target_sdn] = {
983 "sdn": True,
984 "target_vim": target_vim,
985 "vlds": [sdn_vld],
986 "type": vld.get("type"),
987 }
988
989 # check at vnfd descriptor, if there is an ip-profile
990 vld_params = {}
991 vnfd_vlp = find_in_list(
992 get_virtual_link_profiles(vnfd),
993 lambda a_link_profile: a_link_profile["id"] == vld["id"],
994 )
995 if (
996 vnfd_vlp
997 and vnfd_vlp.get("virtual-link-protocol-data")
998 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
999 ):
1000 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1001 "l3-protocol-data"
1002 ]
1003 ip_profile_dest_data = {}
1004 if "ip-version" in ip_profile_source_data:
1005 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1006 "ip-version"
1007 ]
1008 if "cidr" in ip_profile_source_data:
1009 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1010 "cidr"
1011 ]
1012 if "gateway-ip" in ip_profile_source_data:
1013 ip_profile_dest_data[
1014 "gateway-address"
1015 ] = ip_profile_source_data["gateway-ip"]
1016 if "dhcp-enabled" in ip_profile_source_data:
1017 ip_profile_dest_data["dhcp-params"] = {
1018 "enabled": ip_profile_source_data["dhcp-enabled"]
1019 }
1020
1021 vld_params["ip-profile"] = ip_profile_dest_data
1022 # update vld_params with instantiation params
1023 if vnf_params:
1024 vld_instantiation_params = find_in_list(
1025 get_iterable(vnf_params, "internal-vld"),
1026 lambda i_vld: i_vld["name"] == vld["id"],
1027 )
1028 if vld_instantiation_params:
1029 vld_params.update(vld_instantiation_params)
1030 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1031
1032 vdur_list = []
1033 for vdur in target_vnf.get("vdur", ()):
1034 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1035 continue # This vdu must not be created
1036 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1037
1038 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1039
1040 if ssh_keys_all:
1041 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1042 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1043 if (
1044 vdu_configuration
1045 and vdu_configuration.get("config-access")
1046 and vdu_configuration.get("config-access").get("ssh-access")
1047 ):
1048 vdur["ssh-keys"] = ssh_keys_all
1049 vdur["ssh-access-required"] = vdu_configuration[
1050 "config-access"
1051 ]["ssh-access"]["required"]
1052 elif (
1053 vnf_configuration
1054 and vnf_configuration.get("config-access")
1055 and vnf_configuration.get("config-access").get("ssh-access")
1056 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1057 ):
1058 vdur["ssh-keys"] = ssh_keys_all
1059 vdur["ssh-access-required"] = vnf_configuration[
1060 "config-access"
1061 ]["ssh-access"]["required"]
1062 elif ssh_keys_instantiation and find_in_list(
1063 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1064 ):
1065 vdur["ssh-keys"] = ssh_keys_instantiation
1066
1067 self.logger.debug("NS > vdur > {}".format(vdur))
1068
1069 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1070 # cloud-init
1071 if vdud.get("cloud-init-file"):
1072 vdur["cloud-init"] = "{}:file:{}".format(
1073 vnfd["_id"], vdud.get("cloud-init-file")
1074 )
1075 # read file and put content at target.cloud_init_content. Avoid ng_ro using the shared package system
1076 if vdur["cloud-init"] not in target["cloud_init_content"]:
1077 base_folder = vnfd["_admin"]["storage"]
1078 cloud_init_file = "{}/{}/cloud_init/{}".format(
1079 base_folder["folder"],
1080 base_folder["pkg-dir"],
1081 vdud.get("cloud-init-file"),
1082 )
1083 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1084 target["cloud_init_content"][
1085 vdur["cloud-init"]
1086 ] = ci_file.read()
1087 elif vdud.get("cloud-init"):
1088 vdur["cloud-init"] = "{}:vdu:{}".format(
1089 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1090 )
1091 # put content at target.cloud_init_content. Avoid ng_ro reading the vnfd descriptor
1092 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1093 "cloud-init"
1094 ]
1095 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1096 deploy_params_vdu = self._format_additional_params(
1097 vdur.get("additionalParams") or {}
1098 )
1099 deploy_params_vdu["OSM"] = get_osm_params(
1100 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1101 )
1102 vdur["additionalParams"] = deploy_params_vdu
1103
1104 # flavor
1105 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1106 if target_vim not in ns_flavor["vim_info"]:
1107 ns_flavor["vim_info"][target_vim] = {}
1108
1109 # deal with images
1110 # in case alternative images are provided we must check if they should be applied
1111 # for the vim_type, modify the vim_type taking into account
1112 ns_image_id = int(vdur["ns-image-id"])
1113 if vdur.get("alt-image-ids"):
1114 db_vim = get_vim_account(vnfr["vim-account-id"])
1115 vim_type = db_vim["vim_type"]
1116 for alt_image_id in vdur.get("alt-image-ids"):
1117 ns_alt_image = target["image"][int(alt_image_id)]
1118 if vim_type == ns_alt_image.get("vim-type"):
1119 # must use alternative image
1120 self.logger.debug(
1121 "use alternative image id: {}".format(alt_image_id)
1122 )
1123 ns_image_id = alt_image_id
1124 vdur["ns-image-id"] = ns_image_id
1125 break
1126 ns_image = target["image"][int(ns_image_id)]
1127 if target_vim not in ns_image["vim_info"]:
1128 ns_image["vim_info"][target_vim] = {}
1129
1130 vdur["vim_info"] = {target_vim: {}}
1131 # instantiation parameters
1132 # if vnf_params:
1133 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1134 # vdud["id"]), None)
1135 vdur_list.append(vdur)
1136 target_vnf["vdur"] = vdur_list
1137 target["vnf"].append(target_vnf)
1138
1139 desc = await self.RO.deploy(nsr_id, target)
1140 self.logger.debug("RO return > {}".format(desc))
1141 action_id = desc["action_id"]
1142 await self._wait_ng_ro(
1143 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1144 )
1145
1146 # Updating NSR
1147 db_nsr_update = {
1148 "_admin.deployed.RO.operational-status": "running",
1149 "detailed-status": " ".join(stage),
1150 }
1151 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1152 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1153 self._write_op_status(nslcmop_id, stage)
1154 self.logger.debug(
1155 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1156 )
1157 return
1158
1159 async def _wait_ng_ro(
1160 self,
1161 nsr_id,
1162 action_id,
1163 nslcmop_id=None,
1164 start_time=None,
1165 timeout=600,
1166 stage=None,
1167 ):
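# Poll RO action status every 15 seconds until DONE, raising on FAILED or when the timeout expires;
# progress is written to the nsr detailed-status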
1168 detailed_status_old = None
1169 db_nsr_update = {}
1170 start_time = start_time or time()
1171 while time() <= start_time + timeout:
1172 desc_status = await self.RO.status(nsr_id, action_id)
1173 self.logger.debug("Wait NG RO > {}".format(desc_status))
1174 if desc_status["status"] == "FAILED":
1175 raise NgRoException(desc_status["details"])
1176 elif desc_status["status"] == "BUILD":
1177 if stage:
1178 stage[2] = "VIM: ({})".format(desc_status["details"])
1179 elif desc_status["status"] == "DONE":
1180 if stage:
1181 stage[2] = "Deployed at VIM"
1182 break
1183 else:
1184 assert False, "ROclient.check_ns_status returns unknown {}".format(
1185 desc_status["status"]
1186 )
1187 if stage and nslcmop_id and stage[2] != detailed_status_old:
1188 detailed_status_old = stage[2]
1189 db_nsr_update["detailed-status"] = " ".join(stage)
1190 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1191 self._write_op_status(nslcmop_id, stage)
1192 await asyncio.sleep(15, loop=self.loop)
1193 else: # timeout_ns_deploy
1194 raise NgRoException("Timeout waiting for ns to deploy")
1195
1196 async def _terminate_ng_ro(
1197 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1198 ):
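# Terminate at NG-RO by deploying an empty target (which removes all deployed items), waiting up to
# 20 minutes, and then deleting the ns record at RO; a 404 response is treated as already deleted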
1199 db_nsr_update = {}
1200 failed_detail = []
1201 action_id = None
1202 start_deploy = time()
1203 try:
1204 target = {
1205 "ns": {"vld": []},
1206 "vnf": [],
1207 "image": [],
1208 "flavor": [],
1209 "action_id": nslcmop_id,
1210 }
1211 desc = await self.RO.deploy(nsr_id, target)
1212 action_id = desc["action_id"]
1213 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1214 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1215 self.logger.debug(
1216 logging_text
1217 + "ns terminate action at RO. action_id={}".format(action_id)
1218 )
1219
1220 # wait until done
1221 delete_timeout = 20 * 60 # 20 minutes
1222 await self._wait_ng_ro(
1223 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1224 )
1225
1226 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1227 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1228 # delete all nsr
1229 await self.RO.delete(nsr_id)
1230 except Exception as e:
1231 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1232 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1233 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1234 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1235 self.logger.debug(
1236 logging_text + "RO_action_id={} already deleted".format(action_id)
1237 )
1238 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1239 failed_detail.append("delete conflict: {}".format(e))
1240 self.logger.debug(
1241 logging_text
1242 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1243 )
1244 else:
1245 failed_detail.append("delete error: {}".format(e))
1246 self.logger.error(
1247 logging_text
1248 + "RO_action_id={} delete error: {}".format(action_id, e)
1249 )
1250
1251 if failed_detail:
1252 stage[2] = "Error deleting from VIM"
1253 else:
1254 stage[2] = "Deleted from VIM"
1255 db_nsr_update["detailed-status"] = " ".join(stage)
1256 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1257 self._write_op_status(nslcmop_id, stage)
1258
1259 if failed_detail:
1260 raise LcmException("; ".join(failed_detail))
1261 return
1262
1263 async def instantiate_RO(
1264 self,
1265 logging_text,
1266 nsr_id,
1267 nsd,
1268 db_nsr,
1269 db_nslcmop,
1270 db_vnfrs,
1271 db_vnfds,
1272 n2vc_key_list,
1273 stage,
1274 ):
1275 """
1276 Instantiate at RO
1277 :param logging_text: prefix text to use at logging
1278 :param nsr_id: nsr identity
1279 :param nsd: database content of ns descriptor
1280 :param db_nsr: database content of ns record
1281 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1282 :param db_vnfrs:
1283 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1284 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1285 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1286 :return: None or exception
1287 """
1288 try:
1289 start_deploy = time()
1290 ns_params = db_nslcmop.get("operationParams")
1291 if ns_params and ns_params.get("timeout_ns_deploy"):
1292 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1293 else:
1294 timeout_ns_deploy = self.timeout.get(
1295 "ns_deploy", self.timeout_ns_deploy
1296 )
1297
1298 # Check for and optionally request placement optimization. Database will be updated if placement activated
1299 stage[2] = "Waiting for Placement."
1300 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1301 # in case of placement, change ns_params["vimAccountId"] if not present at any vnfr
1302 for vnfr in db_vnfrs.values():
1303 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1304 break
1305 else:
1306 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1307
1308 return await self._instantiate_ng_ro(
1309 logging_text,
1310 nsr_id,
1311 nsd,
1312 db_nsr,
1313 db_nslcmop,
1314 db_vnfrs,
1315 db_vnfds,
1316 n2vc_key_list,
1317 stage,
1318 start_deploy,
1319 timeout_ns_deploy,
1320 )
1321 except Exception as e:
1322 stage[2] = "ERROR deploying at VIM"
1323 self.set_vnfr_at_error(db_vnfrs, str(e))
1324 self.logger.error(
1325 "Error deploying at VIM {}".format(e),
1326 exc_info=not isinstance(
1327 e,
1328 (
1329 ROclient.ROClientException,
1330 LcmException,
1331 DbException,
1332 NgRoException,
1333 ),
1334 ),
1335 )
1336 raise
1337
1338 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1339 """
1340 Wait for kdu to be up, get ip address
1341 :param logging_text: prefix use for logging
1342 :param nsr_id:
1343 :param vnfr_id:
1344 :param kdu_name:
1345 :return: IP address
1346 """
1347
1348 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1349 nb_tries = 0
1350
1351 while nb_tries < 360:
1352 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1353 kdur = next(
1354 (
1355 x
1356 for x in get_iterable(db_vnfr, "kdur")
1357 if x.get("kdu-name") == kdu_name
1358 ),
1359 None,
1360 )
1361 if not kdur:
1362 raise LcmException(
1363 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1364 )
1365 if kdur.get("status"):
1366 if kdur["status"] in ("READY", "ENABLED"):
1367 return kdur.get("ip-address")
1368 else:
1369 raise LcmException(
1370 "target KDU={} is in error state".format(kdu_name)
1371 )
1372
1373 await asyncio.sleep(10, loop=self.loop)
1374 nb_tries += 1
1375 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1376
1377 async def wait_vm_up_insert_key_ro(
1378 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1379 ):
1380 """
1381 Wait for ip address at RO and, optionally, insert a public key in the virtual machine
1382 :param logging_text: prefix use for logging
1383 :param nsr_id:
1384 :param vnfr_id:
1385 :param vdu_id:
1386 :param vdu_index:
1387 :param pub_key: public ssh key to inject, None to skip
1388 :param user: user to apply the public ssh key
1389 :return: IP address
1390 """
1391
1392 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1393 ro_nsr_id = None
1394 ip_address = None
1395 nb_tries = 0
1396 target_vdu_id = None
1397 ro_retries = 0
1398
1399 while True:
1400
1401 ro_retries += 1
1402 if ro_retries >= 360: # 1 hour
1403 raise LcmException(
1404 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1405 )
1406
1407 await asyncio.sleep(10, loop=self.loop)
1408
1409 # get ip address
1410 if not target_vdu_id:
1411 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1412
1413 if not vdu_id: # for the VNF case
1414 if db_vnfr.get("status") == "ERROR":
1415 raise LcmException(
1416 "Cannot inject ssh-key because target VNF is in error state"
1417 )
1418 ip_address = db_vnfr.get("ip-address")
1419 if not ip_address:
1420 continue
1421 vdur = next(
1422 (
1423 x
1424 for x in get_iterable(db_vnfr, "vdur")
1425 if x.get("ip-address") == ip_address
1426 ),
1427 None,
1428 )
1429 else: # VDU case
1430 vdur = next(
1431 (
1432 x
1433 for x in get_iterable(db_vnfr, "vdur")
1434 if x.get("vdu-id-ref") == vdu_id
1435 and x.get("count-index") == vdu_index
1436 ),
1437 None,
1438 )
1439
1440 if (
1441 not vdur and len(db_vnfr.get("vdur", ())) == 1
1442 ): # If only one, this should be the target vdu
1443 vdur = db_vnfr["vdur"][0]
1444 if not vdur:
1445 raise LcmException(
1446 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1447 vnfr_id, vdu_id, vdu_index
1448 )
1449 )
1450 # New generation RO stores information at "vim_info"
1451 ng_ro_status = None
1452 target_vim = None
1453 if vdur.get("vim_info"):
1454 target_vim = next(
1455 t for t in vdur["vim_info"]
1456 ) # there should be only one key
1457 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1458 if (
1459 vdur.get("pdu-type")
1460 or vdur.get("status") == "ACTIVE"
1461 or ng_ro_status == "ACTIVE"
1462 ):
1463 ip_address = vdur.get("ip-address")
1464 if not ip_address:
1465 continue
1466 target_vdu_id = vdur["vdu-id-ref"]
1467 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1468 raise LcmException(
1469 "Cannot inject ssh-key because target VM is in error state"
1470 )
1471
1472 if not target_vdu_id:
1473 continue
1474
1475 # inject public key into machine
1476 if pub_key and user:
1477 self.logger.debug(logging_text + "Inserting RO key")
1478 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1479 if vdur.get("pdu-type"):
1480 self.logger.error(logging_text + "Cannot inject ssh-key into a PDU")
1481 return ip_address
1482 try:
1483 ro_vm_id = "{}-{}".format(
1484 db_vnfr["member-vnf-index-ref"], target_vdu_id
1485 ) # TODO add vdu_index
1486 if self.ng_ro:
1487 target = {
1488 "action": {
1489 "action": "inject_ssh_key",
1490 "key": pub_key,
1491 "user": user,
1492 },
1493 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1494 }
1495 desc = await self.RO.deploy(nsr_id, target)
1496 action_id = desc["action_id"]
1497 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1498 break
1499 else:
1500 # wait until NS is deployed at RO
1501 if not ro_nsr_id:
1502 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1503 ro_nsr_id = deep_get(
1504 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1505 )
1506 if not ro_nsr_id:
1507 continue
1508 result_dict = await self.RO.create_action(
1509 item="ns",
1510 item_id_name=ro_nsr_id,
1511 descriptor={
1512 "add_public_key": pub_key,
1513 "vms": [ro_vm_id],
1514 "user": user,
1515 },
1516 )
1517 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1518 if not result_dict or not isinstance(result_dict, dict):
1519 raise LcmException(
1520 "Unknown response from RO when injecting key"
1521 )
1522 for result in result_dict.values():
1523 if result.get("vim_result") == 200:
1524 break
1525 else:
1526 raise ROclient.ROClientException(
1527 "error injecting key: {}".format(
1528 result.get("description")
1529 )
1530 )
1531 break
1532 except NgRoException as e:
1533 raise LcmException(
1534 "Reaching max tries injecting key. Error: {}".format(e)
1535 )
1536 except ROclient.ROClientException as e:
1537 if not nb_tries:
1538 self.logger.debug(
1539 logging_text
1540 + "error injecting key: {}. Retrying until {} seconds".format(
1541 e, 20 * 10
1542 )
1543 )
1544 nb_tries += 1
1545 if nb_tries >= 20:
1546 raise LcmException(
1547 "Reaching max tries injecting key. Error: {}".format(e)
1548 )
1549 else:
1550 break
1551
1552 return ip_address
1553
1554 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1555 """
1556 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs
1557 """
1558 my_vca = vca_deployed_list[vca_index]
1559 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1560 # vdu or kdu: no dependencies
1561 return
1562 timeout = 300
1563 while timeout >= 0:
1564 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1565 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1566 configuration_status_list = db_nsr["configurationStatus"]
1567 for index, vca_deployed in enumerate(configuration_status_list):
1568 if index == vca_index:
1569 # myself
1570 continue
1571 if not my_vca.get("member-vnf-index") or (
1572 vca_deployed.get("member-vnf-index")
1573 == my_vca.get("member-vnf-index")
1574 ):
1575 internal_status = configuration_status_list[index].get("status")
1576 if internal_status == "READY":
1577 continue
1578 elif internal_status == "BROKEN":
1579 raise LcmException(
1580 "Configuration aborted because dependent charm/s has failed"
1581 )
1582 else:
1583 break
1584 else:
1585 # no dependencies, return
1586 return
1587 await asyncio.sleep(10)
1588 timeout -= 1
1589
1590 raise LcmException("Configuration aborted because dependent charm/s timeout")
1591
1592 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1593 vca_id = None
1594 if db_vnfr:
1595 vca_id = deep_get(db_vnfr, ("vca-id",))
1596 elif db_nsr:
1597 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1598 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1599 return vca_id
1600
1601 async def instantiate_N2VC(
1602 self,
1603 logging_text,
1604 vca_index,
1605 nsi_id,
1606 db_nsr,
1607 db_vnfr,
1608 vdu_id,
1609 kdu_name,
1610 vdu_index,
1611 config_descriptor,
1612 deploy_params,
1613 base_folder,
1614 nslcmop_id,
1615 stage,
1616 vca_type,
1617 vca_name,
1618 ee_config_descriptor,
1619 ):
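# Create or register the execution environment for this VCA (proxy charm, native charm or helm EE),
# install the configuration software, add relations and, if required, obtain the EE ssh public key for injection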
1620 nsr_id = db_nsr["_id"]
1621 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1622 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1623 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1624 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1625 db_dict = {
1626 "collection": "nsrs",
1627 "filter": {"_id": nsr_id},
1628 "path": db_update_entry,
1629 }
1630 step = ""
1631 try:
1632
1633 element_type = "NS"
1634 element_under_configuration = nsr_id
1635
1636 vnfr_id = None
1637 if db_vnfr:
1638 vnfr_id = db_vnfr["_id"]
1639 osm_config["osm"]["vnf_id"] = vnfr_id
1640
1641 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1642
1643 if vca_type == "native_charm":
1644 index_number = 0
1645 else:
1646 index_number = vdu_index or 0
1647
1648 if vnfr_id:
1649 element_type = "VNF"
1650 element_under_configuration = vnfr_id
1651 namespace += ".{}-{}".format(vnfr_id, index_number)
1652 if vdu_id:
1653 namespace += ".{}-{}".format(vdu_id, index_number)
1654 element_type = "VDU"
1655 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1656 osm_config["osm"]["vdu_id"] = vdu_id
1657 elif kdu_name:
1658 namespace += ".{}".format(kdu_name)
1659 element_type = "KDU"
1660 element_under_configuration = kdu_name
1661 osm_config["osm"]["kdu_name"] = kdu_name
1662
1663 # Get artifact path
1664 artifact_path = "{}/{}/{}/{}".format(
1665 base_folder["folder"],
1666 base_folder["pkg-dir"],
1667 "charms"
1668 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1669 else "helm-charts",
1670 vca_name,
1671 )
1672
1673 self.logger.debug("Artifact path > {}".format(artifact_path))
1674
1675 # get initial_config_primitive_list that applies to this element
1676 initial_config_primitive_list = config_descriptor.get(
1677 "initial-config-primitive"
1678 )
1679
1680 self.logger.debug(
1681 "Initial config primitive list > {}".format(
1682 initial_config_primitive_list
1683 )
1684 )
1685
1686 # add config if not present for NS charm
1687 ee_descriptor_id = ee_config_descriptor.get("id")
1688 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1689 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1690 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1691 )
1692
1693 self.logger.debug(
1694 "Initial config primitive list #2 > {}".format(
1695 initial_config_primitive_list
1696 )
1697 )
1698 # n2vc_redesign STEP 3.1
1699 # find old ee_id if exists
1700 ee_id = vca_deployed.get("ee_id")
1701
1702 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1703 # create or register execution environment in VCA
1704 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1705
1706 self._write_configuration_status(
1707 nsr_id=nsr_id,
1708 vca_index=vca_index,
1709 status="CREATING",
1710 element_under_configuration=element_under_configuration,
1711 element_type=element_type,
1712 )
1713
1714 step = "create execution environment"
1715 self.logger.debug(logging_text + step)
1716
1717 ee_id = None
1718 credentials = None
1719 if vca_type == "k8s_proxy_charm":
1720 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1721 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1722 namespace=namespace,
1723 artifact_path=artifact_path,
1724 db_dict=db_dict,
1725 vca_id=vca_id,
1726 )
1727 elif vca_type == "helm" or vca_type == "helm-v3":
1728 ee_id, credentials = await self.vca_map[
1729 vca_type
1730 ].create_execution_environment(
1731 namespace=namespace,
1732 reuse_ee_id=ee_id,
1733 db_dict=db_dict,
1734 config=osm_config,
1735 artifact_path=artifact_path,
1736 vca_type=vca_type,
1737 )
1738 else:
1739 ee_id, credentials = await self.vca_map[
1740 vca_type
1741 ].create_execution_environment(
1742 namespace=namespace,
1743 reuse_ee_id=ee_id,
1744 db_dict=db_dict,
1745 vca_id=vca_id,
1746 )
1747
1748 elif vca_type == "native_charm":
1749 step = "Waiting to VM being up and getting IP address"
1750 self.logger.debug(logging_text + step)
1751 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1752 logging_text,
1753 nsr_id,
1754 vnfr_id,
1755 vdu_id,
1756 vdu_index,
1757 user=None,
1758 pub_key=None,
1759 )
1760 credentials = {"hostname": rw_mgmt_ip}
1761 # get username
1762 username = deep_get(
1763 config_descriptor, ("config-access", "ssh-access", "default-user")
1764 )
1765 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user are
1766 # merged. Meanwhile, get the username from initial-config-primitive
1767 if not username and initial_config_primitive_list:
1768 for config_primitive in initial_config_primitive_list:
1769 for param in config_primitive.get("parameter", ()):
1770 if param["name"] == "ssh-username":
1771 username = param["value"]
1772 break
1773 if not username:
1774 raise LcmException(
1775 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1776 "'config-access.ssh-access.default-user'"
1777 )
1778 credentials["username"] = username
1779 # n2vc_redesign STEP 3.2
1780
1781 self._write_configuration_status(
1782 nsr_id=nsr_id,
1783 vca_index=vca_index,
1784 status="REGISTERING",
1785 element_under_configuration=element_under_configuration,
1786 element_type=element_type,
1787 )
1788
1789 step = "register execution environment {}".format(credentials)
1790 self.logger.debug(logging_text + step)
1791 ee_id = await self.vca_map[vca_type].register_execution_environment(
1792 credentials=credentials,
1793 namespace=namespace,
1794 db_dict=db_dict,
1795 vca_id=vca_id,
1796 )
1797
1798 # for compatibility with MON/POL modules, they need the model and application name in the database
1799 # TODO ask MON/POL if it is still needed to assume the format "model_name.application_name"
1800 ee_id_parts = ee_id.split(".")
1801 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1802 if len(ee_id_parts) >= 2:
1803 model_name = ee_id_parts[0]
1804 application_name = ee_id_parts[1]
1805 db_nsr_update[db_update_entry + "model"] = model_name
1806 db_nsr_update[db_update_entry + "application"] = application_name
1807
1808 # n2vc_redesign STEP 3.3
1809 step = "Install configuration Software"
1810
1811 self._write_configuration_status(
1812 nsr_id=nsr_id,
1813 vca_index=vca_index,
1814 status="INSTALLING SW",
1815 element_under_configuration=element_under_configuration,
1816 element_type=element_type,
1817 other_update=db_nsr_update,
1818 )
1819
1820 # TODO check if already done
1821 self.logger.debug(logging_text + step)
1822 config = None
1823 if vca_type == "native_charm":
1824 config_primitive = next(
1825 (p for p in initial_config_primitive_list if p["name"] == "config"),
1826 None,
1827 )
1828 if config_primitive:
1829 config = self._map_primitive_params(
1830 config_primitive, {}, deploy_params
1831 )
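# for lxc proxy charms, the number of units comes from the element's config-units (default 1)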
1832 num_units = 1
1833 if vca_type == "lxc_proxy_charm":
1834 if element_type == "NS":
1835 num_units = db_nsr.get("config-units") or 1
1836 elif element_type == "VNF":
1837 num_units = db_vnfr.get("config-units") or 1
1838 elif element_type == "VDU":
1839 for v in db_vnfr["vdur"]:
1840 if vdu_id == v["vdu-id-ref"]:
1841 num_units = v.get("config-units") or 1
1842 break
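# k8s proxy charms were already installed by install_k8s_proxy_charm above, so skip the SW installation step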
1843 if vca_type != "k8s_proxy_charm":
1844 await self.vca_map[vca_type].install_configuration_sw(
1845 ee_id=ee_id,
1846 artifact_path=artifact_path,
1847 db_dict=db_dict,
1848 config=config,
1849 num_units=num_units,
1850 vca_id=vca_id,
1851 vca_type=vca_type,
1852 )
1853
1854 # write in db flag of configuration_sw already installed
1855 self.update_db_2(
1856 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1857 )
1858
1859 # add relations for this VCA (wait for other peers related with this VCA)
1860 await self._add_vca_relations(
1861 logging_text=logging_text,
1862 nsr_id=nsr_id,
1863 vca_index=vca_index,
1864 vca_id=vca_id,
1865 vca_type=vca_type,
1866 )
1867
1868 # if SSH access is required, then get the execution environment SSH public key
1869 # if native charm, we have already waited for the VM to be up
1870 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1871 pub_key = None
1872 user = None
1873 # self.logger.debug("get ssh key block")
1874 if deep_get(
1875 config_descriptor, ("config-access", "ssh-access", "required")
1876 ):
1877 # self.logger.debug("ssh key needed")
1878 # Needed to inject a ssh key
1879 user = deep_get(
1880 config_descriptor,
1881 ("config-access", "ssh-access", "default-user"),
1882 )
1883 step = "Install configuration Software, getting public ssh key"
1884 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1885 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1886 )
1887
1888 step = "Insert public key into VM user={} ssh_key={}".format(
1889 user, pub_key
1890 )
1891 else:
1892 # self.logger.debug("no need to get ssh key")
1893 step = "Waiting to VM being up and getting IP address"
1894 self.logger.debug(logging_text + step)
1895
1896 # n2vc_redesign STEP 5.1
1897 # wait for RO (ip-address) Insert pub_key into VM
1898 if vnfr_id:
1899 if kdu_name:
1900 rw_mgmt_ip = await self.wait_kdu_up(
1901 logging_text, nsr_id, vnfr_id, kdu_name
1902 )
1903 else:
1904 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1905 logging_text,
1906 nsr_id,
1907 vnfr_id,
1908 vdu_id,
1909 vdu_index,
1910 user=user,
1911 pub_key=pub_key,
1912 )
1913 else:
1914 rw_mgmt_ip = None # This is for an NS configuration
1915
1916 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1917
1918 # store rw_mgmt_ip in deploy params for later replacement
1919 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1920
1921 # n2vc_redesign STEP 6 Execute initial config primitive
1922 step = "execute initial config primitive"
1923
1924 # wait for dependent primitives execution (NS -> VNF -> VDU)
1925 if initial_config_primitive_list:
1926 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1927
1928 # stage, depending on the element type: vdu, kdu, vnf or ns
1929 my_vca = vca_deployed_list[vca_index]
1930 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1931 # VDU or KDU
1932 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1933 elif my_vca.get("member-vnf-index"):
1934 # VNF
1935 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1936 else:
1937 # NS
1938 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1939
1940 self._write_configuration_status(
1941 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1942 )
1943
1944 self._write_op_status(op_id=nslcmop_id, stage=stage)
1945
1946 check_if_terminated_needed = True
1947 for initial_config_primitive in initial_config_primitive_list:
1948 # add information to vca_deployed if it is an NS execution environment
1949 if not vca_deployed["member-vnf-index"]:
1950 deploy_params["ns_config_info"] = json.dumps(
1951 self._get_ns_config_info(nsr_id)
1952 )
1953 # TODO check if already done
1954 primitive_params_ = self._map_primitive_params(
1955 initial_config_primitive, {}, deploy_params
1956 )
1957
1958 step = "execute primitive '{}' params '{}'".format(
1959 initial_config_primitive["name"], primitive_params_
1960 )
1961 self.logger.debug(logging_text + step)
1962 await self.vca_map[vca_type].exec_primitive(
1963 ee_id=ee_id,
1964 primitive_name=initial_config_primitive["name"],
1965 params_dict=primitive_params_,
1966 db_dict=db_dict,
1967 vca_id=vca_id,
1968 vca_type=vca_type,
1969 )
1970 # Once a primitive has been executed, check and record in the db whether terminate primitives need to be executed
1971 if check_if_terminated_needed:
1972 if config_descriptor.get("terminate-config-primitive"):
1973 self.update_db_2(
1974 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
1975 )
1976 check_if_terminated_needed = False
1977
1978 # TODO register in database that primitive is done
1979
1980 # STEP 7 Configure metrics
1981 if vca_type == "helm" or vca_type == "helm-v3":
1982 prometheus_jobs = await self.add_prometheus_metrics(
1983 ee_id=ee_id,
1984 artifact_path=artifact_path,
1985 ee_config_descriptor=ee_config_descriptor,
1986 vnfr_id=vnfr_id,
1987 nsr_id=nsr_id,
1988 target_ip=rw_mgmt_ip,
1989 )
1990 if prometheus_jobs:
1991 self.update_db_2(
1992 "nsrs",
1993 nsr_id,
1994 {db_update_entry + "prometheus_jobs": prometheus_jobs},
1995 )
1996
1997 step = "instantiated at VCA"
1998 self.logger.debug(logging_text + step)
1999
2000 self._write_configuration_status(
2001 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2002 )
2003
2004 except Exception as e: # TODO not use Exception but N2VC exception
2005 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2006 if not isinstance(
2007 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2008 ):
2009 self.logger.error(
2010 "Exception while {} : {}".format(step, e), exc_info=True
2011 )
2012 self._write_configuration_status(
2013 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2014 )
2015 raise LcmException("{} {}".format(step, e)) from e
2016
2017 def _write_ns_status(
2018 self,
2019 nsr_id: str,
2020 ns_state: str,
2021 current_operation: str,
2022 current_operation_id: str,
2023 error_description: str = None,
2024 error_detail: str = None,
2025 other_update: dict = None,
2026 ):
2027 """
2028 Update db_nsr fields.
2029 :param nsr_id:
2030 :param ns_state:
2031 :param current_operation:
2032 :param current_operation_id:
2033 :param error_description:
2034 :param error_detail:
2035 :param other_update: Other required changes at database if provided, will be cleared
2036 :return:
2037 """
2038 try:
2039 db_dict = other_update or {}
2040 db_dict[
2041 "_admin.nslcmop"
2042 ] = current_operation_id # for backward compatibility
2043 db_dict["_admin.current-operation"] = current_operation_id
2044 db_dict["_admin.operation-type"] = (
2045 current_operation if current_operation != "IDLE" else None
2046 )
2047 db_dict["currentOperation"] = current_operation
2048 db_dict["currentOperationID"] = current_operation_id
2049 db_dict["errorDescription"] = error_description
2050 db_dict["errorDetail"] = error_detail
2051
2052 if ns_state:
2053 db_dict["nsState"] = ns_state
2054 self.update_db_2("nsrs", nsr_id, db_dict)
2055 except DbException as e:
2056 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2057
2058 def _write_op_status(
2059 self,
2060 op_id: str,
2061 stage: list = None,
2062 error_message: str = None,
2063 queuePosition: int = 0,
2064 operation_state: str = None,
2065 other_update: dict = None,
2066 ):
2067 try:
2068 db_dict = other_update or {}
2069 db_dict["queuePosition"] = queuePosition
2070 if isinstance(stage, list):
2071 db_dict["stage"] = stage[0]
2072 db_dict["detailed-status"] = " ".join(stage)
2073 elif stage is not None:
2074 db_dict["stage"] = str(stage)
2075
2076 if error_message is not None:
2077 db_dict["errorMessage"] = error_message
2078 if operation_state is not None:
2079 db_dict["operationState"] = operation_state
2080 db_dict["statusEnteredTime"] = time()
2081 self.update_db_2("nslcmops", op_id, db_dict)
2082 except DbException as e:
2083 self.logger.warn(
2084 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2085 )
2086
2087 def _write_all_config_status(self, db_nsr: dict, status: str):
2088 try:
2089 nsr_id = db_nsr["_id"]
2090 # configurationStatus
2091 config_status = db_nsr.get("configurationStatus")
2092 if config_status:
2093 db_nsr_update = {
2094 "configurationStatus.{}.status".format(index): status
2095 for index, v in enumerate(config_status)
2096 if v
2097 }
2098 # update status
2099 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2100
2101 except DbException as e:
2102 self.logger.warn(
2103 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2104 )
2105
2106 def _write_configuration_status(
2107 self,
2108 nsr_id: str,
2109 vca_index: int,
2110 status: str = None,
2111 element_under_configuration: str = None,
2112 element_type: str = None,
2113 other_update: dict = None,
2114 ):
2115
2116 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2117 # .format(vca_index, status))
2118
2119 try:
2120 db_path = "configurationStatus.{}.".format(vca_index)
2121 db_dict = other_update or {}
2122 if status:
2123 db_dict[db_path + "status"] = status
2124 if element_under_configuration:
2125 db_dict[
2126 db_path + "elementUnderConfiguration"
2127 ] = element_under_configuration
2128 if element_type:
2129 db_dict[db_path + "elementType"] = element_type
2130 self.update_db_2("nsrs", nsr_id, db_dict)
2131 except DbException as e:
2132 self.logger.warn(
2133 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2134 status, nsr_id, vca_index, e
2135 )
2136 )
2137
2138 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2139 """
2140 Checks and computes the placement (VIM account where to deploy). If it is decided by an external tool, it
2141 sends the request via kafka and waits until the result is written to the database (nslcmops _admin.pla).
2142 Database is used because the result can be obtained from a different LCM worker in case of HA.
2143 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2144 :param db_nslcmop: database content of nslcmop
2145 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2146 :return: True if some modification is done. Modifies the database vnfrs and the db_vnfrs parameter with the
2147 computed 'vim-account-id'
2148 """
2149 modified = False
2150 nslcmop_id = db_nslcmop["_id"]
2151 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2152 if placement_engine == "PLA":
2153 self.logger.debug(
2154 logging_text + "Invoke and wait for placement optimization"
2155 )
2156 await self.msg.aiowrite(
2157 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2158 )
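# poll the nslcmop record until PLA writes the placement result or the timeout expires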
2159 db_poll_interval = 5
2160 wait = db_poll_interval * 10
2161 pla_result = None
2162 while not pla_result and wait >= 0:
2163 await asyncio.sleep(db_poll_interval)
2164 wait -= db_poll_interval
2165 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2166 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2167
2168 if not pla_result:
2169 raise LcmException(
2170 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2171 )
2172
2173 for pla_vnf in pla_result["vnf"]:
2174 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2175 if not pla_vnf.get("vimAccountId") or not vnfr:
2176 continue
2177 modified = True
2178 self.db.set_one(
2179 "vnfrs",
2180 {"_id": vnfr["_id"]},
2181 {"vim-account-id": pla_vnf["vimAccountId"]},
2182 )
2183 # Modifies db_vnfrs
2184 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2185 return modified
2186
2187 def update_nsrs_with_pla_result(self, params):
2188 try:
2189 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2190 self.update_db_2(
2191 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2192 )
2193 except Exception as e:
2194 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2195
2196 async def instantiate(self, nsr_id, nslcmop_id):
2197 """
2198
2199 :param nsr_id: ns instance to deploy
2200 :param nslcmop_id: operation to run
2201 :return:
2202 """
2203
2204 # Try to lock HA task here
2205 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2206 if not task_is_locked_by_me:
2207 self.logger.debug(
2208 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2209 )
2210 return
2211
2212 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2213 self.logger.debug(logging_text + "Enter")
2214
2215 # get all needed from database
2216
2217 # database nsrs record
2218 db_nsr = None
2219
2220 # database nslcmops record
2221 db_nslcmop = None
2222
2223 # update operation on nsrs
2224 db_nsr_update = {}
2225 # update operation on nslcmops
2226 db_nslcmop_update = {}
2227
2228 nslcmop_operation_state = None
2229 db_vnfrs = {} # vnf's info indexed by member-index
2230 # n2vc_info = {}
2231 tasks_dict_info = {} # from task to info text
2232 exc = None
2233 error_list = []
2234 stage = [
2235 "Stage 1/5: preparation of the environment.",
2236 "Waiting for previous operations to terminate.",
2237 "",
2238 ]
2239 # ^ stage, step, VIM progress
2240 try:
2241 # wait for any previous tasks in process
2242 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2243
2244 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2245 stage[1] = "Reading from database."
2246 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2247 db_nsr_update["detailed-status"] = "creating"
2248 db_nsr_update["operational-status"] = "init"
2249 self._write_ns_status(
2250 nsr_id=nsr_id,
2251 ns_state="BUILDING",
2252 current_operation="INSTANTIATING",
2253 current_operation_id=nslcmop_id,
2254 other_update=db_nsr_update,
2255 )
2256 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2257
2258 # read from db: operation
2259 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2260 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2261 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2262 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2263 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2264 )
2265 ns_params = db_nslcmop.get("operationParams")
2266 if ns_params and ns_params.get("timeout_ns_deploy"):
2267 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2268 else:
2269 timeout_ns_deploy = self.timeout.get(
2270 "ns_deploy", self.timeout_ns_deploy
2271 )
2272
2273 # read from db: ns
2274 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2275 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2276 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2277 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2278 self.fs.sync(db_nsr["nsd-id"])
2279 db_nsr["nsd"] = nsd
2280 # nsr_name = db_nsr["name"] # TODO short-name??
2281
2282 # read from db: vnf's of this ns
2283 stage[1] = "Getting vnfrs from db."
2284 self.logger.debug(logging_text + stage[1])
2285 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2286
2287 # read from db: vnfd's for every vnf
2288 db_vnfds = [] # every vnfd data
2289
2290 # for each vnf in ns, read vnfd
2291 for vnfr in db_vnfrs_list:
2292 if vnfr.get("kdur"):
2293 kdur_list = []
2294 for kdur in vnfr["kdur"]:
2295 if kdur.get("additionalParams"):
2296 kdur["additionalParams"] = json.loads(
2297 kdur["additionalParams"]
2298 )
2299 kdur_list.append(kdur)
2300 vnfr["kdur"] = kdur_list
2301
2302 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2303 vnfd_id = vnfr["vnfd-id"]
2304 vnfd_ref = vnfr["vnfd-ref"]
2305 self.fs.sync(vnfd_id)
2306
2307 # if we do not have this vnfd yet, read it from db
2308 if vnfd_id not in [db_vnfd["_id"] for db_vnfd in db_vnfds]:
2309 # read from db
2310 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2311 vnfd_id, vnfd_ref
2312 )
2313 self.logger.debug(logging_text + stage[1])
2314 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2315
2316 # store vnfd
2317 db_vnfds.append(vnfd)
2318
2319 # Get or generates the _admin.deployed.VCA list
2320 vca_deployed_list = None
2321 if db_nsr["_admin"].get("deployed"):
2322 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2323 if vca_deployed_list is None:
2324 vca_deployed_list = []
2325 configuration_status_list = []
2326 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2327 db_nsr_update["configurationStatus"] = configuration_status_list
2328 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2329 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2330 elif isinstance(vca_deployed_list, dict):
2331 # maintain backward compatibility. Change a dict to a list in the database
2332 vca_deployed_list = list(vca_deployed_list.values())
2333 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2334 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2335
2336 if not isinstance(
2337 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2338 ):
2339 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2340 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2341
2342 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2343 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2344 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2345 self.db.set_list(
2346 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2347 )
2348
2349 # n2vc_redesign STEP 2 Deploy Network Scenario
2350 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2351 self._write_op_status(op_id=nslcmop_id, stage=stage)
2352
2353 stage[1] = "Deploying KDUs."
2354 # self.logger.debug(logging_text + "Before deploy_kdus")
2355 # Call deploy_kdus in case the "vdu:kdu" param exists
2356 await self.deploy_kdus(
2357 logging_text=logging_text,
2358 nsr_id=nsr_id,
2359 nslcmop_id=nslcmop_id,
2360 db_vnfrs=db_vnfrs,
2361 db_vnfds=db_vnfds,
2362 task_instantiation_info=tasks_dict_info,
2363 )
2364
2365 stage[1] = "Getting VCA public key."
2366 # n2vc_redesign STEP 1 Get VCA public ssh-key
2367 # feature 1429. Add n2vc public key to needed VMs
2368 n2vc_key = self.n2vc.get_public_key()
2369 n2vc_key_list = [n2vc_key]
2370 if self.vca_config.get("public_key"):
2371 n2vc_key_list.append(self.vca_config["public_key"])
2372
2373 stage[1] = "Deploying NS at VIM."
2374 task_ro = asyncio.ensure_future(
2375 self.instantiate_RO(
2376 logging_text=logging_text,
2377 nsr_id=nsr_id,
2378 nsd=nsd,
2379 db_nsr=db_nsr,
2380 db_nslcmop=db_nslcmop,
2381 db_vnfrs=db_vnfrs,
2382 db_vnfds=db_vnfds,
2383 n2vc_key_list=n2vc_key_list,
2384 stage=stage,
2385 )
2386 )
2387 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2388 tasks_dict_info[task_ro] = "Deploying at VIM"
2389
2390 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2391 stage[1] = "Deploying Execution Environments."
2392 self.logger.debug(logging_text + stage[1])
2393
2394 nsi_id = None # TODO put nsi_id when this nsr belongs to an NSI
2395 for vnf_profile in get_vnf_profiles(nsd):
2396 vnfd_id = vnf_profile["vnfd-id"]
2397 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2398 member_vnf_index = str(vnf_profile["id"])
2399 db_vnfr = db_vnfrs[member_vnf_index]
2400 base_folder = vnfd["_admin"]["storage"]
2401 vdu_id = None
2402 vdu_index = 0
2403 vdu_name = None
2404 kdu_name = None
2405
2406 # Get additional parameters
2407 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2408 if db_vnfr.get("additionalParamsForVnf"):
2409 deploy_params.update(
2410 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2411 )
2412
2413 descriptor_config = get_configuration(vnfd, vnfd["id"])
2414 if descriptor_config:
2415 self._deploy_n2vc(
2416 logging_text=logging_text
2417 + "member_vnf_index={} ".format(member_vnf_index),
2418 db_nsr=db_nsr,
2419 db_vnfr=db_vnfr,
2420 nslcmop_id=nslcmop_id,
2421 nsr_id=nsr_id,
2422 nsi_id=nsi_id,
2423 vnfd_id=vnfd_id,
2424 vdu_id=vdu_id,
2425 kdu_name=kdu_name,
2426 member_vnf_index=member_vnf_index,
2427 vdu_index=vdu_index,
2428 vdu_name=vdu_name,
2429 deploy_params=deploy_params,
2430 descriptor_config=descriptor_config,
2431 base_folder=base_folder,
2432 task_instantiation_info=tasks_dict_info,
2433 stage=stage,
2434 )
2435
2436 # Deploy charms for each VDU that supports one.
2437 for vdud in get_vdu_list(vnfd):
2438 vdu_id = vdud["id"]
2439 descriptor_config = get_configuration(vnfd, vdu_id)
2440 vdur = find_in_list(
2441 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2442 )
2443
2444 if vdur.get("additionalParams"):
2445 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2446 else:
2447 deploy_params_vdu = deploy_params
2448 deploy_params_vdu["OSM"] = get_osm_params(
2449 db_vnfr, vdu_id, vdu_count_index=0
2450 )
2451 vdud_count = get_number_of_instances(vnfd, vdu_id)
2452
2453 self.logger.debug("VDUD > {}".format(vdud))
2454 self.logger.debug(
2455 "Descriptor config > {}".format(descriptor_config)
2456 )
2457 if descriptor_config:
2458 vdu_name = None
2459 kdu_name = None
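# deploy one execution environment per VDU instance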
2460 for vdu_index in range(vdud_count):
2461 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2462 self._deploy_n2vc(
2463 logging_text=logging_text
2464 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2465 member_vnf_index, vdu_id, vdu_index
2466 ),
2467 db_nsr=db_nsr,
2468 db_vnfr=db_vnfr,
2469 nslcmop_id=nslcmop_id,
2470 nsr_id=nsr_id,
2471 nsi_id=nsi_id,
2472 vnfd_id=vnfd_id,
2473 vdu_id=vdu_id,
2474 kdu_name=kdu_name,
2475 member_vnf_index=member_vnf_index,
2476 vdu_index=vdu_index,
2477 vdu_name=vdu_name,
2478 deploy_params=deploy_params_vdu,
2479 descriptor_config=descriptor_config,
2480 base_folder=base_folder,
2481 task_instantiation_info=tasks_dict_info,
2482 stage=stage,
2483 )
2484 for kdud in get_kdu_list(vnfd):
2485 kdu_name = kdud["name"]
2486 descriptor_config = get_configuration(vnfd, kdu_name)
2487 if descriptor_config:
2488 vdu_id = None
2489 vdu_index = 0
2490 vdu_name = None
2491 kdur = next(
2492 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2493 )
2494 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2495 if kdur.get("additionalParams"):
2496 deploy_params_kdu.update(
2497 parse_yaml_strings(kdur["additionalParams"].copy())
2498 )
2499
2500 self._deploy_n2vc(
2501 logging_text=logging_text,
2502 db_nsr=db_nsr,
2503 db_vnfr=db_vnfr,
2504 nslcmop_id=nslcmop_id,
2505 nsr_id=nsr_id,
2506 nsi_id=nsi_id,
2507 vnfd_id=vnfd_id,
2508 vdu_id=vdu_id,
2509 kdu_name=kdu_name,
2510 member_vnf_index=member_vnf_index,
2511 vdu_index=vdu_index,
2512 vdu_name=vdu_name,
2513 deploy_params=deploy_params_kdu,
2514 descriptor_config=descriptor_config,
2515 base_folder=base_folder,
2516 task_instantiation_info=tasks_dict_info,
2517 stage=stage,
2518 )
2519
2520 # Check if this NS has a charm configuration
2521 descriptor_config = nsd.get("ns-configuration")
2522 if descriptor_config and descriptor_config.get("juju"):
2523 vnfd_id = None
2524 db_vnfr = None
2525 member_vnf_index = None
2526 vdu_id = None
2527 kdu_name = None
2528 vdu_index = 0
2529 vdu_name = None
2530
2531 # Get additional parameters
2532 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2533 if db_nsr.get("additionalParamsForNs"):
2534 deploy_params.update(
2535 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2536 )
2537 base_folder = nsd["_admin"]["storage"]
2538 self._deploy_n2vc(
2539 logging_text=logging_text,
2540 db_nsr=db_nsr,
2541 db_vnfr=db_vnfr,
2542 nslcmop_id=nslcmop_id,
2543 nsr_id=nsr_id,
2544 nsi_id=nsi_id,
2545 vnfd_id=vnfd_id,
2546 vdu_id=vdu_id,
2547 kdu_name=kdu_name,
2548 member_vnf_index=member_vnf_index,
2549 vdu_index=vdu_index,
2550 vdu_name=vdu_name,
2551 deploy_params=deploy_params,
2552 descriptor_config=descriptor_config,
2553 base_folder=base_folder,
2554 task_instantiation_info=tasks_dict_info,
2555 stage=stage,
2556 )
2557
2558 # the rest of the work is done in the finally block
2559
2560 except (
2561 ROclient.ROClientException,
2562 DbException,
2563 LcmException,
2564 N2VCException,
2565 ) as e:
2566 self.logger.error(
2567 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2568 )
2569 exc = e
2570 except asyncio.CancelledError:
2571 self.logger.error(
2572 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2573 )
2574 exc = "Operation was cancelled"
2575 except Exception as e:
2576 exc = traceback.format_exc()
2577 self.logger.critical(
2578 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2579 exc_info=True,
2580 )
2581 finally:
2582 if exc:
2583 error_list.append(str(exc))
2584 try:
2585 # wait for pending tasks
2586 if tasks_dict_info:
2587 stage[1] = "Waiting for instantiate pending tasks."
2588 self.logger.debug(logging_text + stage[1])
2589 error_list += await self._wait_for_tasks(
2590 logging_text,
2591 tasks_dict_info,
2592 timeout_ns_deploy,
2593 stage,
2594 nslcmop_id,
2595 nsr_id=nsr_id,
2596 )
2597 stage[1] = stage[2] = ""
2598 except asyncio.CancelledError:
2599 error_list.append("Cancelled")
2600 # TODO cancel all tasks
2601 except Exception as exc:
2602 error_list.append(str(exc))
2603
2604 # update operation-status
2605 db_nsr_update["operational-status"] = "running"
2606 # let's begin with VCA 'configured' status (later we can change it)
2607 db_nsr_update["config-status"] = "configured"
2608 for task, task_name in tasks_dict_info.items():
2609 if not task.done() or task.cancelled() or task.exception():
2610 if task_name.startswith(self.task_name_deploy_vca):
2611 # An N2VC task is pending
2612 db_nsr_update["config-status"] = "failed"
2613 else:
2614 # RO or KDU task is pending
2615 db_nsr_update["operational-status"] = "failed"
2616
2617 # update status at database
2618 if error_list:
2619 error_detail = ". ".join(error_list)
2620 self.logger.error(logging_text + error_detail)
2621 error_description_nslcmop = "{} Detail: {}".format(
2622 stage[0], error_detail
2623 )
2624 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2625 nslcmop_id, stage[0]
2626 )
2627
2628 db_nsr_update["detailed-status"] = (
2629 error_description_nsr + " Detail: " + error_detail
2630 )
2631 db_nslcmop_update["detailed-status"] = error_detail
2632 nslcmop_operation_state = "FAILED"
2633 ns_state = "BROKEN"
2634 else:
2635 error_detail = None
2636 error_description_nsr = error_description_nslcmop = None
2637 ns_state = "READY"
2638 db_nsr_update["detailed-status"] = "Done"
2639 db_nslcmop_update["detailed-status"] = "Done"
2640 nslcmop_operation_state = "COMPLETED"
2641
2642 if db_nsr:
2643 self._write_ns_status(
2644 nsr_id=nsr_id,
2645 ns_state=ns_state,
2646 current_operation="IDLE",
2647 current_operation_id=None,
2648 error_description=error_description_nsr,
2649 error_detail=error_detail,
2650 other_update=db_nsr_update,
2651 )
2652 self._write_op_status(
2653 op_id=nslcmop_id,
2654 stage="",
2655 error_message=error_description_nslcmop,
2656 operation_state=nslcmop_operation_state,
2657 other_update=db_nslcmop_update,
2658 )
2659
2660 if nslcmop_operation_state:
2661 try:
2662 await self.msg.aiowrite(
2663 "ns",
2664 "instantiated",
2665 {
2666 "nsr_id": nsr_id,
2667 "nslcmop_id": nslcmop_id,
2668 "operationState": nslcmop_operation_state,
2669 },
2670 loop=self.loop,
2671 )
2672 except Exception as e:
2673 self.logger.error(
2674 logging_text + "kafka_write notification Exception {}".format(e)
2675 )
2676
2677 self.logger.debug(logging_text + "Exit")
2678 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2679
2680 async def _add_vca_relations(
2681 self,
2682 logging_text,
2683 nsr_id,
2684 vca_index: int,
2685 timeout: int = 3600,
2686 vca_type: str = None,
2687 vca_id: str = None,
2688 ) -> bool:
2689
2690 # steps:
2691 # 1. find all relations for this VCA
2692 # 2. wait for other peers related
2693 # 3. add relations
2694
2695 try:
2696 vca_type = vca_type or "lxc_proxy_charm"
2697
2698 # STEP 1: find all relations for this VCA
2699
2700 # read nsr record
2701 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2702 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2703
2704 # this VCA data
2705 my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
2706
2707 # read all ns-configuration relations
2708 ns_relations = list()
2709 db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
2710 if db_ns_relations:
2711 for r in db_ns_relations:
2712 # check if this VCA is in the relation
2713 if my_vca.get("member-vnf-index") in (
2714 r.get("entities")[0].get("id"),
2715 r.get("entities")[1].get("id"),
2716 ):
2717 ns_relations.append(r)
2718
2719 # read all vnf-configuration relations
2720 vnf_relations = list()
2721 db_vnfd_list = db_nsr.get("vnfd-id")
2722 if db_vnfd_list:
2723 for vnfd in db_vnfd_list:
2724 db_vnf_relations = None
2725 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2726 db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
2727 if db_vnf_configuration:
2728 db_vnf_relations = db_vnf_configuration.get("relation", [])
2729 if db_vnf_relations:
2730 for r in db_vnf_relations:
2731 # check if this VCA is in the relation
2732 if my_vca.get("vdu_id") in (
2733 r.get("entities")[0].get("id"),
2734 r.get("entities")[1].get("id"),
2735 ):
2736 vnf_relations.append(r)
2737
2738 # if no relations, terminate
2739 if not ns_relations and not vnf_relations:
2740 self.logger.debug(logging_text + " No relations")
2741 return True
2742
2743 self.logger.debug(
2744 logging_text
2745 + " adding relations\n {}\n {}".format(
2746 ns_relations, vnf_relations
2747 )
2748 )
2749
2750 # add all relations
2751 start = time()
2752 while True:
2753 # check timeout
2754 now = time()
2755 if now - start >= timeout:
2756 self.logger.error(logging_text + " : timeout adding relations")
2757 return False
2758
2759 # reload nsr from database (we need the updated record: _admin.deployed.VCA)
2760 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2761
2762 # for each defined NS relation, find the VCA's related
2763 for r in ns_relations.copy():
2764 from_vca_ee_id = None
2765 to_vca_ee_id = None
2766 from_vca_endpoint = None
2767 to_vca_endpoint = None
2768 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2769 for vca in vca_list:
2770 if vca.get("member-vnf-index") == r.get("entities")[0].get(
2771 "id"
2772 ) and vca.get("config_sw_installed"):
2773 from_vca_ee_id = vca.get("ee_id")
2774 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2775 if vca.get("member-vnf-index") == r.get("entities")[1].get(
2776 "id"
2777 ) and vca.get("config_sw_installed"):
2778 to_vca_ee_id = vca.get("ee_id")
2779 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2780 if from_vca_ee_id and to_vca_ee_id:
2781 # add relation
2782 await self.vca_map[vca_type].add_relation(
2783 ee_id_1=from_vca_ee_id,
2784 ee_id_2=to_vca_ee_id,
2785 endpoint_1=from_vca_endpoint,
2786 endpoint_2=to_vca_endpoint,
2787 vca_id=vca_id,
2788 )
2789 # remove entry from relations list
2790 ns_relations.remove(r)
2791 else:
2792 # check failed peers
2793 try:
2794 vca_status_list = db_nsr.get("configurationStatus")
2795 if vca_status_list:
2796 for i in range(len(vca_list)):
2797 vca = vca_list[i]
2798 vca_status = vca_status_list[i]
2799 if vca.get("member-vnf-index") == r.get("entities")[
2800 0
2801 ].get("id"):
2802 if vca_status.get("status") == "BROKEN":
2803 # peer broken: remove relation from list
2804 ns_relations.remove(r)
2805 if vca.get("member-vnf-index") == r.get("entities")[
2806 1
2807 ].get("id"):
2808 if vca_status.get("status") == "BROKEN":
2809 # peer broken: remove relation from list
2810 ns_relations.remove(r)
2811 except Exception:
2812 # ignore
2813 pass
2814
2815 # for each defined VNF relation, find the VCA's related
2816 for r in vnf_relations.copy():
2817 from_vca_ee_id = None
2818 to_vca_ee_id = None
2819 from_vca_endpoint = None
2820 to_vca_endpoint = None
2821 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2822 for vca in vca_list:
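# VNF-level relations are matched by vnfd_id when the VCA is not attached to a VDU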
2823 key_to_check = "vdu_id"
2824 if vca.get("vdu_id") is None:
2825 key_to_check = "vnfd_id"
2826 if vca.get(key_to_check) == r.get("entities")[0].get(
2827 "id"
2828 ) and vca.get("config_sw_installed"):
2829 from_vca_ee_id = vca.get("ee_id")
2830 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2831 if vca.get(key_to_check) == r.get("entities")[1].get(
2832 "id"
2833 ) and vca.get("config_sw_installed"):
2834 to_vca_ee_id = vca.get("ee_id")
2835 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2836 if from_vca_ee_id and to_vca_ee_id:
2837 # add relation
2838 await self.vca_map[vca_type].add_relation(
2839 ee_id_1=from_vca_ee_id,
2840 ee_id_2=to_vca_ee_id,
2841 endpoint_1=from_vca_endpoint,
2842 endpoint_2=to_vca_endpoint,
2843 vca_id=vca_id,
2844 )
2845 # remove entry from relations list
2846 vnf_relations.remove(r)
2847 else:
2848 # check failed peers
2849 try:
2850 vca_status_list = db_nsr.get("configurationStatus")
2851 if vca_status_list:
2852 for i in range(len(vca_list)):
2853 vca = vca_list[i]
2854 vca_status = vca_status_list[i]
2855 if vca.get("vdu_id") == r.get("entities")[0].get(
2856 "id"
2857 ):
2858 if vca_status.get("status") == "BROKEN":
2859 # peer broken: remove relation from list
2860 vnf_relations.remove(r)
2861 if vca.get("vdu_id") == r.get("entities")[1].get(
2862 "id"
2863 ):
2864 if vca_status.get("status") == "BROKEN":
2865 # peer broken: remove relation from list
2866 vnf_relations.remove(r)
2867 except Exception:
2868 # ignore
2869 pass
2870
2871 # wait for next try
2872 await asyncio.sleep(5.0)
2873
2874 if not ns_relations and not vnf_relations:
2875 self.logger.debug("Relations added")
2876 break
2877
2878 return True
2879
2880 except Exception as e:
2881 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
2882 return False
2883
2884 async def _install_kdu(
2885 self,
2886 nsr_id: str,
2887 nsr_db_path: str,
2888 vnfr_data: dict,
2889 kdu_index: int,
2890 kdud: dict,
2891 vnfd: dict,
2892 k8s_instance_info: dict,
2893 k8params: dict = None,
2894 timeout: int = 600,
2895 vca_id: str = None,
2896 ):
2897
2898 try:
2899 k8sclustertype = k8s_instance_info["k8scluster-type"]
2900 # Instantiate kdu
2901 db_dict_install = {
2902 "collection": "nsrs",
2903 "filter": {"_id": nsr_id},
2904 "path": nsr_db_path,
2905 }
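# db_dict_install points the K8s connector at the nsr path where status updates for this KDU are written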
2906
2907 if k8s_instance_info.get("kdu-deployment-name"):
2908 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
2909 else:
2910 kdu_instance = self.k8scluster_map[
2911 k8sclustertype
2912 ].generate_kdu_instance_name(
2913 db_dict=db_dict_install,
2914 kdu_model=k8s_instance_info["kdu-model"],
2915 kdu_name=k8s_instance_info["kdu-name"],
2916 )
2917 self.update_db_2(
2918 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2919 )
2920 await self.k8scluster_map[k8sclustertype].install(
2921 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2922 kdu_model=k8s_instance_info["kdu-model"],
2923 atomic=True,
2924 params=k8params,
2925 db_dict=db_dict_install,
2926 timeout=timeout,
2927 kdu_name=k8s_instance_info["kdu-name"],
2928 namespace=k8s_instance_info["namespace"],
2929 kdu_instance=kdu_instance,
2930 vca_id=vca_id,
2931 )
2932 self.update_db_2(
2933 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2934 )
2935
2936 # Obtain services to get the management service ip
2937 services = await self.k8scluster_map[k8sclustertype].get_services(
2938 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2939 kdu_instance=kdu_instance,
2940 namespace=k8s_instance_info["namespace"],
2941 )
2942
2943 # Obtain management service info (if exists)
2944 vnfr_update_dict = {}
2945 kdu_config = get_configuration(vnfd, kdud["name"])
2946 if kdu_config:
2947 target_ee_list = kdu_config.get("execution-environment-list", [])
2948 else:
2949 target_ee_list = []
2950
2951 if services:
2952 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2953 mgmt_services = [
2954 service
2955 for service in kdud.get("service", [])
2956 if service.get("mgmt-service")
2957 ]
2958 for mgmt_service in mgmt_services:
2959 for service in services:
2960 if service["name"].startswith(mgmt_service["name"]):
2961 # Mgmt service found, obtain the service ip
2962 ip = service.get("external_ip", service.get("cluster_ip"))
2963 if isinstance(ip, list) and len(ip) == 1:
2964 ip = ip[0]
2965
2966 vnfr_update_dict[
2967 "kdur.{}.ip-address".format(kdu_index)
2968 ] = ip
2969
2970 # Check whether the mgmt ip must also be updated at the vnf level
2971 service_external_cp = mgmt_service.get(
2972 "external-connection-point-ref"
2973 )
2974 if service_external_cp:
2975 if (
2976 deep_get(vnfd, ("mgmt-interface", "cp"))
2977 == service_external_cp
2978 ):
2979 vnfr_update_dict["ip-address"] = ip
2980
2981 if find_in_list(
2982 target_ee_list,
2983 lambda ee: ee.get(
2984 "external-connection-point-ref", ""
2985 )
2986 == service_external_cp,
2987 ):
2988 vnfr_update_dict[
2989 "kdur.{}.ip-address".format(kdu_index)
2990 ] = ip
2991 break
2992 else:
2993 self.logger.warn(
2994 "Mgmt service name: {} not found".format(
2995 mgmt_service["name"]
2996 )
2997 )
2998
2999 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3000 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3001
3002 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3003 if (
3004 kdu_config
3005 and kdu_config.get("initial-config-primitive")
3006 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3007 ):
3008 initial_config_primitive_list = kdu_config.get(
3009 "initial-config-primitive"
3010 )
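# run the KDU initial config primitives in ascending 'seq' order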
3011 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3012
3013 for initial_config_primitive in initial_config_primitive_list:
3014 primitive_params_ = self._map_primitive_params(
3015 initial_config_primitive, {}, {}
3016 )
3017
3018 await asyncio.wait_for(
3019 self.k8scluster_map[k8sclustertype].exec_primitive(
3020 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3021 kdu_instance=kdu_instance,
3022 primitive_name=initial_config_primitive["name"],
3023 params=primitive_params_,
3024 db_dict=db_dict_install,
3025 vca_id=vca_id,
3026 ),
3027 timeout=timeout,
3028 )
3029
3030 except Exception as e:
3031 # Prepare update db with error and raise exception
3032 try:
3033 self.update_db_2(
3034 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3035 )
3036 self.update_db_2(
3037 "vnfrs",
3038 vnfr_data.get("_id"),
3039 {"kdur.{}.status".format(kdu_index): "ERROR"},
3040 )
3041 except Exception:
3042 # ignore to keep original exception
3043 pass
3044 # reraise original error
3045 raise
3046
3047 return kdu_instance
3048
3049 async def deploy_kdus(
3050 self,
3051 logging_text,
3052 nsr_id,
3053 nslcmop_id,
3054 db_vnfrs,
3055 db_vnfds,
3056 task_instantiation_info,
3057 ):
3058 # Launch kdus if present in the descriptor
3059
3060 k8scluster_id_2_uuic = {
3061 "helm-chart-v3": {},
3062 "helm-chart": {},
3063 "juju-bundle": {},
3064 }
3065
3066 async def _get_cluster_id(cluster_id, cluster_type):
3067 nonlocal k8scluster_id_2_uuic
3068 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3069 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3070
3071 # check if the K8s cluster is being created and wait for any related previous tasks in process
3072 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3073 "k8scluster", cluster_id
3074 )
3075 if task_dependency:
3076 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3077 task_name, cluster_id
3078 )
3079 self.logger.debug(logging_text + text)
3080 await asyncio.wait(task_dependency, timeout=3600)
3081
3082 db_k8scluster = self.db.get_one(
3083 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3084 )
3085 if not db_k8scluster:
3086 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3087
3088 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3089 if not k8s_id:
3090 if cluster_type == "helm-chart-v3":
3091 try:
3092 # backward compatibility for existing clusters that have not been initialized for helm v3
3093 k8s_credentials = yaml.safe_dump(
3094 db_k8scluster.get("credentials")
3095 )
3096 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3097 k8s_credentials, reuse_cluster_uuid=cluster_id
3098 )
3099 db_k8scluster_update = {}
3100 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3101 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3102 db_k8scluster_update[
3103 "_admin.helm-chart-v3.created"
3104 ] = uninstall_sw
3105 db_k8scluster_update[
3106 "_admin.helm-chart-v3.operationalState"
3107 ] = "ENABLED"
3108 self.update_db_2(
3109 "k8sclusters", cluster_id, db_k8scluster_update
3110 )
3111 except Exception as e:
3112 self.logger.error(
3113 logging_text
3114 + "error initializing helm-v3 cluster: {}".format(str(e))
3115 )
3116 raise LcmException(
3117 "K8s cluster '{}' has not been initialized for '{}'".format(
3118 cluster_id, cluster_type
3119 )
3120 )
3121 else:
3122 raise LcmException(
3123 "K8s cluster '{}' has not been initialized for '{}'".format(
3124 cluster_id, cluster_type
3125 )
3126 )
3127 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3128 return k8s_id
3129
3130 logging_text += "Deploy kdus: "
3131 step = ""
3132 try:
3133 db_nsr_update = {"_admin.deployed.K8s": []}
3134 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3135
3136 index = 0
3137 updated_cluster_list = []
3138 updated_v3_cluster_list = []
3139
3140 for vnfr_data in db_vnfrs.values():
3141 vca_id = self.get_vca_id(vnfr_data, {})
3142 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3143 # Step 0: Prepare and set parameters
3144 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3145 vnfd_id = vnfr_data.get("vnfd-id")
3146 vnfd_with_id = find_in_list(
3147 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3148 )
3149 kdud = next(
3150 kdud
3151 for kdud in vnfd_with_id["kdu"]
3152 if kdud["name"] == kdur["kdu-name"]
3153 )
3154 namespace = kdur.get("k8s-namespace")
3155 kdu_deployment_name = kdur.get("kdu-deployment-name")
3156 if kdur.get("helm-chart"):
3157 kdumodel = kdur["helm-chart"]
3158 # Default version: helm3; if helm-version is v2, assign v2
3159 k8sclustertype = "helm-chart-v3"
3160 self.logger.debug("kdur: {}".format(kdur))
3161 if (
3162 kdur.get("helm-version")
3163 and kdur.get("helm-version") == "v2"
3164 ):
3165 k8sclustertype = "helm-chart"
3166 elif kdur.get("juju-bundle"):
3167 kdumodel = kdur["juju-bundle"]
3168 k8sclustertype = "juju-bundle"
3169 else:
3170 raise LcmException(
3171 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3172 "juju-bundle. Maybe an old NBI version is running".format(
3173 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3174 )
3175 )
3176 # check if kdumodel is a file in the package and it exists
3177 try:
3178 vnfd_with_id = find_in_list(
3179 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3180 )
3181 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3182 if storage and storage.get(
3183 "pkg-dir"
3184 ): # may not be present if the vnfd has no artifacts
3185 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3186 filename = "{}/{}/{}s/{}".format(
3187 storage["folder"],
3188 storage["pkg-dir"],
3189 k8sclustertype,
3190 kdumodel,
3191 )
3192 if self.fs.file_exists(
3193 filename, mode="file"
3194 ) or self.fs.file_exists(filename, mode="dir"):
3195 kdumodel = self.fs.path + filename
3196 except (asyncio.TimeoutError, asyncio.CancelledError):
3197 raise
3198 except Exception: # it is not a file
3199 pass
3200
3201 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3202 step = "Synchronize repos for k8s cluster '{}'".format(
3203 k8s_cluster_id
3204 )
3205 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3206
3207 # Synchronize repos
3208 if (
3209 k8sclustertype == "helm-chart"
3210 and cluster_uuid not in updated_cluster_list
3211 ) or (
3212 k8sclustertype == "helm-chart-v3"
3213 and cluster_uuid not in updated_v3_cluster_list
3214 ):
3215 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3216 self.k8scluster_map[k8sclustertype].synchronize_repos(
3217 cluster_uuid=cluster_uuid
3218 )
3219 )
3220 if del_repo_list or added_repo_dict:
3221 if k8sclustertype == "helm-chart":
3222 unset = {
3223 "_admin.helm_charts_added." + item: None
3224 for item in del_repo_list
3225 }
3226 updated = {
3227 "_admin.helm_charts_added." + item: name
3228 for item, name in added_repo_dict.items()
3229 }
3230 updated_cluster_list.append(cluster_uuid)
3231 elif k8sclustertype == "helm-chart-v3":
3232 unset = {
3233 "_admin.helm_charts_v3_added." + item: None
3234 for item in del_repo_list
3235 }
3236 updated = {
3237 "_admin.helm_charts_v3_added." + item: name
3238 for item, name in added_repo_dict.items()
3239 }
3240 updated_v3_cluster_list.append(cluster_uuid)
3241 self.logger.debug(
3242 logging_text + "repos synchronized on k8s cluster "
3243 "'{}' to_delete: {}, to_add: {}".format(
3244 k8s_cluster_id, del_repo_list, added_repo_dict
3245 )
3246 )
3247 self.db.set_one(
3248 "k8sclusters",
3249 {"_id": k8s_cluster_id},
3250 updated,
3251 unset=unset,
3252 )
3253
3254 # Instantiate kdu
3255 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3256 vnfr_data["member-vnf-index-ref"],
3257 kdur["kdu-name"],
3258 k8s_cluster_id,
3259 )
3260 k8s_instance_info = {
3261 "kdu-instance": None,
3262 "k8scluster-uuid": cluster_uuid,
3263 "k8scluster-type": k8sclustertype,
3264 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3265 "kdu-name": kdur["kdu-name"],
3266 "kdu-model": kdumodel,
3267 "namespace": namespace,
3268 "kdu-deployment-name": kdu_deployment_name,
3269 }
3270 db_path = "_admin.deployed.K8s.{}".format(index)
3271 db_nsr_update[db_path] = k8s_instance_info
3272 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3273 vnfd_with_id = find_in_list(
3274 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3275 )
3276 task = asyncio.ensure_future(
3277 self._install_kdu(
3278 nsr_id,
3279 db_path,
3280 vnfr_data,
3281 kdu_index,
3282 kdud,
3283 vnfd_with_id,
3284 k8s_instance_info,
3285 k8params=desc_params,
3286 timeout=600,
3287 vca_id=vca_id,
3288 )
3289 )
3290 self.lcm_tasks.register(
3291 "ns",
3292 nsr_id,
3293 nslcmop_id,
3294 "instantiate_KDU-{}".format(index),
3295 task,
3296 )
3297 task_instantiation_info[task] = "Deploying KDU {}".format(
3298 kdur["kdu-name"]
3299 )
3300
3301 index += 1
3302
3303 except (LcmException, asyncio.CancelledError):
3304 raise
3305 except Exception as e:
3306 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3307 if isinstance(e, (N2VCException, DbException)):
3308 self.logger.error(logging_text + msg)
3309 else:
3310 self.logger.critical(logging_text + msg, exc_info=True)
3311 raise LcmException(msg)
3312 finally:
3313 if db_nsr_update:
3314 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3315
3316 def _deploy_n2vc(
3317 self,
3318 logging_text,
3319 db_nsr,
3320 db_vnfr,
3321 nslcmop_id,
3322 nsr_id,
3323 nsi_id,
3324 vnfd_id,
3325 vdu_id,
3326 kdu_name,
3327 member_vnf_index,
3328 vdu_index,
3329 vdu_name,
3330 deploy_params,
3331 descriptor_config,
3332 base_folder,
3333 task_instantiation_info,
3334 stage,
3335 ):
3336 # launch instantiate_N2VC in an asyncio task and register the task object
3337 # Look up where the information for this charm is stored in the database: <nsrs>._admin.deployed.VCA
3338 # if not found, create one entry and update database
3339 # fill db_nsr._admin.deployed.VCA.<index>
3340
3341 self.logger.debug(
3342 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3343 )
3344 if "execution-environment-list" in descriptor_config:
3345 ee_list = descriptor_config.get("execution-environment-list", [])
3346 elif "juju" in descriptor_config:
3347 ee_list = [descriptor_config] # ns charms
3348 else: # other types such as script are not supported
3349 ee_list = []
3350
3351 for ee_item in ee_list:
3352 self.logger.debug(
3353 logging_text
3354 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3355 ee_item.get("juju"), ee_item.get("helm-chart")
3356 )
3357 )
3358 ee_descriptor_id = ee_item.get("id")
3359 if ee_item.get("juju"):
3360 vca_name = ee_item["juju"].get("charm")
3361 vca_type = (
3362 "lxc_proxy_charm"
3363 if ee_item["juju"].get("charm") is not None
3364 else "native_charm"
3365 )
3366 if ee_item["juju"].get("cloud") == "k8s":
3367 vca_type = "k8s_proxy_charm"
3368 elif ee_item["juju"].get("proxy") is False:
3369 vca_type = "native_charm"
3370 elif ee_item.get("helm-chart"):
3371 vca_name = ee_item["helm-chart"]
3372 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3373 vca_type = "helm"
3374 else:
3375 vca_type = "helm-v3"
3376 else:
3377 self.logger.debug(
3378 logging_text + "skipping non juju neither charm configuration"
3379 )
3380 continue
3381
3382 vca_index = -1
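# look for an existing VCA entry matching this target; the for/else creates a new one if none is found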
3383 for vca_index, vca_deployed in enumerate(
3384 db_nsr["_admin"]["deployed"]["VCA"]
3385 ):
3386 if not vca_deployed:
3387 continue
3388 if (
3389 vca_deployed.get("member-vnf-index") == member_vnf_index
3390 and vca_deployed.get("vdu_id") == vdu_id
3391 and vca_deployed.get("kdu_name") == kdu_name
3392 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3393 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3394 ):
3395 break
3396 else:
3397 # not found, create one.
3398 target = (
3399 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3400 )
3401 if vdu_id:
3402 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3403 elif kdu_name:
3404 target += "/kdu/{}".format(kdu_name)
3405 vca_deployed = {
3406 "target_element": target,
3407 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3408 "member-vnf-index": member_vnf_index,
3409 "vdu_id": vdu_id,
3410 "kdu_name": kdu_name,
3411 "vdu_count_index": vdu_index,
3412 "operational-status": "init", # TODO revise
3413 "detailed-status": "", # TODO revise
3414 "step": "initial-deploy", # TODO revise
3415 "vnfd_id": vnfd_id,
3416 "vdu_name": vdu_name,
3417 "type": vca_type,
3418 "ee_descriptor_id": ee_descriptor_id,
3419 }
3420 vca_index += 1
3421
3422 # create VCA and configurationStatus in db
3423 db_dict = {
3424 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3425 "configurationStatus.{}".format(vca_index): dict(),
3426 }
3427 self.update_db_2("nsrs", nsr_id, db_dict)
3428
3429 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3430
3431 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3432 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3433 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3434
3435 # Launch task
3436 task_n2vc = asyncio.ensure_future(
3437 self.instantiate_N2VC(
3438 logging_text=logging_text,
3439 vca_index=vca_index,
3440 nsi_id=nsi_id,
3441 db_nsr=db_nsr,
3442 db_vnfr=db_vnfr,
3443 vdu_id=vdu_id,
3444 kdu_name=kdu_name,
3445 vdu_index=vdu_index,
3446 deploy_params=deploy_params,
3447 config_descriptor=descriptor_config,
3448 base_folder=base_folder,
3449 nslcmop_id=nslcmop_id,
3450 stage=stage,
3451 vca_type=vca_type,
3452 vca_name=vca_name,
3453 ee_config_descriptor=ee_item,
3454 )
3455 )
3456 self.lcm_tasks.register(
3457 "ns",
3458 nsr_id,
3459 nslcmop_id,
3460 "instantiate_N2VC-{}".format(vca_index),
3461 task_n2vc,
3462 )
3463 task_instantiation_info[
3464 task_n2vc
3465 ] = self.task_name_deploy_vca + " {}.{}".format(
3466 member_vnf_index or "", vdu_id or ""
3467 )
3468
3469 @staticmethod
3470 def _create_nslcmop(nsr_id, operation, params):
3471 """
3472 Creates an ns-lcm-op content to be stored in the database.
3473 :param nsr_id: internal id of the instance
3474 :param operation: instantiate, terminate, scale, action, ...
3475 :param params: user parameters for the operation
3476 :return: dictionary following SOL005 format
3477 """
3478 # Raise exception if invalid arguments
3479 if not (nsr_id and operation and params):
3480 raise LcmException(
3481 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3482 )
3483 now = time()
3484 _id = str(uuid4())
3485 nslcmop = {
3486 "id": _id,
3487 "_id": _id,
3488 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3489 "operationState": "PROCESSING",
3490 "statusEnteredTime": now,
3491 "nsInstanceId": nsr_id,
3492 "lcmOperationType": operation,
3493 "startTime": now,
3494 "isAutomaticInvocation": False,
3495 "operationParams": params,
3496 "isCancelPending": False,
3497 "links": {
3498 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3499 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3500 },
3501 }
3502 return nslcmop
3503
3504 def _format_additional_params(self, params):
3505 params = params or {}
3506 for key, value in params.items():
3507 if str(value).startswith("!!yaml "):
3508 params[key] = yaml.safe_load(value[7:])
3509 return params
3510
3511 def _get_terminate_primitive_params(self, seq, vnf_index):
3512 primitive = seq.get("name")
3513 primitive_params = {}
3514 params = {
3515 "member_vnf_index": vnf_index,
3516 "primitive": primitive,
3517 "primitive_params": primitive_params,
3518 }
3519 desc_params = {}
3520 return self._map_primitive_params(seq, params, desc_params)
3521
3522 # sub-operations
3523
3524 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3525 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3526 if op.get("operationState") == "COMPLETED":
3527 # b. Skip sub-operation
3528 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3529 return self.SUBOPERATION_STATUS_SKIP
3530 else:
3531 # c. retry executing sub-operation
3532 # The sub-operation exists, and operationState != 'COMPLETED'
3533 # Update operationState = 'PROCESSING' to indicate a retry.
3534 operationState = "PROCESSING"
3535 detailed_status = "In progress"
3536 self._update_suboperation_status(
3537 db_nslcmop, op_index, operationState, detailed_status
3538 )
3539 # Return the sub-operation index
3540 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3541 # with arguments extracted from the sub-operation
3542 return op_index
3543
3544 # Find a sub-operation where all keys in a matching dictionary must match
3545 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3546 def _find_suboperation(self, db_nslcmop, match):
3547 if db_nslcmop and match:
3548 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3549 for i, op in enumerate(op_list):
3550 if all(op.get(k) == match[k] for k in match):
3551 return i
3552 return self.SUBOPERATION_STATUS_NOT_FOUND
3553
3554 # Update status for a sub-operation given its index
3555 def _update_suboperation_status(
3556 self, db_nslcmop, op_index, operationState, detailed_status
3557 ):
3558 # Update DB for HA tasks
3559 q_filter = {"_id": db_nslcmop["_id"]}
3560 update_dict = {
3561 "_admin.operations.{}.operationState".format(op_index): operationState,
3562 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3563 }
3564 self.db.set_one(
3565 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3566 )
3567
3568 # Add sub-operation, return the index of the added sub-operation
3569 # Optionally, set operationState, detailed-status, and operationType
3570 # Status and type are currently set for 'scale' sub-operations:
3571 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3572 # 'detailed-status' : status message
3573 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3574 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3575 def _add_suboperation(
3576 self,
3577 db_nslcmop,
3578 vnf_index,
3579 vdu_id,
3580 vdu_count_index,
3581 vdu_name,
3582 primitive,
3583 mapped_primitive_params,
3584 operationState=None,
3585 detailed_status=None,
3586 operationType=None,
3587 RO_nsr_id=None,
3588 RO_scaling_info=None,
3589 ):
3590 if not db_nslcmop:
3591 return self.SUBOPERATION_STATUS_NOT_FOUND
3592 # Get the "_admin.operations" list, if it exists
3593 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3594 op_list = db_nslcmop_admin.get("operations")
3595 # Create or append to the "_admin.operations" list
3596 new_op = {
3597 "member_vnf_index": vnf_index,
3598 "vdu_id": vdu_id,
3599 "vdu_count_index": vdu_count_index,
3600 "primitive": primitive,
3601 "primitive_params": mapped_primitive_params,
3602 }
3603 if operationState:
3604 new_op["operationState"] = operationState
3605 if detailed_status:
3606 new_op["detailed-status"] = detailed_status
3607 if operationType:
3608 new_op["lcmOperationType"] = operationType
3609 if RO_nsr_id:
3610 new_op["RO_nsr_id"] = RO_nsr_id
3611 if RO_scaling_info:
3612 new_op["RO_scaling_info"] = RO_scaling_info
3613 if not op_list:
3614 # No existing operations, create key 'operations' with current operation as first list element
3615 db_nslcmop_admin.update({"operations": [new_op]})
3616 op_list = db_nslcmop_admin.get("operations")
3617 else:
3618 # Existing operations, append operation to list
3619 op_list.append(new_op)
3620
3621 db_nslcmop_update = {"_admin.operations": op_list}
3622 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3623 op_index = len(op_list) - 1
3624 return op_index
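# Illustrative sketch (not part of the original module; values are hypothetical): the
# record appended to _admin.operations by _add_suboperation() for a pre-scale primitive.
#   {
#       "member_vnf_index": "1",
#       "vdu_id": None,
#       "vdu_count_index": None,
#       "primitive": "touch",
#       "primitive_params": {"filename": "/home/ubuntu/f1"},
#       "operationState": "PROCESSING",
#       "detailed-status": "In progress",
#       "lcmOperationType": "PRE-SCALE",
#   }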
3625
3626 # Helper methods for scale() sub-operations
3627
3628 # pre-scale/post-scale:
3629 # Check for 3 different cases:
3630 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3631 # b. Skip: sub-operation already exists and operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3632 # c. Retry: sub-operation already exists and operationState != 'COMPLETED', return op_index to re-execute
3633 def _check_or_add_scale_suboperation(
3634 self,
3635 db_nslcmop,
3636 vnf_index,
3637 vnf_config_primitive,
3638 primitive_params,
3639 operationType,
3640 RO_nsr_id=None,
3641 RO_scaling_info=None,
3642 ):
3643 # Find this sub-operation
3644 if RO_nsr_id and RO_scaling_info:
3645 operationType = "SCALE-RO"
3646 match = {
3647 "member_vnf_index": vnf_index,
3648 "RO_nsr_id": RO_nsr_id,
3649 "RO_scaling_info": RO_scaling_info,
3650 }
3651 else:
3652 match = {
3653 "member_vnf_index": vnf_index,
3654 "primitive": vnf_config_primitive,
3655 "primitive_params": primitive_params,
3656 "lcmOperationType": operationType,
3657 }
3658 op_index = self._find_suboperation(db_nslcmop, match)
3659 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3660 # a. New sub-operation
3661 # The sub-operation does not exist, add it.
3662 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3663 # The following parameters are set to None for all kinds of scaling:
3664 vdu_id = None
3665 vdu_count_index = None
3666 vdu_name = None
3667 if RO_nsr_id and RO_scaling_info:
3668 vnf_config_primitive = None
3669 primitive_params = None
3670 else:
3671 RO_nsr_id = None
3672 RO_scaling_info = None
3673 # Initial status for sub-operation
3674 operationState = "PROCESSING"
3675 detailed_status = "In progress"
3676 # Add sub-operation for pre/post-scaling (zero or more operations)
3677 self._add_suboperation(
3678 db_nslcmop,
3679 vnf_index,
3680 vdu_id,
3681 vdu_count_index,
3682 vdu_name,
3683 vnf_config_primitive,
3684 primitive_params,
3685 operationState,
3686 detailed_status,
3687 operationType,
3688 RO_nsr_id,
3689 RO_scaling_info,
3690 )
3691 return self.SUBOPERATION_STATUS_NEW
3692 else:
3693 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3694 # or op_index (operationState != 'COMPLETED')
3695 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
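# Illustrative sketch (not part of the original module; argument values are hypothetical):
# how a caller such as scale() is expected to react to the three possible return values.
#   op_index = self._check_or_add_scale_suboperation(
#       db_nslcmop, vnf_index, "touch", primitive_params, "PRE-SCALE"
#   )
#   if op_index == self.SUBOPERATION_STATUS_SKIP:
#       pass  # already COMPLETED in a previous run, do not execute again
#   elif op_index == self.SUBOPERATION_STATUS_NEW:
#       pass  # first execution, run the primitive with the original arguments
#   else:
#       pass  # retry: re-run using the arguments stored at _admin.operations[op_index]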
3696
3697 # Function to return execution_environment id
3698
3699 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3700 # TODO vdu_index_count
3701 for vca in vca_deployed_list:
3702 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3703 return vca["ee_id"]
3704
3705 async def destroy_N2VC(
3706 self,
3707 logging_text,
3708 db_nslcmop,
3709 vca_deployed,
3710 config_descriptor,
3711 vca_index,
3712 destroy_ee=True,
3713 exec_primitives=True,
3714 scaling_in=False,
3715 vca_id: str = None,
3716 ):
3717 """
3718 Execute the terminate primitives and destroy the execution environment (only if destroy_ee=True)
3719 :param logging_text:
3720 :param db_nslcmop:
3721 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3722 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3723 :param vca_index: index in the database _admin.deployed.VCA
3724 :param destroy_ee: False to skip destroying this execution environment, because all of them will be destroyed at once
3725 :param exec_primitives: False to skip executing terminate primitives, because the config is not completed or has
3726 not executed properly
3727 :param scaling_in: True destroys the application, False destroys the model
3728 :return: None or exception
3729 """
3730
3731 self.logger.debug(
3732 logging_text
3733 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3734 vca_index, vca_deployed, config_descriptor, destroy_ee
3735 )
3736 )
3737
3738 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3739
3740 # execute terminate_primitives
3741 if exec_primitives:
3742 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3743 config_descriptor.get("terminate-config-primitive"),
3744 vca_deployed.get("ee_descriptor_id"),
3745 )
3746 vdu_id = vca_deployed.get("vdu_id")
3747 vdu_count_index = vca_deployed.get("vdu_count_index")
3748 vdu_name = vca_deployed.get("vdu_name")
3749 vnf_index = vca_deployed.get("member-vnf-index")
3750 if terminate_primitives and vca_deployed.get("needed_terminate"):
3751 for seq in terminate_primitives:
3752 # For each sequence in list, get primitive and call _ns_execute_primitive()
3753 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3754 vnf_index, seq.get("name")
3755 )
3756 self.logger.debug(logging_text + step)
3757 # Create the primitive for each sequence, i.e. "primitive": "touch"
3758 primitive = seq.get("name")
3759 mapped_primitive_params = self._get_terminate_primitive_params(
3760 seq, vnf_index
3761 )
3762
3763 # Add sub-operation
3764 self._add_suboperation(
3765 db_nslcmop,
3766 vnf_index,
3767 vdu_id,
3768 vdu_count_index,
3769 vdu_name,
3770 primitive,
3771 mapped_primitive_params,
3772 )
3773 # Sub-operations: Call _ns_execute_primitive() instead of action()
3774 try:
3775 result, result_detail = await self._ns_execute_primitive(
3776 vca_deployed["ee_id"],
3777 primitive,
3778 mapped_primitive_params,
3779 vca_type=vca_type,
3780 vca_id=vca_id,
3781 )
3782 except LcmException:
3783 # this happens when the VCA is not deployed. In this case there is nothing to terminate
3784 continue
3785 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3786 if result not in result_ok:
3787 raise LcmException(
3788 "terminate_primitive {} for vnf_member_index={} fails with "
3789 "error {}".format(seq.get("name"), vnf_index, result_detail)
3790 )
3791 # mark that this VCA no longer needs to be terminated
3792 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3793 vca_index
3794 )
3795 self.update_db_2(
3796 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3797 )
3798
3799 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3800 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3801
3802 if destroy_ee:
3803 await self.vca_map[vca_type].delete_execution_environment(
3804 vca_deployed["ee_id"],
3805 scaling_in=scaling_in,
3806 vca_type=vca_type,
3807 vca_id=vca_id,
3808 )
3809
3810 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3811 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3812 namespace = "." + db_nsr["_id"]
3813 try:
3814 await self.n2vc.delete_namespace(
3815 namespace=namespace,
3816 total_timeout=self.timeout_charm_delete,
3817 vca_id=vca_id,
3818 )
3819 except N2VCNotFound: # already deleted. Skip
3820 pass
3821 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3822
3823 async def _terminate_RO(
3824 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3825 ):
3826 """
3827 Terminates a deployment from RO
3828 :param logging_text:
3829 :param nsr_deployed: db_nsr._admin.deployed
3830 :param nsr_id:
3831 :param nslcmop_id:
3832 :param stage: list of strings with the content to write on db_nslcmop.detailed-status.
3833 This method updates only index 2, but it writes the concatenated content of the whole list to the database
3834 :return:
3835 """
3836 db_nsr_update = {}
3837 failed_detail = []
3838 ro_nsr_id = ro_delete_action = None
3839 if nsr_deployed and nsr_deployed.get("RO"):
3840 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3841 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3842 try:
3843 if ro_nsr_id:
3844 stage[2] = "Deleting ns from VIM."
3845 db_nsr_update["detailed-status"] = " ".join(stage)
3846 self._write_op_status(nslcmop_id, stage)
3847 self.logger.debug(logging_text + stage[2])
3848 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3849 self._write_op_status(nslcmop_id, stage)
3850 desc = await self.RO.delete("ns", ro_nsr_id)
3851 ro_delete_action = desc["action_id"]
3852 db_nsr_update[
3853 "_admin.deployed.RO.nsr_delete_action_id"
3854 ] = ro_delete_action
3855 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3856 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3857 if ro_delete_action:
3858 # wait until NS is deleted from VIM
3859 stage[2] = "Waiting ns deleted from VIM."
3860 detailed_status_old = None
3861 self.logger.debug(
3862 logging_text
3863 + stage[2]
3864 + " RO_id={} ro_delete_action={}".format(
3865 ro_nsr_id, ro_delete_action
3866 )
3867 )
3868 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3869 self._write_op_status(nslcmop_id, stage)
3870
3871 delete_timeout = 20 * 60 # 20 minutes
3872 while delete_timeout > 0:
3873 desc = await self.RO.show(
3874 "ns",
3875 item_id_name=ro_nsr_id,
3876 extra_item="action",
3877 extra_item_id=ro_delete_action,
3878 )
3879
3880 # deploymentStatus
3881 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3882
3883 ns_status, ns_status_info = self.RO.check_action_status(desc)
3884 if ns_status == "ERROR":
3885 raise ROclient.ROClientException(ns_status_info)
3886 elif ns_status == "BUILD":
3887 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3888 elif ns_status == "ACTIVE":
3889 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3890 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3891 break
3892 else:
3893 assert (
3894 False
3895 ), "ROclient.check_action_status returns unknown {}".format(
3896 ns_status
3897 )
3898 if stage[2] != detailed_status_old:
3899 detailed_status_old = stage[2]
3900 db_nsr_update["detailed-status"] = " ".join(stage)
3901 self._write_op_status(nslcmop_id, stage)
3902 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3903 await asyncio.sleep(5, loop=self.loop)
3904 delete_timeout -= 5
3905 else: # delete_timeout <= 0:
3906 raise ROclient.ROClientException(
3907 "Timeout waiting ns deleted from VIM"
3908 )
3909
3910 except Exception as e:
3911 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3912 if (
3913 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3914 ): # not found
3915 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3916 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3917 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3918 self.logger.debug(
3919 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
3920 )
3921 elif (
3922 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3923 ): # conflict
3924 failed_detail.append("delete conflict: {}".format(e))
3925 self.logger.debug(
3926 logging_text
3927 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
3928 )
3929 else:
3930 failed_detail.append("delete error: {}".format(e))
3931 self.logger.error(
3932 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
3933 )
3934
3935 # Delete nsd
3936 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3937 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3938 try:
3939 stage[2] = "Deleting nsd from RO."
3940 db_nsr_update["detailed-status"] = " ".join(stage)
3941 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3942 self._write_op_status(nslcmop_id, stage)
3943 await self.RO.delete("nsd", ro_nsd_id)
3944 self.logger.debug(
3945 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
3946 )
3947 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3948 except Exception as e:
3949 if (
3950 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3951 ): # not found
3952 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3953 self.logger.debug(
3954 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
3955 )
3956 elif (
3957 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3958 ): # conflict
3959 failed_detail.append(
3960 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
3961 )
3962 self.logger.debug(logging_text + failed_detail[-1])
3963 else:
3964 failed_detail.append(
3965 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
3966 )
3967 self.logger.error(logging_text + failed_detail[-1])
3968
3969 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3970 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3971 if not vnf_deployed or not vnf_deployed["id"]:
3972 continue
3973 try:
3974 ro_vnfd_id = vnf_deployed["id"]
3975 stage[
3976 2
3977 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3978 vnf_deployed["member-vnf-index"], ro_vnfd_id
3979 )
3980 db_nsr_update["detailed-status"] = " ".join(stage)
3981 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3982 self._write_op_status(nslcmop_id, stage)
3983 await self.RO.delete("vnfd", ro_vnfd_id)
3984 self.logger.debug(
3985 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
3986 )
3987 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3988 except Exception as e:
3989 if (
3990 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3991 ): # not found
3992 db_nsr_update[
3993 "_admin.deployed.RO.vnfd.{}.id".format(index)
3994 ] = None
3995 self.logger.debug(
3996 logging_text
3997 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
3998 )
3999 elif (
4000 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4001 ): # conflict
4002 failed_detail.append(
4003 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4004 )
4005 self.logger.debug(logging_text + failed_detail[-1])
4006 else:
4007 failed_detail.append(
4008 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4009 )
4010 self.logger.error(logging_text + failed_detail[-1])
4011
4012 if failed_detail:
4013 stage[2] = "Error deleting from VIM"
4014 else:
4015 stage[2] = "Deleted from VIM"
4016 db_nsr_update["detailed-status"] = " ".join(stage)
4017 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4018 self._write_op_status(nslcmop_id, stage)
4019
4020 if failed_detail:
4021 raise LcmException("; ".join(failed_detail))
4022
4023 async def terminate(self, nsr_id, nslcmop_id):
4024 # Try to lock HA task here
4025 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4026 if not task_is_locked_by_me:
4027 return
4028
4029 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4030 self.logger.debug(logging_text + "Enter")
4031 timeout_ns_terminate = self.timeout_ns_terminate
4032 db_nsr = None
4033 db_nslcmop = None
4034 operation_params = None
4035 exc = None
4036 error_list = []  # accumulates all failure error messages
4037 db_nslcmop_update = {}
4038 autoremove = False  # autoremove the NS record after it is terminated
4039 tasks_dict_info = {}
4040 db_nsr_update = {}
4041 stage = [
4042 "Stage 1/3: Preparing task.",
4043 "Waiting for previous operations to terminate.",
4044 "",
4045 ]
4046 # ^ contains [stage, step, VIM-status]
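# Editorial note (not in the original module): the three strings above are concatenated
# whenever detailed-status is written to the database, e.g.
#   db_nsr_update["detailed-status"] = " ".join(stage)
#   # -> "Stage 1/3: Preparing task. Waiting for previous operations to terminate. "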
4047 try:
4048 # wait for any previous tasks in process
4049 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4050
4051 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4052 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4053 operation_params = db_nslcmop.get("operationParams") or {}
4054 if operation_params.get("timeout_ns_terminate"):
4055 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4056 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4057 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4058
4059 db_nsr_update["operational-status"] = "terminating"
4060 db_nsr_update["config-status"] = "terminating"
4061 self._write_ns_status(
4062 nsr_id=nsr_id,
4063 ns_state="TERMINATING",
4064 current_operation="TERMINATING",
4065 current_operation_id=nslcmop_id,
4066 other_update=db_nsr_update,
4067 )
4068 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4069 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4070 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4071 return
4072
4073 stage[1] = "Getting vnf descriptors from db."
4074 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4075 db_vnfrs_dict = {
4076 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4077 }
4078 db_vnfds_from_id = {}
4079 db_vnfds_from_member_index = {}
4080 # Loop over VNFRs
4081 for vnfr in db_vnfrs_list:
4082 vnfd_id = vnfr["vnfd-id"]
4083 if vnfd_id not in db_vnfds_from_id:
4084 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4085 db_vnfds_from_id[vnfd_id] = vnfd
4086 db_vnfds_from_member_index[
4087 vnfr["member-vnf-index-ref"]
4088 ] = db_vnfds_from_id[vnfd_id]
4089
4090 # Destroy individual execution environments when there are terminating primitives.
4091 # The rest of the EEs will be deleted at once
4092 # TODO - check before calling _destroy_N2VC
4093 # if not operation_params.get("skip_terminate_primitives"):#
4094 # or not vca.get("needed_terminate"):
4095 stage[0] = "Stage 2/3 execute terminating primitives."
4096 self.logger.debug(logging_text + stage[0])
4097 stage[1] = "Looking for execution environments that need to be terminated."
4098 self.logger.debug(logging_text + stage[1])
4099
4100 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4101 config_descriptor = None
4102 vca_member_vnf_index = vca.get("member-vnf-index")
4103 vca_id = self.get_vca_id(
4104 db_vnfrs_dict.get(vca_member_vnf_index)
4105 if vca_member_vnf_index
4106 else None,
4107 db_nsr,
4108 )
4109 if not vca or not vca.get("ee_id"):
4110 continue
4111 if not vca.get("member-vnf-index"):
4112 # ns
4113 config_descriptor = db_nsr.get("ns-configuration")
4114 elif vca.get("vdu_id"):
4115 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4116 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4117 elif vca.get("kdu_name"):
4118 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4119 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4120 else:
4121 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4122 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4123 vca_type = vca.get("type")
4124 exec_terminate_primitives = not operation_params.get(
4125 "skip_terminate_primitives"
4126 ) and vca.get("needed_terminate")
4127 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4128 # pending native charms
4129 destroy_ee = vca_type in (
4130 "helm", "helm-v3", "native_charm"
4131 )
4132 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4133 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4134 task = asyncio.ensure_future(
4135 self.destroy_N2VC(
4136 logging_text,
4137 db_nslcmop,
4138 vca,
4139 config_descriptor,
4140 vca_index,
4141 destroy_ee,
4142 exec_terminate_primitives,
4143 vca_id=vca_id,
4144 )
4145 )
4146 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4147
4148 # wait for pending tasks of terminate primitives
4149 if tasks_dict_info:
4150 self.logger.debug(
4151 logging_text
4152 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4153 )
4154 error_list = await self._wait_for_tasks(
4155 logging_text,
4156 tasks_dict_info,
4157 min(self.timeout_charm_delete, timeout_ns_terminate),
4158 stage,
4159 nslcmop_id,
4160 )
4161 tasks_dict_info.clear()
4162 if error_list:
4163 return # raise LcmException("; ".join(error_list))
4164
4165 # remove All execution environments at once
4166 stage[0] = "Stage 3/3 delete all."
4167
4168 if nsr_deployed.get("VCA"):
4169 stage[1] = "Deleting all execution environments."
4170 self.logger.debug(logging_text + stage[1])
4171 vca_id = self.get_vca_id({}, db_nsr)
4172 task_delete_ee = asyncio.ensure_future(
4173 asyncio.wait_for(
4174 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4175 timeout=self.timeout_charm_delete,
4176 )
4177 )
4178 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4179 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4180
4181 # Delete from k8scluster
4182 stage[1] = "Deleting KDUs."
4183 self.logger.debug(logging_text + stage[1])
4184 # print(nsr_deployed)
4185 for kdu in get_iterable(nsr_deployed, "K8s"):
4186 if not kdu or not kdu.get("kdu-instance"):
4187 continue
4188 kdu_instance = kdu.get("kdu-instance")
4189 if kdu.get("k8scluster-type") in self.k8scluster_map:
4190 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4191 vca_id = self.get_vca_id({}, db_nsr)
4192 task_delete_kdu_instance = asyncio.ensure_future(
4193 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4194 cluster_uuid=kdu.get("k8scluster-uuid"),
4195 kdu_instance=kdu_instance,
4196 vca_id=vca_id,
4197 )
4198 )
4199 else:
4200 self.logger.error(
4201 logging_text
4202 + "Unknown k8s deployment type {}".format(
4203 kdu.get("k8scluster-type")
4204 )
4205 )
4206 continue
4207 tasks_dict_info[
4208 task_delete_kdu_instance
4209 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4210
4211 # remove from RO
4212 stage[1] = "Deleting ns from VIM."
4213 if self.ng_ro:
4214 task_delete_ro = asyncio.ensure_future(
4215 self._terminate_ng_ro(
4216 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4217 )
4218 )
4219 else:
4220 task_delete_ro = asyncio.ensure_future(
4221 self._terminate_RO(
4222 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4223 )
4224 )
4225 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4226
4227 # the rest of the work will be done in the finally block
4228
4229 except (
4230 ROclient.ROClientException,
4231 DbException,
4232 LcmException,
4233 N2VCException,
4234 ) as e:
4235 self.logger.error(logging_text + "Exit Exception {}".format(e))
4236 exc = e
4237 except asyncio.CancelledError:
4238 self.logger.error(
4239 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4240 )
4241 exc = "Operation was cancelled"
4242 except Exception as e:
4243 exc = traceback.format_exc()
4244 self.logger.critical(
4245 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4246 exc_info=True,
4247 )
4248 finally:
4249 if exc:
4250 error_list.append(str(exc))
4251 try:
4252 # wait for pending tasks
4253 if tasks_dict_info:
4254 stage[1] = "Waiting for terminate pending tasks."
4255 self.logger.debug(logging_text + stage[1])
4256 error_list += await self._wait_for_tasks(
4257 logging_text,
4258 tasks_dict_info,
4259 timeout_ns_terminate,
4260 stage,
4261 nslcmop_id,
4262 )
4263 stage[1] = stage[2] = ""
4264 except asyncio.CancelledError:
4265 error_list.append("Cancelled")
4266 # TODO cancel all tasks
4267 except Exception as exc:
4268 error_list.append(str(exc))
4269 # update status at database
4270 if error_list:
4271 error_detail = "; ".join(error_list)
4272 # self.logger.error(logging_text + error_detail)
4273 error_description_nslcmop = "{} Detail: {}".format(
4274 stage[0], error_detail
4275 )
4276 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4277 nslcmop_id, stage[0]
4278 )
4279
4280 db_nsr_update["operational-status"] = "failed"
4281 db_nsr_update["detailed-status"] = (
4282 error_description_nsr + " Detail: " + error_detail
4283 )
4284 db_nslcmop_update["detailed-status"] = error_detail
4285 nslcmop_operation_state = "FAILED"
4286 ns_state = "BROKEN"
4287 else:
4288 error_detail = None
4289 error_description_nsr = error_description_nslcmop = None
4290 ns_state = "NOT_INSTANTIATED"
4291 db_nsr_update["operational-status"] = "terminated"
4292 db_nsr_update["detailed-status"] = "Done"
4293 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4294 db_nslcmop_update["detailed-status"] = "Done"
4295 nslcmop_operation_state = "COMPLETED"
4296
4297 if db_nsr:
4298 self._write_ns_status(
4299 nsr_id=nsr_id,
4300 ns_state=ns_state,
4301 current_operation="IDLE",
4302 current_operation_id=None,
4303 error_description=error_description_nsr,
4304 error_detail=error_detail,
4305 other_update=db_nsr_update,
4306 )
4307 self._write_op_status(
4308 op_id=nslcmop_id,
4309 stage="",
4310 error_message=error_description_nslcmop,
4311 operation_state=nslcmop_operation_state,
4312 other_update=db_nslcmop_update,
4313 )
4314 if ns_state == "NOT_INSTANTIATED":
4315 try:
4316 self.db.set_list(
4317 "vnfrs",
4318 {"nsr-id-ref": nsr_id},
4319 {"_admin.nsState": "NOT_INSTANTIATED"},
4320 )
4321 except DbException as e:
4322 self.logger.warn(
4323 logging_text
4324 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4325 nsr_id, e
4326 )
4327 )
4328 if operation_params:
4329 autoremove = operation_params.get("autoremove", False)
4330 if nslcmop_operation_state:
4331 try:
4332 await self.msg.aiowrite(
4333 "ns",
4334 "terminated",
4335 {
4336 "nsr_id": nsr_id,
4337 "nslcmop_id": nslcmop_id,
4338 "operationState": nslcmop_operation_state,
4339 "autoremove": autoremove,
4340 },
4341 loop=self.loop,
4342 )
4343 except Exception as e:
4344 self.logger.error(
4345 logging_text + "kafka_write notification Exception {}".format(e)
4346 )
4347
4348 self.logger.debug(logging_text + "Exit")
4349 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4350
4351 async def _wait_for_tasks(
4352 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4353 ):
4354 time_start = time()
4355 error_detail_list = []
4356 error_list = []
4357 pending_tasks = list(created_tasks_info.keys())
4358 num_tasks = len(pending_tasks)
4359 num_done = 0
4360 stage[1] = "{}/{}.".format(num_done, num_tasks)
4361 self._write_op_status(nslcmop_id, stage)
4362 while pending_tasks:
4363 new_error = None
4364 _timeout = timeout + time_start - time()
4365 done, pending_tasks = await asyncio.wait(
4366 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4367 )
4368 num_done += len(done)
4369 if not done: # Timeout
4370 for task in pending_tasks:
4371 new_error = created_tasks_info[task] + ": Timeout"
4372 error_detail_list.append(new_error)
4373 error_list.append(new_error)
4374 break
4375 for task in done:
4376 if task.cancelled():
4377 exc = "Cancelled"
4378 else:
4379 exc = task.exception()
4380 if exc:
4381 if isinstance(exc, asyncio.TimeoutError):
4382 exc = "Timeout"
4383 new_error = created_tasks_info[task] + ": {}".format(exc)
4384 error_list.append(created_tasks_info[task])
4385 error_detail_list.append(new_error)
4386 if isinstance(
4387 exc,
4388 (
4389 str,
4390 DbException,
4391 N2VCException,
4392 ROclient.ROClientException,
4393 LcmException,
4394 K8sException,
4395 NgRoException,
4396 ),
4397 ):
4398 self.logger.error(logging_text + new_error)
4399 else:
4400 exc_traceback = "".join(
4401 traceback.format_exception(None, exc, exc.__traceback__)
4402 )
4403 self.logger.error(
4404 logging_text
4405 + created_tasks_info[task]
4406 + " "
4407 + exc_traceback
4408 )
4409 else:
4410 self.logger.debug(
4411 logging_text + created_tasks_info[task] + ": Done"
4412 )
4413 stage[1] = "{}/{}.".format(num_done, num_tasks)
4414 if new_error:
4415 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4416 if nsr_id: # update also nsr
4417 self.update_db_2(
4418 "nsrs",
4419 nsr_id,
4420 {
4421 "errorDescription": "Error at: " + ", ".join(error_list),
4422 "errorDetail": ". ".join(error_detail_list),
4423 },
4424 )
4425 self._write_op_status(nslcmop_id, stage)
4426 return error_detail_list
4427
4428 @staticmethod
4429 def _map_primitive_params(primitive_desc, params, instantiation_params):
4430 """
4431 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
4432 parameter, the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
4433 :param primitive_desc: portion of VNFD/NSD that describes primitive
4434 :param params: Params provided by user
4435 :param instantiation_params: Instantiation params provided by user
4436 :return: a dictionary with the calculated params
4437 """
4438 calculated_params = {}
4439 for parameter in primitive_desc.get("parameter", ()):
4440 param_name = parameter["name"]
4441 if param_name in params:
4442 calculated_params[param_name] = params[param_name]
4443 elif "default-value" in parameter or "value" in parameter:
4444 if "value" in parameter:
4445 calculated_params[param_name] = parameter["value"]
4446 else:
4447 calculated_params[param_name] = parameter["default-value"]
4448 if (
4449 isinstance(calculated_params[param_name], str)
4450 and calculated_params[param_name].startswith("<")
4451 and calculated_params[param_name].endswith(">")
4452 ):
4453 if calculated_params[param_name][1:-1] in instantiation_params:
4454 calculated_params[param_name] = instantiation_params[
4455 calculated_params[param_name][1:-1]
4456 ]
4457 else:
4458 raise LcmException(
4459 "Parameter {} needed to execute primitive {} not provided".format(
4460 calculated_params[param_name], primitive_desc["name"]
4461 )
4462 )
4463 else:
4464 raise LcmException(
4465 "Parameter {} needed to execute primitive {} not provided".format(
4466 param_name, primitive_desc["name"]
4467 )
4468 )
4469
4470 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4471 calculated_params[param_name] = yaml.safe_dump(
4472 calculated_params[param_name], default_flow_style=True, width=256
4473 )
4474 elif isinstance(calculated_params[param_name], str) and calculated_params[
4475 param_name
4476 ].startswith("!!yaml "):
4477 calculated_params[param_name] = calculated_params[param_name][7:]
4478 if parameter.get("data-type") == "INTEGER":
4479 try:
4480 calculated_params[param_name] = int(calculated_params[param_name])
4481 except ValueError: # error converting string to int
4482 raise LcmException(
4483 "Parameter {} of primitive {} must be integer".format(
4484 param_name, primitive_desc["name"]
4485 )
4486 )
4487 elif parameter.get("data-type") == "BOOLEAN":
4488 calculated_params[param_name] = not (
4489 (str(calculated_params[param_name])).lower() == "false"
4490 )
4491
4492 # always add ns_config_info if the primitive name is config
4493 if primitive_desc["name"] == "config":
4494 if "ns_config_info" in instantiation_params:
4495 calculated_params["ns_config_info"] = instantiation_params[
4496 "ns_config_info"
4497 ]
4498 return calculated_params
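# Illustrative sketch (not part of the original module; descriptor and parameter names are
# hypothetical): how a default value enclosed in < > is resolved against instantiation params.
#   primitive_desc = {
#       "name": "touch",
#       "parameter": [{"name": "filename", "default-value": "<touch_filename>"}],
#   }
#   calculated = self._map_primitive_params(
#       primitive_desc, params={}, instantiation_params={"touch_filename": "/home/ubuntu/f1"}
#   )
#   # -> {"filename": "/home/ubuntu/f1"}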
4499
4500 def _look_for_deployed_vca(
4501 self,
4502 deployed_vca,
4503 member_vnf_index,
4504 vdu_id,
4505 vdu_count_index,
4506 kdu_name=None,
4507 ee_descriptor_id=None,
4508 ):
4509 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
4510 for vca in deployed_vca:
4511 if not vca:
4512 continue
4513 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4514 continue
4515 if (
4516 vdu_count_index is not None
4517 and vdu_count_index != vca["vdu_count_index"]
4518 ):
4519 continue
4520 if kdu_name and kdu_name != vca["kdu_name"]:
4521 continue
4522 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4523 continue
4524 break
4525 else:
4526 # vca_deployed not found
4527 raise LcmException(
4528 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4529 " is not deployed".format(
4530 member_vnf_index,
4531 vdu_id,
4532 vdu_count_index,
4533 kdu_name,
4534 ee_descriptor_id,
4535 )
4536 )
4537 # get ee_id
4538 ee_id = vca.get("ee_id")
4539 vca_type = vca.get(
4540 "type", "lxc_proxy_charm"
4541 ) # default value for backward compatibility - proxy charm
4542 if not ee_id:
4543 raise LcmException(
4544 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4545 "execution environment".format(
4546 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4547 )
4548 )
4549 return ee_id, vca_type
4550
4551 async def _ns_execute_primitive(
4552 self,
4553 ee_id,
4554 primitive,
4555 primitive_params,
4556 retries=0,
4557 retries_interval=30,
4558 timeout=None,
4559 vca_type=None,
4560 db_dict=None,
4561 vca_id: str = None,
4562 ) -> (str, str):
4563 try:
4564 if primitive == "config":
4565 primitive_params = {"params": primitive_params}
4566
4567 vca_type = vca_type or "lxc_proxy_charm"
4568
4569 while retries >= 0:
4570 try:
4571 output = await asyncio.wait_for(
4572 self.vca_map[vca_type].exec_primitive(
4573 ee_id=ee_id,
4574 primitive_name=primitive,
4575 params_dict=primitive_params,
4576 progress_timeout=self.timeout_progress_primitive,
4577 total_timeout=self.timeout_primitive,
4578 db_dict=db_dict,
4579 vca_id=vca_id,
4580 vca_type=vca_type,
4581 ),
4582 timeout=timeout or self.timeout_primitive,
4583 )
4584 # execution was OK
4585 break
4586 except asyncio.CancelledError:
4587 raise
4588 except Exception as e: # asyncio.TimeoutError
4589 if isinstance(e, asyncio.TimeoutError):
4590 e = "Timeout"
4591 retries -= 1
4592 if retries >= 0:
4593 self.logger.debug(
4594 "Error executing action {} on {} -> {}".format(
4595 primitive, ee_id, e
4596 )
4597 )
4598 # wait and retry
4599 await asyncio.sleep(retries_interval, loop=self.loop)
4600 else:
4601 return "FAILED", str(e)
4602
4603 return "COMPLETED", output
4604
4605 except (LcmException, asyncio.CancelledError):
4606 raise
4607 except Exception as e:
4608 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4609
4610 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4611 """
4612 Update the vca_status in the nsrs record with the latest juju information
4613 :param nsr_id: Id of the nsr
4614 :param nslcmop_id: Id of the nslcmop
4615 :return: None
4616 """
4617
4618 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4619 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4620 vca_id = self.get_vca_id({}, db_nsr)
4621 if db_nsr["_admin"]["deployed"]["K8s"]:
4622 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4623 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4624 await self._on_update_k8s_db(
4625 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4626 )
4627 else:
4628 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4629 table, filter = "nsrs", {"_id": nsr_id}
4630 path = "_admin.deployed.VCA.{}.".format(vca_index)
4631 await self._on_update_n2vc_db(table, filter, path, {})
4632
4633 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4634 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4635
4636 async def action(self, nsr_id, nslcmop_id):
4637 # Try to lock HA task here
4638 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4639 if not task_is_locked_by_me:
4640 return
4641
4642 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4643 self.logger.debug(logging_text + "Enter")
4644 # get all needed from database
4645 db_nsr = None
4646 db_nslcmop = None
4647 db_nsr_update = {}
4648 db_nslcmop_update = {}
4649 nslcmop_operation_state = None
4650 error_description_nslcmop = None
4651 exc = None
4652 try:
4653 # wait for any previous tasks in process
4654 step = "Waiting for previous operations to terminate"
4655 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4656
4657 self._write_ns_status(
4658 nsr_id=nsr_id,
4659 ns_state=None,
4660 current_operation="RUNNING ACTION",
4661 current_operation_id=nslcmop_id,
4662 )
4663
4664 step = "Getting information from database"
4665 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4666 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4667 if db_nslcmop["operationParams"].get("primitive_params"):
4668 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4669 db_nslcmop["operationParams"]["primitive_params"]
4670 )
4671
4672 nsr_deployed = db_nsr["_admin"].get("deployed")
4673 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4674 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4675 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4676 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4677 primitive = db_nslcmop["operationParams"]["primitive"]
4678 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4679 timeout_ns_action = db_nslcmop["operationParams"].get(
4680 "timeout_ns_action", self.timeout_primitive
4681 )
4682
4683 if vnf_index:
4684 step = "Getting vnfr from database"
4685 db_vnfr = self.db.get_one(
4686 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4687 )
4688 if db_vnfr.get("kdur"):
4689 kdur_list = []
4690 for kdur in db_vnfr["kdur"]:
4691 if kdur.get("additionalParams"):
4692 kdur["additionalParams"] = json.loads(
4693 kdur["additionalParams"]
4694 )
4695 kdur_list.append(kdur)
4696 db_vnfr["kdur"] = kdur_list
4697 step = "Getting vnfd from database"
4698 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4699 else:
4700 step = "Getting nsd from database"
4701 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4702
4703 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4704 # for backward compatibility
4705 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4706 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4707 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4708 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4709
4710 # look for primitive
4711 config_primitive_desc = descriptor_configuration = None
4712 if vdu_id:
4713 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4714 elif kdu_name:
4715 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4716 elif vnf_index:
4717 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4718 else:
4719 descriptor_configuration = db_nsd.get("ns-configuration")
4720
4721 if descriptor_configuration and descriptor_configuration.get(
4722 "config-primitive"
4723 ):
4724 for config_primitive in descriptor_configuration["config-primitive"]:
4725 if config_primitive["name"] == primitive:
4726 config_primitive_desc = config_primitive
4727 break
4728
4729 if not config_primitive_desc:
4730 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4731 raise LcmException(
4732 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4733 primitive
4734 )
4735 )
4736 primitive_name = primitive
4737 ee_descriptor_id = None
4738 else:
4739 primitive_name = config_primitive_desc.get(
4740 "execution-environment-primitive", primitive
4741 )
4742 ee_descriptor_id = config_primitive_desc.get(
4743 "execution-environment-ref"
4744 )
4745
4746 if vnf_index:
4747 if vdu_id:
4748 vdur = next(
4749 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4750 )
4751 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4752 elif kdu_name:
4753 kdur = next(
4754 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4755 )
4756 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4757 else:
4758 desc_params = parse_yaml_strings(
4759 db_vnfr.get("additionalParamsForVnf")
4760 )
4761 else:
4762 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4763 if kdu_name and get_configuration(db_vnfd, kdu_name):
4764 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4765 actions = set()
4766 for primitive in kdu_configuration.get("initial-config-primitive", []):
4767 actions.add(primitive["name"])
4768 for primitive in kdu_configuration.get("config-primitive", []):
4769 actions.add(primitive["name"])
4770 kdu_action = primitive_name in actions
4771
4772 # TODO check if ns is in a proper status
4773 if kdu_name and (
4774 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4775 ):
4776 # kdur and desc_params already set from before
4777 if primitive_params:
4778 desc_params.update(primitive_params)
4779 # TODO Check if we will need something at vnf level
4780 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4781 if (
4782 kdu_name == kdu["kdu-name"]
4783 and kdu["member-vnf-index"] == vnf_index
4784 ):
4785 break
4786 else:
4787 raise LcmException(
4788 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4789 )
4790
4791 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4792 msg = "unknown k8scluster-type '{}'".format(
4793 kdu.get("k8scluster-type")
4794 )
4795 raise LcmException(msg)
4796
4797 db_dict = {
4798 "collection": "nsrs",
4799 "filter": {"_id": nsr_id},
4800 "path": "_admin.deployed.K8s.{}".format(index),
4801 }
4802 self.logger.debug(
4803 logging_text
4804 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4805 )
4806 step = "Executing kdu {}".format(primitive_name)
4807 if primitive_name == "upgrade":
4808 if desc_params.get("kdu_model"):
4809 kdu_model = desc_params.get("kdu_model")
4810 del desc_params["kdu_model"]
4811 else:
4812 kdu_model = kdu.get("kdu-model")
4813 parts = kdu_model.split(sep=":")
4814 if len(parts) == 2:
4815 kdu_model = parts[0]
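# Editorial note (assumption, not in the original module): a kdu_model of the form
# "<chart>:<version>" is presumably reduced here to just the chart reference,
# e.g. "stable/openldap:1.2.3" -> "stable/openldap".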
4816
4817 detailed_status = await asyncio.wait_for(
4818 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4819 cluster_uuid=kdu.get("k8scluster-uuid"),
4820 kdu_instance=kdu.get("kdu-instance"),
4821 atomic=True,
4822 kdu_model=kdu_model,
4823 params=desc_params,
4824 db_dict=db_dict,
4825 timeout=timeout_ns_action,
4826 ),
4827 timeout=timeout_ns_action + 10,
4828 )
4829 self.logger.debug(
4830 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4831 )
4832 elif primitive_name == "rollback":
4833 detailed_status = await asyncio.wait_for(
4834 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4835 cluster_uuid=kdu.get("k8scluster-uuid"),
4836 kdu_instance=kdu.get("kdu-instance"),
4837 db_dict=db_dict,
4838 ),
4839 timeout=timeout_ns_action,
4840 )
4841 elif primitive_name == "status":
4842 detailed_status = await asyncio.wait_for(
4843 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4844 cluster_uuid=kdu.get("k8scluster-uuid"),
4845 kdu_instance=kdu.get("kdu-instance"),
4846 vca_id=vca_id,
4847 ),
4848 timeout=timeout_ns_action,
4849 )
4850 else:
4851 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4852 kdu["kdu-name"], nsr_id
4853 )
4854 params = self._map_primitive_params(
4855 config_primitive_desc, primitive_params, desc_params
4856 )
4857
4858 detailed_status = await asyncio.wait_for(
4859 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4860 cluster_uuid=kdu.get("k8scluster-uuid"),
4861 kdu_instance=kdu_instance,
4862 primitive_name=primitive_name,
4863 params=params,
4864 db_dict=db_dict,
4865 timeout=timeout_ns_action,
4866 vca_id=vca_id,
4867 ),
4868 timeout=timeout_ns_action,
4869 )
4870
4871 if detailed_status:
4872 nslcmop_operation_state = "COMPLETED"
4873 else:
4874 detailed_status = ""
4875 nslcmop_operation_state = "FAILED"
4876 else:
4877 ee_id, vca_type = self._look_for_deployed_vca(
4878 nsr_deployed["VCA"],
4879 member_vnf_index=vnf_index,
4880 vdu_id=vdu_id,
4881 vdu_count_index=vdu_count_index,
4882 ee_descriptor_id=ee_descriptor_id,
4883 )
4884 for vca_index, vca_deployed in enumerate(
4885 db_nsr["_admin"]["deployed"]["VCA"]
4886 ):
4887 if vca_deployed.get("member-vnf-index") == vnf_index:
4888 db_dict = {
4889 "collection": "nsrs",
4890 "filter": {"_id": nsr_id},
4891 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4892 }
4893 break
4894 (
4895 nslcmop_operation_state,
4896 detailed_status,
4897 ) = await self._ns_execute_primitive(
4898 ee_id,
4899 primitive=primitive_name,
4900 primitive_params=self._map_primitive_params(
4901 config_primitive_desc, primitive_params, desc_params
4902 ),
4903 timeout=timeout_ns_action,
4904 vca_type=vca_type,
4905 db_dict=db_dict,
4906 vca_id=vca_id,
4907 )
4908
4909 db_nslcmop_update["detailed-status"] = detailed_status
4910 error_description_nslcmop = (
4911 detailed_status if nslcmop_operation_state == "FAILED" else ""
4912 )
4913 self.logger.debug(
4914 logging_text
4915 + " task Done with result {} {}".format(
4916 nslcmop_operation_state, detailed_status
4917 )
4918 )
4919 return # database update is called inside finally
4920
4921 except (DbException, LcmException, N2VCException, K8sException) as e:
4922 self.logger.error(logging_text + "Exit Exception {}".format(e))
4923 exc = e
4924 except asyncio.CancelledError:
4925 self.logger.error(
4926 logging_text + "Cancelled Exception while '{}'".format(step)
4927 )
4928 exc = "Operation was cancelled"
4929 except asyncio.TimeoutError:
4930 self.logger.error(logging_text + "Timeout while '{}'".format(step))
4931 exc = "Timeout"
4932 except Exception as e:
4933 exc = traceback.format_exc()
4934 self.logger.critical(
4935 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
4936 exc_info=True,
4937 )
4938 finally:
4939 if exc:
4940 db_nslcmop_update[
4941 "detailed-status"
4942 ] = (
4943 detailed_status
4944 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4945 nslcmop_operation_state = "FAILED"
4946 if db_nsr:
4947 self._write_ns_status(
4948 nsr_id=nsr_id,
4949 ns_state=db_nsr[
4950 "nsState"
4951 ], # TODO check if degraded. For the moment use previous status
4952 current_operation="IDLE",
4953 current_operation_id=None,
4954 # error_description=error_description_nsr,
4955 # error_detail=error_detail,
4956 other_update=db_nsr_update,
4957 )
4958
4959 self._write_op_status(
4960 op_id=nslcmop_id,
4961 stage="",
4962 error_message=error_description_nslcmop,
4963 operation_state=nslcmop_operation_state,
4964 other_update=db_nslcmop_update,
4965 )
4966
4967 if nslcmop_operation_state:
4968 try:
4969 await self.msg.aiowrite(
4970 "ns",
4971 "actioned",
4972 {
4973 "nsr_id": nsr_id,
4974 "nslcmop_id": nslcmop_id,
4975 "operationState": nslcmop_operation_state,
4976 },
4977 loop=self.loop,
4978 )
4979 except Exception as e:
4980 self.logger.error(
4981 logging_text + "kafka_write notification Exception {}".format(e)
4982 )
4983 self.logger.debug(logging_text + "Exit")
4984 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
4985 return nslcmop_operation_state, detailed_status
4986
4987 async def scale(self, nsr_id, nslcmop_id):
4988 # Try to lock HA task here
4989 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4990 if not task_is_locked_by_me:
4991 return
4992
4993 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
4994 stage = ["", "", ""]
4995 tasks_dict_info = {}
4996 # ^ stage, step, VIM progress
4997 self.logger.debug(logging_text + "Enter")
4998 # get all needed from database
4999 db_nsr = None
5000 db_nslcmop_update = {}
5001 db_nsr_update = {}
5002 exc = None
5003 # in case of error, indicates which part of the scale operation failed, in order to set the nsr error status
5004 scale_process = None
5005 old_operational_status = ""
5006 old_config_status = ""
5007 nsi_id = None
5008 try:
5009 # wait for any previous tasks in process
5010 step = "Waiting for previous operations to terminate"
5011 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5012 self._write_ns_status(
5013 nsr_id=nsr_id,
5014 ns_state=None,
5015 current_operation="SCALING",
5016 current_operation_id=nslcmop_id,
5017 )
5018
5019 step = "Getting nslcmop from database"
5020 self.logger.debug(
5021 step + " after having waited for previous tasks to be completed"
5022 )
5023 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5024
5025 step = "Getting nsr from database"
5026 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5027 old_operational_status = db_nsr["operational-status"]
5028 old_config_status = db_nsr["config-status"]
5029
5030 step = "Parsing scaling parameters"
5031 db_nsr_update["operational-status"] = "scaling"
5032 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5033 nsr_deployed = db_nsr["_admin"].get("deployed")
5034
5035 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5036 "scaleByStepData"
5037 ]["member-vnf-index"]
5038 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5039 "scaleByStepData"
5040 ]["scaling-group-descriptor"]
5041 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5042 # for backward compatibility
5043 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5044 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5045 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5046 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5047
5048 step = "Getting vnfr from database"
5049 db_vnfr = self.db.get_one(
5050 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5051 )
5052
5053 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5054
5055 step = "Getting vnfd from database"
5056 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5057
5058 base_folder = db_vnfd["_admin"]["storage"]
5059
5060 step = "Getting scaling-group-descriptor"
5061 scaling_descriptor = find_in_list(
5062 get_scaling_aspect(db_vnfd),
5063 lambda scale_desc: scale_desc["name"] == scaling_group,
5064 )
5065 if not scaling_descriptor:
5066 raise LcmException(
5067 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5068 "at vnfd:scaling-group-descriptor".format(scaling_group)
5069 )
5070
5071 step = "Sending scale order to VIM"
5072 # TODO check if ns is in a proper status
5073 nb_scale_op = 0
5074 if not db_nsr["_admin"].get("scaling-group"):
5075 self.update_db_2(
5076 "nsrs",
5077 nsr_id,
5078 {
5079 "_admin.scaling-group": [
5080 {"name": scaling_group, "nb-scale-op": 0}
5081 ]
5082 },
5083 )
5084 admin_scale_index = 0
5085 else:
5086 for admin_scale_index, admin_scale_info in enumerate(
5087 db_nsr["_admin"]["scaling-group"]
5088 ):
5089 if admin_scale_info["name"] == scaling_group:
5090 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5091 break
5092 else: # not found, set index one plus last element and add new entry with the name
5093 admin_scale_index += 1
5094 db_nsr_update[
5095 "_admin.scaling-group.{}.name".format(admin_scale_index)
5096 ] = scaling_group
5097
5098 vca_scaling_info = []
5099 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5100 if scaling_type == "SCALE_OUT":
5101 if "aspect-delta-details" not in scaling_descriptor:
5102 raise LcmException(
5103 "Aspect delta details not fount in scaling descriptor {}".format(
5104 scaling_descriptor["name"]
5105 )
5106 )
5107 # count if max-instance-count is reached
5108 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5109
5110 scaling_info["scaling_direction"] = "OUT"
5111 scaling_info["vdu-create"] = {}
5112 scaling_info["kdu-create"] = {}
5113 for delta in deltas:
5114 for vdu_delta in delta.get("vdu-delta", {}):
5115 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5116 # vdu_index also provides the number of instances of the targeted vdu
5117 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5118 cloud_init_text = self._get_vdu_cloud_init_content(
5119 vdud, db_vnfd
5120 )
5121 if cloud_init_text:
5122 additional_params = (
5123 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5124 or {}
5125 )
5126 cloud_init_list = []
5127
5128 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5129 max_instance_count = 10
5130 if vdu_profile and "max-number-of-instances" in vdu_profile:
5131 max_instance_count = vdu_profile.get(
5132 "max-number-of-instances", 10
5133 )
5134
5135 default_instance_num = get_number_of_instances(
5136 db_vnfd, vdud["id"]
5137 )
5138 instances_number = vdu_delta.get("number-of-instances", 1)
5139 nb_scale_op += instances_number
5140
5141 new_instance_count = nb_scale_op + default_instance_num
5142 # Check whether the new count is over max while the current vdu count is below it;
5143 # if so, adjust the number of instances to create
5144 if new_instance_count > max_instance_count > vdu_count:
5145 instances_number = new_instance_count - max_instance_count
5146 else:
5147 instances_number = instances_number
5148
5149 if new_instance_count > max_instance_count:
5150 raise LcmException(
5151 "reached the limit of {} (max-instance-count) "
5152 "scaling-out operations for the "
5153 "scaling-group-descriptor '{}'".format(
5154 nb_scale_op, scaling_group
5155 )
5156 )
5157 for x in range(vdu_delta.get("number-of-instances", 1)):
5158 if cloud_init_text:
5159 # TODO Information of its own ip is not available because db_vnfr is not updated.
5160 additional_params["OSM"] = get_osm_params(
5161 db_vnfr, vdu_delta["id"], vdu_index + x
5162 )
5163 cloud_init_list.append(
5164 self._parse_cloud_init(
5165 cloud_init_text,
5166 additional_params,
5167 db_vnfd["id"],
5168 vdud["id"],
5169 )
5170 )
5171 vca_scaling_info.append(
5172 {
5173 "osm_vdu_id": vdu_delta["id"],
5174 "member-vnf-index": vnf_index,
5175 "type": "create",
5176 "vdu_index": vdu_index + x,
5177 }
5178 )
5179 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5180 for kdu_delta in delta.get("kdu-resource-delta", {}):
5181 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5182 kdu_name = kdu_profile["kdu-name"]
5183 resource_name = kdu_profile["resource-name"]
5184
5185 # The same delta might contain different kdus,
5186 # so keep a separate list for each kdu
5187 if not scaling_info["kdu-create"].get(kdu_name, None):
5188 scaling_info["kdu-create"][kdu_name] = []
5189
5190 kdur = get_kdur(db_vnfr, kdu_name)
5191 if kdur.get("helm-chart"):
5192 k8s_cluster_type = "helm-chart-v3"
5193 self.logger.debug("kdur: {}".format(kdur))
5194 if (
5195 kdur.get("helm-version")
5196 and kdur.get("helm-version") == "v2"
5197 ):
5198 k8s_cluster_type = "helm-chart"
5199 raise NotImplementedError
5200 elif kdur.get("juju-bundle"):
5201 k8s_cluster_type = "juju-bundle"
5202 else:
5203 raise LcmException(
5204 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5205 "juju-bundle. Maybe an old NBI version is running".format(
5206 db_vnfr["member-vnf-index-ref"], kdu_name
5207 )
5208 )
5209
5210 max_instance_count = 10
5211 if kdu_profile and "max-number-of-instances" in kdu_profile:
5212 max_instance_count = kdu_profile.get(
5213 "max-number-of-instances", 10
5214 )
5215
5216 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5217 deployed_kdu, _ = get_deployed_kdu(
5218 nsr_deployed, kdu_name, vnf_index
5219 )
5220 if deployed_kdu is None:
5221 raise LcmException(
5222 "KDU '{}' for vnf '{}' not deployed".format(
5223 kdu_name, vnf_index
5224 )
5225 )
5226 kdu_instance = deployed_kdu.get("kdu-instance")
5227 instance_num = await self.k8scluster_map[
5228 k8s_cluster_type
5229 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5230 kdu_replica_count = instance_num + kdu_delta.get(
5231 "number-of-instances", 1
5232 )
5233
5234 # Check whether the new count is over max while instance_num is below it;
5235 # if so, cap the kdu replica count at the max instance number
5236 if kdu_replica_count > max_instance_count > instance_num:
5237 kdu_replica_count = max_instance_count
5238 if kdu_replica_count > max_instance_count:
5239 raise LcmException(
5240 "reached the limit of {} (max-instance-count) "
5241 "scaling-out operations for the "
5242 "scaling-group-descriptor '{}'".format(
5243 instance_num, scaling_group
5244 )
5245 )
5246
5247 for x in range(kdu_delta.get("number-of-instances", 1)):
5248 vca_scaling_info.append(
5249 {
5250 "osm_kdu_id": kdu_name,
5251 "member-vnf-index": vnf_index,
5252 "type": "create",
5253 "kdu_index": instance_num + x - 1,
5254 }
5255 )
5256 scaling_info["kdu-create"][kdu_name].append(
5257 {
5258 "member-vnf-index": vnf_index,
5259 "type": "create",
5260 "k8s-cluster-type": k8s_cluster_type,
5261 "resource-name": resource_name,
5262 "scale": kdu_replica_count,
5263 }
5264 )
5265 elif scaling_type == "SCALE_IN":
5266 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5267
5268 scaling_info["scaling_direction"] = "IN"
5269 scaling_info["vdu-delete"] = {}
5270 scaling_info["kdu-delete"] = {}
5271
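                     # The SCALE_IN branch mirrors the scale-out logic: it computes how many
                     # VDU/KDU instances to remove and enforces min-number-of-instances
                     # instead of the maximum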
5272 for delta in deltas:
5273 for vdu_delta in delta.get("vdu-delta", {}):
5274 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5275 min_instance_count = 0
5276 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5277 if vdu_profile and "min-number-of-instances" in vdu_profile:
5278 min_instance_count = vdu_profile["min-number-of-instances"]
5279
5280 default_instance_num = get_number_of_instances(
5281 db_vnfd, vdu_delta["id"]
5282 )
5283 instance_num = vdu_delta.get("number-of-instances", 1)
5284 nb_scale_op -= instance_num
5285
5286 new_instance_count = nb_scale_op + default_instance_num
5287
5288 if new_instance_count < min_instance_count < vdu_count:
5289 instances_number = min_instance_count - new_instance_count
5290 else:
5291 instances_number = instance_num
5292
5293 if new_instance_count < min_instance_count:
5294 raise LcmException(
5295 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5296 "scaling-group-descriptor '{}'".format(
5297 nb_scale_op, scaling_group
5298 )
5299 )
5300 for x in range(vdu_delta.get("number-of-instances", 1)):
5301 vca_scaling_info.append(
5302 {
5303 "osm_vdu_id": vdu_delta["id"],
5304 "member-vnf-index": vnf_index,
5305 "type": "delete",
5306 "vdu_index": vdu_index - 1 - x,
5307 }
5308 )
5309 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5310 for kdu_delta in delta.get("kdu-resource-delta", {}):
5311 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5312 kdu_name = kdu_profile["kdu-name"]
5313 resource_name = kdu_profile["resource-name"]
5314
5315 if not scaling_info["kdu-delete"].get(kdu_name, None):
5316 scaling_info["kdu-delete"][kdu_name] = []
5317
5318 kdur = get_kdur(db_vnfr, kdu_name)
5319 if kdur.get("helm-chart"):
5320 k8s_cluster_type = "helm-chart-v3"
5321 self.logger.debug("kdur: {}".format(kdur))
5322                         if kdur.get("helm-version") == "v2":
5326 k8s_cluster_type = "helm-chart"
5327 raise NotImplementedError
5328 elif kdur.get("juju-bundle"):
5329 k8s_cluster_type = "juju-bundle"
5330 else:
5331 raise LcmException(
5332 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5333 "juju-bundle. Maybe an old NBI version is running".format(
5334 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5335 )
5336 )
5337
5338 min_instance_count = 0
5339 if kdu_profile and "min-number-of-instances" in kdu_profile:
5340 min_instance_count = kdu_profile["min-number-of-instances"]
5341
5342 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5343 deployed_kdu, _ = get_deployed_kdu(
5344 nsr_deployed, kdu_name, vnf_index
5345 )
5346 if deployed_kdu is None:
5347 raise LcmException(
5348 "KDU '{}' for vnf '{}' not deployed".format(
5349 kdu_name, vnf_index
5350 )
5351 )
5352 kdu_instance = deployed_kdu.get("kdu-instance")
5353 instance_num = await self.k8scluster_map[
5354 k8s_cluster_type
5355 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5356 kdu_replica_count = instance_num - kdu_delta.get(
5357 "number-of-instances", 1
5358 )
5359
5360 if kdu_replica_count < min_instance_count < instance_num:
5361 kdu_replica_count = min_instance_count
5362 if kdu_replica_count < min_instance_count:
5363 raise LcmException(
5364 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5365 "scaling-group-descriptor '{}'".format(
5366 instance_num, scaling_group
5367 )
5368 )
5369
5370 for x in range(kdu_delta.get("number-of-instances", 1)):
5371 vca_scaling_info.append(
5372 {
5373 "osm_kdu_id": kdu_name,
5374 "member-vnf-index": vnf_index,
5375 "type": "delete",
5376 "kdu_index": instance_num - x - 1,
5377 }
5378 )
5379 scaling_info["kdu-delete"][kdu_name].append(
5380 {
5381 "member-vnf-index": vnf_index,
5382 "type": "delete",
5383 "k8s-cluster-type": k8s_cluster_type,
5384 "resource-name": resource_name,
5385 "scale": kdu_replica_count,
5386 }
5387 )
5388
5389             # update scaling_info with the VDUs to delete and their interfaces' IP/MAC addresses
5390 vdu_delete = copy(scaling_info.get("vdu-delete"))
5391 if scaling_info["scaling_direction"] == "IN":
5392 for vdur in reversed(db_vnfr["vdur"]):
5393 if vdu_delete.get(vdur["vdu-id-ref"]):
5394 vdu_delete[vdur["vdu-id-ref"]] -= 1
5395 scaling_info["vdu"].append(
5396 {
5397 "name": vdur.get("name") or vdur.get("vdu-name"),
5398 "vdu_id": vdur["vdu-id-ref"],
5399 "interface": [],
5400 }
5401 )
5402 for interface in vdur["interfaces"]:
5403 scaling_info["vdu"][-1]["interface"].append(
5404 {
5405 "name": interface["name"],
5406 "ip_address": interface["ip-address"],
5407 "mac_address": interface.get("mac-address"),
5408 }
5409 )
5411
5412 # PRE-SCALE BEGIN
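                 # Run any vnf-config-primitive declared with a pre-scale-in/pre-scale-out
                 # trigger before touching the infrastructure; sub-operations are tracked
                 # so that a retry reuses the already registered parameters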
5413 step = "Executing pre-scale vnf-config-primitive"
5414 if scaling_descriptor.get("scaling-config-action"):
5415 for scaling_config_action in scaling_descriptor[
5416 "scaling-config-action"
5417 ]:
5418 if (
5419 scaling_config_action.get("trigger") == "pre-scale-in"
5420 and scaling_type == "SCALE_IN"
5421 ) or (
5422 scaling_config_action.get("trigger") == "pre-scale-out"
5423 and scaling_type == "SCALE_OUT"
5424 ):
5425 vnf_config_primitive = scaling_config_action[
5426 "vnf-config-primitive-name-ref"
5427 ]
5428 step = db_nslcmop_update[
5429 "detailed-status"
5430 ] = "executing pre-scale scaling-config-action '{}'".format(
5431 vnf_config_primitive
5432 )
5433
5434 # look for primitive
5435 for config_primitive in (
5436 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5437 ).get("config-primitive", ()):
5438 if config_primitive["name"] == vnf_config_primitive:
5439 break
5440 else:
5441 raise LcmException(
5442 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5443 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5444 "primitive".format(scaling_group, vnf_config_primitive)
5445 )
5446
5447 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5448 if db_vnfr.get("additionalParamsForVnf"):
5449 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5450
5451 scale_process = "VCA"
5452 db_nsr_update["config-status"] = "configuring pre-scaling"
5453 primitive_params = self._map_primitive_params(
5454 config_primitive, {}, vnfr_params
5455 )
5456
5457 # Pre-scale retry check: Check if this sub-operation has been executed before
5458 op_index = self._check_or_add_scale_suboperation(
5459 db_nslcmop,
5460 vnf_index,
5461 vnf_config_primitive,
5462 primitive_params,
5463 "PRE-SCALE",
5464 )
5465 if op_index == self.SUBOPERATION_STATUS_SKIP:
5466 # Skip sub-operation
5467 result = "COMPLETED"
5468 result_detail = "Done"
5469 self.logger.debug(
5470 logging_text
5471 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5472 vnf_config_primitive, result, result_detail
5473 )
5474 )
5475 else:
5476 if op_index == self.SUBOPERATION_STATUS_NEW:
5477 # New sub-operation: Get index of this sub-operation
5478 op_index = (
5479 len(db_nslcmop.get("_admin", {}).get("operations"))
5480 - 1
5481 )
5482 self.logger.debug(
5483 logging_text
5484 + "vnf_config_primitive={} New sub-operation".format(
5485 vnf_config_primitive
5486 )
5487 )
5488 else:
5489 # retry: Get registered params for this existing sub-operation
5490 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5491 op_index
5492 ]
5493 vnf_index = op.get("member_vnf_index")
5494 vnf_config_primitive = op.get("primitive")
5495 primitive_params = op.get("primitive_params")
5496 self.logger.debug(
5497 logging_text
5498 + "vnf_config_primitive={} Sub-operation retry".format(
5499 vnf_config_primitive
5500 )
5501 )
5502                             # Execute the primitive, either with new (first-time) or registered (retry) args
5503 ee_descriptor_id = config_primitive.get(
5504 "execution-environment-ref"
5505 )
5506 primitive_name = config_primitive.get(
5507 "execution-environment-primitive", vnf_config_primitive
5508 )
5509 ee_id, vca_type = self._look_for_deployed_vca(
5510 nsr_deployed["VCA"],
5511 member_vnf_index=vnf_index,
5512 vdu_id=None,
5513 vdu_count_index=None,
5514 ee_descriptor_id=ee_descriptor_id,
5515 )
5516 result, result_detail = await self._ns_execute_primitive(
5517 ee_id,
5518 primitive_name,
5519 primitive_params,
5520 vca_type=vca_type,
5521 vca_id=vca_id,
5522 )
5523 self.logger.debug(
5524 logging_text
5525 + "vnf_config_primitive={} Done with result {} {}".format(
5526 vnf_config_primitive, result, result_detail
5527 )
5528 )
5529 # Update operationState = COMPLETED | FAILED
5530 self._update_suboperation_status(
5531 db_nslcmop, op_index, result, result_detail
5532 )
5533
5534 if result == "FAILED":
5535 raise LcmException(result_detail)
5536 db_nsr_update["config-status"] = old_config_status
5537 scale_process = None
5538 # PRE-SCALE END
5539
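                 # Record the accumulated number of scale operations and the timestamp of
                 # this one under the scaling-group bookkeeping of the NS record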
5540 db_nsr_update[
5541 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5542 ] = nb_scale_op
5543 db_nsr_update[
5544 "_admin.scaling-group.{}.time".format(admin_scale_index)
5545 ] = time()
5546
5547 # SCALE-IN VCA - BEGIN
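                 # For every "delete" entry, destroy the execution environment (charm) of the
                 # removed VDU/KDU instance, running its terminate primitives unless skipped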
5548 if vca_scaling_info:
5549 step = db_nslcmop_update[
5550 "detailed-status"
5551 ] = "Deleting the execution environments"
5552 scale_process = "VCA"
5553 for vca_info in vca_scaling_info:
5554 if vca_info["type"] == "delete":
5555 member_vnf_index = str(vca_info["member-vnf-index"])
5556 self.logger.debug(
5557 logging_text + "vdu info: {}".format(vca_info)
5558 )
5559 if vca_info.get("osm_vdu_id"):
5560 vdu_id = vca_info["osm_vdu_id"]
5561 vdu_index = int(vca_info["vdu_index"])
5562 stage[
5563 1
5564 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5565 member_vnf_index, vdu_id, vdu_index
5566 )
5567 else:
5568 vdu_index = 0
5569 kdu_id = vca_info["osm_kdu_id"]
5570 stage[
5571 1
5572 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5573 member_vnf_index, kdu_id, vdu_index
5574 )
5575 stage[2] = step = "Scaling in VCA"
5576 self._write_op_status(op_id=nslcmop_id, stage=stage)
5577 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5578 config_update = db_nsr["configurationStatus"]
5579 for vca_index, vca in enumerate(vca_update):
5580 if (
5581 (vca or vca.get("ee_id"))
5582 and vca["member-vnf-index"] == member_vnf_index
5583 and vca["vdu_count_index"] == vdu_index
5584 ):
5585 if vca.get("vdu_id"):
5586 config_descriptor = get_configuration(
5587 db_vnfd, vca.get("vdu_id")
5588 )
5589 elif vca.get("kdu_name"):
5590 config_descriptor = get_configuration(
5591 db_vnfd, vca.get("kdu_name")
5592 )
5593 else:
5594 config_descriptor = get_configuration(
5595 db_vnfd, db_vnfd["id"]
5596 )
5597 operation_params = (
5598 db_nslcmop.get("operationParams") or {}
5599 )
5600 exec_terminate_primitives = not operation_params.get(
5601 "skip_terminate_primitives"
5602 ) and vca.get("needed_terminate")
5603 task = asyncio.ensure_future(
5604 asyncio.wait_for(
5605 self.destroy_N2VC(
5606 logging_text,
5607 db_nslcmop,
5608 vca,
5609 config_descriptor,
5610 vca_index,
5611 destroy_ee=True,
5612 exec_primitives=exec_terminate_primitives,
5613 scaling_in=True,
5614 vca_id=vca_id,
5615 ),
5616 timeout=self.timeout_charm_delete,
5617 )
5618 )
5619 tasks_dict_info[task] = "Terminating VCA {}".format(
5620 vca.get("ee_id")
5621 )
5622 del vca_update[vca_index]
5623 del config_update[vca_index]
5624 # wait for pending tasks of terminate primitives
5625 if tasks_dict_info:
5626 self.logger.debug(
5627 logging_text
5628 + "Waiting for tasks {}".format(
5629 list(tasks_dict_info.keys())
5630 )
5631 )
5632 error_list = await self._wait_for_tasks(
5633 logging_text,
5634 tasks_dict_info,
5635 min(
5636 self.timeout_charm_delete, self.timeout_ns_terminate
5637 ),
5638 stage,
5639 nslcmop_id,
5640 )
5641 tasks_dict_info.clear()
5642 if error_list:
5643 raise LcmException("; ".join(error_list))
5644
5645 db_vca_and_config_update = {
5646 "_admin.deployed.VCA": vca_update,
5647 "configurationStatus": config_update,
5648 }
5649 self.update_db_2(
5650 "nsrs", db_nsr["_id"], db_vca_and_config_update
5651 )
5652 scale_process = None
5653 # SCALE-IN VCA - END
5654
5655 # SCALE RO - BEGIN
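                 # Delegate the actual creation/deletion of VDU resources to NG-RO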
5656 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5657 scale_process = "RO"
5658 if self.ro_config.get("ng"):
5659 await self._scale_ng_ro(
5660 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5661 )
5662 scaling_info.pop("vdu-create", None)
5663 scaling_info.pop("vdu-delete", None)
5664
5665 scale_process = None
5666 # SCALE RO - END
5667
5668 # SCALE KDU - BEGIN
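                 # Scale the deployed KDUs (helm charts / juju bundles) to the replica
                 # counts computed above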
5669 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5670 scale_process = "KDU"
5671 await self._scale_kdu(
5672 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5673 )
5674 scaling_info.pop("kdu-create", None)
5675 scaling_info.pop("kdu-delete", None)
5676
5677 scale_process = None
5678 # SCALE KDU - END
5679
5680 if db_nsr_update:
5681 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5682
5683 # SCALE-UP VCA - BEGIN
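                 # For every "create" entry, deploy the execution environments (charms) of
                 # the newly added VDU/KDU instances, at VNF, VDU or KDU level as declared
                 # in the descriptor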
5684 if vca_scaling_info:
5685 step = db_nslcmop_update[
5686 "detailed-status"
5687 ] = "Creating new execution environments"
5688 scale_process = "VCA"
5689 for vca_info in vca_scaling_info:
5690 if vca_info["type"] == "create":
5691 member_vnf_index = str(vca_info["member-vnf-index"])
5692 self.logger.debug(
5693 logging_text + "vdu info: {}".format(vca_info)
5694 )
5695 vnfd_id = db_vnfr["vnfd-ref"]
5696 if vca_info.get("osm_vdu_id"):
5697 vdu_index = int(vca_info["vdu_index"])
5698 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5699 if db_vnfr.get("additionalParamsForVnf"):
5700 deploy_params.update(
5701 parse_yaml_strings(
5702 db_vnfr["additionalParamsForVnf"].copy()
5703 )
5704 )
5705 descriptor_config = get_configuration(
5706 db_vnfd, db_vnfd["id"]
5707 )
5708 if descriptor_config:
5709 vdu_id = None
5710 vdu_name = None
5711 kdu_name = None
5712 self._deploy_n2vc(
5713 logging_text=logging_text
5714 + "member_vnf_index={} ".format(member_vnf_index),
5715 db_nsr=db_nsr,
5716 db_vnfr=db_vnfr,
5717 nslcmop_id=nslcmop_id,
5718 nsr_id=nsr_id,
5719 nsi_id=nsi_id,
5720 vnfd_id=vnfd_id,
5721 vdu_id=vdu_id,
5722 kdu_name=kdu_name,
5723 member_vnf_index=member_vnf_index,
5724 vdu_index=vdu_index,
5725 vdu_name=vdu_name,
5726 deploy_params=deploy_params,
5727 descriptor_config=descriptor_config,
5728 base_folder=base_folder,
5729 task_instantiation_info=tasks_dict_info,
5730 stage=stage,
5731 )
5732 vdu_id = vca_info["osm_vdu_id"]
5733 vdur = find_in_list(
5734 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5735 )
5736 descriptor_config = get_configuration(db_vnfd, vdu_id)
5737 if vdur.get("additionalParams"):
5738 deploy_params_vdu = parse_yaml_strings(
5739 vdur["additionalParams"]
5740 )
5741 else:
5742 deploy_params_vdu = deploy_params
5743 deploy_params_vdu["OSM"] = get_osm_params(
5744 db_vnfr, vdu_id, vdu_count_index=vdu_index
5745 )
5746 if descriptor_config:
5747 vdu_name = None
5748 kdu_name = None
5749 stage[
5750 1
5751 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5752 member_vnf_index, vdu_id, vdu_index
5753 )
5754 stage[2] = step = "Scaling out VCA"
5755 self._write_op_status(op_id=nslcmop_id, stage=stage)
5756 self._deploy_n2vc(
5757 logging_text=logging_text
5758 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5759 member_vnf_index, vdu_id, vdu_index
5760 ),
5761 db_nsr=db_nsr,
5762 db_vnfr=db_vnfr,
5763 nslcmop_id=nslcmop_id,
5764 nsr_id=nsr_id,
5765 nsi_id=nsi_id,
5766 vnfd_id=vnfd_id,
5767 vdu_id=vdu_id,
5768 kdu_name=kdu_name,
5769 member_vnf_index=member_vnf_index,
5770 vdu_index=vdu_index,
5771 vdu_name=vdu_name,
5772 deploy_params=deploy_params_vdu,
5773 descriptor_config=descriptor_config,
5774 base_folder=base_folder,
5775 task_instantiation_info=tasks_dict_info,
5776 stage=stage,
5777 )
5778 else:
5779 kdu_name = vca_info["osm_kdu_id"]
5780 descriptor_config = get_configuration(db_vnfd, kdu_name)
5781 if descriptor_config:
5782 vdu_id = None
5783 kdu_index = int(vca_info["kdu_index"])
5784 vdu_name = None
5785 kdur = next(
5786 x
5787 for x in db_vnfr["kdur"]
5788 if x["kdu-name"] == kdu_name
5789 )
5790 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5791 if kdur.get("additionalParams"):
5792                             deploy_params_kdu.update(
5793                                 parse_yaml_strings(kdur["additionalParams"])
5794                             )
5795
5796 self._deploy_n2vc(
5797 logging_text=logging_text,
5798 db_nsr=db_nsr,
5799 db_vnfr=db_vnfr,
5800 nslcmop_id=nslcmop_id,
5801 nsr_id=nsr_id,
5802 nsi_id=nsi_id,
5803 vnfd_id=vnfd_id,
5804 vdu_id=vdu_id,
5805 kdu_name=kdu_name,
5806 member_vnf_index=member_vnf_index,
5807 vdu_index=kdu_index,
5808 vdu_name=vdu_name,
5809 deploy_params=deploy_params_kdu,
5810 descriptor_config=descriptor_config,
5811 base_folder=base_folder,
5812 task_instantiation_info=tasks_dict_info,
5813 stage=stage,
5814 )
5815 # SCALE-UP VCA - END
5816 scale_process = None
5817
5818 # POST-SCALE BEGIN
5819 # execute primitive service POST-SCALING
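                 # Post-scale actions mirror the pre-scale flow, running the primitives
                 # declared with a post-scale-in/post-scale-out trigger once the scaling
                 # has been applied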
5820 step = "Executing post-scale vnf-config-primitive"
5821 if scaling_descriptor.get("scaling-config-action"):
5822 for scaling_config_action in scaling_descriptor[
5823 "scaling-config-action"
5824 ]:
5825 if (
5826 scaling_config_action.get("trigger") == "post-scale-in"
5827 and scaling_type == "SCALE_IN"
5828 ) or (
5829 scaling_config_action.get("trigger") == "post-scale-out"
5830 and scaling_type == "SCALE_OUT"
5831 ):
5832 vnf_config_primitive = scaling_config_action[
5833 "vnf-config-primitive-name-ref"
5834 ]
5835 step = db_nslcmop_update[
5836 "detailed-status"
5837 ] = "executing post-scale scaling-config-action '{}'".format(
5838 vnf_config_primitive
5839 )
5840
5841 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5842 if db_vnfr.get("additionalParamsForVnf"):
5843 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5844
5845 # look for primitive
5846 for config_primitive in (
5847 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5848 ).get("config-primitive", ()):
5849 if config_primitive["name"] == vnf_config_primitive:
5850 break
5851 else:
5852 raise LcmException(
5853 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5854 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5855 "config-primitive".format(
5856 scaling_group, vnf_config_primitive
5857 )
5858 )
5859 scale_process = "VCA"
5860 db_nsr_update["config-status"] = "configuring post-scaling"
5861 primitive_params = self._map_primitive_params(
5862 config_primitive, {}, vnfr_params
5863 )
5864
5865 # Post-scale retry check: Check if this sub-operation has been executed before
5866 op_index = self._check_or_add_scale_suboperation(
5867 db_nslcmop,
5868 vnf_index,
5869 vnf_config_primitive,
5870 primitive_params,
5871 "POST-SCALE",
5872 )
5873 if op_index == self.SUBOPERATION_STATUS_SKIP:
5874 # Skip sub-operation
5875 result = "COMPLETED"
5876 result_detail = "Done"
5877 self.logger.debug(
5878 logging_text
5879 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5880 vnf_config_primitive, result, result_detail
5881 )
5882 )
5883 else:
5884 if op_index == self.SUBOPERATION_STATUS_NEW:
5885 # New sub-operation: Get index of this sub-operation
5886 op_index = (
5887 len(db_nslcmop.get("_admin", {}).get("operations"))
5888 - 1
5889 )
5890 self.logger.debug(
5891 logging_text
5892 + "vnf_config_primitive={} New sub-operation".format(
5893 vnf_config_primitive
5894 )
5895 )
5896 else:
5897 # retry: Get registered params for this existing sub-operation
5898 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5899 op_index
5900 ]
5901 vnf_index = op.get("member_vnf_index")
5902 vnf_config_primitive = op.get("primitive")
5903 primitive_params = op.get("primitive_params")
5904 self.logger.debug(
5905 logging_text
5906 + "vnf_config_primitive={} Sub-operation retry".format(
5907 vnf_config_primitive
5908 )
5909 )
5910                             # Execute the primitive, either with new (first-time) or registered (retry) args
5911 ee_descriptor_id = config_primitive.get(
5912 "execution-environment-ref"
5913 )
5914 primitive_name = config_primitive.get(
5915 "execution-environment-primitive", vnf_config_primitive
5916 )
5917 ee_id, vca_type = self._look_for_deployed_vca(
5918 nsr_deployed["VCA"],
5919 member_vnf_index=vnf_index,
5920 vdu_id=None,
5921 vdu_count_index=None,
5922 ee_descriptor_id=ee_descriptor_id,
5923 )
5924 result, result_detail = await self._ns_execute_primitive(
5925 ee_id,
5926 primitive_name,
5927 primitive_params,
5928 vca_type=vca_type,
5929 vca_id=vca_id,
5930 )
5931 self.logger.debug(
5932 logging_text
5933 + "vnf_config_primitive={} Done with result {} {}".format(
5934 vnf_config_primitive, result, result_detail
5935 )
5936 )
5937 # Update operationState = COMPLETED | FAILED
5938 self._update_suboperation_status(
5939 db_nslcmop, op_index, result, result_detail
5940 )
5941
5942 if result == "FAILED":
5943 raise LcmException(result_detail)
5944 db_nsr_update["config-status"] = old_config_status
5945 scale_process = None
5946 # POST-SCALE END
5947
5948 db_nsr_update[
5949 "detailed-status"
5950 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
5951 db_nsr_update["operational-status"] = (
5952 "running"
5953 if old_operational_status == "failed"
5954 else old_operational_status
5955 )
5956 db_nsr_update["config-status"] = old_config_status
5957 return
5958 except (
5959 ROclient.ROClientException,
5960 DbException,
5961 LcmException,
5962 NgRoException,
5963 ) as e:
5964 self.logger.error(logging_text + "Exit Exception {}".format(e))
5965 exc = e
5966 except asyncio.CancelledError:
5967 self.logger.error(
5968 logging_text + "Cancelled Exception while '{}'".format(step)
5969 )
5970 exc = "Operation was cancelled"
5971 except Exception as e:
5972 exc = traceback.format_exc()
5973 self.logger.critical(
5974 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5975 exc_info=True,
5976 )
5977 finally:
5978 self._write_ns_status(
5979 nsr_id=nsr_id,
5980 ns_state=None,
5981 current_operation="IDLE",
5982 current_operation_id=None,
5983 )
5984 if tasks_dict_info:
5985                 stage[1] = "Waiting for pending tasks."
5986 self.logger.debug(logging_text + stage[1])
5987 exc = await self._wait_for_tasks(
5988 logging_text,
5989 tasks_dict_info,
5990 self.timeout_ns_deploy,
5991 stage,
5992 nslcmop_id,
5993 nsr_id=nsr_id,
5994 )
5995 if exc:
5996 db_nslcmop_update[
5997 "detailed-status"
5998 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5999 nslcmop_operation_state = "FAILED"
6000 if db_nsr:
6001 db_nsr_update["operational-status"] = old_operational_status
6002 db_nsr_update["config-status"] = old_config_status
6003 db_nsr_update["detailed-status"] = ""
6004 if scale_process:
6005 if "VCA" in scale_process:
6006 db_nsr_update["config-status"] = "failed"
6007 if "RO" in scale_process:
6008 db_nsr_update["operational-status"] = "failed"
6009 db_nsr_update[
6010 "detailed-status"
6011 ] = "FAILED scaling nslcmop={} {}: {}".format(
6012 nslcmop_id, step, exc
6013 )
6014 else:
6015 error_description_nslcmop = None
6016 nslcmop_operation_state = "COMPLETED"
6017 db_nslcmop_update["detailed-status"] = "Done"
6018
6019 self._write_op_status(
6020 op_id=nslcmop_id,
6021 stage="",
6022 error_message=error_description_nslcmop,
6023 operation_state=nslcmop_operation_state,
6024 other_update=db_nslcmop_update,
6025 )
6026 if db_nsr:
6027 self._write_ns_status(
6028 nsr_id=nsr_id,
6029 ns_state=None,
6030 current_operation="IDLE",
6031 current_operation_id=None,
6032 other_update=db_nsr_update,
6033 )
6034
6035 if nslcmop_operation_state:
6036 try:
6037 msg = {
6038 "nsr_id": nsr_id,
6039 "nslcmop_id": nslcmop_id,
6040 "operationState": nslcmop_operation_state,
6041 }
6042 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6043 except Exception as e:
6044 self.logger.error(
6045 logging_text + "kafka_write notification Exception {}".format(e)
6046 )
6047 self.logger.debug(logging_text + "Exit")
6048 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6049
6050 async def _scale_kdu(
6051 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6052 ):
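             """Scale the KDUs referenced in scaling_info to their target replica counts.

             For each KDU entry: on scale-in, its terminate config primitives run first
             (when defined and not handled by a juju execution environment), then the
             K8s connector scales the resource; on scale-out, its initial config
             primitives run after the resource has been scaled.
             """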
6053 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6054 for kdu_name in _scaling_info:
6055 for kdu_scaling_info in _scaling_info[kdu_name]:
6056 deployed_kdu, index = get_deployed_kdu(
6057 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6058 )
6059 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6060 kdu_instance = deployed_kdu["kdu-instance"]
6061 scale = int(kdu_scaling_info["scale"])
6062 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6063
6064 db_dict = {
6065 "collection": "nsrs",
6066 "filter": {"_id": nsr_id},
6067 "path": "_admin.deployed.K8s.{}".format(index),
6068 }
6069
6070 step = "scaling application {}".format(
6071 kdu_scaling_info["resource-name"]
6072 )
6073 self.logger.debug(logging_text + step)
6074
6075 if kdu_scaling_info["type"] == "delete":
6076 kdu_config = get_configuration(db_vnfd, kdu_name)
6077 if (
6078 kdu_config
6079 and kdu_config.get("terminate-config-primitive")
6080 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6081 ):
6082 terminate_config_primitive_list = kdu_config.get(
6083 "terminate-config-primitive"
6084 )
6085 terminate_config_primitive_list.sort(
6086 key=lambda val: int(val["seq"])
6087 )
6088
6089 for (
6090 terminate_config_primitive
6091 ) in terminate_config_primitive_list:
6092 primitive_params_ = self._map_primitive_params(
6093 terminate_config_primitive, {}, {}
6094 )
6095 step = "execute terminate config primitive"
6096 self.logger.debug(logging_text + step)
6097 await asyncio.wait_for(
6098 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6099 cluster_uuid=cluster_uuid,
6100 kdu_instance=kdu_instance,
6101 primitive_name=terminate_config_primitive["name"],
6102 params=primitive_params_,
6103 db_dict=db_dict,
6104 vca_id=vca_id,
6105 ),
6106 timeout=600,
6107 )
6108
6109 await asyncio.wait_for(
6110 self.k8scluster_map[k8s_cluster_type].scale(
6111 kdu_instance,
6112 scale,
6113 kdu_scaling_info["resource-name"],
6114 vca_id=vca_id,
6115 ),
6116 timeout=self.timeout_vca_on_error,
6117 )
6118
6119 if kdu_scaling_info["type"] == "create":
6120 kdu_config = get_configuration(db_vnfd, kdu_name)
6121 if (
6122 kdu_config
6123 and kdu_config.get("initial-config-primitive")
6124 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6125 ):
6126 initial_config_primitive_list = kdu_config.get(
6127 "initial-config-primitive"
6128 )
6129 initial_config_primitive_list.sort(
6130 key=lambda val: int(val["seq"])
6131 )
6132
6133 for initial_config_primitive in initial_config_primitive_list:
6134 primitive_params_ = self._map_primitive_params(
6135 initial_config_primitive, {}, {}
6136 )
6137 step = "execute initial config primitive"
6138 self.logger.debug(logging_text + step)
6139 await asyncio.wait_for(
6140 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6141 cluster_uuid=cluster_uuid,
6142 kdu_instance=kdu_instance,
6143 primitive_name=initial_config_primitive["name"],
6144 params=primitive_params_,
6145 db_dict=db_dict,
6146 vca_id=vca_id,
6147 ),
6148 timeout=600,
6149 )
6150
6151 async def _scale_ng_ro(
6152 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6153 ):
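             """Apply the VDU create/delete part of a scaling operation through NG-RO.

             Marks the affected VDUs in the VNF record, triggers the NG-RO deployment
             for the NS and, for scale-in, updates the record again once RO has removed
             the instances.
             """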
6154 nsr_id = db_nslcmop["nsInstanceId"]
6155 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6156 db_vnfrs = {}
6157
6158 # read from db: vnfd's for every vnf
6159 db_vnfds = []
6160
6161 # for each vnf in ns, read vnfd
6162 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6163 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6164 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6165             # if we do not have this vnfd yet, read it from db
6166 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6167 # read from db
6168 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6169 db_vnfds.append(vnfd)
6170 n2vc_key = self.n2vc.get_public_key()
6171 n2vc_key_list = [n2vc_key]
6172 self.scale_vnfr(
6173 db_vnfr,
6174 vdu_scaling_info.get("vdu-create"),
6175 vdu_scaling_info.get("vdu-delete"),
6176 mark_delete=True,
6177 )
6178 # db_vnfr has been updated, update db_vnfrs to use it
6179 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6180 await self._instantiate_ng_ro(
6181 logging_text,
6182 nsr_id,
6183 db_nsd,
6184 db_nsr,
6185 db_nslcmop,
6186 db_vnfrs,
6187 db_vnfds,
6188 n2vc_key_list,
6189 stage=stage,
6190 start_deploy=time(),
6191 timeout_ns_deploy=self.timeout_ns_deploy,
6192 )
6193 if vdu_scaling_info.get("vdu-delete"):
6194 self.scale_vnfr(
6195 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6196 )
6197
6198 async def add_prometheus_metrics(
6199 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6200 ):
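             """Register Prometheus scrape jobs for an execution environment, if any.

             Looks for a 'prometheus*.j2' template in the charm artifacts, renders it
             with the exporter service data and pushes the resulting jobs through the
             Prometheus module. Returns the list of job names, or None when nothing
             was registered.
             """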
6201 if not self.prometheus:
6202 return
6203         # look for a file called 'prometheus*.j2' and use it as the job template
6204 artifact_content = self.fs.dir_ls(artifact_path)
6205 job_file = next(
6206 (
6207 f
6208 for f in artifact_content
6209 if f.startswith("prometheus") and f.endswith(".j2")
6210 ),
6211 None,
6212 )
6213 if not job_file:
6214 return
6215 with self.fs.file_open((artifact_path, job_file), "r") as f:
6216 job_data = f.read()
6217
6218 # TODO get_service
6219 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6220 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6221 host_port = "80"
6222 vnfr_id = vnfr_id.replace("-", "")
6223 variables = {
6224 "JOB_NAME": vnfr_id,
6225 "TARGET_IP": target_ip,
6226 "EXPORTER_POD_IP": host_name,
6227 "EXPORTER_POD_PORT": host_port,
6228 }
6229 job_list = self.prometheus.parse_job(job_data, variables)
6230         # ensure job_name contains the vnfr_id and add the nsr_id as metadata
6231 for job in job_list:
6232 if (
6233 not isinstance(job.get("job_name"), str)
6234 or vnfr_id not in job["job_name"]
6235 ):
6236 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6237 job["nsr_id"] = nsr_id
6238 job_dict = {jl["job_name"]: jl for jl in job_list}
6239 if await self.prometheus.update(job_dict):
6240 return list(job_dict.keys())
6241
6242 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6243 """
6244 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6245
6246 :param: vim_account_id: VIM Account ID
6247
6248 :return: (cloud_name, cloud_credential)
6249 """
6250 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6251 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6252
6253 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6254 """
6255 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6256
6257 :param: vim_account_id: VIM Account ID
6258
6259 :return: (cloud_name, cloud_credential)
6260 """
6261 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6262 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")