Bug 1950 fixed to update the deploy_params_kdu dict instead of overwriting it
[osm/LCM.git] / osm_lcm / ns.py
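The commit message above refers to merging KDU instantiation parameters into the existing deploy_params_kdu dict rather than replacing it; the change itself lives further down in ns.py, outside the excerpt shown here. A minimal sketch of the before/after pattern, with illustrative variable values:

    # deploy_params_kdu already holds parameters computed by LCM (e.g. an "OSM" block)
    deploy_params_kdu = {"OSM": {"ns_id": "1234-abcd"}}
    kdu_additional_params = {"replicaCount": "3"}  # user-supplied KDU params

    # before the fix: assignment overwrites the dict and drops the computed keys
    # deploy_params_kdu = kdu_additional_params

    # after the fix: dict.update() merges the new keys and keeps the existing ones
    deploy_params_kdu.update(kdu_additional_params)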
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
110 class NsLcm(LcmBase):
111 timeout_vca_on_error = (
112 5 * 60
113 ) # Time from when a charm first reaches blocked/error status until it is marked as failed
114 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
115 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
116 timeout_charm_delete = 10 * 60
117 timeout_primitive = 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive = (
119 10 * 60
120 ) # timeout for some progress in a primitive execution
121
122 SUBOPERATION_STATUS_NOT_FOUND = -1
123 SUBOPERATION_STATUS_NEW = -2
124 SUBOPERATION_STATUS_SKIP = -3
125 task_name_deploy_vca = "Deploying VCA"
126
127 def __init__(self, msg, lcm_tasks, config, loop):
128 """
129 Init: connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
131 :return: None
132 """
133 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
134
135 self.db = Database().instance.db
136 self.fs = Filesystem().instance.fs
137 self.loop = loop
138 self.lcm_tasks = lcm_tasks
139 self.timeout = config["timeout"]
140 self.ro_config = config["ro_config"]
141 self.ng_ro = config["ro_config"].get("ng")
142 self.vca_config = config["VCA"].copy()
143
144 # create N2VC connector
145 self.n2vc = N2VCJujuConnector(
146 log=self.logger,
147 loop=self.loop,
148 on_update_db=self._on_update_n2vc_db,
149 fs=self.fs,
150 db=self.db,
151 )
152
153 self.conn_helm_ee = LCMHelmConn(
154 log=self.logger,
155 loop=self.loop,
156 vca_config=self.vca_config,
157 on_update_db=self._on_update_n2vc_db,
158 )
159
160 self.k8sclusterhelm2 = K8sHelmConnector(
161 kubectl_command=self.vca_config.get("kubectlpath"),
162 helm_command=self.vca_config.get("helmpath"),
163 log=self.logger,
164 on_update_db=None,
165 fs=self.fs,
166 db=self.db,
167 )
168
169 self.k8sclusterhelm3 = K8sHelm3Connector(
170 kubectl_command=self.vca_config.get("kubectlpath"),
171 helm_command=self.vca_config.get("helm3path"),
172 fs=self.fs,
173 log=self.logger,
174 db=self.db,
175 on_update_db=None,
176 )
177
178 self.k8sclusterjuju = K8sJujuConnector(
179 kubectl_command=self.vca_config.get("kubectlpath"),
180 juju_command=self.vca_config.get("jujupath"),
181 log=self.logger,
182 loop=self.loop,
183 on_update_db=self._on_update_k8s_db,
184 fs=self.fs,
185 db=self.db,
186 )
187
188 self.k8scluster_map = {
189 "helm-chart": self.k8sclusterhelm2,
190 "helm-chart-v3": self.k8sclusterhelm3,
191 "chart": self.k8sclusterhelm3,
192 "juju-bundle": self.k8sclusterjuju,
193 "juju": self.k8sclusterjuju,
194 }
195
196 self.vca_map = {
197 "lxc_proxy_charm": self.n2vc,
198 "native_charm": self.n2vc,
199 "k8s_proxy_charm": self.n2vc,
200 "helm": self.conn_helm_ee,
201 "helm-v3": self.conn_helm_ee,
202 }
203
204 # create RO client
205 self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with ipv4: look for the last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with ipv6 or mac: look for the last colon. Operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
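# Illustrative examples of increment_ip_mac (not part of the original source),
# derived from the logic above:
#   increment_ip_mac("10.0.0.5", vm_index=2)           -> "10.0.0.7"
#   increment_ip_mac("fd00::0a", vm_index=1)            -> "fd00::0b"
#   increment_ip_mac("aa:bb:cc:dd:ee:0a", vm_index=1)   -> "aa:bb:cc:dd:ee:0b"
#   increment_ip_mac(None)                              -> None (non-string input returned as-is)
# A string that cannot be parsed as IPv4/IPv6/MAC returns None.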
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
247 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
248
249 # remove last dot from path (if exists)
250 if path.endswith("."):
251 path = path[:-1]
252
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
255 try:
256
257 nsr_id = filter.get("_id")
258
259 # read ns record from database
260 nsr = self.db.get_one(table="nsrs", q_filter=filter)
261 current_ns_status = nsr.get("nsState")
262
263 # get vca status for NS
264 status_dict = await self.n2vc.get_status(
265 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
266 )
267
268 # vcaStatus
269 db_dict = dict()
270 db_dict["vcaStatus"] = status_dict
271 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
272
273 # update configurationStatus for this VCA
274 try:
275 vca_index = int(path[path.rfind(".") + 1 :])
276
277 vca_list = deep_get(
278 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
279 )
280 vca_status = vca_list[vca_index].get("status")
281
282 configuration_status_list = nsr.get("configurationStatus")
283 config_status = configuration_status_list[vca_index].get("status")
284
285 if config_status == "BROKEN" and vca_status != "failed":
286 db_dict["configurationStatus"][vca_index] = "READY"
287 elif config_status != "BROKEN" and vca_status == "failed":
288 db_dict["configurationStatus"][vca_index] = "BROKEN"
289 except Exception as e:
290 # do not update configurationStatus
291 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
292
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
295 is_degraded = False
296 if current_ns_status in ("READY", "DEGRADED"):
297 error_description = ""
298 # check machines
299 if status_dict.get("machines"):
300 for machine_id in status_dict.get("machines"):
301 machine = status_dict.get("machines").get(machine_id)
302 # check machine agent-status
303 if machine.get("agent-status"):
304 s = machine.get("agent-status").get("status")
305 if s != "started":
306 is_degraded = True
307 error_description += (
308 "machine {} agent-status={} ; ".format(
309 machine_id, s
310 )
311 )
312 # check machine instance status
313 if machine.get("instance-status"):
314 s = machine.get("instance-status").get("status")
315 if s != "running":
316 is_degraded = True
317 error_description += (
318 "machine {} instance-status={} ; ".format(
319 machine_id, s
320 )
321 )
322 # check applications
323 if status_dict.get("applications"):
324 for app_id in status_dict.get("applications"):
325 app = status_dict.get("applications").get(app_id)
326 # check application status
327 if app.get("status"):
328 s = app.get("status").get("status")
329 if s != "active":
330 is_degraded = True
331 error_description += (
332 "application {} status={} ; ".format(app_id, s)
333 )
334
335 if error_description:
336 db_dict["errorDescription"] = error_description
337 if current_ns_status == "READY" and is_degraded:
338 db_dict["nsState"] = "DEGRADED"
339 if current_ns_status == "DEGRADED" and not is_degraded:
340 db_dict["nsState"] = "READY"
341
342 # write to database
343 self.update_db_2("nsrs", nsr_id, db_dict)
344
345 except (asyncio.CancelledError, asyncio.TimeoutError):
346 raise
347 except Exception as e:
348 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
350 async def _on_update_k8s_db(
351 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
352 ):
353 """
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :return: none
359 """
360
361 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
362 # .format(cluster_uuid, kdu_instance, filter))
363
364 try:
365 nsr_id = filter.get("_id")
366
367 # get vca status for NS
368 vca_status = await self.k8sclusterjuju.status_kdu(
369 cluster_uuid,
370 kdu_instance,
371 complete_status=True,
372 yaml_format=False,
373 vca_id=vca_id,
374 )
375 # vcaStatus
376 db_dict = dict()
377 db_dict["vcaStatus"] = {nsr_id: vca_status}
378
379 await self.k8sclusterjuju.update_vca_status(
380 db_dict["vcaStatus"],
381 kdu_instance,
382 vca_id=vca_id,
383 )
384
385 # write to database
386 self.update_db_2("nsrs", nsr_id, db_dict)
387
388 except (asyncio.CancelledError, asyncio.TimeoutError):
389 raise
390 except Exception as e:
391 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
392
393 @staticmethod
394 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
395 try:
396 env = Environment(undefined=StrictUndefined)
397 template = env.from_string(cloud_init_text)
398 return template.render(additional_params or {})
399 except UndefinedError as e:
400 raise LcmException(
401 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
402 "file, must be provided in the instantiation parameters inside the "
403 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
404 )
405 except (TemplateError, TemplateNotFound) as e:
406 raise LcmException(
407 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
408 vnfd_id, vdu_id, e
409 )
410 )
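# Illustrative example of _parse_cloud_init (not part of the original source):
#   text = "#cloud-config\nhostname: {{ hostname }}"
#   NsLcm._parse_cloud_init(text, {"hostname": "vnf1-vm"}, "my_vnfd", "my_vdu")
#   -> "#cloud-config\nhostname: vnf1-vm"
# Because of StrictUndefined, a variable missing from additional_params raises
# LcmException asking for it in the 'additionalParamsForVnf/Vdu' block.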
411
412 def _get_vdu_cloud_init_content(self, vdu, vnfd):
413 cloud_init_content = cloud_init_file = None
414 try:
415 if vdu.get("cloud-init-file"):
416 base_folder = vnfd["_admin"]["storage"]
417 if base_folder["pkg-dir"]:
418 cloud_init_file = "{}/{}/cloud_init/{}".format(
419 base_folder["folder"],
420 base_folder["pkg-dir"],
421 vdu["cloud-init-file"],
422 )
423 else:
424 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
425 base_folder["folder"],
426 vdu["cloud-init-file"],
427 )
428 with self.fs.file_open(cloud_init_file, "r") as ci_file:
429 cloud_init_content = ci_file.read()
430 elif vdu.get("cloud-init"):
431 cloud_init_content = vdu["cloud-init"]
432
433 return cloud_init_content
434 except FsException as e:
435 raise LcmException(
436 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
437 vnfd["id"], vdu["id"], cloud_init_file, e
438 )
439 )
440
441 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
442 vdur = next(
443 vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
444 )
445 additional_params = vdur.get("additionalParams")
446 return parse_yaml_strings(additional_params)
447
448 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
449 """
450 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
451 :param vnfd: input vnfd
452 :param new_id: overrides vnf id if provided
453 :param additionalParams: Instantiation params for VNFs provided
454 :param nsrId: Id of the NSR
455 :return: copy of vnfd
456 """
457 vnfd_RO = deepcopy(vnfd)
458 # remove keys unused by RO: configuration, monitoring, scaling and internal keys
459 vnfd_RO.pop("_id", None)
460 vnfd_RO.pop("_admin", None)
461 vnfd_RO.pop("monitoring-param", None)
462 vnfd_RO.pop("scaling-group-descriptor", None)
463 vnfd_RO.pop("kdu", None)
464 vnfd_RO.pop("k8s-cluster", None)
465 if new_id:
466 vnfd_RO["id"] = new_id
467
468 # cloud-init and cloud-init-file are handled by LCM elsewhere; drop them from the RO descriptor
469 for vdu in get_iterable(vnfd_RO, "vdu"):
470 vdu.pop("cloud-init-file", None)
471 vdu.pop("cloud-init", None)
472 return vnfd_RO
473
474 @staticmethod
475 def ip_profile_2_RO(ip_profile):
476 RO_ip_profile = deepcopy(ip_profile)
477 if "dns-server" in RO_ip_profile:
478 if isinstance(RO_ip_profile["dns-server"], list):
479 RO_ip_profile["dns-address"] = []
480 for ds in RO_ip_profile.pop("dns-server"):
481 RO_ip_profile["dns-address"].append(ds["address"])
482 else:
483 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
484 if RO_ip_profile.get("ip-version") == "ipv4":
485 RO_ip_profile["ip-version"] = "IPv4"
486 if RO_ip_profile.get("ip-version") == "ipv6":
487 RO_ip_profile["ip-version"] = "IPv6"
488 if "dhcp-params" in RO_ip_profile:
489 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
490 return RO_ip_profile
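# Illustrative example of the OSM -> RO ip-profile translation done by
# ip_profile_2_RO (not part of the original source):
#   NsLcm.ip_profile_2_RO({
#       "ip-version": "ipv4",
#       "subnet-address": "10.0.0.0/24",
#       "dns-server": [{"address": "8.8.8.8"}],
#       "dhcp-params": {"enabled": True},
#   })
#   -> {"ip-version": "IPv4", "subnet-address": "10.0.0.0/24",
#       "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}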
491
492 def _get_ro_vim_id_for_vim_account(self, vim_account):
493 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
494 if db_vim["_admin"]["operationalState"] != "ENABLED":
495 raise LcmException(
496 "VIM={} is not available. operationalState={}".format(
497 vim_account, db_vim["_admin"]["operationalState"]
498 )
499 )
500 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
501 return RO_vim_id
502
503 def get_ro_wim_id_for_wim_account(self, wim_account):
504 if isinstance(wim_account, str):
505 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
506 if db_wim["_admin"]["operationalState"] != "ENABLED":
507 raise LcmException(
508 "WIM={} is not available. operationalState={}".format(
509 wim_account, db_wim["_admin"]["operationalState"]
510 )
511 )
512 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
513 return RO_wim_id
514 else:
515 return wim_account
516
517 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
518
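# Descriptive note (not in the original source): on scale-out this method clones
# the last existing vdur of each vdu (new _id, bumped count-index and id, fixed
# ip/mac addresses incremented) and pushes the copies to the vnfr; on scale-in it
# marks the last vdur entries as DELETING or pulls them from the database, and
# finally refreshes the passed db_vnfr dict from the database.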
519 db_vdu_push_list = []
520 db_update = {"_admin.modified": time()}
521 if vdu_create:
522 for vdu_id, vdu_count in vdu_create.items():
523 vdur = next(
524 (
525 vdur
526 for vdur in reversed(db_vnfr["vdur"])
527 if vdur["vdu-id-ref"] == vdu_id
528 ),
529 None,
530 )
531 if not vdur:
532 raise LcmException(
533 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
534 vdu_id
535 )
536 )
537
538 for count in range(vdu_count):
539 vdur_copy = deepcopy(vdur)
540 vdur_copy["status"] = "BUILD"
541 vdur_copy["status-detailed"] = None
542 vdur_copy["ip-address"] = None
543 vdur_copy["_id"] = str(uuid4())
544 vdur_copy["count-index"] += count + 1
545 vdur_copy["id"] = "{}-{}".format(
546 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
547 )
548 vdur_copy.pop("vim_info", None)
549 for iface in vdur_copy["interfaces"]:
550 if iface.get("fixed-ip"):
551 iface["ip-address"] = self.increment_ip_mac(
552 iface["ip-address"], count + 1
553 )
554 else:
555 iface.pop("ip-address", None)
556 if iface.get("fixed-mac"):
557 iface["mac-address"] = self.increment_ip_mac(
558 iface["mac-address"], count + 1
559 )
560 else:
561 iface.pop("mac-address", None)
562 iface.pop(
563 "mgmt_vnf", None
564 ) # only the first vdu can be the management vdu of the vnf
565 db_vdu_push_list.append(vdur_copy)
566 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
567 if vdu_delete:
568 for vdu_id, vdu_count in vdu_delete.items():
569 if mark_delete:
570 indexes_to_delete = [
571 iv[0]
572 for iv in enumerate(db_vnfr["vdur"])
573 if iv[1]["vdu-id-ref"] == vdu_id
574 ]
575 db_update.update(
576 {
577 "vdur.{}.status".format(i): "DELETING"
578 for i in indexes_to_delete[-vdu_count:]
579 }
580 )
581 else:
582 # it must be deleted one by one because common.db does not allow otherwise
583 vdus_to_delete = [
584 v
585 for v in reversed(db_vnfr["vdur"])
586 if v["vdu-id-ref"] == vdu_id
587 ]
588 for vdu in vdus_to_delete[:vdu_count]:
589 self.db.set_one(
590 "vnfrs",
591 {"_id": db_vnfr["_id"]},
592 None,
593 pull={"vdur": {"_id": vdu["_id"]}},
594 )
595 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
596 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
597 # modify passed dictionary db_vnfr
598 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
599 db_vnfr["vdur"] = db_vnfr_["vdur"]
600
601 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
602 """
603 Updates database nsr with the RO info for the created vld
604 :param ns_update_nsr: dictionary to be filled with the updated info
605 :param db_nsr: content of db_nsr. This is also modified
606 :param nsr_desc_RO: nsr descriptor from RO
607 :return: Nothing, LcmException is raised on errors
608 """
609
610 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
611 for net_RO in get_iterable(nsr_desc_RO, "nets"):
612 if vld["id"] != net_RO.get("ns_net_osm_id"):
613 continue
614 vld["vim-id"] = net_RO.get("vim_net_id")
615 vld["name"] = net_RO.get("vim_name")
616 vld["status"] = net_RO.get("status")
617 vld["status-detailed"] = net_RO.get("error_msg")
618 ns_update_nsr["vld.{}".format(vld_index)] = vld
619 break
620 else:
621 raise LcmException(
622 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
623 )
624
625 def set_vnfr_at_error(self, db_vnfrs, error_text):
626 try:
627 for db_vnfr in db_vnfrs.values():
628 vnfr_update = {"status": "ERROR"}
629 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
630 if "status" not in vdur:
631 vdur["status"] = "ERROR"
632 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
633 if error_text:
634 vdur["status-detailed"] = str(error_text)
635 vnfr_update[
636 "vdur.{}.status-detailed".format(vdu_index)
637 ] = "ERROR"
638 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
639 except DbException as e:
640 self.logger.error("Cannot update vnf. {}".format(e))
641
642 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
643 """
644 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
645 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
646 :param nsr_desc_RO: nsr descriptor from RO
647 :return: Nothing, LcmException is raised on errors
648 """
649 for vnf_index, db_vnfr in db_vnfrs.items():
650 for vnf_RO in nsr_desc_RO["vnfs"]:
651 if vnf_RO["member_vnf_index"] != vnf_index:
652 continue
653 vnfr_update = {}
654 if vnf_RO.get("ip_address"):
655 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
656 "ip_address"
657 ].split(";")[0]
658 elif not db_vnfr.get("ip-address"):
659 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
660 raise LcmExceptionNoMgmtIP(
661 "ns member_vnf_index '{}' has no IP address".format(
662 vnf_index
663 )
664 )
665
666 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
667 vdur_RO_count_index = 0
668 if vdur.get("pdu-type"):
669 continue
670 for vdur_RO in get_iterable(vnf_RO, "vms"):
671 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
672 continue
673 if vdur["count-index"] != vdur_RO_count_index:
674 vdur_RO_count_index += 1
675 continue
676 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
677 if vdur_RO.get("ip_address"):
678 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
679 else:
680 vdur["ip-address"] = None
681 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
682 vdur["name"] = vdur_RO.get("vim_name")
683 vdur["status"] = vdur_RO.get("status")
684 vdur["status-detailed"] = vdur_RO.get("error_msg")
685 for ifacer in get_iterable(vdur, "interfaces"):
686 for interface_RO in get_iterable(vdur_RO, "interfaces"):
687 if ifacer["name"] == interface_RO.get("internal_name"):
688 ifacer["ip-address"] = interface_RO.get(
689 "ip_address"
690 )
691 ifacer["mac-address"] = interface_RO.get(
692 "mac_address"
693 )
694 break
695 else:
696 raise LcmException(
697 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
698 "from VIM info".format(
699 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
700 )
701 )
702 vnfr_update["vdur.{}".format(vdu_index)] = vdur
703 break
704 else:
705 raise LcmException(
706 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
707 "VIM info".format(
708 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
709 )
710 )
711
712 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
713 for net_RO in get_iterable(nsr_desc_RO, "nets"):
714 if vld["id"] != net_RO.get("vnf_net_osm_id"):
715 continue
716 vld["vim-id"] = net_RO.get("vim_net_id")
717 vld["name"] = net_RO.get("vim_name")
718 vld["status"] = net_RO.get("status")
719 vld["status-detailed"] = net_RO.get("error_msg")
720 vnfr_update["vld.{}".format(vld_index)] = vld
721 break
722 else:
723 raise LcmException(
724 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
725 vnf_index, vld["id"]
726 )
727 )
728
729 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
730 break
731
732 else:
733 raise LcmException(
734 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
735 vnf_index
736 )
737 )
738
739 def _get_ns_config_info(self, nsr_id):
740 """
741 Generates a mapping between vnf,vdu elements and the N2VC id
742 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
743 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
744 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
745 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
746 """
747 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
748 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
749 mapping = {}
750 ns_config_info = {"osm-config-mapping": mapping}
751 for vca in vca_deployed_list:
752 if not vca["member-vnf-index"]:
753 continue
754 if not vca["vdu_id"]:
755 mapping[vca["member-vnf-index"]] = vca["application"]
756 else:
757 mapping[
758 "{}.{}.{}".format(
759 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
760 )
761 ] = vca["application"]
762 return ns_config_info
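# Illustrative example of the mapping returned by _get_ns_config_info
# (not part of the original source), following the docstring above:
#   {"osm-config-mapping": {
#       "1": "app-vnf-1",              # vnf-level charm for member-vnf-index "1"
#       "1.mgmtVM.0": "app-vdu-mgmt",  # vdu-level charm: <index>.<vdu_id>.<replica>
#   }}
# The application names come from the _admin.deployed.VCA entries of the nsr.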
763
764 async def _instantiate_ng_ro(
765 self,
766 logging_text,
767 nsr_id,
768 nsd,
769 db_nsr,
770 db_nslcmop,
771 db_vnfrs,
772 db_vnfds,
773 n2vc_key_list,
774 stage,
775 start_deploy,
776 timeout_ns_deploy,
777 ):
778
779 db_vims = {}
780
781 def get_vim_account(vim_account_id):
782 nonlocal db_vims
783 if vim_account_id in db_vims:
784 return db_vims[vim_account_id]
785 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
786 db_vims[vim_account_id] = db_vim
787 return db_vim
788
789 # modify target_vld info with instantiation parameters
790 def parse_vld_instantiation_params(
791 target_vim, target_vld, vld_params, target_sdn
792 ):
793 if vld_params.get("ip-profile"):
794 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
795 "ip-profile"
796 ]
797 if vld_params.get("provider-network"):
798 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
799 "provider-network"
800 ]
801 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
802 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
803 "provider-network"
804 ]["sdn-ports"]
805 if vld_params.get("wimAccountId"):
806 target_wim = "wim:{}".format(vld_params["wimAccountId"])
807 target_vld["vim_info"][target_wim] = {}
808 for param in ("vim-network-name", "vim-network-id"):
809 if vld_params.get(param):
810 if isinstance(vld_params[param], dict):
811 for vim, vim_net in vld_params[param].items():
812 other_target_vim = "vim:" + vim
813 populate_dict(
814 target_vld["vim_info"],
815 (other_target_vim, param.replace("-", "_")),
816 vim_net,
817 )
818 else: # isinstance str
819 target_vld["vim_info"][target_vim][
820 param.replace("-", "_")
821 ] = vld_params[param]
822 if vld_params.get("common_id"):
823 target_vld["common_id"] = vld_params.get("common_id")
824
825 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
826 def update_ns_vld_target(target, ns_params):
827 for vnf_params in ns_params.get("vnf", ()):
828 if vnf_params.get("vimAccountId"):
829 target_vnf = next(
830 (
831 vnfr
832 for vnfr in db_vnfrs.values()
833 if vnf_params["member-vnf-index"]
834 == vnfr["member-vnf-index-ref"]
835 ),
836 None,
837 )
838 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
839 for a_index, a_vld in enumerate(target["ns"]["vld"]):
840 target_vld = find_in_list(
841 get_iterable(vdur, "interfaces"),
842 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
843 )
844 if target_vld:
845 if vnf_params.get("vimAccountId") not in a_vld.get(
846 "vim_info", {}
847 ):
848 target["ns"]["vld"][a_index].get("vim_info").update(
849 {
850 "vim:{}".format(vnf_params["vimAccountId"]): {
851 "vim_network_name": ""
852 }
853 }
854 )
855
856 nslcmop_id = db_nslcmop["_id"]
857 target = {
858 "name": db_nsr["name"],
859 "ns": {"vld": []},
860 "vnf": [],
861 "image": deepcopy(db_nsr["image"]),
862 "flavor": deepcopy(db_nsr["flavor"]),
863 "action_id": nslcmop_id,
864 "cloud_init_content": {},
865 }
866 for image in target["image"]:
867 image["vim_info"] = {}
868 for flavor in target["flavor"]:
869 flavor["vim_info"] = {}
870
871 if db_nslcmop.get("lcmOperationType") != "instantiate":
872 # get parameters of instantiation:
873 db_nslcmop_instantiate = self.db.get_list(
874 "nslcmops",
875 {
876 "nsInstanceId": db_nslcmop["nsInstanceId"],
877 "lcmOperationType": "instantiate",
878 },
879 )[-1]
880 ns_params = db_nslcmop_instantiate.get("operationParams")
881 else:
882 ns_params = db_nslcmop.get("operationParams")
883 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
884 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
885
886 cp2target = {}
887 for vld_index, vld in enumerate(db_nsr.get("vld")):
888 target_vim = "vim:{}".format(ns_params["vimAccountId"])
889 target_vld = {
890 "id": vld["id"],
891 "name": vld["name"],
892 "mgmt-network": vld.get("mgmt-network", False),
893 "type": vld.get("type"),
894 "vim_info": {
895 target_vim: {
896 "vim_network_name": vld.get("vim-network-name"),
897 "vim_account_id": ns_params["vimAccountId"],
898 }
899 },
900 }
901 # check if this network needs SDN assist
902 if vld.get("pci-interfaces"):
903 db_vim = get_vim_account(ns_params["vimAccountId"])
904 sdnc_id = db_vim["config"].get("sdn-controller")
905 if sdnc_id:
906 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
907 target_sdn = "sdn:{}".format(sdnc_id)
908 target_vld["vim_info"][target_sdn] = {
909 "sdn": True,
910 "target_vim": target_vim,
911 "vlds": [sdn_vld],
912 "type": vld.get("type"),
913 }
914
915 nsd_vnf_profiles = get_vnf_profiles(nsd)
916 for nsd_vnf_profile in nsd_vnf_profiles:
917 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
918 if cp["virtual-link-profile-id"] == vld["id"]:
919 cp2target[
920 "member_vnf:{}.{}".format(
921 cp["constituent-cpd-id"][0][
922 "constituent-base-element-id"
923 ],
924 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
925 )
926 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
927
928 # check at nsd descriptor, if there is an ip-profile
929 vld_params = {}
930 nsd_vlp = find_in_list(
931 get_virtual_link_profiles(nsd),
932 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
933 == vld["id"],
934 )
935 if (
936 nsd_vlp
937 and nsd_vlp.get("virtual-link-protocol-data")
938 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
939 ):
940 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
941 "l3-protocol-data"
942 ]
943 ip_profile_dest_data = {}
944 if "ip-version" in ip_profile_source_data:
945 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
946 "ip-version"
947 ]
948 if "cidr" in ip_profile_source_data:
949 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
950 "cidr"
951 ]
952 if "gateway-ip" in ip_profile_source_data:
953 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
954 "gateway-ip"
955 ]
956 if "dhcp-enabled" in ip_profile_source_data:
957 ip_profile_dest_data["dhcp-params"] = {
958 "enabled": ip_profile_source_data["dhcp-enabled"]
959 }
960 vld_params["ip-profile"] = ip_profile_dest_data
961
962 # update vld_params with instantiation params
963 vld_instantiation_params = find_in_list(
964 get_iterable(ns_params, "vld"),
965 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
966 )
967 if vld_instantiation_params:
968 vld_params.update(vld_instantiation_params)
969 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
970 target["ns"]["vld"].append(target_vld)
971 # Update the target ns_vld if vnf vim_account is overridden by instantiation params
972 update_ns_vld_target(target, ns_params)
973
974 for vnfr in db_vnfrs.values():
975 vnfd = find_in_list(
976 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
977 )
978 vnf_params = find_in_list(
979 get_iterable(ns_params, "vnf"),
980 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
981 )
982 target_vnf = deepcopy(vnfr)
983 target_vim = "vim:{}".format(vnfr["vim-account-id"])
984 for vld in target_vnf.get("vld", ()):
985 # check if connected to a ns.vld, to fill target
986 vnf_cp = find_in_list(
987 vnfd.get("int-virtual-link-desc", ()),
988 lambda cpd: cpd.get("id") == vld["id"],
989 )
990 if vnf_cp:
991 ns_cp = "member_vnf:{}.{}".format(
992 vnfr["member-vnf-index-ref"], vnf_cp["id"]
993 )
994 if cp2target.get(ns_cp):
995 vld["target"] = cp2target[ns_cp]
996
997 vld["vim_info"] = {
998 target_vim: {"vim_network_name": vld.get("vim-network-name")}
999 }
1000 # check if this network needs SDN assist
1001 target_sdn = None
1002 if vld.get("pci-interfaces"):
1003 db_vim = get_vim_account(vnfr["vim-account-id"])
1004 sdnc_id = db_vim["config"].get("sdn-controller")
1005 if sdnc_id:
1006 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1007 target_sdn = "sdn:{}".format(sdnc_id)
1008 vld["vim_info"][target_sdn] = {
1009 "sdn": True,
1010 "target_vim": target_vim,
1011 "vlds": [sdn_vld],
1012 "type": vld.get("type"),
1013 }
1014
1015 # check at vnfd descriptor, if there is an ip-profile
1016 vld_params = {}
1017 vnfd_vlp = find_in_list(
1018 get_virtual_link_profiles(vnfd),
1019 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1020 )
1021 if (
1022 vnfd_vlp
1023 and vnfd_vlp.get("virtual-link-protocol-data")
1024 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1025 ):
1026 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1027 "l3-protocol-data"
1028 ]
1029 ip_profile_dest_data = {}
1030 if "ip-version" in ip_profile_source_data:
1031 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1032 "ip-version"
1033 ]
1034 if "cidr" in ip_profile_source_data:
1035 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1036 "cidr"
1037 ]
1038 if "gateway-ip" in ip_profile_source_data:
1039 ip_profile_dest_data[
1040 "gateway-address"
1041 ] = ip_profile_source_data["gateway-ip"]
1042 if "dhcp-enabled" in ip_profile_source_data:
1043 ip_profile_dest_data["dhcp-params"] = {
1044 "enabled": ip_profile_source_data["dhcp-enabled"]
1045 }
1046
1047 vld_params["ip-profile"] = ip_profile_dest_data
1048 # update vld_params with instantiation params
1049 if vnf_params:
1050 vld_instantiation_params = find_in_list(
1051 get_iterable(vnf_params, "internal-vld"),
1052 lambda i_vld: i_vld["name"] == vld["id"],
1053 )
1054 if vld_instantiation_params:
1055 vld_params.update(vld_instantiation_params)
1056 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1057
1058 vdur_list = []
1059 for vdur in target_vnf.get("vdur", ()):
1060 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1061 continue # This vdu must not be created
1062 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1063
1064 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1065
1066 if ssh_keys_all:
1067 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1068 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1069 if (
1070 vdu_configuration
1071 and vdu_configuration.get("config-access")
1072 and vdu_configuration.get("config-access").get("ssh-access")
1073 ):
1074 vdur["ssh-keys"] = ssh_keys_all
1075 vdur["ssh-access-required"] = vdu_configuration[
1076 "config-access"
1077 ]["ssh-access"]["required"]
1078 elif (
1079 vnf_configuration
1080 and vnf_configuration.get("config-access")
1081 and vnf_configuration.get("config-access").get("ssh-access")
1082 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1083 ):
1084 vdur["ssh-keys"] = ssh_keys_all
1085 vdur["ssh-access-required"] = vnf_configuration[
1086 "config-access"
1087 ]["ssh-access"]["required"]
1088 elif ssh_keys_instantiation and find_in_list(
1089 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1090 ):
1091 vdur["ssh-keys"] = ssh_keys_instantiation
1092
1093 self.logger.debug("NS > vdur > {}".format(vdur))
1094
1095 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1096 # cloud-init
1097 if vdud.get("cloud-init-file"):
1098 vdur["cloud-init"] = "{}:file:{}".format(
1099 vnfd["_id"], vdud.get("cloud-init-file")
1100 )
1101 # read file and put content at target.cloud_init_content. Avoids ng_ro having to use the shared package system
1102 if vdur["cloud-init"] not in target["cloud_init_content"]:
1103 base_folder = vnfd["_admin"]["storage"]
1104 if base_folder["pkg-dir"]:
1105 cloud_init_file = "{}/{}/cloud_init/{}".format(
1106 base_folder["folder"],
1107 base_folder["pkg-dir"],
1108 vdud.get("cloud-init-file"),
1109 )
1110 else:
1111 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1112 base_folder["folder"],
1113 vdud.get("cloud-init-file"),
1114 )
1115 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1116 target["cloud_init_content"][
1117 vdur["cloud-init"]
1118 ] = ci_file.read()
1119 elif vdud.get("cloud-init"):
1120 vdur["cloud-init"] = "{}:vdu:{}".format(
1121 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1122 )
1123 # put content at target.cloud_init_content. Avoids ng_ro having to read the vnfd descriptor
1124 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1125 "cloud-init"
1126 ]
1127 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1128 deploy_params_vdu = self._format_additional_params(
1129 vdur.get("additionalParams") or {}
1130 )
1131 deploy_params_vdu["OSM"] = get_osm_params(
1132 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1133 )
1134 vdur["additionalParams"] = deploy_params_vdu
1135
1136 # flavor
1137 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1138 if target_vim not in ns_flavor["vim_info"]:
1139 ns_flavor["vim_info"][target_vim] = {}
1140
1141 # deal with images
1142 # in case alternative images are provided, check whether one matches the
1143 # vim_type of the target VIM and, if so, use that image instead
1144 ns_image_id = int(vdur["ns-image-id"])
1145 if vdur.get("alt-image-ids"):
1146 db_vim = get_vim_account(vnfr["vim-account-id"])
1147 vim_type = db_vim["vim_type"]
1148 for alt_image_id in vdur.get("alt-image-ids"):
1149 ns_alt_image = target["image"][int(alt_image_id)]
1150 if vim_type == ns_alt_image.get("vim-type"):
1151 # must use alternative image
1152 self.logger.debug(
1153 "use alternative image id: {}".format(alt_image_id)
1154 )
1155 ns_image_id = alt_image_id
1156 vdur["ns-image-id"] = ns_image_id
1157 break
1158 ns_image = target["image"][int(ns_image_id)]
1159 if target_vim not in ns_image["vim_info"]:
1160 ns_image["vim_info"][target_vim] = {}
1161
1162 vdur["vim_info"] = {target_vim: {}}
1163 # instantiation parameters
1164 # if vnf_params:
1165 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1166 # vdud["id"]), None)
1167 vdur_list.append(vdur)
1168 target_vnf["vdur"] = vdur_list
1169 target["vnf"].append(target_vnf)
1170
1171 desc = await self.RO.deploy(nsr_id, target)
1172 self.logger.debug("RO return > {}".format(desc))
1173 action_id = desc["action_id"]
1174 await self._wait_ng_ro(
1175 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1176 )
1177
1178 # Updating NSR
1179 db_nsr_update = {
1180 "_admin.deployed.RO.operational-status": "running",
1181 "detailed-status": " ".join(stage),
1182 }
1183 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1184 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1185 self._write_op_status(nslcmop_id, stage)
1186 self.logger.debug(
1187 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1188 )
1189 return
1190
1191 async def _wait_ng_ro(
1192 self,
1193 nsr_id,
1194 action_id,
1195 nslcmop_id=None,
1196 start_time=None,
1197 timeout=600,
1198 stage=None,
1199 ):
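# Descriptive note (not in the original source): this method polls RO.status
# every 15 seconds until the action reports DONE (success) or FAILED
# (NgRoException), writing the detailed-status to the nsr while in BUILD, and
# raises NgRoException if the timeout expires.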
1200 detailed_status_old = None
1201 db_nsr_update = {}
1202 start_time = start_time or time()
1203 while time() <= start_time + timeout:
1204 desc_status = await self.RO.status(nsr_id, action_id)
1205 self.logger.debug("Wait NG RO > {}".format(desc_status))
1206 if desc_status["status"] == "FAILED":
1207 raise NgRoException(desc_status["details"])
1208 elif desc_status["status"] == "BUILD":
1209 if stage:
1210 stage[2] = "VIM: ({})".format(desc_status["details"])
1211 elif desc_status["status"] == "DONE":
1212 if stage:
1213 stage[2] = "Deployed at VIM"
1214 break
1215 else:
1216 assert False, "RO.status returns unknown {}".format(
1217 desc_status["status"]
1218 )
1219 if stage and nslcmop_id and stage[2] != detailed_status_old:
1220 detailed_status_old = stage[2]
1221 db_nsr_update["detailed-status"] = " ".join(stage)
1222 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1223 self._write_op_status(nslcmop_id, stage)
1224 await asyncio.sleep(15, loop=self.loop)
1225 else: # timeout_ns_deploy
1226 raise NgRoException("Timeout waiting ns to deploy")
1227
1228 async def _terminate_ng_ro(
1229 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1230 ):
1231 db_nsr_update = {}
1232 failed_detail = []
1233 action_id = None
1234 start_deploy = time()
1235 try:
1236 target = {
1237 "ns": {"vld": []},
1238 "vnf": [],
1239 "image": [],
1240 "flavor": [],
1241 "action_id": nslcmop_id,
1242 }
1243 desc = await self.RO.deploy(nsr_id, target)
1244 action_id = desc["action_id"]
1245 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1246 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1247 self.logger.debug(
1248 logging_text
1249 + "ns terminate action at RO. action_id={}".format(action_id)
1250 )
1251
1252 # wait until done
1253 delete_timeout = 20 * 60 # 20 minutes
1254 await self._wait_ng_ro(
1255 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1256 )
1257
1258 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1259 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1260 # delete all nsr
1261 await self.RO.delete(nsr_id)
1262 except Exception as e:
1263 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1264 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1265 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1266 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1267 self.logger.debug(
1268 logging_text + "RO_action_id={} already deleted".format(action_id)
1269 )
1270 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1271 failed_detail.append("delete conflict: {}".format(e))
1272 self.logger.debug(
1273 logging_text
1274 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1275 )
1276 else:
1277 failed_detail.append("delete error: {}".format(e))
1278 self.logger.error(
1279 logging_text
1280 + "RO_action_id={} delete error: {}".format(action_id, e)
1281 )
1282
1283 if failed_detail:
1284 stage[2] = "Error deleting from VIM"
1285 else:
1286 stage[2] = "Deleted from VIM"
1287 db_nsr_update["detailed-status"] = " ".join(stage)
1288 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1289 self._write_op_status(nslcmop_id, stage)
1290
1291 if failed_detail:
1292 raise LcmException("; ".join(failed_detail))
1293 return
1294
1295 async def instantiate_RO(
1296 self,
1297 logging_text,
1298 nsr_id,
1299 nsd,
1300 db_nsr,
1301 db_nslcmop,
1302 db_vnfrs,
1303 db_vnfds,
1304 n2vc_key_list,
1305 stage,
1306 ):
1307 """
1308 Instantiate at RO
1309 :param logging_text: prefix text to use for logging
1310 :param nsr_id: nsr identity
1311 :param nsd: database content of ns descriptor
1312 :param db_nsr: database content of ns record
1313 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1314 :param db_vnfrs:
1315 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1316 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1317 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1318 :return: None or exception
1319 """
1320 try:
1321 start_deploy = time()
1322 ns_params = db_nslcmop.get("operationParams")
1323 if ns_params and ns_params.get("timeout_ns_deploy"):
1324 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1325 else:
1326 timeout_ns_deploy = self.timeout.get(
1327 "ns_deploy", self.timeout_ns_deploy
1328 )
1329
1330 # Check for and optionally request placement optimization. Database will be updated if placement activated
1331 stage[2] = "Waiting for Placement."
1332 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1333 # after placement, set ns_params["vimAccountId"] from a vnfr if it no longer matches any vnfr vim-account-id
1334 for vnfr in db_vnfrs.values():
1335 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1336 break
1337 else:
1338 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1339
1340 return await self._instantiate_ng_ro(
1341 logging_text,
1342 nsr_id,
1343 nsd,
1344 db_nsr,
1345 db_nslcmop,
1346 db_vnfrs,
1347 db_vnfds,
1348 n2vc_key_list,
1349 stage,
1350 start_deploy,
1351 timeout_ns_deploy,
1352 )
1353 except Exception as e:
1354 stage[2] = "ERROR deploying at VIM"
1355 self.set_vnfr_at_error(db_vnfrs, str(e))
1356 self.logger.error(
1357 "Error deploying at VIM {}".format(e),
1358 exc_info=not isinstance(
1359 e,
1360 (
1361 ROclient.ROClientException,
1362 LcmException,
1363 DbException,
1364 NgRoException,
1365 ),
1366 ),
1367 )
1368 raise
1369
1370 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1371 """
1372 Wait for kdu to be up, get ip address
1373 :param logging_text: prefix to use for logging
1374 :param nsr_id:
1375 :param vnfr_id:
1376 :param kdu_name:
1377 :return: IP address
1378 """
1379
1380 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1381 nb_tries = 0
1382
1383 while nb_tries < 360:
1384 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1385 kdur = next(
1386 (
1387 x
1388 for x in get_iterable(db_vnfr, "kdur")
1389 if x.get("kdu-name") == kdu_name
1390 ),
1391 None,
1392 )
1393 if not kdur:
1394 raise LcmException(
1395 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1396 )
1397 if kdur.get("status"):
1398 if kdur["status"] in ("READY", "ENABLED"):
1399 return kdur.get("ip-address")
1400 else:
1401 raise LcmException(
1402 "target KDU={} is in error state".format(kdu_name)
1403 )
1404
1405 await asyncio.sleep(10, loop=self.loop)
1406 nb_tries += 1
1407 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1408
1409 async def wait_vm_up_insert_key_ro(
1410 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1411 ):
1412 """
1413 Wait for the IP address at RO and, optionally, insert a public key in the virtual machine
1414 :param logging_text: prefix to use for logging
1415 :param nsr_id:
1416 :param vnfr_id:
1417 :param vdu_id:
1418 :param vdu_index:
1419 :param pub_key: public ssh key to inject, None to skip
1420 :param user: user to apply the public ssh key
1421 :return: IP address
1422 """
1423
1424 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1425 ro_nsr_id = None
1426 ip_address = None
1427 nb_tries = 0
1428 target_vdu_id = None
1429 ro_retries = 0
1430
1431 while True:
1432
1433 ro_retries += 1
1434 if ro_retries >= 360: # 1 hour
1435 raise LcmException(
1436 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1437 )
1438
1439 await asyncio.sleep(10, loop=self.loop)
1440
1441 # get ip address
1442 if not target_vdu_id:
1443 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1444
1445 if not vdu_id: # for the VNF case
1446 if db_vnfr.get("status") == "ERROR":
1447 raise LcmException(
1448 "Cannot inject ssh-key because target VNF is in error state"
1449 )
1450 ip_address = db_vnfr.get("ip-address")
1451 if not ip_address:
1452 continue
1453 vdur = next(
1454 (
1455 x
1456 for x in get_iterable(db_vnfr, "vdur")
1457 if x.get("ip-address") == ip_address
1458 ),
1459 None,
1460 )
1461 else: # VDU case
1462 vdur = next(
1463 (
1464 x
1465 for x in get_iterable(db_vnfr, "vdur")
1466 if x.get("vdu-id-ref") == vdu_id
1467 and x.get("count-index") == vdu_index
1468 ),
1469 None,
1470 )
1471
1472 if (
1473 not vdur and len(db_vnfr.get("vdur", ())) == 1
1474 ): # If only one, this should be the target vdu
1475 vdur = db_vnfr["vdur"][0]
1476 if not vdur:
1477 raise LcmException(
1478 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1479 vnfr_id, vdu_id, vdu_index
1480 )
1481 )
1482 # New generation RO stores information at "vim_info"
1483 ng_ro_status = None
1484 target_vim = None
1485 if vdur.get("vim_info"):
1486 target_vim = next(
1487 t for t in vdur["vim_info"]
1488 ) # there should be only one key
1489 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1490 if (
1491 vdur.get("pdu-type")
1492 or vdur.get("status") == "ACTIVE"
1493 or ng_ro_status == "ACTIVE"
1494 ):
1495 ip_address = vdur.get("ip-address")
1496 if not ip_address:
1497 continue
1498 target_vdu_id = vdur["vdu-id-ref"]
1499 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1500 raise LcmException(
1501 "Cannot inject ssh-key because target VM is in error state"
1502 )
1503
1504 if not target_vdu_id:
1505 continue
1506
1507 # inject public key into machine
1508 if pub_key and user:
1509 self.logger.debug(logging_text + "Inserting RO key")
1510 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1511 if vdur.get("pdu-type"):
1512 self.logger.error(logging_text + "Cannot inject ssh-key into a PDU")
1513 return ip_address
1514 try:
1515 ro_vm_id = "{}-{}".format(
1516 db_vnfr["member-vnf-index-ref"], target_vdu_id
1517 ) # TODO add vdu_index
1518 if self.ng_ro:
1519 target = {
1520 "action": {
1521 "action": "inject_ssh_key",
1522 "key": pub_key,
1523 "user": user,
1524 },
1525 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1526 }
1527 desc = await self.RO.deploy(nsr_id, target)
1528 action_id = desc["action_id"]
1529 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1530 break
1531 else:
1532 # wait until NS is deployed at RO
1533 if not ro_nsr_id:
1534 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1535 ro_nsr_id = deep_get(
1536 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1537 )
1538 if not ro_nsr_id:
1539 continue
1540 result_dict = await self.RO.create_action(
1541 item="ns",
1542 item_id_name=ro_nsr_id,
1543 descriptor={
1544 "add_public_key": pub_key,
1545 "vms": [ro_vm_id],
1546 "user": user,
1547 },
1548 )
1549 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1550 if not result_dict or not isinstance(result_dict, dict):
1551 raise LcmException(
1552 "Unknown response from RO when injecting key"
1553 )
1554 for result in result_dict.values():
1555 if result.get("vim_result") == 200:
1556 break
1557 else:
1558 raise ROclient.ROClientException(
1559 "error injecting key: {}".format(
1560 result.get("description")
1561 )
1562 )
1563 break
1564 except NgRoException as e:
1565 raise LcmException(
1566 "Reaching max tries injecting key. Error: {}".format(e)
1567 )
1568 except ROclient.ROClientException as e:
1569 if not nb_tries:
1570 self.logger.debug(
1571 logging_text
1572 + "error injecting key: {}. Retrying until {} seconds".format(
1573 e, 20 * 10
1574 )
1575 )
1576 nb_tries += 1
1577 if nb_tries >= 20:
1578 raise LcmException(
1579 "Reaching max tries injecting key. Error: {}".format(e)
1580 )
1581 else:
1582 break
1583
1584 return ip_address
1585
1586 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1587 """
1588 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs.
1589 """
1590 my_vca = vca_deployed_list[vca_index]
1591 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1592 # vdu or kdu: no dependencies
1593 return
1594 timeout = 300
1595 while timeout >= 0:
1596 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1597 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1598 configuration_status_list = db_nsr["configurationStatus"]
1599 for index, vca_deployed in enumerate(configuration_status_list):
1600 if index == vca_index:
1601 # myself
1602 continue
1603 if not my_vca.get("member-vnf-index") or (
1604 vca_deployed.get("member-vnf-index")
1605 == my_vca.get("member-vnf-index")
1606 ):
1607 internal_status = configuration_status_list[index].get("status")
1608 if internal_status == "READY":
1609 continue
1610 elif internal_status == "BROKEN":
1611 raise LcmException(
1612 "Configuration aborted because dependent charm/s has failed"
1613 )
1614 else:
1615 break
1616 else:
1617 # no dependencies, return
1618 return
1619 await asyncio.sleep(10)
1620 timeout -= 1
1621
1622 raise LcmException("Configuration aborted because dependent charm/s timeout")
1623
1624 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1625 vca_id = None
1626 if db_vnfr:
1627 vca_id = deep_get(db_vnfr, ("vca-id",))
1628 elif db_nsr:
1629 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1630 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1631 return vca_id
1632
1633 async def instantiate_N2VC(
1634 self,
1635 logging_text,
1636 vca_index,
1637 nsi_id,
1638 db_nsr,
1639 db_vnfr,
1640 vdu_id,
1641 kdu_name,
1642 vdu_index,
1643 config_descriptor,
1644 deploy_params,
1645 base_folder,
1646 nslcmop_id,
1647 stage,
1648 vca_type,
1649 vca_name,
1650 ee_config_descriptor,
1651 ):
1652 nsr_id = db_nsr["_id"]
1653 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1654 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1655 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1656 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1657 db_dict = {
1658 "collection": "nsrs",
1659 "filter": {"_id": nsr_id},
1660 "path": db_update_entry,
1661 }
1662 step = ""
1663 try:
1664
1665 element_type = "NS"
1666 element_under_configuration = nsr_id
1667
1668 vnfr_id = None
1669 if db_vnfr:
1670 vnfr_id = db_vnfr["_id"]
1671 osm_config["osm"]["vnf_id"] = vnfr_id
1672
1673 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1674
1675 if vca_type == "native_charm":
1676 index_number = 0
1677 else:
1678 index_number = vdu_index or 0
1679
1680 if vnfr_id:
1681 element_type = "VNF"
1682 element_under_configuration = vnfr_id
1683 namespace += ".{}-{}".format(vnfr_id, index_number)
1684 if vdu_id:
1685 namespace += ".{}-{}".format(vdu_id, index_number)
1686 element_type = "VDU"
1687 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1688 osm_config["osm"]["vdu_id"] = vdu_id
1689 elif kdu_name:
1690 namespace += ".{}".format(kdu_name)
1691 element_type = "KDU"
1692 element_under_configuration = kdu_name
1693 osm_config["osm"]["kdu_name"] = kdu_name
1694
1695 # Get artifact path
1696 if base_folder["pkg-dir"]:
1697 artifact_path = "{}/{}/{}/{}".format(
1698 base_folder["folder"],
1699 base_folder["pkg-dir"],
1700 "charms"
1701 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1702 else "helm-charts",
1703 vca_name,
1704 )
1705 else:
1706 artifact_path = "{}/Scripts/{}/{}/".format(
1707 base_folder["folder"],
1708 "charms"
1709 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1710 else "helm-charts",
1711 vca_name,
1712 )
1713
1714 self.logger.debug("Artifact path > {}".format(artifact_path))
1715
1716 # get initial_config_primitive_list that applies to this element
1717 initial_config_primitive_list = config_descriptor.get(
1718 "initial-config-primitive"
1719 )
1720
1721 self.logger.debug(
1722 "Initial config primitive list > {}".format(
1723 initial_config_primitive_list
1724 )
1725 )
1726
1727 # add config if not present for NS charm
1728 ee_descriptor_id = ee_config_descriptor.get("id")
1729 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1730 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1731 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1732 )
1733
1734 self.logger.debug(
1735 "Initial config primitive list #2 > {}".format(
1736 initial_config_primitive_list
1737 )
1738 )
1739 # n2vc_redesign STEP 3.1
1740 # find old ee_id if exists
1741 ee_id = vca_deployed.get("ee_id")
1742
1743 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1744 # create or register execution environment in VCA
1745 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1746
1747 self._write_configuration_status(
1748 nsr_id=nsr_id,
1749 vca_index=vca_index,
1750 status="CREATING",
1751 element_under_configuration=element_under_configuration,
1752 element_type=element_type,
1753 )
1754
1755 step = "create execution environment"
1756 self.logger.debug(logging_text + step)
1757
1758 ee_id = None
1759 credentials = None
1760 if vca_type == "k8s_proxy_charm":
1761 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1762 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1763 namespace=namespace,
1764 artifact_path=artifact_path,
1765 db_dict=db_dict,
1766 vca_id=vca_id,
1767 )
1768 elif vca_type == "helm" or vca_type == "helm-v3":
1769 ee_id, credentials = await self.vca_map[
1770 vca_type
1771 ].create_execution_environment(
1772 namespace=namespace,
1773 reuse_ee_id=ee_id,
1774 db_dict=db_dict,
1775 config=osm_config,
1776 artifact_path=artifact_path,
1777 vca_type=vca_type,
1778 )
1779 else:
1780 ee_id, credentials = await self.vca_map[
1781 vca_type
1782 ].create_execution_environment(
1783 namespace=namespace,
1784 reuse_ee_id=ee_id,
1785 db_dict=db_dict,
1786 vca_id=vca_id,
1787 )
1788
1789 elif vca_type == "native_charm":
1790 step = "Waiting to VM being up and getting IP address"
1791 self.logger.debug(logging_text + step)
1792 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1793 logging_text,
1794 nsr_id,
1795 vnfr_id,
1796 vdu_id,
1797 vdu_index,
1798 user=None,
1799 pub_key=None,
1800 )
1801 credentials = {"hostname": rw_mgmt_ip}
1802 # get username
1803 username = deep_get(
1804 config_descriptor, ("config-access", "ssh-access", "default-user")
1805 )
1806 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1807 # merged. Meanwhile, let's get the username from initial-config-primitive
1808 if not username and initial_config_primitive_list:
1809 for config_primitive in initial_config_primitive_list:
1810 for param in config_primitive.get("parameter", ()):
1811 if param["name"] == "ssh-username":
1812 username = param["value"]
1813 break
1814 if not username:
1815 raise LcmException(
1816 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1817 "'config-access.ssh-access.default-user'"
1818 )
1819 credentials["username"] = username
1820 # n2vc_redesign STEP 3.2
1821
1822 self._write_configuration_status(
1823 nsr_id=nsr_id,
1824 vca_index=vca_index,
1825 status="REGISTERING",
1826 element_under_configuration=element_under_configuration,
1827 element_type=element_type,
1828 )
1829
1830 step = "register execution environment {}".format(credentials)
1831 self.logger.debug(logging_text + step)
1832 ee_id = await self.vca_map[vca_type].register_execution_environment(
1833 credentials=credentials,
1834 namespace=namespace,
1835 db_dict=db_dict,
1836 vca_id=vca_id,
1837 )
1838
1839 # for compatibility with MON/POL modules, they need the model and application name at database
1840 # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1841 ee_id_parts = ee_id.split(".")
1842 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1843 if len(ee_id_parts) >= 2:
1844 model_name = ee_id_parts[0]
1845 application_name = ee_id_parts[1]
1846 db_nsr_update[db_update_entry + "model"] = model_name
1847 db_nsr_update[db_update_entry + "application"] = application_name
1848
1849 # n2vc_redesign STEP 3.3
1850 step = "Install configuration Software"
1851
1852 self._write_configuration_status(
1853 nsr_id=nsr_id,
1854 vca_index=vca_index,
1855 status="INSTALLING SW",
1856 element_under_configuration=element_under_configuration,
1857 element_type=element_type,
1858 other_update=db_nsr_update,
1859 )
1860
1861 # TODO check if already done
1862 self.logger.debug(logging_text + step)
1863 config = None
1864 if vca_type == "native_charm":
1865 config_primitive = next(
1866 (p for p in initial_config_primitive_list if p["name"] == "config"),
1867 None,
1868 )
1869 if config_primitive:
1870 config = self._map_primitive_params(
1871 config_primitive, {}, deploy_params
1872 )
1873 num_units = 1
1874 if vca_type == "lxc_proxy_charm":
1875 if element_type == "NS":
1876 num_units = db_nsr.get("config-units") or 1
1877 elif element_type == "VNF":
1878 num_units = db_vnfr.get("config-units") or 1
1879 elif element_type == "VDU":
1880 for v in db_vnfr["vdur"]:
1881 if vdu_id == v["vdu-id-ref"]:
1882 num_units = v.get("config-units") or 1
1883 break
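# num_units is only relevant for lxc proxy charms: it is taken from the config-units field at NS, VNF or VDU level and defaults to 1.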
1884 if vca_type != "k8s_proxy_charm":
1885 await self.vca_map[vca_type].install_configuration_sw(
1886 ee_id=ee_id,
1887 artifact_path=artifact_path,
1888 db_dict=db_dict,
1889 config=config,
1890 num_units=num_units,
1891 vca_id=vca_id,
1892 vca_type=vca_type,
1893 )
1894
1895 # write flag in db indicating that configuration_sw is already installed
1896 self.update_db_2(
1897 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1898 )
1899
1900 # add relations for this VCA (wait for other peers related with this VCA)
1901 await self._add_vca_relations(
1902 logging_text=logging_text,
1903 nsr_id=nsr_id,
1904 vca_type=vca_type,
1905 vca_index=vca_index,
1906 )
1907
1908 # if SSH access is required, then get the execution environment SSH public key
1909 # if native charm, we have already waited for the VM to be UP
1910 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1911 pub_key = None
1912 user = None
1913 # self.logger.debug("get ssh key block")
1914 if deep_get(
1915 config_descriptor, ("config-access", "ssh-access", "required")
1916 ):
1917 # self.logger.debug("ssh key needed")
1918 # Needed to inject an ssh key
1919 user = deep_get(
1920 config_descriptor,
1921 ("config-access", "ssh-access", "default-user"),
1922 )
1923 step = "Install configuration Software, getting public ssh key"
1924 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1925 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1926 )
1927
1928 step = "Insert public key into VM user={} ssh_key={}".format(
1929 user, pub_key
1930 )
1931 else:
1932 # self.logger.debug("no need to get ssh key")
1933 step = "Waiting to VM being up and getting IP address"
1934 self.logger.debug(logging_text + step)
1935
1936 # n2vc_redesign STEP 5.1
1937 # wait for RO (ip-address) and insert pub_key into VM
1938 if vnfr_id:
1939 if kdu_name:
1940 rw_mgmt_ip = await self.wait_kdu_up(
1941 logging_text, nsr_id, vnfr_id, kdu_name
1942 )
1943 else:
1944 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1945 logging_text,
1946 nsr_id,
1947 vnfr_id,
1948 vdu_id,
1949 vdu_index,
1950 user=user,
1951 pub_key=pub_key,
1952 )
1953 else:
1954 rw_mgmt_ip = None # This is for a NS configuration
1955
1956 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1957
1958 # store rw_mgmt_ip in deploy params for later replacement
1959 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1960
1961 # n2vc_redesign STEP 6 Execute initial config primitive
1962 step = "execute initial config primitive"
1963
1964 # wait for dependent primitives execution (NS -> VNF -> VDU)
1965 if initial_config_primitive_list:
1966 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1967
1968 # stage, in function of element type: vdu, kdu, vnf or ns
1969 my_vca = vca_deployed_list[vca_index]
1970 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1971 # VDU or KDU
1972 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1973 elif my_vca.get("member-vnf-index"):
1974 # VNF
1975 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1976 else:
1977 # NS
1978 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1979
1980 self._write_configuration_status(
1981 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1982 )
1983
1984 self._write_op_status(op_id=nslcmop_id, stage=stage)
1985
1986 check_if_terminated_needed = True
1987 for initial_config_primitive in initial_config_primitive_list:
1988 # add information to the vca_deployed if it is an NS execution environment
1989 if not vca_deployed["member-vnf-index"]:
1990 deploy_params["ns_config_info"] = json.dumps(
1991 self._get_ns_config_info(nsr_id)
1992 )
1993 # TODO check if already done
1994 primitive_params_ = self._map_primitive_params(
1995 initial_config_primitive, {}, deploy_params
1996 )
1997
1998 step = "execute primitive '{}' params '{}'".format(
1999 initial_config_primitive["name"], primitive_params_
2000 )
2001 self.logger.debug(logging_text + step)
2002 await self.vca_map[vca_type].exec_primitive(
2003 ee_id=ee_id,
2004 primitive_name=initial_config_primitive["name"],
2005 params_dict=primitive_params_,
2006 db_dict=db_dict,
2007 vca_id=vca_id,
2008 vca_type=vca_type,
2009 )
2010 # Once some primitive has been executed, check and write at db whether it needs to execute terminate primitives
2011 if check_if_terminated_needed:
2012 if config_descriptor.get("terminate-config-primitive"):
2013 self.update_db_2(
2014 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2015 )
2016 check_if_terminated_needed = False
2017
2018 # TODO register in database that primitive is done
2019
2020 # STEP 7 Configure metrics
2021 if vca_type == "helm" or vca_type == "helm-v3":
2022 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2023 ee_id=ee_id,
2024 artifact_path=artifact_path,
2025 ee_config_descriptor=ee_config_descriptor,
2026 vnfr_id=vnfr_id,
2027 nsr_id=nsr_id,
2028 target_ip=rw_mgmt_ip,
2029 )
2030 if prometheus_jobs:
2031 self.update_db_2(
2032 "nsrs",
2033 nsr_id,
2034 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2035 )
2036
2037 for job in prometheus_jobs:
2038 self.db.set_one(
2039 "prometheus_jobs",
2040 {
2041 "job_name": job["job_name"]
2042 },
2043 job,
2044 upsert=True,
2045 fail_on_empty=False
2046 )
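# Each scrape job is upserted into the prometheus_jobs collection keyed by job_name; presumably other OSM
# components read this collection to refresh the Prometheus configuration (assumption, not visible in this file).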
2047
2048 step = "instantiated at VCA"
2049 self.logger.debug(logging_text + step)
2050
2051 self._write_configuration_status(
2052 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2053 )
2054
2055 except Exception as e: # TODO not use Exception but N2VC exception
2056 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2057 if not isinstance(
2058 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2059 ):
2060 self.logger.error(
2061 "Exception while {} : {}".format(step, e), exc_info=True
2062 )
2063 self._write_configuration_status(
2064 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2065 )
2066 raise LcmException("{} {}".format(step, e)) from e
2067
2068 def _write_ns_status(
2069 self,
2070 nsr_id: str,
2071 ns_state: str,
2072 current_operation: str,
2073 current_operation_id: str,
2074 error_description: str = None,
2075 error_detail: str = None,
2076 other_update: dict = None,
2077 ):
2078 """
2079 Update db_nsr fields.
2080 :param nsr_id:
2081 :param ns_state:
2082 :param current_operation:
2083 :param current_operation_id:
2084 :param error_description:
2085 :param error_detail:
2086 :param other_update: Other required changes at database if provided, will be cleared
2087 :return:
2088 """
2089 try:
2090 db_dict = other_update or {}
2091 db_dict[
2092 "_admin.nslcmop"
2093 ] = current_operation_id # for backward compatibility
2094 db_dict["_admin.current-operation"] = current_operation_id
2095 db_dict["_admin.operation-type"] = (
2096 current_operation if current_operation != "IDLE" else None
2097 )
2098 db_dict["currentOperation"] = current_operation
2099 db_dict["currentOperationID"] = current_operation_id
2100 db_dict["errorDescription"] = error_description
2101 db_dict["errorDetail"] = error_detail
2102
2103 if ns_state:
2104 db_dict["nsState"] = ns_state
2105 self.update_db_2("nsrs", nsr_id, db_dict)
2106 except DbException as e:
2107 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2108
2109 def _write_op_status(
2110 self,
2111 op_id: str,
2112 stage: list = None,
2113 error_message: str = None,
2114 queuePosition: int = 0,
2115 operation_state: str = None,
2116 other_update: dict = None,
2117 ):
2118 try:
2119 db_dict = other_update or {}
2120 db_dict["queuePosition"] = queuePosition
2121 if isinstance(stage, list):
2122 db_dict["stage"] = stage[0]
2123 db_dict["detailed-status"] = " ".join(stage)
2124 elif stage is not None:
2125 db_dict["stage"] = str(stage)
2126
2127 if error_message is not None:
2128 db_dict["errorMessage"] = error_message
2129 if operation_state is not None:
2130 db_dict["operationState"] = operation_state
2131 db_dict["statusEnteredTime"] = time()
2132 self.update_db_2("nslcmops", op_id, db_dict)
2133 except DbException as e:
2134 self.logger.warn(
2135 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2136 )
2137
2138 def _write_all_config_status(self, db_nsr: dict, status: str):
2139 try:
2140 nsr_id = db_nsr["_id"]
2141 # configurationStatus
2142 config_status = db_nsr.get("configurationStatus")
2143 if config_status:
2144 db_nsr_update = {
2145 "configurationStatus.{}.status".format(index): status
2146 for index, v in enumerate(config_status)
2147 if v
2148 }
2149 # update status
2150 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2151
2152 except DbException as e:
2153 self.logger.warn(
2154 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2155 )
2156
2157 def _write_configuration_status(
2158 self,
2159 nsr_id: str,
2160 vca_index: int,
2161 status: str = None,
2162 element_under_configuration: str = None,
2163 element_type: str = None,
2164 other_update: dict = None,
2165 ):
2166
2167 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2168 # .format(vca_index, status))
2169
2170 try:
2171 db_path = "configurationStatus.{}.".format(vca_index)
2172 db_dict = other_update or {}
2173 if status:
2174 db_dict[db_path + "status"] = status
2175 if element_under_configuration:
2176 db_dict[
2177 db_path + "elementUnderConfiguration"
2178 ] = element_under_configuration
2179 if element_type:
2180 db_dict[db_path + "elementType"] = element_type
2181 self.update_db_2("nsrs", nsr_id, db_dict)
2182 except DbException as e:
2183 self.logger.warn(
2184 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2185 status, nsr_id, vca_index, e
2186 )
2187 )
2188
2189 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2190 """
2191 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2192 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2193 Database is used because the result can be obtained from a different LCM worker in case of HA.
2194 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2195 :param db_nslcmop: database content of nslcmop
2196 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2197 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2198 computed 'vim-account-id'
2199 """
2200 modified = False
2201 nslcmop_id = db_nslcmop["_id"]
2202 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2203 if placement_engine == "PLA":
2204 self.logger.debug(
2205 logging_text + "Invoke and wait for placement optimization"
2206 )
2207 await self.msg.aiowrite(
2208 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2209 )
2210 db_poll_interval = 5
2211 wait = db_poll_interval * 10
2212 pla_result = None
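# Poll the database for the PLA result for up to db_poll_interval * 10 (~50 seconds). The result may be
# written by a different LCM worker in an HA deployment, which is why the database is used instead of in-memory state.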
2213 while not pla_result and wait >= 0:
2214 await asyncio.sleep(db_poll_interval)
2215 wait -= db_poll_interval
2216 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2217 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2218
2219 if not pla_result:
2220 raise LcmException(
2221 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2222 )
2223
2224 for pla_vnf in pla_result["vnf"]:
2225 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2226 if not pla_vnf.get("vimAccountId") or not vnfr:
2227 continue
2228 modified = True
2229 self.db.set_one(
2230 "vnfrs",
2231 {"_id": vnfr["_id"]},
2232 {"vim-account-id": pla_vnf["vimAccountId"]},
2233 )
2234 # Modifies db_vnfrs
2235 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2236 return modified
2237
2238 def update_nsrs_with_pla_result(self, params):
2239 try:
2240 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2241 self.update_db_2(
2242 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2243 )
2244 except Exception as e:
2245 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2246
2247 async def instantiate(self, nsr_id, nslcmop_id):
2248 """
2249
2250 :param nsr_id: ns instance to deploy
2251 :param nslcmop_id: operation to run
2252 :return:
2253 """
2254
2255 # Try to lock HA task here
2256 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2257 if not task_is_locked_by_me:
2258 self.logger.debug(
2259 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2260 )
2261 return
2262
2263 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2264 self.logger.debug(logging_text + "Enter")
2265
2266 # get all needed from database
2267
2268 # database nsrs record
2269 db_nsr = None
2270
2271 # database nslcmops record
2272 db_nslcmop = None
2273
2274 # update operation on nsrs
2275 db_nsr_update = {}
2276 # update operation on nslcmops
2277 db_nslcmop_update = {}
2278
2279 nslcmop_operation_state = None
2280 db_vnfrs = {} # vnf's info indexed by member-index
2281 # n2vc_info = {}
2282 tasks_dict_info = {} # from task to info text
2283 exc = None
2284 error_list = []
2285 stage = [
2286 "Stage 1/5: preparation of the environment.",
2287 "Waiting for previous operations to terminate.",
2288 "",
2289 ]
2290 # ^ stage, step, VIM progress
2291 try:
2292 # wait for any previous tasks in process
2293 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2294
2295 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2296 stage[1] = "Reading from database."
2297 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2298 db_nsr_update["detailed-status"] = "creating"
2299 db_nsr_update["operational-status"] = "init"
2300 self._write_ns_status(
2301 nsr_id=nsr_id,
2302 ns_state="BUILDING",
2303 current_operation="INSTANTIATING",
2304 current_operation_id=nslcmop_id,
2305 other_update=db_nsr_update,
2306 )
2307 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2308
2309 # read from db: operation
2310 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2311 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2312 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2313 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2314 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2315 )
2316 ns_params = db_nslcmop.get("operationParams")
2317 if ns_params and ns_params.get("timeout_ns_deploy"):
2318 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2319 else:
2320 timeout_ns_deploy = self.timeout.get(
2321 "ns_deploy", self.timeout_ns_deploy
2322 )
2323
2324 # read from db: ns
2325 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2326 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2327 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2328 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2329 self.fs.sync(db_nsr["nsd-id"])
2330 db_nsr["nsd"] = nsd
2331 # nsr_name = db_nsr["name"] # TODO short-name??
2332
2333 # read from db: vnf's of this ns
2334 stage[1] = "Getting vnfrs from db."
2335 self.logger.debug(logging_text + stage[1])
2336 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2337
2338 # read from db: vnfd's for every vnf
2339 db_vnfds = [] # every vnfd data
2340
2341 # for each vnf in ns, read vnfd
2342 for vnfr in db_vnfrs_list:
2343 if vnfr.get("kdur"):
2344 kdur_list = []
2345 for kdur in vnfr["kdur"]:
2346 if kdur.get("additionalParams"):
2347 kdur["additionalParams"] = json.loads(
2348 kdur["additionalParams"]
2349 )
2350 kdur_list.append(kdur)
2351 vnfr["kdur"] = kdur_list
2352
2353 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2354 vnfd_id = vnfr["vnfd-id"]
2355 vnfd_ref = vnfr["vnfd-ref"]
2356 self.fs.sync(vnfd_id)
2357
2358 # if we do not have this vnfd yet, read it from db
2359 if vnfd_id not in [db_vnfd["_id"] for db_vnfd in db_vnfds]:
2360 # read from db
2361 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2362 vnfd_id, vnfd_ref
2363 )
2364 self.logger.debug(logging_text + stage[1])
2365 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2366
2367 # store vnfd
2368 db_vnfds.append(vnfd)
2369
2370 # Get or generates the _admin.deployed.VCA list
2371 vca_deployed_list = None
2372 if db_nsr["_admin"].get("deployed"):
2373 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2374 if vca_deployed_list is None:
2375 vca_deployed_list = []
2376 configuration_status_list = []
2377 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2378 db_nsr_update["configurationStatus"] = configuration_status_list
2379 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2380 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2381 elif isinstance(vca_deployed_list, dict):
2382 # maintain backward compatibility. Change a dict to list at database
2383 vca_deployed_list = list(vca_deployed_list.values())
2384 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2385 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2386
2387 if not isinstance(
2388 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2389 ):
2390 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2391 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2392
2393 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2394 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2395 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2396 self.db.set_list(
2397 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2398 )
2399
2400 # n2vc_redesign STEP 2 Deploy Network Scenario
2401 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2402 self._write_op_status(op_id=nslcmop_id, stage=stage)
2403
2404 stage[1] = "Deploying KDUs."
2405 # self.logger.debug(logging_text + "Before deploy_kdus")
2406 # Call deploy_kdus in case the "vdu:kdu" param exists
2407 await self.deploy_kdus(
2408 logging_text=logging_text,
2409 nsr_id=nsr_id,
2410 nslcmop_id=nslcmop_id,
2411 db_vnfrs=db_vnfrs,
2412 db_vnfds=db_vnfds,
2413 task_instantiation_info=tasks_dict_info,
2414 )
2415
2416 stage[1] = "Getting VCA public key."
2417 # n2vc_redesign STEP 1 Get VCA public ssh-key
2418 # feature 1429. Add n2vc public key to needed VMs
2419 n2vc_key = self.n2vc.get_public_key()
2420 n2vc_key_list = [n2vc_key]
2421 if self.vca_config.get("public_key"):
2422 n2vc_key_list.append(self.vca_config["public_key"])
2423
2424 stage[1] = "Deploying NS at VIM."
2425 task_ro = asyncio.ensure_future(
2426 self.instantiate_RO(
2427 logging_text=logging_text,
2428 nsr_id=nsr_id,
2429 nsd=nsd,
2430 db_nsr=db_nsr,
2431 db_nslcmop=db_nslcmop,
2432 db_vnfrs=db_vnfrs,
2433 db_vnfds=db_vnfds,
2434 n2vc_key_list=n2vc_key_list,
2435 stage=stage,
2436 )
2437 )
2438 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2439 tasks_dict_info[task_ro] = "Deploying at VIM"
2440
2441 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2442 stage[1] = "Deploying Execution Environments."
2443 self.logger.debug(logging_text + stage[1])
2444
2445 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2446 for vnf_profile in get_vnf_profiles(nsd):
2447 vnfd_id = vnf_profile["vnfd-id"]
2448 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2449 member_vnf_index = str(vnf_profile["id"])
2450 db_vnfr = db_vnfrs[member_vnf_index]
2451 base_folder = vnfd["_admin"]["storage"]
2452 vdu_id = None
2453 vdu_index = 0
2454 vdu_name = None
2455 kdu_name = None
2456
2457 # Get additional parameters
2458 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2459 if db_vnfr.get("additionalParamsForVnf"):
2460 deploy_params.update(
2461 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2462 )
2463
2464 descriptor_config = get_configuration(vnfd, vnfd["id"])
2465 if descriptor_config:
2466 self._deploy_n2vc(
2467 logging_text=logging_text
2468 + "member_vnf_index={} ".format(member_vnf_index),
2469 db_nsr=db_nsr,
2470 db_vnfr=db_vnfr,
2471 nslcmop_id=nslcmop_id,
2472 nsr_id=nsr_id,
2473 nsi_id=nsi_id,
2474 vnfd_id=vnfd_id,
2475 vdu_id=vdu_id,
2476 kdu_name=kdu_name,
2477 member_vnf_index=member_vnf_index,
2478 vdu_index=vdu_index,
2479 vdu_name=vdu_name,
2480 deploy_params=deploy_params,
2481 descriptor_config=descriptor_config,
2482 base_folder=base_folder,
2483 task_instantiation_info=tasks_dict_info,
2484 stage=stage,
2485 )
2486
2487 # Deploy charms for each VDU that supports one.
2488 for vdud in get_vdu_list(vnfd):
2489 vdu_id = vdud["id"]
2490 descriptor_config = get_configuration(vnfd, vdu_id)
2491 vdur = find_in_list(
2492 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2493 )
2494
2495 if vdur.get("additionalParams"):
2496 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2497 else:
2498 deploy_params_vdu = deploy_params
2499 deploy_params_vdu["OSM"] = get_osm_params(
2500 db_vnfr, vdu_id, vdu_count_index=0
2501 )
2502 vdud_count = get_number_of_instances(vnfd, vdu_id)
2503
2504 self.logger.debug("VDUD > {}".format(vdud))
2505 self.logger.debug(
2506 "Descriptor config > {}".format(descriptor_config)
2507 )
2508 if descriptor_config:
2509 vdu_name = None
2510 kdu_name = None
2511 for vdu_index in range(vdud_count):
2512 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2513 self._deploy_n2vc(
2514 logging_text=logging_text
2515 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2516 member_vnf_index, vdu_id, vdu_index
2517 ),
2518 db_nsr=db_nsr,
2519 db_vnfr=db_vnfr,
2520 nslcmop_id=nslcmop_id,
2521 nsr_id=nsr_id,
2522 nsi_id=nsi_id,
2523 vnfd_id=vnfd_id,
2524 vdu_id=vdu_id,
2525 kdu_name=kdu_name,
2526 member_vnf_index=member_vnf_index,
2527 vdu_index=vdu_index,
2528 vdu_name=vdu_name,
2529 deploy_params=deploy_params_vdu,
2530 descriptor_config=descriptor_config,
2531 base_folder=base_folder,
2532 task_instantiation_info=tasks_dict_info,
2533 stage=stage,
2534 )
2535 for kdud in get_kdu_list(vnfd):
2536 kdu_name = kdud["name"]
2537 descriptor_config = get_configuration(vnfd, kdu_name)
2538 if descriptor_config:
2539 vdu_id = None
2540 vdu_index = 0
2541 vdu_name = None
2542 kdur = next(
2543 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2544 )
2545 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2546 if kdur.get("additionalParams"):
2547 deploy_params_kdu.update(
2548 parse_yaml_strings(kdur["additionalParams"].copy())
2549 )
2550
2551 self._deploy_n2vc(
2552 logging_text=logging_text,
2553 db_nsr=db_nsr,
2554 db_vnfr=db_vnfr,
2555 nslcmop_id=nslcmop_id,
2556 nsr_id=nsr_id,
2557 nsi_id=nsi_id,
2558 vnfd_id=vnfd_id,
2559 vdu_id=vdu_id,
2560 kdu_name=kdu_name,
2561 member_vnf_index=member_vnf_index,
2562 vdu_index=vdu_index,
2563 vdu_name=vdu_name,
2564 deploy_params=deploy_params_kdu,
2565 descriptor_config=descriptor_config,
2566 base_folder=base_folder,
2567 task_instantiation_info=tasks_dict_info,
2568 stage=stage,
2569 )
2570
2571 # Check if this NS has a charm configuration
2572 descriptor_config = nsd.get("ns-configuration")
2573 if descriptor_config and descriptor_config.get("juju"):
2574 vnfd_id = None
2575 db_vnfr = None
2576 member_vnf_index = None
2577 vdu_id = None
2578 kdu_name = None
2579 vdu_index = 0
2580 vdu_name = None
2581
2582 # Get additional parameters
2583 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2584 if db_nsr.get("additionalParamsForNs"):
2585 deploy_params.update(
2586 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2587 )
2588 base_folder = nsd["_admin"]["storage"]
2589 self._deploy_n2vc(
2590 logging_text=logging_text,
2591 db_nsr=db_nsr,
2592 db_vnfr=db_vnfr,
2593 nslcmop_id=nslcmop_id,
2594 nsr_id=nsr_id,
2595 nsi_id=nsi_id,
2596 vnfd_id=vnfd_id,
2597 vdu_id=vdu_id,
2598 kdu_name=kdu_name,
2599 member_vnf_index=member_vnf_index,
2600 vdu_index=vdu_index,
2601 vdu_name=vdu_name,
2602 deploy_params=deploy_params,
2603 descriptor_config=descriptor_config,
2604 base_folder=base_folder,
2605 task_instantiation_info=tasks_dict_info,
2606 stage=stage,
2607 )
2608
2609 # the rest of the work will be done in the finally block
2610
2611 except (
2612 ROclient.ROClientException,
2613 DbException,
2614 LcmException,
2615 N2VCException,
2616 ) as e:
2617 self.logger.error(
2618 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2619 )
2620 exc = e
2621 except asyncio.CancelledError:
2622 self.logger.error(
2623 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2624 )
2625 exc = "Operation was cancelled"
2626 except Exception as e:
2627 exc = traceback.format_exc()
2628 self.logger.critical(
2629 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2630 exc_info=True,
2631 )
2632 finally:
2633 if exc:
2634 error_list.append(str(exc))
2635 try:
2636 # wait for pending tasks
2637 if tasks_dict_info:
2638 stage[1] = "Waiting for instantiate pending tasks."
2639 self.logger.debug(logging_text + stage[1])
2640 error_list += await self._wait_for_tasks(
2641 logging_text,
2642 tasks_dict_info,
2643 timeout_ns_deploy,
2644 stage,
2645 nslcmop_id,
2646 nsr_id=nsr_id,
2647 )
2648 stage[1] = stage[2] = ""
2649 except asyncio.CancelledError:
2650 error_list.append("Cancelled")
2651 # TODO cancel all tasks
2652 except Exception as exc:
2653 error_list.append(str(exc))
2654
2655 # update operation-status
2656 db_nsr_update["operational-status"] = "running"
2657 # let's begin with VCA 'configured' status (later we can change it)
2658 db_nsr_update["config-status"] = "configured"
2659 for task, task_name in tasks_dict_info.items():
2660 if not task.done() or task.cancelled() or task.exception():
2661 if task_name.startswith(self.task_name_deploy_vca):
2662 # A N2VC task is pending
2663 db_nsr_update["config-status"] = "failed"
2664 else:
2665 # RO or KDU task is pending
2666 db_nsr_update["operational-status"] = "failed"
2667
2668 # update status at database
2669 if error_list:
2670 error_detail = ". ".join(error_list)
2671 self.logger.error(logging_text + error_detail)
2672 error_description_nslcmop = "{} Detail: {}".format(
2673 stage[0], error_detail
2674 )
2675 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2676 nslcmop_id, stage[0]
2677 )
2678
2679 db_nsr_update["detailed-status"] = (
2680 error_description_nsr + " Detail: " + error_detail
2681 )
2682 db_nslcmop_update["detailed-status"] = error_detail
2683 nslcmop_operation_state = "FAILED"
2684 ns_state = "BROKEN"
2685 else:
2686 error_detail = None
2687 error_description_nsr = error_description_nslcmop = None
2688 ns_state = "READY"
2689 db_nsr_update["detailed-status"] = "Done"
2690 db_nslcmop_update["detailed-status"] = "Done"
2691 nslcmop_operation_state = "COMPLETED"
2692
2693 if db_nsr:
2694 self._write_ns_status(
2695 nsr_id=nsr_id,
2696 ns_state=ns_state,
2697 current_operation="IDLE",
2698 current_operation_id=None,
2699 error_description=error_description_nsr,
2700 error_detail=error_detail,
2701 other_update=db_nsr_update,
2702 )
2703 self._write_op_status(
2704 op_id=nslcmop_id,
2705 stage="",
2706 error_message=error_description_nslcmop,
2707 operation_state=nslcmop_operation_state,
2708 other_update=db_nslcmop_update,
2709 )
2710
2711 if nslcmop_operation_state:
2712 try:
2713 await self.msg.aiowrite(
2714 "ns",
2715 "instantiated",
2716 {
2717 "nsr_id": nsr_id,
2718 "nslcmop_id": nslcmop_id,
2719 "operationState": nslcmop_operation_state,
2720 },
2721 loop=self.loop,
2722 )
2723 except Exception as e:
2724 self.logger.error(
2725 logging_text + "kafka_write notification Exception {}".format(e)
2726 )
2727
2728 self.logger.debug(logging_text + "Exit")
2729 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2730
2731 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2732 if vnfd_id not in cached_vnfds:
2733 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2734 return cached_vnfds[vnfd_id]
2735
2736 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2737 if vnf_profile_id not in cached_vnfrs:
2738 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2739 "vnfrs",
2740 {
2741 "member-vnf-index-ref": vnf_profile_id,
2742 "nsr-id-ref": nsr_id,
2743 },
2744 )
2745 return cached_vnfrs[vnf_profile_id]
2746
2747 def _is_deployed_vca_in_relation(
2748 self, vca: DeployedVCA, relation: Relation
2749 ) -> bool:
2750 found = False
2751 for endpoint in (relation.provider, relation.requirer):
2752 if endpoint["kdu-resource-profile-id"]:
2753 continue
2754 found = (
2755 vca.vnf_profile_id == endpoint.vnf_profile_id
2756 and vca.vdu_profile_id == endpoint.vdu_profile_id
2757 and vca.execution_environment_ref == endpoint.execution_environment_ref
2758 )
2759 if found:
2760 break
2761 return found
2762
2763 def _update_ee_relation_data_with_implicit_data(
2764 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2765 ):
2766 ee_relation_data = safe_get_ee_relation(
2767 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2768 )
2769 ee_relation_level = EELevel.get_level(ee_relation_data)
2770 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2771 "execution-environment-ref"
2772 ]:
2773 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2774 vnfd_id = vnf_profile["vnfd-id"]
2775 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2776 entity_id = (
2777 vnfd_id
2778 if ee_relation_level == EELevel.VNF
2779 else ee_relation_data["vdu-profile-id"]
2780 )
2781 ee = get_juju_ee_ref(db_vnfd, entity_id)
2782 if not ee:
2783 raise Exception(
2784 f"not execution environments found for ee_relation {ee_relation_data}"
2785 )
2786 ee_relation_data["execution-environment-ref"] = ee["id"]
2787 return ee_relation_data
2788
2789 def _get_ns_relations(
2790 self,
2791 nsr_id: str,
2792 nsd: Dict[str, Any],
2793 vca: DeployedVCA,
2794 cached_vnfds: Dict[str, Any],
2795 ) -> List[Relation]:
2796 relations = []
2797 db_ns_relations = get_ns_configuration_relation_list(nsd)
2798 for r in db_ns_relations:
2799 provider_dict = None
2800 requirer_dict = None
2801 if all(key in r for key in ("provider", "requirer")):
2802 provider_dict = r["provider"]
2803 requirer_dict = r["requirer"]
2804 elif "entities" in r:
2805 provider_id = r["entities"][0]["id"]
2806 provider_dict = {
2807 "nsr-id": nsr_id,
2808 "endpoint": r["entities"][0]["endpoint"],
2809 }
2810 if provider_id != nsd["id"]:
2811 provider_dict["vnf-profile-id"] = provider_id
2812 requirer_id = r["entities"][1]["id"]
2813 requirer_dict = {
2814 "nsr-id": nsr_id,
2815 "endpoint": r["entities"][1]["endpoint"],
2816 }
2817 if requirer_id != nsd["id"]:
2818 requirer_dict["vnf-profile-id"] = requirer_id
2819 else:
2820 raise Exception("provider/requirer or entities must be included in the relation.")
2821 relation_provider = self._update_ee_relation_data_with_implicit_data(
2822 nsr_id, nsd, provider_dict, cached_vnfds
2823 )
2824 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2825 nsr_id, nsd, requirer_dict, cached_vnfds
2826 )
2827 provider = EERelation(relation_provider)
2828 requirer = EERelation(relation_requirer)
2829 relation = Relation(r["name"], provider, requirer)
2830 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2831 if vca_in_relation:
2832 relations.append(relation)
2833 return relations
2834
2835 def _get_vnf_relations(
2836 self,
2837 nsr_id: str,
2838 nsd: Dict[str, Any],
2839 vca: DeployedVCA,
2840 cached_vnfds: Dict[str, Any],
2841 ) -> List[Relation]:
2842 relations = []
2843 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2844 vnf_profile_id = vnf_profile["id"]
2845 vnfd_id = vnf_profile["vnfd-id"]
2846 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2847 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2848 for r in db_vnf_relations:
2849 provider_dict = None
2850 requirer_dict = None
2851 if all(key in r for key in ("provider", "requirer")):
2852 provider_dict = r["provider"]
2853 requirer_dict = r["requirer"]
2854 elif "entities" in r:
2855 provider_id = r["entities"][0]["id"]
2856 provider_dict = {
2857 "nsr-id": nsr_id,
2858 "vnf-profile-id": vnf_profile_id,
2859 "endpoint": r["entities"][0]["endpoint"],
2860 }
2861 if provider_id != vnfd_id:
2862 provider_dict["vdu-profile-id"] = provider_id
2863 requirer_id = r["entities"][1]["id"]
2864 requirer_dict = {
2865 "nsr-id": nsr_id,
2866 "vnf-profile-id": vnf_profile_id,
2867 "endpoint": r["entities"][1]["endpoint"],
2868 }
2869 if requirer_id != vnfd_id:
2870 requirer_dict["vdu-profile-id"] = requirer_id
2871 else:
2872 raise Exception("provider/requirer or entities must be included in the relation.")
2873 relation_provider = self._update_ee_relation_data_with_implicit_data(
2874 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2875 )
2876 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2877 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2878 )
2879 provider = EERelation(relation_provider)
2880 requirer = EERelation(relation_requirer)
2881 relation = Relation(r["name"], provider, requirer)
2882 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2883 if vca_in_relation:
2884 relations.append(relation)
2885 return relations
2886
2887 def _get_kdu_resource_data(
2888 self,
2889 ee_relation: EERelation,
2890 db_nsr: Dict[str, Any],
2891 cached_vnfds: Dict[str, Any],
2892 ) -> DeployedK8sResource:
2893 nsd = get_nsd(db_nsr)
2894 vnf_profiles = get_vnf_profiles(nsd)
2895 vnfd_id = find_in_list(
2896 vnf_profiles,
2897 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2898 )["vnfd-id"]
2899 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2900 kdu_resource_profile = get_kdu_resource_profile(
2901 db_vnfd, ee_relation.kdu_resource_profile_id
2902 )
2903 kdu_name = kdu_resource_profile["kdu-name"]
2904 deployed_kdu, _ = get_deployed_kdu(
2905 db_nsr.get("_admin", ()).get("deployed", ()),
2906 kdu_name,
2907 ee_relation.vnf_profile_id,
2908 )
2909 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2910 return deployed_kdu
2911
2912 def _get_deployed_component(
2913 self,
2914 ee_relation: EERelation,
2915 db_nsr: Dict[str, Any],
2916 cached_vnfds: Dict[str, Any],
2917 ) -> DeployedComponent:
2918 nsr_id = db_nsr["_id"]
2919 deployed_component = None
2920 ee_level = EELevel.get_level(ee_relation)
2921 if ee_level == EELevel.NS:
2922 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2923 if vca:
2924 deployed_component = DeployedVCA(nsr_id, vca)
2925 elif ee_level == EELevel.VNF:
2926 vca = get_deployed_vca(
2927 db_nsr,
2928 {
2929 "vdu_id": None,
2930 "member-vnf-index": ee_relation.vnf_profile_id,
2931 "ee_descriptor_id": ee_relation.execution_environment_ref,
2932 },
2933 )
2934 if vca:
2935 deployed_component = DeployedVCA(nsr_id, vca)
2936 elif ee_level == EELevel.VDU:
2937 vca = get_deployed_vca(
2938 db_nsr,
2939 {
2940 "vdu_id": ee_relation.vdu_profile_id,
2941 "member-vnf-index": ee_relation.vnf_profile_id,
2942 "ee_descriptor_id": ee_relation.execution_environment_ref,
2943 },
2944 )
2945 if vca:
2946 deployed_component = DeployedVCA(nsr_id, vca)
2947 elif ee_level == EELevel.KDU:
2948 kdu_resource_data = self._get_kdu_resource_data(
2949 ee_relation, db_nsr, cached_vnfds
2950 )
2951 if kdu_resource_data:
2952 deployed_component = DeployedK8sResource(kdu_resource_data)
2953 return deployed_component
2954
2955 async def _add_relation(
2956 self,
2957 relation: Relation,
2958 vca_type: str,
2959 db_nsr: Dict[str, Any],
2960 cached_vnfds: Dict[str, Any],
2961 cached_vnfrs: Dict[str, Any],
2962 ) -> bool:
2963 deployed_provider = self._get_deployed_component(
2964 relation.provider, db_nsr, cached_vnfds
2965 )
2966 deployed_requirer = self._get_deployed_component(
2967 relation.requirer, db_nsr, cached_vnfds
2968 )
2969 if (
2970 deployed_provider
2971 and deployed_requirer
2972 and deployed_provider.config_sw_installed
2973 and deployed_requirer.config_sw_installed
2974 ):
2975 provider_db_vnfr = (
2976 self._get_vnfr(
2977 relation.provider.nsr_id,
2978 relation.provider.vnf_profile_id,
2979 cached_vnfrs,
2980 )
2981 if relation.provider.vnf_profile_id
2982 else None
2983 )
2984 requirer_db_vnfr = (
2985 self._get_vnfr(
2986 relation.requirer.nsr_id,
2987 relation.requirer.vnf_profile_id,
2988 cached_vnfrs,
2989 )
2990 if relation.requirer.vnf_profile_id
2991 else None
2992 )
2993 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
2994 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
2995 provider_relation_endpoint = RelationEndpoint(
2996 deployed_provider.ee_id,
2997 provider_vca_id,
2998 relation.provider.endpoint,
2999 )
3000 requirer_relation_endpoint = RelationEndpoint(
3001 deployed_requirer.ee_id,
3002 requirer_vca_id,
3003 relation.requirer.endpoint,
3004 )
3005 await self.vca_map[vca_type].add_relation(
3006 provider=provider_relation_endpoint,
3007 requirer=requirer_relation_endpoint,
3008 )
3009 # remove entry from relations list
3010 return True
3011 return False
3012
3013 async def _add_vca_relations(
3014 self,
3015 logging_text,
3016 nsr_id,
3017 vca_type: str,
3018 vca_index: int,
3019 timeout: int = 3600,
3020 ) -> bool:
3021
3022 # steps:
3023 # 1. find all relations for this VCA
3024 # 2. wait for other peers related
3025 # 3. add relations
3026
3027 try:
3028 # STEP 1: find all relations for this VCA
3029
3030 # read nsr record
3031 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3032 nsd = get_nsd(db_nsr)
3033
3034 # this VCA data
3035 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3036 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3037
3038 cached_vnfds = {}
3039 cached_vnfrs = {}
3040 relations = []
3041 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3042 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3043
3044 # if no relations, terminate
3045 if not relations:
3046 self.logger.debug(logging_text + " No relations")
3047 return True
3048
3049 self.logger.debug(logging_text + " adding relations {}".format(relations))
3050
3051 # add all relations
3052 start = time()
3053 while True:
3054 # check timeout
3055 now = time()
3056 if now - start >= timeout:
3057 self.logger.error(logging_text + " : timeout adding relations")
3058 return False
3059
3060 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3061 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3062
3063 # for each relation, find the related VCAs
3064 for relation in relations.copy():
3065 added = await self._add_relation(
3066 relation,
3067 vca_type,
3068 db_nsr,
3069 cached_vnfds,
3070 cached_vnfrs,
3071 )
3072 if added:
3073 relations.remove(relation)
3074
3075 if not relations:
3076 self.logger.debug("Relations added")
3077 break
3078 await asyncio.sleep(5.0)
3079
3080 return True
3081
3082 except Exception as e:
3083 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3084 return False
3085
3086 async def _install_kdu(
3087 self,
3088 nsr_id: str,
3089 nsr_db_path: str,
3090 vnfr_data: dict,
3091 kdu_index: int,
3092 kdud: dict,
3093 vnfd: dict,
3094 k8s_instance_info: dict,
3095 k8params: dict = None,
3096 timeout: int = 600,
3097 vca_id: str = None,
3098 ):
3099
3100 try:
3101 k8sclustertype = k8s_instance_info["k8scluster-type"]
3102 # Instantiate kdu
3103 db_dict_install = {
3104 "collection": "nsrs",
3105 "filter": {"_id": nsr_id},
3106 "path": nsr_db_path,
3107 }
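# db_dict_install (collection/filter/path) tells the K8s connector where to report detailed status
# for this KDU at the nsr record while the install progresses.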
3108
3109 if k8s_instance_info.get("kdu-deployment-name"):
3110 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3111 else:
3112 kdu_instance = self.k8scluster_map[
3113 k8sclustertype
3114 ].generate_kdu_instance_name(
3115 db_dict=db_dict_install,
3116 kdu_model=k8s_instance_info["kdu-model"],
3117 kdu_name=k8s_instance_info["kdu-name"],
3118 )
3119 self.update_db_2(
3120 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3121 )
3122 await self.k8scluster_map[k8sclustertype].install(
3123 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3124 kdu_model=k8s_instance_info["kdu-model"],
3125 atomic=True,
3126 params=k8params,
3127 db_dict=db_dict_install,
3128 timeout=timeout,
3129 kdu_name=k8s_instance_info["kdu-name"],
3130 namespace=k8s_instance_info["namespace"],
3131 kdu_instance=kdu_instance,
3132 vca_id=vca_id,
3133 )
3134 self.update_db_2(
3135 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3136 )
3137
3138 # Obtain the deployed services in order to get the management service ip
3139 services = await self.k8scluster_map[k8sclustertype].get_services(
3140 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3141 kdu_instance=kdu_instance,
3142 namespace=k8s_instance_info["namespace"],
3143 )
3144
3145 # Obtain management service info (if exists)
3146 vnfr_update_dict = {}
3147 kdu_config = get_configuration(vnfd, kdud["name"])
3148 if kdu_config:
3149 target_ee_list = kdu_config.get("execution-environment-list", [])
3150 else:
3151 target_ee_list = []
3152
3153 if services:
3154 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3155 mgmt_services = [
3156 service
3157 for service in kdud.get("service", [])
3158 if service.get("mgmt-service")
3159 ]
3160 for mgmt_service in mgmt_services:
3161 for service in services:
3162 if service["name"].startswith(mgmt_service["name"]):
3163 # Mgmt service found, Obtain service ip
3164 ip = service.get("external_ip", service.get("cluster_ip"))
3165 if isinstance(ip, list) and len(ip) == 1:
3166 ip = ip[0]
3167
3168 vnfr_update_dict[
3169 "kdur.{}.ip-address".format(kdu_index)
3170 ] = ip
3171
3172 # Check if the mgmt ip at the vnf must also be updated
3173 service_external_cp = mgmt_service.get(
3174 "external-connection-point-ref"
3175 )
3176 if service_external_cp:
3177 if (
3178 deep_get(vnfd, ("mgmt-interface", "cp"))
3179 == service_external_cp
3180 ):
3181 vnfr_update_dict["ip-address"] = ip
3182
3183 if find_in_list(
3184 target_ee_list,
3185 lambda ee: ee.get(
3186 "external-connection-point-ref", ""
3187 )
3188 == service_external_cp,
3189 ):
3190 vnfr_update_dict[
3191 "kdur.{}.ip-address".format(kdu_index)
3192 ] = ip
3193 break
3194 else:
3195 self.logger.warn(
3196 "Mgmt service name: {} not found".format(
3197 mgmt_service["name"]
3198 )
3199 )
3200
3201 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3202 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3203
3204 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3205 if (
3206 kdu_config
3207 and kdu_config.get("initial-config-primitive")
3208 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3209 ):
3210 initial_config_primitive_list = kdu_config.get(
3211 "initial-config-primitive"
3212 )
3213 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3214
3215 for initial_config_primitive in initial_config_primitive_list:
3216 primitive_params_ = self._map_primitive_params(
3217 initial_config_primitive, {}, {}
3218 )
3219
3220 await asyncio.wait_for(
3221 self.k8scluster_map[k8sclustertype].exec_primitive(
3222 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3223 kdu_instance=kdu_instance,
3224 primitive_name=initial_config_primitive["name"],
3225 params=primitive_params_,
3226 db_dict=db_dict_install,
3227 vca_id=vca_id,
3228 ),
3229 timeout=timeout,
3230 )
3231
3232 except Exception as e:
3233 # Prepare update db with error and raise exception
3234 try:
3235 self.update_db_2(
3236 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3237 )
3238 self.update_db_2(
3239 "vnfrs",
3240 vnfr_data.get("_id"),
3241 {"kdur.{}.status".format(kdu_index): "ERROR"},
3242 )
3243 except Exception:
3244 # ignore to keep original exception
3245 pass
3246 # reraise original error
3247 raise
3248
3249 return kdu_instance
3250
3251 async def deploy_kdus(
3252 self,
3253 logging_text,
3254 nsr_id,
3255 nslcmop_id,
3256 db_vnfrs,
3257 db_vnfds,
3258 task_instantiation_info,
3259 ):
3260 # Launch kdus if present in the descriptor
3261
3262 k8scluster_id_2_uuic = {
3263 "helm-chart-v3": {},
3264 "helm-chart": {},
3265 "juju-bundle": {},
3266 }
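# _get_cluster_id (below) translates a K8s cluster id into the connector-specific uuid for the given cluster
# type, initializing helm-chart-v3 support on the fly for older clusters and caching results in k8scluster_id_2_uuic.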
3267
3268 async def _get_cluster_id(cluster_id, cluster_type):
3269 nonlocal k8scluster_id_2_uuic
3270 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3271 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3272
3273 # check if the K8s cluster is being created; wait and look if previous tasks are in process
3274 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3275 "k8scluster", cluster_id
3276 )
3277 if task_dependency:
3278 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3279 task_name, cluster_id
3280 )
3281 self.logger.debug(logging_text + text)
3282 await asyncio.wait(task_dependency, timeout=3600)
3283
3284 db_k8scluster = self.db.get_one(
3285 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3286 )
3287 if not db_k8scluster:
3288 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3289
3290 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3291 if not k8s_id:
3292 if cluster_type == "helm-chart-v3":
3293 try:
3294 # backward compatibility for existing clusters that have not been initialized for helm v3
3295 k8s_credentials = yaml.safe_dump(
3296 db_k8scluster.get("credentials")
3297 )
3298 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3299 k8s_credentials, reuse_cluster_uuid=cluster_id
3300 )
3301 db_k8scluster_update = {}
3302 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3303 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3304 db_k8scluster_update[
3305 "_admin.helm-chart-v3.created"
3306 ] = uninstall_sw
3307 db_k8scluster_update[
3308 "_admin.helm-chart-v3.operationalState"
3309 ] = "ENABLED"
3310 self.update_db_2(
3311 "k8sclusters", cluster_id, db_k8scluster_update
3312 )
3313 except Exception as e:
3314 self.logger.error(
3315 logging_text
3316 + "error initializing helm-v3 cluster: {}".format(str(e))
3317 )
3318 raise LcmException(
3319 "K8s cluster '{}' has not been initialized for '{}'".format(
3320 cluster_id, cluster_type
3321 )
3322 )
3323 else:
3324 raise LcmException(
3325 "K8s cluster '{}' has not been initialized for '{}'".format(
3326 cluster_id, cluster_type
3327 )
3328 )
3329 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3330 return k8s_id
3331
3332 logging_text += "Deploy kdus: "
3333 step = ""
3334 try:
3335 db_nsr_update = {"_admin.deployed.K8s": []}
3336 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3337
3338 index = 0
3339 updated_cluster_list = []
3340 updated_v3_cluster_list = []
3341
3342 for vnfr_data in db_vnfrs.values():
3343 vca_id = self.get_vca_id(vnfr_data, {})
3344 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3345 # Step 0: Prepare and set parameters
3346 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3347 vnfd_id = vnfr_data.get("vnfd-id")
3348 vnfd_with_id = find_in_list(
3349 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3350 )
3351 kdud = next(
3352 kdud
3353 for kdud in vnfd_with_id["kdu"]
3354 if kdud["name"] == kdur["kdu-name"]
3355 )
3356 namespace = kdur.get("k8s-namespace")
3357 kdu_deployment_name = kdur.get("kdu-deployment-name")
3358 if kdur.get("helm-chart"):
3359 kdumodel = kdur["helm-chart"]
3360 # Default version: helm3, if helm-version is v2 assign v2
3361 k8sclustertype = "helm-chart-v3"
3362 self.logger.debug("kdur: {}".format(kdur))
3363 if (
3364 kdur.get("helm-version")
3365 and kdur.get("helm-version") == "v2"
3366 ):
3367 k8sclustertype = "helm-chart"
3368 elif kdur.get("juju-bundle"):
3369 kdumodel = kdur["juju-bundle"]
3370 k8sclustertype = "juju-bundle"
3371 else:
3372 raise LcmException(
3373 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3374 "juju-bundle. Maybe an old NBI version is running".format(
3375 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3376 )
3377 )
3378 # check if kdumodel is a file and exists
3379 try:
3380 vnfd_with_id = find_in_list(
3381 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3382 )
3383 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3384 if storage: # may not be present if vnfd has no artifacts
3385 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3386 if storage["pkg-dir"]:
3387 filename = "{}/{}/{}s/{}".format(
3388 storage["folder"],
3389 storage["pkg-dir"],
3390 k8sclustertype,
3391 kdumodel,
3392 )
3393 else:
3394 filename = "{}/Scripts/{}s/{}".format(
3395 storage["folder"],
3396 k8sclustertype,
3397 kdumodel,
3398 )
3399 if self.fs.file_exists(
3400 filename, mode="file"
3401 ) or self.fs.file_exists(filename, mode="dir"):
3402 kdumodel = self.fs.path + filename
3403 except (asyncio.TimeoutError, asyncio.CancelledError):
3404 raise
3405 except Exception: # it is not a file
3406 pass
3407
3408 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3409 step = "Synchronize repos for k8s cluster '{}'".format(
3410 k8s_cluster_id
3411 )
3412 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3413
3414 # Synchronize repos
3415 if (
3416 k8sclustertype == "helm-chart"
3417 and cluster_uuid not in updated_cluster_list
3418 ) or (
3419 k8sclustertype == "helm-chart-v3"
3420 and cluster_uuid not in updated_v3_cluster_list
3421 ):
3422 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3423 self.k8scluster_map[k8sclustertype].synchronize_repos(
3424 cluster_uuid=cluster_uuid
3425 )
3426 )
3427 if del_repo_list or added_repo_dict:
3428 if k8sclustertype == "helm-chart":
3429 unset = {
3430 "_admin.helm_charts_added." + item: None
3431 for item in del_repo_list
3432 }
3433 updated = {
3434 "_admin.helm_charts_added." + item: name
3435 for item, name in added_repo_dict.items()
3436 }
3437 updated_cluster_list.append(cluster_uuid)
3438 elif k8sclustertype == "helm-chart-v3":
3439 unset = {
3440 "_admin.helm_charts_v3_added." + item: None
3441 for item in del_repo_list
3442 }
3443 updated = {
3444 "_admin.helm_charts_v3_added." + item: name
3445 for item, name in added_repo_dict.items()
3446 }
3447 updated_v3_cluster_list.append(cluster_uuid)
3448 self.logger.debug(
3449 logging_text + "repos synchronized on k8s cluster "
3450 "'{}' to_delete: {}, to_add: {}".format(
3451 k8s_cluster_id, del_repo_list, added_repo_dict
3452 )
3453 )
3454 self.db.set_one(
3455 "k8sclusters",
3456 {"_id": k8s_cluster_id},
3457 updated,
3458 unset=unset,
3459 )
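# Repositories are synchronized at most once per cluster and helm version within this operation,
# tracked via updated_cluster_list / updated_v3_cluster_list.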
3460
3461 # Instantiate kdu
3462 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3463 vnfr_data["member-vnf-index-ref"],
3464 kdur["kdu-name"],
3465 k8s_cluster_id,
3466 )
3467 k8s_instance_info = {
3468 "kdu-instance": None,
3469 "k8scluster-uuid": cluster_uuid,
3470 "k8scluster-type": k8sclustertype,
3471 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3472 "kdu-name": kdur["kdu-name"],
3473 "kdu-model": kdumodel,
3474 "namespace": namespace,
3475 "kdu-deployment-name": kdu_deployment_name,
3476 }
3477 db_path = "_admin.deployed.K8s.{}".format(index)
3478 db_nsr_update[db_path] = k8s_instance_info
3479 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3480 vnfd_with_id = find_in_list(
3481 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3482 )
3483 task = asyncio.ensure_future(
3484 self._install_kdu(
3485 nsr_id,
3486 db_path,
3487 vnfr_data,
3488 kdu_index,
3489 kdud,
3490 vnfd_with_id,
3491 k8s_instance_info,
3492 k8params=desc_params,
3493 timeout=600,
3494 vca_id=vca_id,
3495 )
3496 )
3497 self.lcm_tasks.register(
3498 "ns",
3499 nsr_id,
3500 nslcmop_id,
3501 "instantiate_KDU-{}".format(index),
3502 task,
3503 )
3504 task_instantiation_info[task] = "Deploying KDU {}".format(
3505 kdur["kdu-name"]
3506 )
3507
3508 index += 1
3509
3510 except (LcmException, asyncio.CancelledError):
3511 raise
3512 except Exception as e:
3513 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3514 if isinstance(e, (N2VCException, DbException)):
3515 self.logger.error(logging_text + msg)
3516 else:
3517 self.logger.critical(logging_text + msg, exc_info=True)
3518 raise LcmException(msg)
3519 finally:
3520 if db_nsr_update:
3521 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3522
3523 def _deploy_n2vc(
3524 self,
3525 logging_text,
3526 db_nsr,
3527 db_vnfr,
3528 nslcmop_id,
3529 nsr_id,
3530 nsi_id,
3531 vnfd_id,
3532 vdu_id,
3533 kdu_name,
3534 member_vnf_index,
3535 vdu_index,
3536 vdu_name,
3537 deploy_params,
3538 descriptor_config,
3539 base_folder,
3540 task_instantiation_info,
3541 stage,
3542 ):
3543 # launch instantiate_N2VC in an asyncio task and register the task object
3544 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3545 # if not found, create one entry and update database
3546 # fill db_nsr._admin.deployed.VCA.<index>
3547
3548 self.logger.debug(
3549 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3550 )
3551 if "execution-environment-list" in descriptor_config:
3552 ee_list = descriptor_config.get("execution-environment-list", [])
3553 elif "juju" in descriptor_config:
3554 ee_list = [descriptor_config] # ns charms
3555 else: # other types such as script are not supported
3556 ee_list = []
3557
3558 for ee_item in ee_list:
3559 self.logger.debug(
3560 logging_text
3561 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3562 ee_item.get("juju"), ee_item.get("helm-chart")
3563 )
3564 )
3565 ee_descriptor_id = ee_item.get("id")
3566 if ee_item.get("juju"):
3567 vca_name = ee_item["juju"].get("charm")
3568 vca_type = (
3569 "lxc_proxy_charm"
3570 if ee_item["juju"].get("charm") is not None
3571 else "native_charm"
3572 )
3573 if ee_item["juju"].get("cloud") == "k8s":
3574 vca_type = "k8s_proxy_charm"
3575 elif ee_item["juju"].get("proxy") is False:
3576 vca_type = "native_charm"
3577 elif ee_item.get("helm-chart"):
3578 vca_name = ee_item["helm-chart"]
3579 if ee_item.get("helm-version") == "v2":
3580 vca_type = "helm"
3581 else:
3582 vca_type = "helm-v3"
3583 else:
3584 self.logger.debug(
3585 logging_text + "skipping non juju neither charm configuration"
3586 )
3587 continue
3588
3589 vca_index = -1
3590 for vca_index, vca_deployed in enumerate(
3591 db_nsr["_admin"]["deployed"]["VCA"]
3592 ):
3593 if not vca_deployed:
3594 continue
3595 if (
3596 vca_deployed.get("member-vnf-index") == member_vnf_index
3597 and vca_deployed.get("vdu_id") == vdu_id
3598 and vca_deployed.get("kdu_name") == kdu_name
3599 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3600 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3601 ):
3602 break
3603 else:
3604 # not found, create one.
3605 target = (
3606 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3607 )
3608 if vdu_id:
3609 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3610 elif kdu_name:
3611 target += "/kdu/{}".format(kdu_name)
3612 vca_deployed = {
3613 "target_element": target,
3614 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3615 "member-vnf-index": member_vnf_index,
3616 "vdu_id": vdu_id,
3617 "kdu_name": kdu_name,
3618 "vdu_count_index": vdu_index,
3619 "operational-status": "init", # TODO revise
3620 "detailed-status": "", # TODO revise
3621 "step": "initial-deploy", # TODO revise
3622 "vnfd_id": vnfd_id,
3623 "vdu_name": vdu_name,
3624 "type": vca_type,
3625 "ee_descriptor_id": ee_descriptor_id,
3626 }
3627 vca_index += 1
3628
3629 # create VCA and configurationStatus in db
3630 db_dict = {
3631 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3632 "configurationStatus.{}".format(vca_index): dict(),
3633 }
3634 self.update_db_2("nsrs", nsr_id, db_dict)
3635
3636 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3637
3638 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3639 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3640 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3641
3642 # Launch task
3643 task_n2vc = asyncio.ensure_future(
3644 self.instantiate_N2VC(
3645 logging_text=logging_text,
3646 vca_index=vca_index,
3647 nsi_id=nsi_id,
3648 db_nsr=db_nsr,
3649 db_vnfr=db_vnfr,
3650 vdu_id=vdu_id,
3651 kdu_name=kdu_name,
3652 vdu_index=vdu_index,
3653 deploy_params=deploy_params,
3654 config_descriptor=descriptor_config,
3655 base_folder=base_folder,
3656 nslcmop_id=nslcmop_id,
3657 stage=stage,
3658 vca_type=vca_type,
3659 vca_name=vca_name,
3660 ee_config_descriptor=ee_item,
3661 )
3662 )
3663 self.lcm_tasks.register(
3664 "ns",
3665 nsr_id,
3666 nslcmop_id,
3667 "instantiate_N2VC-{}".format(vca_index),
3668 task_n2vc,
3669 )
3670 task_instantiation_info[
3671 task_n2vc
3672 ] = self.task_name_deploy_vca + " {}.{}".format(
3673 member_vnf_index or "", vdu_id or ""
3674 )
3675
3676 @staticmethod
3677 def _create_nslcmop(nsr_id, operation, params):
3678 """
3679 Creates an ns-lcm-op (nslcmop) content to be stored in the database.
3680 :param nsr_id: internal id of the instance
3681 :param operation: instantiate, terminate, scale, action, ...
3682 :param params: user parameters for the operation
3683 :return: dictionary following SOL005 format
3684 """
3685 # Raise exception if invalid arguments
3686 if not (nsr_id and operation and params):
3687 raise LcmException(
3688 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3689 )
3690 now = time()
3691 _id = str(uuid4())
3692 nslcmop = {
3693 "id": _id,
3694 "_id": _id,
3695 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3696 "operationState": "PROCESSING",
3697 "statusEnteredTime": now,
3698 "nsInstanceId": nsr_id,
3699 "lcmOperationType": operation,
3700 "startTime": now,
3701 "isAutomaticInvocation": False,
3702 "operationParams": params,
3703 "isCancelPending": False,
3704 "links": {
3705 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3706 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3707 },
3708 }
3709 return nslcmop
3710
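# Values prefixed with "!!yaml " are deserialized in place by the helper below.
# Illustrative example (hypothetical params): {"config": "!!yaml {replicas: 3}"}
# becomes {"config": {"replicas": 3}}.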
3711 def _format_additional_params(self, params):
3712 params = params or {}
3713 for key, value in params.items():
3714 if str(value).startswith("!!yaml "):
3715 params[key] = yaml.safe_load(value[7:])
3716 return params
3717
3718 def _get_terminate_primitive_params(self, seq, vnf_index):
3719 primitive = seq.get("name")
3720 primitive_params = {}
3721 params = {
3722 "member_vnf_index": vnf_index,
3723 "primitive": primitive,
3724 "primitive_params": primitive_params,
3725 }
3726 desc_params = {}
3727 return self._map_primitive_params(seq, params, desc_params)
3728
3729 # sub-operations
3730
3731 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3732 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3733 if op.get("operationState") == "COMPLETED":
3734 # b. Skip sub-operation
3735 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3736 return self.SUBOPERATION_STATUS_SKIP
3737 else:
3738 # c. retry executing sub-operation
3739 # The sub-operation exists, and operationState != 'COMPLETED'
3740 # Update operationState = 'PROCESSING' to indicate a retry.
3741 operationState = "PROCESSING"
3742 detailed_status = "In progress"
3743 self._update_suboperation_status(
3744 db_nslcmop, op_index, operationState, detailed_status
3745 )
3746 # Return the sub-operation index
3747 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3748 # with arguments extracted from the sub-operation
3749 return op_index
3750
3751 # Find a sub-operation where all keys in a matching dictionary must match
3752 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
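# Illustrative match dict (hypothetical values):
#   {"member_vnf_index": "1", "primitive": "touch", "lcmOperationType": "PRE-SCALE"}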
3753 def _find_suboperation(self, db_nslcmop, match):
3754 if db_nslcmop and match:
3755 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3756 for i, op in enumerate(op_list):
3757 if all(op.get(k) == match[k] for k in match):
3758 return i
3759 return self.SUBOPERATION_STATUS_NOT_FOUND
3760
3761 # Update status for a sub-operation given its index
3762 def _update_suboperation_status(
3763 self, db_nslcmop, op_index, operationState, detailed_status
3764 ):
3765 # Update DB for HA tasks
3766 q_filter = {"_id": db_nslcmop["_id"]}
3767 update_dict = {
3768 "_admin.operations.{}.operationState".format(op_index): operationState,
3769 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3770 }
3771 self.db.set_one(
3772 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3773 )
3774
3775 # Add sub-operation, return the index of the added sub-operation
3776 # Optionally, set operationState, detailed-status, and operationType
3777 # Status and type are currently set for 'scale' sub-operations:
3778 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3779 # 'detailed-status' : status message
3780 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3781 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3782 def _add_suboperation(
3783 self,
3784 db_nslcmop,
3785 vnf_index,
3786 vdu_id,
3787 vdu_count_index,
3788 vdu_name,
3789 primitive,
3790 mapped_primitive_params,
3791 operationState=None,
3792 detailed_status=None,
3793 operationType=None,
3794 RO_nsr_id=None,
3795 RO_scaling_info=None,
3796 ):
3797 if not db_nslcmop:
3798 return self.SUBOPERATION_STATUS_NOT_FOUND
3799 # Get the "_admin.operations" list, if it exists
3800 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3801 op_list = db_nslcmop_admin.get("operations")
3802 # Create or append to the "_admin.operations" list
3803 new_op = {
3804 "member_vnf_index": vnf_index,
3805 "vdu_id": vdu_id,
3806 "vdu_count_index": vdu_count_index,
3807 "primitive": primitive,
3808 "primitive_params": mapped_primitive_params,
3809 }
3810 if operationState:
3811 new_op["operationState"] = operationState
3812 if detailed_status:
3813 new_op["detailed-status"] = detailed_status
3814 if operationType:
3815 new_op["lcmOperationType"] = operationType
3816 if RO_nsr_id:
3817 new_op["RO_nsr_id"] = RO_nsr_id
3818 if RO_scaling_info:
3819 new_op["RO_scaling_info"] = RO_scaling_info
3820 if not op_list:
3821 # No existing operations, create key 'operations' with current operation as first list element
3822 db_nslcmop_admin.update({"operations": [new_op]})
3823 op_list = db_nslcmop_admin.get("operations")
3824 else:
3825 # Existing operations, append operation to list
3826 op_list.append(new_op)
3827
3828 db_nslcmop_update = {"_admin.operations": op_list}
3829 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3830 op_index = len(op_list) - 1
3831 return op_index
3832
3833 # Helper methods for scale() sub-operations
3834
3835 # pre-scale/post-scale:
3836 # Check for 3 different cases:
3837 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3838 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3839 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3840 def _check_or_add_scale_suboperation(
3841 self,
3842 db_nslcmop,
3843 vnf_index,
3844 vnf_config_primitive,
3845 primitive_params,
3846 operationType,
3847 RO_nsr_id=None,
3848 RO_scaling_info=None,
3849 ):
3850 # Find this sub-operation
3851 if RO_nsr_id and RO_scaling_info:
3852 operationType = "SCALE-RO"
3853 match = {
3854 "member_vnf_index": vnf_index,
3855 "RO_nsr_id": RO_nsr_id,
3856 "RO_scaling_info": RO_scaling_info,
3857 }
3858 else:
3859 match = {
3860 "member_vnf_index": vnf_index,
3861 "primitive": vnf_config_primitive,
3862 "primitive_params": primitive_params,
3863 "lcmOperationType": operationType,
3864 }
3865 op_index = self._find_suboperation(db_nslcmop, match)
3866 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3867 # a. New sub-operation
3868 # The sub-operation does not exist, add it.
3869 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3870 # The following parameters are set to None for all kinds of scaling:
3871 vdu_id = None
3872 vdu_count_index = None
3873 vdu_name = None
3874 if RO_nsr_id and RO_scaling_info:
3875 vnf_config_primitive = None
3876 primitive_params = None
3877 else:
3878 RO_nsr_id = None
3879 RO_scaling_info = None
3880 # Initial status for sub-operation
3881 operationState = "PROCESSING"
3882 detailed_status = "In progress"
3883 # Add sub-operation for pre/post-scaling (zero or more operations)
3884 self._add_suboperation(
3885 db_nslcmop,
3886 vnf_index,
3887 vdu_id,
3888 vdu_count_index,
3889 vdu_name,
3890 vnf_config_primitive,
3891 primitive_params,
3892 operationState,
3893 detailed_status,
3894 operationType,
3895 RO_nsr_id,
3896 RO_scaling_info,
3897 )
3898 return self.SUBOPERATION_STATUS_NEW
3899 else:
3900 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3901 # or op_index (operationState != 'COMPLETED')
3902 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3903
3904 # Function to return execution_environment id
3905
3906 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3907 # TODO vdu_index_count
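# Implicitly returns None when no VCA entry matches the given vnf_index and vdu_id.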
3908 for vca in vca_deployed_list:
3909 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3910 return vca["ee_id"]
3911
3912 async def destroy_N2VC(
3913 self,
3914 logging_text,
3915 db_nslcmop,
3916 vca_deployed,
3917 config_descriptor,
3918 vca_index,
3919 destroy_ee=True,
3920 exec_primitives=True,
3921 scaling_in=False,
3922 vca_id: str = None,
3923 ):
3924 """
3925 Execute the terminate primitives and destroy the execution environment (only if destroy_ee is True)
3926 :param logging_text:
3927 :param db_nslcmop:
3928 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3929 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3930 :param vca_index: index in the database _admin.deployed.VCA
3931 :param destroy_ee: False to skip destroying the execution environment here, because all of them will be destroyed at once later
3932 :param exec_primitives: False to skip executing terminate primitives, because the configuration was not completed or
3933 did not execute properly
3934 :param scaling_in: True destroys the application, False destroys the model
3935 :return: None or exception
3936 """
3937
3938 self.logger.debug(
3939 logging_text
3940 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3941 vca_index, vca_deployed, config_descriptor, destroy_ee
3942 )
3943 )
3944
3945 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3946
3947 # execute terminate_primitives
3948 if exec_primitives:
3949 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3950 config_descriptor.get("terminate-config-primitive"),
3951 vca_deployed.get("ee_descriptor_id"),
3952 )
3953 vdu_id = vca_deployed.get("vdu_id")
3954 vdu_count_index = vca_deployed.get("vdu_count_index")
3955 vdu_name = vca_deployed.get("vdu_name")
3956 vnf_index = vca_deployed.get("member-vnf-index")
3957 if terminate_primitives and vca_deployed.get("needed_terminate"):
3958 for seq in terminate_primitives:
3959 # For each sequence in list, get primitive and call _ns_execute_primitive()
3960 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3961 vnf_index, seq.get("name")
3962 )
3963 self.logger.debug(logging_text + step)
3964 # Create the primitive for each sequence, i.e. "primitive": "touch"
3965 primitive = seq.get("name")
3966 mapped_primitive_params = self._get_terminate_primitive_params(
3967 seq, vnf_index
3968 )
3969
3970 # Add sub-operation
3971 self._add_suboperation(
3972 db_nslcmop,
3973 vnf_index,
3974 vdu_id,
3975 vdu_count_index,
3976 vdu_name,
3977 primitive,
3978 mapped_primitive_params,
3979 )
3980 # Sub-operations: Call _ns_execute_primitive() instead of action()
3981 try:
3982 result, result_detail = await self._ns_execute_primitive(
3983 vca_deployed["ee_id"],
3984 primitive,
3985 mapped_primitive_params,
3986 vca_type=vca_type,
3987 vca_id=vca_id,
3988 )
3989 except LcmException:
3990 # this happens when the VCA is not deployed. In this case there is nothing to terminate
3991 continue
3992 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3993 if result not in result_ok:
3994 raise LcmException(
3995 "terminate_primitive {} for vnf_member_index={} fails with "
3996 "error {}".format(seq.get("name"), vnf_index, result_detail)
3997 )
3998 # mark that this VCA no longer needs its terminate primitives executed
3999 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
4000 vca_index
4001 )
4002 self.update_db_2(
4003 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
4004 )
4005
4006 # Delete Prometheus Jobs if any
4007 # This uses NSR_ID, so it will destroy any jobs under this index
4008 self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
4009
4010 if destroy_ee:
4011 await self.vca_map[vca_type].delete_execution_environment(
4012 vca_deployed["ee_id"],
4013 scaling_in=scaling_in,
4014 vca_type=vca_type,
4015 vca_id=vca_id,
4016 )
4017
4018 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4019 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4020 namespace = "." + db_nsr["_id"]
4021 try:
4022 await self.n2vc.delete_namespace(
4023 namespace=namespace,
4024 total_timeout=self.timeout_charm_delete,
4025 vca_id=vca_id,
4026 )
4027 except N2VCNotFound: # already deleted. Skip
4028 pass
4029 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4030
4031 async def _terminate_RO(
4032 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4033 ):
4034 """
4035 Terminates a deployment from RO
4036 :param logging_text:
4037 :param nsr_deployed: db_nsr._admin.deployed
4038 :param nsr_id:
4039 :param nslcmop_id:
4040 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
4041 this method updates only index 2, but it writes the concatenated content of the whole list to the database
4042 :return:
4043 """
4044 db_nsr_update = {}
4045 failed_detail = []
4046 ro_nsr_id = ro_delete_action = None
4047 if nsr_deployed and nsr_deployed.get("RO"):
4048 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
4049 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
4050 try:
4051 if ro_nsr_id:
4052 stage[2] = "Deleting ns from VIM."
4053 db_nsr_update["detailed-status"] = " ".join(stage)
4054 self._write_op_status(nslcmop_id, stage)
4055 self.logger.debug(logging_text + stage[2])
4056 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4057 self._write_op_status(nslcmop_id, stage)
4058 desc = await self.RO.delete("ns", ro_nsr_id)
4059 ro_delete_action = desc["action_id"]
4060 db_nsr_update[
4061 "_admin.deployed.RO.nsr_delete_action_id"
4062 ] = ro_delete_action
4063 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4064 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4065 if ro_delete_action:
4066 # wait until NS is deleted from VIM
4067 stage[2] = "Waiting ns deleted from VIM."
4068 detailed_status_old = None
4069 self.logger.debug(
4070 logging_text
4071 + stage[2]
4072 + " RO_id={} ro_delete_action={}".format(
4073 ro_nsr_id, ro_delete_action
4074 )
4075 )
4076 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4077 self._write_op_status(nslcmop_id, stage)
4078
4079 delete_timeout = 20 * 60 # 20 minutes
4080 while delete_timeout > 0:
4081 desc = await self.RO.show(
4082 "ns",
4083 item_id_name=ro_nsr_id,
4084 extra_item="action",
4085 extra_item_id=ro_delete_action,
4086 )
4087
4088 # deploymentStatus
4089 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4090
4091 ns_status, ns_status_info = self.RO.check_action_status(desc)
4092 if ns_status == "ERROR":
4093 raise ROclient.ROClientException(ns_status_info)
4094 elif ns_status == "BUILD":
4095 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4096 elif ns_status == "ACTIVE":
4097 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4098 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4099 break
4100 else:
4101 assert (
4102 False
4103 ), "ROclient.check_action_status returns unknown {}".format(
4104 ns_status
4105 )
4106 if stage[2] != detailed_status_old:
4107 detailed_status_old = stage[2]
4108 db_nsr_update["detailed-status"] = " ".join(stage)
4109 self._write_op_status(nslcmop_id, stage)
4110 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4111 await asyncio.sleep(5, loop=self.loop)
4112 delete_timeout -= 5
4113 else: # delete_timeout <= 0:
4114 raise ROclient.ROClientException(
4115 "Timeout waiting ns deleted from VIM"
4116 )
4117
4118 except Exception as e:
4119 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4120 if (
4121 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4122 ): # not found
4123 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4124 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4125 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4126 self.logger.debug(
4127 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4128 )
4129 elif (
4130 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4131 ): # conflict
4132 failed_detail.append("delete conflict: {}".format(e))
4133 self.logger.debug(
4134 logging_text
4135 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4136 )
4137 else:
4138 failed_detail.append("delete error: {}".format(e))
4139 self.logger.error(
4140 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4141 )
4142
4143 # Delete nsd
4144 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4145 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4146 try:
4147 stage[2] = "Deleting nsd from RO."
4148 db_nsr_update["detailed-status"] = " ".join(stage)
4149 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4150 self._write_op_status(nslcmop_id, stage)
4151 await self.RO.delete("nsd", ro_nsd_id)
4152 self.logger.debug(
4153 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4154 )
4155 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4156 except Exception as e:
4157 if (
4158 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4159 ): # not found
4160 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4161 self.logger.debug(
4162 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4163 )
4164 elif (
4165 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4166 ): # conflict
4167 failed_detail.append(
4168 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4169 )
4170 self.logger.debug(logging_text + failed_detail[-1])
4171 else:
4172 failed_detail.append(
4173 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4174 )
4175 self.logger.error(logging_text + failed_detail[-1])
4176
4177 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4178 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4179 if not vnf_deployed or not vnf_deployed["id"]:
4180 continue
4181 try:
4182 ro_vnfd_id = vnf_deployed["id"]
4183 stage[
4184 2
4185 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4186 vnf_deployed["member-vnf-index"], ro_vnfd_id
4187 )
4188 db_nsr_update["detailed-status"] = " ".join(stage)
4189 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4190 self._write_op_status(nslcmop_id, stage)
4191 await self.RO.delete("vnfd", ro_vnfd_id)
4192 self.logger.debug(
4193 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4194 )
4195 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4196 except Exception as e:
4197 if (
4198 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4199 ): # not found
4200 db_nsr_update[
4201 "_admin.deployed.RO.vnfd.{}.id".format(index)
4202 ] = None
4203 self.logger.debug(
4204 logging_text
4205 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4206 )
4207 elif (
4208 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4209 ): # conflict
4210 failed_detail.append(
4211 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4212 )
4213 self.logger.debug(logging_text + failed_detail[-1])
4214 else:
4215 failed_detail.append(
4216 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4217 )
4218 self.logger.error(logging_text + failed_detail[-1])
4219
4220 if failed_detail:
4221 stage[2] = "Error deleting from VIM"
4222 else:
4223 stage[2] = "Deleted from VIM"
4224 db_nsr_update["detailed-status"] = " ".join(stage)
4225 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4226 self._write_op_status(nslcmop_id, stage)
4227
4228 if failed_detail:
4229 raise LcmException("; ".join(failed_detail))
4230
4231 async def terminate(self, nsr_id, nslcmop_id):
4232 # Try to lock HA task here
4233 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4234 if not task_is_locked_by_me:
4235 return
4236
4237 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4238 self.logger.debug(logging_text + "Enter")
4239 timeout_ns_terminate = self.timeout_ns_terminate
4240 db_nsr = None
4241 db_nslcmop = None
4242 operation_params = None
4243 exc = None
4244 error_list = [] # annotates all failed error messages
4245 db_nslcmop_update = {}
4246 autoremove = False # autoremove after terminated
4247 tasks_dict_info = {}
4248 db_nsr_update = {}
4249 stage = [
4250 "Stage 1/3: Preparing task.",
4251 "Waiting for previous operations to terminate.",
4252 "",
4253 ]
4254 # ^ contains [stage, step, VIM-status]
4255 try:
4256 # wait for any previous tasks in process
4257 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4258
4259 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4260 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4261 operation_params = db_nslcmop.get("operationParams") or {}
4262 if operation_params.get("timeout_ns_terminate"):
4263 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4264 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4265 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4266
4267 db_nsr_update["operational-status"] = "terminating"
4268 db_nsr_update["config-status"] = "terminating"
4269 self._write_ns_status(
4270 nsr_id=nsr_id,
4271 ns_state="TERMINATING",
4272 current_operation="TERMINATING",
4273 current_operation_id=nslcmop_id,
4274 other_update=db_nsr_update,
4275 )
4276 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4277 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4278 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4279 return
4280
4281 stage[1] = "Getting vnf descriptors from db."
4282 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4283 db_vnfrs_dict = {
4284 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4285 }
4286 db_vnfds_from_id = {}
4287 db_vnfds_from_member_index = {}
4288 # Loop over VNFRs
4289 for vnfr in db_vnfrs_list:
4290 vnfd_id = vnfr["vnfd-id"]
4291 if vnfd_id not in db_vnfds_from_id:
4292 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4293 db_vnfds_from_id[vnfd_id] = vnfd
4294 db_vnfds_from_member_index[
4295 vnfr["member-vnf-index-ref"]
4296 ] = db_vnfds_from_id[vnfd_id]
4297
4298 # Destroy individual execution environments when there are terminate primitives.
4299 # The rest of the EEs will be deleted at once
4300 # TODO - check before calling _destroy_N2VC
4301 # if not operation_params.get("skip_terminate_primitives"):#
4302 # or not vca.get("needed_terminate"):
4303 stage[0] = "Stage 2/3 execute terminating primitives."
4304 self.logger.debug(logging_text + stage[0])
4305 stage[1] = "Looking execution environment that needs terminate."
4306 self.logger.debug(logging_text + stage[1])
4307
4308 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4309 config_descriptor = None
4310 vca_member_vnf_index = vca.get("member-vnf-index")
4311 vca_id = self.get_vca_id(
4312 db_vnfrs_dict.get(vca_member_vnf_index)
4313 if vca_member_vnf_index
4314 else None,
4315 db_nsr,
4316 )
4317 if not vca or not vca.get("ee_id"):
4318 continue
4319 if not vca.get("member-vnf-index"):
4320 # ns
4321 config_descriptor = db_nsr.get("ns-configuration")
4322 elif vca.get("vdu_id"):
4323 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4324 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4325 elif vca.get("kdu_name"):
4326 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4327 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4328 else:
4329 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4330 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4331 vca_type = vca.get("type")
4332 exec_terminate_primitives = not operation_params.get(
4333 "skip_terminate_primitives"
4334 ) and vca.get("needed_terminate")
4335 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4336 # pending native charms
4337 destroy_ee = (
4338 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4339 )
4340 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4341 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4342 task = asyncio.ensure_future(
4343 self.destroy_N2VC(
4344 logging_text,
4345 db_nslcmop,
4346 vca,
4347 config_descriptor,
4348 vca_index,
4349 destroy_ee,
4350 exec_terminate_primitives,
4351 vca_id=vca_id,
4352 )
4353 )
4354 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4355
4356 # wait for pending tasks of terminate primitives
4357 if tasks_dict_info:
4358 self.logger.debug(
4359 logging_text
4360 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4361 )
4362 error_list = await self._wait_for_tasks(
4363 logging_text,
4364 tasks_dict_info,
4365 min(self.timeout_charm_delete, timeout_ns_terminate),
4366 stage,
4367 nslcmop_id,
4368 )
4369 tasks_dict_info.clear()
4370 if error_list:
4371 return # raise LcmException("; ".join(error_list))
4372
4373 # remove All execution environments at once
4374 stage[0] = "Stage 3/3 delete all."
4375
4376 if nsr_deployed.get("VCA"):
4377 stage[1] = "Deleting all execution environments."
4378 self.logger.debug(logging_text + stage[1])
4379 vca_id = self.get_vca_id({}, db_nsr)
4380 task_delete_ee = asyncio.ensure_future(
4381 asyncio.wait_for(
4382 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4383 timeout=self.timeout_charm_delete,
4384 )
4385 )
4386 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4387 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4388
4389 # Delete from k8scluster
4390 stage[1] = "Deleting KDUs."
4391 self.logger.debug(logging_text + stage[1])
4392 # print(nsr_deployed)
4393 for kdu in get_iterable(nsr_deployed, "K8s"):
4394 if not kdu or not kdu.get("kdu-instance"):
4395 continue
4396 kdu_instance = kdu.get("kdu-instance")
4397 if kdu.get("k8scluster-type") in self.k8scluster_map:
4398 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4399 vca_id = self.get_vca_id({}, db_nsr)
4400 task_delete_kdu_instance = asyncio.ensure_future(
4401 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4402 cluster_uuid=kdu.get("k8scluster-uuid"),
4403 kdu_instance=kdu_instance,
4404 vca_id=vca_id,
4405 )
4406 )
4407 else:
4408 self.logger.error(
4409 logging_text
4410 + "Unknown k8s deployment type {}".format(
4411 kdu.get("k8scluster-type")
4412 )
4413 )
4414 continue
4415 tasks_dict_info[
4416 task_delete_kdu_instance
4417 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4418
4419 # remove from RO
4420 stage[1] = "Deleting ns from VIM."
4421 if self.ng_ro:
4422 task_delete_ro = asyncio.ensure_future(
4423 self._terminate_ng_ro(
4424 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4425 )
4426 )
4427 else:
4428 task_delete_ro = asyncio.ensure_future(
4429 self._terminate_RO(
4430 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4431 )
4432 )
4433 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4434
4435 # the rest of the work is done in the finally block
4436
4437 except (
4438 ROclient.ROClientException,
4439 DbException,
4440 LcmException,
4441 N2VCException,
4442 ) as e:
4443 self.logger.error(logging_text + "Exit Exception {}".format(e))
4444 exc = e
4445 except asyncio.CancelledError:
4446 self.logger.error(
4447 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4448 )
4449 exc = "Operation was cancelled"
4450 except Exception as e:
4451 exc = traceback.format_exc()
4452 self.logger.critical(
4453 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4454 exc_info=True,
4455 )
4456 finally:
4457 if exc:
4458 error_list.append(str(exc))
4459 try:
4460 # wait for pending tasks
4461 if tasks_dict_info:
4462 stage[1] = "Waiting for terminate pending tasks."
4463 self.logger.debug(logging_text + stage[1])
4464 error_list += await self._wait_for_tasks(
4465 logging_text,
4466 tasks_dict_info,
4467 timeout_ns_terminate,
4468 stage,
4469 nslcmop_id,
4470 )
4471 stage[1] = stage[2] = ""
4472 except asyncio.CancelledError:
4473 error_list.append("Cancelled")
4474 # TODO cancel all tasks
4475 except Exception as exc:
4476 error_list.append(str(exc))
4477 # update status at database
4478 if error_list:
4479 error_detail = "; ".join(error_list)
4480 # self.logger.error(logging_text + error_detail)
4481 error_description_nslcmop = "{} Detail: {}".format(
4482 stage[0], error_detail
4483 )
4484 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4485 nslcmop_id, stage[0]
4486 )
4487
4488 db_nsr_update["operational-status"] = "failed"
4489 db_nsr_update["detailed-status"] = (
4490 error_description_nsr + " Detail: " + error_detail
4491 )
4492 db_nslcmop_update["detailed-status"] = error_detail
4493 nslcmop_operation_state = "FAILED"
4494 ns_state = "BROKEN"
4495 else:
4496 error_detail = None
4497 error_description_nsr = error_description_nslcmop = None
4498 ns_state = "NOT_INSTANTIATED"
4499 db_nsr_update["operational-status"] = "terminated"
4500 db_nsr_update["detailed-status"] = "Done"
4501 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4502 db_nslcmop_update["detailed-status"] = "Done"
4503 nslcmop_operation_state = "COMPLETED"
4504
4505 if db_nsr:
4506 self._write_ns_status(
4507 nsr_id=nsr_id,
4508 ns_state=ns_state,
4509 current_operation="IDLE",
4510 current_operation_id=None,
4511 error_description=error_description_nsr,
4512 error_detail=error_detail,
4513 other_update=db_nsr_update,
4514 )
4515 self._write_op_status(
4516 op_id=nslcmop_id,
4517 stage="",
4518 error_message=error_description_nslcmop,
4519 operation_state=nslcmop_operation_state,
4520 other_update=db_nslcmop_update,
4521 )
4522 if ns_state == "NOT_INSTANTIATED":
4523 try:
4524 self.db.set_list(
4525 "vnfrs",
4526 {"nsr-id-ref": nsr_id},
4527 {"_admin.nsState": "NOT_INSTANTIATED"},
4528 )
4529 except DbException as e:
4530 self.logger.warn(
4531 logging_text
4532 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4533 nsr_id, e
4534 )
4535 )
4536 if operation_params:
4537 autoremove = operation_params.get("autoremove", False)
4538 if nslcmop_operation_state:
4539 try:
4540 await self.msg.aiowrite(
4541 "ns",
4542 "terminated",
4543 {
4544 "nsr_id": nsr_id,
4545 "nslcmop_id": nslcmop_id,
4546 "operationState": nslcmop_operation_state,
4547 "autoremove": autoremove,
4548 },
4549 loop=self.loop,
4550 )
4551 except Exception as e:
4552 self.logger.error(
4553 logging_text + "kafka_write notification Exception {}".format(e)
4554 )
4555
4556 self.logger.debug(logging_text + "Exit")
4557 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4558
4559 async def _wait_for_tasks(
4560 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4561 ):
4562 time_start = time()
4563 error_detail_list = []
4564 error_list = []
4565 pending_tasks = list(created_tasks_info.keys())
4566 num_tasks = len(pending_tasks)
4567 num_done = 0
4568 stage[1] = "{}/{}.".format(num_done, num_tasks)
4569 self._write_op_status(nslcmop_id, stage)
4570 while pending_tasks:
4571 new_error = None
4572 _timeout = timeout + time_start - time()
4573 done, pending_tasks = await asyncio.wait(
4574 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4575 )
4576 num_done += len(done)
4577 if not done: # Timeout
4578 for task in pending_tasks:
4579 new_error = created_tasks_info[task] + ": Timeout"
4580 error_detail_list.append(new_error)
4581 error_list.append(new_error)
4582 break
4583 for task in done:
4584 if task.cancelled():
4585 exc = "Cancelled"
4586 else:
4587 exc = task.exception()
4588 if exc:
4589 if isinstance(exc, asyncio.TimeoutError):
4590 exc = "Timeout"
4591 new_error = created_tasks_info[task] + ": {}".format(exc)
4592 error_list.append(created_tasks_info[task])
4593 error_detail_list.append(new_error)
4594 if isinstance(
4595 exc,
4596 (
4597 str,
4598 DbException,
4599 N2VCException,
4600 ROclient.ROClientException,
4601 LcmException,
4602 K8sException,
4603 NgRoException,
4604 ),
4605 ):
4606 self.logger.error(logging_text + new_error)
4607 else:
4608 exc_traceback = "".join(
4609 traceback.format_exception(None, exc, exc.__traceback__)
4610 )
4611 self.logger.error(
4612 logging_text
4613 + created_tasks_info[task]
4614 + " "
4615 + exc_traceback
4616 )
4617 else:
4618 self.logger.debug(
4619 logging_text + created_tasks_info[task] + ": Done"
4620 )
4621 stage[1] = "{}/{}.".format(num_done, num_tasks)
4622 if new_error:
4623 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4624 if nsr_id: # update also nsr
4625 self.update_db_2(
4626 "nsrs",
4627 nsr_id,
4628 {
4629 "errorDescription": "Error at: " + ", ".join(error_list),
4630 "errorDetail": ". ".join(error_detail_list),
4631 },
4632 )
4633 self._write_op_status(nslcmop_id, stage)
4634 return error_detail_list
4635
4636 @staticmethod
4637 def _map_primitive_params(primitive_desc, params, instantiation_params):
4638 """
4639 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a parameter,
4640 the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
4641 :param primitive_desc: portion of VNFD/NSD that describes primitive
4642 :param params: Params provided by user
4643 :param instantiation_params: Instantiation params provided by user
4644 :return: a dictionary with the calculated params
4645 """
4646 calculated_params = {}
4647 for parameter in primitive_desc.get("parameter", ()):
4648 param_name = parameter["name"]
4649 if param_name in params:
4650 calculated_params[param_name] = params[param_name]
4651 elif "default-value" in parameter or "value" in parameter:
4652 if "value" in parameter:
4653 calculated_params[param_name] = parameter["value"]
4654 else:
4655 calculated_params[param_name] = parameter["default-value"]
4656 if (
4657 isinstance(calculated_params[param_name], str)
4658 and calculated_params[param_name].startswith("<")
4659 and calculated_params[param_name].endswith(">")
4660 ):
4661 if calculated_params[param_name][1:-1] in instantiation_params:
4662 calculated_params[param_name] = instantiation_params[
4663 calculated_params[param_name][1:-1]
4664 ]
4665 else:
4666 raise LcmException(
4667 "Parameter {} needed to execute primitive {} not provided".format(
4668 calculated_params[param_name], primitive_desc["name"]
4669 )
4670 )
4671 else:
4672 raise LcmException(
4673 "Parameter {} needed to execute primitive {} not provided".format(
4674 param_name, primitive_desc["name"]
4675 )
4676 )
4677
4678 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4679 calculated_params[param_name] = yaml.safe_dump(
4680 calculated_params[param_name], default_flow_style=True, width=256
4681 )
4682 elif isinstance(calculated_params[param_name], str) and calculated_params[
4683 param_name
4684 ].startswith("!!yaml "):
4685 calculated_params[param_name] = calculated_params[param_name][7:]
4686 if parameter.get("data-type") == "INTEGER":
4687 try:
4688 calculated_params[param_name] = int(calculated_params[param_name])
4689 except ValueError: # error converting string to int
4690 raise LcmException(
4691 "Parameter {} of primitive {} must be integer".format(
4692 param_name, primitive_desc["name"]
4693 )
4694 )
4695 elif parameter.get("data-type") == "BOOLEAN":
4696 calculated_params[param_name] = not (
4697 (str(calculated_params[param_name])).lower() == "false"
4698 )
4699
4700 # always add ns_config_info if the primitive name is config
4701 if primitive_desc["name"] == "config":
4702 if "ns_config_info" in instantiation_params:
4703 calculated_params["ns_config_info"] = instantiation_params[
4704 "ns_config_info"
4705 ]
4706 return calculated_params
4707
4708 def _look_for_deployed_vca(
4709 self,
4710 deployed_vca,
4711 member_vnf_index,
4712 vdu_id,
4713 vdu_count_index,
4714 kdu_name=None,
4715 ee_descriptor_id=None,
4716 ):
4717 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
4718 for vca in deployed_vca:
4719 if not vca:
4720 continue
4721 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4722 continue
4723 if (
4724 vdu_count_index is not None
4725 and vdu_count_index != vca["vdu_count_index"]
4726 ):
4727 continue
4728 if kdu_name and kdu_name != vca["kdu_name"]:
4729 continue
4730 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4731 continue
4732 break
4733 else:
4734 # vca_deployed not found
4735 raise LcmException(
4736 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4737 " is not deployed".format(
4738 member_vnf_index,
4739 vdu_id,
4740 vdu_count_index,
4741 kdu_name,
4742 ee_descriptor_id,
4743 )
4744 )
4745 # get ee_id
4746 ee_id = vca.get("ee_id")
4747 vca_type = vca.get(
4748 "type", "lxc_proxy_charm"
4749 ) # default value for backward compatibility - proxy charm
4750 if not ee_id:
4751 raise LcmException(
4752 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4753 "execution environment".format(
4754 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4755 )
4756 )
4757 return ee_id, vca_type
4758
4759 async def _ns_execute_primitive(
4760 self,
4761 ee_id,
4762 primitive,
4763 primitive_params,
4764 retries=0,
4765 retries_interval=30,
4766 timeout=None,
4767 vca_type=None,
4768 db_dict=None,
4769 vca_id: str = None,
4770 ) -> (str, str):
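# Returns a (state, detail) tuple: ("COMPLETED", output) on success, ("FAILED", error) when
# the retries are exhausted, or ("FAIL", error) on unexpected exceptions. Each attempt is
# bounded by `timeout` (or self.timeout_primitive) and retries wait `retries_interval` seconds.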
4771 try:
4772 if primitive == "config":
4773 primitive_params = {"params": primitive_params}
4774
4775 vca_type = vca_type or "lxc_proxy_charm"
4776
4777 while retries >= 0:
4778 try:
4779 output = await asyncio.wait_for(
4780 self.vca_map[vca_type].exec_primitive(
4781 ee_id=ee_id,
4782 primitive_name=primitive,
4783 params_dict=primitive_params,
4784 progress_timeout=self.timeout_progress_primitive,
4785 total_timeout=self.timeout_primitive,
4786 db_dict=db_dict,
4787 vca_id=vca_id,
4788 vca_type=vca_type,
4789 ),
4790 timeout=timeout or self.timeout_primitive,
4791 )
4792 # execution was OK
4793 break
4794 except asyncio.CancelledError:
4795 raise
4796 except Exception as e: # asyncio.TimeoutError
4797 if isinstance(e, asyncio.TimeoutError):
4798 e = "Timeout"
4799 retries -= 1
4800 if retries >= 0:
4801 self.logger.debug(
4802 "Error executing action {} on {} -> {}".format(
4803 primitive, ee_id, e
4804 )
4805 )
4806 # wait and retry
4807 await asyncio.sleep(retries_interval, loop=self.loop)
4808 else:
4809 return "FAILED", str(e)
4810
4811 return "COMPLETED", output
4812
4813 except (LcmException, asyncio.CancelledError):
4814 raise
4815 except Exception as e:
4816 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4817
4818 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4819 """
4820 Updates the vca_status in the nsrs record with the latest juju information
4821 :param: nsr_id: Id of the nsr
4822 :param: nslcmop_id: Id of the nslcmop
4823 :return: None
4824 """
4825
4826 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4827 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4828 vca_id = self.get_vca_id({}, db_nsr)
4829 if db_nsr["_admin"]["deployed"]["K8s"]:
4830 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4831 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4832 await self._on_update_k8s_db(
4833 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4834 )
4835 else:
4836 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4837 table, filter = "nsrs", {"_id": nsr_id}
4838 path = "_admin.deployed.VCA.{}.".format(vca_index)
4839 await self._on_update_n2vc_db(table, filter, path, {})
4840
4841 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4842 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4843
4844 async def action(self, nsr_id, nslcmop_id):
4845 # Try to lock HA task here
4846 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4847 if not task_is_locked_by_me:
4848 return
4849
4850 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4851 self.logger.debug(logging_text + "Enter")
4852 # get all needed from database
4853 db_nsr = None
4854 db_nslcmop = None
4855 db_nsr_update = {}
4856 db_nslcmop_update = {}
4857 nslcmop_operation_state = None
4858 error_description_nslcmop = None
4859 exc = None
4860 try:
4861 # wait for any previous tasks in process
4862 step = "Waiting for previous operations to terminate"
4863 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4864
4865 self._write_ns_status(
4866 nsr_id=nsr_id,
4867 ns_state=None,
4868 current_operation="RUNNING ACTION",
4869 current_operation_id=nslcmop_id,
4870 )
4871
4872 step = "Getting information from database"
4873 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4874 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4875 if db_nslcmop["operationParams"].get("primitive_params"):
4876 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4877 db_nslcmop["operationParams"]["primitive_params"]
4878 )
4879
4880 nsr_deployed = db_nsr["_admin"].get("deployed")
4881 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4882 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4883 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4884 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4885 primitive = db_nslcmop["operationParams"]["primitive"]
4886 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4887 timeout_ns_action = db_nslcmop["operationParams"].get(
4888 "timeout_ns_action", self.timeout_primitive
4889 )
4890
4891 if vnf_index:
4892 step = "Getting vnfr from database"
4893 db_vnfr = self.db.get_one(
4894 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4895 )
4896 if db_vnfr.get("kdur"):
4897 kdur_list = []
4898 for kdur in db_vnfr["kdur"]:
4899 if kdur.get("additionalParams"):
4900 kdur["additionalParams"] = json.loads(
4901 kdur["additionalParams"]
4902 )
4903 kdur_list.append(kdur)
4904 db_vnfr["kdur"] = kdur_list
4905 step = "Getting vnfd from database"
4906 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4907 else:
4908 step = "Getting nsd from database"
4909 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4910
4911 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4912 # for backward compatibility
4913 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4914 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4915 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4916 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4917
4918 # look for primitive
4919 config_primitive_desc = descriptor_configuration = None
4920 if vdu_id:
4921 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4922 elif kdu_name:
4923 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4924 elif vnf_index:
4925 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4926 else:
4927 descriptor_configuration = db_nsd.get("ns-configuration")
4928
4929 if descriptor_configuration and descriptor_configuration.get(
4930 "config-primitive"
4931 ):
4932 for config_primitive in descriptor_configuration["config-primitive"]:
4933 if config_primitive["name"] == primitive:
4934 config_primitive_desc = config_primitive
4935 break
4936
4937 if not config_primitive_desc:
4938 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4939 raise LcmException(
4940 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4941 primitive
4942 )
4943 )
4944 primitive_name = primitive
4945 ee_descriptor_id = None
4946 else:
4947 primitive_name = config_primitive_desc.get(
4948 "execution-environment-primitive", primitive
4949 )
4950 ee_descriptor_id = config_primitive_desc.get(
4951 "execution-environment-ref"
4952 )
4953
4954 if vnf_index:
4955 if vdu_id:
4956 vdur = next(
4957 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4958 )
4959 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4960 elif kdu_name:
4961 kdur = next(
4962 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4963 )
4964 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4965 else:
4966 desc_params = parse_yaml_strings(
4967 db_vnfr.get("additionalParamsForVnf")
4968 )
4969 else:
4970 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4971 if kdu_name and get_configuration(db_vnfd, kdu_name):
4972 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4973 actions = set()
4974 for primitive in kdu_configuration.get("initial-config-primitive", []):
4975 actions.add(primitive["name"])
4976 for primitive in kdu_configuration.get("config-primitive", []):
4977 actions.add(primitive["name"])
4978 kdu_action = primitive_name in actions
4979
4980 # TODO check if ns is in a proper status
4981 if kdu_name and (
4982 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4983 ):
4984 # kdur and desc_params already set from before
4985 if primitive_params:
4986 desc_params.update(primitive_params)
4987 # TODO Check if we will need something at vnf level
4988 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4989 if (
4990 kdu_name == kdu["kdu-name"]
4991 and kdu["member-vnf-index"] == vnf_index
4992 ):
4993 break
4994 else:
4995 raise LcmException(
4996 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4997 )
4998
4999 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5000 msg = "unknown k8scluster-type '{}'".format(
5001 kdu.get("k8scluster-type")
5002 )
5003 raise LcmException(msg)
5004
5005 db_dict = {
5006 "collection": "nsrs",
5007 "filter": {"_id": nsr_id},
5008 "path": "_admin.deployed.K8s.{}".format(index),
5009 }
5010 self.logger.debug(
5011 logging_text
5012 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5013 )
5014 step = "Executing kdu {}".format(primitive_name)
5015 if primitive_name == "upgrade":
5016 if desc_params.get("kdu_model"):
5017 kdu_model = desc_params.get("kdu_model")
5018 del desc_params["kdu_model"]
5019 else:
5020 kdu_model = kdu.get("kdu-model")
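# Strip a trailing ":<version>" suffix from the kdu model reference, if present.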
5021 parts = kdu_model.split(sep=":")
5022 if len(parts) == 2:
5023 kdu_model = parts[0]
5024
5025 detailed_status = await asyncio.wait_for(
5026 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5027 cluster_uuid=kdu.get("k8scluster-uuid"),
5028 kdu_instance=kdu.get("kdu-instance"),
5029 atomic=True,
5030 kdu_model=kdu_model,
5031 params=desc_params,
5032 db_dict=db_dict,
5033 timeout=timeout_ns_action,
5034 ),
5035 timeout=timeout_ns_action + 10,
5036 )
5037 self.logger.debug(
5038 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5039 )
5040 elif primitive_name == "rollback":
5041 detailed_status = await asyncio.wait_for(
5042 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5043 cluster_uuid=kdu.get("k8scluster-uuid"),
5044 kdu_instance=kdu.get("kdu-instance"),
5045 db_dict=db_dict,
5046 ),
5047 timeout=timeout_ns_action,
5048 )
5049 elif primitive_name == "status":
5050 detailed_status = await asyncio.wait_for(
5051 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5052 cluster_uuid=kdu.get("k8scluster-uuid"),
5053 kdu_instance=kdu.get("kdu-instance"),
5054 vca_id=vca_id,
5055 ),
5056 timeout=timeout_ns_action,
5057 )
5058 else:
5059 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5060 kdu["kdu-name"], nsr_id
5061 )
5062 params = self._map_primitive_params(
5063 config_primitive_desc, primitive_params, desc_params
5064 )
5065
5066 detailed_status = await asyncio.wait_for(
5067 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5068 cluster_uuid=kdu.get("k8scluster-uuid"),
5069 kdu_instance=kdu_instance,
5070 primitive_name=primitive_name,
5071 params=params,
5072 db_dict=db_dict,
5073 timeout=timeout_ns_action,
5074 vca_id=vca_id,
5075 ),
5076 timeout=timeout_ns_action,
5077 )
5078
5079 if detailed_status:
5080 nslcmop_operation_state = "COMPLETED"
5081 else:
5082 detailed_status = ""
5083 nslcmop_operation_state = "FAILED"
5084 else:
5085 ee_id, vca_type = self._look_for_deployed_vca(
5086 nsr_deployed["VCA"],
5087 member_vnf_index=vnf_index,
5088 vdu_id=vdu_id,
5089 vdu_count_index=vdu_count_index,
5090 ee_descriptor_id=ee_descriptor_id,
5091 )
5092 for vca_index, vca_deployed in enumerate(
5093 db_nsr["_admin"]["deployed"]["VCA"]
5094 ):
5095 if vca_deployed.get("member-vnf-index") == vnf_index:
5096 db_dict = {
5097 "collection": "nsrs",
5098 "filter": {"_id": nsr_id},
5099 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5100 }
5101 break
5102 (
5103 nslcmop_operation_state,
5104 detailed_status,
5105 ) = await self._ns_execute_primitive(
5106 ee_id,
5107 primitive=primitive_name,
5108 primitive_params=self._map_primitive_params(
5109 config_primitive_desc, primitive_params, desc_params
5110 ),
5111 timeout=timeout_ns_action,
5112 vca_type=vca_type,
5113 db_dict=db_dict,
5114 vca_id=vca_id,
5115 )
5116
5117 db_nslcmop_update["detailed-status"] = detailed_status
5118 error_description_nslcmop = (
5119 detailed_status if nslcmop_operation_state == "FAILED" else ""
5120 )
5121 self.logger.debug(
5122 logging_text
5123 + " task Done with result {} {}".format(
5124 nslcmop_operation_state, detailed_status
5125 )
5126 )
5127 return # database update is called inside finally
5128
5129 except (DbException, LcmException, N2VCException, K8sException) as e:
5130 self.logger.error(logging_text + "Exit Exception {}".format(e))
5131 exc = e
5132 except asyncio.CancelledError:
5133 self.logger.error(
5134 logging_text + "Cancelled Exception while '{}'".format(step)
5135 )
5136 exc = "Operation was cancelled"
5137 except asyncio.TimeoutError:
5138 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5139 exc = "Timeout"
5140 except Exception as e:
5141 exc = traceback.format_exc()
5142 self.logger.critical(
5143 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5144 exc_info=True,
5145 )
5146 finally:
5147 if exc:
5148 db_nslcmop_update[
5149 "detailed-status"
5150 ] = (
5151 detailed_status
5152 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5153 nslcmop_operation_state = "FAILED"
5154 if db_nsr:
5155 self._write_ns_status(
5156 nsr_id=nsr_id,
5157 ns_state=db_nsr[
5158 "nsState"
5159 ], # TODO check if degraded. For the moment use previous status
5160 current_operation="IDLE",
5161 current_operation_id=None,
5162 # error_description=error_description_nsr,
5163 # error_detail=error_detail,
5164 other_update=db_nsr_update,
5165 )
5166
5167 self._write_op_status(
5168 op_id=nslcmop_id,
5169 stage="",
5170 error_message=error_description_nslcmop,
5171 operation_state=nslcmop_operation_state,
5172 other_update=db_nslcmop_update,
5173 )
5174
5175 if nslcmop_operation_state:
5176 try:
5177 await self.msg.aiowrite(
5178 "ns",
5179 "actioned",
5180 {
5181 "nsr_id": nsr_id,
5182 "nslcmop_id": nslcmop_id,
5183 "operationState": nslcmop_operation_state,
5184 },
5185 loop=self.loop,
5186 )
5187 except Exception as e:
5188 self.logger.error(
5189 logging_text + "kafka_write notification Exception {}".format(e)
5190 )
5191 self.logger.debug(logging_text + "Exit")
5192 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5193 return nslcmop_operation_state, detailed_status
5194
5195 async def scale(self, nsr_id, nslcmop_id):
5196 # Try to lock HA task here
5197 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5198 if not task_is_locked_by_me:
5199 return
5200
5201 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5202 stage = ["", "", ""]
5203 tasks_dict_info = {}
5204 # ^ stage, step, VIM progress
5205 self.logger.debug(logging_text + "Enter")
5206 # get all needed from database
5207 db_nsr = None
5208 db_nslcmop_update = {}
5209 db_nsr_update = {}
5210 exc = None
5211 # in case of error, indicates which part of the scale operation failed, in order to set the nsr error status
5212 scale_process = None
5213 old_operational_status = ""
5214 old_config_status = ""
5215 nsi_id = None
5216 try:
5217 # wait for any previous tasks in process
5218 step = "Waiting for previous operations to terminate"
5219 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5220 self._write_ns_status(
5221 nsr_id=nsr_id,
5222 ns_state=None,
5223 current_operation="SCALING",
5224 current_operation_id=nslcmop_id,
5225 )
5226
5227 step = "Getting nslcmop from database"
5228 self.logger.debug(
5229 step + " after having waited for previous tasks to be completed"
5230 )
5231 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5232
5233 step = "Getting nsr from database"
5234 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5235 old_operational_status = db_nsr["operational-status"]
5236 old_config_status = db_nsr["config-status"]
5237
5238 step = "Parsing scaling parameters"
5239 db_nsr_update["operational-status"] = "scaling"
5240 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5241 nsr_deployed = db_nsr["_admin"].get("deployed")
5242
5243 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5244 "scaleByStepData"
5245 ]["member-vnf-index"]
5246 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5247 "scaleByStepData"
5248 ]["scaling-group-descriptor"]
5249 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5250 # for backward compatibility
5251 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5252 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5253 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5254 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5255
5256 step = "Getting vnfr from database"
5257 db_vnfr = self.db.get_one(
5258 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5259 )
5260
5261 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5262
5263 step = "Getting vnfd from database"
5264 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5265
5266 base_folder = db_vnfd["_admin"]["storage"]
5267
5268 step = "Getting scaling-group-descriptor"
5269 scaling_descriptor = find_in_list(
5270 get_scaling_aspect(db_vnfd),
5271 lambda scale_desc: scale_desc["name"] == scaling_group,
5272 )
5273 if not scaling_descriptor:
5274 raise LcmException(
5275 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5276 "at vnfd:scaling-group-descriptor".format(scaling_group)
5277 )
5278
5279 step = "Sending scale order to VIM"
5280 # TODO check if ns is in a proper status
5281 nb_scale_op = 0
5282 if not db_nsr["_admin"].get("scaling-group"):
5283 self.update_db_2(
5284 "nsrs",
5285 nsr_id,
5286 {
5287 "_admin.scaling-group": [
5288 {"name": scaling_group, "nb-scale-op": 0}
5289 ]
5290 },
5291 )
5292 admin_scale_index = 0
5293 else:
5294 for admin_scale_index, admin_scale_info in enumerate(
5295 db_nsr["_admin"]["scaling-group"]
5296 ):
5297 if admin_scale_info["name"] == scaling_group:
5298 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5299 break
5300 else: # not found: set index to one past the last element and add a new entry with this name
5301 admin_scale_index += 1
5302 db_nsr_update[
5303 "_admin.scaling-group.{}.name".format(admin_scale_index)
5304 ] = scaling_group
5305
5306 vca_scaling_info = []
5307 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
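# scaling_info is filled in below; an illustrative shape (values are examples only):
#   {"scaling_group_name": "...", "vdu": [], "kdu": [],
#    "scaling_direction": "OUT", "vdu-create": {"<vdu-id>": 2},
#    "kdu-create": {"<kdu-name>": [{"member-vnf-index": "...", "type": "create", ...}]}}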
5308 if scaling_type == "SCALE_OUT":
5309 if "aspect-delta-details" not in scaling_descriptor:
5310 raise LcmException(
5311 "Aspect delta details not fount in scaling descriptor {}".format(
5312 scaling_descriptor["name"]
5313 )
5314 )
5315 # check whether max-instance-count is reached
5316 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5317
5318 scaling_info["scaling_direction"] = "OUT"
5319 scaling_info["vdu-create"] = {}
5320 scaling_info["kdu-create"] = {}
5321 for delta in deltas:
5322 for vdu_delta in delta.get("vdu-delta", {}):
5323 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5324 # vdu_index also provides the number of instances of the targeted vdu
5325 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5326 cloud_init_text = self._get_vdu_cloud_init_content(
5327 vdud, db_vnfd
5328 )
5329 if cloud_init_text:
5330 additional_params = (
5331 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5332 or {}
5333 )
5334 cloud_init_list = []
5335
5336 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5337 max_instance_count = 10
5338 if vdu_profile and "max-number-of-instances" in vdu_profile:
5339 max_instance_count = vdu_profile.get(
5340 "max-number-of-instances", 10
5341 )
5342
5343 default_instance_num = get_number_of_instances(
5344 db_vnfd, vdud["id"]
5345 )
5346 instances_number = vdu_delta.get("number-of-instances", 1)
5347 nb_scale_op += instances_number
5348
5349 new_instance_count = nb_scale_op + default_instance_num
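# illustrative arithmetic: with default_instance_num=1, max_instance_count=3 and
# nb_scale_op=2 after this delta, new_instance_count=3, which is still allowed;
# one more scale-out (nb_scale_op=3) would exceed the limit and raise below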
5350 # Check whether the new count exceeds the maximum while the vdu count is still below it;
5351 # if so, adjust the number of instances to create
5352 if new_instance_count > max_instance_count > vdu_count:
5353 instances_number = new_instance_count - max_instance_count
5354 else:
5355 instances_number = instances_number
5356
5357 if new_instance_count > max_instance_count:
5358 raise LcmException(
5359 "reached the limit of {} (max-instance-count) "
5360 "scaling-out operations for the "
5361 "scaling-group-descriptor '{}'".format(
5362 nb_scale_op, scaling_group
5363 )
5364 )
5365 for x in range(vdu_delta.get("number-of-instances", 1)):
5366 if cloud_init_text:
5367 # TODO: the VDU's own IP is not yet available because db_vnfr has not been updated.
5368 additional_params["OSM"] = get_osm_params(
5369 db_vnfr, vdu_delta["id"], vdu_index + x
5370 )
5371 cloud_init_list.append(
5372 self._parse_cloud_init(
5373 cloud_init_text,
5374 additional_params,
5375 db_vnfd["id"],
5376 vdud["id"],
5377 )
5378 )
5379 vca_scaling_info.append(
5380 {
5381 "osm_vdu_id": vdu_delta["id"],
5382 "member-vnf-index": vnf_index,
5383 "type": "create",
5384 "vdu_index": vdu_index + x,
5385 }
5386 )
5387 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5388 for kdu_delta in delta.get("kdu-resource-delta", {}):
5389 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5390 kdu_name = kdu_profile["kdu-name"]
5391 resource_name = kdu_profile["resource-name"]
5392
5393 # The same delta might reference several kdus,
5394 # so keep a separate list for each kdu
5395 if not scaling_info["kdu-create"].get(kdu_name, None):
5396 scaling_info["kdu-create"][kdu_name] = []
5397
5398 kdur = get_kdur(db_vnfr, kdu_name)
5399 if kdur.get("helm-chart"):
5400 k8s_cluster_type = "helm-chart-v3"
5401 self.logger.debug("kdur: {}".format(kdur))
5402 if (
5403 kdur.get("helm-version")
5404 and kdur.get("helm-version") == "v2"
5405 ):
5406 k8s_cluster_type = "helm-chart"
5407 raise NotImplementedError
5408 elif kdur.get("juju-bundle"):
5409 k8s_cluster_type = "juju-bundle"
5410 else:
5411 raise LcmException(
5412 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5413 "juju-bundle. Maybe an old NBI version is running".format(
5414 db_vnfr["member-vnf-index-ref"], kdu_name
5415 )
5416 )
5417
5418 max_instance_count = 10
5419 if kdu_profile and "max-number-of-instances" in kdu_profile:
5420 max_instance_count = kdu_profile.get(
5421 "max-number-of-instances", 10
5422 )
5423
5424 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5425 deployed_kdu, _ = get_deployed_kdu(
5426 nsr_deployed, kdu_name, vnf_index
5427 )
5428 if deployed_kdu is None:
5429 raise LcmException(
5430 "KDU '{}' for vnf '{}' not deployed".format(
5431 kdu_name, vnf_index
5432 )
5433 )
5434 kdu_instance = deployed_kdu.get("kdu-instance")
5435 instance_num = await self.k8scluster_map[
5436 k8s_cluster_type
5437 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5438 kdu_replica_count = instance_num + kdu_delta.get(
5439 "number-of-instances", 1
5440 )
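# illustrative example: current replica count 2, delta 3, max 4 -> the requested count 5 is
# capped to 4 below; if the current count were already at the maximum, the request raises instead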
5441
5442 # If the new count exceeds the maximum while instance_num is still below it,
5443 # cap the kdu replica count at the maximum
5444 if kdu_replica_count > max_instance_count > instance_num:
5445 kdu_replica_count = max_instance_count
5446 if kdu_replica_count > max_instance_count:
5447 raise LcmException(
5448 "reached the limit of {} (max-instance-count) "
5449 "scaling-out operations for the "
5450 "scaling-group-descriptor '{}'".format(
5451 instance_num, scaling_group
5452 )
5453 )
5454
5455 for x in range(kdu_delta.get("number-of-instances", 1)):
5456 vca_scaling_info.append(
5457 {
5458 "osm_kdu_id": kdu_name,
5459 "member-vnf-index": vnf_index,
5460 "type": "create",
5461 "kdu_index": instance_num + x - 1,
5462 }
5463 )
5464 scaling_info["kdu-create"][kdu_name].append(
5465 {
5466 "member-vnf-index": vnf_index,
5467 "type": "create",
5468 "k8s-cluster-type": k8s_cluster_type,
5469 "resource-name": resource_name,
5470 "scale": kdu_replica_count,
5471 }
5472 )
5473 elif scaling_type == "SCALE_IN":
5474 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5475
5476 scaling_info["scaling_direction"] = "IN"
5477 scaling_info["vdu-delete"] = {}
5478 scaling_info["kdu-delete"] = {}
5479
5480 for delta in deltas:
5481 for vdu_delta in delta.get("vdu-delta", {}):
5482 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5483 min_instance_count = 0
5484 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5485 if vdu_profile and "min-number-of-instances" in vdu_profile:
5486 min_instance_count = vdu_profile["min-number-of-instances"]
5487
5488 default_instance_num = get_number_of_instances(
5489 db_vnfd, vdu_delta["id"]
5490 )
5491 instance_num = vdu_delta.get("number-of-instances", 1)
5492 nb_scale_op -= instance_num
5493
5494 new_instance_count = nb_scale_op + default_instance_num
5495
5496 if new_instance_count < min_instance_count < vdu_count:
5497 instances_number = min_instance_count - new_instance_count
5498 else:
5499 instances_number = instance_num
5500
5501 if new_instance_count < min_instance_count:
5502 raise LcmException(
5503 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5504 "scaling-group-descriptor '{}'".format(
5505 nb_scale_op, scaling_group
5506 )
5507 )
5508 for x in range(vdu_delta.get("number-of-instances", 1)):
5509 vca_scaling_info.append(
5510 {
5511 "osm_vdu_id": vdu_delta["id"],
5512 "member-vnf-index": vnf_index,
5513 "type": "delete",
5514 "vdu_index": vdu_index - 1 - x,
5515 }
5516 )
5517 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5518 for kdu_delta in delta.get("kdu-resource-delta", {}):
5519 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5520 kdu_name = kdu_profile["kdu-name"]
5521 resource_name = kdu_profile["resource-name"]
5522
5523 if not scaling_info["kdu-delete"].get(kdu_name, None):
5524 scaling_info["kdu-delete"][kdu_name] = []
5525
5526 kdur = get_kdur(db_vnfr, kdu_name)
5527 if kdur.get("helm-chart"):
5528 k8s_cluster_type = "helm-chart-v3"
5529 self.logger.debug("kdur: {}".format(kdur))
5530 if (
5531 kdur.get("helm-version")
5532 and kdur.get("helm-version") == "v2"
5533 ):
5534 k8s_cluster_type = "helm-chart"
5535 raise NotImplementedError
5536 elif kdur.get("juju-bundle"):
5537 k8s_cluster_type = "juju-bundle"
5538 else:
5539 raise LcmException(
5540 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5541 "juju-bundle. Maybe an old NBI version is running".format(
5542 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5543 )
5544 )
5545
5546 min_instance_count = 0
5547 if kdu_profile and "min-number-of-instances" in kdu_profile:
5548 min_instance_count = kdu_profile["min-number-of-instances"]
5549
5550 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5551 deployed_kdu, _ = get_deployed_kdu(
5552 nsr_deployed, kdu_name, vnf_index
5553 )
5554 if deployed_kdu is None:
5555 raise LcmException(
5556 "KDU '{}' for vnf '{}' not deployed".format(
5557 kdu_name, vnf_index
5558 )
5559 )
5560 kdu_instance = deployed_kdu.get("kdu-instance")
5561 instance_num = await self.k8scluster_map[
5562 k8s_cluster_type
5563 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5564 kdu_replica_count = instance_num - kdu_delta.get(
5565 "number-of-instances", 1
5566 )
5567
5568 if kdu_replica_count < min_instance_count < instance_num:
5569 kdu_replica_count = min_instance_count
5570 if kdu_replica_count < min_instance_count:
5571 raise LcmException(
5572 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5573 "scaling-group-descriptor '{}'".format(
5574 instance_num, scaling_group
5575 )
5576 )
5577
5578 for x in range(kdu_delta.get("number-of-instances", 1)):
5579 vca_scaling_info.append(
5580 {
5581 "osm_kdu_id": kdu_name,
5582 "member-vnf-index": vnf_index,
5583 "type": "delete",
5584 "kdu_index": instance_num - x - 1,
5585 }
5586 )
5587 scaling_info["kdu-delete"][kdu_name].append(
5588 {
5589 "member-vnf-index": vnf_index,
5590 "type": "delete",
5591 "k8s-cluster-type": k8s_cluster_type,
5592 "resource-name": resource_name,
5593 "scale": kdu_replica_count,
5594 }
5595 )
5596
5597 # update VDU_SCALING_INFO with the ip addresses of the VDUs to be deleted
5598 vdu_delete = copy(scaling_info.get("vdu-delete"))
5599 if scaling_info["scaling_direction"] == "IN":
5600 for vdur in reversed(db_vnfr["vdur"]):
5601 if vdu_delete.get(vdur["vdu-id-ref"]):
5602 vdu_delete[vdur["vdu-id-ref"]] -= 1
5603 scaling_info["vdu"].append(
5604 {
5605 "name": vdur.get("name") or vdur.get("vdu-name"),
5606 "vdu_id": vdur["vdu-id-ref"],
5607 "interface": [],
5608 }
5609 )
5610 for interface in vdur["interfaces"]:
5611 scaling_info["vdu"][-1]["interface"].append(
5612 {
5613 "name": interface["name"],
5614 "ip_address": interface["ip-address"],
5615 "mac_address": interface.get("mac-address"),
5616 }
5617 )
5618 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5619
5620 # PRE-SCALE BEGIN
5621 step = "Executing pre-scale vnf-config-primitive"
5622 if scaling_descriptor.get("scaling-config-action"):
5623 for scaling_config_action in scaling_descriptor[
5624 "scaling-config-action"
5625 ]:
5626 if (
5627 scaling_config_action.get("trigger") == "pre-scale-in"
5628 and scaling_type == "SCALE_IN"
5629 ) or (
5630 scaling_config_action.get("trigger") == "pre-scale-out"
5631 and scaling_type == "SCALE_OUT"
5632 ):
5633 vnf_config_primitive = scaling_config_action[
5634 "vnf-config-primitive-name-ref"
5635 ]
5636 step = db_nslcmop_update[
5637 "detailed-status"
5638 ] = "executing pre-scale scaling-config-action '{}'".format(
5639 vnf_config_primitive
5640 )
5641
5642 # look for primitive
5643 for config_primitive in (
5644 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5645 ).get("config-primitive", ()):
5646 if config_primitive["name"] == vnf_config_primitive:
5647 break
5648 else:
5649 raise LcmException(
5650 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5651 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5652 "primitive".format(scaling_group, vnf_config_primitive)
5653 )
5654
5655 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5656 if db_vnfr.get("additionalParamsForVnf"):
5657 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5658
5659 scale_process = "VCA"
5660 db_nsr_update["config-status"] = "configuring pre-scaling"
5661 primitive_params = self._map_primitive_params(
5662 config_primitive, {}, vnfr_params
5663 )
5664
5665 # Pre-scale retry check: Check if this sub-operation has been executed before
5666 op_index = self._check_or_add_scale_suboperation(
5667 db_nslcmop,
5668 vnf_index,
5669 vnf_config_primitive,
5670 primitive_params,
5671 "PRE-SCALE",
5672 )
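# op_index is SUBOPERATION_STATUS_SKIP when this primitive already completed in a previous
# attempt, SUBOPERATION_STATUS_NEW when it is registered for the first time, otherwise the
# index of the already-registered sub-operation to retry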
5673 if op_index == self.SUBOPERATION_STATUS_SKIP:
5674 # Skip sub-operation
5675 result = "COMPLETED"
5676 result_detail = "Done"
5677 self.logger.debug(
5678 logging_text
5679 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5680 vnf_config_primitive, result, result_detail
5681 )
5682 )
5683 else:
5684 if op_index == self.SUBOPERATION_STATUS_NEW:
5685 # New sub-operation: Get index of this sub-operation
5686 op_index = (
5687 len(db_nslcmop.get("_admin", {}).get("operations"))
5688 - 1
5689 )
5690 self.logger.debug(
5691 logging_text
5692 + "vnf_config_primitive={} New sub-operation".format(
5693 vnf_config_primitive
5694 )
5695 )
5696 else:
5697 # retry: Get registered params for this existing sub-operation
5698 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5699 op_index
5700 ]
5701 vnf_index = op.get("member_vnf_index")
5702 vnf_config_primitive = op.get("primitive")
5703 primitive_params = op.get("primitive_params")
5704 self.logger.debug(
5705 logging_text
5706 + "vnf_config_primitive={} Sub-operation retry".format(
5707 vnf_config_primitive
5708 )
5709 )
5710 # Execute the primitive, either with new (first-time) or registered (retry) args
5711 ee_descriptor_id = config_primitive.get(
5712 "execution-environment-ref"
5713 )
5714 primitive_name = config_primitive.get(
5715 "execution-environment-primitive", vnf_config_primitive
5716 )
5717 ee_id, vca_type = self._look_for_deployed_vca(
5718 nsr_deployed["VCA"],
5719 member_vnf_index=vnf_index,
5720 vdu_id=None,
5721 vdu_count_index=None,
5722 ee_descriptor_id=ee_descriptor_id,
5723 )
5724 result, result_detail = await self._ns_execute_primitive(
5725 ee_id,
5726 primitive_name,
5727 primitive_params,
5728 vca_type=vca_type,
5729 vca_id=vca_id,
5730 )
5731 self.logger.debug(
5732 logging_text
5733 + "vnf_config_primitive={} Done with result {} {}".format(
5734 vnf_config_primitive, result, result_detail
5735 )
5736 )
5737 # Update operationState = COMPLETED | FAILED
5738 self._update_suboperation_status(
5739 db_nslcmop, op_index, result, result_detail
5740 )
5741
5742 if result == "FAILED":
5743 raise LcmException(result_detail)
5744 db_nsr_update["config-status"] = old_config_status
5745 scale_process = None
5746 # PRE-SCALE END
5747
5748 db_nsr_update[
5749 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5750 ] = nb_scale_op
5751 db_nsr_update[
5752 "_admin.scaling-group.{}.time".format(admin_scale_index)
5753 ] = time()
5754
5755 # SCALE-IN VCA - BEGIN
5756 if vca_scaling_info:
5757 step = db_nslcmop_update[
5758 "detailed-status"
5759 ] = "Deleting the execution environments"
5760 scale_process = "VCA"
5761 for vca_info in vca_scaling_info:
5762 if vca_info["type"] == "delete":
5763 member_vnf_index = str(vca_info["member-vnf-index"])
5764 self.logger.debug(
5765 logging_text + "vdu info: {}".format(vca_info)
5766 )
5767 if vca_info.get("osm_vdu_id"):
5768 vdu_id = vca_info["osm_vdu_id"]
5769 vdu_index = int(vca_info["vdu_index"])
5770 stage[
5771 1
5772 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5773 member_vnf_index, vdu_id, vdu_index
5774 )
5775 else:
5776 vdu_index = 0
5777 kdu_id = vca_info["osm_kdu_id"]
5778 stage[
5779 1
5780 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5781 member_vnf_index, kdu_id, vdu_index
5782 )
5783 stage[2] = step = "Scaling in VCA"
5784 self._write_op_status(op_id=nslcmop_id, stage=stage)
5785 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5786 config_update = db_nsr["configurationStatus"]
5787 for vca_index, vca in enumerate(vca_update):
5788 if (
5789 (vca or vca.get("ee_id"))
5790 and vca["member-vnf-index"] == member_vnf_index
5791 and vca["vdu_count_index"] == vdu_index
5792 ):
5793 if vca.get("vdu_id"):
5794 config_descriptor = get_configuration(
5795 db_vnfd, vca.get("vdu_id")
5796 )
5797 elif vca.get("kdu_name"):
5798 config_descriptor = get_configuration(
5799 db_vnfd, vca.get("kdu_name")
5800 )
5801 else:
5802 config_descriptor = get_configuration(
5803 db_vnfd, db_vnfd["id"]
5804 )
5805 operation_params = (
5806 db_nslcmop.get("operationParams") or {}
5807 )
5808 exec_terminate_primitives = not operation_params.get(
5809 "skip_terminate_primitives"
5810 ) and vca.get("needed_terminate")
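# run terminate primitives only when the operation does not skip them and the charm
# was flagged with needed_terminate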
5811 task = asyncio.ensure_future(
5812 asyncio.wait_for(
5813 self.destroy_N2VC(
5814 logging_text,
5815 db_nslcmop,
5816 vca,
5817 config_descriptor,
5818 vca_index,
5819 destroy_ee=True,
5820 exec_primitives=exec_terminate_primitives,
5821 scaling_in=True,
5822 vca_id=vca_id,
5823 ),
5824 timeout=self.timeout_charm_delete,
5825 )
5826 )
5827 tasks_dict_info[task] = "Terminating VCA {}".format(
5828 vca.get("ee_id")
5829 )
5830 del vca_update[vca_index]
5831 del config_update[vca_index]
5832 # wait for pending tasks of terminate primitives
5833 if tasks_dict_info:
5834 self.logger.debug(
5835 logging_text
5836 + "Waiting for tasks {}".format(
5837 list(tasks_dict_info.keys())
5838 )
5839 )
5840 error_list = await self._wait_for_tasks(
5841 logging_text,
5842 tasks_dict_info,
5843 min(
5844 self.timeout_charm_delete, self.timeout_ns_terminate
5845 ),
5846 stage,
5847 nslcmop_id,
5848 )
5849 tasks_dict_info.clear()
5850 if error_list:
5851 raise LcmException("; ".join(error_list))
5852
5853 db_vca_and_config_update = {
5854 "_admin.deployed.VCA": vca_update,
5855 "configurationStatus": config_update,
5856 }
5857 self.update_db_2(
5858 "nsrs", db_nsr["_id"], db_vca_and_config_update
5859 )
5860 scale_process = None
5861 # SCALE-IN VCA - END
5862
5863 # SCALE RO - BEGIN
5864 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5865 scale_process = "RO"
5866 if self.ro_config.get("ng"):
5867 await self._scale_ng_ro(
5868 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5869 )
5870 scaling_info.pop("vdu-create", None)
5871 scaling_info.pop("vdu-delete", None)
5872
5873 scale_process = None
5874 # SCALE RO - END
5875
5876 # SCALE KDU - BEGIN
5877 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5878 scale_process = "KDU"
5879 await self._scale_kdu(
5880 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5881 )
5882 scaling_info.pop("kdu-create", None)
5883 scaling_info.pop("kdu-delete", None)
5884
5885 scale_process = None
5886 # SCALE KDU - END
5887
5888 if db_nsr_update:
5889 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5890
5891 # SCALE-UP VCA - BEGIN
5892 if vca_scaling_info:
5893 step = db_nslcmop_update[
5894 "detailed-status"
5895 ] = "Creating new execution environments"
5896 scale_process = "VCA"
5897 for vca_info in vca_scaling_info:
5898 if vca_info["type"] == "create":
5899 member_vnf_index = str(vca_info["member-vnf-index"])
5900 self.logger.debug(
5901 logging_text + "vdu info: {}".format(vca_info)
5902 )
5903 vnfd_id = db_vnfr["vnfd-ref"]
5904 if vca_info.get("osm_vdu_id"):
5905 vdu_index = int(vca_info["vdu_index"])
5906 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5907 if db_vnfr.get("additionalParamsForVnf"):
5908 deploy_params.update(
5909 parse_yaml_strings(
5910 db_vnfr["additionalParamsForVnf"].copy()
5911 )
5912 )
5913 descriptor_config = get_configuration(
5914 db_vnfd, db_vnfd["id"]
5915 )
5916 if descriptor_config:
5917 vdu_id = None
5918 vdu_name = None
5919 kdu_name = None
5920 self._deploy_n2vc(
5921 logging_text=logging_text
5922 + "member_vnf_index={} ".format(member_vnf_index),
5923 db_nsr=db_nsr,
5924 db_vnfr=db_vnfr,
5925 nslcmop_id=nslcmop_id,
5926 nsr_id=nsr_id,
5927 nsi_id=nsi_id,
5928 vnfd_id=vnfd_id,
5929 vdu_id=vdu_id,
5930 kdu_name=kdu_name,
5931 member_vnf_index=member_vnf_index,
5932 vdu_index=vdu_index,
5933 vdu_name=vdu_name,
5934 deploy_params=deploy_params,
5935 descriptor_config=descriptor_config,
5936 base_folder=base_folder,
5937 task_instantiation_info=tasks_dict_info,
5938 stage=stage,
5939 )
5940 vdu_id = vca_info["osm_vdu_id"]
5941 vdur = find_in_list(
5942 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5943 )
5944 descriptor_config = get_configuration(db_vnfd, vdu_id)
5945 if vdur.get("additionalParams"):
5946 deploy_params_vdu = parse_yaml_strings(
5947 vdur["additionalParams"]
5948 )
5949 else:
5950 deploy_params_vdu = deploy_params
5951 deploy_params_vdu["OSM"] = get_osm_params(
5952 db_vnfr, vdu_id, vdu_count_index=vdu_index
5953 )
5954 if descriptor_config:
5955 vdu_name = None
5956 kdu_name = None
5957 stage[
5958 1
5959 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5960 member_vnf_index, vdu_id, vdu_index
5961 )
5962 stage[2] = step = "Scaling out VCA"
5963 self._write_op_status(op_id=nslcmop_id, stage=stage)
5964 self._deploy_n2vc(
5965 logging_text=logging_text
5966 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5967 member_vnf_index, vdu_id, vdu_index
5968 ),
5969 db_nsr=db_nsr,
5970 db_vnfr=db_vnfr,
5971 nslcmop_id=nslcmop_id,
5972 nsr_id=nsr_id,
5973 nsi_id=nsi_id,
5974 vnfd_id=vnfd_id,
5975 vdu_id=vdu_id,
5976 kdu_name=kdu_name,
5977 member_vnf_index=member_vnf_index,
5978 vdu_index=vdu_index,
5979 vdu_name=vdu_name,
5980 deploy_params=deploy_params_vdu,
5981 descriptor_config=descriptor_config,
5982 base_folder=base_folder,
5983 task_instantiation_info=tasks_dict_info,
5984 stage=stage,
5985 )
5986 else:
5987 kdu_name = vca_info["osm_kdu_id"]
5988 descriptor_config = get_configuration(db_vnfd, kdu_name)
5989 if descriptor_config:
5990 vdu_id = None
5991 kdu_index = int(vca_info["kdu_index"])
5992 vdu_name = None
5993 kdur = next(
5994 x
5995 for x in db_vnfr["kdur"]
5996 if x["kdu-name"] == kdu_name
5997 )
5998 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
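# merge (not overwrite) any KDU additionalParams on top of the OSM defaults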
5999 if kdur.get("additionalParams"):
6001 deploy_params_kdu.update(
6002 parse_yaml_strings(kdur["additionalParams"])
6003 )
6003
6004 self._deploy_n2vc(
6005 logging_text=logging_text,
6006 db_nsr=db_nsr,
6007 db_vnfr=db_vnfr,
6008 nslcmop_id=nslcmop_id,
6009 nsr_id=nsr_id,
6010 nsi_id=nsi_id,
6011 vnfd_id=vnfd_id,
6012 vdu_id=vdu_id,
6013 kdu_name=kdu_name,
6014 member_vnf_index=member_vnf_index,
6015 vdu_index=kdu_index,
6016 vdu_name=vdu_name,
6017 deploy_params=deploy_params_kdu,
6018 descriptor_config=descriptor_config,
6019 base_folder=base_folder,
6020 task_instantiation_info=tasks_dict_info,
6021 stage=stage,
6022 )
6023 # SCALE-UP VCA - END
6024 scale_process = None
6025
6026 # POST-SCALE BEGIN
6027 # execute primitive service POST-SCALING
6028 step = "Executing post-scale vnf-config-primitive"
6029 if scaling_descriptor.get("scaling-config-action"):
6030 for scaling_config_action in scaling_descriptor[
6031 "scaling-config-action"
6032 ]:
6033 if (
6034 scaling_config_action.get("trigger") == "post-scale-in"
6035 and scaling_type == "SCALE_IN"
6036 ) or (
6037 scaling_config_action.get("trigger") == "post-scale-out"
6038 and scaling_type == "SCALE_OUT"
6039 ):
6040 vnf_config_primitive = scaling_config_action[
6041 "vnf-config-primitive-name-ref"
6042 ]
6043 step = db_nslcmop_update[
6044 "detailed-status"
6045 ] = "executing post-scale scaling-config-action '{}'".format(
6046 vnf_config_primitive
6047 )
6048
6049 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6050 if db_vnfr.get("additionalParamsForVnf"):
6051 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6052
6053 # look for primitive
6054 for config_primitive in (
6055 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6056 ).get("config-primitive", ()):
6057 if config_primitive["name"] == vnf_config_primitive:
6058 break
6059 else:
6060 raise LcmException(
6061 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6062 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6063 "config-primitive".format(
6064 scaling_group, vnf_config_primitive
6065 )
6066 )
6067 scale_process = "VCA"
6068 db_nsr_update["config-status"] = "configuring post-scaling"
6069 primitive_params = self._map_primitive_params(
6070 config_primitive, {}, vnfr_params
6071 )
6072
6073 # Post-scale retry check: Check if this sub-operation has been executed before
6074 op_index = self._check_or_add_scale_suboperation(
6075 db_nslcmop,
6076 vnf_index,
6077 vnf_config_primitive,
6078 primitive_params,
6079 "POST-SCALE",
6080 )
6081 if op_index == self.SUBOPERATION_STATUS_SKIP:
6082 # Skip sub-operation
6083 result = "COMPLETED"
6084 result_detail = "Done"
6085 self.logger.debug(
6086 logging_text
6087 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6088 vnf_config_primitive, result, result_detail
6089 )
6090 )
6091 else:
6092 if op_index == self.SUBOPERATION_STATUS_NEW:
6093 # New sub-operation: Get index of this sub-operation
6094 op_index = (
6095 len(db_nslcmop.get("_admin", {}).get("operations"))
6096 - 1
6097 )
6098 self.logger.debug(
6099 logging_text
6100 + "vnf_config_primitive={} New sub-operation".format(
6101 vnf_config_primitive
6102 )
6103 )
6104 else:
6105 # retry: Get registered params for this existing sub-operation
6106 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6107 op_index
6108 ]
6109 vnf_index = op.get("member_vnf_index")
6110 vnf_config_primitive = op.get("primitive")
6111 primitive_params = op.get("primitive_params")
6112 self.logger.debug(
6113 logging_text
6114 + "vnf_config_primitive={} Sub-operation retry".format(
6115 vnf_config_primitive
6116 )
6117 )
6118 # Execute the primitive, either with new (first-time) or registered (retry) args
6119 ee_descriptor_id = config_primitive.get(
6120 "execution-environment-ref"
6121 )
6122 primitive_name = config_primitive.get(
6123 "execution-environment-primitive", vnf_config_primitive
6124 )
6125 ee_id, vca_type = self._look_for_deployed_vca(
6126 nsr_deployed["VCA"],
6127 member_vnf_index=vnf_index,
6128 vdu_id=None,
6129 vdu_count_index=None,
6130 ee_descriptor_id=ee_descriptor_id,
6131 )
6132 result, result_detail = await self._ns_execute_primitive(
6133 ee_id,
6134 primitive_name,
6135 primitive_params,
6136 vca_type=vca_type,
6137 vca_id=vca_id,
6138 )
6139 self.logger.debug(
6140 logging_text
6141 + "vnf_config_primitive={} Done with result {} {}".format(
6142 vnf_config_primitive, result, result_detail
6143 )
6144 )
6145 # Update operationState = COMPLETED | FAILED
6146 self._update_suboperation_status(
6147 db_nslcmop, op_index, result, result_detail
6148 )
6149
6150 if result == "FAILED":
6151 raise LcmException(result_detail)
6152 db_nsr_update["config-status"] = old_config_status
6153 scale_process = None
6154 # POST-SCALE END
6155
6156 db_nsr_update[
6157 "detailed-status"
6158 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6159 db_nsr_update["operational-status"] = (
6160 "running"
6161 if old_operational_status == "failed"
6162 else old_operational_status
6163 )
6164 db_nsr_update["config-status"] = old_config_status
6165 return
6166 except (
6167 ROclient.ROClientException,
6168 DbException,
6169 LcmException,
6170 NgRoException,
6171 ) as e:
6172 self.logger.error(logging_text + "Exit Exception {}".format(e))
6173 exc = e
6174 except asyncio.CancelledError:
6175 self.logger.error(
6176 logging_text + "Cancelled Exception while '{}'".format(step)
6177 )
6178 exc = "Operation was cancelled"
6179 except Exception as e:
6180 exc = traceback.format_exc()
6181 self.logger.critical(
6182 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6183 exc_info=True,
6184 )
6185 finally:
6186 self._write_ns_status(
6187 nsr_id=nsr_id,
6188 ns_state=None,
6189 current_operation="IDLE",
6190 current_operation_id=None,
6191 )
6192 if tasks_dict_info:
6193 stage[1] = "Waiting for instantiate pending tasks."
6194 self.logger.debug(logging_text + stage[1])
6195 exc = await self._wait_for_tasks(
6196 logging_text,
6197 tasks_dict_info,
6198 self.timeout_ns_deploy,
6199 stage,
6200 nslcmop_id,
6201 nsr_id=nsr_id,
6202 )
6203 if exc:
6204 db_nslcmop_update[
6205 "detailed-status"
6206 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6207 nslcmop_operation_state = "FAILED"
6208 if db_nsr:
6209 db_nsr_update["operational-status"] = old_operational_status
6210 db_nsr_update["config-status"] = old_config_status
6211 db_nsr_update["detailed-status"] = ""
6212 if scale_process:
6213 if "VCA" in scale_process:
6214 db_nsr_update["config-status"] = "failed"
6215 if "RO" in scale_process:
6216 db_nsr_update["operational-status"] = "failed"
6217 db_nsr_update[
6218 "detailed-status"
6219 ] = "FAILED scaling nslcmop={} {}: {}".format(
6220 nslcmop_id, step, exc
6221 )
6222 else:
6223 error_description_nslcmop = None
6224 nslcmop_operation_state = "COMPLETED"
6225 db_nslcmop_update["detailed-status"] = "Done"
6226
6227 self._write_op_status(
6228 op_id=nslcmop_id,
6229 stage="",
6230 error_message=error_description_nslcmop,
6231 operation_state=nslcmop_operation_state,
6232 other_update=db_nslcmop_update,
6233 )
6234 if db_nsr:
6235 self._write_ns_status(
6236 nsr_id=nsr_id,
6237 ns_state=None,
6238 current_operation="IDLE",
6239 current_operation_id=None,
6240 other_update=db_nsr_update,
6241 )
6242
6243 if nslcmop_operation_state:
6244 try:
6245 msg = {
6246 "nsr_id": nsr_id,
6247 "nslcmop_id": nslcmop_id,
6248 "operationState": nslcmop_operation_state,
6249 }
6250 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6251 except Exception as e:
6252 self.logger.error(
6253 logging_text + "kafka_write notification Exception {}".format(e)
6254 )
6255 self.logger.debug(logging_text + "Exit")
6256 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6257
6258 async def _scale_kdu(
6259 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6260 ):
6261 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6262 for kdu_name in _scaling_info:
6263 for kdu_scaling_info in _scaling_info[kdu_name]:
6264 deployed_kdu, index = get_deployed_kdu(
6265 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6266 )
6267 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6268 kdu_instance = deployed_kdu["kdu-instance"]
6269 scale = int(kdu_scaling_info["scale"])
6270 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6271
6272 db_dict = {
6273 "collection": "nsrs",
6274 "filter": {"_id": nsr_id},
6275 "path": "_admin.deployed.K8s.{}".format(index),
6276 }
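# db_dict tells the K8s connector where to write status for this KDU inside the nsr
# document while the operation runs (assumption based on how it is passed below)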
6277
6278 step = "scaling application {}".format(
6279 kdu_scaling_info["resource-name"]
6280 )
6281 self.logger.debug(logging_text + step)
6282
6283 if kdu_scaling_info["type"] == "delete":
6284 kdu_config = get_configuration(db_vnfd, kdu_name)
6285 if (
6286 kdu_config
6287 and kdu_config.get("terminate-config-primitive")
6288 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6289 ):
6290 terminate_config_primitive_list = kdu_config.get(
6291 "terminate-config-primitive"
6292 )
6293 terminate_config_primitive_list.sort(
6294 key=lambda val: int(val["seq"])
6295 )
6296
6297 for (
6298 terminate_config_primitive
6299 ) in terminate_config_primitive_list:
6300 primitive_params_ = self._map_primitive_params(
6301 terminate_config_primitive, {}, {}
6302 )
6303 step = "execute terminate config primitive"
6304 self.logger.debug(logging_text + step)
6305 await asyncio.wait_for(
6306 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6307 cluster_uuid=cluster_uuid,
6308 kdu_instance=kdu_instance,
6309 primitive_name=terminate_config_primitive["name"],
6310 params=primitive_params_,
6311 db_dict=db_dict,
6312 vca_id=vca_id,
6313 ),
6314 timeout=600,
6315 )
6316
6317 await asyncio.wait_for(
6318 self.k8scluster_map[k8s_cluster_type].scale(
6319 kdu_instance,
6320 scale,
6321 kdu_scaling_info["resource-name"],
6322 vca_id=vca_id,
6323 ),
6324 timeout=self.timeout_vca_on_error,
6325 )
6326
6327 if kdu_scaling_info["type"] == "create":
6328 kdu_config = get_configuration(db_vnfd, kdu_name)
6329 if (
6330 kdu_config
6331 and kdu_config.get("initial-config-primitive")
6332 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6333 ):
6334 initial_config_primitive_list = kdu_config.get(
6335 "initial-config-primitive"
6336 )
6337 initial_config_primitive_list.sort(
6338 key=lambda val: int(val["seq"])
6339 )
6340
6341 for initial_config_primitive in initial_config_primitive_list:
6342 primitive_params_ = self._map_primitive_params(
6343 initial_config_primitive, {}, {}
6344 )
6345 step = "execute initial config primitive"
6346 self.logger.debug(logging_text + step)
6347 await asyncio.wait_for(
6348 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6349 cluster_uuid=cluster_uuid,
6350 kdu_instance=kdu_instance,
6351 primitive_name=initial_config_primitive["name"],
6352 params=primitive_params_,
6353 db_dict=db_dict,
6354 vca_id=vca_id,
6355 ),
6356 timeout=600,
6357 )
6358
6359 async def _scale_ng_ro(
6360 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6361 ):
6362 nsr_id = db_nslcmop["nsInstanceId"]
6363 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6364 db_vnfrs = {}
6365
6366 # read from db: vnfd's for every vnf
6367 db_vnfds = []
6368
6369 # for each vnf in ns, read vnfd
6370 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6371 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6372 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6373 # if we haven't this vnfd, read it from db
6374 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6375 # read from db
6376 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6377 db_vnfds.append(vnfd)
6378 n2vc_key = self.n2vc.get_public_key()
6379 n2vc_key_list = [n2vc_key]
6380 self.scale_vnfr(
6381 db_vnfr,
6382 vdu_scaling_info.get("vdu-create"),
6383 vdu_scaling_info.get("vdu-delete"),
6384 mark_delete=True,
6385 )
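# mark_delete=True presumably only flags the selected vdur entries in db_vnfr; the records
# are actually removed after RO finishes (see the scale_vnfr call with mark_delete=False below)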
6386 # db_vnfr has been updated, update db_vnfrs to use it
6387 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6388 await self._instantiate_ng_ro(
6389 logging_text,
6390 nsr_id,
6391 db_nsd,
6392 db_nsr,
6393 db_nslcmop,
6394 db_vnfrs,
6395 db_vnfds,
6396 n2vc_key_list,
6397 stage=stage,
6398 start_deploy=time(),
6399 timeout_ns_deploy=self.timeout_ns_deploy,
6400 )
6401 if vdu_scaling_info.get("vdu-delete"):
6402 self.scale_vnfr(
6403 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6404 )
6405
6406 async def extract_prometheus_scrape_jobs(
6407 self,
6408 ee_id,
6409 artifact_path,
6410 ee_config_descriptor,
6411 vnfr_id,
6412 nsr_id,
6413 target_ip,
6414 ):
6415 # look for a file called 'prometheus*.j2'; if present, use it to build the scrape jobs
6416 artifact_content = self.fs.dir_ls(artifact_path)
6417 job_file = next(
6418 (
6419 f
6420 for f in artifact_content
6421 if f.startswith("prometheus") and f.endswith(".j2")
6422 ),
6423 None,
6424 )
6425 if not job_file:
6426 return
6427 with self.fs.file_open((artifact_path, job_file), "r") as f:
6428 job_data = f.read()
6429
6430 # TODO get_service
6431 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6432 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6433 host_port = "80"
6434 vnfr_id = vnfr_id.replace("-", "")
6435 variables = {
6436 "JOB_NAME": vnfr_id,
6437 "TARGET_IP": target_ip,
6438 "EXPORTER_POD_IP": host_name,
6439 "EXPORTER_POD_PORT": host_port,
6440 }
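# parse_job renders the Jinja2 job template with these variables and returns a list of
# scrape-job dicts (assumption based on the usage below)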
6441 job_list = parse_job(job_data, variables)
6442 # ensure job_name uses the vnfr_id, and add nsr_id and vnfr_id as metadata
6443 for job in job_list:
6444 if (
6445 not isinstance(job.get("job_name"), str)
6446 or vnfr_id not in job["job_name"]
6447 ):
6448 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6449 job["nsr_id"] = nsr_id
6450 job["vnfr_id"] = vnfr_id
6451 return job_list
6452
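# Illustrative example: a VIM account whose config contains
#   {"vca_cloud": "lxd-cloud", "vca_cloud_credential": "lxd-cred"}
# makes get_vca_cloud_and_credentials return ("lxd-cloud", "lxd-cred"); the values are hypothetical.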
6453 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6454 """
6455 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6456
6457 :param: vim_account_id: VIM Account ID
6458
6459 :return: (cloud_name, cloud_credential)
6460 """
6461 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6462 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6463
6464 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6465 """
6466 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6467
6468 :param: vim_account_id: VIM Account ID
6469
6470 :return: (cloud_name, cloud_credential)
6471 """
6472 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6473 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")