[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.data_utils.nsr import get_deployed_kdu
35 from osm_lcm.ng_ro import NgRoClient, NgRoException
36 from osm_lcm.lcm_utils import (
37 LcmException,
38 LcmExceptionNoMgmtIP,
39 LcmBase,
40 deep_get,
41 get_iterable,
42 populate_dict,
43 )
44 from osm_lcm.data_utils.nsd import get_vnf_profiles
45 from osm_lcm.data_utils.vnfd import (
46 get_vdu_list,
47 get_vdu_profile,
48 get_ee_sorted_initial_config_primitive_list,
49 get_ee_sorted_terminate_config_primitive_list,
50 get_kdu_list,
51 get_virtual_link_profiles,
52 get_vdu,
53 get_configuration,
54 get_vdu_index,
55 get_scaling_aspect,
56 get_number_of_instances,
57 get_juju_ee_ref,
58 get_kdu_profile,
59 )
60 from osm_lcm.data_utils.list_utils import find_in_list
61 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
62 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
63 from osm_lcm.data_utils.database.vim_account import VimAccountDB
64 from n2vc.k8s_helm_conn import K8sHelmConnector
65 from n2vc.k8s_helm3_conn import K8sHelm3Connector
66 from n2vc.k8s_juju_conn import K8sJujuConnector
67
68 from osm_common.dbbase import DbException
69 from osm_common.fsbase import FsException
70
71 from osm_lcm.data_utils.database.database import Database
72 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
73
74 from n2vc.n2vc_juju_conn import N2VCJujuConnector
75 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
76
77 from osm_lcm.lcm_helm_conn import LCMHelmConn
78
79 from copy import copy, deepcopy
80 from time import time
81 from uuid import uuid4
82
83 from random import randint
84
85 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
86
87
88 class NsLcm(LcmBase):
89 timeout_vca_on_error = (
90 5 * 60
91 ) # Time for charm from first time at blocked,error status to mark as failed
92 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
93 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
94 timeout_charm_delete = 10 * 60
95 timeout_primitive = 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive = (
97 10 * 60
98 ) # timeout for some progress in a primitive execution
99
100 SUBOPERATION_STATUS_NOT_FOUND = -1
101 SUBOPERATION_STATUS_NEW = -2
102 SUBOPERATION_STATUS_SKIP = -3
103 task_name_deploy_vca = "Deploying VCA"
104
105 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
106 """
107 Init: connect to database, filesystem storage and messaging
108 :param config: two-level dictionary with the configuration. Top level should contain 'database', 'storage',
109 :return: None
110 """
111 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
112
113 self.db = Database().instance.db
114 self.fs = Filesystem().instance.fs
115 self.loop = loop
116 self.lcm_tasks = lcm_tasks
117 self.timeout = config["timeout"]
118 self.ro_config = config["ro_config"]
119 self.ng_ro = config["ro_config"].get("ng")
120 self.vca_config = config["VCA"].copy()
121
122 # create N2VC connector
123 self.n2vc = N2VCJujuConnector(
124 log=self.logger,
125 loop=self.loop,
126 on_update_db=self._on_update_n2vc_db,
127 fs=self.fs,
128 db=self.db,
129 )
130
131 self.conn_helm_ee = LCMHelmConn(
132 log=self.logger,
133 loop=self.loop,
134 vca_config=self.vca_config,
135 on_update_db=self._on_update_n2vc_db,
136 )
137
138 self.k8sclusterhelm2 = K8sHelmConnector(
139 kubectl_command=self.vca_config.get("kubectlpath"),
140 helm_command=self.vca_config.get("helmpath"),
141 log=self.logger,
142 on_update_db=None,
143 fs=self.fs,
144 db=self.db,
145 )
146
147 self.k8sclusterhelm3 = K8sHelm3Connector(
148 kubectl_command=self.vca_config.get("kubectlpath"),
149 helm_command=self.vca_config.get("helm3path"),
150 fs=self.fs,
151 log=self.logger,
152 db=self.db,
153 on_update_db=None,
154 )
155
156 self.k8sclusterjuju = K8sJujuConnector(
157 kubectl_command=self.vca_config.get("kubectlpath"),
158 juju_command=self.vca_config.get("jujupath"),
159 log=self.logger,
160 loop=self.loop,
161 on_update_db=self._on_update_k8s_db,
162 fs=self.fs,
163 db=self.db,
164 )
165
166 self.k8scluster_map = {
167 "helm-chart": self.k8sclusterhelm2,
168 "helm-chart-v3": self.k8sclusterhelm3,
169 "chart": self.k8sclusterhelm3,
170 "juju-bundle": self.k8sclusterjuju,
171 "juju": self.k8sclusterjuju,
172 }
173
174 self.vca_map = {
175 "lxc_proxy_charm": self.n2vc,
176 "native_charm": self.n2vc,
177 "k8s_proxy_charm": self.n2vc,
178 "helm": self.conn_helm_ee,
179 "helm-v3": self.conn_helm_ee,
180 }
181
182 self.prometheus = prometheus
183
184 # create RO client
185 self.RO = NgRoClient(self.loop, **self.ro_config)
186
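# Illustrative dispatch example (types taken from the maps built above, call is a sketch,
# not part of the original module): a KDU declared as "helm-chart-v3" is handled by
# self.k8sclusterhelm3, a "juju-bundle" by self.k8sclusterjuju, and a charm of
# vca_type "native_charm" by self.n2vc, e.g.
#   connector = self.k8scluster_map["helm-chart-v3"]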
187 @staticmethod
188 def increment_ip_mac(ip_mac, vm_index=1):
189 if not isinstance(ip_mac, str):
190 return ip_mac
191 try:
192 # try with ipv4: look for the last dot
193 i = ip_mac.rfind(".")
194 if i > 0:
195 i += 1
196 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
197 # try with ipv6 or mac: look for the last colon. Operate in hex
198 i = ip_mac.rfind(":")
199 if i > 0:
200 i += 1
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
203 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
204 )
205 except Exception:
206 pass
207 return None
208
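# Illustrative examples (not part of the original module) of what increment_ip_mac
# returns for the hypothetical inputs below:
#   NsLcm.increment_ip_mac("10.0.0.5", vm_index=2)          -> "10.0.0.7"
#   NsLcm.increment_ip_mac("52:54:00:12:34:0a", vm_index=1) -> "52:54:00:12:34:0b"
#   NsLcm.increment_ip_mac(1234)                            -> 1234 (non-string returned as-is)
#   NsLcm.increment_ip_mac("not-an-address")                -> None (cannot be incremented)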
209 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
210
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
212
213 try:
214 # TODO filter RO descriptor fields...
215
216 # write to database
217 db_dict = dict()
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict["deploymentStatus"] = ro_descriptor
220 self.update_db_2("nsrs", nsrs_id, db_dict)
221
222 except Exception as e:
223 self.logger.warn(
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
225 )
226
227 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
228
229 # remove last dot from path (if exists)
230 if path.endswith("."):
231 path = path[:-1]
232
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
235 try:
236
237 nsr_id = filter.get("_id")
238
239 # read ns record from database
240 nsr = self.db.get_one(table="nsrs", q_filter=filter)
241 current_ns_status = nsr.get("nsState")
242
243 # get vca status for NS
244 status_dict = await self.n2vc.get_status(
245 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
246 )
247
248 # vcaStatus
249 db_dict = dict()
250 db_dict["vcaStatus"] = status_dict
251 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
252
253 # update configurationStatus for this VCA
254 try:
255 vca_index = int(path[path.rfind(".") + 1 :])
256
257 vca_list = deep_get(
258 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
259 )
260 vca_status = vca_list[vca_index].get("status")
261
262 configuration_status_list = nsr.get("configurationStatus")
263 config_status = configuration_status_list[vca_index].get("status")
264
265 if config_status == "BROKEN" and vca_status != "failed":
266 db_dict["configurationStatus"][vca_index] = "READY"
267 elif config_status != "BROKEN" and vca_status == "failed":
268 db_dict["configurationStatus"][vca_index] = "BROKEN"
269 except Exception as e:
270 # do not update configurationStatus
271 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
272
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
275 is_degraded = False
276 if current_ns_status in ("READY", "DEGRADED"):
277 error_description = ""
278 # check machines
279 if status_dict.get("machines"):
280 for machine_id in status_dict.get("machines"):
281 machine = status_dict.get("machines").get(machine_id)
282 # check machine agent-status
283 if machine.get("agent-status"):
284 s = machine.get("agent-status").get("status")
285 if s != "started":
286 is_degraded = True
287 error_description += (
288 "machine {} agent-status={} ; ".format(
289 machine_id, s
290 )
291 )
292 # check machine instance status
293 if machine.get("instance-status"):
294 s = machine.get("instance-status").get("status")
295 if s != "running":
296 is_degraded = True
297 error_description += (
298 "machine {} instance-status={} ; ".format(
299 machine_id, s
300 )
301 )
302 # check applications
303 if status_dict.get("applications"):
304 for app_id in status_dict.get("applications"):
305 app = status_dict.get("applications").get(app_id)
306 # check application status
307 if app.get("status"):
308 s = app.get("status").get("status")
309 if s != "active":
310 is_degraded = True
311 error_description += (
312 "application {} status={} ; ".format(app_id, s)
313 )
314
315 if error_description:
316 db_dict["errorDescription"] = error_description
317 if current_ns_status == "READY" and is_degraded:
318 db_dict["nsState"] = "DEGRADED"
319 if current_ns_status == "DEGRADED" and not is_degraded:
320 db_dict["nsState"] = "READY"
321
322 # write to database
323 self.update_db_2("nsrs", nsr_id, db_dict)
324
325 except (asyncio.CancelledError, asyncio.TimeoutError):
326 raise
327 except Exception as e:
328 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
329
330 async def _on_update_k8s_db(
331 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
332 ):
333 """
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
338 :param cluster_type: The cluster type (juju, k8s)
339 :return: none
340 """
341
342 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
343 # .format(cluster_uuid, kdu_instance, filter))
344
345 nsr_id = filter.get("_id")
346 try:
347 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
348 cluster_uuid=cluster_uuid,
349 kdu_instance=kdu_instance,
350 yaml_format=False,
351 complete_status=True,
352 vca_id=vca_id,
353 )
354
355 # vcaStatus
356 db_dict = dict()
357 db_dict["vcaStatus"] = {nsr_id: vca_status}
358
359 if cluster_type in ("juju-bundle", "juju"):
360 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
361 # status in a similar way between Juju Bundles and Helm Charts on this side
362 await self.k8sclusterjuju.update_vca_status(
363 db_dict["vcaStatus"],
364 kdu_instance,
365 vca_id=vca_id,
366 )
367
368 self.logger.debug(
369 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
370 )
371
372 # write to database
373 self.update_db_2("nsrs", nsr_id, db_dict)
374 except (asyncio.CancelledError, asyncio.TimeoutError):
375 raise
376 except Exception as e:
377 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
378
379 @staticmethod
380 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
381 try:
382 env = Environment(undefined=StrictUndefined)
383 template = env.from_string(cloud_init_text)
384 return template.render(additional_params or {})
385 except UndefinedError as e:
386 raise LcmException(
387 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
388 "file, must be provided in the instantiation parameters inside the "
389 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
390 )
391 except (TemplateError, TemplateNotFound) as e:
392 raise LcmException(
393 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
394 vnfd_id, vdu_id, e
395 )
396 )
397
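# Illustrative sketch (hypothetical template and params): a cloud-init text such as
#   "#cloud-config\npassword: {{ admin_password }}"
# rendered by _parse_cloud_init with additional_params={"admin_password": "osm4u"}
# yields "#cloud-config\npassword: osm4u". Because StrictUndefined is used, a
# variable missing from additional_params raises UndefinedError, which is re-raised
# as an LcmException pointing at the offending vnfd/vdu ids.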
398 def _get_vdu_cloud_init_content(self, vdu, vnfd):
399 cloud_init_content = cloud_init_file = None
400 try:
401 if vdu.get("cloud-init-file"):
402 base_folder = vnfd["_admin"]["storage"]
403 cloud_init_file = "{}/{}/cloud_init/{}".format(
404 base_folder["folder"],
405 base_folder["pkg-dir"],
406 vdu["cloud-init-file"],
407 )
408 with self.fs.file_open(cloud_init_file, "r") as ci_file:
409 cloud_init_content = ci_file.read()
410 elif vdu.get("cloud-init"):
411 cloud_init_content = vdu["cloud-init"]
412
413 return cloud_init_content
414 except FsException as e:
415 raise LcmException(
416 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
417 vnfd["id"], vdu["id"], cloud_init_file, e
418 )
419 )
420
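# Illustrative example (hypothetical storage values): with
#   vnfd["_admin"]["storage"] == {"folder": "<vnfd_id>", "pkg-dir": "my_vnf"}
# and vdu["cloud-init-file"] == "cloud-config.txt", the content is read from
# "<vnfd_id>/my_vnf/cloud_init/cloud-config.txt" in the common filesystem; if the
# vdu carries an inline "cloud-init" instead, that text is returned directly.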
421 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
422 vdur = next(
423 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
424 {}
425 )
426 additional_params = vdur.get("additionalParams")
427 return parse_yaml_strings(additional_params)
428
429 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
430 """
431 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
432 :param vnfd: input vnfd
433 :param new_id: overrides vnf id if provided
434 :param additionalParams: Instantiation params for VNFs provided
435 :param nsrId: Id of the NSR
436 :return: copy of vnfd
437 """
438 vnfd_RO = deepcopy(vnfd)
439 # remove keys not used by RO: configuration, monitoring, scaling and internal keys
440 vnfd_RO.pop("_id", None)
441 vnfd_RO.pop("_admin", None)
442 vnfd_RO.pop("monitoring-param", None)
443 vnfd_RO.pop("scaling-group-descriptor", None)
444 vnfd_RO.pop("kdu", None)
445 vnfd_RO.pop("k8s-cluster", None)
446 if new_id:
447 vnfd_RO["id"] = new_id
448
449 # remove cloud-init and cloud-init-file; they are not passed to RO in this descriptor
450 for vdu in get_iterable(vnfd_RO, "vdu"):
451 vdu.pop("cloud-init-file", None)
452 vdu.pop("cloud-init", None)
453 return vnfd_RO
454
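# Illustrative note: vnfd2RO(vnfd, new_id="hackfest-vnf-scaled") returns a deep copy
# of the vnfd with "id" overridden and the keys RO does not consume ("_id", "_admin",
# "monitoring-param", "scaling-group-descriptor", "kdu", "k8s-cluster" and the per-vdu
# "cloud-init"/"cloud-init-file") removed; "hackfest-vnf-scaled" is a hypothetical id.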
455 @staticmethod
456 def ip_profile_2_RO(ip_profile):
457 RO_ip_profile = deepcopy(ip_profile)
458 if "dns-server" in RO_ip_profile:
459 if isinstance(RO_ip_profile["dns-server"], list):
460 RO_ip_profile["dns-address"] = []
461 for ds in RO_ip_profile.pop("dns-server"):
462 RO_ip_profile["dns-address"].append(ds["address"])
463 else:
464 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
465 if RO_ip_profile.get("ip-version") == "ipv4":
466 RO_ip_profile["ip-version"] = "IPv4"
467 if RO_ip_profile.get("ip-version") == "ipv6":
468 RO_ip_profile["ip-version"] = "IPv6"
469 if "dhcp-params" in RO_ip_profile:
470 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
471 return RO_ip_profile
472
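# Illustrative translation performed by ip_profile_2_RO (hypothetical profile):
#   ip_profile_2_RO({"ip-version": "ipv4",
#                    "dns-server": [{"address": "8.8.8.8"}],
#                    "dhcp-params": {"enabled": True}})
#   -> {"ip-version": "IPv4", "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}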
473 def _get_ro_vim_id_for_vim_account(self, vim_account):
474 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
475 if db_vim["_admin"]["operationalState"] != "ENABLED":
476 raise LcmException(
477 "VIM={} is not available. operationalState={}".format(
478 vim_account, db_vim["_admin"]["operationalState"]
479 )
480 )
481 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
482 return RO_vim_id
483
484 def get_ro_wim_id_for_wim_account(self, wim_account):
485 if isinstance(wim_account, str):
486 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
487 if db_wim["_admin"]["operationalState"] != "ENABLED":
488 raise LcmException(
489 "WIM={} is not available. operationalState={}".format(
490 wim_account, db_wim["_admin"]["operationalState"]
491 )
492 )
493 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
494 return RO_wim_id
495 else:
496 return wim_account
497
498 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
499
500 db_vdu_push_list = []
501 template_vdur = []
502 db_update = {"_admin.modified": time()}
503 if vdu_create:
504 for vdu_id, vdu_count in vdu_create.items():
505 vdur = next(
506 (
507 vdur
508 for vdur in reversed(db_vnfr["vdur"])
509 if vdur["vdu-id-ref"] == vdu_id
510 ),
511 None,
512 )
513 if not vdur:
514 # Read the template saved in the db:
515 self.logger.debug("No vdur in the database. Using the vdur-template to scale")
516 vdur_template = db_vnfr.get("vdur-template")
517 if not vdur_template:
518 raise LcmException(
519 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
520 vdu_id
521 )
522 )
523 vdur = vdur_template[0]
524 # Delete the template from the database after using it
525 self.db.set_one("vnfrs",
526 {"_id": db_vnfr["_id"]},
527 None,
528 pull={"vdur-template": {"_id": vdur["_id"]}}
529 )
530 for count in range(vdu_count):
531 vdur_copy = deepcopy(vdur)
532 vdur_copy["status"] = "BUILD"
533 vdur_copy["status-detailed"] = None
534 vdur_copy["ip-address"] = None
535 vdur_copy["_id"] = str(uuid4())
536 vdur_copy["count-index"] += count + 1
537 vdur_copy["id"] = "{}-{}".format(
538 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
539 )
540 vdur_copy.pop("vim_info", None)
541 for iface in vdur_copy["interfaces"]:
542 if iface.get("fixed-ip"):
543 iface["ip-address"] = self.increment_ip_mac(
544 iface["ip-address"], count + 1
545 )
546 else:
547 iface.pop("ip-address", None)
548 if iface.get("fixed-mac"):
549 iface["mac-address"] = self.increment_ip_mac(
550 iface["mac-address"], count + 1
551 )
552 else:
553 iface.pop("mac-address", None)
554 if db_vnfr["vdur"]:
555 iface.pop(
556 "mgmt_vnf", None
557 ) # only the first vdu can be the management vdu of the vnf
558 db_vdu_push_list.append(vdur_copy)
559 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
560 if vdu_delete:
561 if len(db_vnfr["vdur"]) == 1:
562 # The scale will move to 0 instances
563 self.logger.debug("Scaling to 0! Creating the vdur-template from the last vdur")
564 template_vdur = [db_vnfr["vdur"][0]]
565 for vdu_id, vdu_count in vdu_delete.items():
566 if mark_delete:
567 indexes_to_delete = [
568 iv[0]
569 for iv in enumerate(db_vnfr["vdur"])
570 if iv[1]["vdu-id-ref"] == vdu_id
571 ]
572 db_update.update(
573 {
574 "vdur.{}.status".format(i): "DELETING"
575 for i in indexes_to_delete[-vdu_count:]
576 }
577 )
578 else:
579 # it must be deleted one by one because common.db does not allow otherwise
580 vdus_to_delete = [
581 v
582 for v in reversed(db_vnfr["vdur"])
583 if v["vdu-id-ref"] == vdu_id
584 ]
585 for vdu in vdus_to_delete[:vdu_count]:
586 self.db.set_one(
587 "vnfrs",
588 {"_id": db_vnfr["_id"]},
589 None,
590 pull={"vdur": {"_id": vdu["_id"]}},
591 )
592 db_push = {}
593 if db_vdu_push_list:
594 db_push["vdur"] = db_vdu_push_list
595 if template_vdur:
596 db_push["vdur-template"] = template_vdur
597 if not db_push:
598 db_push = None
599 db_vnfr["vdur-template"] = template_vdur
600 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
601 # modify passed dictionary db_vnfr
602 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
603 db_vnfr["vdur"] = db_vnfr_["vdur"]
604
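# Illustrative usage sketch (hypothetical vdu id and counts): scaling out duplicates
# the last matching vdur, bumping count-index and any fixed ip/mac addresses,
#   self.scale_vnfr(db_vnfr, vdu_create={"dataVM": 2})
# while scaling in with mark_delete=True only marks the last N matching vdurs as
# "DELETING" in the database,
#   self.scale_vnfr(db_vnfr, vdu_delete={"dataVM": 2}, mark_delete=True)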
605 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
606 """
607 Updates database nsr with the RO info for the created vld
608 :param ns_update_nsr: dictionary to be filled with the updated info
609 :param db_nsr: content of db_nsr. This is also modified
610 :param nsr_desc_RO: nsr descriptor from RO
611 :return: Nothing, LcmException is raised on errors
612 """
613
614 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
615 for net_RO in get_iterable(nsr_desc_RO, "nets"):
616 if vld["id"] != net_RO.get("ns_net_osm_id"):
617 continue
618 vld["vim-id"] = net_RO.get("vim_net_id")
619 vld["name"] = net_RO.get("vim_name")
620 vld["status"] = net_RO.get("status")
621 vld["status-detailed"] = net_RO.get("error_msg")
622 ns_update_nsr["vld.{}".format(vld_index)] = vld
623 break
624 else:
625 raise LcmException(
626 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
627 )
628
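# Illustrative outcome (hypothetical values): for the first NS vld matched by its
# "ns_net_osm_id" in the RO answer, ns_update_nsr fills the passed dictionary with
#   {"vld.0": {"id": "mgmtnet", "vim-id": "vim-net-uuid", "name": "mgmtnet-vim",
#              "status": "ACTIVE", "status-detailed": None, ...}}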
629 def set_vnfr_at_error(self, db_vnfrs, error_text):
630 try:
631 for db_vnfr in db_vnfrs.values():
632 vnfr_update = {"status": "ERROR"}
633 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
634 if "status" not in vdur:
635 vdur["status"] = "ERROR"
636 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
637 if error_text:
638 vdur["status-detailed"] = str(error_text)
639 vnfr_update[
640 "vdur.{}.status-detailed".format(vdu_index)
641 ] = "ERROR"
642 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
643 except DbException as e:
644 self.logger.error("Cannot update vnf. {}".format(e))
645
646 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
647 """
648 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
649 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
650 :param nsr_desc_RO: nsr descriptor from RO
651 :return: Nothing, LcmException is raised on errors
652 """
653 for vnf_index, db_vnfr in db_vnfrs.items():
654 for vnf_RO in nsr_desc_RO["vnfs"]:
655 if vnf_RO["member_vnf_index"] != vnf_index:
656 continue
657 vnfr_update = {}
658 if vnf_RO.get("ip_address"):
659 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
660 "ip_address"
661 ].split(";")[0]
662 elif not db_vnfr.get("ip-address"):
663 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
664 raise LcmExceptionNoMgmtIP(
665 "ns member_vnf_index '{}' has no IP address".format(
666 vnf_index
667 )
668 )
669
670 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
671 vdur_RO_count_index = 0
672 if vdur.get("pdu-type"):
673 continue
674 for vdur_RO in get_iterable(vnf_RO, "vms"):
675 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
676 continue
677 if vdur["count-index"] != vdur_RO_count_index:
678 vdur_RO_count_index += 1
679 continue
680 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
681 if vdur_RO.get("ip_address"):
682 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
683 else:
684 vdur["ip-address"] = None
685 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
686 vdur["name"] = vdur_RO.get("vim_name")
687 vdur["status"] = vdur_RO.get("status")
688 vdur["status-detailed"] = vdur_RO.get("error_msg")
689 for ifacer in get_iterable(vdur, "interfaces"):
690 for interface_RO in get_iterable(vdur_RO, "interfaces"):
691 if ifacer["name"] == interface_RO.get("internal_name"):
692 ifacer["ip-address"] = interface_RO.get(
693 "ip_address"
694 )
695 ifacer["mac-address"] = interface_RO.get(
696 "mac_address"
697 )
698 break
699 else:
700 raise LcmException(
701 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
702 "from VIM info".format(
703 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
704 )
705 )
706 vnfr_update["vdur.{}".format(vdu_index)] = vdur
707 break
708 else:
709 raise LcmException(
710 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
711 "VIM info".format(
712 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
713 )
714 )
715
716 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
717 for net_RO in get_iterable(nsr_desc_RO, "nets"):
718 if vld["id"] != net_RO.get("vnf_net_osm_id"):
719 continue
720 vld["vim-id"] = net_RO.get("vim_net_id")
721 vld["name"] = net_RO.get("vim_name")
722 vld["status"] = net_RO.get("status")
723 vld["status-detailed"] = net_RO.get("error_msg")
724 vnfr_update["vld.{}".format(vld_index)] = vld
725 break
726 else:
727 raise LcmException(
728 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
729 vnf_index, vld["id"]
730 )
731 )
732
733 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
734 break
735
736 else:
737 raise LcmException(
738 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
739 vnf_index
740 )
741 )
742
743 def _get_ns_config_info(self, nsr_id):
744 """
745 Generates a mapping between vnf,vdu elements and the N2VC id
746 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
747 :return: a dictionary {"osm-config-mapping": {...}} whose entries contain:
748 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
749 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
750 """
751 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
752 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
753 mapping = {}
754 ns_config_info = {"osm-config-mapping": mapping}
755 for vca in vca_deployed_list:
756 if not vca["member-vnf-index"]:
757 continue
758 if not vca["vdu_id"]:
759 mapping[vca["member-vnf-index"]] = vca["application"]
760 else:
761 mapping[
762 "{}.{}.{}".format(
763 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
764 )
765 ] = vca["application"]
766 return ns_config_info
767
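# Illustrative shape of the returned mapping (hypothetical names):
#   {"osm-config-mapping": {
#       "1": "app-vnf-1",            # vnf-level charm: member-vnf-index -> application
#       "1.mgmt-vdu.0": "app-vdu-1"  # vdu-level charm: index.vdu_id.count -> application
#   }}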
768 async def _instantiate_ng_ro(
769 self,
770 logging_text,
771 nsr_id,
772 nsd,
773 db_nsr,
774 db_nslcmop,
775 db_vnfrs,
776 db_vnfds,
777 n2vc_key_list,
778 stage,
779 start_deploy,
780 timeout_ns_deploy,
781 ):
782
783 db_vims = {}
784
785 def get_vim_account(vim_account_id):
786 nonlocal db_vims
787 if vim_account_id in db_vims:
788 return db_vims[vim_account_id]
789 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
790 db_vims[vim_account_id] = db_vim
791 return db_vim
792
793 # modify target_vld info with instantiation parameters
794 def parse_vld_instantiation_params(
795 target_vim, target_vld, vld_params, target_sdn
796 ):
797 if vld_params.get("ip-profile"):
798 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
799 "ip-profile"
800 ]
801 if vld_params.get("provider-network"):
802 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
803 "provider-network"
804 ]
805 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
806 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
807 "provider-network"
808 ]["sdn-ports"]
809 if vld_params.get("wimAccountId"):
810 target_wim = "wim:{}".format(vld_params["wimAccountId"])
811 target_vld["vim_info"][target_wim] = {}
812 for param in ("vim-network-name", "vim-network-id"):
813 if vld_params.get(param):
814 if isinstance(vld_params[param], dict):
815 for vim, vim_net in vld_params[param].items():
816 other_target_vim = "vim:" + vim
817 populate_dict(
818 target_vld["vim_info"],
819 (other_target_vim, param.replace("-", "_")),
820 vim_net,
821 )
822 else: # isinstance str
823 target_vld["vim_info"][target_vim][
824 param.replace("-", "_")
825 ] = vld_params[param]
826 if vld_params.get("common_id"):
827 target_vld["common_id"] = vld_params.get("common_id")
828
829 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
830 def update_ns_vld_target(target, ns_params):
831 for vnf_params in ns_params.get("vnf", ()):
832 if vnf_params.get("vimAccountId"):
833 target_vnf = next(
834 (
835 vnfr
836 for vnfr in db_vnfrs.values()
837 if vnf_params["member-vnf-index"]
838 == vnfr["member-vnf-index-ref"]
839 ),
840 None,
841 )
842 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
843 for a_index, a_vld in enumerate(target["ns"]["vld"]):
844 target_vld = find_in_list(
845 get_iterable(vdur, "interfaces"),
846 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
847 )
848 if target_vld:
849 if vnf_params.get("vimAccountId") not in a_vld.get(
850 "vim_info", {}
851 ):
852 target["ns"]["vld"][a_index].get("vim_info").update(
853 {
854 "vim:{}".format(vnf_params["vimAccountId"]): {
855 "vim_network_name": ""
856 }
857 }
858 )
859
860 nslcmop_id = db_nslcmop["_id"]
861 target = {
862 "name": db_nsr["name"],
863 "ns": {"vld": []},
864 "vnf": [],
865 "image": deepcopy(db_nsr["image"]),
866 "flavor": deepcopy(db_nsr["flavor"]),
867 "action_id": nslcmop_id,
868 "cloud_init_content": {},
869 }
870 for image in target["image"]:
871 image["vim_info"] = {}
872 for flavor in target["flavor"]:
873 flavor["vim_info"] = {}
874 if db_nsr.get("affinity-or-anti-affinity-group"):
875 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
876 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
877 affinity_or_anti_affinity_group["vim_info"] = {}
878
879 if db_nslcmop.get("lcmOperationType") != "instantiate":
880 # get parameters of instantiation:
881 db_nslcmop_instantiate = self.db.get_list(
882 "nslcmops",
883 {
884 "nsInstanceId": db_nslcmop["nsInstanceId"],
885 "lcmOperationType": "instantiate",
886 },
887 )[-1]
888 ns_params = db_nslcmop_instantiate.get("operationParams")
889 else:
890 ns_params = db_nslcmop.get("operationParams")
891 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
892 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
893
894 cp2target = {}
895 for vld_index, vld in enumerate(db_nsr.get("vld")):
896 target_vim = "vim:{}".format(ns_params["vimAccountId"])
897 target_vld = {
898 "id": vld["id"],
899 "name": vld["name"],
900 "mgmt-network": vld.get("mgmt-network", False),
901 "type": vld.get("type"),
902 "vim_info": {
903 target_vim: {
904 "vim_network_name": vld.get("vim-network-name"),
905 "vim_account_id": ns_params["vimAccountId"],
906 }
907 },
908 }
909 # check if this network needs SDN assist
910 if vld.get("pci-interfaces"):
911 db_vim = get_vim_account(ns_params["vimAccountId"])
912 sdnc_id = db_vim["config"].get("sdn-controller")
913 if sdnc_id:
914 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
915 target_sdn = "sdn:{}".format(sdnc_id)
916 target_vld["vim_info"][target_sdn] = {
917 "sdn": True,
918 "target_vim": target_vim,
919 "vlds": [sdn_vld],
920 "type": vld.get("type"),
921 }
922
923 nsd_vnf_profiles = get_vnf_profiles(nsd)
924 for nsd_vnf_profile in nsd_vnf_profiles:
925 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
926 if cp["virtual-link-profile-id"] == vld["id"]:
927 cp2target[
928 "member_vnf:{}.{}".format(
929 cp["constituent-cpd-id"][0][
930 "constituent-base-element-id"
931 ],
932 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
933 )
934 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
935
936 # check at nsd descriptor, if there is an ip-profile
937 vld_params = {}
938 nsd_vlp = find_in_list(
939 get_virtual_link_profiles(nsd),
940 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
941 == vld["id"],
942 )
943 if (
944 nsd_vlp
945 and nsd_vlp.get("virtual-link-protocol-data")
946 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
947 ):
948 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
949 "l3-protocol-data"
950 ]
951 ip_profile_dest_data = {}
952 if "ip-version" in ip_profile_source_data:
953 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
954 "ip-version"
955 ]
956 if "cidr" in ip_profile_source_data:
957 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
958 "cidr"
959 ]
960 if "gateway-ip" in ip_profile_source_data:
961 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
962 "gateway-ip"
963 ]
964 if "dhcp-enabled" in ip_profile_source_data:
965 ip_profile_dest_data["dhcp-params"] = {
966 "enabled": ip_profile_source_data["dhcp-enabled"]
967 }
968 vld_params["ip-profile"] = ip_profile_dest_data
969
970 # update vld_params with instantiation params
971 vld_instantiation_params = find_in_list(
972 get_iterable(ns_params, "vld"),
973 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
974 )
975 if vld_instantiation_params:
976 vld_params.update(vld_instantiation_params)
977 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
978 target["ns"]["vld"].append(target_vld)
979 # Update the target ns_vld if the vnf vim_account is overridden by instantiation params
980 update_ns_vld_target(target, ns_params)
981
982 for vnfr in db_vnfrs.values():
983 vnfd = find_in_list(
984 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
985 )
986 vnf_params = find_in_list(
987 get_iterable(ns_params, "vnf"),
988 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
989 )
990 target_vnf = deepcopy(vnfr)
991 target_vim = "vim:{}".format(vnfr["vim-account-id"])
992 for vld in target_vnf.get("vld", ()):
993 # check if connected to a ns.vld, to fill 'target'
994 vnf_cp = find_in_list(
995 vnfd.get("int-virtual-link-desc", ()),
996 lambda cpd: cpd.get("id") == vld["id"],
997 )
998 if vnf_cp:
999 ns_cp = "member_vnf:{}.{}".format(
1000 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1001 )
1002 if cp2target.get(ns_cp):
1003 vld["target"] = cp2target[ns_cp]
1004
1005 vld["vim_info"] = {
1006 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1007 }
1008 # check if this network needs SDN assist
1009 target_sdn = None
1010 if vld.get("pci-interfaces"):
1011 db_vim = get_vim_account(vnfr["vim-account-id"])
1012 sdnc_id = db_vim["config"].get("sdn-controller")
1013 if sdnc_id:
1014 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1015 target_sdn = "sdn:{}".format(sdnc_id)
1016 vld["vim_info"][target_sdn] = {
1017 "sdn": True,
1018 "target_vim": target_vim,
1019 "vlds": [sdn_vld],
1020 "type": vld.get("type"),
1021 }
1022
1023 # check at vnfd descriptor, if there is an ip-profile
1024 vld_params = {}
1025 vnfd_vlp = find_in_list(
1026 get_virtual_link_profiles(vnfd),
1027 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1028 )
1029 if (
1030 vnfd_vlp
1031 and vnfd_vlp.get("virtual-link-protocol-data")
1032 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1033 ):
1034 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1035 "l3-protocol-data"
1036 ]
1037 ip_profile_dest_data = {}
1038 if "ip-version" in ip_profile_source_data:
1039 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1040 "ip-version"
1041 ]
1042 if "cidr" in ip_profile_source_data:
1043 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1044 "cidr"
1045 ]
1046 if "gateway-ip" in ip_profile_source_data:
1047 ip_profile_dest_data[
1048 "gateway-address"
1049 ] = ip_profile_source_data["gateway-ip"]
1050 if "dhcp-enabled" in ip_profile_source_data:
1051 ip_profile_dest_data["dhcp-params"] = {
1052 "enabled": ip_profile_source_data["dhcp-enabled"]
1053 }
1054
1055 vld_params["ip-profile"] = ip_profile_dest_data
1056 # update vld_params with instantiation params
1057 if vnf_params:
1058 vld_instantiation_params = find_in_list(
1059 get_iterable(vnf_params, "internal-vld"),
1060 lambda i_vld: i_vld["name"] == vld["id"],
1061 )
1062 if vld_instantiation_params:
1063 vld_params.update(vld_instantiation_params)
1064 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1065
1066 vdur_list = []
1067 for vdur in target_vnf.get("vdur", ()):
1068 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1069 continue # This vdu must not be created
1070 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1071
1072 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1073
1074 if ssh_keys_all:
1075 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1076 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1077 if (
1078 vdu_configuration
1079 and vdu_configuration.get("config-access")
1080 and vdu_configuration.get("config-access").get("ssh-access")
1081 ):
1082 vdur["ssh-keys"] = ssh_keys_all
1083 vdur["ssh-access-required"] = vdu_configuration[
1084 "config-access"
1085 ]["ssh-access"]["required"]
1086 elif (
1087 vnf_configuration
1088 and vnf_configuration.get("config-access")
1089 and vnf_configuration.get("config-access").get("ssh-access")
1090 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1091 ):
1092 vdur["ssh-keys"] = ssh_keys_all
1093 vdur["ssh-access-required"] = vnf_configuration[
1094 "config-access"
1095 ]["ssh-access"]["required"]
1096 elif ssh_keys_instantiation and find_in_list(
1097 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1098 ):
1099 vdur["ssh-keys"] = ssh_keys_instantiation
1100
1101 self.logger.debug("NS > vdur > {}".format(vdur))
1102
1103 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1104 # cloud-init
1105 if vdud.get("cloud-init-file"):
1106 vdur["cloud-init"] = "{}:file:{}".format(
1107 vnfd["_id"], vdud.get("cloud-init-file")
1108 )
1109 # read the file and put its content at target.cloud_init_content, so ng_ro does not need the shared package system
1110 if vdur["cloud-init"] not in target["cloud_init_content"]:
1111 base_folder = vnfd["_admin"]["storage"]
1112 cloud_init_file = "{}/{}/cloud_init/{}".format(
1113 base_folder["folder"],
1114 base_folder["pkg-dir"],
1115 vdud.get("cloud-init-file"),
1116 )
1117 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1118 target["cloud_init_content"][
1119 vdur["cloud-init"]
1120 ] = ci_file.read()
1121 elif vdud.get("cloud-init"):
1122 vdur["cloud-init"] = "{}:vdu:{}".format(
1123 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1124 )
1125 # put the content at target.cloud_init_content, so ng_ro does not need to read the vnfd descriptor
1126 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1127 "cloud-init"
1128 ]
1129 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1130 deploy_params_vdu = self._format_additional_params(
1131 vdur.get("additionalParams") or {}
1132 )
1133 deploy_params_vdu["OSM"] = get_osm_params(
1134 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1135 )
1136 vdur["additionalParams"] = deploy_params_vdu
1137
1138 # flavor
1139 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1140 if target_vim not in ns_flavor["vim_info"]:
1141 ns_flavor["vim_info"][target_vim] = {}
1142
1143 # deal with images
1144 # in case alternative images are provided, check whether one should be applied
1145 # for this vim_type and, if so, point ns-image-id to the alternative image
1146 ns_image_id = int(vdur["ns-image-id"])
1147 if vdur.get("alt-image-ids"):
1148 db_vim = get_vim_account(vnfr["vim-account-id"])
1149 vim_type = db_vim["vim_type"]
1150 for alt_image_id in vdur.get("alt-image-ids"):
1151 ns_alt_image = target["image"][int(alt_image_id)]
1152 if vim_type == ns_alt_image.get("vim-type"):
1153 # must use alternative image
1154 self.logger.debug(
1155 "use alternative image id: {}".format(alt_image_id)
1156 )
1157 ns_image_id = alt_image_id
1158 vdur["ns-image-id"] = ns_image_id
1159 break
1160 ns_image = target["image"][int(ns_image_id)]
1161 if target_vim not in ns_image["vim_info"]:
1162 ns_image["vim_info"][target_vim] = {}
1163
1164 # Affinity groups
1165 if vdur.get("affinity-or-anti-affinity-group-id"):
1166 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1167 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1168 if target_vim not in ns_ags["vim_info"]:
1169 ns_ags["vim_info"][target_vim] = {}
1170
1171 vdur["vim_info"] = {target_vim: {}}
1172 # instantiation parameters
1173 # if vnf_params:
1174 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1175 # vdud["id"]), None)
1176 vdur_list.append(vdur)
1177 target_vnf["vdur"] = vdur_list
1178 target["vnf"].append(target_vnf)
1179
1180 desc = await self.RO.deploy(nsr_id, target)
1181 self.logger.debug("RO return > {}".format(desc))
1182 action_id = desc["action_id"]
1183 await self._wait_ng_ro(
1184 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1185 )
1186
1187 # Updating NSR
1188 db_nsr_update = {
1189 "_admin.deployed.RO.operational-status": "running",
1190 "detailed-status": " ".join(stage),
1191 }
1192 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1193 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1194 self._write_op_status(nslcmop_id, stage)
1195 self.logger.debug(
1196 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1197 )
1198 return
1199
1200 async def _wait_ng_ro(
1201 self,
1202 nsr_id,
1203 action_id,
1204 nslcmop_id=None,
1205 start_time=None,
1206 timeout=600,
1207 stage=None,
1208 ):
1209 detailed_status_old = None
1210 db_nsr_update = {}
1211 start_time = start_time or time()
1212 while time() <= start_time + timeout:
1213 desc_status = await self.RO.status(nsr_id, action_id)
1214 self.logger.debug("Wait NG RO > {}".format(desc_status))
1215 if desc_status["status"] == "FAILED":
1216 raise NgRoException(desc_status["details"])
1217 elif desc_status["status"] == "BUILD":
1218 if stage:
1219 stage[2] = "VIM: ({})".format(desc_status["details"])
1220 elif desc_status["status"] == "DONE":
1221 if stage:
1222 stage[2] = "Deployed at VIM"
1223 break
1224 else:
1225 assert False, "RO.status returns unknown {}".format(
1226 desc_status["status"]
1227 )
1228 if stage and nslcmop_id and stage[2] != detailed_status_old:
1229 detailed_status_old = stage[2]
1230 db_nsr_update["detailed-status"] = " ".join(stage)
1231 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1232 self._write_op_status(nslcmop_id, stage)
1233 await asyncio.sleep(15, loop=self.loop)
1234 else: # timeout_ns_deploy
1235 raise NgRoException("Timeout waiting ns to deploy")
1236
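# Illustrative polling sequence (hypothetical details) handled by _wait_ng_ro above:
#   {"status": "BUILD", "details": "creating VMs"}    -> stage[2] = "VIM: (creating VMs)"
#   {"status": "DONE", "details": ""}                 -> stage[2] = "Deployed at VIM", loop ends
#   {"status": "FAILED", "details": "quota exceeded"} -> NgRoException raised
# The loop sleeps 15 seconds between polls and raises NgRoException on timeout.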
1237 async def _terminate_ng_ro(
1238 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1239 ):
1240 db_nsr_update = {}
1241 failed_detail = []
1242 action_id = None
1243 start_deploy = time()
1244 try:
1245 target = {
1246 "ns": {"vld": []},
1247 "vnf": [],
1248 "image": [],
1249 "flavor": [],
1250 "action_id": nslcmop_id,
1251 }
1252 desc = await self.RO.deploy(nsr_id, target)
1253 action_id = desc["action_id"]
1254 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1255 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1256 self.logger.debug(
1257 logging_text
1258 + "ns terminate action at RO. action_id={}".format(action_id)
1259 )
1260
1261 # wait until done
1262 delete_timeout = 20 * 60 # 20 minutes
1263 await self._wait_ng_ro(
1264 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1265 )
1266
1267 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1268 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1269 # delete all nsr
1270 await self.RO.delete(nsr_id)
1271 except Exception as e:
1272 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1273 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1274 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1275 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1276 self.logger.debug(
1277 logging_text + "RO_action_id={} already deleted".format(action_id)
1278 )
1279 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1280 failed_detail.append("delete conflict: {}".format(e))
1281 self.logger.debug(
1282 logging_text
1283 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1284 )
1285 else:
1286 failed_detail.append("delete error: {}".format(e))
1287 self.logger.error(
1288 logging_text
1289 + "RO_action_id={} delete error: {}".format(action_id, e)
1290 )
1291
1292 if failed_detail:
1293 stage[2] = "Error deleting from VIM"
1294 else:
1295 stage[2] = "Deleted from VIM"
1296 db_nsr_update["detailed-status"] = " ".join(stage)
1297 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1298 self._write_op_status(nslcmop_id, stage)
1299
1300 if failed_detail:
1301 raise LcmException("; ".join(failed_detail))
1302 return
1303
1304 async def instantiate_RO(
1305 self,
1306 logging_text,
1307 nsr_id,
1308 nsd,
1309 db_nsr,
1310 db_nslcmop,
1311 db_vnfrs,
1312 db_vnfds,
1313 n2vc_key_list,
1314 stage,
1315 ):
1316 """
1317 Instantiate at RO
1318 :param logging_text: prefix text to use at logging
1319 :param nsr_id: nsr identity
1320 :param nsd: database content of ns descriptor
1321 :param db_nsr: database content of ns record
1322 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1323 :param db_vnfrs:
1324 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1325 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1326 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1327 :return: None or exception
1328 """
1329 try:
1330 start_deploy = time()
1331 ns_params = db_nslcmop.get("operationParams")
1332 if ns_params and ns_params.get("timeout_ns_deploy"):
1333 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1334 else:
1335 timeout_ns_deploy = self.timeout.get(
1336 "ns_deploy", self.timeout_ns_deploy
1337 )
1338
1339 # Check for and optionally request placement optimization. Database will be updated if placement activated
1340 stage[2] = "Waiting for Placement."
1341 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1342 # in case of placement, update ns_params["vimAccountId"] if it does not match any vnfr
1343 for vnfr in db_vnfrs.values():
1344 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1345 break
1346 else:
1347 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1348
1349 return await self._instantiate_ng_ro(
1350 logging_text,
1351 nsr_id,
1352 nsd,
1353 db_nsr,
1354 db_nslcmop,
1355 db_vnfrs,
1356 db_vnfds,
1357 n2vc_key_list,
1358 stage,
1359 start_deploy,
1360 timeout_ns_deploy,
1361 )
1362 except Exception as e:
1363 stage[2] = "ERROR deploying at VIM"
1364 self.set_vnfr_at_error(db_vnfrs, str(e))
1365 self.logger.error(
1366 "Error deploying at VIM {}".format(e),
1367 exc_info=not isinstance(
1368 e,
1369 (
1370 ROclient.ROClientException,
1371 LcmException,
1372 DbException,
1373 NgRoException,
1374 ),
1375 ),
1376 )
1377 raise
1378
1379 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1380 """
1381 Wait for kdu to be up, get ip address
1382 :param logging_text: prefix use for logging
1383 :param nsr_id:
1384 :param vnfr_id:
1385 :param kdu_name:
1386 :return: IP address
1387 """
1388
1389 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1390 nb_tries = 0
1391
1392 while nb_tries < 360:
1393 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1394 kdur = next(
1395 (
1396 x
1397 for x in get_iterable(db_vnfr, "kdur")
1398 if x.get("kdu-name") == kdu_name
1399 ),
1400 None,
1401 )
1402 if not kdur:
1403 raise LcmException(
1404 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1405 )
1406 if kdur.get("status"):
1407 if kdur["status"] in ("READY", "ENABLED"):
1408 return kdur.get("ip-address")
1409 else:
1410 raise LcmException(
1411 "target KDU={} is in error state".format(kdu_name)
1412 )
1413
1414 await asyncio.sleep(10, loop=self.loop)
1415 nb_tries += 1
1416 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1417
1418 async def wait_vm_up_insert_key_ro(
1419 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1420 ):
1421 """
1422 Wait for the IP address at RO and, optionally, insert a public key in the virtual machine
1423 :param logging_text: prefix use for logging
1424 :param nsr_id:
1425 :param vnfr_id:
1426 :param vdu_id:
1427 :param vdu_index:
1428 :param pub_key: public ssh key to inject, None to skip
1429 :param user: user to apply the public ssh key
1430 :return: IP address
1431 """
1432
1433 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1434 ro_nsr_id = None
1435 ip_address = None
1436 nb_tries = 0
1437 target_vdu_id = None
1438 ro_retries = 0
1439
1440 while True:
1441
1442 ro_retries += 1
1443 if ro_retries >= 360: # 1 hour
1444 raise LcmException(
1445 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1446 )
1447
1448 await asyncio.sleep(10, loop=self.loop)
1449
1450 # get ip address
1451 if not target_vdu_id:
1452 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1453
1454 if not vdu_id: # for the VNF case
1455 if db_vnfr.get("status") == "ERROR":
1456 raise LcmException(
1457 "Cannot inject ssh-key because target VNF is in error state"
1458 )
1459 ip_address = db_vnfr.get("ip-address")
1460 if not ip_address:
1461 continue
1462 vdur = next(
1463 (
1464 x
1465 for x in get_iterable(db_vnfr, "vdur")
1466 if x.get("ip-address") == ip_address
1467 ),
1468 None,
1469 )
1470 else: # VDU case
1471 vdur = next(
1472 (
1473 x
1474 for x in get_iterable(db_vnfr, "vdur")
1475 if x.get("vdu-id-ref") == vdu_id
1476 and x.get("count-index") == vdu_index
1477 ),
1478 None,
1479 )
1480
1481 if (
1482 not vdur and len(db_vnfr.get("vdur", ())) == 1
1483 ): # If only one, this should be the target vdu
1484 vdur = db_vnfr["vdur"][0]
1485 if not vdur:
1486 raise LcmException(
1487 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1488 vnfr_id, vdu_id, vdu_index
1489 )
1490 )
1491 # New generation RO stores information at "vim_info"
1492 ng_ro_status = None
1493 target_vim = None
1494 if vdur.get("vim_info"):
1495 target_vim = next(
1496 t for t in vdur["vim_info"]
1497 ) # there should be only one key
1498 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1499 if (
1500 vdur.get("pdu-type")
1501 or vdur.get("status") == "ACTIVE"
1502 or ng_ro_status == "ACTIVE"
1503 ):
1504 ip_address = vdur.get("ip-address")
1505 if not ip_address:
1506 continue
1507 target_vdu_id = vdur["vdu-id-ref"]
1508 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1509 raise LcmException(
1510 "Cannot inject ssh-key because target VM is in error state"
1511 )
1512
1513 if not target_vdu_id:
1514 continue
1515
1516 # inject public key into machine
1517 if pub_key and user:
1518 self.logger.debug(logging_text + "Inserting RO key")
1519 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1520 if vdur.get("pdu-type"):
1521 self.logger.error(logging_text + "Cannot inject ssh-key into a PDU")
1522 return ip_address
1523 try:
1524 ro_vm_id = "{}-{}".format(
1525 db_vnfr["member-vnf-index-ref"], target_vdu_id
1526 ) # TODO add vdu_index
1527 if self.ng_ro:
1528 target = {
1529 "action": {
1530 "action": "inject_ssh_key",
1531 "key": pub_key,
1532 "user": user,
1533 },
1534 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1535 }
1536 desc = await self.RO.deploy(nsr_id, target)
1537 action_id = desc["action_id"]
1538 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1539 break
1540 else:
1541 # wait until NS is deployed at RO
1542 if not ro_nsr_id:
1543 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1544 ro_nsr_id = deep_get(
1545 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1546 )
1547 if not ro_nsr_id:
1548 continue
1549 result_dict = await self.RO.create_action(
1550 item="ns",
1551 item_id_name=ro_nsr_id,
1552 descriptor={
1553 "add_public_key": pub_key,
1554 "vms": [ro_vm_id],
1555 "user": user,
1556 },
1557 )
1558 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1559 if not result_dict or not isinstance(result_dict, dict):
1560 raise LcmException(
1561 "Unknown response from RO when injecting key"
1562 )
1563 for result in result_dict.values():
1564 if result.get("vim_result") == 200:
1565 break
1566 else:
1567 raise ROclient.ROClientException(
1568 "error injecting key: {}".format(
1569 result.get("description")
1570 )
1571 )
1572 break
1573 except NgRoException as e:
1574 raise LcmException(
1575 "Reaching max tries injecting key. Error: {}".format(e)
1576 )
1577 except ROclient.ROClientException as e:
1578 if not nb_tries:
1579 self.logger.debug(
1580 logging_text
1581 + "error injecting key: {}. Retrying until {} seconds".format(
1582 e, 20 * 10
1583 )
1584 )
1585 nb_tries += 1
1586 if nb_tries >= 20:
1587 raise LcmException(
1588 "Reaching max tries injecting key. Error: {}".format(e)
1589 )
1590 else:
1591 break
1592
1593 return ip_address
1594
1595 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1596 """
1597 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1598 """
1599 my_vca = vca_deployed_list[vca_index]
1600 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1601 # vdu or kdu: no dependencies
1602 return
1603 timeout = 300
1604 while timeout >= 0:
1605 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1606 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1607 configuration_status_list = db_nsr["configurationStatus"]
1608 for index, vca_deployed in enumerate(configuration_status_list):
1609 if index == vca_index:
1610 # myself
1611 continue
1612 if not my_vca.get("member-vnf-index") or (
1613 vca_deployed.get("member-vnf-index")
1614 == my_vca.get("member-vnf-index")
1615 ):
1616 internal_status = configuration_status_list[index].get("status")
1617 if internal_status == "READY":
1618 continue
1619 elif internal_status == "BROKEN":
1620 raise LcmException(
1621 "Configuration aborted because dependent charm/s has failed"
1622 )
1623 else:
1624 break
1625 else:
1626 # no dependencies, return
1627 return
1628 await asyncio.sleep(10)
1629 timeout -= 1
1630
1631 raise LcmException("Configuration aborted because dependent charm/s timeout")
1632
1633 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1634 vca_id = None
1635 if db_vnfr:
1636 vca_id = deep_get(db_vnfr, ("vca-id",))
1637 elif db_nsr:
1638 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1639 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1640 return vca_id
1641
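# Illustrative precedence note: when a db_vnfr is given, its "vca-id" field (possibly
# absent, yielding None) is used; only when no db_vnfr is passed does get_vca_id fall
# back to the "vca" registered in the VIM account referenced by the NS
# instantiate_params["vimAccountId"], e.g. get_vca_id(None, db_nsr).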
1642 async def instantiate_N2VC(
1643 self,
1644 logging_text,
1645 vca_index,
1646 nsi_id,
1647 db_nsr,
1648 db_vnfr,
1649 vdu_id,
1650 kdu_name,
1651 vdu_index,
1652 config_descriptor,
1653 deploy_params,
1654 base_folder,
1655 nslcmop_id,
1656 stage,
1657 vca_type,
1658 vca_name,
1659 ee_config_descriptor,
1660 ):
1661 nsr_id = db_nsr["_id"]
1662 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1663 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1664 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1665 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1666 db_dict = {
1667 "collection": "nsrs",
1668 "filter": {"_id": nsr_id},
1669 "path": db_update_entry,
1670 }
1671 step = ""
1672 try:
1673
1674 element_type = "NS"
1675 element_under_configuration = nsr_id
1676
1677 vnfr_id = None
1678 if db_vnfr:
1679 vnfr_id = db_vnfr["_id"]
1680 osm_config["osm"]["vnf_id"] = vnfr_id
1681
1682 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1683
1684 if vca_type == "native_charm":
1685 index_number = 0
1686 else:
1687 index_number = vdu_index or 0
1688
1689 if vnfr_id:
1690 element_type = "VNF"
1691 element_under_configuration = vnfr_id
1692 namespace += ".{}-{}".format(vnfr_id, index_number)
1693 if vdu_id:
1694 namespace += ".{}-{}".format(vdu_id, index_number)
1695 element_type = "VDU"
1696 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1697 osm_config["osm"]["vdu_id"] = vdu_id
1698 elif kdu_name:
1699 namespace += ".{}".format(kdu_name)
1700 element_type = "KDU"
1701 element_under_configuration = kdu_name
1702 osm_config["osm"]["kdu_name"] = kdu_name
1703
1704 # Get artifact path
1705 artifact_path = "{}/{}/{}/{}".format(
1706 base_folder["folder"],
1707 base_folder["pkg-dir"],
1708 "charms"
1709 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1710 else "helm-charts",
1711 vca_name,
1712 )
1713
1714 self.logger.debug("Artifact path > {}".format(artifact_path))
1715
1716 # get initial_config_primitive_list that applies to this element
1717 initial_config_primitive_list = config_descriptor.get(
1718 "initial-config-primitive"
1719 )
1720
1721 self.logger.debug(
1722 "Initial config primitive list > {}".format(
1723 initial_config_primitive_list
1724 )
1725 )
1726
1727 # add config if not present for NS charm
1728 ee_descriptor_id = ee_config_descriptor.get("id")
1729 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1730 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1731 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1732 )
1733
1734 self.logger.debug(
1735 "Initial config primitive list #2 > {}".format(
1736 initial_config_primitive_list
1737 )
1738 )
1739 # n2vc_redesign STEP 3.1
1740 # find old ee_id if exists
1741 ee_id = vca_deployed.get("ee_id")
1742
1743 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1744 # create or register execution environment in VCA
1745 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1746
1747 self._write_configuration_status(
1748 nsr_id=nsr_id,
1749 vca_index=vca_index,
1750 status="CREATING",
1751 element_under_configuration=element_under_configuration,
1752 element_type=element_type,
1753 )
1754
1755 step = "create execution environment"
1756 self.logger.debug(logging_text + step)
1757
1758 ee_id = None
1759 credentials = None
1760 if vca_type == "k8s_proxy_charm":
1761 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1762 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1763 namespace=namespace,
1764 artifact_path=artifact_path,
1765 db_dict=db_dict,
1766 vca_id=vca_id,
1767 )
1768 elif vca_type == "helm" or vca_type == "helm-v3":
1769 ee_id, credentials = await self.vca_map[
1770 vca_type
1771 ].create_execution_environment(
1772 namespace=namespace,
1773 reuse_ee_id=ee_id,
1774 db_dict=db_dict,
1775 config=osm_config,
1776 artifact_path=artifact_path,
1777 vca_type=vca_type,
1778 )
1779 else:
1780 ee_id, credentials = await self.vca_map[
1781 vca_type
1782 ].create_execution_environment(
1783 namespace=namespace,
1784 reuse_ee_id=ee_id,
1785 db_dict=db_dict,
1786 vca_id=vca_id,
1787 )
1788
1789 elif vca_type == "native_charm":
1790                 step = "Waiting for the VM to be up and getting its IP address"
1791 self.logger.debug(logging_text + step)
1792 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1793 logging_text,
1794 nsr_id,
1795 vnfr_id,
1796 vdu_id,
1797 vdu_index,
1798 user=None,
1799 pub_key=None,
1800 )
1801 credentials = {"hostname": rw_mgmt_ip}
1802 # get username
1803 username = deep_get(
1804 config_descriptor, ("config-access", "ssh-access", "default-user")
1805 )
1806                 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1807                 # merged. Meanwhile, get the username from the initial-config-primitive
1808 if not username and initial_config_primitive_list:
1809 for config_primitive in initial_config_primitive_list:
1810 for param in config_primitive.get("parameter", ()):
1811 if param["name"] == "ssh-username":
1812 username = param["value"]
1813 break
1814 if not username:
1815 raise LcmException(
1816                         "Cannot determine the username from either 'initial-config-primitive' or "
1817                         "'config-access.ssh-access.default-user'"
1818 )
1819 credentials["username"] = username
1820 # n2vc_redesign STEP 3.2
1821
1822 self._write_configuration_status(
1823 nsr_id=nsr_id,
1824 vca_index=vca_index,
1825 status="REGISTERING",
1826 element_under_configuration=element_under_configuration,
1827 element_type=element_type,
1828 )
1829
1830 step = "register execution environment {}".format(credentials)
1831 self.logger.debug(logging_text + step)
1832 ee_id = await self.vca_map[vca_type].register_execution_environment(
1833 credentials=credentials,
1834 namespace=namespace,
1835 db_dict=db_dict,
1836 vca_id=vca_id,
1837 )
1838
1839             # for compatibility with MON/POL modules, they need the model and application name at the database
1840             # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1841 ee_id_parts = ee_id.split(".")
1842 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1843 if len(ee_id_parts) >= 2:
1844 model_name = ee_id_parts[0]
1845 application_name = ee_id_parts[1]
1846 db_nsr_update[db_update_entry + "model"] = model_name
1847 db_nsr_update[db_update_entry + "application"] = application_name
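            # ee_id is assumed to follow "<model_name>.<application_name>[.<machine_id>]";
            # only the first two parts are stored for MON/POL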
1848
1849 # n2vc_redesign STEP 3.3
1850 step = "Install configuration Software"
1851
1852 self._write_configuration_status(
1853 nsr_id=nsr_id,
1854 vca_index=vca_index,
1855 status="INSTALLING SW",
1856 element_under_configuration=element_under_configuration,
1857 element_type=element_type,
1858 other_update=db_nsr_update,
1859 )
1860
1861 # TODO check if already done
1862 self.logger.debug(logging_text + step)
1863 config = None
1864 if vca_type == "native_charm":
1865 config_primitive = next(
1866 (p for p in initial_config_primitive_list if p["name"] == "config"),
1867 None,
1868 )
1869 if config_primitive:
1870 config = self._map_primitive_params(
1871 config_primitive, {}, deploy_params
1872 )
1873 num_units = 1
1874 if vca_type == "lxc_proxy_charm":
1875 if element_type == "NS":
1876 num_units = db_nsr.get("config-units") or 1
1877 elif element_type == "VNF":
1878 num_units = db_vnfr.get("config-units") or 1
1879 elif element_type == "VDU":
1880 for v in db_vnfr["vdur"]:
1881 if vdu_id == v["vdu-id-ref"]:
1882 num_units = v.get("config-units") or 1
1883 break
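            # num_units is taken from the optional "config-units" field at the NS, VNF or
            # VDU level (defaulting to 1) and only applies to lxc proxy charms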
1884 if vca_type != "k8s_proxy_charm":
1885 await self.vca_map[vca_type].install_configuration_sw(
1886 ee_id=ee_id,
1887 artifact_path=artifact_path,
1888 db_dict=db_dict,
1889 config=config,
1890 num_units=num_units,
1891 vca_id=vca_id,
1892 vca_type=vca_type,
1893 )
1894
1895             # write flag in db indicating that the configuration software is already installed
1896 self.update_db_2(
1897 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1898 )
1899
1900             # add relations for this VCA (wait for other peers related to this VCA)
1901 await self._add_vca_relations(
1902 logging_text=logging_text,
1903 nsr_id=nsr_id,
1904 vca_index=vca_index,
1905 vca_id=vca_id,
1906 vca_type=vca_type,
1907 )
1908
1909             # if SSH access is required, then get the execution environment SSH public key;
1910             # for a native charm we have already waited for the VM to be up
1911 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1912 pub_key = None
1913 user = None
1914 # self.logger.debug("get ssh key block")
1915 if deep_get(
1916 config_descriptor, ("config-access", "ssh-access", "required")
1917 ):
1918 # self.logger.debug("ssh key needed")
1919                     # Needed to inject an ssh key
1920 user = deep_get(
1921 config_descriptor,
1922 ("config-access", "ssh-access", "default-user"),
1923 )
1924 step = "Install configuration Software, getting public ssh key"
1925 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1926 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1927 )
1928
1929 step = "Insert public key into VM user={} ssh_key={}".format(
1930 user, pub_key
1931 )
1932 else:
1933 # self.logger.debug("no need to get ssh key")
1934                 step = "Waiting for the VM to be up and getting its IP address"
1935 self.logger.debug(logging_text + step)
1936
1937             # default rw_mgmt_ip to None, so that the variable is always defined
1938 rw_mgmt_ip = None
1939
1940 # n2vc_redesign STEP 5.1
1941             # wait for RO (ip-address) and insert pub_key into the VM
1942 if vnfr_id:
1943 if kdu_name:
1944 rw_mgmt_ip = await self.wait_kdu_up(
1945 logging_text, nsr_id, vnfr_id, kdu_name
1946 )
1947
1948                 # This verification is needed to avoid trying to add a public key to a VM
1949                 # when the VNF is actually a KNF (in the edge case where the user creates a VCA
1950                 # for a KNF but not for its KDUs, the previous check is False and the code
1951                 # reaches this block, so it must be verified whether the VNF is really a VNF
1952                 # or a KNF)
1953                 elif db_vnfr.get("vdur"):
1954 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1955 logging_text,
1956 nsr_id,
1957 vnfr_id,
1958 vdu_id,
1959 vdu_index,
1960 user=user,
1961 pub_key=pub_key,
1962 )
1963
1964 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1965
1966 # store rw_mgmt_ip in deploy params for later replacement
1967 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1968
1969 # n2vc_redesign STEP 6 Execute initial config primitive
1970 step = "execute initial config primitive"
1971
1972 # wait for dependent primitives execution (NS -> VNF -> VDU)
1973 if initial_config_primitive_list:
1974 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1975
1976             # stage, depending on the element type: vdu, kdu, vnf or ns
1977 my_vca = vca_deployed_list[vca_index]
1978 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1979 # VDU or KDU
1980 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1981 elif my_vca.get("member-vnf-index"):
1982 # VNF
1983 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1984 else:
1985 # NS
1986 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1987
1988 self._write_configuration_status(
1989 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1990 )
1991
1992 self._write_op_status(op_id=nslcmop_id, stage=stage)
1993
1994 check_if_terminated_needed = True
1995 for initial_config_primitive in initial_config_primitive_list:
1996                 # add ns_config_info to the deploy params if this is an NS execution environment
1997 if not vca_deployed["member-vnf-index"]:
1998 deploy_params["ns_config_info"] = json.dumps(
1999 self._get_ns_config_info(nsr_id)
2000 )
2001 # TODO check if already done
2002 primitive_params_ = self._map_primitive_params(
2003 initial_config_primitive, {}, deploy_params
2004 )
2005
2006 step = "execute primitive '{}' params '{}'".format(
2007 initial_config_primitive["name"], primitive_params_
2008 )
2009 self.logger.debug(logging_text + step)
2010 await self.vca_map[vca_type].exec_primitive(
2011 ee_id=ee_id,
2012 primitive_name=initial_config_primitive["name"],
2013 params_dict=primitive_params_,
2014 db_dict=db_dict,
2015 vca_id=vca_id,
2016 vca_type=vca_type,
2017 )
2018                 # Once some primitive has been executed, record in the db whether terminate primitives will be needed
2019 if check_if_terminated_needed:
2020 if config_descriptor.get("terminate-config-primitive"):
2021 self.update_db_2(
2022 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2023 )
2024 check_if_terminated_needed = False
2025
2026 # TODO register in database that primitive is done
2027
2028 # STEP 7 Configure metrics
2029 if vca_type == "helm" or vca_type == "helm-v3":
2030 prometheus_jobs = await self.add_prometheus_metrics(
2031 ee_id=ee_id,
2032 artifact_path=artifact_path,
2033 ee_config_descriptor=ee_config_descriptor,
2034 vnfr_id=vnfr_id,
2035 nsr_id=nsr_id,
2036 target_ip=rw_mgmt_ip,
2037 )
2038 if prometheus_jobs:
2039 self.update_db_2(
2040 "nsrs",
2041 nsr_id,
2042 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2043 )
2044
2045 step = "instantiated at VCA"
2046 self.logger.debug(logging_text + step)
2047
2048 self._write_configuration_status(
2049 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2050 )
2051
2052 except Exception as e: # TODO not use Exception but N2VC exception
2053 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2054 if not isinstance(
2055 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2056 ):
2057 self.logger.error(
2058 "Exception while {} : {}".format(step, e), exc_info=True
2059 )
2060 self._write_configuration_status(
2061 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2062 )
2063 raise LcmException("{} {}".format(step, e)) from e
2064
2065 def _write_ns_status(
2066 self,
2067 nsr_id: str,
2068 ns_state: str,
2069 current_operation: str,
2070 current_operation_id: str,
2071 error_description: str = None,
2072 error_detail: str = None,
2073 other_update: dict = None,
2074 ):
2075 """
2076 Update db_nsr fields.
2077 :param nsr_id:
2078 :param ns_state:
2079 :param current_operation:
2080 :param current_operation_id:
2081 :param error_description:
2082 :param error_detail:
2083         :param other_update: Other required database changes; if provided, the dict is extended with the status fields and written
2084 :return:
2085 """
2086 try:
2087 db_dict = other_update or {}
2088 db_dict[
2089 "_admin.nslcmop"
2090 ] = current_operation_id # for backward compatibility
2091 db_dict["_admin.current-operation"] = current_operation_id
2092 db_dict["_admin.operation-type"] = (
2093 current_operation if current_operation != "IDLE" else None
2094 )
2095 db_dict["currentOperation"] = current_operation
2096 db_dict["currentOperationID"] = current_operation_id
2097 db_dict["errorDescription"] = error_description
2098 db_dict["errorDetail"] = error_detail
2099
2100 if ns_state:
2101 db_dict["nsState"] = ns_state
2102 self.update_db_2("nsrs", nsr_id, db_dict)
2103 except DbException as e:
2104 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2105
2106 def _write_op_status(
2107 self,
2108 op_id: str,
2109 stage: list = None,
2110 error_message: str = None,
2111 queuePosition: int = 0,
2112 operation_state: str = None,
2113 other_update: dict = None,
2114 ):
2115 try:
2116 db_dict = other_update or {}
2117 db_dict["queuePosition"] = queuePosition
2118 if isinstance(stage, list):
2119 db_dict["stage"] = stage[0]
2120 db_dict["detailed-status"] = " ".join(stage)
2121 elif stage is not None:
2122 db_dict["stage"] = str(stage)
2123
2124 if error_message is not None:
2125 db_dict["errorMessage"] = error_message
2126 if operation_state is not None:
2127 db_dict["operationState"] = operation_state
2128 db_dict["statusEnteredTime"] = time()
2129 self.update_db_2("nslcmops", op_id, db_dict)
2130 except DbException as e:
2131 self.logger.warn(
2132 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2133 )
2134
2135 def _write_all_config_status(self, db_nsr: dict, status: str):
2136 try:
2137 nsr_id = db_nsr["_id"]
2138 # configurationStatus
2139 config_status = db_nsr.get("configurationStatus")
2140 if config_status:
2141 db_nsr_update = {
2142 "configurationStatus.{}.status".format(index): status
2143 for index, v in enumerate(config_status)
2144 if v
2145 }
2146 # update status
2147 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2148
2149 except DbException as e:
2150 self.logger.warn(
2151 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2152 )
2153
2154 def _write_configuration_status(
2155 self,
2156 nsr_id: str,
2157 vca_index: int,
2158 status: str = None,
2159 element_under_configuration: str = None,
2160 element_type: str = None,
2161 other_update: dict = None,
2162 ):
2163
2164 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2165 # .format(vca_index, status))
2166
2167 try:
2168 db_path = "configurationStatus.{}.".format(vca_index)
2169 db_dict = other_update or {}
2170 if status:
2171 db_dict[db_path + "status"] = status
2172 if element_under_configuration:
2173 db_dict[
2174 db_path + "elementUnderConfiguration"
2175 ] = element_under_configuration
2176 if element_type:
2177 db_dict[db_path + "elementType"] = element_type
2178 self.update_db_2("nsrs", nsr_id, db_dict)
2179 except DbException as e:
2180 self.logger.warn(
2181 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2182 status, nsr_id, vca_index, e
2183 )
2184 )
2185
2186 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2187 """
2188         Checks and computes the placement (the VIM account where to deploy). If it is decided by an external tool, it
2189         sends the request via kafka and waits until the result is written to the database (nslcmops _admin.pla).
2190 Database is used because the result can be obtained from a different LCM worker in case of HA.
2191 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2192 :param db_nslcmop: database content of nslcmop
2193 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2194         :return: True if some modification is done. Modifies the database vnfrs and the db_vnfrs parameter with the
2195         computed 'vim-account-id'
2196 """
2197 modified = False
2198 nslcmop_id = db_nslcmop["_id"]
2199 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2200 if placement_engine == "PLA":
2201 self.logger.debug(
2202 logging_text + "Invoke and wait for placement optimization"
2203 )
2204 await self.msg.aiowrite(
2205 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2206 )
2207 db_poll_interval = 5
2208 wait = db_poll_interval * 10
2209 pla_result = None
2210 while not pla_result and wait >= 0:
2211 await asyncio.sleep(db_poll_interval)
2212 wait -= db_poll_interval
2213 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2214 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2215
2216 if not pla_result:
2217 raise LcmException(
2218 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2219 )
2220
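            # pla_result is expected to look like (illustrative):
            #   {"vnf": [{"member-vnf-index": "1", "vimAccountId": "<vim-account-id>"}, ...]}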
2221 for pla_vnf in pla_result["vnf"]:
2222 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2223 if not pla_vnf.get("vimAccountId") or not vnfr:
2224 continue
2225 modified = True
2226 self.db.set_one(
2227 "vnfrs",
2228 {"_id": vnfr["_id"]},
2229 {"vim-account-id": pla_vnf["vimAccountId"]},
2230 )
2231 # Modifies db_vnfrs
2232 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2233 return modified
2234
2235 def update_nsrs_with_pla_result(self, params):
2236 try:
2237 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2238 self.update_db_2(
2239 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2240 )
2241 except Exception as e:
2242 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2243
2244 async def instantiate(self, nsr_id, nslcmop_id):
2245 """
2246
2247 :param nsr_id: ns instance to deploy
2248 :param nslcmop_id: operation to run
2249 :return:
2250 """
2251
2252 # Try to lock HA task here
2253 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2254 if not task_is_locked_by_me:
2255 self.logger.debug(
2256 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2257 )
2258 return
2259
2260 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2261 self.logger.debug(logging_text + "Enter")
2262
2263 # get all needed from database
2264
2265 # database nsrs record
2266 db_nsr = None
2267
2268 # database nslcmops record
2269 db_nslcmop = None
2270
2271 # update operation on nsrs
2272 db_nsr_update = {}
2273 # update operation on nslcmops
2274 db_nslcmop_update = {}
2275
2276 nslcmop_operation_state = None
2277 db_vnfrs = {} # vnf's info indexed by member-index
2278 # n2vc_info = {}
2279 tasks_dict_info = {} # from task to info text
2280 exc = None
2281 error_list = []
2282 stage = [
2283 "Stage 1/5: preparation of the environment.",
2284 "Waiting for previous operations to terminate.",
2285 "",
2286 ]
2287 # ^ stage, step, VIM progress
2288 try:
2289 # wait for any previous tasks in process
2290 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2291
2292 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2293 stage[1] = "Reading from database."
2294 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2295 db_nsr_update["detailed-status"] = "creating"
2296 db_nsr_update["operational-status"] = "init"
2297 self._write_ns_status(
2298 nsr_id=nsr_id,
2299 ns_state="BUILDING",
2300 current_operation="INSTANTIATING",
2301 current_operation_id=nslcmop_id,
2302 other_update=db_nsr_update,
2303 )
2304 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2305
2306 # read from db: operation
2307 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2308 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2309 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2310 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2311 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2312 )
2313 ns_params = db_nslcmop.get("operationParams")
2314 if ns_params and ns_params.get("timeout_ns_deploy"):
2315 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2316 else:
2317 timeout_ns_deploy = self.timeout.get(
2318 "ns_deploy", self.timeout_ns_deploy
2319 )
2320
2321 # read from db: ns
2322 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2323 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2324 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2325 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2326 self.fs.sync(db_nsr["nsd-id"])
2327 db_nsr["nsd"] = nsd
2328 # nsr_name = db_nsr["name"] # TODO short-name??
2329
2330             # read from db: vnfs of this ns
2331 stage[1] = "Getting vnfrs from db."
2332 self.logger.debug(logging_text + stage[1])
2333 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2334
2335             # read from db: vnfds for every vnf
2336 db_vnfds = [] # every vnfd data
2337
2338 # for each vnf in ns, read vnfd
2339 for vnfr in db_vnfrs_list:
2340 if vnfr.get("kdur"):
2341 kdur_list = []
2342 for kdur in vnfr["kdur"]:
2343 if kdur.get("additionalParams"):
2344 kdur["additionalParams"] = json.loads(
2345 kdur["additionalParams"]
2346 )
2347 kdur_list.append(kdur)
2348 vnfr["kdur"] = kdur_list
2349
2350 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2351 vnfd_id = vnfr["vnfd-id"]
2352 vnfd_ref = vnfr["vnfd-ref"]
2353 self.fs.sync(vnfd_id)
2354
2355                 # if we do not have this vnfd yet, read it from db
2356                 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2357 # read from db
2358 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2359 vnfd_id, vnfd_ref
2360 )
2361 self.logger.debug(logging_text + stage[1])
2362 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2363
2364 # store vnfd
2365 db_vnfds.append(vnfd)
2366
2367             # Get or generate the _admin.deployed.VCA list
2368 vca_deployed_list = None
2369 if db_nsr["_admin"].get("deployed"):
2370 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2371 if vca_deployed_list is None:
2372 vca_deployed_list = []
2373 configuration_status_list = []
2374 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2375 db_nsr_update["configurationStatus"] = configuration_status_list
2376 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2377 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2378 elif isinstance(vca_deployed_list, dict):
2379 # maintain backward compatibility. Change a dict to list at database
2380 vca_deployed_list = list(vca_deployed_list.values())
2381 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2382 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2383
2384 if not isinstance(
2385 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2386 ):
2387 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2388 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2389
2390             # set state to INSTANTIATED. Once INSTANTIATED, NBI will not delete it directly
2391 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2392 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2393 self.db.set_list(
2394 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2395 )
2396
2397 # n2vc_redesign STEP 2 Deploy Network Scenario
2398 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2399 self._write_op_status(op_id=nslcmop_id, stage=stage)
2400
2401 stage[1] = "Deploying KDUs."
2402 # self.logger.debug(logging_text + "Before deploy_kdus")
2403             # Call deploy_kdus in case the "vdu:kdu" param exists
2404 await self.deploy_kdus(
2405 logging_text=logging_text,
2406 nsr_id=nsr_id,
2407 nslcmop_id=nslcmop_id,
2408 db_vnfrs=db_vnfrs,
2409 db_vnfds=db_vnfds,
2410 task_instantiation_info=tasks_dict_info,
2411 )
2412
2413 stage[1] = "Getting VCA public key."
2414 # n2vc_redesign STEP 1 Get VCA public ssh-key
2415 # feature 1429. Add n2vc public key to needed VMs
2416 n2vc_key = self.n2vc.get_public_key()
2417 n2vc_key_list = [n2vc_key]
2418 if self.vca_config.get("public_key"):
2419 n2vc_key_list.append(self.vca_config["public_key"])
2420
2421 stage[1] = "Deploying NS at VIM."
2422 task_ro = asyncio.ensure_future(
2423 self.instantiate_RO(
2424 logging_text=logging_text,
2425 nsr_id=nsr_id,
2426 nsd=nsd,
2427 db_nsr=db_nsr,
2428 db_nslcmop=db_nslcmop,
2429 db_vnfrs=db_vnfrs,
2430 db_vnfds=db_vnfds,
2431 n2vc_key_list=n2vc_key_list,
2432 stage=stage,
2433 )
2434 )
2435 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2436 tasks_dict_info[task_ro] = "Deploying at VIM"
2437
2438 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2439 stage[1] = "Deploying Execution Environments."
2440 self.logger.debug(logging_text + stage[1])
2441
2442             nsi_id = None  # TODO put nsi_id when this nsr belongs to an NSI
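            # Execution environments are deployed per descriptor level below: VNF-level
            # charms first, then VDU-level charms, then KDU configurations and finally
            # the NS-level charm, each one through a call to self._deploy_n2vc()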
2443 for vnf_profile in get_vnf_profiles(nsd):
2444 vnfd_id = vnf_profile["vnfd-id"]
2445 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2446 member_vnf_index = str(vnf_profile["id"])
2447 db_vnfr = db_vnfrs[member_vnf_index]
2448 base_folder = vnfd["_admin"]["storage"]
2449 vdu_id = None
2450 vdu_index = 0
2451 vdu_name = None
2452 kdu_name = None
2453
2454 # Get additional parameters
2455 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2456 if db_vnfr.get("additionalParamsForVnf"):
2457 deploy_params.update(
2458 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2459 )
2460
2461 descriptor_config = get_configuration(vnfd, vnfd["id"])
2462 if descriptor_config:
2463 self._deploy_n2vc(
2464 logging_text=logging_text
2465 + "member_vnf_index={} ".format(member_vnf_index),
2466 db_nsr=db_nsr,
2467 db_vnfr=db_vnfr,
2468 nslcmop_id=nslcmop_id,
2469 nsr_id=nsr_id,
2470 nsi_id=nsi_id,
2471 vnfd_id=vnfd_id,
2472 vdu_id=vdu_id,
2473 kdu_name=kdu_name,
2474 member_vnf_index=member_vnf_index,
2475 vdu_index=vdu_index,
2476 vdu_name=vdu_name,
2477 deploy_params=deploy_params,
2478 descriptor_config=descriptor_config,
2479 base_folder=base_folder,
2480 task_instantiation_info=tasks_dict_info,
2481 stage=stage,
2482 )
2483
2484 # Deploy charms for each VDU that supports one.
2485 for vdud in get_vdu_list(vnfd):
2486 vdu_id = vdud["id"]
2487 descriptor_config = get_configuration(vnfd, vdu_id)
2488 vdur = find_in_list(
2489 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2490 )
2491
2492 if vdur.get("additionalParams"):
2493 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2494 else:
2495 deploy_params_vdu = deploy_params
2496 deploy_params_vdu["OSM"] = get_osm_params(
2497 db_vnfr, vdu_id, vdu_count_index=0
2498 )
2499 vdud_count = get_number_of_instances(vnfd, vdu_id)
2500
2501 self.logger.debug("VDUD > {}".format(vdud))
2502 self.logger.debug(
2503 "Descriptor config > {}".format(descriptor_config)
2504 )
2505 if descriptor_config:
2506 vdu_name = None
2507 kdu_name = None
2508 for vdu_index in range(vdud_count):
2509 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2510 self._deploy_n2vc(
2511 logging_text=logging_text
2512 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2513 member_vnf_index, vdu_id, vdu_index
2514 ),
2515 db_nsr=db_nsr,
2516 db_vnfr=db_vnfr,
2517 nslcmop_id=nslcmop_id,
2518 nsr_id=nsr_id,
2519 nsi_id=nsi_id,
2520 vnfd_id=vnfd_id,
2521 vdu_id=vdu_id,
2522 kdu_name=kdu_name,
2523 member_vnf_index=member_vnf_index,
2524 vdu_index=vdu_index,
2525 vdu_name=vdu_name,
2526 deploy_params=deploy_params_vdu,
2527 descriptor_config=descriptor_config,
2528 base_folder=base_folder,
2529 task_instantiation_info=tasks_dict_info,
2530 stage=stage,
2531 )
2532 for kdud in get_kdu_list(vnfd):
2533 kdu_name = kdud["name"]
2534 descriptor_config = get_configuration(vnfd, kdu_name)
2535 if descriptor_config:
2536 vdu_id = None
2537 vdu_index = 0
2538 vdu_name = None
2539 kdur = next(
2540 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2541 )
2542 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2543 if kdur.get("additionalParams"):
2544 deploy_params_kdu.update(
2545 parse_yaml_strings(kdur["additionalParams"].copy())
2546 )
2547
2548 self._deploy_n2vc(
2549 logging_text=logging_text,
2550 db_nsr=db_nsr,
2551 db_vnfr=db_vnfr,
2552 nslcmop_id=nslcmop_id,
2553 nsr_id=nsr_id,
2554 nsi_id=nsi_id,
2555 vnfd_id=vnfd_id,
2556 vdu_id=vdu_id,
2557 kdu_name=kdu_name,
2558 member_vnf_index=member_vnf_index,
2559 vdu_index=vdu_index,
2560 vdu_name=vdu_name,
2561 deploy_params=deploy_params_kdu,
2562 descriptor_config=descriptor_config,
2563 base_folder=base_folder,
2564 task_instantiation_info=tasks_dict_info,
2565 stage=stage,
2566 )
2567
2568 # Check if this NS has a charm configuration
2569 descriptor_config = nsd.get("ns-configuration")
2570 if descriptor_config and descriptor_config.get("juju"):
2571 vnfd_id = None
2572 db_vnfr = None
2573 member_vnf_index = None
2574 vdu_id = None
2575 kdu_name = None
2576 vdu_index = 0
2577 vdu_name = None
2578
2579 # Get additional parameters
2580 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2581 if db_nsr.get("additionalParamsForNs"):
2582 deploy_params.update(
2583 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2584 )
2585 base_folder = nsd["_admin"]["storage"]
2586 self._deploy_n2vc(
2587 logging_text=logging_text,
2588 db_nsr=db_nsr,
2589 db_vnfr=db_vnfr,
2590 nslcmop_id=nslcmop_id,
2591 nsr_id=nsr_id,
2592 nsi_id=nsi_id,
2593 vnfd_id=vnfd_id,
2594 vdu_id=vdu_id,
2595 kdu_name=kdu_name,
2596 member_vnf_index=member_vnf_index,
2597 vdu_index=vdu_index,
2598 vdu_name=vdu_name,
2599 deploy_params=deploy_params,
2600 descriptor_config=descriptor_config,
2601 base_folder=base_folder,
2602 task_instantiation_info=tasks_dict_info,
2603 stage=stage,
2604 )
2605
2606             # the rest of the work will be done in the finally block
2607
2608 except (
2609 ROclient.ROClientException,
2610 DbException,
2611 LcmException,
2612 N2VCException,
2613 ) as e:
2614 self.logger.error(
2615 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2616 )
2617 exc = e
2618 except asyncio.CancelledError:
2619 self.logger.error(
2620 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2621 )
2622 exc = "Operation was cancelled"
2623 except Exception as e:
2624 exc = traceback.format_exc()
2625 self.logger.critical(
2626 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2627 exc_info=True,
2628 )
2629 finally:
2630 if exc:
2631 error_list.append(str(exc))
2632 try:
2633 # wait for pending tasks
2634 if tasks_dict_info:
2635 stage[1] = "Waiting for instantiate pending tasks."
2636 self.logger.debug(logging_text + stage[1])
2637 error_list += await self._wait_for_tasks(
2638 logging_text,
2639 tasks_dict_info,
2640 timeout_ns_deploy,
2641 stage,
2642 nslcmop_id,
2643 nsr_id=nsr_id,
2644 )
2645 stage[1] = stage[2] = ""
2646 except asyncio.CancelledError:
2647 error_list.append("Cancelled")
2648 # TODO cancel all tasks
2649 except Exception as exc:
2650 error_list.append(str(exc))
2651
2652 # update operation-status
2653 db_nsr_update["operational-status"] = "running"
2654 # let's begin with VCA 'configured' status (later we can change it)
2655 db_nsr_update["config-status"] = "configured"
2656 for task, task_name in tasks_dict_info.items():
2657 if not task.done() or task.cancelled() or task.exception():
2658 if task_name.startswith(self.task_name_deploy_vca):
2659 # A N2VC task is pending
2660 db_nsr_update["config-status"] = "failed"
2661 else:
2662 # RO or KDU task is pending
2663 db_nsr_update["operational-status"] = "failed"
2664
2665 # update status at database
2666 if error_list:
2667 error_detail = ". ".join(error_list)
2668 self.logger.error(logging_text + error_detail)
2669 error_description_nslcmop = "{} Detail: {}".format(
2670 stage[0], error_detail
2671 )
2672 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2673 nslcmop_id, stage[0]
2674 )
2675
2676 db_nsr_update["detailed-status"] = (
2677 error_description_nsr + " Detail: " + error_detail
2678 )
2679 db_nslcmop_update["detailed-status"] = error_detail
2680 nslcmop_operation_state = "FAILED"
2681 ns_state = "BROKEN"
2682 else:
2683 error_detail = None
2684 error_description_nsr = error_description_nslcmop = None
2685 ns_state = "READY"
2686 db_nsr_update["detailed-status"] = "Done"
2687 db_nslcmop_update["detailed-status"] = "Done"
2688 nslcmop_operation_state = "COMPLETED"
2689
2690 if db_nsr:
2691 self._write_ns_status(
2692 nsr_id=nsr_id,
2693 ns_state=ns_state,
2694 current_operation="IDLE",
2695 current_operation_id=None,
2696 error_description=error_description_nsr,
2697 error_detail=error_detail,
2698 other_update=db_nsr_update,
2699 )
2700 self._write_op_status(
2701 op_id=nslcmop_id,
2702 stage="",
2703 error_message=error_description_nslcmop,
2704 operation_state=nslcmop_operation_state,
2705 other_update=db_nslcmop_update,
2706 )
2707
2708 if nslcmop_operation_state:
2709 try:
2710 await self.msg.aiowrite(
2711 "ns",
2712 "instantiated",
2713 {
2714 "nsr_id": nsr_id,
2715 "nslcmop_id": nslcmop_id,
2716 "operationState": nslcmop_operation_state,
2717 },
2718 loop=self.loop,
2719 )
2720 except Exception as e:
2721 self.logger.error(
2722 logging_text + "kafka_write notification Exception {}".format(e)
2723 )
2724
2725 self.logger.debug(logging_text + "Exit")
2726 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2727
2728 async def _add_vca_relations(
2729 self,
2730 logging_text,
2731 nsr_id,
2732 vca_index: int,
2733 timeout: int = 3600,
2734 vca_type: str = None,
2735 vca_id: str = None,
2736 ) -> bool:
2737
2738 # steps:
2739 # 1. find all relations for this VCA
2740 # 2. wait for other peers related
2741 # 3. add relations
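        # A relation entry read from the descriptor is expected to look like (illustrative):
        #   {"entities": [{"id": "<vnf-or-vdu-id>", "endpoint": "<endpoint>"},
        #                 {"id": "<vnf-or-vdu-id>", "endpoint": "<endpoint>"}]}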
2742
2743 try:
2744 vca_type = vca_type or "lxc_proxy_charm"
2745
2746 # STEP 1: find all relations for this VCA
2747
2748 # read nsr record
2749 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2750 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2751
2752 # this VCA data
2753 my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
2754
2755 # read all ns-configuration relations
2756 ns_relations = list()
2757 db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
2758 if db_ns_relations:
2759 for r in db_ns_relations:
2760 # check if this VCA is in the relation
2761 if my_vca.get("member-vnf-index") in (
2762 r.get("entities")[0].get("id"),
2763 r.get("entities")[1].get("id"),
2764 ):
2765 ns_relations.append(r)
2766
2767 # read all vnf-configuration relations
2768 vnf_relations = list()
2769 db_vnfd_list = db_nsr.get("vnfd-id")
2770 if db_vnfd_list:
2771 for vnfd in db_vnfd_list:
2772 db_vnf_relations = None
2773 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2774 db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
2775 if db_vnf_configuration:
2776 db_vnf_relations = db_vnf_configuration.get("relation", [])
2777 if db_vnf_relations:
2778 for r in db_vnf_relations:
2779 # check if this VCA is in the relation
2780 if my_vca.get("vdu_id") in (
2781 r.get("entities")[0].get("id"),
2782 r.get("entities")[1].get("id"),
2783 ):
2784 vnf_relations.append(r)
2785
2786 # if no relations, terminate
2787 if not ns_relations and not vnf_relations:
2788 self.logger.debug(logging_text + " No relations")
2789 return True
2790
2791 self.logger.debug(
2792 logging_text
2793 + " adding relations\n {}\n {}".format(
2794 ns_relations, vnf_relations
2795 )
2796 )
2797
2798 # add all relations
2799 start = time()
2800 while True:
2801 # check timeout
2802 now = time()
2803 if now - start >= timeout:
2804 self.logger.error(logging_text + " : timeout adding relations")
2805 return False
2806
2807                 # reload nsr from database (we need the updated record: _admin.deployed.VCA)
2808 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2809
2810 # for each defined NS relation, find the VCA's related
2811 for r in ns_relations.copy():
2812 from_vca_ee_id = None
2813 to_vca_ee_id = None
2814 from_vca_endpoint = None
2815 to_vca_endpoint = None
2816 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2817 for vca in vca_list:
2818 if vca.get("member-vnf-index") == r.get("entities")[0].get(
2819 "id"
2820 ) and vca.get("config_sw_installed"):
2821 from_vca_ee_id = vca.get("ee_id")
2822 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2823 if vca.get("member-vnf-index") == r.get("entities")[1].get(
2824 "id"
2825 ) and vca.get("config_sw_installed"):
2826 to_vca_ee_id = vca.get("ee_id")
2827 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2828 if from_vca_ee_id and to_vca_ee_id:
2829 # add relation
2830 await self.vca_map[vca_type].add_relation(
2831 ee_id_1=from_vca_ee_id,
2832 ee_id_2=to_vca_ee_id,
2833 endpoint_1=from_vca_endpoint,
2834 endpoint_2=to_vca_endpoint,
2835 vca_id=vca_id,
2836 )
2837 # remove entry from relations list
2838 ns_relations.remove(r)
2839 else:
2840 # check failed peers
2841 try:
2842 vca_status_list = db_nsr.get("configurationStatus")
2843 if vca_status_list:
2844 for i in range(len(vca_list)):
2845 vca = vca_list[i]
2846 vca_status = vca_status_list[i]
2847 if vca.get("member-vnf-index") == r.get("entities")[
2848 0
2849 ].get("id"):
2850 if vca_status.get("status") == "BROKEN":
2851 # peer broken: remove relation from list
2852 ns_relations.remove(r)
2853 if vca.get("member-vnf-index") == r.get("entities")[
2854 1
2855 ].get("id"):
2856 if vca_status.get("status") == "BROKEN":
2857 # peer broken: remove relation from list
2858 ns_relations.remove(r)
2859 except Exception:
2860 # ignore
2861 pass
2862
2863 # for each defined VNF relation, find the VCA's related
2864 for r in vnf_relations.copy():
2865 from_vca_ee_id = None
2866 to_vca_ee_id = None
2867 from_vca_endpoint = None
2868 to_vca_endpoint = None
2869 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2870 for vca in vca_list:
2871 key_to_check = "vdu_id"
2872 if vca.get("vdu_id") is None:
2873 key_to_check = "vnfd_id"
2874 if vca.get(key_to_check) == r.get("entities")[0].get(
2875 "id"
2876 ) and vca.get("config_sw_installed"):
2877 from_vca_ee_id = vca.get("ee_id")
2878 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2879 if vca.get(key_to_check) == r.get("entities")[1].get(
2880 "id"
2881 ) and vca.get("config_sw_installed"):
2882 to_vca_ee_id = vca.get("ee_id")
2883 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2884 if from_vca_ee_id and to_vca_ee_id:
2885 # add relation
2886 await self.vca_map[vca_type].add_relation(
2887 ee_id_1=from_vca_ee_id,
2888 ee_id_2=to_vca_ee_id,
2889 endpoint_1=from_vca_endpoint,
2890 endpoint_2=to_vca_endpoint,
2891 vca_id=vca_id,
2892 )
2893 # remove entry from relations list
2894 vnf_relations.remove(r)
2895 else:
2896 # check failed peers
2897 try:
2898 vca_status_list = db_nsr.get("configurationStatus")
2899 if vca_status_list:
2900 for i in range(len(vca_list)):
2901 vca = vca_list[i]
2902 vca_status = vca_status_list[i]
2903 if vca.get("vdu_id") == r.get("entities")[0].get(
2904 "id"
2905 ):
2906 if vca_status.get("status") == "BROKEN":
2907 # peer broken: remove relation from list
2908 vnf_relations.remove(r)
2909 if vca.get("vdu_id") == r.get("entities")[1].get(
2910 "id"
2911 ):
2912 if vca_status.get("status") == "BROKEN":
2913 # peer broken: remove relation from list
2914 vnf_relations.remove(r)
2915 except Exception:
2916 # ignore
2917 pass
2918
2919 # wait for next try
2920 await asyncio.sleep(5.0)
2921
2922 if not ns_relations and not vnf_relations:
2923 self.logger.debug("Relations added")
2924 break
2925
2926 return True
2927
2928 except Exception as e:
2929 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
2930 return False
2931
2932 async def _install_kdu(
2933 self,
2934 nsr_id: str,
2935 nsr_db_path: str,
2936 vnfr_data: dict,
2937 kdu_index: int,
2938 kdud: dict,
2939 vnfd: dict,
2940 k8s_instance_info: dict,
2941 k8params: dict = None,
2942 timeout: int = 600,
2943 vca_id: str = None,
2944 ):
2945
2946 try:
2947 k8sclustertype = k8s_instance_info["k8scluster-type"]
2948 # Instantiate kdu
2949 db_dict_install = {
2950 "collection": "nsrs",
2951 "filter": {"_id": nsr_id},
2952 "path": nsr_db_path,
2953 }
2954
2955 if k8s_instance_info.get("kdu-deployment-name"):
2956 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
2957 else:
2958 kdu_instance = self.k8scluster_map[
2959 k8sclustertype
2960 ].generate_kdu_instance_name(
2961 db_dict=db_dict_install,
2962 kdu_model=k8s_instance_info["kdu-model"],
2963 kdu_name=k8s_instance_info["kdu-name"],
2964 )
2965
2966 # Update the nsrs table with the kdu-instance value
2967 self.update_db_2(
2968 item="nsrs",
2969 _id=nsr_id,
2970 _desc={nsr_db_path + ".kdu-instance": kdu_instance},
2971 )
2972
2973             # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
2974             # `juju-bundle`. This verification is needed because there is no standard/homogeneous namespace
2975             # between Helm Chart and Juju Bundle-based KNFs. If we find a way to have a homogeneous
2976             # namespace, this first verification could be removed, and the next step would be done for any kind
2977             # of KNF.
2978             # TODO -> find a way to have a homogeneous namespace between Helm Chart and Juju Bundle-based
2979             # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
2980 if k8sclustertype in ("juju", "juju-bundle"):
2981                 # First, verify if the current namespace is present in `_admin.projects_read` (if not, it means
2982                 # that the user passed a namespace in which they want the KDU to be deployed)
2983 if (
2984 self.db.count(
2985 table="nsrs",
2986 q_filter={
2987 "_id": nsr_id,
2988 "_admin.projects_write": k8s_instance_info["namespace"],
2989 "_admin.projects_read": k8s_instance_info["namespace"],
2990 },
2991 )
2992 > 0
2993 ):
2994 self.logger.debug(
2995 f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
2996 )
2997 self.update_db_2(
2998 item="nsrs",
2999 _id=nsr_id,
3000 _desc={f"{nsr_db_path}.namespace": kdu_instance},
3001 )
3002 k8s_instance_info["namespace"] = kdu_instance
3003
3004 await self.k8scluster_map[k8sclustertype].install(
3005 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3006 kdu_model=k8s_instance_info["kdu-model"],
3007 atomic=True,
3008 params=k8params,
3009 db_dict=db_dict_install,
3010 timeout=timeout,
3011 kdu_name=k8s_instance_info["kdu-name"],
3012 namespace=k8s_instance_info["namespace"],
3013 kdu_instance=kdu_instance,
3014 vca_id=vca_id,
3015 )
3016
3017 # Obtain services to obtain management service ip
3018 services = await self.k8scluster_map[k8sclustertype].get_services(
3019 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3020 kdu_instance=kdu_instance,
3021 namespace=k8s_instance_info["namespace"],
3022 )
3023
3024 # Obtain management service info (if exists)
3025 vnfr_update_dict = {}
3026 kdu_config = get_configuration(vnfd, kdud["name"])
3027 if kdu_config:
3028 target_ee_list = kdu_config.get("execution-environment-list", [])
3029 else:
3030 target_ee_list = []
3031
3032 if services:
3033 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3034 mgmt_services = [
3035 service
3036 for service in kdud.get("service", [])
3037 if service.get("mgmt-service")
3038 ]
3039 for mgmt_service in mgmt_services:
3040 for service in services:
3041 if service["name"].startswith(mgmt_service["name"]):
3042 # Mgmt service found, Obtain service ip
3043 ip = service.get("external_ip", service.get("cluster_ip"))
3044 if isinstance(ip, list) and len(ip) == 1:
3045 ip = ip[0]
3046
3047 vnfr_update_dict[
3048 "kdur.{}.ip-address".format(kdu_index)
3049 ] = ip
3050
3051                             # Check if the mgmt ip at the vnf must also be updated
3052 service_external_cp = mgmt_service.get(
3053 "external-connection-point-ref"
3054 )
3055 if service_external_cp:
3056 if (
3057 deep_get(vnfd, ("mgmt-interface", "cp"))
3058 == service_external_cp
3059 ):
3060 vnfr_update_dict["ip-address"] = ip
3061
3062 if find_in_list(
3063 target_ee_list,
3064 lambda ee: ee.get(
3065 "external-connection-point-ref", ""
3066 )
3067 == service_external_cp,
3068 ):
3069 vnfr_update_dict[
3070 "kdur.{}.ip-address".format(kdu_index)
3071 ] = ip
3072 break
3073 else:
3074 self.logger.warn(
3075 "Mgmt service name: {} not found".format(
3076 mgmt_service["name"]
3077 )
3078 )
3079
3080 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3081 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3082
3083 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3084 if (
3085 kdu_config
3086 and kdu_config.get("initial-config-primitive")
3087 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3088 ):
3089 initial_config_primitive_list = kdu_config.get(
3090 "initial-config-primitive"
3091 )
3092 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3093
3094 for initial_config_primitive in initial_config_primitive_list:
3095 primitive_params_ = self._map_primitive_params(
3096 initial_config_primitive, {}, {}
3097 )
3098
3099 await asyncio.wait_for(
3100 self.k8scluster_map[k8sclustertype].exec_primitive(
3101 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3102 kdu_instance=kdu_instance,
3103 primitive_name=initial_config_primitive["name"],
3104 params=primitive_params_,
3105 db_dict=db_dict_install,
3106 vca_id=vca_id,
3107 ),
3108 timeout=timeout,
3109 )
3110
3111 except Exception as e:
3112 # Prepare update db with error and raise exception
3113 try:
3114 self.update_db_2(
3115 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3116 )
3117 self.update_db_2(
3118 "vnfrs",
3119 vnfr_data.get("_id"),
3120 {"kdur.{}.status".format(kdu_index): "ERROR"},
3121 )
3122 except Exception:
3123 # ignore to keep original exception
3124 pass
3125 # reraise original error
3126 raise
3127
3128 return kdu_instance
3129
3130 async def deploy_kdus(
3131 self,
3132 logging_text,
3133 nsr_id,
3134 nslcmop_id,
3135 db_vnfrs,
3136 db_vnfds,
3137 task_instantiation_info,
3138 ):
3139 # Launch kdus if present in the descriptor
3140
3141 k8scluster_id_2_uuic = {
3142 "helm-chart-v3": {},
3143 "helm-chart": {},
3144 "juju-bundle": {},
3145 }
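        # cache of already resolved clusters: maps the OSM k8scluster _id to the
        # connector-specific cluster uuid, kept separately per cluster type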
3146
3147 async def _get_cluster_id(cluster_id, cluster_type):
3148 nonlocal k8scluster_id_2_uuic
3149 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3150 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3151
3152             # check if the K8s cluster is still being created: look for and wait on previous related tasks in process
3153 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3154 "k8scluster", cluster_id
3155 )
3156 if task_dependency:
3157 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3158 task_name, cluster_id
3159 )
3160 self.logger.debug(logging_text + text)
3161 await asyncio.wait(task_dependency, timeout=3600)
3162
3163 db_k8scluster = self.db.get_one(
3164 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3165 )
3166 if not db_k8scluster:
3167 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3168
3169 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3170 if not k8s_id:
3171 if cluster_type == "helm-chart-v3":
3172 try:
3173 # backward compatibility for existing clusters that have not been initialized for helm v3
3174 k8s_credentials = yaml.safe_dump(
3175 db_k8scluster.get("credentials")
3176 )
3177 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3178 k8s_credentials, reuse_cluster_uuid=cluster_id
3179 )
3180 db_k8scluster_update = {}
3181 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3182 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3183 db_k8scluster_update[
3184 "_admin.helm-chart-v3.created"
3185 ] = uninstall_sw
3186 db_k8scluster_update[
3187 "_admin.helm-chart-v3.operationalState"
3188 ] = "ENABLED"
3189 self.update_db_2(
3190 "k8sclusters", cluster_id, db_k8scluster_update
3191 )
3192 except Exception as e:
3193 self.logger.error(
3194 logging_text
3195 + "error initializing helm-v3 cluster: {}".format(str(e))
3196 )
3197 raise LcmException(
3198 "K8s cluster '{}' has not been initialized for '{}'".format(
3199 cluster_id, cluster_type
3200 )
3201 )
3202 else:
3203 raise LcmException(
3204 "K8s cluster '{}' has not been initialized for '{}'".format(
3205 cluster_id, cluster_type
3206 )
3207 )
3208 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3209 return k8s_id
3210
3211 logging_text += "Deploy kdus: "
3212 step = ""
3213 try:
3214 db_nsr_update = {"_admin.deployed.K8s": []}
3215 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3216
3217 index = 0
3218 updated_cluster_list = []
3219 updated_v3_cluster_list = []
3220
3221 for vnfr_data in db_vnfrs.values():
3222 vca_id = self.get_vca_id(vnfr_data, {})
3223 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3224 # Step 0: Prepare and set parameters
3225 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3226 vnfd_id = vnfr_data.get("vnfd-id")
3227 vnfd_with_id = find_in_list(
3228 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3229 )
3230 kdud = next(
3231 kdud
3232 for kdud in vnfd_with_id["kdu"]
3233 if kdud["name"] == kdur["kdu-name"]
3234 )
3235 namespace = kdur.get("k8s-namespace")
3236 kdu_deployment_name = kdur.get("kdu-deployment-name")
3237 if kdur.get("helm-chart"):
3238 kdumodel = kdur["helm-chart"]
3239 # Default version: helm3, if helm-version is v2 assign v2
3240 k8sclustertype = "helm-chart-v3"
3241 self.logger.debug("kdur: {}".format(kdur))
3242 if (
3243 kdur.get("helm-version")
3244 and kdur.get("helm-version") == "v2"
3245 ):
3246 k8sclustertype = "helm-chart"
3247 elif kdur.get("juju-bundle"):
3248 kdumodel = kdur["juju-bundle"]
3249 k8sclustertype = "juju-bundle"
3250 else:
3251 raise LcmException(
3252 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3253 "juju-bundle. Maybe an old NBI version is running".format(
3254 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3255 )
3256 )
3257 # check if kdumodel is a file and exists
3258 try:
3259 vnfd_with_id = find_in_list(
3260 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3261 )
3262 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3263 if storage and storage.get(
3264 "pkg-dir"
3265                         ):  # may not be present if the vnfd has no artifacts
3266                             # path format: /vnfdid/pkg-dir/helm-charts|juju-bundles/kdumodel
3267 filename = "{}/{}/{}s/{}".format(
3268 storage["folder"],
3269 storage["pkg-dir"],
3270 k8sclustertype,
3271 kdumodel,
3272 )
3273 if self.fs.file_exists(
3274 filename, mode="file"
3275 ) or self.fs.file_exists(filename, mode="dir"):
3276 kdumodel = self.fs.path + filename
3277 except (asyncio.TimeoutError, asyncio.CancelledError):
3278 raise
3279 except Exception: # it is not a file
3280 pass
3281
3282 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3283 step = "Synchronize repos for k8s cluster '{}'".format(
3284 k8s_cluster_id
3285 )
3286 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3287
3288 # Synchronize repos
3289 if (
3290 k8sclustertype == "helm-chart"
3291 and cluster_uuid not in updated_cluster_list
3292 ) or (
3293 k8sclustertype == "helm-chart-v3"
3294 and cluster_uuid not in updated_v3_cluster_list
3295 ):
3296 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3297 self.k8scluster_map[k8sclustertype].synchronize_repos(
3298 cluster_uuid=cluster_uuid
3299 )
3300 )
3301 if del_repo_list or added_repo_dict:
3302 if k8sclustertype == "helm-chart":
3303 unset = {
3304 "_admin.helm_charts_added." + item: None
3305 for item in del_repo_list
3306 }
3307 updated = {
3308 "_admin.helm_charts_added." + item: name
3309 for item, name in added_repo_dict.items()
3310 }
3311 updated_cluster_list.append(cluster_uuid)
3312 elif k8sclustertype == "helm-chart-v3":
3313 unset = {
3314 "_admin.helm_charts_v3_added." + item: None
3315 for item in del_repo_list
3316 }
3317 updated = {
3318 "_admin.helm_charts_v3_added." + item: name
3319 for item, name in added_repo_dict.items()
3320 }
3321 updated_v3_cluster_list.append(cluster_uuid)
3322 self.logger.debug(
3323 logging_text + "repos synchronized on k8s cluster "
3324 "'{}' to_delete: {}, to_add: {}".format(
3325 k8s_cluster_id, del_repo_list, added_repo_dict
3326 )
3327 )
3328 self.db.set_one(
3329 "k8sclusters",
3330 {"_id": k8s_cluster_id},
3331 updated,
3332 unset=unset,
3333 )
3334
3335 # Instantiate kdu
3336 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3337 vnfr_data["member-vnf-index-ref"],
3338 kdur["kdu-name"],
3339 k8s_cluster_id,
3340 )
3341 k8s_instance_info = {
3342 "kdu-instance": None,
3343 "k8scluster-uuid": cluster_uuid,
3344 "k8scluster-type": k8sclustertype,
3345 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3346 "kdu-name": kdur["kdu-name"],
3347 "kdu-model": kdumodel,
3348 "namespace": namespace,
3349 "kdu-deployment-name": kdu_deployment_name,
3350 }
3351 db_path = "_admin.deployed.K8s.{}".format(index)
3352 db_nsr_update[db_path] = k8s_instance_info
3353 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3354 vnfd_with_id = find_in_list(
3355 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3356 )
3357 task = asyncio.ensure_future(
3358 self._install_kdu(
3359 nsr_id,
3360 db_path,
3361 vnfr_data,
3362 kdu_index,
3363 kdud,
3364 vnfd_with_id,
3365 k8s_instance_info,
3366 k8params=desc_params,
3367 timeout=1800,
3368 vca_id=vca_id,
3369 )
3370 )
3371 self.lcm_tasks.register(
3372 "ns",
3373 nsr_id,
3374 nslcmop_id,
3375 "instantiate_KDU-{}".format(index),
3376 task,
3377 )
3378 task_instantiation_info[task] = "Deploying KDU {}".format(
3379 kdur["kdu-name"]
3380 )
3381
3382 index += 1
3383
3384 except (LcmException, asyncio.CancelledError):
3385 raise
3386 except Exception as e:
3387 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3388 if isinstance(e, (N2VCException, DbException)):
3389 self.logger.error(logging_text + msg)
3390 else:
3391 self.logger.critical(logging_text + msg, exc_info=True)
3392 raise LcmException(msg)
3393 finally:
3394 if db_nsr_update:
3395 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3396
3397 def _deploy_n2vc(
3398 self,
3399 logging_text,
3400 db_nsr,
3401 db_vnfr,
3402 nslcmop_id,
3403 nsr_id,
3404 nsi_id,
3405 vnfd_id,
3406 vdu_id,
3407 kdu_name,
3408 member_vnf_index,
3409 vdu_index,
3410 vdu_name,
3411 deploy_params,
3412 descriptor_config,
3413 base_folder,
3414 task_instantiation_info,
3415 stage,
3416 ):
3417         # launch instantiate_N2VC in an asyncio task and register the task object
3418         # Look up where the information for this charm is stored in the database <nsrs>._admin.deployed.VCA
3419 # if not found, create one entry and update database
3420 # fill db_nsr._admin.deployed.VCA.<index>
3421
3422 self.logger.debug(
3423 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3424 )
3425 if "execution-environment-list" in descriptor_config:
3426 ee_list = descriptor_config.get("execution-environment-list", [])
3427 elif "juju" in descriptor_config:
3428 ee_list = [descriptor_config] # ns charms
3429         else:  # other types, such as scripts, are not supported
3430 ee_list = []
3431
3432 for ee_item in ee_list:
3433 self.logger.debug(
3434 logging_text
3435 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3436 ee_item.get("juju"), ee_item.get("helm-chart")
3437 )
3438 )
3439 ee_descriptor_id = ee_item.get("id")
3440 if ee_item.get("juju"):
3441 vca_name = ee_item["juju"].get("charm")
3442 vca_type = (
3443 "lxc_proxy_charm"
3444 if ee_item["juju"].get("charm") is not None
3445 else "native_charm"
3446 )
3447 if ee_item["juju"].get("cloud") == "k8s":
3448 vca_type = "k8s_proxy_charm"
3449 elif ee_item["juju"].get("proxy") is False:
3450 vca_type = "native_charm"
3451 elif ee_item.get("helm-chart"):
3452 vca_name = ee_item["helm-chart"]
3453 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3454 vca_type = "helm"
3455 else:
3456 vca_type = "helm-v3"
3457 else:
3458 self.logger.debug(
3459                     logging_text + "skipping configuration that is neither juju nor helm-chart"
3460 )
3461 continue
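            # Summary of the vca_type selection above:
            #   juju charm with cloud "k8s"        -> k8s_proxy_charm
            #   juju charm with proxy set to False -> native_charm
            #   juju charm (default)               -> lxc_proxy_charm
            #   juju without a charm               -> native_charm
            #   helm-chart with helm-version "v2"  -> helm
            #   helm-chart (default)               -> helm-v3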
3462
3463 vca_index = -1
3464 for vca_index, vca_deployed in enumerate(
3465 db_nsr["_admin"]["deployed"]["VCA"]
3466 ):
3467 if not vca_deployed:
3468 continue
3469 if (
3470 vca_deployed.get("member-vnf-index") == member_vnf_index
3471 and vca_deployed.get("vdu_id") == vdu_id
3472 and vca_deployed.get("kdu_name") == kdu_name
3473 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3474 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3475 ):
3476 break
3477 else:
3478 # not found, create one.
3479 target = (
3480 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3481 )
3482 if vdu_id:
3483 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3484 elif kdu_name:
3485 target += "/kdu/{}".format(kdu_name)
3486 vca_deployed = {
3487 "target_element": target,
3488 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3489 "member-vnf-index": member_vnf_index,
3490 "vdu_id": vdu_id,
3491 "kdu_name": kdu_name,
3492 "vdu_count_index": vdu_index,
3493 "operational-status": "init", # TODO revise
3494 "detailed-status": "", # TODO revise
3495 "step": "initial-deploy", # TODO revise
3496 "vnfd_id": vnfd_id,
3497 "vdu_name": vdu_name,
3498 "type": vca_type,
3499 "ee_descriptor_id": ee_descriptor_id,
3500 }
3501 vca_index += 1
3502
3503 # create VCA and configurationStatus in db
3504 db_dict = {
3505 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3506 "configurationStatus.{}".format(vca_index): dict(),
3507 }
3508 self.update_db_2("nsrs", nsr_id, db_dict)
3509
3510 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3511
3512 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3513 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3514 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3515
3516 # Launch task
3517 task_n2vc = asyncio.ensure_future(
3518 self.instantiate_N2VC(
3519 logging_text=logging_text,
3520 vca_index=vca_index,
3521 nsi_id=nsi_id,
3522 db_nsr=db_nsr,
3523 db_vnfr=db_vnfr,
3524 vdu_id=vdu_id,
3525 kdu_name=kdu_name,
3526 vdu_index=vdu_index,
3527 deploy_params=deploy_params,
3528 config_descriptor=descriptor_config,
3529 base_folder=base_folder,
3530 nslcmop_id=nslcmop_id,
3531 stage=stage,
3532 vca_type=vca_type,
3533 vca_name=vca_name,
3534 ee_config_descriptor=ee_item,
3535 )
3536 )
3537 self.lcm_tasks.register(
3538 "ns",
3539 nsr_id,
3540 nslcmop_id,
3541 "instantiate_N2VC-{}".format(vca_index),
3542 task_n2vc,
3543 )
3544 task_instantiation_info[
3545 task_n2vc
3546 ] = self.task_name_deploy_vca + " {}.{}".format(
3547 member_vnf_index or "", vdu_id or ""
3548 )
3549
3550 @staticmethod
3551 def _create_nslcmop(nsr_id, operation, params):
3552 """
3553 Creates an ns-lcm-op (nslcmop) document to be stored in the database.
3554 :param nsr_id: internal id of the instance
3555 :param operation: instantiate, terminate, scale, action, ...
3556 :param params: user parameters for the operation
3557 :return: dictionary following SOL005 format
3558 """
3559 # Raise exception if invalid arguments
3560 if not (nsr_id and operation and params):
3561 raise LcmException(
3562 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3563 )
3564 now = time()
3565 _id = str(uuid4())
3566 nslcmop = {
3567 "id": _id,
3568 "_id": _id,
3569 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3570 "operationState": "PROCESSING",
3571 "statusEnteredTime": now,
3572 "nsInstanceId": nsr_id,
3573 "lcmOperationType": operation,
3574 "startTime": now,
3575 "isAutomaticInvocation": False,
3576 "operationParams": params,
3577 "isCancelPending": False,
3578 "links": {
3579 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3580 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3581 },
3582 }
3583 return nslcmop
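# Illustrative usage (sketch, not part of the original source): calling
#   NsLcm._create_nslcmop(nsr_id, "terminate", {"autoremove": True})
# returns a dict with "operationState": "PROCESSING", "lcmOperationType": "terminate",
# "operationParams": {"autoremove": True} and "links" pointing at the SOL005 URLs built above.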
3584
3585 def _format_additional_params(self, params):
3586 params = params or {}
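# Illustrative example (assumed values): an entry like
#   {"vim_net": "!!yaml {name: mgmt, vlan: 100}"}
# is converted by the loop below into {"vim_net": {"name": "mgmt", "vlan": 100}}.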
3587 for key, value in params.items():
3588 if str(value).startswith("!!yaml "):
3589 params[key] = yaml.safe_load(value[7:])
3590 return params
3591
3592 def _get_terminate_primitive_params(self, seq, vnf_index):
3593 primitive = seq.get("name")
3594 primitive_params = {}
3595 params = {
3596 "member_vnf_index": vnf_index,
3597 "primitive": primitive,
3598 "primitive_params": primitive_params,
3599 }
3600 desc_params = {}
3601 return self._map_primitive_params(seq, params, desc_params)
3602
3603 # sub-operations
3604
3605 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3606 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3607 if op.get("operationState") == "COMPLETED":
3608 # b. Skip sub-operation
3609 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3610 return self.SUBOPERATION_STATUS_SKIP
3611 else:
3612 # c. retry executing sub-operation
3613 # The sub-operation exists, and operationState != 'COMPLETED'
3614 # Update operationState = 'PROCESSING' to indicate a retry.
3615 operationState = "PROCESSING"
3616 detailed_status = "In progress"
3617 self._update_suboperation_status(
3618 db_nslcmop, op_index, operationState, detailed_status
3619 )
3620 # Return the sub-operation index
3621 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3622 # with arguments extracted from the sub-operation
3623 return op_index
3624
3625 # Find a sub-operation where all keys in a matching dictionary must match
3626 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
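# Illustrative match dict (assumed values, shaped as built by _check_or_add_scale_suboperation):
#   {"member_vnf_index": "1", "primitive": "touch", "primitive_params": {...},
#    "lcmOperationType": "PRE-SCALE"}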
3627 def _find_suboperation(self, db_nslcmop, match):
3628 if db_nslcmop and match:
3629 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3630 for i, op in enumerate(op_list):
3631 if all(op.get(k) == match[k] for k in match):
3632 return i
3633 return self.SUBOPERATION_STATUS_NOT_FOUND
3634
3635 # Update status for a sub-operation given its index
3636 def _update_suboperation_status(
3637 self, db_nslcmop, op_index, operationState, detailed_status
3638 ):
3639 # Update DB for HA tasks
3640 q_filter = {"_id": db_nslcmop["_id"]}
3641 update_dict = {
3642 "_admin.operations.{}.operationState".format(op_index): operationState,
3643 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3644 }
3645 self.db.set_one(
3646 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3647 )
3648
3649 # Add sub-operation, return the index of the added sub-operation
3650 # Optionally, set operationState, detailed-status, and operationType
3651 # Status and type are currently set for 'scale' sub-operations:
3652 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3653 # 'detailed-status' : status message
3654 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3655 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3656 def _add_suboperation(
3657 self,
3658 db_nslcmop,
3659 vnf_index,
3660 vdu_id,
3661 vdu_count_index,
3662 vdu_name,
3663 primitive,
3664 mapped_primitive_params,
3665 operationState=None,
3666 detailed_status=None,
3667 operationType=None,
3668 RO_nsr_id=None,
3669 RO_scaling_info=None,
3670 ):
3671 if not db_nslcmop:
3672 return self.SUBOPERATION_STATUS_NOT_FOUND
3673 # Get the "_admin.operations" list, if it exists
3674 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3675 op_list = db_nslcmop_admin.get("operations")
3676 # Create or append to the "_admin.operations" list
3677 new_op = {
3678 "member_vnf_index": vnf_index,
3679 "vdu_id": vdu_id,
3680 "vdu_count_index": vdu_count_index,
3681 "primitive": primitive,
3682 "primitive_params": mapped_primitive_params,
3683 }
3684 if operationState:
3685 new_op["operationState"] = operationState
3686 if detailed_status:
3687 new_op["detailed-status"] = detailed_status
3688 if operationType:
3689 new_op["lcmOperationType"] = operationType
3690 if RO_nsr_id:
3691 new_op["RO_nsr_id"] = RO_nsr_id
3692 if RO_scaling_info:
3693 new_op["RO_scaling_info"] = RO_scaling_info
3694 if not op_list:
3695 # No existing operations, create key 'operations' with current operation as first list element
3696 db_nslcmop_admin.update({"operations": [new_op]})
3697 op_list = db_nslcmop_admin.get("operations")
3698 else:
3699 # Existing operations, append operation to list
3700 op_list.append(new_op)
3701
3702 db_nslcmop_update = {"_admin.operations": op_list}
3703 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3704 op_index = len(op_list) - 1
3705 return op_index
3706
3707 # Helper methods for scale() sub-operations
3708
3709 # pre-scale/post-scale:
3710 # Check for 3 different cases:
3711 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3712 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3713 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
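# Illustrative flow (sketch with assumed values): on an HA retry of a PRE-SCALE primitive, scale() calls
#   self._check_or_add_scale_suboperation(db_nslcmop, "1", "touch", params, "PRE-SCALE")
# and gets back SUBOPERATION_STATUS_NEW (first execution), SUBOPERATION_STATUS_SKIP (already
# COMPLETED) or the op_index to re-execute with the stored parameters.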
3714 def _check_or_add_scale_suboperation(
3715 self,
3716 db_nslcmop,
3717 vnf_index,
3718 vnf_config_primitive,
3719 primitive_params,
3720 operationType,
3721 RO_nsr_id=None,
3722 RO_scaling_info=None,
3723 ):
3724 # Find this sub-operation
3725 if RO_nsr_id and RO_scaling_info:
3726 operationType = "SCALE-RO"
3727 match = {
3728 "member_vnf_index": vnf_index,
3729 "RO_nsr_id": RO_nsr_id,
3730 "RO_scaling_info": RO_scaling_info,
3731 }
3732 else:
3733 match = {
3734 "member_vnf_index": vnf_index,
3735 "primitive": vnf_config_primitive,
3736 "primitive_params": primitive_params,
3737 "lcmOperationType": operationType,
3738 }
3739 op_index = self._find_suboperation(db_nslcmop, match)
3740 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3741 # a. New sub-operation
3742 # The sub-operation does not exist, add it.
3743 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3744 # The following parameters are set to None for all kind of scaling:
3745 vdu_id = None
3746 vdu_count_index = None
3747 vdu_name = None
3748 if RO_nsr_id and RO_scaling_info:
3749 vnf_config_primitive = None
3750 primitive_params = None
3751 else:
3752 RO_nsr_id = None
3753 RO_scaling_info = None
3754 # Initial status for sub-operation
3755 operationState = "PROCESSING"
3756 detailed_status = "In progress"
3757 # Add sub-operation for pre/post-scaling (zero or more operations)
3758 self._add_suboperation(
3759 db_nslcmop,
3760 vnf_index,
3761 vdu_id,
3762 vdu_count_index,
3763 vdu_name,
3764 vnf_config_primitive,
3765 primitive_params,
3766 operationState,
3767 detailed_status,
3768 operationType,
3769 RO_nsr_id,
3770 RO_scaling_info,
3771 )
3772 return self.SUBOPERATION_STATUS_NEW
3773 else:
3774 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3775 # or op_index (operationState != 'COMPLETED')
3776 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3777
3778 # Function to return execution_environment id
3779
3780 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3781 # TODO vdu_index_count
3782 for vca in vca_deployed_list:
3783 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3784 return vca["ee_id"]
3785
3786 async def destroy_N2VC(
3787 self,
3788 logging_text,
3789 db_nslcmop,
3790 vca_deployed,
3791 config_descriptor,
3792 vca_index,
3793 destroy_ee=True,
3794 exec_primitives=True,
3795 scaling_in=False,
3796 vca_id: str = None,
3797 ):
3798 """
3799 Execute the terminate primitives and destroy the execution environment (only if destroy_ee=True)
3800 :param logging_text:
3801 :param db_nslcmop:
3802 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3803 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3804 :param vca_index: index in the database _admin.deployed.VCA
3805 :param destroy_ee: False to skip destroying the execution environment, because all of them will be destroyed at once later
3806 :param exec_primitives: False to skip executing the terminate primitives, because the config was not completed or did
3807 not execute properly
3808 :param scaling_in: True destroys the application, False destroys the model
3809 :return: None or exception
3810 """
3811
3812 self.logger.debug(
3813 logging_text
3814 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3815 vca_index, vca_deployed, config_descriptor, destroy_ee
3816 )
3817 )
3818
3819 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3820
3821 # execute terminate_primitives
3822 if exec_primitives:
3823 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3824 config_descriptor.get("terminate-config-primitive"),
3825 vca_deployed.get("ee_descriptor_id"),
3826 )
3827 vdu_id = vca_deployed.get("vdu_id")
3828 vdu_count_index = vca_deployed.get("vdu_count_index")
3829 vdu_name = vca_deployed.get("vdu_name")
3830 vnf_index = vca_deployed.get("member-vnf-index")
3831 if terminate_primitives and vca_deployed.get("needed_terminate"):
3832 for seq in terminate_primitives:
3833 # For each sequence in list, get primitive and call _ns_execute_primitive()
3834 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3835 vnf_index, seq.get("name")
3836 )
3837 self.logger.debug(logging_text + step)
3838 # Create the primitive for each sequence, i.e. "primitive": "touch"
3839 primitive = seq.get("name")
3840 mapped_primitive_params = self._get_terminate_primitive_params(
3841 seq, vnf_index
3842 )
3843
3844 # Add sub-operation
3845 self._add_suboperation(
3846 db_nslcmop,
3847 vnf_index,
3848 vdu_id,
3849 vdu_count_index,
3850 vdu_name,
3851 primitive,
3852 mapped_primitive_params,
3853 )
3854 # Sub-operations: Call _ns_execute_primitive() instead of action()
3855 try:
3856 result, result_detail = await self._ns_execute_primitive(
3857 vca_deployed["ee_id"],
3858 primitive,
3859 mapped_primitive_params,
3860 vca_type=vca_type,
3861 vca_id=vca_id,
3862 )
3863 except LcmException:
3864 # this happens when the VCA is not deployed. In this case there is nothing to terminate
3865 continue
3866 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3867 if result not in result_ok:
3868 raise LcmException(
3869 "terminate_primitive {} for vnf_member_index={} fails with "
3870 "error {}".format(seq.get("name"), vnf_index, result_detail)
3871 )
3872 # mark that this VCA no longer needs to be terminated
3873 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3874 vca_index
3875 )
3876 self.update_db_2(
3877 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3878 )
3879
3880 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3881 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3882
3883 if destroy_ee:
3884 await self.vca_map[vca_type].delete_execution_environment(
3885 vca_deployed["ee_id"],
3886 scaling_in=scaling_in,
3887 vca_type=vca_type,
3888 vca_id=vca_id,
3889 )
3890
3891 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3892 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3893 namespace = "." + db_nsr["_id"]
3894 try:
3895 await self.n2vc.delete_namespace(
3896 namespace=namespace,
3897 total_timeout=self.timeout_charm_delete,
3898 vca_id=vca_id,
3899 )
3900 except N2VCNotFound: # already deleted. Skip
3901 pass
3902 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3903
3904 async def _terminate_RO(
3905 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3906 ):
3907 """
3908 Terminates a deployment from RO
3909 :param logging_text:
3910 :param nsr_deployed: db_nsr._admin.deployed
3911 :param nsr_id:
3912 :param nslcmop_id:
3913 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
3914 this method only updates index 2, but it writes the concatenated content of the whole list to the database
3915 :return:
3916 """
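# Illustrative stage value at this point (as set by terminate()):
#   ["Stage 3/3 delete all.", "Deleting ns from VIM.", ""]
# Only stage[2] is updated below with the VIM deletion progress.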
3917 db_nsr_update = {}
3918 failed_detail = []
3919 ro_nsr_id = ro_delete_action = None
3920 if nsr_deployed and nsr_deployed.get("RO"):
3921 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3922 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3923 try:
3924 if ro_nsr_id:
3925 stage[2] = "Deleting ns from VIM."
3926 db_nsr_update["detailed-status"] = " ".join(stage)
3927 self._write_op_status(nslcmop_id, stage)
3928 self.logger.debug(logging_text + stage[2])
3929 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3930 self._write_op_status(nslcmop_id, stage)
3931 desc = await self.RO.delete("ns", ro_nsr_id)
3932 ro_delete_action = desc["action_id"]
3933 db_nsr_update[
3934 "_admin.deployed.RO.nsr_delete_action_id"
3935 ] = ro_delete_action
3936 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3937 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3938 if ro_delete_action:
3939 # wait until NS is deleted from VIM
3940 stage[2] = "Waiting for ns to be deleted from VIM."
3941 detailed_status_old = None
3942 self.logger.debug(
3943 logging_text
3944 + stage[2]
3945 + " RO_id={} ro_delete_action={}".format(
3946 ro_nsr_id, ro_delete_action
3947 )
3948 )
3949 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3950 self._write_op_status(nslcmop_id, stage)
3951
3952 delete_timeout = 20 * 60 # 20 minutes
3953 while delete_timeout > 0:
3954 desc = await self.RO.show(
3955 "ns",
3956 item_id_name=ro_nsr_id,
3957 extra_item="action",
3958 extra_item_id=ro_delete_action,
3959 )
3960
3961 # deploymentStatus
3962 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3963
3964 ns_status, ns_status_info = self.RO.check_action_status(desc)
3965 if ns_status == "ERROR":
3966 raise ROclient.ROClientException(ns_status_info)
3967 elif ns_status == "BUILD":
3968 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3969 elif ns_status == "ACTIVE":
3970 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3971 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3972 break
3973 else:
3974 assert (
3975 False
3976 ), "ROclient.check_action_status returns unknown {}".format(
3977 ns_status
3978 )
3979 if stage[2] != detailed_status_old:
3980 detailed_status_old = stage[2]
3981 db_nsr_update["detailed-status"] = " ".join(stage)
3982 self._write_op_status(nslcmop_id, stage)
3983 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3984 await asyncio.sleep(5, loop=self.loop)
3985 delete_timeout -= 5
3986 else: # delete_timeout <= 0:
3987 raise ROclient.ROClientException(
3988 "Timeout waiting for ns to be deleted from VIM"
3989 )
3990
3991 except Exception as e:
3992 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3993 if (
3994 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3995 ): # not found
3996 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3997 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3998 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3999 self.logger.debug(
4000 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4001 )
4002 elif (
4003 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4004 ): # conflict
4005 failed_detail.append("delete conflict: {}".format(e))
4006 self.logger.debug(
4007 logging_text
4008 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4009 )
4010 else:
4011 failed_detail.append("delete error: {}".format(e))
4012 self.logger.error(
4013 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4014 )
4015
4016 # Delete nsd
4017 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4018 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4019 try:
4020 stage[2] = "Deleting nsd from RO."
4021 db_nsr_update["detailed-status"] = " ".join(stage)
4022 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4023 self._write_op_status(nslcmop_id, stage)
4024 await self.RO.delete("nsd", ro_nsd_id)
4025 self.logger.debug(
4026 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4027 )
4028 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4029 except Exception as e:
4030 if (
4031 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4032 ): # not found
4033 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4034 self.logger.debug(
4035 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4036 )
4037 elif (
4038 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4039 ): # conflict
4040 failed_detail.append(
4041 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4042 )
4043 self.logger.debug(logging_text + failed_detail[-1])
4044 else:
4045 failed_detail.append(
4046 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4047 )
4048 self.logger.error(logging_text + failed_detail[-1])
4049
4050 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4051 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4052 if not vnf_deployed or not vnf_deployed["id"]:
4053 continue
4054 try:
4055 ro_vnfd_id = vnf_deployed["id"]
4056 stage[
4057 2
4058 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4059 vnf_deployed["member-vnf-index"], ro_vnfd_id
4060 )
4061 db_nsr_update["detailed-status"] = " ".join(stage)
4062 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4063 self._write_op_status(nslcmop_id, stage)
4064 await self.RO.delete("vnfd", ro_vnfd_id)
4065 self.logger.debug(
4066 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4067 )
4068 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4069 except Exception as e:
4070 if (
4071 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4072 ): # not found
4073 db_nsr_update[
4074 "_admin.deployed.RO.vnfd.{}.id".format(index)
4075 ] = None
4076 self.logger.debug(
4077 logging_text
4078 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4079 )
4080 elif (
4081 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4082 ): # conflict
4083 failed_detail.append(
4084 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4085 )
4086 self.logger.debug(logging_text + failed_detail[-1])
4087 else:
4088 failed_detail.append(
4089 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4090 )
4091 self.logger.error(logging_text + failed_detail[-1])
4092
4093 if failed_detail:
4094 stage[2] = "Error deleting from VIM"
4095 else:
4096 stage[2] = "Deleted from VIM"
4097 db_nsr_update["detailed-status"] = " ".join(stage)
4098 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4099 self._write_op_status(nslcmop_id, stage)
4100
4101 if failed_detail:
4102 raise LcmException("; ".join(failed_detail))
4103
4104 async def terminate(self, nsr_id, nslcmop_id):
4105 # Try to lock HA task here
4106 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4107 if not task_is_locked_by_me:
4108 return
4109
4110 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4111 self.logger.debug(logging_text + "Enter")
4112 timeout_ns_terminate = self.timeout_ns_terminate
4113 db_nsr = None
4114 db_nslcmop = None
4115 operation_params = None
4116 exc = None
4117 error_list = []  # accumulates all failure error messages
4118 db_nslcmop_update = {}
4119 autoremove = False  # autoremove after termination
4120 tasks_dict_info = {}
4121 db_nsr_update = {}
4122 stage = [
4123 "Stage 1/3: Preparing task.",
4124 "Waiting for previous operations to terminate.",
4125 "",
4126 ]
4127 # ^ contains [stage, step, VIM-status]
4128 try:
4129 # wait for any previous tasks in progress
4130 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4131
4132 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4133 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4134 operation_params = db_nslcmop.get("operationParams") or {}
4135 if operation_params.get("timeout_ns_terminate"):
4136 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4137 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4138 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4139
4140 db_nsr_update["operational-status"] = "terminating"
4141 db_nsr_update["config-status"] = "terminating"
4142 self._write_ns_status(
4143 nsr_id=nsr_id,
4144 ns_state="TERMINATING",
4145 current_operation="TERMINATING",
4146 current_operation_id=nslcmop_id,
4147 other_update=db_nsr_update,
4148 )
4149 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4150 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4151 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4152 return
4153
4154 stage[1] = "Getting vnf descriptors from db."
4155 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4156 db_vnfrs_dict = {
4157 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4158 }
4159 db_vnfds_from_id = {}
4160 db_vnfds_from_member_index = {}
4161 # Loop over VNFRs
4162 for vnfr in db_vnfrs_list:
4163 vnfd_id = vnfr["vnfd-id"]
4164 if vnfd_id not in db_vnfds_from_id:
4165 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4166 db_vnfds_from_id[vnfd_id] = vnfd
4167 db_vnfds_from_member_index[
4168 vnfr["member-vnf-index-ref"]
4169 ] = db_vnfds_from_id[vnfd_id]
4170
4171 # Destroy individual execution environments when there are terminating primitives.
4172 # The rest of the EEs will be deleted at once later
4173 # TODO - check before calling _destroy_N2VC
4174 # if not operation_params.get("skip_terminate_primitives"):#
4175 # or not vca.get("needed_terminate"):
4176 stage[0] = "Stage 2/3 execute terminating primitives."
4177 self.logger.debug(logging_text + stage[0])
4178 stage[1] = "Looking for execution environments that need to be terminated."
4179 self.logger.debug(logging_text + stage[1])
4180
4181 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4182 config_descriptor = None
4183 vca_member_vnf_index = vca.get("member-vnf-index")
4184 vca_id = self.get_vca_id(
4185 db_vnfrs_dict.get(vca_member_vnf_index)
4186 if vca_member_vnf_index
4187 else None,
4188 db_nsr,
4189 )
4190 if not vca or not vca.get("ee_id"):
4191 continue
4192 if not vca.get("member-vnf-index"):
4193 # ns
4194 config_descriptor = db_nsr.get("ns-configuration")
4195 elif vca.get("vdu_id"):
4196 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4197 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4198 elif vca.get("kdu_name"):
4199 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4200 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4201 else:
4202 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4203 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4204 vca_type = vca.get("type")
4205 exec_terminate_primitives = not operation_params.get(
4206 "skip_terminate_primitives"
4207 ) and vca.get("needed_terminate")
4208 # For helm we must set destroy_ee. Also for native_charm, as the juju model cannot be deleted if there are
4209 # pending native charms
4210 destroy_ee = (
4211 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4212 )
4213 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4214 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4215 task = asyncio.ensure_future(
4216 self.destroy_N2VC(
4217 logging_text,
4218 db_nslcmop,
4219 vca,
4220 config_descriptor,
4221 vca_index,
4222 destroy_ee,
4223 exec_terminate_primitives,
4224 vca_id=vca_id,
4225 )
4226 )
4227 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4228
4229 # wait for pending tasks of terminate primitives
4230 if tasks_dict_info:
4231 self.logger.debug(
4232 logging_text
4233 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4234 )
4235 error_list = await self._wait_for_tasks(
4236 logging_text,
4237 tasks_dict_info,
4238 min(self.timeout_charm_delete, timeout_ns_terminate),
4239 stage,
4240 nslcmop_id,
4241 )
4242 tasks_dict_info.clear()
4243 if error_list:
4244 return # raise LcmException("; ".join(error_list))
4245
4246 # remove All execution environments at once
4247 stage[0] = "Stage 3/3 delete all."
4248
4249 if nsr_deployed.get("VCA"):
4250 stage[1] = "Deleting all execution environments."
4251 self.logger.debug(logging_text + stage[1])
4252 vca_id = self.get_vca_id({}, db_nsr)
4253 task_delete_ee = asyncio.ensure_future(
4254 asyncio.wait_for(
4255 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4256 timeout=self.timeout_charm_delete,
4257 )
4258 )
4259 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4260 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4261
4262 # Delete from k8scluster
4263 stage[1] = "Deleting KDUs."
4264 self.logger.debug(logging_text + stage[1])
4265 # print(nsr_deployed)
4266 for kdu in get_iterable(nsr_deployed, "K8s"):
4267 if not kdu or not kdu.get("kdu-instance"):
4268 continue
4269 kdu_instance = kdu.get("kdu-instance")
4270 if kdu.get("k8scluster-type") in self.k8scluster_map:
4271 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4272 vca_id = self.get_vca_id({}, db_nsr)
4273 task_delete_kdu_instance = asyncio.ensure_future(
4274 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4275 cluster_uuid=kdu.get("k8scluster-uuid"),
4276 kdu_instance=kdu_instance,
4277 vca_id=vca_id,
4278 )
4279 )
4280 else:
4281 self.logger.error(
4282 logging_text
4283 + "Unknown k8s deployment type {}".format(
4284 kdu.get("k8scluster-type")
4285 )
4286 )
4287 continue
4288 tasks_dict_info[
4289 task_delete_kdu_instance
4290 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4291
4292 # remove from RO
4293 stage[1] = "Deleting ns from VIM."
4294 if self.ng_ro:
4295 task_delete_ro = asyncio.ensure_future(
4296 self._terminate_ng_ro(
4297 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4298 )
4299 )
4300 else:
4301 task_delete_ro = asyncio.ensure_future(
4302 self._terminate_RO(
4303 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4304 )
4305 )
4306 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4307
4308 # the rest of the work will be done in the finally block
4309
4310 except (
4311 ROclient.ROClientException,
4312 DbException,
4313 LcmException,
4314 N2VCException,
4315 ) as e:
4316 self.logger.error(logging_text + "Exit Exception {}".format(e))
4317 exc = e
4318 except asyncio.CancelledError:
4319 self.logger.error(
4320 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4321 )
4322 exc = "Operation was cancelled"
4323 except Exception as e:
4324 exc = traceback.format_exc()
4325 self.logger.critical(
4326 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4327 exc_info=True,
4328 )
4329 finally:
4330 if exc:
4331 error_list.append(str(exc))
4332 try:
4333 # wait for pending tasks
4334 if tasks_dict_info:
4335 stage[1] = "Waiting for terminate pending tasks."
4336 self.logger.debug(logging_text + stage[1])
4337 error_list += await self._wait_for_tasks(
4338 logging_text,
4339 tasks_dict_info,
4340 timeout_ns_terminate,
4341 stage,
4342 nslcmop_id,
4343 )
4344 stage[1] = stage[2] = ""
4345 except asyncio.CancelledError:
4346 error_list.append("Cancelled")
4347 # TODO cancel all tasks
4348 except Exception as exc:
4349 error_list.append(str(exc))
4350 # update status at database
4351 if error_list:
4352 error_detail = "; ".join(error_list)
4353 # self.logger.error(logging_text + error_detail)
4354 error_description_nslcmop = "{} Detail: {}".format(
4355 stage[0], error_detail
4356 )
4357 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4358 nslcmop_id, stage[0]
4359 )
4360
4361 db_nsr_update["operational-status"] = "failed"
4362 db_nsr_update["detailed-status"] = (
4363 error_description_nsr + " Detail: " + error_detail
4364 )
4365 db_nslcmop_update["detailed-status"] = error_detail
4366 nslcmop_operation_state = "FAILED"
4367 ns_state = "BROKEN"
4368 else:
4369 error_detail = None
4370 error_description_nsr = error_description_nslcmop = None
4371 ns_state = "NOT_INSTANTIATED"
4372 db_nsr_update["operational-status"] = "terminated"
4373 db_nsr_update["detailed-status"] = "Done"
4374 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4375 db_nslcmop_update["detailed-status"] = "Done"
4376 nslcmop_operation_state = "COMPLETED"
4377
4378 if db_nsr:
4379 self._write_ns_status(
4380 nsr_id=nsr_id,
4381 ns_state=ns_state,
4382 current_operation="IDLE",
4383 current_operation_id=None,
4384 error_description=error_description_nsr,
4385 error_detail=error_detail,
4386 other_update=db_nsr_update,
4387 )
4388 self._write_op_status(
4389 op_id=nslcmop_id,
4390 stage="",
4391 error_message=error_description_nslcmop,
4392 operation_state=nslcmop_operation_state,
4393 other_update=db_nslcmop_update,
4394 )
4395 if ns_state == "NOT_INSTANTIATED":
4396 try:
4397 self.db.set_list(
4398 "vnfrs",
4399 {"nsr-id-ref": nsr_id},
4400 {"_admin.nsState": "NOT_INSTANTIATED"},
4401 )
4402 except DbException as e:
4403 self.logger.warn(
4404 logging_text
4405 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4406 nsr_id, e
4407 )
4408 )
4409 if operation_params:
4410 autoremove = operation_params.get("autoremove", False)
4411 if nslcmop_operation_state:
4412 try:
4413 await self.msg.aiowrite(
4414 "ns",
4415 "terminated",
4416 {
4417 "nsr_id": nsr_id,
4418 "nslcmop_id": nslcmop_id,
4419 "operationState": nslcmop_operation_state,
4420 "autoremove": autoremove,
4421 },
4422 loop=self.loop,
4423 )
4424 except Exception as e:
4425 self.logger.error(
4426 logging_text + "kafka_write notification Exception {}".format(e)
4427 )
4428
4429 self.logger.debug(logging_text + "Exit")
4430 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4431
4432 async def _wait_for_tasks(
4433 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4434 ):
4435 time_start = time()
4436 error_detail_list = []
4437 error_list = []
4438 pending_tasks = list(created_tasks_info.keys())
4439 num_tasks = len(pending_tasks)
4440 num_done = 0
4441 stage[1] = "{}/{}.".format(num_done, num_tasks)
4442 self._write_op_status(nslcmop_id, stage)
4443 while pending_tasks:
4444 new_error = None
4445 _timeout = timeout + time_start - time()
4446 done, pending_tasks = await asyncio.wait(
4447 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4448 )
4449 num_done += len(done)
4450 if not done: # Timeout
4451 for task in pending_tasks:
4452 new_error = created_tasks_info[task] + ": Timeout"
4453 error_detail_list.append(new_error)
4454 error_list.append(new_error)
4455 break
4456 for task in done:
4457 if task.cancelled():
4458 exc = "Cancelled"
4459 else:
4460 exc = task.exception()
4461 if exc:
4462 if isinstance(exc, asyncio.TimeoutError):
4463 exc = "Timeout"
4464 new_error = created_tasks_info[task] + ": {}".format(exc)
4465 error_list.append(created_tasks_info[task])
4466 error_detail_list.append(new_error)
4467 if isinstance(
4468 exc,
4469 (
4470 str,
4471 DbException,
4472 N2VCException,
4473 ROclient.ROClientException,
4474 LcmException,
4475 K8sException,
4476 NgRoException,
4477 ),
4478 ):
4479 self.logger.error(logging_text + new_error)
4480 else:
4481 exc_traceback = "".join(
4482 traceback.format_exception(None, exc, exc.__traceback__)
4483 )
4484 self.logger.error(
4485 logging_text
4486 + created_tasks_info[task]
4487 + " "
4488 + exc_traceback
4489 )
4490 else:
4491 self.logger.debug(
4492 logging_text + created_tasks_info[task] + ": Done"
4493 )
4494 stage[1] = "{}/{}.".format(num_done, num_tasks)
4495 if new_error:
4496 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4497 if nsr_id: # update also nsr
4498 self.update_db_2(
4499 "nsrs",
4500 nsr_id,
4501 {
4502 "errorDescription": "Error at: " + ", ".join(error_list),
4503 "errorDetail": ". ".join(error_detail_list),
4504 },
4505 )
4506 self._write_op_status(nslcmop_id, stage)
4507 return error_detail_list
4508
4509 @staticmethod
4510 def _map_primitive_params(primitive_desc, params, instantiation_params):
4511 """
4512 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a parameter,
4513 the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
4514 :param primitive_desc: portion of VNFD/NSD that describes primitive
4515 :param params: Params provided by user
4516 :param instantiation_params: Instantiation params provided by user
4517 :return: a dictionary with the calculated params
4518 """
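# Illustrative example (assumed values, not from the original source): with a primitive descriptor
#   {"name": "touch", "parameter": [{"name": "filename", "default-value": "<touch_filename>"}]},
# params={} and instantiation_params={"touch_filename": "/home/ubuntu/first-touch"},
# the result is {"filename": "/home/ubuntu/first-touch"}.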
4519 calculated_params = {}
4520 for parameter in primitive_desc.get("parameter", ()):
4521 param_name = parameter["name"]
4522 if param_name in params:
4523 calculated_params[param_name] = params[param_name]
4524 elif "default-value" in parameter or "value" in parameter:
4525 if "value" in parameter:
4526 calculated_params[param_name] = parameter["value"]
4527 else:
4528 calculated_params[param_name] = parameter["default-value"]
4529 if (
4530 isinstance(calculated_params[param_name], str)
4531 and calculated_params[param_name].startswith("<")
4532 and calculated_params[param_name].endswith(">")
4533 ):
4534 if calculated_params[param_name][1:-1] in instantiation_params:
4535 calculated_params[param_name] = instantiation_params[
4536 calculated_params[param_name][1:-1]
4537 ]
4538 else:
4539 raise LcmException(
4540 "Parameter {} needed to execute primitive {} not provided".format(
4541 calculated_params[param_name], primitive_desc["name"]
4542 )
4543 )
4544 else:
4545 raise LcmException(
4546 "Parameter {} needed to execute primitive {} not provided".format(
4547 param_name, primitive_desc["name"]
4548 )
4549 )
4550
4551 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4552 calculated_params[param_name] = yaml.safe_dump(
4553 calculated_params[param_name], default_flow_style=True, width=256
4554 )
4555 elif isinstance(calculated_params[param_name], str) and calculated_params[
4556 param_name
4557 ].startswith("!!yaml "):
4558 calculated_params[param_name] = calculated_params[param_name][7:]
4559 if parameter.get("data-type") == "INTEGER":
4560 try:
4561 calculated_params[param_name] = int(calculated_params[param_name])
4562 except ValueError: # error converting string to int
4563 raise LcmException(
4564 "Parameter {} of primitive {} must be integer".format(
4565 param_name, primitive_desc["name"]
4566 )
4567 )
4568 elif parameter.get("data-type") == "BOOLEAN":
4569 calculated_params[param_name] = not (
4570 (str(calculated_params[param_name])).lower() == "false"
4571 )
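# i.e. only the string "false" (case-insensitive) maps to False; any other value becomes True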
4572
4573 # always add ns_config_info if the primitive name is config
4574 if primitive_desc["name"] == "config":
4575 if "ns_config_info" in instantiation_params:
4576 calculated_params["ns_config_info"] = instantiation_params[
4577 "ns_config_info"
4578 ]
4579 return calculated_params
4580
4581 def _look_for_deployed_vca(
4582 self,
4583 deployed_vca,
4584 member_vnf_index,
4585 vdu_id,
4586 vdu_count_index,
4587 kdu_name=None,
4588 ee_descriptor_id=None,
4589 ):
4590 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
4591 for vca in deployed_vca:
4592 if not vca:
4593 continue
4594 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4595 continue
4596 if (
4597 vdu_count_index is not None
4598 and vdu_count_index != vca["vdu_count_index"]
4599 ):
4600 continue
4601 if kdu_name and kdu_name != vca["kdu_name"]:
4602 continue
4603 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4604 continue
4605 break
4606 else:
4607 # vca_deployed not found
4608 raise LcmException(
4609 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4610 " is not deployed".format(
4611 member_vnf_index,
4612 vdu_id,
4613 vdu_count_index,
4614 kdu_name,
4615 ee_descriptor_id,
4616 )
4617 )
4618 # get ee_id
4619 ee_id = vca.get("ee_id")
4620 vca_type = vca.get(
4621 "type", "lxc_proxy_charm"
4622 ) # default value for backward compatibility - proxy charm
4623 if not ee_id:
4624 raise LcmException(
4625 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has no "
4626 "execution environment".format(
4627 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4628 )
4629 )
4630 return ee_id, vca_type
4631
4632 async def _ns_execute_primitive(
4633 self,
4634 ee_id,
4635 primitive,
4636 primitive_params,
4637 retries=0,
4638 retries_interval=30,
4639 timeout=None,
4640 vca_type=None,
4641 db_dict=None,
4642 vca_id: str = None,
4643 ) -> (str, str):
4644 try:
4645 if primitive == "config":
4646 primitive_params = {"params": primitive_params}
4647
4648 vca_type = vca_type or "lxc_proxy_charm"
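# Retry semantics of the loop below: retries=0 means a single attempt; on failure (including
# timeout) the call is repeated up to 'retries' more times, waiting retries_interval seconds
# between attempts before finally returning a FAILED result.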
4649
4650 while retries >= 0:
4651 try:
4652 output = await asyncio.wait_for(
4653 self.vca_map[vca_type].exec_primitive(
4654 ee_id=ee_id,
4655 primitive_name=primitive,
4656 params_dict=primitive_params,
4657 progress_timeout=self.timeout_progress_primitive,
4658 total_timeout=self.timeout_primitive,
4659 db_dict=db_dict,
4660 vca_id=vca_id,
4661 vca_type=vca_type,
4662 ),
4663 timeout=timeout or self.timeout_primitive,
4664 )
4665 # execution was OK
4666 break
4667 except asyncio.CancelledError:
4668 raise
4669 except Exception as e: # asyncio.TimeoutError
4670 if isinstance(e, asyncio.TimeoutError):
4671 e = "Timeout"
4672 retries -= 1
4673 if retries >= 0:
4674 self.logger.debug(
4675 "Error executing action {} on {} -> {}".format(
4676 primitive, ee_id, e
4677 )
4678 )
4679 # wait and retry
4680 await asyncio.sleep(retries_interval, loop=self.loop)
4681 else:
4682 return "FAILED", str(e)
4683
4684 return "COMPLETED", output
4685
4686 except (LcmException, asyncio.CancelledError):
4687 raise
4688 except Exception as e:
4689 return "FAILED", "Error executing action {}: {}".format(primitive, e)
4690
4691 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4692 """
4693 Updating the vca_status with latest juju information in nsrs record
4694 :param: nsr_id: Id of the nsr
4695 :param: nslcmop_id: Id of the nslcmop
4696 :return: None
4697 """
4698
4699 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4700 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4701 vca_id = self.get_vca_id({}, db_nsr)
4702 if db_nsr["_admin"]["deployed"]["K8s"]:
4703 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4704 cluster_uuid, kdu_instance, cluster_type = (
4705 k8s["k8scluster-uuid"],
4706 k8s["kdu-instance"],
4707 k8s["k8scluster-type"],
4708 )
4709 await self._on_update_k8s_db(
4710 cluster_uuid=cluster_uuid,
4711 kdu_instance=kdu_instance,
4712 filter={"_id": nsr_id},
4713 vca_id=vca_id,
4714 cluster_type=cluster_type,
4715 )
4716 else:
4717 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4718 table, filter = "nsrs", {"_id": nsr_id}
4719 path = "_admin.deployed.VCA.{}.".format(vca_index)
4720 await self._on_update_n2vc_db(table, filter, path, {})
4721
4722 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4723 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4724
4725 async def action(self, nsr_id, nslcmop_id):
4726 # Try to lock HA task here
4727 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4728 if not task_is_locked_by_me:
4729 return
4730
4731 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4732 self.logger.debug(logging_text + "Enter")
4733 # get all needed from database
4734 db_nsr = None
4735 db_nslcmop = None
4736 db_nsr_update = {}
4737 db_nslcmop_update = {}
4738 nslcmop_operation_state = None
4739 error_description_nslcmop = None
4740 exc = None
4741 try:
4742 # wait for any previous tasks in progress
4743 step = "Waiting for previous operations to terminate"
4744 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4745
4746 self._write_ns_status(
4747 nsr_id=nsr_id,
4748 ns_state=None,
4749 current_operation="RUNNING ACTION",
4750 current_operation_id=nslcmop_id,
4751 )
4752
4753 step = "Getting information from database"
4754 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4755 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4756 if db_nslcmop["operationParams"].get("primitive_params"):
4757 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4758 db_nslcmop["operationParams"]["primitive_params"]
4759 )
4760
4761 nsr_deployed = db_nsr["_admin"].get("deployed")
4762 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4763 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4764 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4765 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4766 primitive = db_nslcmop["operationParams"]["primitive"]
4767 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4768 timeout_ns_action = db_nslcmop["operationParams"].get(
4769 "timeout_ns_action", self.timeout_primitive
4770 )
4771
4772 if vnf_index:
4773 step = "Getting vnfr from database"
4774 db_vnfr = self.db.get_one(
4775 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4776 )
4777 if db_vnfr.get("kdur"):
4778 kdur_list = []
4779 for kdur in db_vnfr["kdur"]:
4780 if kdur.get("additionalParams"):
4781 kdur["additionalParams"] = json.loads(
4782 kdur["additionalParams"]
4783 )
4784 kdur_list.append(kdur)
4785 db_vnfr["kdur"] = kdur_list
4786 step = "Getting vnfd from database"
4787 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4788 else:
4789 step = "Getting nsd from database"
4790 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4791
4792 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4793 # for backward compatibility
4794 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4795 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4796 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4797 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4798
4799 # look for primitive
4800 config_primitive_desc = descriptor_configuration = None
4801 if vdu_id:
4802 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4803 elif kdu_name:
4804 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4805 elif vnf_index:
4806 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4807 else:
4808 descriptor_configuration = db_nsd.get("ns-configuration")
4809
4810 if descriptor_configuration and descriptor_configuration.get(
4811 "config-primitive"
4812 ):
4813 for config_primitive in descriptor_configuration["config-primitive"]:
4814 if config_primitive["name"] == primitive:
4815 config_primitive_desc = config_primitive
4816 break
4817
4818 if not config_primitive_desc:
4819 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4820 raise LcmException(
4821 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4822 primitive
4823 )
4824 )
4825 primitive_name = primitive
4826 ee_descriptor_id = None
4827 else:
4828 primitive_name = config_primitive_desc.get(
4829 "execution-environment-primitive", primitive
4830 )
4831 ee_descriptor_id = config_primitive_desc.get(
4832 "execution-environment-ref"
4833 )
4834
4835 if vnf_index:
4836 if vdu_id:
4837 vdur = next(
4838 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4839 )
4840 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4841 elif kdu_name:
4842 kdur = next(
4843 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4844 )
4845 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4846 else:
4847 desc_params = parse_yaml_strings(
4848 db_vnfr.get("additionalParamsForVnf")
4849 )
4850 else:
4851 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4852 if kdu_name and get_configuration(db_vnfd, kdu_name):
4853 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4854 actions = set()
4855 for primitive in kdu_configuration.get("initial-config-primitive", []):
4856 actions.add(primitive["name"])
4857 for primitive in kdu_configuration.get("config-primitive", []):
4858 actions.add(primitive["name"])
4859 kdu_action = primitive_name in actions
4860
4861 # TODO check if ns is in a proper status
4862 if kdu_name and (
4863 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4864 ):
4865 # kdur and desc_params already set from before
4866 if primitive_params:
4867 desc_params.update(primitive_params)
4868 # TODO Check if we will need something at vnf level
4869 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4870 if (
4871 kdu_name == kdu["kdu-name"]
4872 and kdu["member-vnf-index"] == vnf_index
4873 ):
4874 break
4875 else:
4876 raise LcmException(
4877 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4878 )
4879
4880 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4881 msg = "unknown k8scluster-type '{}'".format(
4882 kdu.get("k8scluster-type")
4883 )
4884 raise LcmException(msg)
4885
4886 db_dict = {
4887 "collection": "nsrs",
4888 "filter": {"_id": nsr_id},
4889 "path": "_admin.deployed.K8s.{}".format(index),
4890 }
4891 self.logger.debug(
4892 logging_text
4893 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4894 )
4895 step = "Executing kdu {}".format(primitive_name)
4896 if primitive_name == "upgrade":
4897 if desc_params.get("kdu_model"):
4898 kdu_model = desc_params.get("kdu_model")
4899 del desc_params["kdu_model"]
4900 else:
4901 kdu_model = kdu.get("kdu-model")
4902 parts = kdu_model.split(sep=":")
4903 if len(parts) == 2:
4904 kdu_model = parts[0]
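# Illustrative: when the kdu-model contains a single ':' (e.g. "my-chart:1.2.3"), the suffix
# after the ':' is treated as a version tag and dropped here, keeping only "my-chart".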
4905 if desc_params.get("kdu_atomic_upgrade"):
4906 atomic_upgrade = desc_params.get("kdu_atomic_upgrade").lower() in ("yes", "true", "1")
4907 del desc_params["kdu_atomic_upgrade"]
4908 else:
4909 atomic_upgrade = True
4910
4911 detailed_status = await asyncio.wait_for(
4912 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4913 cluster_uuid=kdu.get("k8scluster-uuid"),
4914 kdu_instance=kdu.get("kdu-instance"),
4915 atomic=atomic_upgrade,
4916 kdu_model=kdu_model,
4917 params=desc_params,
4918 db_dict=db_dict,
4919 timeout=timeout_ns_action,
4920 ),
4921 timeout=timeout_ns_action + 10,
4922 )
4923 self.logger.debug(
4924 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4925 )
4926 elif primitive_name == "rollback":
4927 detailed_status = await asyncio.wait_for(
4928 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4929 cluster_uuid=kdu.get("k8scluster-uuid"),
4930 kdu_instance=kdu.get("kdu-instance"),
4931 db_dict=db_dict,
4932 ),
4933 timeout=timeout_ns_action,
4934 )
4935 elif primitive_name == "status":
4936 detailed_status = await asyncio.wait_for(
4937 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4938 cluster_uuid=kdu.get("k8scluster-uuid"),
4939 kdu_instance=kdu.get("kdu-instance"),
4940 vca_id=vca_id,
4941 ),
4942 timeout=timeout_ns_action,
4943 )
4944 else:
4945 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4946 kdu["kdu-name"], nsr_id
4947 )
4948 params = self._map_primitive_params(
4949 config_primitive_desc, primitive_params, desc_params
4950 )
4951
4952 detailed_status = await asyncio.wait_for(
4953 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4954 cluster_uuid=kdu.get("k8scluster-uuid"),
4955 kdu_instance=kdu_instance,
4956 primitive_name=primitive_name,
4957 params=params,
4958 db_dict=db_dict,
4959 timeout=timeout_ns_action,
4960 vca_id=vca_id,
4961 ),
4962 timeout=timeout_ns_action,
4963 )
4964
4965 if detailed_status:
4966 nslcmop_operation_state = "COMPLETED"
4967 else:
4968 detailed_status = ""
4969 nslcmop_operation_state = "FAILED"
4970 else:
4971 ee_id, vca_type = self._look_for_deployed_vca(
4972 nsr_deployed["VCA"],
4973 member_vnf_index=vnf_index,
4974 vdu_id=vdu_id,
4975 vdu_count_index=vdu_count_index,
4976 ee_descriptor_id=ee_descriptor_id,
4977 )
4978 for vca_index, vca_deployed in enumerate(
4979 db_nsr["_admin"]["deployed"]["VCA"]
4980 ):
4981 if vca_deployed.get("member-vnf-index") == vnf_index:
4982 db_dict = {
4983 "collection": "nsrs",
4984 "filter": {"_id": nsr_id},
4985 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4986 }
4987 break
4988 (
4989 nslcmop_operation_state,
4990 detailed_status,
4991 ) = await self._ns_execute_primitive(
4992 ee_id,
4993 primitive=primitive_name,
4994 primitive_params=self._map_primitive_params(
4995 config_primitive_desc, primitive_params, desc_params
4996 ),
4997 timeout=timeout_ns_action,
4998 vca_type=vca_type,
4999 db_dict=db_dict,
5000 vca_id=vca_id,
5001 )
5002
5003 db_nslcmop_update["detailed-status"] = detailed_status
5004 error_description_nslcmop = (
5005 detailed_status if nslcmop_operation_state == "FAILED" else ""
5006 )
5007 self.logger.debug(
5008 logging_text
5009 + " task Done with result {} {}".format(
5010 nslcmop_operation_state, detailed_status
5011 )
5012 )
5013 return # database update is called inside finally
5014
5015 except (DbException, LcmException, N2VCException, K8sException) as e:
5016 self.logger.error(logging_text + "Exit Exception {}".format(e))
5017 exc = e
5018 except asyncio.CancelledError:
5019 self.logger.error(
5020 logging_text + "Cancelled Exception while '{}'".format(step)
5021 )
5022 exc = "Operation was cancelled"
5023 except asyncio.TimeoutError:
5024 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5025 exc = "Timeout"
5026 except Exception as e:
5027 exc = traceback.format_exc()
5028 self.logger.critical(
5029 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5030 exc_info=True,
5031 )
5032 finally:
5033 if exc:
5034 db_nslcmop_update[
5035 "detailed-status"
5036 ] = (
5037 detailed_status
5038 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5039 nslcmop_operation_state = "FAILED"
5040 if db_nsr:
5041 self._write_ns_status(
5042 nsr_id=nsr_id,
5043 ns_state=db_nsr[
5044 "nsState"
5045 ], # TODO check if degraded. For the moment use previous status
5046 current_operation="IDLE",
5047 current_operation_id=None,
5048 # error_description=error_description_nsr,
5049 # error_detail=error_detail,
5050 other_update=db_nsr_update,
5051 )
5052
5053 self._write_op_status(
5054 op_id=nslcmop_id,
5055 stage="",
5056 error_message=error_description_nslcmop,
5057 operation_state=nslcmop_operation_state,
5058 other_update=db_nslcmop_update,
5059 )
5060
5061 if nslcmop_operation_state:
5062 try:
5063 await self.msg.aiowrite(
5064 "ns",
5065 "actioned",
5066 {
5067 "nsr_id": nsr_id,
5068 "nslcmop_id": nslcmop_id,
5069 "operationState": nslcmop_operation_state,
5070 },
5071 loop=self.loop,
5072 )
5073 except Exception as e:
5074 self.logger.error(
5075 logging_text + "kafka_write notification Exception {}".format(e)
5076 )
5077 self.logger.debug(logging_text + "Exit")
5078 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5079 return nslcmop_operation_state, detailed_status
5080
5081 async def scale(self, nsr_id, nslcmop_id):
5082 # Try to lock HA task here
5083 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5084 if not task_is_locked_by_me:
5085 return
5086
5087 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5088 stage = ["", "", ""]
5089 tasks_dict_info = {}
5090 # ^ stage, step, VIM progress
5091 self.logger.debug(logging_text + "Enter")
5092 # get all needed from database
5093 db_nsr = None
5094 db_nslcmop_update = {}
5095 db_nsr_update = {}
5096 exc = None
5097 # in case of error, indicates which part of the scale operation failed, so the nsr can be set to error status
5098 scale_process = None
5099 old_operational_status = ""
5100 old_config_status = ""
5101 nsi_id = None
5102 try:
5103 # wait for any previous tasks in progress
5104 step = "Waiting for previous operations to terminate"
5105 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5106 self._write_ns_status(
5107 nsr_id=nsr_id,
5108 ns_state=None,
5109 current_operation="SCALING",
5110 current_operation_id=nslcmop_id,
5111 )
5112
5113 step = "Getting nslcmop from database"
5114 self.logger.debug(
5115 step + " after having waited for previous tasks to be completed"
5116 )
5117 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5118
5119 step = "Getting nsr from database"
5120 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5121 old_operational_status = db_nsr["operational-status"]
5122 old_config_status = db_nsr["config-status"]
5123
5124 step = "Parsing scaling parameters"
5125 db_nsr_update["operational-status"] = "scaling"
5126 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5127 nsr_deployed = db_nsr["_admin"].get("deployed")
5128
5129 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5130 "scaleByStepData"
5131 ]["member-vnf-index"]
5132 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5133 "scaleByStepData"
5134 ]["scaling-group-descriptor"]
5135 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5136 # for backward compatibility
5137 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5138 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5139 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5140 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5141
5142 step = "Getting vnfr from database"
5143 db_vnfr = self.db.get_one(
5144 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5145 )
5146
5147 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5148
5149 step = "Getting vnfd from database"
5150 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5151
5152 base_folder = db_vnfd["_admin"]["storage"]
5153
5154 step = "Getting scaling-group-descriptor"
5155 scaling_descriptor = find_in_list(
5156 get_scaling_aspect(db_vnfd),
5157 lambda scale_desc: scale_desc["name"] == scaling_group,
5158 )
5159 if not scaling_descriptor:
5160 raise LcmException(
5161 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5162 "at vnfd:scaling-group-descriptor".format(scaling_group)
5163 )
5164
5165 step = "Sending scale order to VIM"
5166 # TODO check if ns is in a proper status
5167 nb_scale_op = 0
5168 if not db_nsr["_admin"].get("scaling-group"):
5169 self.update_db_2(
5170 "nsrs",
5171 nsr_id,
5172 {
5173 "_admin.scaling-group": [
5174 {"name": scaling_group, "nb-scale-op": 0}
5175 ]
5176 },
5177 )
5178 admin_scale_index = 0
5179 else:
5180 for admin_scale_index, admin_scale_info in enumerate(
5181 db_nsr["_admin"]["scaling-group"]
5182 ):
5183 if admin_scale_info["name"] == scaling_group:
5184 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5185 break
5186 else: # not found, set index one plus last element and add new entry with the name
5187 admin_scale_index += 1
5188 db_nsr_update[
5189 "_admin.scaling-group.{}.name".format(admin_scale_index)
5190 ] = scaling_group
5191
5192 vca_scaling_info = []
5193 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5194 if scaling_type == "SCALE_OUT":
5195 if "aspect-delta-details" not in scaling_descriptor:
5196 raise LcmException(
5197 "Aspect delta details not found in scaling descriptor {}".format(
5198 scaling_descriptor["name"]
5199 )
5200 )
5201 # count if max-instance-count is reached
5202 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5203
5204 scaling_info["scaling_direction"] = "OUT"
5205 scaling_info["vdu-create"] = {}
5206 scaling_info["kdu-create"] = {}
5207 for delta in deltas:
5208 for vdu_delta in delta.get("vdu-delta", {}):
5209 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5210 # vdu_index also provides the number of instances of the targeted vdu
5211 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5212 cloud_init_text = self._get_vdu_cloud_init_content(
5213 vdud, db_vnfd
5214 )
5215 if cloud_init_text:
5216 additional_params = (
5217 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5218 or {}
5219 )
5220 cloud_init_list = []
5221
5222 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5223 max_instance_count = 10
5224 if vdu_profile and "max-number-of-instances" in vdu_profile:
5225 max_instance_count = vdu_profile.get(
5226 "max-number-of-instances", 10
5227 )
5228
5229 default_instance_num = get_number_of_instances(
5230 db_vnfd, vdud["id"]
5231 )
5232 instances_number = vdu_delta.get("number-of-instances", 1)
5233 nb_scale_op += instances_number
5234
5235 new_instance_count = nb_scale_op + default_instance_num
5236 # If the new count exceeds the max while the current vdu count is below it,
5237 # recompute the number of instances as the excess over the max
5238 if new_instance_count > max_instance_count > vdu_count:
5239 instances_number = new_instance_count - max_instance_count
5240 else:
5241 instances_number = instances_number
5242
5243 if new_instance_count > max_instance_count:
5244 raise LcmException(
5245 "reached the limit of {} (max-instance-count) "
5246 "scaling-out operations for the "
5247 "scaling-group-descriptor '{}'".format(
5248 nb_scale_op, scaling_group
5249 )
5250 )
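# One entry per new VDU instance: render its cloud-init (if any) and record the VCA create operation.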
5251 for x in range(vdu_delta.get("number-of-instances", 1)):
5252 if cloud_init_text:
5253 # TODO Information about its own IP is not available because db_vnfr is not updated.
5254 additional_params["OSM"] = get_osm_params(
5255 db_vnfr, vdu_delta["id"], vdu_index + x
5256 )
5257 cloud_init_list.append(
5258 self._parse_cloud_init(
5259 cloud_init_text,
5260 additional_params,
5261 db_vnfd["id"],
5262 vdud["id"],
5263 )
5264 )
5265 vca_scaling_info.append(
5266 {
5267 "osm_vdu_id": vdu_delta["id"],
5268 "member-vnf-index": vnf_index,
5269 "type": "create",
5270 "vdu_index": vdu_index + x,
5271 }
5272 )
5273 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5274 for kdu_delta in delta.get("kdu-resource-delta", {}):
5275 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5276 kdu_name = kdu_profile["kdu-name"]
5277 resource_name = kdu_profile["resource-name"]
5278
5279 # A single delta might reference several different KDUs,
5280 # so keep a separate list of scaling entries for each KDU
5281 if not scaling_info["kdu-create"].get(kdu_name, None):
5282 scaling_info["kdu-create"][kdu_name] = []
5283
5284 kdur = get_kdur(db_vnfr, kdu_name)
5285 if kdur.get("helm-chart"):
5286 k8s_cluster_type = "helm-chart-v3"
5287 self.logger.debug("kdur: {}".format(kdur))
5288 if (
5289 kdur.get("helm-version")
5290 and kdur.get("helm-version") == "v2"
5291 ):
5292 k8s_cluster_type = "helm-chart"
5293 raise NotImplementedError
5294 elif kdur.get("juju-bundle"):
5295 k8s_cluster_type = "juju-bundle"
5296 else:
5297 raise LcmException(
5298 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5299 "juju-bundle. Maybe an old NBI version is running".format(
5300 db_vnfr["member-vnf-index-ref"], kdu_name
5301 )
5302 )
5303
5304 max_instance_count = 10
5305 if kdu_profile and "max-number-of-instances" in kdu_profile:
5306 max_instance_count = kdu_profile.get(
5307 "max-number-of-instances", 10
5308 )
5309
5310 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5311 deployed_kdu, _ = get_deployed_kdu(
5312 nsr_deployed, kdu_name, vnf_index
5313 )
5314 if deployed_kdu is None:
5315 raise LcmException(
5316 "KDU '{}' for vnf '{}' not deployed".format(
5317 kdu_name, vnf_index
5318 )
5319 )
5320 kdu_instance = deployed_kdu.get("kdu-instance")
5321 instance_num = await self.k8scluster_map[
5322 k8s_cluster_type
5323 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5324 kdu_replica_count = instance_num + kdu_delta.get(
5325 "number-of-instances", 1
5326 )
5327
5328 # If the new count exceeds the max while the current instance count is below it,
5329 # cap the kdu replica count at the max instance count
5330 if kdu_replica_count > max_instance_count > instance_num:
5331 kdu_replica_count = max_instance_count
5332 if kdu_replica_count > max_instance_count:
5333 raise LcmException(
5334 "reached the limit of {} (max-instance-count) "
5335 "scaling-out operations for the "
5336 "scaling-group-descriptor '{}'".format(
5337 instance_num, scaling_group
5338 )
5339 )
5340
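# Record one VCA entry per added KDU instance plus the target replica count for the k8s scale operation.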
5341 for x in range(kdu_delta.get("number-of-instances", 1)):
5342 vca_scaling_info.append(
5343 {
5344 "osm_kdu_id": kdu_name,
5345 "member-vnf-index": vnf_index,
5346 "type": "create",
5347 "kdu_index": instance_num + x - 1,
5348 }
5349 )
5350 scaling_info["kdu-create"][kdu_name].append(
5351 {
5352 "member-vnf-index": vnf_index,
5353 "type": "create",
5354 "k8s-cluster-type": k8s_cluster_type,
5355 "resource-name": resource_name,
5356 "scale": kdu_replica_count,
5357 }
5358 )
5359 elif scaling_type == "SCALE_IN":
5360 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5361
5362 scaling_info["scaling_direction"] = "IN"
5363 scaling_info["vdu-delete"] = {}
5364 scaling_info["kdu-delete"] = {}
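# SCALE_IN mirrors the SCALE_OUT logic: compute how many instances to remove while honouring
# min-number-of-instances for both VDUs and KDUs.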
5365
5366 for delta in deltas:
5367 for vdu_delta in delta.get("vdu-delta", {}):
5368 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5369 min_instance_count = 0
5370 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5371 if vdu_profile and "min-number-of-instances" in vdu_profile:
5372 min_instance_count = vdu_profile["min-number-of-instances"]
5373
5374 default_instance_num = get_number_of_instances(
5375 db_vnfd, vdu_delta["id"]
5376 )
5377 instance_num = vdu_delta.get("number-of-instances", 1)
5378 nb_scale_op -= instance_num
5379
5380 new_instance_count = nb_scale_op + default_instance_num
5381
5382 if new_instance_count < min_instance_count < vdu_count:
5383 instances_number = min_instance_count - new_instance_count
5384 else:
5385 instances_number = instance_num
5386
5387 if new_instance_count < min_instance_count:
5388 raise LcmException(
5389 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5390 "scaling-group-descriptor '{}'".format(
5391 nb_scale_op, scaling_group
5392 )
5393 )
5394 for x in range(vdu_delta.get("number-of-instances", 1)):
5395 vca_scaling_info.append(
5396 {
5397 "osm_vdu_id": vdu_delta["id"],
5398 "member-vnf-index": vnf_index,
5399 "type": "delete",
5400 "vdu_index": vdu_index - 1 - x,
5401 }
5402 )
5403 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5404 for kdu_delta in delta.get("kdu-resource-delta", {}):
5405 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5406 kdu_name = kdu_profile["kdu-name"]
5407 resource_name = kdu_profile["resource-name"]
5408
5409 if not scaling_info["kdu-delete"].get(kdu_name, None):
5410 scaling_info["kdu-delete"][kdu_name] = []
5411
5412 kdur = get_kdur(db_vnfr, kdu_name)
5413 if kdur.get("helm-chart"):
5414 k8s_cluster_type = "helm-chart-v3"
5415 self.logger.debug("kdur: {}".format(kdur))
5416 if (
5417 kdur.get("helm-version")
5418 and kdur.get("helm-version") == "v2"
5419 ):
5420 k8s_cluster_type = "helm-chart"
5421 raise NotImplementedError
5422 elif kdur.get("juju-bundle"):
5423 k8s_cluster_type = "juju-bundle"
5424 else:
5425 raise LcmException(
5426 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5427 "juju-bundle. Maybe an old NBI version is running".format(
5428 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5429 )
5430 )
5431
5432 min_instance_count = 0
5433 if kdu_profile and "min-number-of-instances" in kdu_profile:
5434 min_instance_count = kdu_profile["min-number-of-instances"]
5435
5436 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5437 deployed_kdu, _ = get_deployed_kdu(
5438 nsr_deployed, kdu_name, vnf_index
5439 )
5440 if deployed_kdu is None:
5441 raise LcmException(
5442 "KDU '{}' for vnf '{}' not deployed".format(
5443 kdu_name, vnf_index
5444 )
5445 )
5446 kdu_instance = deployed_kdu.get("kdu-instance")
5447 instance_num = await self.k8scluster_map[
5448 k8s_cluster_type
5449 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5450 kdu_replica_count = instance_num - kdu_delta.get(
5451 "number-of-instances", 1
5452 )
5453
5454 if kdu_replica_count < min_instance_count < instance_num:
5455 kdu_replica_count = min_instance_count
5456 if kdu_replica_count < min_instance_count:
5457 raise LcmException(
5458 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5459 "scaling-group-descriptor '{}'".format(
5460 instance_num, scaling_group
5461 )
5462 )
5463
5464 for x in range(kdu_delta.get("number-of-instances", 1)):
5465 vca_scaling_info.append(
5466 {
5467 "osm_kdu_id": kdu_name,
5468 "member-vnf-index": vnf_index,
5469 "type": "delete",
5470 "kdu_index": instance_num - x - 1,
5471 }
5472 )
5473 scaling_info["kdu-delete"][kdu_name].append(
5474 {
5475 "member-vnf-index": vnf_index,
5476 "type": "delete",
5477 "k8s-cluster-type": k8s_cluster_type,
5478 "resource-name": resource_name,
5479 "scale": kdu_replica_count,
5480 }
5481 )
5482
5483 # update scaling_info["vdu"] with the VDUs to be deleted, recording their interface IP and MAC addresses
5484 vdu_delete = copy(scaling_info.get("vdu-delete"))
5485 if scaling_info["scaling_direction"] == "IN":
5486 for vdur in reversed(db_vnfr["vdur"]):
5487 if vdu_delete.get(vdur["vdu-id-ref"]):
5488 vdu_delete[vdur["vdu-id-ref"]] -= 1
5489 scaling_info["vdu"].append(
5490 {
5491 "name": vdur.get("name") or vdur.get("vdu-name"),
5492 "vdu_id": vdur["vdu-id-ref"],
5493 "interface": [],
5494 }
5495 )
5496 for interface in vdur["interfaces"]:
5497 scaling_info["vdu"][-1]["interface"].append(
5498 {
5499 "name": interface["name"],
5500 "ip_address": interface["ip-address"],
5501 "mac_address": interface.get("mac-address"),
5502 }
5503 )
5504 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5505
5506 # PRE-SCALE BEGIN
5507 step = "Executing pre-scale vnf-config-primitive"
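# Pre-scale config actions are VNF config primitives whose trigger (pre-scale-in / pre-scale-out)
# matches the requested scaling direction; they are executed before any resources are added or removed.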
5508 if scaling_descriptor.get("scaling-config-action"):
5509 for scaling_config_action in scaling_descriptor[
5510 "scaling-config-action"
5511 ]:
5512 if (
5513 scaling_config_action.get("trigger") == "pre-scale-in"
5514 and scaling_type == "SCALE_IN"
5515 ) or (
5516 scaling_config_action.get("trigger") == "pre-scale-out"
5517 and scaling_type == "SCALE_OUT"
5518 ):
5519 vnf_config_primitive = scaling_config_action[
5520 "vnf-config-primitive-name-ref"
5521 ]
5522 step = db_nslcmop_update[
5523 "detailed-status"
5524 ] = "executing pre-scale scaling-config-action '{}'".format(
5525 vnf_config_primitive
5526 )
5527
5528 # look for primitive
5529 for config_primitive in (
5530 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5531 ).get("config-primitive", ()):
5532 if config_primitive["name"] == vnf_config_primitive:
5533 break
5534 else:
5535 raise LcmException(
5536 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5537 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5538 "primitive".format(scaling_group, vnf_config_primitive)
5539 )
5540
5541 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5542 if db_vnfr.get("additionalParamsForVnf"):
5543 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5544
5545 scale_process = "VCA"
5546 db_nsr_update["config-status"] = "configuring pre-scaling"
5547 primitive_params = self._map_primitive_params(
5548 config_primitive, {}, vnfr_params
5549 )
5550
5551 # Pre-scale retry check: Check if this sub-operation has been executed before
5552 op_index = self._check_or_add_scale_suboperation(
5553 db_nslcmop,
5554 vnf_index,
5555 vnf_config_primitive,
5556 primitive_params,
5557 "PRE-SCALE",
5558 )
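# _check_or_add_scale_suboperation returns SUBOPERATION_STATUS_SKIP (already completed),
# SUBOPERATION_STATUS_NEW (first execution) or the index of an existing sub-operation to retry.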
5559 if op_index == self.SUBOPERATION_STATUS_SKIP:
5560 # Skip sub-operation
5561 result = "COMPLETED"
5562 result_detail = "Done"
5563 self.logger.debug(
5564 logging_text
5565 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5566 vnf_config_primitive, result, result_detail
5567 )
5568 )
5569 else:
5570 if op_index == self.SUBOPERATION_STATUS_NEW:
5571 # New sub-operation: Get index of this sub-operation
5572 op_index = (
5573 len(db_nslcmop.get("_admin", {}).get("operations"))
5574 - 1
5575 )
5576 self.logger.debug(
5577 logging_text
5578 + "vnf_config_primitive={} New sub-operation".format(
5579 vnf_config_primitive
5580 )
5581 )
5582 else:
5583 # retry: Get registered params for this existing sub-operation
5584 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5585 op_index
5586 ]
5587 vnf_index = op.get("member_vnf_index")
5588 vnf_config_primitive = op.get("primitive")
5589 primitive_params = op.get("primitive_params")
5590 self.logger.debug(
5591 logging_text
5592 + "vnf_config_primitive={} Sub-operation retry".format(
5593 vnf_config_primitive
5594 )
5595 )
5596 # Execute the primitive, either with new (first-time) or registered (retry) args
5597 ee_descriptor_id = config_primitive.get(
5598 "execution-environment-ref"
5599 )
5600 primitive_name = config_primitive.get(
5601 "execution-environment-primitive", vnf_config_primitive
5602 )
5603 ee_id, vca_type = self._look_for_deployed_vca(
5604 nsr_deployed["VCA"],
5605 member_vnf_index=vnf_index,
5606 vdu_id=None,
5607 vdu_count_index=None,
5608 ee_descriptor_id=ee_descriptor_id,
5609 )
5610 result, result_detail = await self._ns_execute_primitive(
5611 ee_id,
5612 primitive_name,
5613 primitive_params,
5614 vca_type=vca_type,
5615 vca_id=vca_id,
5616 )
5617 self.logger.debug(
5618 logging_text
5619 + "vnf_config_primitive={} Done with result {} {}".format(
5620 vnf_config_primitive, result, result_detail
5621 )
5622 )
5623 # Update operationState = COMPLETED | FAILED
5624 self._update_suboperation_status(
5625 db_nslcmop, op_index, result, result_detail
5626 )
5627
5628 if result == "FAILED":
5629 raise LcmException(result_detail)
5630 db_nsr_update["config-status"] = old_config_status
5631 scale_process = None
5632 # PRE-SCALE END
5633
5634 db_nsr_update[
5635 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5636 ] = nb_scale_op
5637 db_nsr_update[
5638 "_admin.scaling-group.{}.time".format(admin_scale_index)
5639 ] = time()
5640
5641 # SCALE-IN VCA - BEGIN
5642 if vca_scaling_info:
5643 step = db_nslcmop_update[
5644 "detailed-status"
5645 ] = "Deleting the execution environments"
5646 scale_process = "VCA"
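# SCALE-IN VCA: destroy the execution environments of the instances being removed (running
# terminate primitives when needed) and drop them from _admin.deployed.VCA and configurationStatus.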
5647 for vca_info in vca_scaling_info:
5648 if vca_info["type"] == "delete":
5649 member_vnf_index = str(vca_info["member-vnf-index"])
5650 self.logger.debug(
5651 logging_text + "vdu info: {}".format(vca_info)
5652 )
5653 if vca_info.get("osm_vdu_id"):
5654 vdu_id = vca_info["osm_vdu_id"]
5655 vdu_index = int(vca_info["vdu_index"])
5656 stage[
5657 1
5658 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5659 member_vnf_index, vdu_id, vdu_index
5660 )
5661 else:
5662 vdu_index = 0
5663 kdu_id = vca_info["osm_kdu_id"]
5664 stage[
5665 1
5666 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5667 member_vnf_index, kdu_id, vdu_index
5668 )
5669 stage[2] = step = "Scaling in VCA"
5670 self._write_op_status(op_id=nslcmop_id, stage=stage)
5671 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5672 config_update = db_nsr["configurationStatus"]
5673 for vca_index, vca in enumerate(vca_update):
5674 if (
5675 (vca or vca.get("ee_id"))
5676 and vca["member-vnf-index"] == member_vnf_index
5677 and vca["vdu_count_index"] == vdu_index
5678 ):
5679 if vca.get("vdu_id"):
5680 config_descriptor = get_configuration(
5681 db_vnfd, vca.get("vdu_id")
5682 )
5683 elif vca.get("kdu_name"):
5684 config_descriptor = get_configuration(
5685 db_vnfd, vca.get("kdu_name")
5686 )
5687 else:
5688 config_descriptor = get_configuration(
5689 db_vnfd, db_vnfd["id"]
5690 )
5691 operation_params = (
5692 db_nslcmop.get("operationParams") or {}
5693 )
5694 exec_terminate_primitives = not operation_params.get(
5695 "skip_terminate_primitives"
5696 ) and vca.get("needed_terminate")
5697 task = asyncio.ensure_future(
5698 asyncio.wait_for(
5699 self.destroy_N2VC(
5700 logging_text,
5701 db_nslcmop,
5702 vca,
5703 config_descriptor,
5704 vca_index,
5705 destroy_ee=True,
5706 exec_primitives=exec_terminate_primitives,
5707 scaling_in=True,
5708 vca_id=vca_id,
5709 ),
5710 timeout=self.timeout_charm_delete,
5711 )
5712 )
5713 tasks_dict_info[task] = "Terminating VCA {}".format(
5714 vca.get("ee_id")
5715 )
5716 del vca_update[vca_index]
5717 del config_update[vca_index]
5718 # wait for pending tasks of terminate primitives
5719 if tasks_dict_info:
5720 self.logger.debug(
5721 logging_text
5722 + "Waiting for tasks {}".format(
5723 list(tasks_dict_info.keys())
5724 )
5725 )
5726 error_list = await self._wait_for_tasks(
5727 logging_text,
5728 tasks_dict_info,
5729 min(
5730 self.timeout_charm_delete, self.timeout_ns_terminate
5731 ),
5732 stage,
5733 nslcmop_id,
5734 )
5735 tasks_dict_info.clear()
5736 if error_list:
5737 raise LcmException("; ".join(error_list))
5738
5739 db_vca_and_config_update = {
5740 "_admin.deployed.VCA": vca_update,
5741 "configurationStatus": config_update,
5742 }
5743 self.update_db_2(
5744 "nsrs", db_nsr["_id"], db_vca_and_config_update
5745 )
5746 scale_process = None
5747 # SCALE-IN VCA - END
5748
5749 # SCALE RO - BEGIN
5750 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5751 scale_process = "RO"
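# Delegate VDU creation/deletion to NG-RO when it is configured; the vdu-create/vdu-delete
# entries are cleared once the RO scaling has been requested.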
5752 if self.ro_config.get("ng"):
5753 await self._scale_ng_ro(
5754 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5755 )
5756 scaling_info.pop("vdu-create", None)
5757 scaling_info.pop("vdu-delete", None)
5758
5759 scale_process = None
5760 # SCALE RO - END
5761
5762 # SCALE KDU - BEGIN
5763 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5764 scale_process = "KDU"
5765 await self._scale_kdu(
5766 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5767 )
5768 scaling_info.pop("kdu-create", None)
5769 scaling_info.pop("kdu-delete", None)
5770
5771 scale_process = None
5772 # SCALE KDU - END
5773
5774 if db_nsr_update:
5775 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5776
5777 # SCALE-UP VCA - BEGIN
5778 if vca_scaling_info:
5779 step = db_nslcmop_update[
5780 "detailed-status"
5781 ] = "Creating new execution environments"
5782 scale_process = "VCA"
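# SCALE-UP VCA: deploy a new execution environment, via _deploy_n2vc, for every created VDU/KDU
# instance that has a configuration descriptor.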
5783 for vca_info in vca_scaling_info:
5784 if vca_info["type"] == "create":
5785 member_vnf_index = str(vca_info["member-vnf-index"])
5786 self.logger.debug(
5787 logging_text + "vdu info: {}".format(vca_info)
5788 )
5789 vnfd_id = db_vnfr["vnfd-ref"]
5790 if vca_info.get("osm_vdu_id"):
5791 vdu_index = int(vca_info["vdu_index"])
5792 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5793 if db_vnfr.get("additionalParamsForVnf"):
5794 deploy_params.update(
5795 parse_yaml_strings(
5796 db_vnfr["additionalParamsForVnf"].copy()
5797 )
5798 )
5799 descriptor_config = get_configuration(
5800 db_vnfd, db_vnfd["id"]
5801 )
5802 if descriptor_config:
5803 vdu_id = None
5804 vdu_name = None
5805 kdu_name = None
5806 self._deploy_n2vc(
5807 logging_text=logging_text
5808 + "member_vnf_index={} ".format(member_vnf_index),
5809 db_nsr=db_nsr,
5810 db_vnfr=db_vnfr,
5811 nslcmop_id=nslcmop_id,
5812 nsr_id=nsr_id,
5813 nsi_id=nsi_id,
5814 vnfd_id=vnfd_id,
5815 vdu_id=vdu_id,
5816 kdu_name=kdu_name,
5817 member_vnf_index=member_vnf_index,
5818 vdu_index=vdu_index,
5819 vdu_name=vdu_name,
5820 deploy_params=deploy_params,
5821 descriptor_config=descriptor_config,
5822 base_folder=base_folder,
5823 task_instantiation_info=tasks_dict_info,
5824 stage=stage,
5825 )
5826 vdu_id = vca_info["osm_vdu_id"]
5827 vdur = find_in_list(
5828 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5829 )
5830 descriptor_config = get_configuration(db_vnfd, vdu_id)
5831 if vdur.get("additionalParams"):
5832 deploy_params_vdu = parse_yaml_strings(
5833 vdur["additionalParams"]
5834 )
5835 else:
5836 deploy_params_vdu = deploy_params
5837 deploy_params_vdu["OSM"] = get_osm_params(
5838 db_vnfr, vdu_id, vdu_count_index=vdu_index
5839 )
5840 if descriptor_config:
5841 vdu_name = None
5842 kdu_name = None
5843 stage[
5844 1
5845 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5846 member_vnf_index, vdu_id, vdu_index
5847 )
5848 stage[2] = step = "Scaling out VCA"
5849 self._write_op_status(op_id=nslcmop_id, stage=stage)
5850 self._deploy_n2vc(
5851 logging_text=logging_text
5852 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5853 member_vnf_index, vdu_id, vdu_index
5854 ),
5855 db_nsr=db_nsr,
5856 db_vnfr=db_vnfr,
5857 nslcmop_id=nslcmop_id,
5858 nsr_id=nsr_id,
5859 nsi_id=nsi_id,
5860 vnfd_id=vnfd_id,
5861 vdu_id=vdu_id,
5862 kdu_name=kdu_name,
5863 member_vnf_index=member_vnf_index,
5864 vdu_index=vdu_index,
5865 vdu_name=vdu_name,
5866 deploy_params=deploy_params_vdu,
5867 descriptor_config=descriptor_config,
5868 base_folder=base_folder,
5869 task_instantiation_info=tasks_dict_info,
5870 stage=stage,
5871 )
5872 else:
5873 kdu_name = vca_info["osm_kdu_id"]
5874 descriptor_config = get_configuration(db_vnfd, kdu_name)
5875 if descriptor_config:
5876 vdu_id = None
5877 kdu_index = int(vca_info["kdu_index"])
5878 vdu_name = None
5879 kdur = next(
5880 x
5881 for x in db_vnfr["kdur"]
5882 if x["kdu-name"] == kdu_name
5883 )
5884 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5885 if kdur.get("additionalParams"):
5886 deploy_params_kdu = parse_yaml_strings(
5887 kdur["additionalParams"]
5888 )
5889
5890 self._deploy_n2vc(
5891 logging_text=logging_text,
5892 db_nsr=db_nsr,
5893 db_vnfr=db_vnfr,
5894 nslcmop_id=nslcmop_id,
5895 nsr_id=nsr_id,
5896 nsi_id=nsi_id,
5897 vnfd_id=vnfd_id,
5898 vdu_id=vdu_id,
5899 kdu_name=kdu_name,
5900 member_vnf_index=member_vnf_index,
5901 vdu_index=kdu_index,
5902 vdu_name=vdu_name,
5903 deploy_params=deploy_params_kdu,
5904 descriptor_config=descriptor_config,
5905 base_folder=base_folder,
5906 task_instantiation_info=tasks_dict_info,
5907 stage=stage,
5908 )
5909 # SCALE-UP VCA - END
5910 scale_process = None
5911
5912 # POST-SCALE BEGIN
5913 # execute primitive service POST-SCALING
5914 step = "Executing post-scale vnf-config-primitive"
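# Post-scale config actions mirror the pre-scale ones: primitives whose trigger matches the
# scaling direction are executed once the new instances (or removals) are in place.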
5915 if scaling_descriptor.get("scaling-config-action"):
5916 for scaling_config_action in scaling_descriptor[
5917 "scaling-config-action"
5918 ]:
5919 if (
5920 scaling_config_action.get("trigger") == "post-scale-in"
5921 and scaling_type == "SCALE_IN"
5922 ) or (
5923 scaling_config_action.get("trigger") == "post-scale-out"
5924 and scaling_type == "SCALE_OUT"
5925 ):
5926 vnf_config_primitive = scaling_config_action[
5927 "vnf-config-primitive-name-ref"
5928 ]
5929 step = db_nslcmop_update[
5930 "detailed-status"
5931 ] = "executing post-scale scaling-config-action '{}'".format(
5932 vnf_config_primitive
5933 )
5934
5935 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5936 if db_vnfr.get("additionalParamsForVnf"):
5937 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5938
5939 # look for primitive
5940 for config_primitive in (
5941 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5942 ).get("config-primitive", ()):
5943 if config_primitive["name"] == vnf_config_primitive:
5944 break
5945 else:
5946 raise LcmException(
5947 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5948 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5949 "config-primitive".format(
5950 scaling_group, vnf_config_primitive
5951 )
5952 )
5953 scale_process = "VCA"
5954 db_nsr_update["config-status"] = "configuring post-scaling"
5955 primitive_params = self._map_primitive_params(
5956 config_primitive, {}, vnfr_params
5957 )
5958
5959 # Post-scale retry check: Check if this sub-operation has been executed before
5960 op_index = self._check_or_add_scale_suboperation(
5961 db_nslcmop,
5962 vnf_index,
5963 vnf_config_primitive,
5964 primitive_params,
5965 "POST-SCALE",
5966 )
5967 if op_index == self.SUBOPERATION_STATUS_SKIP:
5968 # Skip sub-operation
5969 result = "COMPLETED"
5970 result_detail = "Done"
5971 self.logger.debug(
5972 logging_text
5973 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5974 vnf_config_primitive, result, result_detail
5975 )
5976 )
5977 else:
5978 if op_index == self.SUBOPERATION_STATUS_NEW:
5979 # New sub-operation: Get index of this sub-operation
5980 op_index = (
5981 len(db_nslcmop.get("_admin", {}).get("operations"))
5982 - 1
5983 )
5984 self.logger.debug(
5985 logging_text
5986 + "vnf_config_primitive={} New sub-operation".format(
5987 vnf_config_primitive
5988 )
5989 )
5990 else:
5991 # retry: Get registered params for this existing sub-operation
5992 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5993 op_index
5994 ]
5995 vnf_index = op.get("member_vnf_index")
5996 vnf_config_primitive = op.get("primitive")
5997 primitive_params = op.get("primitive_params")
5998 self.logger.debug(
5999 logging_text
6000 + "vnf_config_primitive={} Sub-operation retry".format(
6001 vnf_config_primitive
6002 )
6003 )
6004 # Execute the primitive, either with new (first-time) or registered (retry) args
6005 ee_descriptor_id = config_primitive.get(
6006 "execution-environment-ref"
6007 )
6008 primitive_name = config_primitive.get(
6009 "execution-environment-primitive", vnf_config_primitive
6010 )
6011 ee_id, vca_type = self._look_for_deployed_vca(
6012 nsr_deployed["VCA"],
6013 member_vnf_index=vnf_index,
6014 vdu_id=None,
6015 vdu_count_index=None,
6016 ee_descriptor_id=ee_descriptor_id,
6017 )
6018 result, result_detail = await self._ns_execute_primitive(
6019 ee_id,
6020 primitive_name,
6021 primitive_params,
6022 vca_type=vca_type,
6023 vca_id=vca_id,
6024 )
6025 self.logger.debug(
6026 logging_text
6027 + "vnf_config_primitive={} Done with result {} {}".format(
6028 vnf_config_primitive, result, result_detail
6029 )
6030 )
6031 # Update operationState = COMPLETED | FAILED
6032 self._update_suboperation_status(
6033 db_nslcmop, op_index, result, result_detail
6034 )
6035
6036 if result == "FAILED":
6037 raise LcmException(result_detail)
6038 db_nsr_update["config-status"] = old_config_status
6039 scale_process = None
6040 # POST-SCALE END
6041
6042 db_nsr_update[
6043 "detailed-status"
6044 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6045 db_nsr_update["operational-status"] = (
6046 "running"
6047 if old_operational_status == "failed"
6048 else old_operational_status
6049 )
6050 db_nsr_update["config-status"] = old_config_status
6051 return
6052 except (
6053 ROclient.ROClientException,
6054 DbException,
6055 LcmException,
6056 NgRoException,
6057 ) as e:
6058 self.logger.error(logging_text + "Exit Exception {}".format(e))
6059 exc = e
6060 except asyncio.CancelledError:
6061 self.logger.error(
6062 logging_text + "Cancelled Exception while '{}'".format(step)
6063 )
6064 exc = "Operation was cancelled"
6065 except Exception as e:
6066 exc = traceback.format_exc()
6067 self.logger.critical(
6068 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6069 exc_info=True,
6070 )
6071 finally:
6072 self._write_ns_status(
6073 nsr_id=nsr_id,
6074 ns_state=None,
6075 current_operation="IDLE",
6076 current_operation_id=None,
6077 )
6078 if tasks_dict_info:
6079 stage[1] = "Waiting for instantiate pending tasks."
6080 self.logger.debug(logging_text + stage[1])
6081 exc = await self._wait_for_tasks(
6082 logging_text,
6083 tasks_dict_info,
6084 self.timeout_ns_deploy,
6085 stage,
6086 nslcmop_id,
6087 nsr_id=nsr_id,
6088 )
6089 if exc:
6090 db_nslcmop_update[
6091 "detailed-status"
6092 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6093 nslcmop_operation_state = "FAILED"
6094 if db_nsr:
6095 db_nsr_update["operational-status"] = old_operational_status
6096 db_nsr_update["config-status"] = old_config_status
6097 db_nsr_update["detailed-status"] = ""
6098 if scale_process:
6099 if "VCA" in scale_process:
6100 db_nsr_update["config-status"] = "failed"
6101 if "RO" in scale_process:
6102 db_nsr_update["operational-status"] = "failed"
6103 db_nsr_update[
6104 "detailed-status"
6105 ] = "FAILED scaling nslcmop={} {}: {}".format(
6106 nslcmop_id, step, exc
6107 )
6108 else:
6109 error_description_nslcmop = None
6110 nslcmop_operation_state = "COMPLETED"
6111 db_nslcmop_update["detailed-status"] = "Done"
6112
6113 self._write_op_status(
6114 op_id=nslcmop_id,
6115 stage="",
6116 error_message=error_description_nslcmop,
6117 operation_state=nslcmop_operation_state,
6118 other_update=db_nslcmop_update,
6119 )
6120 if db_nsr:
6121 self._write_ns_status(
6122 nsr_id=nsr_id,
6123 ns_state=None,
6124 current_operation="IDLE",
6125 current_operation_id=None,
6126 other_update=db_nsr_update,
6127 )
6128
6129 if nslcmop_operation_state:
6130 try:
6131 msg = {
6132 "nsr_id": nsr_id,
6133 "nslcmop_id": nslcmop_id,
6134 "operationState": nslcmop_operation_state,
6135 }
6136 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6137 except Exception as e:
6138 self.logger.error(
6139 logging_text + "kafka_write notification Exception {}".format(e)
6140 )
6141 self.logger.debug(logging_text + "Exit")
6142 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6143
6144 async def _scale_kdu(
6145 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6146 ):
6147 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
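# For each KDU entry: run terminate config primitives (on delete) or initial config primitives
# (on create) when defined and no juju execution environment is referenced, then ask the k8s
# connector to scale the resource to the requested replica count.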
6148 for kdu_name in _scaling_info:
6149 for kdu_scaling_info in _scaling_info[kdu_name]:
6150 deployed_kdu, index = get_deployed_kdu(
6151 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6152 )
6153 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6154 kdu_instance = deployed_kdu["kdu-instance"]
6155 scale = int(kdu_scaling_info["scale"])
6156 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6157
6158 db_dict = {
6159 "collection": "nsrs",
6160 "filter": {"_id": nsr_id},
6161 "path": "_admin.deployed.K8s.{}".format(index),
6162 }
6163
6164 step = "scaling application {}".format(
6165 kdu_scaling_info["resource-name"]
6166 )
6167 self.logger.debug(logging_text + step)
6168
6169 if kdu_scaling_info["type"] == "delete":
6170 kdu_config = get_configuration(db_vnfd, kdu_name)
6171 if (
6172 kdu_config
6173 and kdu_config.get("terminate-config-primitive")
6174 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6175 ):
6176 terminate_config_primitive_list = kdu_config.get(
6177 "terminate-config-primitive"
6178 )
6179 terminate_config_primitive_list.sort(
6180 key=lambda val: int(val["seq"])
6181 )
6182
6183 for (
6184 terminate_config_primitive
6185 ) in terminate_config_primitive_list:
6186 primitive_params_ = self._map_primitive_params(
6187 terminate_config_primitive, {}, {}
6188 )
6189 step = "execute terminate config primitive"
6190 self.logger.debug(logging_text + step)
6191 await asyncio.wait_for(
6192 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6193 cluster_uuid=cluster_uuid,
6194 kdu_instance=kdu_instance,
6195 primitive_name=terminate_config_primitive["name"],
6196 params=primitive_params_,
6197 db_dict=db_dict,
6198 vca_id=vca_id,
6199 ),
6200 timeout=600,
6201 )
6202
6203 await asyncio.wait_for(
6204 self.k8scluster_map[k8s_cluster_type].scale(
6205 kdu_instance,
6206 scale,
6207 kdu_scaling_info["resource-name"],
6208 vca_id=vca_id,
6209 ),
6210 timeout=self.timeout_vca_on_error,
6211 )
6212
6213 if kdu_scaling_info["type"] == "create":
6214 kdu_config = get_configuration(db_vnfd, kdu_name)
6215 if (
6216 kdu_config
6217 and kdu_config.get("initial-config-primitive")
6218 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6219 ):
6220 initial_config_primitive_list = kdu_config.get(
6221 "initial-config-primitive"
6222 )
6223 initial_config_primitive_list.sort(
6224 key=lambda val: int(val["seq"])
6225 )
6226
6227 for initial_config_primitive in initial_config_primitive_list:
6228 primitive_params_ = self._map_primitive_params(
6229 initial_config_primitive, {}, {}
6230 )
6231 step = "execute initial config primitive"
6232 self.logger.debug(logging_text + step)
6233 await asyncio.wait_for(
6234 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6235 cluster_uuid=cluster_uuid,
6236 kdu_instance=kdu_instance,
6237 primitive_name=initial_config_primitive["name"],
6238 params=primitive_params_,
6239 db_dict=db_dict,
6240 vca_id=vca_id,
6241 ),
6242 timeout=600,
6243 )
6244
6245 async def _scale_ng_ro(
6246 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6247 ):
6248 nsr_id = db_nslcmop["nsInstanceId"]
6249 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6250 db_vnfrs = {}
6251
6252 # read from db: vnfd's for every vnf
6253 db_vnfds = []
6254
6255 # for each vnf in ns, read vnfd
6256 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6257 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6258 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6259 # if we don't have this vnfd yet, read it from db
6260 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6261 # read from db
6262 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6263 db_vnfds.append(vnfd)
6264 n2vc_key = self.n2vc.get_public_key()
6265 n2vc_key_list = [n2vc_key]
6266 self.scale_vnfr(
6267 db_vnfr,
6268 vdu_scaling_info.get("vdu-create"),
6269 vdu_scaling_info.get("vdu-delete"),
6270 mark_delete=True,
6271 )
6272 # db_vnfr has been updated, update db_vnfrs to use it
6273 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6274 await self._instantiate_ng_ro(
6275 logging_text,
6276 nsr_id,
6277 db_nsd,
6278 db_nsr,
6279 db_nslcmop,
6280 db_vnfrs,
6281 db_vnfds,
6282 n2vc_key_list,
6283 stage=stage,
6284 start_deploy=time(),
6285 timeout_ns_deploy=self.timeout_ns_deploy,
6286 )
6287 if vdu_scaling_info.get("vdu-delete"):
6288 self.scale_vnfr(
6289 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6290 )
6291
6292 async def add_prometheus_metrics(
6293 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6294 ):
6295 if not self.prometheus:
6296 return
6297 # look for a file called 'prometheus*.j2'
6298 artifact_content = self.fs.dir_ls(artifact_path)
6299 job_file = next(
6300 (
6301 f
6302 for f in artifact_content
6303 if f.startswith("prometheus") and f.endswith(".j2")
6304 ),
6305 None,
6306 )
6307 if not job_file:
6308 return
6309 with self.fs.file_open((artifact_path, job_file), "r") as f:
6310 job_data = f.read()
6311
6312 # TODO get_service
6313 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6314 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6315 host_port = "80"
6316 vnfr_id = vnfr_id.replace("-", "")
6317 variables = {
6318 "JOB_NAME": vnfr_id,
6319 "TARGET_IP": target_ip,
6320 "EXPORTER_POD_IP": host_name,
6321 "EXPORTER_POD_PORT": host_port,
6322 }
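# Render the prometheus job template with these variables and push the resulting jobs,
# keyed by job_name, to the Prometheus configuration.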
6323 job_list = self.prometheus.parse_job(job_data, variables)
6324 # ensure job_name uses the vnfr_id, and add the nsr_id as metadata
6325 for job in job_list:
6326 if (
6327 not isinstance(job.get("job_name"), str)
6328 or vnfr_id not in job["job_name"]
6329 ):
6330 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6331 job["nsr_id"] = nsr_id
6332 job_dict = {jl["job_name"]: jl for jl in job_list}
6333 if await self.prometheus.update(job_dict):
6334 return list(job_dict.keys())
6335
6336 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6337 """
6338 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6339
6340 :param: vim_account_id: VIM Account ID
6341
6342 :return: (cloud_name, cloud_credential)
6343 """
6344 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6345 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6346
6347 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6348 """
6349 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6350
6351 :param: vim_account_id: VIM Account ID
6352
6353 :return: (cloud_name, cloud_credential)
6354 """
6355 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6356 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")