Fix bug 1762: support entities in relations for backwards compatibility
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99
100 from copy import copy, deepcopy
101 from time import time
102 from uuid import uuid4
103
104 from random import randint
105
106 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
107
108
109 class NsLcm(LcmBase):
110 timeout_vca_on_error = (
111 5 * 60
112 ) # Time allowed for a charm to remain in blocked or error status before it is marked as failed
113 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an NS
114 timeout_ns_terminate = 1800 # default global timeout for undeploying an NS
115 timeout_charm_delete = 10 * 60
116 timeout_primitive = 30 * 60 # timeout for primitive execution
117 timeout_progress_primitive = (
118 10 * 60
119 ) # timeout for some progress in a primitive execution
120
121 SUBOPERATION_STATUS_NOT_FOUND = -1
122 SUBOPERATION_STATUS_NEW = -2
123 SUBOPERATION_STATUS_SKIP = -3
124 task_name_deploy_vca = "Deploying VCA"
125
126 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
127 """
128 Init: connect to database, filesystem storage, and messaging
129 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
130 :return: None
131 """
132 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
133
134 self.db = Database().instance.db
135 self.fs = Filesystem().instance.fs
136 self.loop = loop
137 self.lcm_tasks = lcm_tasks
138 self.timeout = config["timeout"]
139 self.ro_config = config["ro_config"]
140 self.ng_ro = config["ro_config"].get("ng")
141 self.vca_config = config["VCA"].copy()
142
143 # create N2VC connector
144 self.n2vc = N2VCJujuConnector(
145 log=self.logger,
146 loop=self.loop,
147 on_update_db=self._on_update_n2vc_db,
148 fs=self.fs,
149 db=self.db,
150 )
151
152 self.conn_helm_ee = LCMHelmConn(
153 log=self.logger,
154 loop=self.loop,
155 vca_config=self.vca_config,
156 on_update_db=self._on_update_n2vc_db,
157 )
158
159 self.k8sclusterhelm2 = K8sHelmConnector(
160 kubectl_command=self.vca_config.get("kubectlpath"),
161 helm_command=self.vca_config.get("helmpath"),
162 log=self.logger,
163 on_update_db=None,
164 fs=self.fs,
165 db=self.db,
166 )
167
168 self.k8sclusterhelm3 = K8sHelm3Connector(
169 kubectl_command=self.vca_config.get("kubectlpath"),
170 helm_command=self.vca_config.get("helm3path"),
171 fs=self.fs,
172 log=self.logger,
173 db=self.db,
174 on_update_db=None,
175 )
176
177 self.k8sclusterjuju = K8sJujuConnector(
178 kubectl_command=self.vca_config.get("kubectlpath"),
179 juju_command=self.vca_config.get("jujupath"),
180 log=self.logger,
181 loop=self.loop,
182 on_update_db=self._on_update_k8s_db,
183 fs=self.fs,
184 db=self.db,
185 )
186
187 self.k8scluster_map = {
188 "helm-chart": self.k8sclusterhelm2,
189 "helm-chart-v3": self.k8sclusterhelm3,
190 "chart": self.k8sclusterhelm3,
191 "juju-bundle": self.k8sclusterjuju,
192 "juju": self.k8sclusterjuju,
193 }
194
195 self.vca_map = {
196 "lxc_proxy_charm": self.n2vc,
197 "native_charm": self.n2vc,
198 "k8s_proxy_charm": self.n2vc,
199 "helm": self.conn_helm_ee,
200 "helm-v3": self.conn_helm_ee,
201 }
202
203 self.prometheus = prometheus
204
205 # create RO client
206 self.RO = NgRoClient(self.loop, **self.ro_config)
207
208 @staticmethod
209 def increment_ip_mac(ip_mac, vm_index=1):
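# Increments the last numeric component of an IPv4 address, or the last hex
# group of a MAC/IPv6 address, by vm_index. Illustrative examples: "10.0.0.5"
# with vm_index=1 gives "10.0.0.6"; "fa:16:3e:00:00:0a" gives "fa:16:3e:00:00:0b".
# Non-string inputs are returned unchanged; unparsable strings return None.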
210 if not isinstance(ip_mac, str):
211 return ip_mac
212 try:
213 # try with IPv4: look for the last dot
214 i = ip_mac.rfind(".")
215 if i > 0:
216 i += 1
217 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
218 # try with IPv6 or MAC: look for the last colon and operate in hex
219 i = ip_mac.rfind(":")
220 if i > 0:
221 i += 1
222 # format in hex; the width is 2 for a MAC byte or up to 4 for an IPv6 group
223 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
224 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
225 )
226 except Exception:
227 pass
228 return None
229
230 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
231
232 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
233
234 try:
235 # TODO filter RO descriptor fields...
236
237 # write to database
238 db_dict = dict()
239 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
240 db_dict["deploymentStatus"] = ro_descriptor
241 self.update_db_2("nsrs", nsrs_id, db_dict)
242
243 except Exception as e:
244 self.logger.warn(
245 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
246 )
247
248 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
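# Callback invoked by N2VC when the juju model changes: refreshes "vcaStatus"
# in the nsr record, reconciles "configurationStatus" for the affected VCA
# index, and switches nsState between READY and DEGRADED depending on the
# machine and application statuses reported by juju.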
249
250 # remove last dot from path (if exists)
251 if path.endswith("."):
252 path = path[:-1]
253
254 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
255 # .format(table, filter, path, updated_data))
256 try:
257
258 nsr_id = filter.get("_id")
259
260 # read ns record from database
261 nsr = self.db.get_one(table="nsrs", q_filter=filter)
262 current_ns_status = nsr.get("nsState")
263
264 # get vca status for NS
265 status_dict = await self.n2vc.get_status(
266 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
267 )
268
269 # vcaStatus
270 db_dict = dict()
271 db_dict["vcaStatus"] = status_dict
272 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
273
274 # update configurationStatus for this VCA
275 try:
276 vca_index = int(path[path.rfind(".") + 1 :])
277
278 vca_list = deep_get(
279 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
280 )
281 vca_status = vca_list[vca_index].get("status")
282
283 configuration_status_list = nsr.get("configurationStatus")
284 config_status = configuration_status_list[vca_index].get("status")
285
286 if config_status == "BROKEN" and vca_status != "failed":
287 db_dict["configurationStatus.{}.status".format(vca_index)] = "READY"
288 elif config_status != "BROKEN" and vca_status == "failed":
289 db_dict["configurationStatus.{}.status".format(vca_index)] = "BROKEN"
290 except Exception as e:
291 # do not update configurationStatus
292 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
293
294 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
295 # if nsState = 'DEGRADED' check if all is OK
296 is_degraded = False
297 if current_ns_status in ("READY", "DEGRADED"):
298 error_description = ""
299 # check machines
300 if status_dict.get("machines"):
301 for machine_id in status_dict.get("machines"):
302 machine = status_dict.get("machines").get(machine_id)
303 # check machine agent-status
304 if machine.get("agent-status"):
305 s = machine.get("agent-status").get("status")
306 if s != "started":
307 is_degraded = True
308 error_description += (
309 "machine {} agent-status={} ; ".format(
310 machine_id, s
311 )
312 )
313 # check machine instance status
314 if machine.get("instance-status"):
315 s = machine.get("instance-status").get("status")
316 if s != "running":
317 is_degraded = True
318 error_description += (
319 "machine {} instance-status={} ; ".format(
320 machine_id, s
321 )
322 )
323 # check applications
324 if status_dict.get("applications"):
325 for app_id in status_dict.get("applications"):
326 app = status_dict.get("applications").get(app_id)
327 # check application status
328 if app.get("status"):
329 s = app.get("status").get("status")
330 if s != "active":
331 is_degraded = True
332 error_description += (
333 "application {} status={} ; ".format(app_id, s)
334 )
335
336 if error_description:
337 db_dict["errorDescription"] = error_description
338 if current_ns_status == "READY" and is_degraded:
339 db_dict["nsState"] = "DEGRADED"
340 if current_ns_status == "DEGRADED" and not is_degraded:
341 db_dict["nsState"] = "READY"
342
343 # write to database
344 self.update_db_2("nsrs", nsr_id, db_dict)
345
346 except (asyncio.CancelledError, asyncio.TimeoutError):
347 raise
348 except Exception as e:
349 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
350
351 async def _on_update_k8s_db(
352 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
353 ):
354 """
355 Updating vca status in NSR record
356 :param cluster_uuid: UUID of a k8s cluster
357 :param kdu_instance: The unique name of the KDU instance
358 :param filter: To get nsr_id
359 :return: none
360 """
361
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
364
365 try:
366 nsr_id = filter.get("_id")
367
368 # get vca status for NS
369 vca_status = await self.k8sclusterjuju.status_kdu(
370 cluster_uuid,
371 kdu_instance,
372 complete_status=True,
373 yaml_format=False,
374 vca_id=vca_id,
375 )
376 # vcaStatus
377 db_dict = dict()
378 db_dict["vcaStatus"] = {nsr_id: vca_status}
379
380 await self.k8sclusterjuju.update_vca_status(
381 db_dict["vcaStatus"],
382 kdu_instance,
383 vca_id=vca_id,
384 )
385
386 # write to database
387 self.update_db_2("nsrs", nsr_id, db_dict)
388
389 except (asyncio.CancelledError, asyncio.TimeoutError):
390 raise
391 except Exception as e:
392 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
393
394 @staticmethod
395 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
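# Renders the cloud-init text as a Jinja2 template with StrictUndefined, so a
# variable missing from additional_params raises an error instead of being
# silently dropped. Illustrative example:
#   _parse_cloud_init("hostname: {{ hostname }}", {"hostname": "vm1"}, "f1", "d1")
#   returns "hostname: vm1".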
396 try:
397 env = Environment(undefined=StrictUndefined)
398 template = env.from_string(cloud_init_text)
399 return template.render(additional_params or {})
400 except UndefinedError as e:
401 raise LcmException(
402 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
403 "file, must be provided in the instantiation parameters inside the "
404 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
405 )
406 except (TemplateError, TemplateNotFound) as e:
407 raise LcmException(
408 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
409 vnfd_id, vdu_id, e
410 )
411 )
412
413 def _get_vdu_cloud_init_content(self, vdu, vnfd):
414 cloud_init_content = cloud_init_file = None
415 try:
416 if vdu.get("cloud-init-file"):
417 base_folder = vnfd["_admin"]["storage"]
418 cloud_init_file = "{}/{}/cloud_init/{}".format(
419 base_folder["folder"],
420 base_folder["pkg-dir"],
421 vdu["cloud-init-file"],
422 )
423 with self.fs.file_open(cloud_init_file, "r") as ci_file:
424 cloud_init_content = ci_file.read()
425 elif vdu.get("cloud-init"):
426 cloud_init_content = vdu["cloud-init"]
427
428 return cloud_init_content
429 except FsException as e:
430 raise LcmException(
431 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
432 vnfd["id"], vdu["id"], cloud_init_file, e
433 )
434 )
435
436 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
437 vdur = next(
438 vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
439 )
440 additional_params = vdur.get("additionalParams")
441 return parse_yaml_strings(additional_params)
442
443 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
444 """
445 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
446 :param vnfd: input vnfd
447 :param new_id: overrides vnf id if provided
448 :param additionalParams: Instantiation params for VNFs provided
449 :param nsrId: Id of the NSR
450 :return: copy of vnfd
451 """
452 vnfd_RO = deepcopy(vnfd)
453 # remove keys not used by RO: configuration, monitoring, scaling and internal keys
454 vnfd_RO.pop("_id", None)
455 vnfd_RO.pop("_admin", None)
456 vnfd_RO.pop("monitoring-param", None)
457 vnfd_RO.pop("scaling-group-descriptor", None)
458 vnfd_RO.pop("kdu", None)
459 vnfd_RO.pop("k8s-cluster", None)
460 if new_id:
461 vnfd_RO["id"] = new_id
462
463 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
464 for vdu in get_iterable(vnfd_RO, "vdu"):
465 vdu.pop("cloud-init-file", None)
466 vdu.pop("cloud-init", None)
467 return vnfd_RO
468
469 @staticmethod
470 def ip_profile_2_RO(ip_profile):
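# Translates an OSM IM ip-profile into the RO format. Illustrative mapping:
# {"ip-version": "ipv4", "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {...}}
# becomes {"ip-version": "IPv4", "dns-address": ["8.8.8.8"], "dhcp": {...}}.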
471 RO_ip_profile = deepcopy(ip_profile)
472 if "dns-server" in RO_ip_profile:
473 if isinstance(RO_ip_profile["dns-server"], list):
474 RO_ip_profile["dns-address"] = []
475 for ds in RO_ip_profile.pop("dns-server"):
476 RO_ip_profile["dns-address"].append(ds["address"])
477 else:
478 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
479 if RO_ip_profile.get("ip-version") == "ipv4":
480 RO_ip_profile["ip-version"] = "IPv4"
481 if RO_ip_profile.get("ip-version") == "ipv6":
482 RO_ip_profile["ip-version"] = "IPv6"
483 if "dhcp-params" in RO_ip_profile:
484 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
485 return RO_ip_profile
486
487 def _get_ro_vim_id_for_vim_account(self, vim_account):
488 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
489 if db_vim["_admin"]["operationalState"] != "ENABLED":
490 raise LcmException(
491 "VIM={} is not available. operationalState={}".format(
492 vim_account, db_vim["_admin"]["operationalState"]
493 )
494 )
495 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
496 return RO_vim_id
497
498 def get_ro_wim_id_for_wim_account(self, wim_account):
499 if isinstance(wim_account, str):
500 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
501 if db_wim["_admin"]["operationalState"] != "ENABLED":
502 raise LcmException(
503 "WIM={} is not available. operationalState={}".format(
504 wim_account, db_wim["_admin"]["operationalState"]
505 )
506 )
507 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
508 return RO_wim_id
509 else:
510 return wim_account
511
512 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
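# vdu_create and vdu_delete are dictionaries of {vdu-id-ref: number-of-instances}.
# Scaling out clones the last existing vdur of each vdu-id, bumping count-index
# and incrementing fixed-ip/fixed-mac addresses with increment_ip_mac(); scaling
# in either marks the newest vdur entries as DELETING (mark_delete=True) or pulls
# them from the vnfr one by one.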
513
514 db_vdu_push_list = []
515 db_update = {"_admin.modified": time()}
516 if vdu_create:
517 for vdu_id, vdu_count in vdu_create.items():
518 vdur = next(
519 (
520 vdur
521 for vdur in reversed(db_vnfr["vdur"])
522 if vdur["vdu-id-ref"] == vdu_id
523 ),
524 None,
525 )
526 if not vdur:
527 raise LcmException(
528 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
529 vdu_id
530 )
531 )
532
533 for count in range(vdu_count):
534 vdur_copy = deepcopy(vdur)
535 vdur_copy["status"] = "BUILD"
536 vdur_copy["status-detailed"] = None
537 vdur_copy["ip-address"]: None
538 vdur_copy["_id"] = str(uuid4())
539 vdur_copy["count-index"] += count + 1
540 vdur_copy["id"] = "{}-{}".format(
541 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
542 )
543 vdur_copy.pop("vim_info", None)
544 for iface in vdur_copy["interfaces"]:
545 if iface.get("fixed-ip"):
546 iface["ip-address"] = self.increment_ip_mac(
547 iface["ip-address"], count + 1
548 )
549 else:
550 iface.pop("ip-address", None)
551 if iface.get("fixed-mac"):
552 iface["mac-address"] = self.increment_ip_mac(
553 iface["mac-address"], count + 1
554 )
555 else:
556 iface.pop("mac-address", None)
557 iface.pop(
558 "mgmt_vnf", None
559 ) # only the first vdu can be the management of the vnf
560 db_vdu_push_list.append(vdur_copy)
561 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
562 if vdu_delete:
563 for vdu_id, vdu_count in vdu_delete.items():
564 if mark_delete:
565 indexes_to_delete = [
566 iv[0]
567 for iv in enumerate(db_vnfr["vdur"])
568 if iv[1]["vdu-id-ref"] == vdu_id
569 ]
570 db_update.update(
571 {
572 "vdur.{}.status".format(i): "DELETING"
573 for i in indexes_to_delete[-vdu_count:]
574 }
575 )
576 else:
577 # they must be deleted one by one because common.db does not allow otherwise
578 vdus_to_delete = [
579 v
580 for v in reversed(db_vnfr["vdur"])
581 if v["vdu-id-ref"] == vdu_id
582 ]
583 for vdu in vdus_to_delete[:vdu_count]:
584 self.db.set_one(
585 "vnfrs",
586 {"_id": db_vnfr["_id"]},
587 None,
588 pull={"vdur": {"_id": vdu["_id"]}},
589 )
590 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
591 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
592 # modify passed dictionary db_vnfr
593 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
594 db_vnfr["vdur"] = db_vnfr_["vdur"]
595
596 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
597 """
598 Updates database nsr with the RO info for the created vld
599 :param ns_update_nsr: dictionary to be filled with the updated info
600 :param db_nsr: content of db_nsr. This is also modified
601 :param nsr_desc_RO: nsr descriptor from RO
602 :return: Nothing, LcmException is raised on errors
603 """
604
605 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
606 for net_RO in get_iterable(nsr_desc_RO, "nets"):
607 if vld["id"] != net_RO.get("ns_net_osm_id"):
608 continue
609 vld["vim-id"] = net_RO.get("vim_net_id")
610 vld["name"] = net_RO.get("vim_name")
611 vld["status"] = net_RO.get("status")
612 vld["status-detailed"] = net_RO.get("error_msg")
613 ns_update_nsr["vld.{}".format(vld_index)] = vld
614 break
615 else:
616 raise LcmException(
617 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
618 )
619
620 def set_vnfr_at_error(self, db_vnfrs, error_text):
621 try:
622 for db_vnfr in db_vnfrs.values():
623 vnfr_update = {"status": "ERROR"}
624 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
625 if "status" not in vdur:
626 vdur["status"] = "ERROR"
627 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
628 if error_text:
629 vdur["status-detailed"] = str(error_text)
630 vnfr_update[
631 "vdur.{}.status-detailed".format(vdu_index)
632 ] = "ERROR"
633 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
634 except DbException as e:
635 self.logger.error("Cannot update vnf. {}".format(e))
636
637 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
638 """
639 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
640 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
641 :param nsr_desc_RO: nsr descriptor from RO
642 :return: Nothing, LcmException is raised on errors
643 """
644 for vnf_index, db_vnfr in db_vnfrs.items():
645 for vnf_RO in nsr_desc_RO["vnfs"]:
646 if vnf_RO["member_vnf_index"] != vnf_index:
647 continue
648 vnfr_update = {}
649 if vnf_RO.get("ip_address"):
650 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
651 "ip_address"
652 ].split(";")[0]
653 elif not db_vnfr.get("ip-address"):
654 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
655 raise LcmExceptionNoMgmtIP(
656 "ns member_vnf_index '{}' has no IP address".format(
657 vnf_index
658 )
659 )
660
661 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
662 vdur_RO_count_index = 0
663 if vdur.get("pdu-type"):
664 continue
665 for vdur_RO in get_iterable(vnf_RO, "vms"):
666 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
667 continue
668 if vdur["count-index"] != vdur_RO_count_index:
669 vdur_RO_count_index += 1
670 continue
671 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
672 if vdur_RO.get("ip_address"):
673 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
674 else:
675 vdur["ip-address"] = None
676 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
677 vdur["name"] = vdur_RO.get("vim_name")
678 vdur["status"] = vdur_RO.get("status")
679 vdur["status-detailed"] = vdur_RO.get("error_msg")
680 for ifacer in get_iterable(vdur, "interfaces"):
681 for interface_RO in get_iterable(vdur_RO, "interfaces"):
682 if ifacer["name"] == interface_RO.get("internal_name"):
683 ifacer["ip-address"] = interface_RO.get(
684 "ip_address"
685 )
686 ifacer["mac-address"] = interface_RO.get(
687 "mac_address"
688 )
689 break
690 else:
691 raise LcmException(
692 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
693 "from VIM info".format(
694 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
695 )
696 )
697 vnfr_update["vdur.{}".format(vdu_index)] = vdur
698 break
699 else:
700 raise LcmException(
701 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
702 "VIM info".format(
703 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
704 )
705 )
706
707 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
708 for net_RO in get_iterable(nsr_desc_RO, "nets"):
709 if vld["id"] != net_RO.get("vnf_net_osm_id"):
710 continue
711 vld["vim-id"] = net_RO.get("vim_net_id")
712 vld["name"] = net_RO.get("vim_name")
713 vld["status"] = net_RO.get("status")
714 vld["status-detailed"] = net_RO.get("error_msg")
715 vnfr_update["vld.{}".format(vld_index)] = vld
716 break
717 else:
718 raise LcmException(
719 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
720 vnf_index, vld["id"]
721 )
722 )
723
724 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
725 break
726
727 else:
728 raise LcmException(
729 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
730 vnf_index
731 )
732 )
733
734 def _get_ns_config_info(self, nsr_id):
735 """
736 Generates a mapping between vnf,vdu elements and the N2VC id
737 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
738 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
739 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
740 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
741 """
742 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
743 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
744 mapping = {}
745 ns_config_info = {"osm-config-mapping": mapping}
746 for vca in vca_deployed_list:
747 if not vca["member-vnf-index"]:
748 continue
749 if not vca["vdu_id"]:
750 mapping[vca["member-vnf-index"]] = vca["application"]
751 else:
752 mapping[
753 "{}.{}.{}".format(
754 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
755 )
756 ] = vca["application"]
757 return ns_config_info
758
759 async def _instantiate_ng_ro(
760 self,
761 logging_text,
762 nsr_id,
763 nsd,
764 db_nsr,
765 db_nslcmop,
766 db_vnfrs,
767 db_vnfds,
768 n2vc_key_list,
769 stage,
770 start_deploy,
771 timeout_ns_deploy,
772 ):
773
774 db_vims = {}
775
776 def get_vim_account(vim_account_id):
777 nonlocal db_vims
778 if vim_account_id in db_vims:
779 return db_vims[vim_account_id]
780 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
781 db_vims[vim_account_id] = db_vim
782 return db_vim
783
784 # modify target_vld info with instantiation parameters
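# Note: vld_params is built below from the NSD l3-protocol-data and/or the
# instantiation parameters; target_vld["vim_info"] is keyed by "vim:<id>",
# "wim:<id>" or "sdn:<id>" depending on where the VLD is rendered.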
785 def parse_vld_instantiation_params(
786 target_vim, target_vld, vld_params, target_sdn
787 ):
788 if vld_params.get("ip-profile"):
789 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
790 "ip-profile"
791 ]
792 if vld_params.get("provider-network"):
793 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
794 "provider-network"
795 ]
796 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
797 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
798 "provider-network"
799 ]["sdn-ports"]
800 if vld_params.get("wimAccountId"):
801 target_wim = "wim:{}".format(vld_params["wimAccountId"])
802 target_vld["vim_info"][target_wim] = {}
803 for param in ("vim-network-name", "vim-network-id"):
804 if vld_params.get(param):
805 if isinstance(vld_params[param], dict):
806 for vim, vim_net in vld_params[param].items():
807 other_target_vim = "vim:" + vim
808 populate_dict(
809 target_vld["vim_info"],
810 (other_target_vim, param.replace("-", "_")),
811 vim_net,
812 )
813 else: # isinstance str
814 target_vld["vim_info"][target_vim][
815 param.replace("-", "_")
816 ] = vld_params[param]
817 if vld_params.get("common_id"):
818 target_vld["common_id"] = vld_params.get("common_id")
819
820 nslcmop_id = db_nslcmop["_id"]
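# "target" is the deployment descriptor handed to NG-RO through self.RO.deploy();
# it aggregates the NS-level VLDs, the per-VNF vdur/vld records, flavors, images
# and the rendered cloud-init content.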
821 target = {
822 "name": db_nsr["name"],
823 "ns": {"vld": []},
824 "vnf": [],
825 "image": deepcopy(db_nsr["image"]),
826 "flavor": deepcopy(db_nsr["flavor"]),
827 "action_id": nslcmop_id,
828 "cloud_init_content": {},
829 }
830 for image in target["image"]:
831 image["vim_info"] = {}
832 for flavor in target["flavor"]:
833 flavor["vim_info"] = {}
834
835 if db_nslcmop.get("lcmOperationType") != "instantiate":
836 # get parameters of instantiation:
837 db_nslcmop_instantiate = self.db.get_list(
838 "nslcmops",
839 {
840 "nsInstanceId": db_nslcmop["nsInstanceId"],
841 "lcmOperationType": "instantiate",
842 },
843 )[-1]
844 ns_params = db_nslcmop_instantiate.get("operationParams")
845 else:
846 ns_params = db_nslcmop.get("operationParams")
847 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
848 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
849
850 cp2target = {}
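# cp2target maps "member_vnf:<member-index>.<cpd-id>" to the NS VLD reference
# ("nsrs:<nsr_id>:vld.<index>") so that VNF-level VLDs can later be pointed at
# the NS network they connect to.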
851 for vld_index, vld in enumerate(db_nsr.get("vld")):
852 target_vim = "vim:{}".format(ns_params["vimAccountId"])
853 target_vld = {
854 "id": vld["id"],
855 "name": vld["name"],
856 "mgmt-network": vld.get("mgmt-network", False),
857 "type": vld.get("type"),
858 "vim_info": {
859 target_vim: {
860 "vim_network_name": vld.get("vim-network-name"),
861 "vim_account_id": ns_params["vimAccountId"],
862 }
863 },
864 }
865 # check if this network needs SDN assist
866 if vld.get("pci-interfaces"):
867 db_vim = get_vim_account(ns_params["vimAccountId"])
868 sdnc_id = db_vim["config"].get("sdn-controller")
869 if sdnc_id:
870 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
871 target_sdn = "sdn:{}".format(sdnc_id)
872 target_vld["vim_info"][target_sdn] = {
873 "sdn": True,
874 "target_vim": target_vim,
875 "vlds": [sdn_vld],
876 "type": vld.get("type"),
877 }
878
879 nsd_vnf_profiles = get_vnf_profiles(nsd)
880 for nsd_vnf_profile in nsd_vnf_profiles:
881 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
882 if cp["virtual-link-profile-id"] == vld["id"]:
883 cp2target[
884 "member_vnf:{}.{}".format(
885 cp["constituent-cpd-id"][0][
886 "constituent-base-element-id"
887 ],
888 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
889 )
890 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
891
892 # check at nsd descriptor, if there is an ip-profile
893 vld_params = {}
894 nsd_vlp = find_in_list(
895 get_virtual_link_profiles(nsd),
896 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
897 == vld["id"],
898 )
899 if (
900 nsd_vlp
901 and nsd_vlp.get("virtual-link-protocol-data")
902 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
903 ):
904 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
905 "l3-protocol-data"
906 ]
907 ip_profile_dest_data = {}
908 if "ip-version" in ip_profile_source_data:
909 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
910 "ip-version"
911 ]
912 if "cidr" in ip_profile_source_data:
913 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
914 "cidr"
915 ]
916 if "gateway-ip" in ip_profile_source_data:
917 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
918 "gateway-ip"
919 ]
920 if "dhcp-enabled" in ip_profile_source_data:
921 ip_profile_dest_data["dhcp-params"] = {
922 "enabled": ip_profile_source_data["dhcp-enabled"]
923 }
924 vld_params["ip-profile"] = ip_profile_dest_data
925
926 # update vld_params with instantiation params
927 vld_instantiation_params = find_in_list(
928 get_iterable(ns_params, "vld"),
929 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
930 )
931 if vld_instantiation_params:
932 vld_params.update(vld_instantiation_params)
933 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
934 target["ns"]["vld"].append(target_vld)
935
936 for vnfr in db_vnfrs.values():
937 vnfd = find_in_list(
938 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
939 )
940 vnf_params = find_in_list(
941 get_iterable(ns_params, "vnf"),
942 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
943 )
944 target_vnf = deepcopy(vnfr)
945 target_vim = "vim:{}".format(vnfr["vim-account-id"])
946 for vld in target_vnf.get("vld", ()):
947 # check if connected to a ns.vld, to fill target
948 vnf_cp = find_in_list(
949 vnfd.get("int-virtual-link-desc", ()),
950 lambda cpd: cpd.get("id") == vld["id"],
951 )
952 if vnf_cp:
953 ns_cp = "member_vnf:{}.{}".format(
954 vnfr["member-vnf-index-ref"], vnf_cp["id"]
955 )
956 if cp2target.get(ns_cp):
957 vld["target"] = cp2target[ns_cp]
958
959 vld["vim_info"] = {
960 target_vim: {"vim_network_name": vld.get("vim-network-name")}
961 }
962 # check if this network needs SDN assist
963 target_sdn = None
964 if vld.get("pci-interfaces"):
965 db_vim = get_vim_account(vnfr["vim-account-id"])
966 sdnc_id = db_vim["config"].get("sdn-controller")
967 if sdnc_id:
968 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
969 target_sdn = "sdn:{}".format(sdnc_id)
970 vld["vim_info"][target_sdn] = {
971 "sdn": True,
972 "target_vim": target_vim,
973 "vlds": [sdn_vld],
974 "type": vld.get("type"),
975 }
976
977 # check at vnfd descriptor, if there is an ip-profile
978 vld_params = {}
979 vnfd_vlp = find_in_list(
980 get_virtual_link_profiles(vnfd),
981 lambda a_link_profile: a_link_profile["id"] == vld["id"],
982 )
983 if (
984 vnfd_vlp
985 and vnfd_vlp.get("virtual-link-protocol-data")
986 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
987 ):
988 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
989 "l3-protocol-data"
990 ]
991 ip_profile_dest_data = {}
992 if "ip-version" in ip_profile_source_data:
993 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
994 "ip-version"
995 ]
996 if "cidr" in ip_profile_source_data:
997 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
998 "cidr"
999 ]
1000 if "gateway-ip" in ip_profile_source_data:
1001 ip_profile_dest_data[
1002 "gateway-address"
1003 ] = ip_profile_source_data["gateway-ip"]
1004 if "dhcp-enabled" in ip_profile_source_data:
1005 ip_profile_dest_data["dhcp-params"] = {
1006 "enabled": ip_profile_source_data["dhcp-enabled"]
1007 }
1008
1009 vld_params["ip-profile"] = ip_profile_dest_data
1010 # update vld_params with instantiation params
1011 if vnf_params:
1012 vld_instantiation_params = find_in_list(
1013 get_iterable(vnf_params, "internal-vld"),
1014 lambda i_vld: i_vld["name"] == vld["id"],
1015 )
1016 if vld_instantiation_params:
1017 vld_params.update(vld_instantiation_params)
1018 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1019
1020 vdur_list = []
1021 for vdur in target_vnf.get("vdur", ()):
1022 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1023 continue # This vdu must not be created
1024 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1025
1026 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1027
1028 if ssh_keys_all:
1029 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1030 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1031 if (
1032 vdu_configuration
1033 and vdu_configuration.get("config-access")
1034 and vdu_configuration.get("config-access").get("ssh-access")
1035 ):
1036 vdur["ssh-keys"] = ssh_keys_all
1037 vdur["ssh-access-required"] = vdu_configuration[
1038 "config-access"
1039 ]["ssh-access"]["required"]
1040 elif (
1041 vnf_configuration
1042 and vnf_configuration.get("config-access")
1043 and vnf_configuration.get("config-access").get("ssh-access")
1044 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1045 ):
1046 vdur["ssh-keys"] = ssh_keys_all
1047 vdur["ssh-access-required"] = vnf_configuration[
1048 "config-access"
1049 ]["ssh-access"]["required"]
1050 elif ssh_keys_instantiation and find_in_list(
1051 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1052 ):
1053 vdur["ssh-keys"] = ssh_keys_instantiation
1054
1055 self.logger.debug("NS > vdur > {}".format(vdur))
1056
1057 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1058 # cloud-init
1059 if vdud.get("cloud-init-file"):
1060 vdur["cloud-init"] = "{}:file:{}".format(
1061 vnfd["_id"], vdud.get("cloud-init-file")
1062 )
1063 # read file and put content at target.cloud_init_content. Avoid ng_ro using the shared package system
1064 if vdur["cloud-init"] not in target["cloud_init_content"]:
1065 base_folder = vnfd["_admin"]["storage"]
1066 cloud_init_file = "{}/{}/cloud_init/{}".format(
1067 base_folder["folder"],
1068 base_folder["pkg-dir"],
1069 vdud.get("cloud-init-file"),
1070 )
1071 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1072 target["cloud_init_content"][
1073 vdur["cloud-init"]
1074 ] = ci_file.read()
1075 elif vdud.get("cloud-init"):
1076 vdur["cloud-init"] = "{}:vdu:{}".format(
1077 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1078 )
1079 # put content at target.cloud_init_content. Avoid ng_ro reading the vnfd descriptor
1080 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1081 "cloud-init"
1082 ]
1083 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1084 deploy_params_vdu = self._format_additional_params(
1085 vdur.get("additionalParams") or {}
1086 )
1087 deploy_params_vdu["OSM"] = get_osm_params(
1088 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1089 )
1090 vdur["additionalParams"] = deploy_params_vdu
1091
1092 # flavor
1093 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1094 if target_vim not in ns_flavor["vim_info"]:
1095 ns_flavor["vim_info"][target_vim] = {}
1096
1097 # deal with images
1098 # in case alternative images are provided, we must check whether they should be applied
1099 # for this vim_type and, if so, use the alternative image id for it
1100 ns_image_id = int(vdur["ns-image-id"])
1101 if vdur.get("alt-image-ids"):
1102 db_vim = get_vim_account(vnfr["vim-account-id"])
1103 vim_type = db_vim["vim_type"]
1104 for alt_image_id in vdur.get("alt-image-ids"):
1105 ns_alt_image = target["image"][int(alt_image_id)]
1106 if vim_type == ns_alt_image.get("vim-type"):
1107 # must use alternative image
1108 self.logger.debug(
1109 "use alternative image id: {}".format(alt_image_id)
1110 )
1111 ns_image_id = alt_image_id
1112 vdur["ns-image-id"] = ns_image_id
1113 break
1114 ns_image = target["image"][int(ns_image_id)]
1115 if target_vim not in ns_image["vim_info"]:
1116 ns_image["vim_info"][target_vim] = {}
1117
1118 vdur["vim_info"] = {target_vim: {}}
1119 # instantiation parameters
1120 # if vnf_params:
1121 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1122 # vdud["id"]), None)
1123 vdur_list.append(vdur)
1124 target_vnf["vdur"] = vdur_list
1125 target["vnf"].append(target_vnf)
1126
1127 desc = await self.RO.deploy(nsr_id, target)
1128 self.logger.debug("RO return > {}".format(desc))
1129 action_id = desc["action_id"]
1130 await self._wait_ng_ro(
1131 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1132 )
1133
1134 # Updating NSR
1135 db_nsr_update = {
1136 "_admin.deployed.RO.operational-status": "running",
1137 "detailed-status": " ".join(stage),
1138 }
1139 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1140 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1141 self._write_op_status(nslcmop_id, stage)
1142 self.logger.debug(
1143 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1144 )
1145 return
1146
1147 async def _wait_ng_ro(
1148 self,
1149 nsr_id,
1150 action_id,
1151 nslcmop_id=None,
1152 start_time=None,
1153 timeout=600,
1154 stage=None,
1155 ):
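# Polls NG-RO roughly every 15 seconds until the action reaches DONE, raises
# NgRoException on FAILED or when the deadline (start_time + timeout) expires,
# and mirrors the VIM progress into stage[2] / the nslcmop detailed-status.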
1156 detailed_status_old = None
1157 db_nsr_update = {}
1158 start_time = start_time or time()
1159 while time() <= start_time + timeout:
1160 desc_status = await self.RO.status(nsr_id, action_id)
1161 self.logger.debug("Wait NG RO > {}".format(desc_status))
1162 if desc_status["status"] == "FAILED":
1163 raise NgRoException(desc_status["details"])
1164 elif desc_status["status"] == "BUILD":
1165 if stage:
1166 stage[2] = "VIM: ({})".format(desc_status["details"])
1167 elif desc_status["status"] == "DONE":
1168 if stage:
1169 stage[2] = "Deployed at VIM"
1170 break
1171 else:
1172 assert False, "ROclient.check_ns_status returns unknown {}".format(
1173 desc_status["status"]
1174 )
1175 if stage and nslcmop_id and stage[2] != detailed_status_old:
1176 detailed_status_old = stage[2]
1177 db_nsr_update["detailed-status"] = " ".join(stage)
1178 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1179 self._write_op_status(nslcmop_id, stage)
1180 await asyncio.sleep(15, loop=self.loop)
1181 else: # timeout_ns_deploy
1182 raise NgRoException("Timeout waiting ns to deploy")
1183
1184 async def _terminate_ng_ro(
1185 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1186 ):
1187 db_nsr_update = {}
1188 failed_detail = []
1189 action_id = None
1190 start_deploy = time()
1191 try:
1192 target = {
1193 "ns": {"vld": []},
1194 "vnf": [],
1195 "image": [],
1196 "flavor": [],
1197 "action_id": nslcmop_id,
1198 }
1199 desc = await self.RO.deploy(nsr_id, target)
1200 action_id = desc["action_id"]
1201 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1202 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1203 self.logger.debug(
1204 logging_text
1205 + "ns terminate action at RO. action_id={}".format(action_id)
1206 )
1207
1208 # wait until done
1209 delete_timeout = 20 * 60 # 20 minutes
1210 await self._wait_ng_ro(
1211 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1212 )
1213
1214 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1215 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1216 # delete all nsr
1217 await self.RO.delete(nsr_id)
1218 except Exception as e:
1219 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1220 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1221 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1222 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1223 self.logger.debug(
1224 logging_text + "RO_action_id={} already deleted".format(action_id)
1225 )
1226 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1227 failed_detail.append("delete conflict: {}".format(e))
1228 self.logger.debug(
1229 logging_text
1230 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1231 )
1232 else:
1233 failed_detail.append("delete error: {}".format(e))
1234 self.logger.error(
1235 logging_text
1236 + "RO_action_id={} delete error: {}".format(action_id, e)
1237 )
1238
1239 if failed_detail:
1240 stage[2] = "Error deleting from VIM"
1241 else:
1242 stage[2] = "Deleted from VIM"
1243 db_nsr_update["detailed-status"] = " ".join(stage)
1244 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1245 self._write_op_status(nslcmop_id, stage)
1246
1247 if failed_detail:
1248 raise LcmException("; ".join(failed_detail))
1249 return
1250
1251 async def instantiate_RO(
1252 self,
1253 logging_text,
1254 nsr_id,
1255 nsd,
1256 db_nsr,
1257 db_nslcmop,
1258 db_vnfrs,
1259 db_vnfds,
1260 n2vc_key_list,
1261 stage,
1262 ):
1263 """
1264 Instantiate at RO
1265 :param logging_text: prefix text to use at logging
1266 :param nsr_id: nsr identity
1267 :param nsd: database content of ns descriptor
1268 :param db_nsr: database content of ns record
1269 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1270 :param db_vnfrs:
1271 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1272 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1273 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1274 :return: None or exception
1275 """
1276 try:
1277 start_deploy = time()
1278 ns_params = db_nslcmop.get("operationParams")
1279 if ns_params and ns_params.get("timeout_ns_deploy"):
1280 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1281 else:
1282 timeout_ns_deploy = self.timeout.get(
1283 "ns_deploy", self.timeout_ns_deploy
1284 )
1285
1286 # Check for and optionally request placement optimization. Database will be updated if placement activated
1287 stage[2] = "Waiting for Placement."
1288 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1289 # in case of placement, change ns_params["vimAccountId"] if not present at any vnfr
1290 for vnfr in db_vnfrs.values():
1291 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1292 break
1293 else:
1294 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1295
1296 return await self._instantiate_ng_ro(
1297 logging_text,
1298 nsr_id,
1299 nsd,
1300 db_nsr,
1301 db_nslcmop,
1302 db_vnfrs,
1303 db_vnfds,
1304 n2vc_key_list,
1305 stage,
1306 start_deploy,
1307 timeout_ns_deploy,
1308 )
1309 except Exception as e:
1310 stage[2] = "ERROR deploying at VIM"
1311 self.set_vnfr_at_error(db_vnfrs, str(e))
1312 self.logger.error(
1313 "Error deploying at VIM {}".format(e),
1314 exc_info=not isinstance(
1315 e,
1316 (
1317 ROclient.ROClientException,
1318 LcmException,
1319 DbException,
1320 NgRoException,
1321 ),
1322 ),
1323 )
1324 raise
1325
1326 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1327 """
1328 Wait for kdu to be up, get ip address
1329 :param logging_text: prefix use for logging
1330 :param nsr_id:
1331 :param vnfr_id:
1332 :param kdu_name:
1333 :return: IP address
1334 """
1335
1336 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1337 nb_tries = 0
1338
1339 while nb_tries < 360:
1340 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1341 kdur = next(
1342 (
1343 x
1344 for x in get_iterable(db_vnfr, "kdur")
1345 if x.get("kdu-name") == kdu_name
1346 ),
1347 None,
1348 )
1349 if not kdur:
1350 raise LcmException(
1351 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1352 )
1353 if kdur.get("status"):
1354 if kdur["status"] in ("READY", "ENABLED"):
1355 return kdur.get("ip-address")
1356 else:
1357 raise LcmException(
1358 "target KDU={} is in error state".format(kdu_name)
1359 )
1360
1361 await asyncio.sleep(10, loop=self.loop)
1362 nb_tries += 1
1363 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1364
1365 async def wait_vm_up_insert_key_ro(
1366 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1367 ):
1368 """
1369 Wait for IP address at RO and, optionally, insert public key in virtual machine
1370 :param logging_text: prefix use for logging
1371 :param nsr_id:
1372 :param vnfr_id:
1373 :param vdu_id:
1374 :param vdu_index:
1375 :param pub_key: public ssh key to inject, None to skip
1376 :param user: user to apply the public ssh key
1377 :return: IP address
1378 """
1379
1380 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1381 ro_nsr_id = None
1382 ip_address = None
1383 nb_tries = 0
1384 target_vdu_id = None
1385 ro_retries = 0
1386
1387 while True:
1388
1389 ro_retries += 1
1390 if ro_retries >= 360: # 1 hour
1391 raise LcmException(
1392 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1393 )
1394
1395 await asyncio.sleep(10, loop=self.loop)
1396
1397 # get ip address
1398 if not target_vdu_id:
1399 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1400
1401 if not vdu_id: # for the VNF case
1402 if db_vnfr.get("status") == "ERROR":
1403 raise LcmException(
1404 "Cannot inject ssh-key because target VNF is in error state"
1405 )
1406 ip_address = db_vnfr.get("ip-address")
1407 if not ip_address:
1408 continue
1409 vdur = next(
1410 (
1411 x
1412 for x in get_iterable(db_vnfr, "vdur")
1413 if x.get("ip-address") == ip_address
1414 ),
1415 None,
1416 )
1417 else: # VDU case
1418 vdur = next(
1419 (
1420 x
1421 for x in get_iterable(db_vnfr, "vdur")
1422 if x.get("vdu-id-ref") == vdu_id
1423 and x.get("count-index") == vdu_index
1424 ),
1425 None,
1426 )
1427
1428 if (
1429 not vdur and len(db_vnfr.get("vdur", ())) == 1
1430 ): # If only one, this should be the target vdu
1431 vdur = db_vnfr["vdur"][0]
1432 if not vdur:
1433 raise LcmException(
1434 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1435 vnfr_id, vdu_id, vdu_index
1436 )
1437 )
1438 # New generation RO stores information at "vim_info"
1439 ng_ro_status = None
1440 target_vim = None
1441 if vdur.get("vim_info"):
1442 target_vim = next(
1443 t for t in vdur["vim_info"]
1444 ) # there should be only one key
1445 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1446 if (
1447 vdur.get("pdu-type")
1448 or vdur.get("status") == "ACTIVE"
1449 or ng_ro_status == "ACTIVE"
1450 ):
1451 ip_address = vdur.get("ip-address")
1452 if not ip_address:
1453 continue
1454 target_vdu_id = vdur["vdu-id-ref"]
1455 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1456 raise LcmException(
1457 "Cannot inject ssh-key because target VM is in error state"
1458 )
1459
1460 if not target_vdu_id:
1461 continue
1462
1463 # inject public key into machine
1464 if pub_key and user:
1465 self.logger.debug(logging_text + "Inserting RO key")
1466 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1467 if vdur.get("pdu-type"):
1468 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1469 return ip_address
1470 try:
1471 ro_vm_id = "{}-{}".format(
1472 db_vnfr["member-vnf-index-ref"], target_vdu_id
1473 ) # TODO add vdu_index
1474 if self.ng_ro:
1475 target = {
1476 "action": {
1477 "action": "inject_ssh_key",
1478 "key": pub_key,
1479 "user": user,
1480 },
1481 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1482 }
1483 desc = await self.RO.deploy(nsr_id, target)
1484 action_id = desc["action_id"]
1485 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1486 break
1487 else:
1488 # wait until NS is deployed at RO
1489 if not ro_nsr_id:
1490 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1491 ro_nsr_id = deep_get(
1492 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1493 )
1494 if not ro_nsr_id:
1495 continue
1496 result_dict = await self.RO.create_action(
1497 item="ns",
1498 item_id_name=ro_nsr_id,
1499 descriptor={
1500 "add_public_key": pub_key,
1501 "vms": [ro_vm_id],
1502 "user": user,
1503 },
1504 )
1505 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1506 if not result_dict or not isinstance(result_dict, dict):
1507 raise LcmException(
1508 "Unknown response from RO when injecting key"
1509 )
1510 for result in result_dict.values():
1511 if result.get("vim_result") == 200:
1512 break
1513 else:
1514 raise ROclient.ROClientException(
1515 "error injecting key: {}".format(
1516 result.get("description")
1517 )
1518 )
1519 break
1520 except NgRoException as e:
1521 raise LcmException(
1522 "Reaching max tries injecting key. Error: {}".format(e)
1523 )
1524 except ROclient.ROClientException as e:
1525 if not nb_tries:
1526 self.logger.debug(
1527 logging_text
1528 + "error injecting key: {}. Retrying until {} seconds".format(
1529 e, 20 * 10
1530 )
1531 )
1532 nb_tries += 1
1533 if nb_tries >= 20:
1534 raise LcmException(
1535 "Reaching max tries injecting key. Error: {}".format(e)
1536 )
1537 else:
1538 break
1539
1540 return ip_address
1541
1542 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1543 """
1544 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs
1545 """
1546 my_vca = vca_deployed_list[vca_index]
1547 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1548 # vdu or kdu: no dependencies
1549 return
1550 timeout = 300
1551 while timeout >= 0:
1552 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1553 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1554 configuration_status_list = db_nsr["configurationStatus"]
1555 for index, vca_deployed in enumerate(configuration_status_list):
1556 if index == vca_index:
1557 # myself
1558 continue
1559 if not my_vca.get("member-vnf-index") or (
1560 vca_deployed.get("member-vnf-index")
1561 == my_vca.get("member-vnf-index")
1562 ):
1563 internal_status = configuration_status_list[index].get("status")
1564 if internal_status == "READY":
1565 continue
1566 elif internal_status == "BROKEN":
1567 raise LcmException(
1568 "Configuration aborted because dependent charm/s has failed"
1569 )
1570 else:
1571 break
1572 else:
1573 # no dependencies, return
1574 return
1575 await asyncio.sleep(10)
1576 timeout -= 1
1577
1578 raise LcmException("Configuration aborted because dependent charm/s timeout")
1579
1580 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
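# The VCA id is taken from the vnfr "vca-id" field when available, otherwise
# from the "vca" field of the VIM account referenced by the NS instantiation
# parameters.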
1581 vca_id = None
1582 if db_vnfr:
1583 vca_id = deep_get(db_vnfr, ("vca-id",))
1584 elif db_nsr:
1585 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1586 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1587 return vca_id
1588
1589 async def instantiate_N2VC(
1590 self,
1591 logging_text,
1592 vca_index,
1593 nsi_id,
1594 db_nsr,
1595 db_vnfr,
1596 vdu_id,
1597 kdu_name,
1598 vdu_index,
1599 config_descriptor,
1600 deploy_params,
1601 base_folder,
1602 nslcmop_id,
1603 stage,
1604 vca_type,
1605 vca_name,
1606 ee_config_descriptor,
1607 ):
1608 nsr_id = db_nsr["_id"]
1609 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1610 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1611 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1612 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1613 db_dict = {
1614 "collection": "nsrs",
1615 "filter": {"_id": nsr_id},
1616 "path": db_update_entry,
1617 }
1618 step = ""
1619 try:
1620
1621 element_type = "NS"
1622 element_under_configuration = nsr_id
1623
1624 vnfr_id = None
1625 if db_vnfr:
1626 vnfr_id = db_vnfr["_id"]
1627 osm_config["osm"]["vnf_id"] = vnfr_id
1628
1629 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
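# The namespace is extended below with ".<vnfr_id>-<index>" and
# ".<vdu_id>-<index>" (or ".<kdu_name>") segments, uniquely identifying the
# element that this execution environment configures.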
1630
1631 if vca_type == "native_charm":
1632 index_number = 0
1633 else:
1634 index_number = vdu_index or 0
1635
1636 if vnfr_id:
1637 element_type = "VNF"
1638 element_under_configuration = vnfr_id
1639 namespace += ".{}-{}".format(vnfr_id, index_number)
1640 if vdu_id:
1641 namespace += ".{}-{}".format(vdu_id, index_number)
1642 element_type = "VDU"
1643 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1644 osm_config["osm"]["vdu_id"] = vdu_id
1645 elif kdu_name:
1646 namespace += ".{}".format(kdu_name)
1647 element_type = "KDU"
1648 element_under_configuration = kdu_name
1649 osm_config["osm"]["kdu_name"] = kdu_name
1650
1651 # Get artifact path
1652 artifact_path = "{}/{}/{}/{}".format(
1653 base_folder["folder"],
1654 base_folder["pkg-dir"],
1655 "charms"
1656 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1657 else "helm-charts",
1658 vca_name,
1659 )
1660
1661 self.logger.debug("Artifact path > {}".format(artifact_path))
1662
1663 # get initial_config_primitive_list that applies to this element
1664 initial_config_primitive_list = config_descriptor.get(
1665 "initial-config-primitive"
1666 )
1667
1668 self.logger.debug(
1669 "Initial config primitive list > {}".format(
1670 initial_config_primitive_list
1671 )
1672 )
1673
1674 # add config if not present for NS charm
1675 ee_descriptor_id = ee_config_descriptor.get("id")
1676 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1677 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1678 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1679 )
1680
1681 self.logger.debug(
1682 "Initial config primitive list #2 > {}".format(
1683 initial_config_primitive_list
1684 )
1685 )
1686 # n2vc_redesign STEP 3.1
1687 # find old ee_id if exists
1688 ee_id = vca_deployed.get("ee_id")
1689
1690 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1691 # create or register execution environment in VCA
1692 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1693
1694 self._write_configuration_status(
1695 nsr_id=nsr_id,
1696 vca_index=vca_index,
1697 status="CREATING",
1698 element_under_configuration=element_under_configuration,
1699 element_type=element_type,
1700 )
1701
1702 step = "create execution environment"
1703 self.logger.debug(logging_text + step)
1704
1705 ee_id = None
1706 credentials = None
1707 if vca_type == "k8s_proxy_charm":
1708 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1709 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1710 namespace=namespace,
1711 artifact_path=artifact_path,
1712 db_dict=db_dict,
1713 vca_id=vca_id,
1714 )
1715 elif vca_type == "helm" or vca_type == "helm-v3":
1716 ee_id, credentials = await self.vca_map[
1717 vca_type
1718 ].create_execution_environment(
1719 namespace=namespace,
1720 reuse_ee_id=ee_id,
1721 db_dict=db_dict,
1722 config=osm_config,
1723 artifact_path=artifact_path,
1724 vca_type=vca_type,
1725 )
1726 else:
1727 ee_id, credentials = await self.vca_map[
1728 vca_type
1729 ].create_execution_environment(
1730 namespace=namespace,
1731 reuse_ee_id=ee_id,
1732 db_dict=db_dict,
1733 vca_id=vca_id,
1734 )
1735
1736 elif vca_type == "native_charm":
1737 step = "Waiting to VM being up and getting IP address"
1738 self.logger.debug(logging_text + step)
1739 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1740 logging_text,
1741 nsr_id,
1742 vnfr_id,
1743 vdu_id,
1744 vdu_index,
1745 user=None,
1746 pub_key=None,
1747 )
1748 credentials = {"hostname": rw_mgmt_ip}
1749 # get username
1750 username = deep_get(
1751 config_descriptor, ("config-access", "ssh-access", "default-user")
1752 )
1753 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user are
1754 # merged. Meanwhile, get the username from initial-config-primitive
1755 if not username and initial_config_primitive_list:
1756 for config_primitive in initial_config_primitive_list:
1757 for param in config_primitive.get("parameter", ()):
1758 if param["name"] == "ssh-username":
1759 username = param["value"]
1760 break
1761 if not username:
1762 raise LcmException(
1763 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1764 "'config-access.ssh-access.default-user'"
1765 )
1766 credentials["username"] = username
1767 # n2vc_redesign STEP 3.2
1768
1769 self._write_configuration_status(
1770 nsr_id=nsr_id,
1771 vca_index=vca_index,
1772 status="REGISTERING",
1773 element_under_configuration=element_under_configuration,
1774 element_type=element_type,
1775 )
1776
1777 step = "register execution environment {}".format(credentials)
1778 self.logger.debug(logging_text + step)
1779 ee_id = await self.vca_map[vca_type].register_execution_environment(
1780 credentials=credentials,
1781 namespace=namespace,
1782 db_dict=db_dict,
1783 vca_id=vca_id,
1784 )
1785
1786 # for compatibility with MON/POL modules, they need the model and application name in the database
1787 # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1788 ee_id_parts = ee_id.split(".")
1789 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1790 if len(ee_id_parts) >= 2:
1791 model_name = ee_id_parts[0]
1792 application_name = ee_id_parts[1]
1793 db_nsr_update[db_update_entry + "model"] = model_name
1794 db_nsr_update[db_update_entry + "application"] = application_name
1795
1796 # n2vc_redesign STEP 3.3
1797 step = "Install configuration Software"
1798
1799 self._write_configuration_status(
1800 nsr_id=nsr_id,
1801 vca_index=vca_index,
1802 status="INSTALLING SW",
1803 element_under_configuration=element_under_configuration,
1804 element_type=element_type,
1805 other_update=db_nsr_update,
1806 )
1807
1808 # TODO check if already done
1809 self.logger.debug(logging_text + step)
1810 config = None
1811 if vca_type == "native_charm":
1812 config_primitive = next(
1813 (p for p in initial_config_primitive_list if p["name"] == "config"),
1814 None,
1815 )
1816 if config_primitive:
1817 config = self._map_primitive_params(
1818 config_primitive, {}, deploy_params
1819 )
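# Number of units to deploy: for lxc proxy charms, 'config-units' at NS/VNF/VDU level
# (when present) overrides the default of a single unit.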
1820 num_units = 1
1821 if vca_type == "lxc_proxy_charm":
1822 if element_type == "NS":
1823 num_units = db_nsr.get("config-units") or 1
1824 elif element_type == "VNF":
1825 num_units = db_vnfr.get("config-units") or 1
1826 elif element_type == "VDU":
1827 for v in db_vnfr["vdur"]:
1828 if vdu_id == v["vdu-id-ref"]:
1829 num_units = v.get("config-units") or 1
1830 break
1831 if vca_type != "k8s_proxy_charm":
1832 await self.vca_map[vca_type].install_configuration_sw(
1833 ee_id=ee_id,
1834 artifact_path=artifact_path,
1835 db_dict=db_dict,
1836 config=config,
1837 num_units=num_units,
1838 vca_id=vca_id,
1839 vca_type=vca_type,
1840 )
1841
1842 # write in db flag of configuration_sw already installed
1843 self.update_db_2(
1844 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1845 )
1846
1847 # add relations for this VCA (wait for other peers related with this VCA)
1848 await self._add_vca_relations(
1849 logging_text=logging_text,
1850 nsr_id=nsr_id,
1851 vca_type=vca_type,
1852 vca_index=vca_index,
1853 )
1854
1855 # if SSH access is required, then get the execution environment SSH public key
1856 # if native charm, we have already waited for the VM to be UP
1857 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1858 pub_key = None
1859 user = None
1860 # self.logger.debug("get ssh key block")
1861 if deep_get(
1862 config_descriptor, ("config-access", "ssh-access", "required")
1863 ):
1864 # self.logger.debug("ssh key needed")
1865 # Needed to inject an ssh key
1866 user = deep_get(
1867 config_descriptor,
1868 ("config-access", "ssh-access", "default-user"),
1869 )
1870 step = "Install configuration Software, getting public ssh key"
1871 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1872 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1873 )
1874
1875 step = "Insert public key into VM user={} ssh_key={}".format(
1876 user, pub_key
1877 )
1878 else:
1879 # self.logger.debug("no need to get ssh key")
1880 step = "Waiting to VM being up and getting IP address"
1881 self.logger.debug(logging_text + step)
1882
1883 # n2vc_redesign STEP 5.1
1884 # wait for RO (ip-address) and insert pub_key into VM
1885 if vnfr_id:
1886 if kdu_name:
1887 rw_mgmt_ip = await self.wait_kdu_up(
1888 logging_text, nsr_id, vnfr_id, kdu_name
1889 )
1890 else:
1891 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1892 logging_text,
1893 nsr_id,
1894 vnfr_id,
1895 vdu_id,
1896 vdu_index,
1897 user=user,
1898 pub_key=pub_key,
1899 )
1900 else:
1901 rw_mgmt_ip = None # This is for a NS configuration
1902
1903 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1904
1905 # store rw_mgmt_ip in deploy params for later replacement
1906 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1907
1908 # n2vc_redesign STEP 6 Execute initial config primitive
1909 step = "execute initial config primitive"
1910
1911 # wait for dependent primitives execution (NS -> VNF -> VDU)
1912 if initial_config_primitive_list:
1913 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1914
1915 # stage, in function of element type: vdu, kdu, vnf or ns
1916 my_vca = vca_deployed_list[vca_index]
1917 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1918 # VDU or KDU
1919 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1920 elif my_vca.get("member-vnf-index"):
1921 # VNF
1922 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1923 else:
1924 # NS
1925 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1926
1927 self._write_configuration_status(
1928 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1929 )
1930
1931 self._write_op_status(op_id=nslcmop_id, stage=stage)
1932
1933 check_if_terminated_needed = True
1934 for initial_config_primitive in initial_config_primitive_list:
1935 # adding information on the vca_deployed if it is an NS execution environment
1936 if not vca_deployed["member-vnf-index"]:
1937 deploy_params["ns_config_info"] = json.dumps(
1938 self._get_ns_config_info(nsr_id)
1939 )
1940 # TODO check if already done
1941 primitive_params_ = self._map_primitive_params(
1942 initial_config_primitive, {}, deploy_params
1943 )
1944
1945 step = "execute primitive '{}' params '{}'".format(
1946 initial_config_primitive["name"], primitive_params_
1947 )
1948 self.logger.debug(logging_text + step)
1949 await self.vca_map[vca_type].exec_primitive(
1950 ee_id=ee_id,
1951 primitive_name=initial_config_primitive["name"],
1952 params_dict=primitive_params_,
1953 db_dict=db_dict,
1954 vca_id=vca_id,
1955 vca_type=vca_type,
1956 )
1957 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
1958 if check_if_terminated_needed:
1959 if config_descriptor.get("terminate-config-primitive"):
1960 self.update_db_2(
1961 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
1962 )
1963 check_if_terminated_needed = False
1964
1965 # TODO register in database that primitive is done
1966
1967 # STEP 7 Configure metrics
1968 if vca_type == "helm" or vca_type == "helm-v3":
1969 prometheus_jobs = await self.add_prometheus_metrics(
1970 ee_id=ee_id,
1971 artifact_path=artifact_path,
1972 ee_config_descriptor=ee_config_descriptor,
1973 vnfr_id=vnfr_id,
1974 nsr_id=nsr_id,
1975 target_ip=rw_mgmt_ip,
1976 )
1977 if prometheus_jobs:
1978 self.update_db_2(
1979 "nsrs",
1980 nsr_id,
1981 {db_update_entry + "prometheus_jobs": prometheus_jobs},
1982 )
1983
1984 step = "instantiated at VCA"
1985 self.logger.debug(logging_text + step)
1986
1987 self._write_configuration_status(
1988 nsr_id=nsr_id, vca_index=vca_index, status="READY"
1989 )
1990
1991 except Exception as e: # TODO not use Exception but N2VC exception
1992 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1993 if not isinstance(
1994 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
1995 ):
1996 self.logger.error(
1997 "Exception while {} : {}".format(step, e), exc_info=True
1998 )
1999 self._write_configuration_status(
2000 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2001 )
2002 raise LcmException("{} {}".format(step, e)) from e
2003
2004 def _write_ns_status(
2005 self,
2006 nsr_id: str,
2007 ns_state: str,
2008 current_operation: str,
2009 current_operation_id: str,
2010 error_description: str = None,
2011 error_detail: str = None,
2012 other_update: dict = None,
2013 ):
2014 """
2015 Update db_nsr fields.
2016 :param nsr_id:
2017 :param ns_state:
2018 :param current_operation:
2019 :param current_operation_id:
2020 :param error_description:
2021 :param error_detail:
2022 :param other_update: Other required database changes; if provided, the status fields are added to this dict and written together
2023 :return:
2024 """
2025 try:
2026 db_dict = other_update or {}
2027 db_dict[
2028 "_admin.nslcmop"
2029 ] = current_operation_id # for backward compatibility
2030 db_dict["_admin.current-operation"] = current_operation_id
2031 db_dict["_admin.operation-type"] = (
2032 current_operation if current_operation != "IDLE" else None
2033 )
2034 db_dict["currentOperation"] = current_operation
2035 db_dict["currentOperationID"] = current_operation_id
2036 db_dict["errorDescription"] = error_description
2037 db_dict["errorDetail"] = error_detail
2038
2039 if ns_state:
2040 db_dict["nsState"] = ns_state
2041 self.update_db_2("nsrs", nsr_id, db_dict)
2042 except DbException as e:
2043 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2044
2045 def _write_op_status(
2046 self,
2047 op_id: str,
2048 stage: list = None,
2049 error_message: str = None,
2050 queuePosition: int = 0,
2051 operation_state: str = None,
2052 other_update: dict = None,
2053 ):
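# 'stage' is expected to be the 3-item list used by instantiate(): [stage, step, VIM progress];
# stage[0] is stored as 'stage' and the joined list as 'detailed-status'.
# A plain string is stored as 'stage' as-is.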
2054 try:
2055 db_dict = other_update or {}
2056 db_dict["queuePosition"] = queuePosition
2057 if isinstance(stage, list):
2058 db_dict["stage"] = stage[0]
2059 db_dict["detailed-status"] = " ".join(stage)
2060 elif stage is not None:
2061 db_dict["stage"] = str(stage)
2062
2063 if error_message is not None:
2064 db_dict["errorMessage"] = error_message
2065 if operation_state is not None:
2066 db_dict["operationState"] = operation_state
2067 db_dict["statusEnteredTime"] = time()
2068 self.update_db_2("nslcmops", op_id, db_dict)
2069 except DbException as e:
2070 self.logger.warn(
2071 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2072 )
2073
2074 def _write_all_config_status(self, db_nsr: dict, status: str):
2075 try:
2076 nsr_id = db_nsr["_id"]
2077 # configurationStatus
2078 config_status = db_nsr.get("configurationStatus")
2079 if config_status:
2080 db_nsr_update = {
2081 "configurationStatus.{}.status".format(index): status
2082 for index, v in enumerate(config_status)
2083 if v
2084 }
2085 # update status
2086 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2087
2088 except DbException as e:
2089 self.logger.warn(
2090 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2091 )
2092
2093 def _write_configuration_status(
2094 self,
2095 nsr_id: str,
2096 vca_index: int,
2097 status: str = None,
2098 element_under_configuration: str = None,
2099 element_type: str = None,
2100 other_update: dict = None,
2101 ):
2102
2103 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2104 # .format(vca_index, status))
2105
2106 try:
2107 db_path = "configurationStatus.{}.".format(vca_index)
2108 db_dict = other_update or {}
2109 if status:
2110 db_dict[db_path + "status"] = status
2111 if element_under_configuration:
2112 db_dict[
2113 db_path + "elementUnderConfiguration"
2114 ] = element_under_configuration
2115 if element_type:
2116 db_dict[db_path + "elementType"] = element_type
2117 self.update_db_2("nsrs", nsr_id, db_dict)
2118 except DbException as e:
2119 self.logger.warn(
2120 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2121 status, nsr_id, vca_index, e
2122 )
2123 )
2124
2125 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2126 """
2127 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2128 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2129 Database is used because the result can be obtained from a different LCM worker in case of HA.
2130 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2131 :param db_nslcmop: database content of nslcmop
2132 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2133 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2134 computed 'vim-account-id'
2135 """
2136 modified = False
2137 nslcmop_id = db_nslcmop["_id"]
2138 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2139 if placement_engine == "PLA":
2140 self.logger.debug(
2141 logging_text + "Invoke and wait for placement optimization"
2142 )
2143 await self.msg.aiowrite(
2144 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2145 )
2146 db_poll_interval = 5
2147 wait = db_poll_interval * 10
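# poll the database every 5 seconds, up to ~50 seconds, for the PLA result
# (it may be written by a different LCM worker in an HA deployment)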
2148 pla_result = None
2149 while not pla_result and wait >= 0:
2150 await asyncio.sleep(db_poll_interval)
2151 wait -= db_poll_interval
2152 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2153 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2154
2155 if not pla_result:
2156 raise LcmException(
2157 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2158 )
2159
2160 for pla_vnf in pla_result["vnf"]:
2161 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2162 if not pla_vnf.get("vimAccountId") or not vnfr:
2163 continue
2164 modified = True
2165 self.db.set_one(
2166 "vnfrs",
2167 {"_id": vnfr["_id"]},
2168 {"vim-account-id": pla_vnf["vimAccountId"]},
2169 )
2170 # Modifies db_vnfrs
2171 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2172 return modified
2173
2174 def update_nsrs_with_pla_result(self, params):
2175 try:
2176 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2177 self.update_db_2(
2178 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2179 )
2180 except Exception as e:
2181 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2182
2183 async def instantiate(self, nsr_id, nslcmop_id):
2184 """
2185
2186 :param nsr_id: ns instance to deploy
2187 :param nslcmop_id: operation to run
2188 :return:
2189 """
2190
2191 # Try to lock HA task here
2192 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2193 if not task_is_locked_by_me:
2194 self.logger.debug(
2195 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2196 )
2197 return
2198
2199 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2200 self.logger.debug(logging_text + "Enter")
2201
2202 # get all needed from database
2203
2204 # database nsrs record
2205 db_nsr = None
2206
2207 # database nslcmops record
2208 db_nslcmop = None
2209
2210 # update operation on nsrs
2211 db_nsr_update = {}
2212 # update operation on nslcmops
2213 db_nslcmop_update = {}
2214
2215 nslcmop_operation_state = None
2216 db_vnfrs = {} # vnf's info indexed by member-index
2217 # n2vc_info = {}
2218 tasks_dict_info = {} # from task to info text
2219 exc = None
2220 error_list = []
2221 stage = [
2222 "Stage 1/5: preparation of the environment.",
2223 "Waiting for previous operations to terminate.",
2224 "",
2225 ]
2226 # ^ stage, step, VIM progress
2227 try:
2228 # wait for any previous tasks in process
2229 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2230
2231 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2232 stage[1] = "Reading from database."
2233 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2234 db_nsr_update["detailed-status"] = "creating"
2235 db_nsr_update["operational-status"] = "init"
2236 self._write_ns_status(
2237 nsr_id=nsr_id,
2238 ns_state="BUILDING",
2239 current_operation="INSTANTIATING",
2240 current_operation_id=nslcmop_id,
2241 other_update=db_nsr_update,
2242 )
2243 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2244
2245 # read from db: operation
2246 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2247 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2248 ns_params = db_nslcmop.get("operationParams")
2249 if ns_params and ns_params.get("timeout_ns_deploy"):
2250 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2251 else:
2252 timeout_ns_deploy = self.timeout.get(
2253 "ns_deploy", self.timeout_ns_deploy
2254 )
2255
2256 # read from db: ns
2257 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2258 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2259 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2260 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2261 self.fs.sync(db_nsr["nsd-id"])
2262 db_nsr["nsd"] = nsd
2263 # nsr_name = db_nsr["name"] # TODO short-name??
2264
2265 # read from db: vnf's of this ns
2266 stage[1] = "Getting vnfrs from db."
2267 self.logger.debug(logging_text + stage[1])
2268 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2269
2270 # read from db: vnfd's for every vnf
2271 db_vnfds = [] # every vnfd data
2272
2273 # for each vnf in ns, read vnfd
2274 for vnfr in db_vnfrs_list:
2275 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2276 vnfd_id = vnfr["vnfd-id"]
2277 vnfd_ref = vnfr["vnfd-ref"]
2278 self.fs.sync(vnfd_id)
2279
2280 # if we don't have this vnfd yet, read it from db
2281 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2282 # read from db
2283 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2284 vnfd_id, vnfd_ref
2285 )
2286 self.logger.debug(logging_text + stage[1])
2287 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2288
2289 # store vnfd
2290 db_vnfds.append(vnfd)
2291
2292 # Get or generates the _admin.deployed.VCA list
2293 vca_deployed_list = None
2294 if db_nsr["_admin"].get("deployed"):
2295 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2296 if vca_deployed_list is None:
2297 vca_deployed_list = []
2298 configuration_status_list = []
2299 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2300 db_nsr_update["configurationStatus"] = configuration_status_list
2301 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2302 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2303 elif isinstance(vca_deployed_list, dict):
2304 # maintain backward compatibility. Change a dict to list at database
2305 vca_deployed_list = list(vca_deployed_list.values())
2306 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2307 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2308
2309 if not isinstance(
2310 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2311 ):
2312 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2313 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2314
2315 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2316 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2317 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2318 self.db.set_list(
2319 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2320 )
2321
2322 # n2vc_redesign STEP 2 Deploy Network Scenario
2323 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2324 self._write_op_status(op_id=nslcmop_id, stage=stage)
2325
2326 stage[1] = "Deploying KDUs."
2327 # self.logger.debug(logging_text + "Before deploy_kdus")
2328 # Call deploy_kdus in case the "vdu:kdu" param exists
2329 await self.deploy_kdus(
2330 logging_text=logging_text,
2331 nsr_id=nsr_id,
2332 nslcmop_id=nslcmop_id,
2333 db_vnfrs=db_vnfrs,
2334 db_vnfds=db_vnfds,
2335 task_instantiation_info=tasks_dict_info,
2336 )
2337
2338 stage[1] = "Getting VCA public key."
2339 # n2vc_redesign STEP 1 Get VCA public ssh-key
2340 # feature 1429. Add n2vc public key to needed VMs
2341 n2vc_key = self.n2vc.get_public_key()
2342 n2vc_key_list = [n2vc_key]
2343 if self.vca_config.get("public_key"):
2344 n2vc_key_list.append(self.vca_config["public_key"])
2345
2346 stage[1] = "Deploying NS at VIM."
2347 task_ro = asyncio.ensure_future(
2348 self.instantiate_RO(
2349 logging_text=logging_text,
2350 nsr_id=nsr_id,
2351 nsd=nsd,
2352 db_nsr=db_nsr,
2353 db_nslcmop=db_nslcmop,
2354 db_vnfrs=db_vnfrs,
2355 db_vnfds=db_vnfds,
2356 n2vc_key_list=n2vc_key_list,
2357 stage=stage,
2358 )
2359 )
2360 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2361 tasks_dict_info[task_ro] = "Deploying at VIM"
2362
2363 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2364 stage[1] = "Deploying Execution Environments."
2365 self.logger.debug(logging_text + stage[1])
2366
2367 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2368 for vnf_profile in get_vnf_profiles(nsd):
2369 vnfd_id = vnf_profile["vnfd-id"]
2370 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2371 member_vnf_index = str(vnf_profile["id"])
2372 db_vnfr = db_vnfrs[member_vnf_index]
2373 base_folder = vnfd["_admin"]["storage"]
2374 vdu_id = None
2375 vdu_index = 0
2376 vdu_name = None
2377 kdu_name = None
2378
2379 # Get additional parameters
2380 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2381 if db_vnfr.get("additionalParamsForVnf"):
2382 deploy_params.update(
2383 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2384 )
2385
2386 descriptor_config = get_configuration(vnfd, vnfd["id"])
2387 if descriptor_config:
2388 self._deploy_n2vc(
2389 logging_text=logging_text
2390 + "member_vnf_index={} ".format(member_vnf_index),
2391 db_nsr=db_nsr,
2392 db_vnfr=db_vnfr,
2393 nslcmop_id=nslcmop_id,
2394 nsr_id=nsr_id,
2395 nsi_id=nsi_id,
2396 vnfd_id=vnfd_id,
2397 vdu_id=vdu_id,
2398 kdu_name=kdu_name,
2399 member_vnf_index=member_vnf_index,
2400 vdu_index=vdu_index,
2401 vdu_name=vdu_name,
2402 deploy_params=deploy_params,
2403 descriptor_config=descriptor_config,
2404 base_folder=base_folder,
2405 task_instantiation_info=tasks_dict_info,
2406 stage=stage,
2407 )
2408
2409 # Deploy charms for each VDU that supports one.
2410 for vdud in get_vdu_list(vnfd):
2411 vdu_id = vdud["id"]
2412 descriptor_config = get_configuration(vnfd, vdu_id)
2413 vdur = find_in_list(
2414 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2415 )
2416
2417 if vdur.get("additionalParams"):
2418 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2419 else:
2420 deploy_params_vdu = deploy_params
2421 deploy_params_vdu["OSM"] = get_osm_params(
2422 db_vnfr, vdu_id, vdu_count_index=0
2423 )
2424 vdud_count = get_number_of_instances(vnfd, vdu_id)
2425
2426 self.logger.debug("VDUD > {}".format(vdud))
2427 self.logger.debug(
2428 "Descriptor config > {}".format(descriptor_config)
2429 )
2430 if descriptor_config:
2431 vdu_name = None
2432 kdu_name = None
2433 for vdu_index in range(vdud_count):
2434 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2435 self._deploy_n2vc(
2436 logging_text=logging_text
2437 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2438 member_vnf_index, vdu_id, vdu_index
2439 ),
2440 db_nsr=db_nsr,
2441 db_vnfr=db_vnfr,
2442 nslcmop_id=nslcmop_id,
2443 nsr_id=nsr_id,
2444 nsi_id=nsi_id,
2445 vnfd_id=vnfd_id,
2446 vdu_id=vdu_id,
2447 kdu_name=kdu_name,
2448 member_vnf_index=member_vnf_index,
2449 vdu_index=vdu_index,
2450 vdu_name=vdu_name,
2451 deploy_params=deploy_params_vdu,
2452 descriptor_config=descriptor_config,
2453 base_folder=base_folder,
2454 task_instantiation_info=tasks_dict_info,
2455 stage=stage,
2456 )
2457 for kdud in get_kdu_list(vnfd):
2458 kdu_name = kdud["name"]
2459 descriptor_config = get_configuration(vnfd, kdu_name)
2460 if descriptor_config:
2461 vdu_id = None
2462 vdu_index = 0
2463 vdu_name = None
2464 kdur = next(
2465 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2466 )
2467 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2468 if kdur.get("additionalParams"):
2469 deploy_params_kdu = parse_yaml_strings(
2470 kdur["additionalParams"]
2471 )
2472
2473 self._deploy_n2vc(
2474 logging_text=logging_text,
2475 db_nsr=db_nsr,
2476 db_vnfr=db_vnfr,
2477 nslcmop_id=nslcmop_id,
2478 nsr_id=nsr_id,
2479 nsi_id=nsi_id,
2480 vnfd_id=vnfd_id,
2481 vdu_id=vdu_id,
2482 kdu_name=kdu_name,
2483 member_vnf_index=member_vnf_index,
2484 vdu_index=vdu_index,
2485 vdu_name=vdu_name,
2486 deploy_params=deploy_params_kdu,
2487 descriptor_config=descriptor_config,
2488 base_folder=base_folder,
2489 task_instantiation_info=tasks_dict_info,
2490 stage=stage,
2491 )
2492
2493 # Check if this NS has a charm configuration
2494 descriptor_config = nsd.get("ns-configuration")
2495 if descriptor_config and descriptor_config.get("juju"):
2496 vnfd_id = None
2497 db_vnfr = None
2498 member_vnf_index = None
2499 vdu_id = None
2500 kdu_name = None
2501 vdu_index = 0
2502 vdu_name = None
2503
2504 # Get additional parameters
2505 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2506 if db_nsr.get("additionalParamsForNs"):
2507 deploy_params.update(
2508 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2509 )
2510 base_folder = nsd["_admin"]["storage"]
2511 self._deploy_n2vc(
2512 logging_text=logging_text,
2513 db_nsr=db_nsr,
2514 db_vnfr=db_vnfr,
2515 nslcmop_id=nslcmop_id,
2516 nsr_id=nsr_id,
2517 nsi_id=nsi_id,
2518 vnfd_id=vnfd_id,
2519 vdu_id=vdu_id,
2520 kdu_name=kdu_name,
2521 member_vnf_index=member_vnf_index,
2522 vdu_index=vdu_index,
2523 vdu_name=vdu_name,
2524 deploy_params=deploy_params,
2525 descriptor_config=descriptor_config,
2526 base_folder=base_folder,
2527 task_instantiation_info=tasks_dict_info,
2528 stage=stage,
2529 )
2530
2531 # the rest of the work will be done in the finally block
2532
2533 except (
2534 ROclient.ROClientException,
2535 DbException,
2536 LcmException,
2537 N2VCException,
2538 ) as e:
2539 self.logger.error(
2540 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2541 )
2542 exc = e
2543 except asyncio.CancelledError:
2544 self.logger.error(
2545 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2546 )
2547 exc = "Operation was cancelled"
2548 except Exception as e:
2549 exc = traceback.format_exc()
2550 self.logger.critical(
2551 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2552 exc_info=True,
2553 )
2554 finally:
2555 if exc:
2556 error_list.append(str(exc))
2557 try:
2558 # wait for pending tasks
2559 if tasks_dict_info:
2560 stage[1] = "Waiting for instantiate pending tasks."
2561 self.logger.debug(logging_text + stage[1])
2562 error_list += await self._wait_for_tasks(
2563 logging_text,
2564 tasks_dict_info,
2565 timeout_ns_deploy,
2566 stage,
2567 nslcmop_id,
2568 nsr_id=nsr_id,
2569 )
2570 stage[1] = stage[2] = ""
2571 except asyncio.CancelledError:
2572 error_list.append("Cancelled")
2573 # TODO cancel all tasks
2574 except Exception as exc:
2575 error_list.append(str(exc))
2576
2577 # update operation-status
2578 db_nsr_update["operational-status"] = "running"
2579 # let's begin with VCA 'configured' status (later we can change it)
2580 db_nsr_update["config-status"] = "configured"
2581 for task, task_name in tasks_dict_info.items():
2582 if not task.done() or task.cancelled() or task.exception():
2583 if task_name.startswith(self.task_name_deploy_vca):
2584 # A N2VC task is pending
2585 db_nsr_update["config-status"] = "failed"
2586 else:
2587 # RO or KDU task is pending
2588 db_nsr_update["operational-status"] = "failed"
2589
2590 # update status at database
2591 if error_list:
2592 error_detail = ". ".join(error_list)
2593 self.logger.error(logging_text + error_detail)
2594 error_description_nslcmop = "{} Detail: {}".format(
2595 stage[0], error_detail
2596 )
2597 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2598 nslcmop_id, stage[0]
2599 )
2600
2601 db_nsr_update["detailed-status"] = (
2602 error_description_nsr + " Detail: " + error_detail
2603 )
2604 db_nslcmop_update["detailed-status"] = error_detail
2605 nslcmop_operation_state = "FAILED"
2606 ns_state = "BROKEN"
2607 else:
2608 error_detail = None
2609 error_description_nsr = error_description_nslcmop = None
2610 ns_state = "READY"
2611 db_nsr_update["detailed-status"] = "Done"
2612 db_nslcmop_update["detailed-status"] = "Done"
2613 nslcmop_operation_state = "COMPLETED"
2614
2615 if db_nsr:
2616 self._write_ns_status(
2617 nsr_id=nsr_id,
2618 ns_state=ns_state,
2619 current_operation="IDLE",
2620 current_operation_id=None,
2621 error_description=error_description_nsr,
2622 error_detail=error_detail,
2623 other_update=db_nsr_update,
2624 )
2625 self._write_op_status(
2626 op_id=nslcmop_id,
2627 stage="",
2628 error_message=error_description_nslcmop,
2629 operation_state=nslcmop_operation_state,
2630 other_update=db_nslcmop_update,
2631 )
2632
2633 if nslcmop_operation_state:
2634 try:
2635 await self.msg.aiowrite(
2636 "ns",
2637 "instantiated",
2638 {
2639 "nsr_id": nsr_id,
2640 "nslcmop_id": nslcmop_id,
2641 "operationState": nslcmop_operation_state,
2642 },
2643 loop=self.loop,
2644 )
2645 except Exception as e:
2646 self.logger.error(
2647 logging_text + "kafka_write notification Exception {}".format(e)
2648 )
2649
2650 self.logger.debug(logging_text + "Exit")
2651 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2652
2653 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2654 if vnfd_id not in cached_vnfds:
2655 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2656 return cached_vnfds[vnfd_id]
2657
2658 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2659 if vnf_profile_id not in cached_vnfrs:
2660 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2661 "vnfrs",
2662 {
2663 "member-vnf-index-ref": vnf_profile_id,
2664 "nsr-id-ref": nsr_id,
2665 },
2666 )
2667 return cached_vnfrs[vnf_profile_id]
2668
2669 def _is_deployed_vca_in_relation(
2670 self, vca: DeployedVCA, relation: Relation
2671 ) -> bool:
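# A deployed VCA takes part in a relation if any non-KDU endpoint of the relation
# matches its vnf-profile-id, vdu-profile-id and execution-environment-ref.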
2672 found = False
2673 for endpoint in (relation.provider, relation.requirer):
2674 if endpoint["kdu-resource-profile-id"]:
2675 continue
2676 found = (
2677 vca.vnf_profile_id == endpoint.vnf_profile_id
2678 and vca.vdu_profile_id == endpoint.vdu_profile_id
2679 and vca.execution_environment_ref == endpoint.execution_environment_ref
2680 )
2681 if found:
2682 break
2683 return found
2684
2685 def _update_ee_relation_data_with_implicit_data(
2686 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2687 ):
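# If a VNF/VDU level endpoint does not carry an execution-environment-ref,
# fill it in from the juju execution environment declared in the VNFD for
# that entity (the VNF itself or the given vdu-profile).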
2688 ee_relation_data = safe_get_ee_relation(
2689 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2690 )
2691 ee_relation_level = EELevel.get_level(ee_relation_data)
2692 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2693 "execution-environment-ref"
2694 ]:
2695 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2696 vnfd_id = vnf_profile["vnfd-id"]
2697 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2698 entity_id = (
2699 vnfd_id
2700 if ee_relation_level == EELevel.VNF
2701 else ee_relation_data["vdu-profile-id"]
2702 )
2703 ee = get_juju_ee_ref(db_vnfd, entity_id)
2704 if not ee:
2705 raise Exception(
2706 f"not execution environments found for ee_relation {ee_relation_data}"
2707 )
2708 ee_relation_data["execution-environment-ref"] = ee["id"]
2709 return ee_relation_data
2710
2711 def _get_ns_relations(
2712 self,
2713 nsr_id: str,
2714 nsd: Dict[str, Any],
2715 vca: DeployedVCA,
2716 cached_vnfds: Dict[str, Any],
2717 ) -> List[Relation]:
2718 relations = []
2719 db_ns_relations = get_ns_configuration_relation_list(nsd)
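# Relations in the NS configuration may come in the current provider/requirer
# format or in the legacy "entities" format kept for backwards compatibility
# (bug 1762). Illustrative sketch of the legacy format (values are hypothetical):
#   {"name": "relation-1",
#    "entities": [{"id": "vnf1", "endpoint": "interface"},
#                 {"id": "<nsd-id>", "endpoint": "interface"}]}
# An entity id equal to the NSD id refers to the NS charm itself; any other id
# is taken as a vnf-profile-id.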
2720 for r in db_ns_relations:
2721 provider_dict = None
2722 requirer_dict = None
2723 if all(key in r for key in ("provider", "requirer")):
2724 provider_dict = r["provider"]
2725 requirer_dict = r["requirer"]
2726 elif "entities" in r:
2727 provider_id = r["entities"][0]["id"]
2728 provider_dict = {
2729 "nsr-id": nsr_id,
2730 "endpoint": r["entities"][0]["endpoint"],
2731 }
2732 if provider_id != nsd["id"]:
2733 provider_dict["vnf-profile-id"] = provider_id
2734 requirer_id = r["entities"][1]["id"]
2735 requirer_dict = {
2736 "nsr-id": nsr_id,
2737 "endpoint": r["entities"][1]["endpoint"],
2738 }
2739 if requirer_id != nsd["id"]:
2740 requirer_dict["vnf-profile-id"] = requirer_id
2741 else:
2742 raise Exception("provider/requirer or entities must be included in the relation.")
2743 relation_provider = self._update_ee_relation_data_with_implicit_data(
2744 nsr_id, nsd, provider_dict, cached_vnfds
2745 )
2746 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2747 nsr_id, nsd, requirer_dict, cached_vnfds
2748 )
2749 provider = EERelation(relation_provider)
2750 requirer = EERelation(relation_requirer)
2751 relation = Relation(r["name"], provider, requirer)
2752 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2753 if vca_in_relation:
2754 relations.append(relation)
2755 return relations
2756
2757 def _get_vnf_relations(
2758 self,
2759 nsr_id: str,
2760 nsd: Dict[str, Any],
2761 vca: DeployedVCA,
2762 cached_vnfds: Dict[str, Any],
2763 ) -> List[Relation]:
2764 relations = []
2765 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2766 vnf_profile_id = vnf_profile["id"]
2767 vnfd_id = vnf_profile["vnfd-id"]
2768 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2769 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
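# As in _get_ns_relations(), each relation may use the provider/requirer format
# or the legacy "entities" format; here an entity id equal to the vnfd id refers
# to the VNF-level charm, and any other id is taken as a vdu-profile-id.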
2770 for r in db_vnf_relations:
2771 provider_dict = None
2772 requirer_dict = None
2773 if all(key in r for key in ("provider", "requirer")):
2774 provider_dict = r["provider"]
2775 requirer_dict = r["requirer"]
2776 elif "entities" in r:
2777 provider_id = r["entities"][0]["id"]
2778 provider_dict = {
2779 "nsr-id": nsr_id,
2780 "vnf-profile-id": vnf_profile_id,
2781 "endpoint": r["entities"][0]["endpoint"],
2782 }
2783 if provider_id != vnfd_id:
2784 provider_dict["vdu-profile-id"] = provider_id
2785 requirer_id = r["entities"][1]["id"]
2786 requirer_dict = {
2787 "nsr-id": nsr_id,
2788 "vnf-profile-id": vnf_profile_id,
2789 "endpoint": r["entities"][1]["endpoint"],
2790 }
2791 if requirer_id != vnfd_id:
2792 requirer_dict["vdu-profile-id"] = requirer_id
2793 else:
2794 raise Exception("provider/requirer or entities must be included in the relation.")
2795 relation_provider = self._update_ee_relation_data_with_implicit_data(
2796 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2797 )
2798 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2799 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2800 )
2801 provider = EERelation(relation_provider)
2802 requirer = EERelation(relation_requirer)
2803 relation = Relation(r["name"], provider, requirer)
2804 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2805 if vca_in_relation:
2806 relations.append(relation)
2807 return relations
2808
2809 def _get_kdu_resource_data(
2810 self,
2811 ee_relation: EERelation,
2812 db_nsr: Dict[str, Any],
2813 cached_vnfds: Dict[str, Any],
2814 ) -> DeployedK8sResource:
2815 nsd = get_nsd(db_nsr)
2816 vnf_profiles = get_vnf_profiles(nsd)
2817 vnfd_id = find_in_list(
2818 vnf_profiles,
2819 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2820 )["vnfd-id"]
2821 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2822 kdu_resource_profile = get_kdu_resource_profile(
2823 db_vnfd, ee_relation.kdu_resource_profile_id
2824 )
2825 kdu_name = kdu_resource_profile["kdu-name"]
2826 deployed_kdu, _ = get_deployed_kdu(
2827 db_nsr.get("_admin", ()).get("deployed", ()),
2828 kdu_name,
2829 ee_relation.vnf_profile_id,
2830 )
2831 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2832 return deployed_kdu
2833
2834 def _get_deployed_component(
2835 self,
2836 ee_relation: EERelation,
2837 db_nsr: Dict[str, Any],
2838 cached_vnfds: Dict[str, Any],
2839 ) -> DeployedComponent:
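# Resolve the relation endpoint to its deployed element: NS/VNF/VDU endpoints
# map to an entry of _admin.deployed.VCA, while KDU endpoints map to a
# deployed K8s resource.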
2840 nsr_id = db_nsr["_id"]
2841 deployed_component = None
2842 ee_level = EELevel.get_level(ee_relation)
2843 if ee_level == EELevel.NS:
2844 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2845 if vca:
2846 deployed_component = DeployedVCA(nsr_id, vca)
2847 elif ee_level == EELevel.VNF:
2848 vca = get_deployed_vca(
2849 db_nsr,
2850 {
2851 "vdu_id": None,
2852 "member-vnf-index": ee_relation.vnf_profile_id,
2853 "ee_descriptor_id": ee_relation.execution_environment_ref,
2854 },
2855 )
2856 if vca:
2857 deployed_component = DeployedVCA(nsr_id, vca)
2858 elif ee_level == EELevel.VDU:
2859 vca = get_deployed_vca(
2860 db_nsr,
2861 {
2862 "vdu_id": ee_relation.vdu_profile_id,
2863 "member-vnf-index": ee_relation.vnf_profile_id,
2864 "ee_descriptor_id": ee_relation.execution_environment_ref,
2865 },
2866 )
2867 if vca:
2868 deployed_component = DeployedVCA(nsr_id, vca)
2869 elif ee_level == EELevel.KDU:
2870 kdu_resource_data = self._get_kdu_resource_data(
2871 ee_relation, db_nsr, cached_vnfds
2872 )
2873 if kdu_resource_data:
2874 deployed_component = DeployedK8sResource(kdu_resource_data)
2875 return deployed_component
2876
2877 async def _add_relation(
2878 self,
2879 relation: Relation,
2880 vca_type: str,
2881 db_nsr: Dict[str, Any],
2882 cached_vnfds: Dict[str, Any],
2883 cached_vnfrs: Dict[str, Any],
2884 ) -> bool:
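# The relation is only added once both endpoints are deployed and have their
# configuration software installed; returns True when the relation has been
# established, so the caller can drop it from the pending list.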
2885 deployed_provider = self._get_deployed_component(
2886 relation.provider, db_nsr, cached_vnfds
2887 )
2888 deployed_requirer = self._get_deployed_component(
2889 relation.requirer, db_nsr, cached_vnfds
2890 )
2891 if (
2892 deployed_provider
2893 and deployed_requirer
2894 and deployed_provider.config_sw_installed
2895 and deployed_requirer.config_sw_installed
2896 ):
2897 provider_db_vnfr = (
2898 self._get_vnfr(
2899 relation.provider.nsr_id,
2900 relation.provider.vnf_profile_id,
2901 cached_vnfrs,
2902 )
2903 if relation.provider.vnf_profile_id
2904 else None
2905 )
2906 requirer_db_vnfr = (
2907 self._get_vnfr(
2908 relation.requirer.nsr_id,
2909 relation.requirer.vnf_profile_id,
2910 cached_vnfrs,
2911 )
2912 if relation.requirer.vnf_profile_id
2913 else None
2914 )
2915 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
2916 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
2917 provider_relation_endpoint = RelationEndpoint(
2918 deployed_provider.ee_id,
2919 provider_vca_id,
2920 relation.provider.endpoint,
2921 )
2922 requirer_relation_endpoint = RelationEndpoint(
2923 deployed_requirer.ee_id,
2924 requirer_vca_id,
2925 relation.requirer.endpoint,
2926 )
2927 await self.vca_map[vca_type].add_relation(
2928 provider=provider_relation_endpoint,
2929 requirer=requirer_relation_endpoint,
2930 )
2931 # remove entry from relations list
2932 return True
2933 return False
2934
2935 async def _add_vca_relations(
2936 self,
2937 logging_text,
2938 nsr_id,
2939 vca_type: str,
2940 vca_index: int,
2941 timeout: int = 3600,
2942 ) -> bool:
2943
2944 # steps:
2945 # 1. find all relations for this VCA
2946 # 2. wait for the other related peers
2947 # 3. add relations
2948
2949 try:
2950 # STEP 1: find all relations for this VCA
2951
2952 # read nsr record
2953 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2954 nsd = get_nsd(db_nsr)
2955
2956 # this VCA data
2957 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
2958 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
2959
2960 cached_vnfds = {}
2961 cached_vnfrs = {}
2962 relations = []
2963 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
2964 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
2965
2966 # if no relations, terminate
2967 if not relations:
2968 self.logger.debug(logging_text + " No relations")
2969 return True
2970
2971 self.logger.debug(logging_text + " adding relations {}".format(relations))
2972
2973 # add all relations
2974 start = time()
2975 while True:
2976 # check timeout
2977 now = time()
2978 if now - start >= timeout:
2979 self.logger.error(logging_text + " : timeout adding relations")
2980 return False
2981
2982 # reload nsr from database (we need the updated record: _admin.deployed.VCA)
2983 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2984
2985 # for each relation, find the VCA's related
2986 for relation in relations.copy():
2987 added = await self._add_relation(
2988 relation,
2989 vca_type,
2990 db_nsr,
2991 cached_vnfds,
2992 cached_vnfrs,
2993 )
2994 if added:
2995 relations.remove(relation)
2996
2997 if not relations:
2998 self.logger.debug("Relations added")
2999 break
3000 await asyncio.sleep(5.0)
3001
3002 return True
3003
3004 except Exception as e:
3005 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3006 return False
3007
3008 async def _install_kdu(
3009 self,
3010 nsr_id: str,
3011 nsr_db_path: str,
3012 vnfr_data: dict,
3013 kdu_index: int,
3014 kdud: dict,
3015 vnfd: dict,
3016 k8s_instance_info: dict,
3017 k8params: dict = None,
3018 timeout: int = 600,
3019 vca_id: str = None,
3020 ):
3021
3022 try:
3023 k8sclustertype = k8s_instance_info["k8scluster-type"]
3024 # Instantiate kdu
3025 db_dict_install = {
3026 "collection": "nsrs",
3027 "filter": {"_id": nsr_id},
3028 "path": nsr_db_path,
3029 }
3030
3031 if k8s_instance_info.get("kdu-deployment-name"):
3032 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3033 else:
3034 kdu_instance = self.k8scluster_map[
3035 k8sclustertype
3036 ].generate_kdu_instance_name(
3037 db_dict=db_dict_install,
3038 kdu_model=k8s_instance_info["kdu-model"],
3039 kdu_name=k8s_instance_info["kdu-name"],
3040 )
3041 self.update_db_2(
3042 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3043 )
3044 await self.k8scluster_map[k8sclustertype].install(
3045 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3046 kdu_model=k8s_instance_info["kdu-model"],
3047 atomic=True,
3048 params=k8params,
3049 db_dict=db_dict_install,
3050 timeout=timeout,
3051 kdu_name=k8s_instance_info["kdu-name"],
3052 namespace=k8s_instance_info["namespace"],
3053 kdu_instance=kdu_instance,
3054 vca_id=vca_id,
3055 )
3056 self.update_db_2(
3057 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3058 )
3059
3060 # Obtain services to get the management service ip
3061 services = await self.k8scluster_map[k8sclustertype].get_services(
3062 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3063 kdu_instance=kdu_instance,
3064 namespace=k8s_instance_info["namespace"],
3065 )
3066
3067 # Obtain management service info (if exists)
3068 vnfr_update_dict = {}
3069 kdu_config = get_configuration(vnfd, kdud["name"])
3070 if kdu_config:
3071 target_ee_list = kdu_config.get("execution-environment-list", [])
3072 else:
3073 target_ee_list = []
3074
3075 if services:
3076 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3077 mgmt_services = [
3078 service
3079 for service in kdud.get("service", [])
3080 if service.get("mgmt-service")
3081 ]
3082 for mgmt_service in mgmt_services:
3083 for service in services:
3084 if service["name"].startswith(mgmt_service["name"]):
3085 # Mgmt service found, Obtain service ip
3086 ip = service.get("external_ip", service.get("cluster_ip"))
3087 if isinstance(ip, list) and len(ip) == 1:
3088 ip = ip[0]
3089
3090 vnfr_update_dict[
3091 "kdur.{}.ip-address".format(kdu_index)
3092 ] = ip
3093
3094 # Check whether the mgmt ip at the vnf must also be updated
3095 service_external_cp = mgmt_service.get(
3096 "external-connection-point-ref"
3097 )
3098 if service_external_cp:
3099 if (
3100 deep_get(vnfd, ("mgmt-interface", "cp"))
3101 == service_external_cp
3102 ):
3103 vnfr_update_dict["ip-address"] = ip
3104
3105 if find_in_list(
3106 target_ee_list,
3107 lambda ee: ee.get(
3108 "external-connection-point-ref", ""
3109 )
3110 == service_external_cp,
3111 ):
3112 vnfr_update_dict[
3113 "kdur.{}.ip-address".format(kdu_index)
3114 ] = ip
3115 break
3116 else:
3117 self.logger.warn(
3118 "Mgmt service name: {} not found".format(
3119 mgmt_service["name"]
3120 )
3121 )
3122
3123 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3124 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3125
3126 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3127 if (
3128 kdu_config
3129 and kdu_config.get("initial-config-primitive")
3130 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3131 ):
3132 initial_config_primitive_list = kdu_config.get(
3133 "initial-config-primitive"
3134 )
3135 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3136
3137 for initial_config_primitive in initial_config_primitive_list:
3138 primitive_params_ = self._map_primitive_params(
3139 initial_config_primitive, {}, {}
3140 )
3141
3142 await asyncio.wait_for(
3143 self.k8scluster_map[k8sclustertype].exec_primitive(
3144 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3145 kdu_instance=kdu_instance,
3146 primitive_name=initial_config_primitive["name"],
3147 params=primitive_params_,
3148 db_dict=db_dict_install,
3149 vca_id=vca_id,
3150 ),
3151 timeout=timeout,
3152 )
3153
3154 except Exception as e:
3155 # Prepare update db with error and raise exception
3156 try:
3157 self.update_db_2(
3158 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3159 )
3160 self.update_db_2(
3161 "vnfrs",
3162 vnfr_data.get("_id"),
3163 {"kdur.{}.status".format(kdu_index): "ERROR"},
3164 )
3165 except Exception:
3166 # ignore to keep original exception
3167 pass
3168 # reraise original error
3169 raise
3170
3171 return kdu_instance
3172
3173 async def deploy_kdus(
3174 self,
3175 logging_text,
3176 nsr_id,
3177 nslcmop_id,
3178 db_vnfrs,
3179 db_vnfds,
3180 task_instantiation_info,
3181 ):
3182 # Launch kdus if present in the descriptor
3183
3184 k8scluster_id_2_uuic = {
3185 "helm-chart-v3": {},
3186 "helm-chart": {},
3187 "juju-bundle": {},
3188 }
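# cache of k8s cluster id (from the descriptor) to internal cluster uuid, per
# cluster type, to avoid repeated DB reads and re-initializations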
3189
3190 async def _get_cluster_id(cluster_id, cluster_type):
3191 nonlocal k8scluster_id_2_uuic
3192 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3193 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3194
3195 # check if the K8s cluster is being created and wait for related previous tasks to complete
3196 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3197 "k8scluster", cluster_id
3198 )
3199 if task_dependency:
3200 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3201 task_name, cluster_id
3202 )
3203 self.logger.debug(logging_text + text)
3204 await asyncio.wait(task_dependency, timeout=3600)
3205
3206 db_k8scluster = self.db.get_one(
3207 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3208 )
3209 if not db_k8scluster:
3210 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3211
3212 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3213 if not k8s_id:
3214 if cluster_type == "helm-chart-v3":
3215 try:
3216 # backward compatibility for existing clusters that have not been initialized for helm v3
3217 k8s_credentials = yaml.safe_dump(
3218 db_k8scluster.get("credentials")
3219 )
3220 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3221 k8s_credentials, reuse_cluster_uuid=cluster_id
3222 )
3223 db_k8scluster_update = {}
3224 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3225 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3226 db_k8scluster_update[
3227 "_admin.helm-chart-v3.created"
3228 ] = uninstall_sw
3229 db_k8scluster_update[
3230 "_admin.helm-chart-v3.operationalState"
3231 ] = "ENABLED"
3232 self.update_db_2(
3233 "k8sclusters", cluster_id, db_k8scluster_update
3234 )
3235 except Exception as e:
3236 self.logger.error(
3237 logging_text
3238 + "error initializing helm-v3 cluster: {}".format(str(e))
3239 )
3240 raise LcmException(
3241 "K8s cluster '{}' has not been initialized for '{}'".format(
3242 cluster_id, cluster_type
3243 )
3244 )
3245 else:
3246 raise LcmException(
3247 "K8s cluster '{}' has not been initialized for '{}'".format(
3248 cluster_id, cluster_type
3249 )
3250 )
3251 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3252 return k8s_id
3253
3254 logging_text += "Deploy kdus: "
3255 step = ""
3256 try:
3257 db_nsr_update = {"_admin.deployed.K8s": []}
3258 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3259
3260 index = 0
3261 updated_cluster_list = []
3262 updated_v3_cluster_list = []
3263
3264 for vnfr_data in db_vnfrs.values():
3265 vca_id = self.get_vca_id(vnfr_data, {})
3266 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3267 # Step 0: Prepare and set parameters
3268 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3269 vnfd_id = vnfr_data.get("vnfd-id")
3270 vnfd_with_id = find_in_list(
3271 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3272 )
3273 kdud = next(
3274 kdud
3275 for kdud in vnfd_with_id["kdu"]
3276 if kdud["name"] == kdur["kdu-name"]
3277 )
3278 namespace = kdur.get("k8s-namespace")
3279 kdu_deployment_name = kdur.get("kdu-deployment-name")
3280 if kdur.get("helm-chart"):
3281 kdumodel = kdur["helm-chart"]
3282 # Default version: helm-chart-v3; if helm-version is v2, use helm-chart (v2)
3283 k8sclustertype = "helm-chart-v3"
3284 self.logger.debug("kdur: {}".format(kdur))
3285 if (
3286 kdur.get("helm-version")
3287 and kdur.get("helm-version") == "v2"
3288 ):
3289 k8sclustertype = "helm-chart"
3290 elif kdur.get("juju-bundle"):
3291 kdumodel = kdur["juju-bundle"]
3292 k8sclustertype = "juju-bundle"
3293 else:
3294 raise LcmException(
3295 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3296 "juju-bundle. Maybe an old NBI version is running".format(
3297 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3298 )
3299 )
3300 # check if kdumodel is a file and exists
3301 try:
3302 vnfd_with_id = find_in_list(
3303 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3304 )
3305 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3306 if storage and storage.get(
3307 "pkg-dir"
3308 ): # may not be present if vnfd has no artifacts
3309 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3310 filename = "{}/{}/{}s/{}".format(
3311 storage["folder"],
3312 storage["pkg-dir"],
3313 k8sclustertype,
3314 kdumodel,
3315 )
3316 if self.fs.file_exists(
3317 filename, mode="file"
3318 ) or self.fs.file_exists(filename, mode="dir"):
3319 kdumodel = self.fs.path + filename
3320 except (asyncio.TimeoutError, asyncio.CancelledError):
3321 raise
3322 except Exception: # it is not a file
3323 pass
3324
3325 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3326 step = "Synchronize repos for k8s cluster '{}'".format(
3327 k8s_cluster_id
3328 )
3329 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3330
3331 # Synchronize repos
3332 if (
3333 k8sclustertype == "helm-chart"
3334 and cluster_uuid not in updated_cluster_list
3335 ) or (
3336 k8sclustertype == "helm-chart-v3"
3337 and cluster_uuid not in updated_v3_cluster_list
3338 ):
3339 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3340 self.k8scluster_map[k8sclustertype].synchronize_repos(
3341 cluster_uuid=cluster_uuid
3342 )
3343 )
3344 if del_repo_list or added_repo_dict:
3345 if k8sclustertype == "helm-chart":
3346 unset = {
3347 "_admin.helm_charts_added." + item: None
3348 for item in del_repo_list
3349 }
3350 updated = {
3351 "_admin.helm_charts_added." + item: name
3352 for item, name in added_repo_dict.items()
3353 }
3354 updated_cluster_list.append(cluster_uuid)
3355 elif k8sclustertype == "helm-chart-v3":
3356 unset = {
3357 "_admin.helm_charts_v3_added." + item: None
3358 for item in del_repo_list
3359 }
3360 updated = {
3361 "_admin.helm_charts_v3_added." + item: name
3362 for item, name in added_repo_dict.items()
3363 }
3364 updated_v3_cluster_list.append(cluster_uuid)
3365 self.logger.debug(
3366 logging_text + "repos synchronized on k8s cluster "
3367 "'{}' to_delete: {}, to_add: {}".format(
3368 k8s_cluster_id, del_repo_list, added_repo_dict
3369 )
3370 )
3371 self.db.set_one(
3372 "k8sclusters",
3373 {"_id": k8s_cluster_id},
3374 updated,
3375 unset=unset,
3376 )
3377
3378 # Instantiate kdu
3379 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3380 vnfr_data["member-vnf-index-ref"],
3381 kdur["kdu-name"],
3382 k8s_cluster_id,
3383 )
3384 k8s_instance_info = {
3385 "kdu-instance": None,
3386 "k8scluster-uuid": cluster_uuid,
3387 "k8scluster-type": k8sclustertype,
3388 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3389 "kdu-name": kdur["kdu-name"],
3390 "kdu-model": kdumodel,
3391 "namespace": namespace,
3392 "kdu-deployment-name": kdu_deployment_name,
3393 }
3394 db_path = "_admin.deployed.K8s.{}".format(index)
3395 db_nsr_update[db_path] = k8s_instance_info
3396 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3397 vnfd_with_id = find_in_list(
3398 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3399 )
3400 task = asyncio.ensure_future(
3401 self._install_kdu(
3402 nsr_id,
3403 db_path,
3404 vnfr_data,
3405 kdu_index,
3406 kdud,
3407 vnfd_with_id,
3408 k8s_instance_info,
3409 k8params=desc_params,
3410 timeout=600,
3411 vca_id=vca_id,
3412 )
3413 )
3414 self.lcm_tasks.register(
3415 "ns",
3416 nsr_id,
3417 nslcmop_id,
3418 "instantiate_KDU-{}".format(index),
3419 task,
3420 )
3421 task_instantiation_info[task] = "Deploying KDU {}".format(
3422 kdur["kdu-name"]
3423 )
3424
3425 index += 1
3426
3427 except (LcmException, asyncio.CancelledError):
3428 raise
3429 except Exception as e:
3430 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3431 if isinstance(e, (N2VCException, DbException)):
3432 self.logger.error(logging_text + msg)
3433 else:
3434 self.logger.critical(logging_text + msg, exc_info=True)
3435 raise LcmException(msg)
3436 finally:
3437 if db_nsr_update:
3438 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3439
3440 def _deploy_n2vc(
3441 self,
3442 logging_text,
3443 db_nsr,
3444 db_vnfr,
3445 nslcmop_id,
3446 nsr_id,
3447 nsi_id,
3448 vnfd_id,
3449 vdu_id,
3450 kdu_name,
3451 member_vnf_index,
3452 vdu_index,
3453 vdu_name,
3454 deploy_params,
3455 descriptor_config,
3456 base_folder,
3457 task_instantiation_info,
3458 stage,
3459 ):
3460 # launch instantiate_N2VC in an asyncio task and register the task object
3461 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3462 # if not found, create one entry and update database
3463 # fill db_nsr._admin.deployed.VCA.<index>
3464
3465 self.logger.debug(
3466 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3467 )
3468 if "execution-environment-list" in descriptor_config:
3469 ee_list = descriptor_config.get("execution-environment-list", [])
3470 elif "juju" in descriptor_config:
3471 ee_list = [descriptor_config] # ns charms
3472 else: # other types, such as script, are not supported
3473 ee_list = []
3474
3475 for ee_item in ee_list:
3476 self.logger.debug(
3477 logging_text
3478 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3479 ee_item.get("juju"), ee_item.get("helm-chart")
3480 )
3481 )
3482 ee_descriptor_id = ee_item.get("id")
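# Map the execution environment item to a VCA type: a juju charm becomes
# lxc_proxy_charm (k8s_proxy_charm when cloud is "k8s"; native_charm when no
# charm name is given or proxy is False); a helm-chart becomes helm (v2) or
# helm-v3; anything else is skipped.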
3483 if ee_item.get("juju"):
3484 vca_name = ee_item["juju"].get("charm")
3485 vca_type = (
3486 "lxc_proxy_charm"
3487 if ee_item["juju"].get("charm") is not None
3488 else "native_charm"
3489 )
3490 if ee_item["juju"].get("cloud") == "k8s":
3491 vca_type = "k8s_proxy_charm"
3492 elif ee_item["juju"].get("proxy") is False:
3493 vca_type = "native_charm"
3494 elif ee_item.get("helm-chart"):
3495 vca_name = ee_item["helm-chart"]
3496 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3497 vca_type = "helm"
3498 else:
3499 vca_type = "helm-v3"
3500 else:
3501 self.logger.debug(
3502 logging_text + "skipping non juju neither charm configuration"
3503 )
3504 continue
3505
3506 vca_index = -1
3507 for vca_index, vca_deployed in enumerate(
3508 db_nsr["_admin"]["deployed"]["VCA"]
3509 ):
3510 if not vca_deployed:
3511 continue
3512 if (
3513 vca_deployed.get("member-vnf-index") == member_vnf_index
3514 and vca_deployed.get("vdu_id") == vdu_id
3515 and vca_deployed.get("kdu_name") == kdu_name
3516 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3517 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3518 ):
3519 break
3520 else:
3521 # not found, create one.
3522 target = (
3523 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3524 )
3525 if vdu_id:
3526 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3527 elif kdu_name:
3528 target += "/kdu/{}".format(kdu_name)
3529 vca_deployed = {
3530 "target_element": target,
3531 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3532 "member-vnf-index": member_vnf_index,
3533 "vdu_id": vdu_id,
3534 "kdu_name": kdu_name,
3535 "vdu_count_index": vdu_index,
3536 "operational-status": "init", # TODO revise
3537 "detailed-status": "", # TODO revise
3538 "step": "initial-deploy", # TODO revise
3539 "vnfd_id": vnfd_id,
3540 "vdu_name": vdu_name,
3541 "type": vca_type,
3542 "ee_descriptor_id": ee_descriptor_id,
3543 }
3544 vca_index += 1
3545
3546 # create VCA and configurationStatus in db
3547 db_dict = {
3548 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3549 "configurationStatus.{}".format(vca_index): dict(),
3550 }
3551 self.update_db_2("nsrs", nsr_id, db_dict)
3552
3553 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3554
3555 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3556 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3557 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3558
3559 # Launch task
3560 task_n2vc = asyncio.ensure_future(
3561 self.instantiate_N2VC(
3562 logging_text=logging_text,
3563 vca_index=vca_index,
3564 nsi_id=nsi_id,
3565 db_nsr=db_nsr,
3566 db_vnfr=db_vnfr,
3567 vdu_id=vdu_id,
3568 kdu_name=kdu_name,
3569 vdu_index=vdu_index,
3570 deploy_params=deploy_params,
3571 config_descriptor=descriptor_config,
3572 base_folder=base_folder,
3573 nslcmop_id=nslcmop_id,
3574 stage=stage,
3575 vca_type=vca_type,
3576 vca_name=vca_name,
3577 ee_config_descriptor=ee_item,
3578 )
3579 )
3580 self.lcm_tasks.register(
3581 "ns",
3582 nsr_id,
3583 nslcmop_id,
3584 "instantiate_N2VC-{}".format(vca_index),
3585 task_n2vc,
3586 )
3587 task_instantiation_info[
3588 task_n2vc
3589 ] = self.task_name_deploy_vca + " {}.{}".format(
3590 member_vnf_index or "", vdu_id or ""
3591 )
3592
3593 @staticmethod
3594 def _create_nslcmop(nsr_id, operation, params):
3595 """
3596 Creates an ns-lcm-op content to be stored in the database.
3597 :param nsr_id: internal id of the instance
3598 :param operation: instantiate, terminate, scale, action, ...
3599 :param params: user parameters for the operation
3600 :return: dictionary following SOL005 format
3601 """
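# Illustrative example (hypothetical ids): _create_nslcmop("f3f2...", "action", {"member_vnf_index": "1"})
# returns a dict whose "id" and "_id" hold the same fresh uuid, operationState is "PROCESSING" and
# "links" points at /osm/nslcm/v1/ns_lcm_op_occs/<id> and /osm/nslcm/v1/ns_instances/<nsr_id>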
3602 # Raise exception if invalid arguments
3603 if not (nsr_id and operation and params):
3604 raise LcmException(
3605 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3606 )
3607 now = time()
3608 _id = str(uuid4())
3609 nslcmop = {
3610 "id": _id,
3611 "_id": _id,
3612 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3613 "operationState": "PROCESSING",
3614 "statusEnteredTime": now,
3615 "nsInstanceId": nsr_id,
3616 "lcmOperationType": operation,
3617 "startTime": now,
3618 "isAutomaticInvocation": False,
3619 "operationParams": params,
3620 "isCancelPending": False,
3621 "links": {
3622 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3623 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3624 },
3625 }
3626 return nslcmop
3627
3628 def _format_additional_params(self, params):
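# values prefixed with "!!yaml " are parsed as YAML, e.g. "!!yaml [1, 2]" becomes the list [1, 2];
# any other value is kept as provided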
3629 params = params or {}
3630 for key, value in params.items():
3631 if str(value).startswith("!!yaml "):
3632 params[key] = yaml.safe_load(value[7:])
3633 return params
3634
3635 def _get_terminate_primitive_params(self, seq, vnf_index):
3636 primitive = seq.get("name")
3637 primitive_params = {}
3638 params = {
3639 "member_vnf_index": vnf_index,
3640 "primitive": primitive,
3641 "primitive_params": primitive_params,
3642 }
3643 desc_params = {}
3644 return self._map_primitive_params(seq, params, desc_params)
3645
3646 # sub-operations
3647
3648 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3649 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3650 if op.get("operationState") == "COMPLETED":
3651 # b. Skip sub-operation
3652 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3653 return self.SUBOPERATION_STATUS_SKIP
3654 else:
3655 # c. retry executing sub-operation
3656 # The sub-operation exists, and operationState != 'COMPLETED'
3657 # Update operationState = 'PROCESSING' to indicate a retry.
3658 operationState = "PROCESSING"
3659 detailed_status = "In progress"
3660 self._update_suboperation_status(
3661 db_nslcmop, op_index, operationState, detailed_status
3662 )
3663 # Return the sub-operation index
3664 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3665 # with arguments extracted from the sub-operation
3666 return op_index
3667
3668 # Find a sub-operation where all keys in a matching dictionary must match
3669 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3670 def _find_suboperation(self, db_nslcmop, match):
3671 if db_nslcmop and match:
3672 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3673 for i, op in enumerate(op_list):
3674 if all(op.get(k) == match[k] for k in match):
3675 return i
3676 return self.SUBOPERATION_STATUS_NOT_FOUND
3677
3678 # Update status for a sub-operation given its index
3679 def _update_suboperation_status(
3680 self, db_nslcmop, op_index, operationState, detailed_status
3681 ):
3682 # Update DB for HA tasks
3683 q_filter = {"_id": db_nslcmop["_id"]}
3684 update_dict = {
3685 "_admin.operations.{}.operationState".format(op_index): operationState,
3686 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3687 }
3688 self.db.set_one(
3689 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3690 )
3691
3692 # Add sub-operation, return the index of the added sub-operation
3693 # Optionally, set operationState, detailed-status, and operationType
3694 # Status and type are currently set for 'scale' sub-operations:
3695 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3696 # 'detailed-status' : status message
3697 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3698 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3699 def _add_suboperation(
3700 self,
3701 db_nslcmop,
3702 vnf_index,
3703 vdu_id,
3704 vdu_count_index,
3705 vdu_name,
3706 primitive,
3707 mapped_primitive_params,
3708 operationState=None,
3709 detailed_status=None,
3710 operationType=None,
3711 RO_nsr_id=None,
3712 RO_scaling_info=None,
3713 ):
3714 if not db_nslcmop:
3715 return self.SUBOPERATION_STATUS_NOT_FOUND
3716 # Get the "_admin.operations" list, if it exists
3717 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3718 op_list = db_nslcmop_admin.get("operations")
3719 # Create or append to the "_admin.operations" list
3720 new_op = {
3721 "member_vnf_index": vnf_index,
3722 "vdu_id": vdu_id,
3723 "vdu_count_index": vdu_count_index,
3724 "primitive": primitive,
3725 "primitive_params": mapped_primitive_params,
3726 }
3727 if operationState:
3728 new_op["operationState"] = operationState
3729 if detailed_status:
3730 new_op["detailed-status"] = detailed_status
3731 if operationType:
3732 new_op["lcmOperationType"] = operationType
3733 if RO_nsr_id:
3734 new_op["RO_nsr_id"] = RO_nsr_id
3735 if RO_scaling_info:
3736 new_op["RO_scaling_info"] = RO_scaling_info
3737 if not op_list:
3738 # No existing operations, create key 'operations' with current operation as first list element
3739 db_nslcmop_admin.update({"operations": [new_op]})
3740 op_list = db_nslcmop_admin.get("operations")
3741 else:
3742 # Existing operations, append operation to list
3743 op_list.append(new_op)
3744
3745 db_nslcmop_update = {"_admin.operations": op_list}
3746 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3747 op_index = len(op_list) - 1
3748 return op_index
3749
3750 # Helper methods for scale() sub-operations
3751
3752 # pre-scale/post-scale:
3753 # Check for 3 different cases:
3754 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3755 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3756 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3757 def _check_or_add_scale_suboperation(
3758 self,
3759 db_nslcmop,
3760 vnf_index,
3761 vnf_config_primitive,
3762 primitive_params,
3763 operationType,
3764 RO_nsr_id=None,
3765 RO_scaling_info=None,
3766 ):
3767 # Find this sub-operation
3768 if RO_nsr_id and RO_scaling_info:
3769 operationType = "SCALE-RO"
3770 match = {
3771 "member_vnf_index": vnf_index,
3772 "RO_nsr_id": RO_nsr_id,
3773 "RO_scaling_info": RO_scaling_info,
3774 }
3775 else:
3776 match = {
3777 "member_vnf_index": vnf_index,
3778 "primitive": vnf_config_primitive,
3779 "primitive_params": primitive_params,
3780 "lcmOperationType": operationType,
3781 }
3782 op_index = self._find_suboperation(db_nslcmop, match)
3783 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3784 # a. New sub-operation
3785 # The sub-operation does not exist, add it.
3786 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3787 # The following parameters are set to None for all kind of scaling:
3788 vdu_id = None
3789 vdu_count_index = None
3790 vdu_name = None
3791 if RO_nsr_id and RO_scaling_info:
3792 vnf_config_primitive = None
3793 primitive_params = None
3794 else:
3795 RO_nsr_id = None
3796 RO_scaling_info = None
3797 # Initial status for sub-operation
3798 operationState = "PROCESSING"
3799 detailed_status = "In progress"
3800 # Add sub-operation for pre/post-scaling (zero or more operations)
3801 self._add_suboperation(
3802 db_nslcmop,
3803 vnf_index,
3804 vdu_id,
3805 vdu_count_index,
3806 vdu_name,
3807 vnf_config_primitive,
3808 primitive_params,
3809 operationState,
3810 detailed_status,
3811 operationType,
3812 RO_nsr_id,
3813 RO_scaling_info,
3814 )
3815 return self.SUBOPERATION_STATUS_NEW
3816 else:
3817 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3818 # or op_index (operationState != 'COMPLETED')
3819 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3820
3821 # Function to return execution_environment id
3822
3823 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3824 # TODO vdu_index_count
3825 for vca in vca_deployed_list:
3826 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3827 return vca["ee_id"]
3828
3829 async def destroy_N2VC(
3830 self,
3831 logging_text,
3832 db_nslcmop,
3833 vca_deployed,
3834 config_descriptor,
3835 vca_index,
3836 destroy_ee=True,
3837 exec_primitives=True,
3838 scaling_in=False,
3839 vca_id: str = None,
3840 ):
3841 """
3842 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3843 :param logging_text:
3844 :param db_nslcmop:
3845 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3846 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3847 :param vca_index: index in the database _admin.deployed.VCA
3848 :param destroy_ee: False to skip destroying the execution environment here, because all of them will be destroyed at once later
3849 :param exec_primitives: False to skip executing terminate primitives, because the configuration was not completed or did
3850 not execute properly
3851 :param scaling_in: True destroys the application, False destroys the model
3852 :return: None or exception
3853 """
3854
3855 self.logger.debug(
3856 logging_text
3857 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3858 vca_index, vca_deployed, config_descriptor, destroy_ee
3859 )
3860 )
3861
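# default to lxc_proxy_charm for VCA records created before the "type" field existed (backward compatibility)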
3862 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3863
3864 # execute terminate_primitives
3865 if exec_primitives:
3866 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3867 config_descriptor.get("terminate-config-primitive"),
3868 vca_deployed.get("ee_descriptor_id"),
3869 )
3870 vdu_id = vca_deployed.get("vdu_id")
3871 vdu_count_index = vca_deployed.get("vdu_count_index")
3872 vdu_name = vca_deployed.get("vdu_name")
3873 vnf_index = vca_deployed.get("member-vnf-index")
3874 if terminate_primitives and vca_deployed.get("needed_terminate"):
3875 for seq in terminate_primitives:
3876 # For each sequence in list, get primitive and call _ns_execute_primitive()
3877 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3878 vnf_index, seq.get("name")
3879 )
3880 self.logger.debug(logging_text + step)
3881 # Create the primitive for each sequence, i.e. "primitive": "touch"
3882 primitive = seq.get("name")
3883 mapped_primitive_params = self._get_terminate_primitive_params(
3884 seq, vnf_index
3885 )
3886
3887 # Add sub-operation
3888 self._add_suboperation(
3889 db_nslcmop,
3890 vnf_index,
3891 vdu_id,
3892 vdu_count_index,
3893 vdu_name,
3894 primitive,
3895 mapped_primitive_params,
3896 )
3897 # Sub-operations: Call _ns_execute_primitive() instead of action()
3898 try:
3899 result, result_detail = await self._ns_execute_primitive(
3900 vca_deployed["ee_id"],
3901 primitive,
3902 mapped_primitive_params,
3903 vca_type=vca_type,
3904 vca_id=vca_id,
3905 )
3906 except LcmException:
3907 # this happens when the VCA is not deployed. In this case there is nothing to terminate
3908 continue
3909 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3910 if result not in result_ok:
3911 raise LcmException(
3912 "terminate_primitive {} for vnf_member_index={} fails with "
3913 "error {}".format(seq.get("name"), vnf_index, result_detail)
3914 )
3915 # mark that this VCA no longer needs to be terminated
3916 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3917 vca_index
3918 )
3919 self.update_db_2(
3920 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3921 )
3922
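# remove any prometheus jobs registered for this VCA (when a prometheus instance is configured)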
3923 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3924 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3925
3926 if destroy_ee:
3927 await self.vca_map[vca_type].delete_execution_environment(
3928 vca_deployed["ee_id"],
3929 scaling_in=scaling_in,
3930 vca_type=vca_type,
3931 vca_id=vca_id,
3932 )
3933
3934 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3935 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3936 namespace = "." + db_nsr["_id"]
3937 try:
3938 await self.n2vc.delete_namespace(
3939 namespace=namespace,
3940 total_timeout=self.timeout_charm_delete,
3941 vca_id=vca_id,
3942 )
3943 except N2VCNotFound: # already deleted. Skip
3944 pass
3945 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3946
3947 async def _terminate_RO(
3948 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3949 ):
3950 """
3951 Terminates a deployment from RO
3952 :param logging_text:
3953 :param nsr_deployed: db_nsr._admin.deployed
3954 :param nsr_id:
3955 :param nslcmop_id:
3956 :param stage: list of strings with the content to write on db_nslcmop.detailed-status.
3957 This method updates only index 2, but it writes the concatenated content of the whole list to the database
3958 :return:
3959 """
3960 db_nsr_update = {}
3961 failed_detail = []
3962 ro_nsr_id = ro_delete_action = None
3963 if nsr_deployed and nsr_deployed.get("RO"):
3964 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3965 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3966 try:
3967 if ro_nsr_id:
3968 stage[2] = "Deleting ns from VIM."
3969 db_nsr_update["detailed-status"] = " ".join(stage)
3970 self._write_op_status(nslcmop_id, stage)
3971 self.logger.debug(logging_text + stage[2])
3972 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3973 self._write_op_status(nslcmop_id, stage)
3974 desc = await self.RO.delete("ns", ro_nsr_id)
3975 ro_delete_action = desc["action_id"]
3976 db_nsr_update[
3977 "_admin.deployed.RO.nsr_delete_action_id"
3978 ] = ro_delete_action
3979 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3980 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3981 if ro_delete_action:
3982 # wait until NS is deleted from VIM
3983 stage[2] = "Waiting for ns to be deleted from VIM."
3984 detailed_status_old = None
3985 self.logger.debug(
3986 logging_text
3987 + stage[2]
3988 + " RO_id={} ro_delete_action={}".format(
3989 ro_nsr_id, ro_delete_action
3990 )
3991 )
3992 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3993 self._write_op_status(nslcmop_id, stage)
3994
3995 delete_timeout = 20 * 60 # 20 minutes
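# poll the RO delete action every 5 seconds, up to 20 minutes, until it completes (ACTIVE), fails (ERROR)
# or the timeout expires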
3996 while delete_timeout > 0:
3997 desc = await self.RO.show(
3998 "ns",
3999 item_id_name=ro_nsr_id,
4000 extra_item="action",
4001 extra_item_id=ro_delete_action,
4002 )
4003
4004 # deploymentStatus
4005 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4006
4007 ns_status, ns_status_info = self.RO.check_action_status(desc)
4008 if ns_status == "ERROR":
4009 raise ROclient.ROClientException(ns_status_info)
4010 elif ns_status == "BUILD":
4011 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4012 elif ns_status == "ACTIVE":
4013 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4014 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4015 break
4016 else:
4017 assert (
4018 False
4019 ), "ROclient.check_action_status returns unknown {}".format(
4020 ns_status
4021 )
4022 if stage[2] != detailed_status_old:
4023 detailed_status_old = stage[2]
4024 db_nsr_update["detailed-status"] = " ".join(stage)
4025 self._write_op_status(nslcmop_id, stage)
4026 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4027 await asyncio.sleep(5, loop=self.loop)
4028 delete_timeout -= 5
4029 else: # delete_timeout <= 0:
4030 raise ROclient.ROClientException(
4031 "Timeout waiting for ns to be deleted from VIM"
4032 )
4033
4034 except Exception as e:
4035 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4036 if (
4037 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4038 ): # not found
4039 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4040 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4041 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4042 self.logger.debug(
4043 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4044 )
4045 elif (
4046 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4047 ): # conflict
4048 failed_detail.append("delete conflict: {}".format(e))
4049 self.logger.debug(
4050 logging_text
4051 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4052 )
4053 else:
4054 failed_detail.append("delete error: {}".format(e))
4055 self.logger.error(
4056 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4057 )
4058
4059 # Delete nsd
4060 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4061 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4062 try:
4063 stage[2] = "Deleting nsd from RO."
4064 db_nsr_update["detailed-status"] = " ".join(stage)
4065 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4066 self._write_op_status(nslcmop_id, stage)
4067 await self.RO.delete("nsd", ro_nsd_id)
4068 self.logger.debug(
4069 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4070 )
4071 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4072 except Exception as e:
4073 if (
4074 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4075 ): # not found
4076 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4077 self.logger.debug(
4078 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4079 )
4080 elif (
4081 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4082 ): # conflict
4083 failed_detail.append(
4084 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4085 )
4086 self.logger.debug(logging_text + failed_detail[-1])
4087 else:
4088 failed_detail.append(
4089 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4090 )
4091 self.logger.error(logging_text + failed_detail[-1])
4092
4093 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4094 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4095 if not vnf_deployed or not vnf_deployed["id"]:
4096 continue
4097 try:
4098 ro_vnfd_id = vnf_deployed["id"]
4099 stage[
4100 2
4101 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4102 vnf_deployed["member-vnf-index"], ro_vnfd_id
4103 )
4104 db_nsr_update["detailed-status"] = " ".join(stage)
4105 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4106 self._write_op_status(nslcmop_id, stage)
4107 await self.RO.delete("vnfd", ro_vnfd_id)
4108 self.logger.debug(
4109 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4110 )
4111 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4112 except Exception as e:
4113 if (
4114 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4115 ): # not found
4116 db_nsr_update[
4117 "_admin.deployed.RO.vnfd.{}.id".format(index)
4118 ] = None
4119 self.logger.debug(
4120 logging_text
4121 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4122 )
4123 elif (
4124 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4125 ): # conflict
4126 failed_detail.append(
4127 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4128 )
4129 self.logger.debug(logging_text + failed_detail[-1])
4130 else:
4131 failed_detail.append(
4132 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4133 )
4134 self.logger.error(logging_text + failed_detail[-1])
4135
4136 if failed_detail:
4137 stage[2] = "Error deleting from VIM"
4138 else:
4139 stage[2] = "Deleted from VIM"
4140 db_nsr_update["detailed-status"] = " ".join(stage)
4141 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4142 self._write_op_status(nslcmop_id, stage)
4143
4144 if failed_detail:
4145 raise LcmException("; ".join(failed_detail))
4146
4147 async def terminate(self, nsr_id, nslcmop_id):
4148 # Try to lock HA task here
4149 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4150 if not task_is_locked_by_me:
4151 return
4152
4153 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4154 self.logger.debug(logging_text + "Enter")
4155 timeout_ns_terminate = self.timeout_ns_terminate
4156 db_nsr = None
4157 db_nslcmop = None
4158 operation_params = None
4159 exc = None
4160 error_list = [] # accumulates all failure error messages
4161 db_nslcmop_update = {}
4162 autoremove = False # autoremove after terminated
4163 tasks_dict_info = {}
4164 db_nsr_update = {}
4165 stage = [
4166 "Stage 1/3: Preparing task.",
4167 "Waiting for previous operations to terminate.",
4168 "",
4169 ]
4170 # ^ contains [stage, step, VIM-status]
4171 try:
4172 # wait for any previous tasks in process
4173 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4174
4175 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4176 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4177 operation_params = db_nslcmop.get("operationParams") or {}
4178 if operation_params.get("timeout_ns_terminate"):
4179 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4180 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4181 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4182
4183 db_nsr_update["operational-status"] = "terminating"
4184 db_nsr_update["config-status"] = "terminating"
4185 self._write_ns_status(
4186 nsr_id=nsr_id,
4187 ns_state="TERMINATING",
4188 current_operation="TERMINATING",
4189 current_operation_id=nslcmop_id,
4190 other_update=db_nsr_update,
4191 )
4192 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4193 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4194 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4195 return
4196
4197 stage[1] = "Getting vnf descriptors from db."
4198 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4199 db_vnfrs_dict = {
4200 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4201 }
4202 db_vnfds_from_id = {}
4203 db_vnfds_from_member_index = {}
4204 # Loop over VNFRs
4205 for vnfr in db_vnfrs_list:
4206 vnfd_id = vnfr["vnfd-id"]
4207 if vnfd_id not in db_vnfds_from_id:
4208 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4209 db_vnfds_from_id[vnfd_id] = vnfd
4210 db_vnfds_from_member_index[
4211 vnfr["member-vnf-index-ref"]
4212 ] = db_vnfds_from_id[vnfd_id]
4213
4214 # Destroy individual execution environments when there are terminating primitives.
4215 # Rest of EE will be deleted at once
4216 # TODO - check before calling _destroy_N2VC
4217 # if not operation_params.get("skip_terminate_primitives"):#
4218 # or not vca.get("needed_terminate"):
4219 stage[0] = "Stage 2/3 execute terminating primitives."
4220 self.logger.debug(logging_text + stage[0])
4221 stage[1] = "Looking for execution environments that need terminate primitives."
4222 self.logger.debug(logging_text + stage[1])
4223
4224 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4225 config_descriptor = None
4226 vca_member_vnf_index = vca.get("member-vnf-index")
4227 vca_id = self.get_vca_id(
4228 db_vnfrs_dict.get(vca_member_vnf_index)
4229 if vca_member_vnf_index
4230 else None,
4231 db_nsr,
4232 )
4233 if not vca or not vca.get("ee_id"):
4234 continue
4235 if not vca.get("member-vnf-index"):
4236 # ns
4237 config_descriptor = db_nsr.get("ns-configuration")
4238 elif vca.get("vdu_id"):
4239 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4240 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4241 elif vca.get("kdu_name"):
4242 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4243 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4244 else:
4245 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4246 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4247 vca_type = vca.get("type")
4248 exec_terminate_primitives = not operation_params.get(
4249 "skip_terminate_primitives"
4250 ) and vca.get("needed_terminate")
4251 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4252 # pending native charms
4253 destroy_ee = (
4254 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4255 )
4256 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4257 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4258 task = asyncio.ensure_future(
4259 self.destroy_N2VC(
4260 logging_text,
4261 db_nslcmop,
4262 vca,
4263 config_descriptor,
4264 vca_index,
4265 destroy_ee,
4266 exec_terminate_primitives,
4267 vca_id=vca_id,
4268 )
4269 )
4270 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4271
4272 # wait for pending tasks of terminate primitives
4273 if tasks_dict_info:
4274 self.logger.debug(
4275 logging_text
4276 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4277 )
4278 error_list = await self._wait_for_tasks(
4279 logging_text,
4280 tasks_dict_info,
4281 min(self.timeout_charm_delete, timeout_ns_terminate),
4282 stage,
4283 nslcmop_id,
4284 )
4285 tasks_dict_info.clear()
4286 if error_list:
4287 return # raise LcmException("; ".join(error_list))
4288
4289 # remove All execution environments at once
4290 stage[0] = "Stage 3/3 delete all."
4291
4292 if nsr_deployed.get("VCA"):
4293 stage[1] = "Deleting all execution environments."
4294 self.logger.debug(logging_text + stage[1])
4295 vca_id = self.get_vca_id({}, db_nsr)
4296 task_delete_ee = asyncio.ensure_future(
4297 asyncio.wait_for(
4298 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4299 timeout=self.timeout_charm_delete,
4300 )
4301 )
4302 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4303 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4304
4305 # Delete from k8scluster
4306 stage[1] = "Deleting KDUs."
4307 self.logger.debug(logging_text + stage[1])
4308 # print(nsr_deployed)
4309 for kdu in get_iterable(nsr_deployed, "K8s"):
4310 if not kdu or not kdu.get("kdu-instance"):
4311 continue
4312 kdu_instance = kdu.get("kdu-instance")
4313 if kdu.get("k8scluster-type") in self.k8scluster_map:
4314 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4315 vca_id = self.get_vca_id({}, db_nsr)
4316 task_delete_kdu_instance = asyncio.ensure_future(
4317 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4318 cluster_uuid=kdu.get("k8scluster-uuid"),
4319 kdu_instance=kdu_instance,
4320 vca_id=vca_id,
4321 )
4322 )
4323 else:
4324 self.logger.error(
4325 logging_text
4326 + "Unknown k8s deployment type {}".format(
4327 kdu.get("k8scluster-type")
4328 )
4329 )
4330 continue
4331 tasks_dict_info[
4332 task_delete_kdu_instance
4333 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4334
4335 # remove from RO
4336 stage[1] = "Deleting ns from VIM."
4337 if self.ng_ro:
4338 task_delete_ro = asyncio.ensure_future(
4339 self._terminate_ng_ro(
4340 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4341 )
4342 )
4343 else:
4344 task_delete_ro = asyncio.ensure_future(
4345 self._terminate_RO(
4346 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4347 )
4348 )
4349 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4350
4351 # the rest of the cleanup is done in the finally block
4352
4353 except (
4354 ROclient.ROClientException,
4355 DbException,
4356 LcmException,
4357 N2VCException,
4358 ) as e:
4359 self.logger.error(logging_text + "Exit Exception {}".format(e))
4360 exc = e
4361 except asyncio.CancelledError:
4362 self.logger.error(
4363 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4364 )
4365 exc = "Operation was cancelled"
4366 except Exception as e:
4367 exc = traceback.format_exc()
4368 self.logger.critical(
4369 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4370 exc_info=True,
4371 )
4372 finally:
4373 if exc:
4374 error_list.append(str(exc))
4375 try:
4376 # wait for pending tasks
4377 if tasks_dict_info:
4378 stage[1] = "Waiting for terminate pending tasks."
4379 self.logger.debug(logging_text + stage[1])
4380 error_list += await self._wait_for_tasks(
4381 logging_text,
4382 tasks_dict_info,
4383 timeout_ns_terminate,
4384 stage,
4385 nslcmop_id,
4386 )
4387 stage[1] = stage[2] = ""
4388 except asyncio.CancelledError:
4389 error_list.append("Cancelled")
4390 # TODO cancel all tasks
4391 except Exception as exc:
4392 error_list.append(str(exc))
4393 # update status at database
4394 if error_list:
4395 error_detail = "; ".join(error_list)
4396 # self.logger.error(logging_text + error_detail)
4397 error_description_nslcmop = "{} Detail: {}".format(
4398 stage[0], error_detail
4399 )
4400 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4401 nslcmop_id, stage[0]
4402 )
4403
4404 db_nsr_update["operational-status"] = "failed"
4405 db_nsr_update["detailed-status"] = (
4406 error_description_nsr + " Detail: " + error_detail
4407 )
4408 db_nslcmop_update["detailed-status"] = error_detail
4409 nslcmop_operation_state = "FAILED"
4410 ns_state = "BROKEN"
4411 else:
4412 error_detail = None
4413 error_description_nsr = error_description_nslcmop = None
4414 ns_state = "NOT_INSTANTIATED"
4415 db_nsr_update["operational-status"] = "terminated"
4416 db_nsr_update["detailed-status"] = "Done"
4417 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4418 db_nslcmop_update["detailed-status"] = "Done"
4419 nslcmop_operation_state = "COMPLETED"
4420
4421 if db_nsr:
4422 self._write_ns_status(
4423 nsr_id=nsr_id,
4424 ns_state=ns_state,
4425 current_operation="IDLE",
4426 current_operation_id=None,
4427 error_description=error_description_nsr,
4428 error_detail=error_detail,
4429 other_update=db_nsr_update,
4430 )
4431 self._write_op_status(
4432 op_id=nslcmop_id,
4433 stage="",
4434 error_message=error_description_nslcmop,
4435 operation_state=nslcmop_operation_state,
4436 other_update=db_nslcmop_update,
4437 )
4438 if ns_state == "NOT_INSTANTIATED":
4439 try:
4440 self.db.set_list(
4441 "vnfrs",
4442 {"nsr-id-ref": nsr_id},
4443 {"_admin.nsState": "NOT_INSTANTIATED"},
4444 )
4445 except DbException as e:
4446 self.logger.warn(
4447 logging_text
4448 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4449 nsr_id, e
4450 )
4451 )
4452 if operation_params:
4453 autoremove = operation_params.get("autoremove", False)
4454 if nslcmop_operation_state:
4455 try:
4456 await self.msg.aiowrite(
4457 "ns",
4458 "terminated",
4459 {
4460 "nsr_id": nsr_id,
4461 "nslcmop_id": nslcmop_id,
4462 "operationState": nslcmop_operation_state,
4463 "autoremove": autoremove,
4464 },
4465 loop=self.loop,
4466 )
4467 except Exception as e:
4468 self.logger.error(
4469 logging_text + "kafka_write notification Exception {}".format(e)
4470 )
4471
4472 self.logger.debug(logging_text + "Exit")
4473 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4474
4475 async def _wait_for_tasks(
4476 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4477 ):
4478 time_start = time()
4479 error_detail_list = []
4480 error_list = []
4481 pending_tasks = list(created_tasks_info.keys())
4482 num_tasks = len(pending_tasks)
4483 num_done = 0
4484 stage[1] = "{}/{}.".format(num_done, num_tasks)
4485 self._write_op_status(nslcmop_id, stage)
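# wait with FIRST_COMPLETED so the "<done>/<total>" progress in stage[1] can be refreshed after every
# finished task; if the wait times out, every still-pending task is annotated with a Timeout error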
4486 while pending_tasks:
4487 new_error = None
4488 _timeout = timeout + time_start - time()
4489 done, pending_tasks = await asyncio.wait(
4490 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4491 )
4492 num_done += len(done)
4493 if not done: # Timeout
4494 for task in pending_tasks:
4495 new_error = created_tasks_info[task] + ": Timeout"
4496 error_detail_list.append(new_error)
4497 error_list.append(new_error)
4498 break
4499 for task in done:
4500 if task.cancelled():
4501 exc = "Cancelled"
4502 else:
4503 exc = task.exception()
4504 if exc:
4505 if isinstance(exc, asyncio.TimeoutError):
4506 exc = "Timeout"
4507 new_error = created_tasks_info[task] + ": {}".format(exc)
4508 error_list.append(created_tasks_info[task])
4509 error_detail_list.append(new_error)
4510 if isinstance(
4511 exc,
4512 (
4513 str,
4514 DbException,
4515 N2VCException,
4516 ROclient.ROClientException,
4517 LcmException,
4518 K8sException,
4519 NgRoException,
4520 ),
4521 ):
4522 self.logger.error(logging_text + new_error)
4523 else:
4524 exc_traceback = "".join(
4525 traceback.format_exception(None, exc, exc.__traceback__)
4526 )
4527 self.logger.error(
4528 logging_text
4529 + created_tasks_info[task]
4530 + " "
4531 + exc_traceback
4532 )
4533 else:
4534 self.logger.debug(
4535 logging_text + created_tasks_info[task] + ": Done"
4536 )
4537 stage[1] = "{}/{}.".format(num_done, num_tasks)
4538 if new_error:
4539 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4540 if nsr_id: # update also nsr
4541 self.update_db_2(
4542 "nsrs",
4543 nsr_id,
4544 {
4545 "errorDescription": "Error at: " + ", ".join(error_list),
4546 "errorDetail": ". ".join(error_detail_list),
4547 },
4548 )
4549 self._write_op_status(nslcmop_id, stage)
4550 return error_detail_list
4551
4552 @staticmethod
4553 def _map_primitive_params(primitive_desc, params, instantiation_params):
4554 """
4555 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
4556 parameter, the default-value is used. If the value is enclosed between < >, it is looked up in instantiation_params
4557 :param primitive_desc: portion of VNFD/NSD that describes primitive
4558 :param params: Params provided by user
4559 :param instantiation_params: Instantiation params provided by user
4560 :return: a dictionary with the calculated params
4561 """
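# Illustrative example (hypothetical names): a parameter descriptor {"name": "ip", "default-value": "<mgmt_ip>"}
# with instantiation_params {"mgmt_ip": "10.0.0.1"} and no user-provided "ip" resolves to "10.0.0.1"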
4562 calculated_params = {}
4563 for parameter in primitive_desc.get("parameter", ()):
4564 param_name = parameter["name"]
4565 if param_name in params:
4566 calculated_params[param_name] = params[param_name]
4567 elif "default-value" in parameter or "value" in parameter:
4568 if "value" in parameter:
4569 calculated_params[param_name] = parameter["value"]
4570 else:
4571 calculated_params[param_name] = parameter["default-value"]
4572 if (
4573 isinstance(calculated_params[param_name], str)
4574 and calculated_params[param_name].startswith("<")
4575 and calculated_params[param_name].endswith(">")
4576 ):
4577 if calculated_params[param_name][1:-1] in instantiation_params:
4578 calculated_params[param_name] = instantiation_params[
4579 calculated_params[param_name][1:-1]
4580 ]
4581 else:
4582 raise LcmException(
4583 "Parameter {} needed to execute primitive {} not provided".format(
4584 calculated_params[param_name], primitive_desc["name"]
4585 )
4586 )
4587 else:
4588 raise LcmException(
4589 "Parameter {} needed to execute primitive {} not provided".format(
4590 param_name, primitive_desc["name"]
4591 )
4592 )
4593
4594 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4595 calculated_params[param_name] = yaml.safe_dump(
4596 calculated_params[param_name], default_flow_style=True, width=256
4597 )
4598 elif isinstance(calculated_params[param_name], str) and calculated_params[
4599 param_name
4600 ].startswith("!!yaml "):
4601 calculated_params[param_name] = calculated_params[param_name][7:]
4602 if parameter.get("data-type") == "INTEGER":
4603 try:
4604 calculated_params[param_name] = int(calculated_params[param_name])
4605 except ValueError: # error converting string to int
4606 raise LcmException(
4607 "Parameter {} of primitive {} must be integer".format(
4608 param_name, primitive_desc["name"]
4609 )
4610 )
4611 elif parameter.get("data-type") == "BOOLEAN":
4612 calculated_params[param_name] = not (
4613 (str(calculated_params[param_name])).lower() == "false"
4614 )
4615
4616 # always add ns_config_info if the primitive name is config
4617 if primitive_desc["name"] == "config":
4618 if "ns_config_info" in instantiation_params:
4619 calculated_params["ns_config_info"] = instantiation_params[
4620 "ns_config_info"
4621 ]
4622 return calculated_params
4623
4624 def _look_for_deployed_vca(
4625 self,
4626 deployed_vca,
4627 member_vnf_index,
4628 vdu_id,
4629 vdu_count_index,
4630 kdu_name=None,
4631 ee_descriptor_id=None,
4632 ):
4633 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
4634 for vca in deployed_vca:
4635 if not vca:
4636 continue
4637 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4638 continue
4639 if (
4640 vdu_count_index is not None
4641 and vdu_count_index != vca["vdu_count_index"]
4642 ):
4643 continue
4644 if kdu_name and kdu_name != vca["kdu_name"]:
4645 continue
4646 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4647 continue
4648 break
4649 else:
4650 # vca_deployed not found
4651 raise LcmException(
4652 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4653 " is not deployed".format(
4654 member_vnf_index,
4655 vdu_id,
4656 vdu_count_index,
4657 kdu_name,
4658 ee_descriptor_id,
4659 )
4660 )
4661 # get ee_id
4662 ee_id = vca.get("ee_id")
4663 vca_type = vca.get(
4664 "type", "lxc_proxy_charm"
4665 ) # default value for backward compatibility - proxy charm
4666 if not ee_id:
4667 raise LcmException(
4668 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has no "
4669 "execution environment".format(
4670 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4671 )
4672 )
4673 return ee_id, vca_type
4674
4675 async def _ns_execute_primitive(
4676 self,
4677 ee_id,
4678 primitive,
4679 primitive_params,
4680 retries=0,
4681 retries_interval=30,
4682 timeout=None,
4683 vca_type=None,
4684 db_dict=None,
4685 vca_id: str = None,
4686 ) -> (str, str):
4687 try:
4688 if primitive == "config":
4689 primitive_params = {"params": primitive_params}
4690
4691 vca_type = vca_type or "lxc_proxy_charm"
4692
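# retry loop: each attempt is bounded by 'timeout' (or self.timeout_primitive); on error it waits
# retries_interval seconds before the next attempt until 'retries' is exhausted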
4693 while retries >= 0:
4694 try:
4695 output = await asyncio.wait_for(
4696 self.vca_map[vca_type].exec_primitive(
4697 ee_id=ee_id,
4698 primitive_name=primitive,
4699 params_dict=primitive_params,
4700 progress_timeout=self.timeout_progress_primitive,
4701 total_timeout=self.timeout_primitive,
4702 db_dict=db_dict,
4703 vca_id=vca_id,
4704 vca_type=vca_type,
4705 ),
4706 timeout=timeout or self.timeout_primitive,
4707 )
4708 # execution was OK
4709 break
4710 except asyncio.CancelledError:
4711 raise
4712 except Exception as e: # asyncio.TimeoutError
4713 if isinstance(e, asyncio.TimeoutError):
4714 e = "Timeout"
4715 retries -= 1
4716 if retries >= 0:
4717 self.logger.debug(
4718 "Error executing action {} on {} -> {}".format(
4719 primitive, ee_id, e
4720 )
4721 )
4722 # wait and retry
4723 await asyncio.sleep(retries_interval, loop=self.loop)
4724 else:
4725 return "FAILED", str(e)
4726
4727 return "COMPLETED", output
4728
4729 except (LcmException, asyncio.CancelledError):
4730 raise
4731 except Exception as e:
4732 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4733
4734 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4735 """
4736 Updates the vca_status in the nsrs record with the latest juju information
4737 :param nsr_id: Id of the nsr
4738 :param nslcmop_id: Id of the nslcmop
4739 :return: None
4740 """
4741
4742 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4743 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4744 vca_id = self.get_vca_id({}, db_nsr)
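# KDU-based deployments are refreshed through the k8s cluster connector; otherwise every deployed
# VCA entry is refreshed through the N2VC database callback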
4745 if db_nsr["_admin"]["deployed"]["K8s"]:
4746 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4747 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4748 await self._on_update_k8s_db(
4749 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4750 )
4751 else:
4752 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4753 table, filter = "nsrs", {"_id": nsr_id}
4754 path = "_admin.deployed.VCA.{}.".format(vca_index)
4755 await self._on_update_n2vc_db(table, filter, path, {})
4756
4757 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4758 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4759
4760 async def action(self, nsr_id, nslcmop_id):
4761 # Try to lock HA task here
4762 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4763 if not task_is_locked_by_me:
4764 return
4765
4766 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4767 self.logger.debug(logging_text + "Enter")
4768 # get all needed from database
4769 db_nsr = None
4770 db_nslcmop = None
4771 db_nsr_update = {}
4772 db_nslcmop_update = {}
4773 nslcmop_operation_state = None
4774 error_description_nslcmop = None
4775 exc = None
4776 try:
4777 # wait for any previous tasks in process
4778 step = "Waiting for previous operations to terminate"
4779 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4780
4781 self._write_ns_status(
4782 nsr_id=nsr_id,
4783 ns_state=None,
4784 current_operation="RUNNING ACTION",
4785 current_operation_id=nslcmop_id,
4786 )
4787
4788 step = "Getting information from database"
4789 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4790 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4791
4792 nsr_deployed = db_nsr["_admin"].get("deployed")
4793 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4794 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4795 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4796 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4797 primitive = db_nslcmop["operationParams"]["primitive"]
4798 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4799 timeout_ns_action = db_nslcmop["operationParams"].get(
4800 "timeout_ns_action", self.timeout_primitive
4801 )
4802
4803 if vnf_index:
4804 step = "Getting vnfr from database"
4805 db_vnfr = self.db.get_one(
4806 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4807 )
4808 step = "Getting vnfd from database"
4809 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4810 else:
4811 step = "Getting nsd from database"
4812 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4813
4814 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4815 # for backward compatibility
4816 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4817 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4818 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4819 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4820
4821 # look for primitive
4822 config_primitive_desc = descriptor_configuration = None
4823 if vdu_id:
4824 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4825 elif kdu_name:
4826 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4827 elif vnf_index:
4828 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4829 else:
4830 descriptor_configuration = db_nsd.get("ns-configuration")
4831
4832 if descriptor_configuration and descriptor_configuration.get(
4833 "config-primitive"
4834 ):
4835 for config_primitive in descriptor_configuration["config-primitive"]:
4836 if config_primitive["name"] == primitive:
4837 config_primitive_desc = config_primitive
4838 break
4839
4840 if not config_primitive_desc:
4841 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4842 raise LcmException(
4843 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4844 primitive
4845 )
4846 )
4847 primitive_name = primitive
4848 ee_descriptor_id = None
4849 else:
4850 primitive_name = config_primitive_desc.get(
4851 "execution-environment-primitive", primitive
4852 )
4853 ee_descriptor_id = config_primitive_desc.get(
4854 "execution-environment-ref"
4855 )
4856
4857 if vnf_index:
4858 if vdu_id:
4859 vdur = next(
4860 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4861 )
4862 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4863 elif kdu_name:
4864 kdur = next(
4865 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4866 )
4867 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4868 else:
4869 desc_params = parse_yaml_strings(
4870 db_vnfr.get("additionalParamsForVnf")
4871 )
4872 else:
4873 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
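# collect the primitive names declared in the KDU configuration to decide whether this action targets
# the KDU (executed through the k8s cluster connector) instead of a charm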
4874 if kdu_name and get_configuration(db_vnfd, kdu_name):
4875 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4876 actions = set()
4877 for primitive in kdu_configuration.get("initial-config-primitive", []):
4878 actions.add(primitive["name"])
4879 for primitive in kdu_configuration.get("config-primitive", []):
4880 actions.add(primitive["name"])
4881 kdu_action = primitive_name in actions
4882
4883 # TODO check if ns is in a proper status
4884 if kdu_name and (
4885 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4886 ):
4887 # kdur and desc_params already set from before
4888 if primitive_params:
4889 desc_params.update(primitive_params)
4890 # TODO Check if we will need something at vnf level
4891 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4892 if (
4893 kdu_name == kdu["kdu-name"]
4894 and kdu["member-vnf-index"] == vnf_index
4895 ):
4896 break
4897 else:
4898 raise LcmException(
4899 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4900 )
4901
4902 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4903 msg = "unknown k8scluster-type '{}'".format(
4904 kdu.get("k8scluster-type")
4905 )
4906 raise LcmException(msg)
4907
4908 db_dict = {
4909 "collection": "nsrs",
4910 "filter": {"_id": nsr_id},
4911 "path": "_admin.deployed.K8s.{}".format(index),
4912 }
4913 self.logger.debug(
4914 logging_text
4915 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4916 )
4917 step = "Executing kdu {}".format(primitive_name)
4918 if primitive_name == "upgrade":
4919 if desc_params.get("kdu_model"):
4920 kdu_model = desc_params.get("kdu_model")
4921 del desc_params["kdu_model"]
4922 else:
4923 kdu_model = kdu.get("kdu-model")
4924 parts = kdu_model.split(sep=":")
4925 if len(parts) == 2:
4926 kdu_model = parts[0]
4927
4928 detailed_status = await asyncio.wait_for(
4929 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4930 cluster_uuid=kdu.get("k8scluster-uuid"),
4931 kdu_instance=kdu.get("kdu-instance"),
4932 atomic=True,
4933 kdu_model=kdu_model,
4934 params=desc_params,
4935 db_dict=db_dict,
4936 timeout=timeout_ns_action,
4937 ),
4938 timeout=timeout_ns_action + 10,
4939 )
4940 self.logger.debug(
4941 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4942 )
4943 elif primitive_name == "rollback":
4944 detailed_status = await asyncio.wait_for(
4945 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4946 cluster_uuid=kdu.get("k8scluster-uuid"),
4947 kdu_instance=kdu.get("kdu-instance"),
4948 db_dict=db_dict,
4949 ),
4950 timeout=timeout_ns_action,
4951 )
4952 elif primitive_name == "status":
4953 detailed_status = await asyncio.wait_for(
4954 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4955 cluster_uuid=kdu.get("k8scluster-uuid"),
4956 kdu_instance=kdu.get("kdu-instance"),
4957 vca_id=vca_id,
4958 ),
4959 timeout=timeout_ns_action,
4960 )
4961 else:
4962 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4963 kdu["kdu-name"], nsr_id
4964 )
4965 params = self._map_primitive_params(
4966 config_primitive_desc, primitive_params, desc_params
4967 )
4968
4969 detailed_status = await asyncio.wait_for(
4970 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4971 cluster_uuid=kdu.get("k8scluster-uuid"),
4972 kdu_instance=kdu_instance,
4973 primitive_name=primitive_name,
4974 params=params,
4975 db_dict=db_dict,
4976 timeout=timeout_ns_action,
4977 vca_id=vca_id,
4978 ),
4979 timeout=timeout_ns_action,
4980 )
4981
4982 if detailed_status:
4983 nslcmop_operation_state = "COMPLETED"
4984 else:
4985 detailed_status = ""
4986 nslcmop_operation_state = "FAILED"
4987 else:
4988 ee_id, vca_type = self._look_for_deployed_vca(
4989 nsr_deployed["VCA"],
4990 member_vnf_index=vnf_index,
4991 vdu_id=vdu_id,
4992 vdu_count_index=vdu_count_index,
4993 ee_descriptor_id=ee_descriptor_id,
4994 )
4995 for vca_index, vca_deployed in enumerate(
4996 db_nsr["_admin"]["deployed"]["VCA"]
4997 ):
4998 if vca_deployed.get("member-vnf-index") == vnf_index:
4999 db_dict = {
5000 "collection": "nsrs",
5001 "filter": {"_id": nsr_id},
5002 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5003 }
5004 break
5005 (
5006 nslcmop_operation_state,
5007 detailed_status,
5008 ) = await self._ns_execute_primitive(
5009 ee_id,
5010 primitive=primitive_name,
5011 primitive_params=self._map_primitive_params(
5012 config_primitive_desc, primitive_params, desc_params
5013 ),
5014 timeout=timeout_ns_action,
5015 vca_type=vca_type,
5016 db_dict=db_dict,
5017 vca_id=vca_id,
5018 )
5019
5020 db_nslcmop_update["detailed-status"] = detailed_status
5021 error_description_nslcmop = (
5022 detailed_status if nslcmop_operation_state == "FAILED" else ""
5023 )
5024 self.logger.debug(
5025 logging_text
5026 + " task Done with result {} {}".format(
5027 nslcmop_operation_state, detailed_status
5028 )
5029 )
5030 return # database update is called inside finally
5031
5032 except (DbException, LcmException, N2VCException, K8sException) as e:
5033 self.logger.error(logging_text + "Exit Exception {}".format(e))
5034 exc = e
5035 except asyncio.CancelledError:
5036 self.logger.error(
5037 logging_text + "Cancelled Exception while '{}'".format(step)
5038 )
5039 exc = "Operation was cancelled"
5040 except asyncio.TimeoutError:
5041 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5042 exc = "Timeout"
5043 except Exception as e:
5044 exc = traceback.format_exc()
5045 self.logger.critical(
5046 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5047 exc_info=True,
5048 )
5049 finally:
5050 if exc:
5051 db_nslcmop_update[
5052 "detailed-status"
5053 ] = (
5054 detailed_status
5055 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5056 nslcmop_operation_state = "FAILED"
5057 if db_nsr:
5058 self._write_ns_status(
5059 nsr_id=nsr_id,
5060 ns_state=db_nsr[
5061 "nsState"
5062 ], # TODO check if degraded. For the moment use previous status
5063 current_operation="IDLE",
5064 current_operation_id=None,
5065 # error_description=error_description_nsr,
5066 # error_detail=error_detail,
5067 other_update=db_nsr_update,
5068 )
5069
5070 self._write_op_status(
5071 op_id=nslcmop_id,
5072 stage="",
5073 error_message=error_description_nslcmop,
5074 operation_state=nslcmop_operation_state,
5075 other_update=db_nslcmop_update,
5076 )
5077
5078 if nslcmop_operation_state:
5079 try:
5080 await self.msg.aiowrite(
5081 "ns",
5082 "actioned",
5083 {
5084 "nsr_id": nsr_id,
5085 "nslcmop_id": nslcmop_id,
5086 "operationState": nslcmop_operation_state,
5087 },
5088 loop=self.loop,
5089 )
5090 except Exception as e:
5091 self.logger.error(
5092 logging_text + "kafka_write notification Exception {}".format(e)
5093 )
5094 self.logger.debug(logging_text + "Exit")
5095 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5096 return nslcmop_operation_state, detailed_status
5097
5098 async def scale(self, nsr_id, nslcmop_id):
5099 # Try to lock HA task here
5100 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5101 if not task_is_locked_by_me:
5102 return
5103
5104 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5105 stage = ["", "", ""]
5106 tasks_dict_info = {}
5107 # ^ stage, step, VIM progress
5108 self.logger.debug(logging_text + "Enter")
5109 # get all needed from database
5110 db_nsr = None
5111 db_nslcmop_update = {}
5112 db_nsr_update = {}
5113 exc = None
5114 # in case of error, indicates which part of the scale operation failed, to set the nsr error status accordingly
5115 scale_process = None
5116 old_operational_status = ""
5117 old_config_status = ""
5118 nsi_id = None
5119 try:
5120 # wait for any previous tasks in process
5121 step = "Waiting for previous operations to terminate"
5122 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5123 self._write_ns_status(
5124 nsr_id=nsr_id,
5125 ns_state=None,
5126 current_operation="SCALING",
5127 current_operation_id=nslcmop_id,
5128 )
5129
5130 step = "Getting nslcmop from database"
5131 self.logger.debug(
5132 step + " after having waited for previous tasks to be completed"
5133 )
5134 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5135
5136 step = "Getting nsr from database"
5137 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5138 old_operational_status = db_nsr["operational-status"]
5139 old_config_status = db_nsr["config-status"]
5140
5141 step = "Parsing scaling parameters"
5142 db_nsr_update["operational-status"] = "scaling"
5143 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5144 nsr_deployed = db_nsr["_admin"].get("deployed")
5145
5146 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5147 "scaleByStepData"
5148 ]["member-vnf-index"]
5149 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5150 "scaleByStepData"
5151 ]["scaling-group-descriptor"]
5152 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5153 # for backward compatibility
5154 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5155 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5156 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5157 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5158
5159 step = "Getting vnfr from database"
5160 db_vnfr = self.db.get_one(
5161 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5162 )
5163
5164 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5165
5166 step = "Getting vnfd from database"
5167 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5168
5169 base_folder = db_vnfd["_admin"]["storage"]
5170
5171 step = "Getting scaling-group-descriptor"
5172 scaling_descriptor = find_in_list(
5173 get_scaling_aspect(db_vnfd),
5174 lambda scale_desc: scale_desc["name"] == scaling_group,
5175 )
5176 if not scaling_descriptor:
5177 raise LcmException(
5178 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5179 "at vnfd:scaling-group-descriptor".format(scaling_group)
5180 )
5181
5182 step = "Sending scale order to VIM"
5183 # TODO check if ns is in a proper status
5184 nb_scale_op = 0
5185 if not db_nsr["_admin"].get("scaling-group"):
5186 self.update_db_2(
5187 "nsrs",
5188 nsr_id,
5189 {
5190 "_admin.scaling-group": [
5191 {"name": scaling_group, "nb-scale-op": 0}
5192 ]
5193 },
5194 )
5195 admin_scale_index = 0
5196 else:
5197 for admin_scale_index, admin_scale_info in enumerate(
5198 db_nsr["_admin"]["scaling-group"]
5199 ):
5200 if admin_scale_info["name"] == scaling_group:
5201 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5202 break
5203 else: # not found: use the index one past the last element and add a new entry with this name
5204 admin_scale_index += 1
5205 db_nsr_update[
5206 "_admin.scaling-group.{}.name".format(admin_scale_index)
5207 ] = scaling_group
5208
5209 vca_scaling_info = []
5210 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5211 if scaling_type == "SCALE_OUT":
5212 if "aspect-delta-details" not in scaling_descriptor:
5213 raise LcmException(
5214 "Aspect delta details not found in scaling descriptor {}".format(
5215 scaling_descriptor["name"]
5216 )
5217 )
5218 # count if max-instance-count is reached
5219 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5220
5221 scaling_info["scaling_direction"] = "OUT"
5222 scaling_info["vdu-create"] = {}
5223 scaling_info["kdu-create"] = {}
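# each delta may contain vdu-delta and kdu-resource-delta entries; build vca_scaling_info and the
# scaling_info "vdu-create"/"kdu-create" maps from them, enforcing max-number-of-instances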
5224 for delta in deltas:
5225 for vdu_delta in delta.get("vdu-delta", {}):
5226 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5227 # vdu_index also provides the number of instances of the targeted vdu
5228 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5229 cloud_init_text = self._get_vdu_cloud_init_content(
5230 vdud, db_vnfd
5231 )
5232 if cloud_init_text:
5233 additional_params = (
5234 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5235 or {}
5236 )
5237 cloud_init_list = []
5238
5239 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5240 max_instance_count = 10
5241 if vdu_profile and "max-number-of-instances" in vdu_profile:
5242 max_instance_count = vdu_profile.get(
5243 "max-number-of-instances", 10
5244 )
5245
5246 default_instance_num = get_number_of_instances(
5247 db_vnfd, vdud["id"]
5248 )
5249 instances_number = vdu_delta.get("number-of-instances", 1)
5250 nb_scale_op += instances_number
5251
5252 new_instance_count = nb_scale_op + default_instance_num
5253                     # Check whether the new count exceeds the maximum while the current
5254                     # vdu count is still below it; in that case adjust the instance number
5255                     if new_instance_count > max_instance_count > vdu_count:
5256                         instances_number = new_instance_count - max_instance_count
5259
5260 if new_instance_count > max_instance_count:
5261 raise LcmException(
5262 "reached the limit of {} (max-instance-count) "
5263 "scaling-out operations for the "
5264 "scaling-group-descriptor '{}'".format(
5265 nb_scale_op, scaling_group
5266 )
5267 )
5268 for x in range(vdu_delta.get("number-of-instances", 1)):
5269 if cloud_init_text:
5270                             # TODO the instance's own IP is not available because db_vnfr has not been updated yet
5271 additional_params["OSM"] = get_osm_params(
5272 db_vnfr, vdu_delta["id"], vdu_index + x
5273 )
5274 cloud_init_list.append(
5275 self._parse_cloud_init(
5276 cloud_init_text,
5277 additional_params,
5278 db_vnfd["id"],
5279 vdud["id"],
5280 )
5281 )
5282 vca_scaling_info.append(
5283 {
5284 "osm_vdu_id": vdu_delta["id"],
5285 "member-vnf-index": vnf_index,
5286 "type": "create",
5287 "vdu_index": vdu_index + x,
5288 }
5289 )
5290 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
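                     # For each KDU resource delta, determine the k8s cluster type, the new
                     # replica count and the vca/kdu entries needed for the scale-out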
5291 for kdu_delta in delta.get("kdu-resource-delta", {}):
5292 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5293 kdu_name = kdu_profile["kdu-name"]
5294 resource_name = kdu_profile["resource-name"]
5295
5296                     # The same delta may reference several different KDUs,
5297                     # so keep a separate list per kdu name
5298 if not scaling_info["kdu-create"].get(kdu_name, None):
5299 scaling_info["kdu-create"][kdu_name] = []
5300
5301 kdur = get_kdur(db_vnfr, kdu_name)
5302 if kdur.get("helm-chart"):
5303 k8s_cluster_type = "helm-chart-v3"
5304 self.logger.debug("kdur: {}".format(kdur))
5305                         if kdur.get("helm-version") == "v2":
5309 k8s_cluster_type = "helm-chart"
5310 raise NotImplementedError
5311 elif kdur.get("juju-bundle"):
5312 k8s_cluster_type = "juju-bundle"
5313 else:
5314 raise LcmException(
5315 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5316 "juju-bundle. Maybe an old NBI version is running".format(
5317 db_vnfr["member-vnf-index-ref"], kdu_name
5318 )
5319 )
5320
5321 max_instance_count = 10
5322 if kdu_profile and "max-number-of-instances" in kdu_profile:
5323 max_instance_count = kdu_profile.get(
5324 "max-number-of-instances", 10
5325 )
5326
5327 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5328 deployed_kdu, _ = get_deployed_kdu(
5329 nsr_deployed, kdu_name, vnf_index
5330 )
5331 if deployed_kdu is None:
5332 raise LcmException(
5333 "KDU '{}' for vnf '{}' not deployed".format(
5334 kdu_name, vnf_index
5335 )
5336 )
5337 kdu_instance = deployed_kdu.get("kdu-instance")
5338 instance_num = await self.k8scluster_map[
5339 k8s_cluster_type
5340 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5341 kdu_replica_count = instance_num + kdu_delta.get(
5342 "number-of-instances", 1
5343 )
5344
5345                     # If the new replica count exceeds the maximum while the current one
5346                     # is still below it, cap the kdu replica count at the maximum
5347 if kdu_replica_count > max_instance_count > instance_num:
5348 kdu_replica_count = max_instance_count
5349 if kdu_replica_count > max_instance_count:
5350 raise LcmException(
5351 "reached the limit of {} (max-instance-count) "
5352 "scaling-out operations for the "
5353 "scaling-group-descriptor '{}'".format(
5354 instance_num, scaling_group
5355 )
5356 )
5357
5358 for x in range(kdu_delta.get("number-of-instances", 1)):
5359 vca_scaling_info.append(
5360 {
5361 "osm_kdu_id": kdu_name,
5362 "member-vnf-index": vnf_index,
5363 "type": "create",
5364 "kdu_index": instance_num + x - 1,
5365 }
5366 )
5367 scaling_info["kdu-create"][kdu_name].append(
5368 {
5369 "member-vnf-index": vnf_index,
5370 "type": "create",
5371 "k8s-cluster-type": k8s_cluster_type,
5372 "resource-name": resource_name,
5373 "scale": kdu_replica_count,
5374 }
5375 )
5376 elif scaling_type == "SCALE_IN":
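                 # Mirror of the SCALE_OUT branch: work out which VDU/KDU instances must be
                 # removed, respecting min-number-of-instances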
5377 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5378
5379 scaling_info["scaling_direction"] = "IN"
5380 scaling_info["vdu-delete"] = {}
5381 scaling_info["kdu-delete"] = {}
5382
5383 for delta in deltas:
5384 for vdu_delta in delta.get("vdu-delta", {}):
5385 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5386 min_instance_count = 0
5387 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5388 if vdu_profile and "min-number-of-instances" in vdu_profile:
5389 min_instance_count = vdu_profile["min-number-of-instances"]
5390
5391 default_instance_num = get_number_of_instances(
5392 db_vnfd, vdu_delta["id"]
5393 )
5394 instance_num = vdu_delta.get("number-of-instances", 1)
5395 nb_scale_op -= instance_num
5396
5397 new_instance_count = nb_scale_op + default_instance_num
5398
5399 if new_instance_count < min_instance_count < vdu_count:
5400 instances_number = min_instance_count - new_instance_count
5401 else:
5402 instances_number = instance_num
5403
5404 if new_instance_count < min_instance_count:
5405 raise LcmException(
5406 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5407 "scaling-group-descriptor '{}'".format(
5408 nb_scale_op, scaling_group
5409 )
5410 )
5411 for x in range(vdu_delta.get("number-of-instances", 1)):
5412 vca_scaling_info.append(
5413 {
5414 "osm_vdu_id": vdu_delta["id"],
5415 "member-vnf-index": vnf_index,
5416 "type": "delete",
5417 "vdu_index": vdu_index - 1 - x,
5418 }
5419 )
5420 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5421 for kdu_delta in delta.get("kdu-resource-delta", {}):
5422 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5423 kdu_name = kdu_profile["kdu-name"]
5424 resource_name = kdu_profile["resource-name"]
5425
5426 if not scaling_info["kdu-delete"].get(kdu_name, None):
5427 scaling_info["kdu-delete"][kdu_name] = []
5428
5429 kdur = get_kdur(db_vnfr, kdu_name)
5430 if kdur.get("helm-chart"):
5431 k8s_cluster_type = "helm-chart-v3"
5432 self.logger.debug("kdur: {}".format(kdur))
5433                         if kdur.get("helm-version") == "v2":
5437 k8s_cluster_type = "helm-chart"
5438 raise NotImplementedError
5439 elif kdur.get("juju-bundle"):
5440 k8s_cluster_type = "juju-bundle"
5441 else:
5442 raise LcmException(
5443 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5444 "juju-bundle. Maybe an old NBI version is running".format(
5445 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5446 )
5447 )
5448
5449 min_instance_count = 0
5450 if kdu_profile and "min-number-of-instances" in kdu_profile:
5451 min_instance_count = kdu_profile["min-number-of-instances"]
5452
5453 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5454 deployed_kdu, _ = get_deployed_kdu(
5455 nsr_deployed, kdu_name, vnf_index
5456 )
5457 if deployed_kdu is None:
5458 raise LcmException(
5459 "KDU '{}' for vnf '{}' not deployed".format(
5460 kdu_name, vnf_index
5461 )
5462 )
5463 kdu_instance = deployed_kdu.get("kdu-instance")
5464 instance_num = await self.k8scluster_map[
5465 k8s_cluster_type
5466 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5467 kdu_replica_count = instance_num - kdu_delta.get(
5468 "number-of-instances", 1
5469 )
5470
5471 if kdu_replica_count < min_instance_count < instance_num:
5472 kdu_replica_count = min_instance_count
5473 if kdu_replica_count < min_instance_count:
5474 raise LcmException(
5475 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5476 "scaling-group-descriptor '{}'".format(
5477 instance_num, scaling_group
5478 )
5479 )
5480
5481 for x in range(kdu_delta.get("number-of-instances", 1)):
5482 vca_scaling_info.append(
5483 {
5484 "osm_kdu_id": kdu_name,
5485 "member-vnf-index": vnf_index,
5486 "type": "delete",
5487 "kdu_index": instance_num - x - 1,
5488 }
5489 )
5490 scaling_info["kdu-delete"][kdu_name].append(
5491 {
5492 "member-vnf-index": vnf_index,
5493 "type": "delete",
5494 "k8s-cluster-type": k8s_cluster_type,
5495 "resource-name": resource_name,
5496 "scale": kdu_replica_count,
5497 }
5498 )
5499
5500         # update scaling_info with the VDUs being deleted and their interface IP/MAC addresses
5501 vdu_delete = copy(scaling_info.get("vdu-delete"))
5502 if scaling_info["scaling_direction"] == "IN":
5503 for vdur in reversed(db_vnfr["vdur"]):
5504 if vdu_delete.get(vdur["vdu-id-ref"]):
5505 vdu_delete[vdur["vdu-id-ref"]] -= 1
5506 scaling_info["vdu"].append(
5507 {
5508 "name": vdur.get("name") or vdur.get("vdu-name"),
5509 "vdu_id": vdur["vdu-id-ref"],
5510 "interface": [],
5511 }
5512 )
5513 for interface in vdur["interfaces"]:
5514 scaling_info["vdu"][-1]["interface"].append(
5515 {
5516 "name": interface["name"],
5517 "ip_address": interface["ip-address"],
5518 "mac_address": interface.get("mac-address"),
5519 }
5520 )
5521 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5522
5523 # PRE-SCALE BEGIN
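             # Run any 'pre-scale-in'/'pre-scale-out' scaling-config-action primitives declared
             # in the scaling descriptor before the resources themselves are modified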
5524 step = "Executing pre-scale vnf-config-primitive"
5525 if scaling_descriptor.get("scaling-config-action"):
5526 for scaling_config_action in scaling_descriptor[
5527 "scaling-config-action"
5528 ]:
5529 if (
5530 scaling_config_action.get("trigger") == "pre-scale-in"
5531 and scaling_type == "SCALE_IN"
5532 ) or (
5533 scaling_config_action.get("trigger") == "pre-scale-out"
5534 and scaling_type == "SCALE_OUT"
5535 ):
5536 vnf_config_primitive = scaling_config_action[
5537 "vnf-config-primitive-name-ref"
5538 ]
5539 step = db_nslcmop_update[
5540 "detailed-status"
5541 ] = "executing pre-scale scaling-config-action '{}'".format(
5542 vnf_config_primitive
5543 )
5544
5545 # look for primitive
5546 for config_primitive in (
5547 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5548 ).get("config-primitive", ()):
5549 if config_primitive["name"] == vnf_config_primitive:
5550 break
5551 else:
5552 raise LcmException(
5553 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5554 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5555 "primitive".format(scaling_group, vnf_config_primitive)
5556 )
5557
5558 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5559 if db_vnfr.get("additionalParamsForVnf"):
5560 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5561
5562 scale_process = "VCA"
5563 db_nsr_update["config-status"] = "configuring pre-scaling"
5564 primitive_params = self._map_primitive_params(
5565 config_primitive, {}, vnfr_params
5566 )
5567
5568 # Pre-scale retry check: Check if this sub-operation has been executed before
5569 op_index = self._check_or_add_scale_suboperation(
5570 db_nslcmop,
5571 vnf_index,
5572 vnf_config_primitive,
5573 primitive_params,
5574 "PRE-SCALE",
5575 )
5576 if op_index == self.SUBOPERATION_STATUS_SKIP:
5577 # Skip sub-operation
5578 result = "COMPLETED"
5579 result_detail = "Done"
5580 self.logger.debug(
5581 logging_text
5582 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5583 vnf_config_primitive, result, result_detail
5584 )
5585 )
5586 else:
5587 if op_index == self.SUBOPERATION_STATUS_NEW:
5588 # New sub-operation: Get index of this sub-operation
5589 op_index = (
5590 len(db_nslcmop.get("_admin", {}).get("operations"))
5591 - 1
5592 )
5593 self.logger.debug(
5594 logging_text
5595 + "vnf_config_primitive={} New sub-operation".format(
5596 vnf_config_primitive
5597 )
5598 )
5599 else:
5600 # retry: Get registered params for this existing sub-operation
5601 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5602 op_index
5603 ]
5604 vnf_index = op.get("member_vnf_index")
5605 vnf_config_primitive = op.get("primitive")
5606 primitive_params = op.get("primitive_params")
5607 self.logger.debug(
5608 logging_text
5609 + "vnf_config_primitive={} Sub-operation retry".format(
5610 vnf_config_primitive
5611 )
5612 )
5613                             # Execute the primitive, either with new (first-time) or registered (retry) args
5614 ee_descriptor_id = config_primitive.get(
5615 "execution-environment-ref"
5616 )
5617 primitive_name = config_primitive.get(
5618 "execution-environment-primitive", vnf_config_primitive
5619 )
5620 ee_id, vca_type = self._look_for_deployed_vca(
5621 nsr_deployed["VCA"],
5622 member_vnf_index=vnf_index,
5623 vdu_id=None,
5624 vdu_count_index=None,
5625 ee_descriptor_id=ee_descriptor_id,
5626 )
5627 result, result_detail = await self._ns_execute_primitive(
5628 ee_id,
5629 primitive_name,
5630 primitive_params,
5631 vca_type=vca_type,
5632 vca_id=vca_id,
5633 )
5634 self.logger.debug(
5635 logging_text
5636 + "vnf_config_primitive={} Done with result {} {}".format(
5637 vnf_config_primitive, result, result_detail
5638 )
5639 )
5640 # Update operationState = COMPLETED | FAILED
5641 self._update_suboperation_status(
5642 db_nslcmop, op_index, result, result_detail
5643 )
5644
5645 if result == "FAILED":
5646 raise LcmException(result_detail)
5647 db_nsr_update["config-status"] = old_config_status
5648 scale_process = None
5649 # PRE-SCALE END
5650
5651 db_nsr_update[
5652 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5653 ] = nb_scale_op
5654 db_nsr_update[
5655 "_admin.scaling-group.{}.time".format(admin_scale_index)
5656 ] = time()
5657
5658 # SCALE-IN VCA - BEGIN
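             # Destroy the execution environments of the VCA entries that belong to the
             # VDU/KDU instances being removed, executing their terminate primitives when
             # needed and not explicitly skipped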
5659 if vca_scaling_info:
5660 step = db_nslcmop_update[
5661 "detailed-status"
5662 ] = "Deleting the execution environments"
5663 scale_process = "VCA"
5664 for vca_info in vca_scaling_info:
5665 if vca_info["type"] == "delete":
5666 member_vnf_index = str(vca_info["member-vnf-index"])
5667 self.logger.debug(
5668                             logging_text + "vca info: {}".format(vca_info)
5669 )
5670 if vca_info.get("osm_vdu_id"):
5671 vdu_id = vca_info["osm_vdu_id"]
5672 vdu_index = int(vca_info["vdu_index"])
5673 stage[
5674 1
5675 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5676 member_vnf_index, vdu_id, vdu_index
5677 )
5678 else:
5679 vdu_index = 0
5680 kdu_id = vca_info["osm_kdu_id"]
5681 stage[
5682 1
5683 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5684 member_vnf_index, kdu_id, vdu_index
5685 )
5686 stage[2] = step = "Scaling in VCA"
5687 self._write_op_status(op_id=nslcmop_id, stage=stage)
5688 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5689 config_update = db_nsr["configurationStatus"]
5690 for vca_index, vca in enumerate(vca_update):
5691 if (
5692 (vca or vca.get("ee_id"))
5693 and vca["member-vnf-index"] == member_vnf_index
5694 and vca["vdu_count_index"] == vdu_index
5695 ):
5696 if vca.get("vdu_id"):
5697 config_descriptor = get_configuration(
5698 db_vnfd, vca.get("vdu_id")
5699 )
5700 elif vca.get("kdu_name"):
5701 config_descriptor = get_configuration(
5702 db_vnfd, vca.get("kdu_name")
5703 )
5704 else:
5705 config_descriptor = get_configuration(
5706 db_vnfd, db_vnfd["id"]
5707 )
5708 operation_params = (
5709 db_nslcmop.get("operationParams") or {}
5710 )
5711 exec_terminate_primitives = not operation_params.get(
5712 "skip_terminate_primitives"
5713 ) and vca.get("needed_terminate")
5714 task = asyncio.ensure_future(
5715 asyncio.wait_for(
5716 self.destroy_N2VC(
5717 logging_text,
5718 db_nslcmop,
5719 vca,
5720 config_descriptor,
5721 vca_index,
5722 destroy_ee=True,
5723 exec_primitives=exec_terminate_primitives,
5724 scaling_in=True,
5725 vca_id=vca_id,
5726 ),
5727 timeout=self.timeout_charm_delete,
5728 )
5729 )
5730 tasks_dict_info[task] = "Terminating VCA {}".format(
5731 vca.get("ee_id")
5732 )
5733 del vca_update[vca_index]
5734 del config_update[vca_index]
5735 # wait for pending tasks of terminate primitives
5736 if tasks_dict_info:
5737 self.logger.debug(
5738 logging_text
5739 + "Waiting for tasks {}".format(
5740 list(tasks_dict_info.keys())
5741 )
5742 )
5743 error_list = await self._wait_for_tasks(
5744 logging_text,
5745 tasks_dict_info,
5746 min(
5747 self.timeout_charm_delete, self.timeout_ns_terminate
5748 ),
5749 stage,
5750 nslcmop_id,
5751 )
5752 tasks_dict_info.clear()
5753 if error_list:
5754 raise LcmException("; ".join(error_list))
5755
5756 db_vca_and_config_update = {
5757 "_admin.deployed.VCA": vca_update,
5758 "configurationStatus": config_update,
5759 }
5760 self.update_db_2(
5761 "nsrs", db_nsr["_id"], db_vca_and_config_update
5762 )
5763 scale_process = None
5764 # SCALE-IN VCA - END
5765
5766 # SCALE RO - BEGIN
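             # Request the actual creation/deletion of VDU instances at the VIM through NG-RO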
5767 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5768 scale_process = "RO"
5769 if self.ro_config.get("ng"):
5770 await self._scale_ng_ro(
5771 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5772 )
5773 scaling_info.pop("vdu-create", None)
5774 scaling_info.pop("vdu-delete", None)
5775
5776 scale_process = None
5777 # SCALE RO - END
5778
5779 # SCALE KDU - BEGIN
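             # Scale the Kubernetes deployment units through the corresponding K8s connector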
5780 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5781 scale_process = "KDU"
5782 await self._scale_kdu(
5783 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5784 )
5785 scaling_info.pop("kdu-create", None)
5786 scaling_info.pop("kdu-delete", None)
5787
5788 scale_process = None
5789 # SCALE KDU - END
5790
5791 if db_nsr_update:
5792 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5793
5794 # SCALE-UP VCA - BEGIN
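             # Deploy new execution environments for the VNF/VDU/KDU instances just created,
             # according to the configuration present in the descriptors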
5795 if vca_scaling_info:
5796 step = db_nslcmop_update[
5797 "detailed-status"
5798 ] = "Creating new execution environments"
5799 scale_process = "VCA"
5800 for vca_info in vca_scaling_info:
5801 if vca_info["type"] == "create":
5802 member_vnf_index = str(vca_info["member-vnf-index"])
5803 self.logger.debug(
5804                             logging_text + "vca info: {}".format(vca_info)
5805 )
5806 vnfd_id = db_vnfr["vnfd-ref"]
5807 if vca_info.get("osm_vdu_id"):
5808 vdu_index = int(vca_info["vdu_index"])
5809 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5810 if db_vnfr.get("additionalParamsForVnf"):
5811 deploy_params.update(
5812 parse_yaml_strings(
5813 db_vnfr["additionalParamsForVnf"].copy()
5814 )
5815 )
5816 descriptor_config = get_configuration(
5817 db_vnfd, db_vnfd["id"]
5818 )
5819 if descriptor_config:
5820 vdu_id = None
5821 vdu_name = None
5822 kdu_name = None
5823 self._deploy_n2vc(
5824 logging_text=logging_text
5825 + "member_vnf_index={} ".format(member_vnf_index),
5826 db_nsr=db_nsr,
5827 db_vnfr=db_vnfr,
5828 nslcmop_id=nslcmop_id,
5829 nsr_id=nsr_id,
5830 nsi_id=nsi_id,
5831 vnfd_id=vnfd_id,
5832 vdu_id=vdu_id,
5833 kdu_name=kdu_name,
5834 member_vnf_index=member_vnf_index,
5835 vdu_index=vdu_index,
5836 vdu_name=vdu_name,
5837 deploy_params=deploy_params,
5838 descriptor_config=descriptor_config,
5839 base_folder=base_folder,
5840 task_instantiation_info=tasks_dict_info,
5841 stage=stage,
5842 )
5843 vdu_id = vca_info["osm_vdu_id"]
5844 vdur = find_in_list(
5845 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5846 )
5847 descriptor_config = get_configuration(db_vnfd, vdu_id)
5848 if vdur.get("additionalParams"):
5849 deploy_params_vdu = parse_yaml_strings(
5850 vdur["additionalParams"]
5851 )
5852 else:
5853 deploy_params_vdu = deploy_params
5854 deploy_params_vdu["OSM"] = get_osm_params(
5855 db_vnfr, vdu_id, vdu_count_index=vdu_index
5856 )
5857 if descriptor_config:
5858 vdu_name = None
5859 kdu_name = None
5860 stage[
5861 1
5862 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5863 member_vnf_index, vdu_id, vdu_index
5864 )
5865 stage[2] = step = "Scaling out VCA"
5866 self._write_op_status(op_id=nslcmop_id, stage=stage)
5867 self._deploy_n2vc(
5868 logging_text=logging_text
5869 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5870 member_vnf_index, vdu_id, vdu_index
5871 ),
5872 db_nsr=db_nsr,
5873 db_vnfr=db_vnfr,
5874 nslcmop_id=nslcmop_id,
5875 nsr_id=nsr_id,
5876 nsi_id=nsi_id,
5877 vnfd_id=vnfd_id,
5878 vdu_id=vdu_id,
5879 kdu_name=kdu_name,
5880 member_vnf_index=member_vnf_index,
5881 vdu_index=vdu_index,
5882 vdu_name=vdu_name,
5883 deploy_params=deploy_params_vdu,
5884 descriptor_config=descriptor_config,
5885 base_folder=base_folder,
5886 task_instantiation_info=tasks_dict_info,
5887 stage=stage,
5888 )
5889 else:
5890 kdu_name = vca_info["osm_kdu_id"]
5891 descriptor_config = get_configuration(db_vnfd, kdu_name)
5892 if descriptor_config:
5893 vdu_id = None
5894 kdu_index = int(vca_info["kdu_index"])
5895 vdu_name = None
5896 kdur = next(
5897 x
5898 for x in db_vnfr["kdur"]
5899 if x["kdu-name"] == kdu_name
5900 )
5901 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5902 if kdur.get("additionalParams"):
5903 deploy_params_kdu = parse_yaml_strings(
5904 kdur["additionalParams"]
5905 )
5906
5907 self._deploy_n2vc(
5908 logging_text=logging_text,
5909 db_nsr=db_nsr,
5910 db_vnfr=db_vnfr,
5911 nslcmop_id=nslcmop_id,
5912 nsr_id=nsr_id,
5913 nsi_id=nsi_id,
5914 vnfd_id=vnfd_id,
5915 vdu_id=vdu_id,
5916 kdu_name=kdu_name,
5917 member_vnf_index=member_vnf_index,
5918 vdu_index=kdu_index,
5919 vdu_name=vdu_name,
5920 deploy_params=deploy_params_kdu,
5921 descriptor_config=descriptor_config,
5922 base_folder=base_folder,
5923 task_instantiation_info=tasks_dict_info,
5924 stage=stage,
5925 )
5926 # SCALE-UP VCA - END
5927 scale_process = None
5928
5929 # POST-SCALE BEGIN
5930 # execute primitive service POST-SCALING
5931 step = "Executing post-scale vnf-config-primitive"
5932 if scaling_descriptor.get("scaling-config-action"):
5933 for scaling_config_action in scaling_descriptor[
5934 "scaling-config-action"
5935 ]:
5936 if (
5937 scaling_config_action.get("trigger") == "post-scale-in"
5938 and scaling_type == "SCALE_IN"
5939 ) or (
5940 scaling_config_action.get("trigger") == "post-scale-out"
5941 and scaling_type == "SCALE_OUT"
5942 ):
5943 vnf_config_primitive = scaling_config_action[
5944 "vnf-config-primitive-name-ref"
5945 ]
5946 step = db_nslcmop_update[
5947 "detailed-status"
5948 ] = "executing post-scale scaling-config-action '{}'".format(
5949 vnf_config_primitive
5950 )
5951
5952 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5953 if db_vnfr.get("additionalParamsForVnf"):
5954 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5955
5956 # look for primitive
5957 for config_primitive in (
5958 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5959 ).get("config-primitive", ()):
5960 if config_primitive["name"] == vnf_config_primitive:
5961 break
5962 else:
5963 raise LcmException(
5964 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5965 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5966 "config-primitive".format(
5967 scaling_group, vnf_config_primitive
5968 )
5969 )
5970 scale_process = "VCA"
5971 db_nsr_update["config-status"] = "configuring post-scaling"
5972 primitive_params = self._map_primitive_params(
5973 config_primitive, {}, vnfr_params
5974 )
5975
5976 # Post-scale retry check: Check if this sub-operation has been executed before
5977 op_index = self._check_or_add_scale_suboperation(
5978 db_nslcmop,
5979 vnf_index,
5980 vnf_config_primitive,
5981 primitive_params,
5982 "POST-SCALE",
5983 )
5984 if op_index == self.SUBOPERATION_STATUS_SKIP:
5985 # Skip sub-operation
5986 result = "COMPLETED"
5987 result_detail = "Done"
5988 self.logger.debug(
5989 logging_text
5990 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5991 vnf_config_primitive, result, result_detail
5992 )
5993 )
5994 else:
5995 if op_index == self.SUBOPERATION_STATUS_NEW:
5996 # New sub-operation: Get index of this sub-operation
5997 op_index = (
5998 len(db_nslcmop.get("_admin", {}).get("operations"))
5999 - 1
6000 )
6001 self.logger.debug(
6002 logging_text
6003 + "vnf_config_primitive={} New sub-operation".format(
6004 vnf_config_primitive
6005 )
6006 )
6007 else:
6008 # retry: Get registered params for this existing sub-operation
6009 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6010 op_index
6011 ]
6012 vnf_index = op.get("member_vnf_index")
6013 vnf_config_primitive = op.get("primitive")
6014 primitive_params = op.get("primitive_params")
6015 self.logger.debug(
6016 logging_text
6017 + "vnf_config_primitive={} Sub-operation retry".format(
6018 vnf_config_primitive
6019 )
6020 )
6021                             # Execute the primitive, either with new (first-time) or registered (retry) args
6022 ee_descriptor_id = config_primitive.get(
6023 "execution-environment-ref"
6024 )
6025 primitive_name = config_primitive.get(
6026 "execution-environment-primitive", vnf_config_primitive
6027 )
6028 ee_id, vca_type = self._look_for_deployed_vca(
6029 nsr_deployed["VCA"],
6030 member_vnf_index=vnf_index,
6031 vdu_id=None,
6032 vdu_count_index=None,
6033 ee_descriptor_id=ee_descriptor_id,
6034 )
6035 result, result_detail = await self._ns_execute_primitive(
6036 ee_id,
6037 primitive_name,
6038 primitive_params,
6039 vca_type=vca_type,
6040 vca_id=vca_id,
6041 )
6042 self.logger.debug(
6043 logging_text
6044 + "vnf_config_primitive={} Done with result {} {}".format(
6045 vnf_config_primitive, result, result_detail
6046 )
6047 )
6048 # Update operationState = COMPLETED | FAILED
6049 self._update_suboperation_status(
6050 db_nslcmop, op_index, result, result_detail
6051 )
6052
6053 if result == "FAILED":
6054 raise LcmException(result_detail)
6055 db_nsr_update["config-status"] = old_config_status
6056 scale_process = None
6057 # POST-SCALE END
6058
6059 db_nsr_update[
6060 "detailed-status"
6061 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6062 db_nsr_update["operational-status"] = (
6063 "running"
6064 if old_operational_status == "failed"
6065 else old_operational_status
6066 )
6067 db_nsr_update["config-status"] = old_config_status
6068 return
6069 except (
6070 ROclient.ROClientException,
6071 DbException,
6072 LcmException,
6073 NgRoException,
6074 ) as e:
6075 self.logger.error(logging_text + "Exit Exception {}".format(e))
6076 exc = e
6077 except asyncio.CancelledError:
6078 self.logger.error(
6079 logging_text + "Cancelled Exception while '{}'".format(step)
6080 )
6081 exc = "Operation was cancelled"
6082 except Exception as e:
6083 exc = traceback.format_exc()
6084 self.logger.critical(
6085 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6086 exc_info=True,
6087 )
6088 finally:
6089 self._write_ns_status(
6090 nsr_id=nsr_id,
6091 ns_state=None,
6092 current_operation="IDLE",
6093 current_operation_id=None,
6094 )
6095 if tasks_dict_info:
6096 stage[1] = "Waiting for instantiate pending tasks."
6097 self.logger.debug(logging_text + stage[1])
6098 exc = await self._wait_for_tasks(
6099 logging_text,
6100 tasks_dict_info,
6101 self.timeout_ns_deploy,
6102 stage,
6103 nslcmop_id,
6104 nsr_id=nsr_id,
6105 )
6106 if exc:
6107 db_nslcmop_update[
6108 "detailed-status"
6109 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6110 nslcmop_operation_state = "FAILED"
6111 if db_nsr:
6112 db_nsr_update["operational-status"] = old_operational_status
6113 db_nsr_update["config-status"] = old_config_status
6114 db_nsr_update["detailed-status"] = ""
6115 if scale_process:
6116 if "VCA" in scale_process:
6117 db_nsr_update["config-status"] = "failed"
6118 if "RO" in scale_process:
6119 db_nsr_update["operational-status"] = "failed"
6120 db_nsr_update[
6121 "detailed-status"
6122 ] = "FAILED scaling nslcmop={} {}: {}".format(
6123 nslcmop_id, step, exc
6124 )
6125 else:
6126 error_description_nslcmop = None
6127 nslcmop_operation_state = "COMPLETED"
6128 db_nslcmop_update["detailed-status"] = "Done"
6129
6130 self._write_op_status(
6131 op_id=nslcmop_id,
6132 stage="",
6133 error_message=error_description_nslcmop,
6134 operation_state=nslcmop_operation_state,
6135 other_update=db_nslcmop_update,
6136 )
6137 if db_nsr:
6138 self._write_ns_status(
6139 nsr_id=nsr_id,
6140 ns_state=None,
6141 current_operation="IDLE",
6142 current_operation_id=None,
6143 other_update=db_nsr_update,
6144 )
6145
6146 if nslcmop_operation_state:
6147 try:
6148 msg = {
6149 "nsr_id": nsr_id,
6150 "nslcmop_id": nslcmop_id,
6151 "operationState": nslcmop_operation_state,
6152 }
6153 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6154 except Exception as e:
6155 self.logger.error(
6156 logging_text + "kafka_write notification Exception {}".format(e)
6157 )
6158 self.logger.debug(logging_text + "Exit")
6159 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6160
6161 async def _scale_kdu(
6162 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6163 ):
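         """Scale the KDUs listed in scaling_info["kdu-create"]/["kdu-delete"]:
         run any terminate config primitives defined for the KDU before a scale-in,
         scale the kdu instance to the requested replica count through the matching
         K8s connector, and run any initial config primitives after a scale-out."""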
6164 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6165 for kdu_name in _scaling_info:
6166 for kdu_scaling_info in _scaling_info[kdu_name]:
6167 deployed_kdu, index = get_deployed_kdu(
6168 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6169 )
6170 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6171 kdu_instance = deployed_kdu["kdu-instance"]
6172 scale = int(kdu_scaling_info["scale"])
6173 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6174
6175 db_dict = {
6176 "collection": "nsrs",
6177 "filter": {"_id": nsr_id},
6178 "path": "_admin.deployed.K8s.{}".format(index),
6179 }
6180
6181 step = "scaling application {}".format(
6182 kdu_scaling_info["resource-name"]
6183 )
6184 self.logger.debug(logging_text + step)
6185
6186 if kdu_scaling_info["type"] == "delete":
6187 kdu_config = get_configuration(db_vnfd, kdu_name)
6188 if (
6189 kdu_config
6190 and kdu_config.get("terminate-config-primitive")
6191 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6192 ):
6193 terminate_config_primitive_list = kdu_config.get(
6194 "terminate-config-primitive"
6195 )
6196 terminate_config_primitive_list.sort(
6197 key=lambda val: int(val["seq"])
6198 )
6199
6200 for (
6201 terminate_config_primitive
6202 ) in terminate_config_primitive_list:
6203 primitive_params_ = self._map_primitive_params(
6204 terminate_config_primitive, {}, {}
6205 )
6206 step = "execute terminate config primitive"
6207 self.logger.debug(logging_text + step)
6208 await asyncio.wait_for(
6209 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6210 cluster_uuid=cluster_uuid,
6211 kdu_instance=kdu_instance,
6212 primitive_name=terminate_config_primitive["name"],
6213 params=primitive_params_,
6214 db_dict=db_dict,
6215 vca_id=vca_id,
6216 ),
6217 timeout=600,
6218 )
6219
6220 await asyncio.wait_for(
6221 self.k8scluster_map[k8s_cluster_type].scale(
6222 kdu_instance,
6223 scale,
6224 kdu_scaling_info["resource-name"],
6225 vca_id=vca_id,
6226 ),
6227 timeout=self.timeout_vca_on_error,
6228 )
6229
6230 if kdu_scaling_info["type"] == "create":
6231 kdu_config = get_configuration(db_vnfd, kdu_name)
6232 if (
6233 kdu_config
6234 and kdu_config.get("initial-config-primitive")
6235 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6236 ):
6237 initial_config_primitive_list = kdu_config.get(
6238 "initial-config-primitive"
6239 )
6240 initial_config_primitive_list.sort(
6241 key=lambda val: int(val["seq"])
6242 )
6243
6244 for initial_config_primitive in initial_config_primitive_list:
6245 primitive_params_ = self._map_primitive_params(
6246 initial_config_primitive, {}, {}
6247 )
6248 step = "execute initial config primitive"
6249 self.logger.debug(logging_text + step)
6250 await asyncio.wait_for(
6251 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6252 cluster_uuid=cluster_uuid,
6253 kdu_instance=kdu_instance,
6254 primitive_name=initial_config_primitive["name"],
6255 params=primitive_params_,
6256 db_dict=db_dict,
6257 vca_id=vca_id,
6258 ),
6259 timeout=600,
6260 )
6261
6262 async def _scale_ng_ro(
6263 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6264 ):
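         """Scale VDUs through NG-RO: update db_vnfr with the VDU instances to
         create/delete (marking deletions first), trigger the RO deployment for
         the NS and, for scale-in, update the VNFR again once the deletion has
         been processed."""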
6265 nsr_id = db_nslcmop["nsInstanceId"]
6266 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6267 db_vnfrs = {}
6268
6269         # read from db: vnfds for every vnf in the ns
6270 db_vnfds = []
6271
6272 # for each vnf in ns, read vnfd
6273 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6274 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6275 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6276             # if we do not have this vnfd yet, read it from db
6277 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6278 # read from db
6279 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6280 db_vnfds.append(vnfd)
6281 n2vc_key = self.n2vc.get_public_key()
6282 n2vc_key_list = [n2vc_key]
6283 self.scale_vnfr(
6284 db_vnfr,
6285 vdu_scaling_info.get("vdu-create"),
6286 vdu_scaling_info.get("vdu-delete"),
6287 mark_delete=True,
6288 )
6289 # db_vnfr has been updated, update db_vnfrs to use it
6290 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6291 await self._instantiate_ng_ro(
6292 logging_text,
6293 nsr_id,
6294 db_nsd,
6295 db_nsr,
6296 db_nslcmop,
6297 db_vnfrs,
6298 db_vnfds,
6299 n2vc_key_list,
6300 stage=stage,
6301 start_deploy=time(),
6302 timeout_ns_deploy=self.timeout_ns_deploy,
6303 )
6304 if vdu_scaling_info.get("vdu-delete"):
6305 self.scale_vnfr(
6306 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6307 )
6308
6309 async def add_prometheus_metrics(
6310 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6311 ):
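         """Render the 'prometheus*.j2' job template found in the charm artifacts
         and register the resulting scrape jobs in Prometheus. Returns the list of
         job names on success, or None if there is nothing to register."""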
6312 if not self.prometheus:
6313 return
6314         # look for a file called 'prometheus*.j2' to use as the Prometheus job template
6315 artifact_content = self.fs.dir_ls(artifact_path)
6316 job_file = next(
6317 (
6318 f
6319 for f in artifact_content
6320 if f.startswith("prometheus") and f.endswith(".j2")
6321 ),
6322 None,
6323 )
6324 if not job_file:
6325 return
6326 with self.fs.file_open((artifact_path, job_file), "r") as f:
6327 job_data = f.read()
6328
6329 # TODO get_service
6330 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6331 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6332 host_port = "80"
6333 vnfr_id = vnfr_id.replace("-", "")
6334 variables = {
6335 "JOB_NAME": vnfr_id,
6336 "TARGET_IP": target_ip,
6337 "EXPORTER_POD_IP": host_name,
6338 "EXPORTER_POD_PORT": host_port,
6339 }
6340 job_list = self.prometheus.parse_job(job_data, variables)
6341         # ensure job_name is based on the vnfr_id and add nsr_id as metadata
6342 for job in job_list:
6343 if (
6344 not isinstance(job.get("job_name"), str)
6345 or vnfr_id not in job["job_name"]
6346 ):
6347 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6348 job["nsr_id"] = nsr_id
6349 job_dict = {jl["job_name"]: jl for jl in job_list}
6350 if await self.prometheus.update(job_dict):
6351 return list(job_dict.keys())
6352
6353 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6354 """
6355 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6356
6357 :param: vim_account_id: VIM Account ID
6358
6359 :return: (cloud_name, cloud_credential)
6360 """
6361 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6362 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6363
6364 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6365 """
6366 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6367
6368 :param: vim_account_id: VIM Account ID
6369
6370 :return: (cloud_name, cloud_credential)
6371 """
6372 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6373 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")