Bug 2051 fixed: avoid trying to insert pk in a VM for a KNF VCA
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
110 class NsLcm(LcmBase):
111 timeout_vca_on_error = (
112 5 * 60
113 ) # Time for charm from first time at blocked,error status to mark as failed
114 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
115 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
116 timeout_charm_delete = 10 * 60
117 timeout_primitive = 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive = (
119 10 * 60
120 ) # timeout for some progress in a primitive execution
121
122 SUBOPERATION_STATUS_NOT_FOUND = -1
123 SUBOPERATION_STATUS_NEW = -2
124 SUBOPERATION_STATUS_SKIP = -3
125 task_name_deploy_vca = "Deploying VCA"
126
127 def __init__(self, msg, lcm_tasks, config, loop):
128 """
129 Init, Connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
131 :return: None
132 """
133 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
134
135 self.db = Database().instance.db
136 self.fs = Filesystem().instance.fs
137 self.loop = loop
138 self.lcm_tasks = lcm_tasks
139 self.timeout = config["timeout"]
140 self.ro_config = config["ro_config"]
141 self.ng_ro = config["ro_config"].get("ng")
142 self.vca_config = config["VCA"].copy()
143
144 # create N2VC connector
145 self.n2vc = N2VCJujuConnector(
146 log=self.logger,
147 loop=self.loop,
148 on_update_db=self._on_update_n2vc_db,
149 fs=self.fs,
150 db=self.db,
151 )
152
153 self.conn_helm_ee = LCMHelmConn(
154 log=self.logger,
155 loop=self.loop,
156 vca_config=self.vca_config,
157 on_update_db=self._on_update_n2vc_db,
158 )
159
160 self.k8sclusterhelm2 = K8sHelmConnector(
161 kubectl_command=self.vca_config.get("kubectlpath"),
162 helm_command=self.vca_config.get("helmpath"),
163 log=self.logger,
164 on_update_db=None,
165 fs=self.fs,
166 db=self.db,
167 )
168
169 self.k8sclusterhelm3 = K8sHelm3Connector(
170 kubectl_command=self.vca_config.get("kubectlpath"),
171 helm_command=self.vca_config.get("helm3path"),
172 fs=self.fs,
173 log=self.logger,
174 db=self.db,
175 on_update_db=None,
176 )
177
178 self.k8sclusterjuju = K8sJujuConnector(
179 kubectl_command=self.vca_config.get("kubectlpath"),
180 juju_command=self.vca_config.get("jujupath"),
181 log=self.logger,
182 loop=self.loop,
183 on_update_db=self._on_update_k8s_db,
184 fs=self.fs,
185 db=self.db,
186 )
187
188 self.k8scluster_map = {
189 "helm-chart": self.k8sclusterhelm2,
190 "helm-chart-v3": self.k8sclusterhelm3,
191 "chart": self.k8sclusterhelm3,
192 "juju-bundle": self.k8sclusterjuju,
193 "juju": self.k8sclusterjuju,
194 }
195
196 self.vca_map = {
197 "lxc_proxy_charm": self.n2vc,
198 "native_charm": self.n2vc,
199 "k8s_proxy_charm": self.n2vc,
200 "helm": self.conn_helm_ee,
201 "helm-v3": self.conn_helm_ee,
202 }
203
204 # create RO client
205 self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with ipv4 look for last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
247 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
248
249 # remove last dot from path (if exists)
250 if path.endswith("."):
251 path = path[:-1]
252
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
255 try:
256
257 nsr_id = filter.get("_id")
258
259 # read ns record from database
260 nsr = self.db.get_one(table="nsrs", q_filter=filter)
261 current_ns_status = nsr.get("nsState")
262
263 # get vca status for NS
264 status_dict = await self.n2vc.get_status(
265 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
266 )
267
268 # vcaStatus
269 db_dict = dict()
270 db_dict["vcaStatus"] = status_dict
271 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
272
273 # update configurationStatus for this VCA
274 try:
275 vca_index = int(path[path.rfind(".") + 1 :])
276
277 vca_list = deep_get(
278 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
279 )
280 vca_status = vca_list[vca_index].get("status")
281
282 configuration_status_list = nsr.get("configurationStatus")
283 config_status = configuration_status_list[vca_index].get("status")
284
285 if config_status == "BROKEN" and vca_status != "failed":
286 db_dict["configurationStatus"][vca_index] = "READY"
287 elif config_status != "BROKEN" and vca_status == "failed":
288 db_dict["configurationStatus"][vca_index] = "BROKEN"
289 except Exception as e:
290 # not update configurationStatus
291 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
292
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
295 is_degraded = False
296 if current_ns_status in ("READY", "DEGRADED"):
297 error_description = ""
298 # check machines
299 if status_dict.get("machines"):
300 for machine_id in status_dict.get("machines"):
301 machine = status_dict.get("machines").get(machine_id)
302 # check machine agent-status
303 if machine.get("agent-status"):
304 s = machine.get("agent-status").get("status")
305 if s != "started":
306 is_degraded = True
307 error_description += (
308 "machine {} agent-status={} ; ".format(
309 machine_id, s
310 )
311 )
312 # check machine instance status
313 if machine.get("instance-status"):
314 s = machine.get("instance-status").get("status")
315 if s != "running":
316 is_degraded = True
317 error_description += (
318 "machine {} instance-status={} ; ".format(
319 machine_id, s
320 )
321 )
322 # check applications
323 if status_dict.get("applications"):
324 for app_id in status_dict.get("applications"):
325 app = status_dict.get("applications").get(app_id)
326 # check application status
327 if app.get("status"):
328 s = app.get("status").get("status")
329 if s != "active":
330 is_degraded = True
331 error_description += (
332 "application {} status={} ; ".format(app_id, s)
333 )
334
335 if error_description:
336 db_dict["errorDescription"] = error_description
337 if current_ns_status == "READY" and is_degraded:
338 db_dict["nsState"] = "DEGRADED"
339 if current_ns_status == "DEGRADED" and not is_degraded:
340 db_dict["nsState"] = "READY"
341
342 # write to database
343 self.update_db_2("nsrs", nsr_id, db_dict)
344
345 except (asyncio.CancelledError, asyncio.TimeoutError):
346 raise
347 except Exception as e:
348 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
350 async def _on_update_k8s_db(
351 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
352 ):
353 """
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :cluster_type: The cluster type (juju, k8s)
359 :return: none
360 """
361
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
364
365 nsr_id = filter.get("_id")
366 try:
367 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
368 cluster_uuid=cluster_uuid,
369 kdu_instance=kdu_instance,
370 yaml_format=False,
371 complete_status=True,
372 vca_id=vca_id,
373 )
374
375 # vcaStatus
376 db_dict = dict()
377 db_dict["vcaStatus"] = {nsr_id: vca_status}
378
379 if cluster_type in ("juju-bundle", "juju"):
380 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
381 # status in a similar way between Juju Bundles and Helm Charts on this side
382 await self.k8sclusterjuju.update_vca_status(
383 db_dict["vcaStatus"],
384 kdu_instance,
385 vca_id=vca_id,
386 )
387
388 self.logger.debug(
389 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
390 )
391
392 # write to database
393 self.update_db_2("nsrs", nsr_id, db_dict)
394 except (asyncio.CancelledError, asyncio.TimeoutError):
395 raise
396 except Exception as e:
397 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
398
399 @staticmethod
400 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
401 try:
402 env = Environment(undefined=StrictUndefined)
403 template = env.from_string(cloud_init_text)
404 return template.render(additional_params or {})
405 except UndefinedError as e:
406 raise LcmException(
407 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
408 "file, must be provided in the instantiation parameters inside the "
409 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
410 )
411 except (TemplateError, TemplateNotFound) as e:
412 raise LcmException(
413 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
414 vnfd_id, vdu_id, e
415 )
416 )
417
418 def _get_vdu_cloud_init_content(self, vdu, vnfd):
419 cloud_init_content = cloud_init_file = None
420 try:
421 if vdu.get("cloud-init-file"):
422 base_folder = vnfd["_admin"]["storage"]
423 if base_folder["pkg-dir"]:
424 cloud_init_file = "{}/{}/cloud_init/{}".format(
425 base_folder["folder"],
426 base_folder["pkg-dir"],
427 vdu["cloud-init-file"],
428 )
429 else:
430 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
431 base_folder["folder"],
432 vdu["cloud-init-file"],
433 )
434 with self.fs.file_open(cloud_init_file, "r") as ci_file:
435 cloud_init_content = ci_file.read()
436 elif vdu.get("cloud-init"):
437 cloud_init_content = vdu["cloud-init"]
438
439 return cloud_init_content
440 except FsException as e:
441 raise LcmException(
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd["id"], vdu["id"], cloud_init_file, e
444 )
445 )
446
447 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
448 vdur = next(
449 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
450 {}
451 )
452 additional_params = vdur.get("additionalParams")
453 return parse_yaml_strings(additional_params)
454
455 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
456 """
457 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
458 :param vnfd: input vnfd
459 :param new_id: overrides vnf id if provided
460 :param additionalParams: Instantiation params for VNFs provided
461 :param nsrId: Id of the NSR
462 :return: copy of vnfd
463 """
464 vnfd_RO = deepcopy(vnfd)
465 # remove unused by RO configuration, monitoring, scaling and internal keys
466 vnfd_RO.pop("_id", None)
467 vnfd_RO.pop("_admin", None)
468 vnfd_RO.pop("monitoring-param", None)
469 vnfd_RO.pop("scaling-group-descriptor", None)
470 vnfd_RO.pop("kdu", None)
471 vnfd_RO.pop("k8s-cluster", None)
472 if new_id:
473 vnfd_RO["id"] = new_id
474
475 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
476 for vdu in get_iterable(vnfd_RO, "vdu"):
477 vdu.pop("cloud-init-file", None)
478 vdu.pop("cloud-init", None)
479 return vnfd_RO
480
481 @staticmethod
482 def ip_profile_2_RO(ip_profile):
483 RO_ip_profile = deepcopy(ip_profile)
484 if "dns-server" in RO_ip_profile:
485 if isinstance(RO_ip_profile["dns-server"], list):
486 RO_ip_profile["dns-address"] = []
487 for ds in RO_ip_profile.pop("dns-server"):
488 RO_ip_profile["dns-address"].append(ds["address"])
489 else:
490 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
491 if RO_ip_profile.get("ip-version") == "ipv4":
492 RO_ip_profile["ip-version"] = "IPv4"
493 if RO_ip_profile.get("ip-version") == "ipv6":
494 RO_ip_profile["ip-version"] = "IPv6"
495 if "dhcp-params" in RO_ip_profile:
496 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
497 return RO_ip_profile
498
499 def _get_ro_vim_id_for_vim_account(self, vim_account):
500 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
501 if db_vim["_admin"]["operationalState"] != "ENABLED":
502 raise LcmException(
503 "VIM={} is not available. operationalState={}".format(
504 vim_account, db_vim["_admin"]["operationalState"]
505 )
506 )
507 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
508 return RO_vim_id
509
510 def get_ro_wim_id_for_wim_account(self, wim_account):
511 if isinstance(wim_account, str):
512 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
513 if db_wim["_admin"]["operationalState"] != "ENABLED":
514 raise LcmException(
515 "WIM={} is not available. operationalState={}".format(
516 wim_account, db_wim["_admin"]["operationalState"]
517 )
518 )
519 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
520 return RO_wim_id
521 else:
522 return wim_account
523
524 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
525
526 db_vdu_push_list = []
527 template_vdur = []
528 db_update = {"_admin.modified": time()}
529 if vdu_create:
530 for vdu_id, vdu_count in vdu_create.items():
531 vdur = next(
532 (
533 vdur
534 for vdur in reversed(db_vnfr["vdur"])
535 if vdur["vdu-id-ref"] == vdu_id
536 ),
537 None,
538 )
539 if not vdur:
540 # Read the template saved in the db:
541 self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
542 vdur_template = db_vnfr.get("vdur-template")
543 if not vdur_template:
544 raise LcmException(
545 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
546 vdu_id
547 )
548 )
549 vdur = vdur_template[0]
550 #Delete a template from the database after using it
551 self.db.set_one("vnfrs",
552 {"_id": db_vnfr["_id"]},
553 None,
554 pull={"vdur-template": {"_id": vdur['_id']}}
555 )
556 for count in range(vdu_count):
557 vdur_copy = deepcopy(vdur)
558 vdur_copy["status"] = "BUILD"
559 vdur_copy["status-detailed"] = None
560 vdur_copy["ip-address"] = None
561 vdur_copy["_id"] = str(uuid4())
562 vdur_copy["count-index"] += count + 1
563 vdur_copy["id"] = "{}-{}".format(
564 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
565 )
566 vdur_copy.pop("vim_info", None)
567 for iface in vdur_copy["interfaces"]:
568 if iface.get("fixed-ip"):
569 iface["ip-address"] = self.increment_ip_mac(
570 iface["ip-address"], count + 1
571 )
572 else:
573 iface.pop("ip-address", None)
574 if iface.get("fixed-mac"):
575 iface["mac-address"] = self.increment_ip_mac(
576 iface["mac-address"], count + 1
577 )
578 else:
579 iface.pop("mac-address", None)
580 if db_vnfr["vdur"]:
581 iface.pop(
582 "mgmt_vnf", None
583 ) # only first vdu can be managment of vnf
584 db_vdu_push_list.append(vdur_copy)
585 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
586 if vdu_delete:
587 if len(db_vnfr["vdur"]) == 1:
588 # The scale will move to 0 instances
589 self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
590 template_vdur = [db_vnfr["vdur"][0]]
591 for vdu_id, vdu_count in vdu_delete.items():
592 if mark_delete:
593 indexes_to_delete = [
594 iv[0]
595 for iv in enumerate(db_vnfr["vdur"])
596 if iv[1]["vdu-id-ref"] == vdu_id
597 ]
598 db_update.update(
599 {
600 "vdur.{}.status".format(i): "DELETING"
601 for i in indexes_to_delete[-vdu_count:]
602 }
603 )
604 else:
605 # it must be deleted one by one because common.db does not allow otherwise
606 vdus_to_delete = [
607 v
608 for v in reversed(db_vnfr["vdur"])
609 if v["vdu-id-ref"] == vdu_id
610 ]
611 for vdu in vdus_to_delete[:vdu_count]:
612 self.db.set_one(
613 "vnfrs",
614 {"_id": db_vnfr["_id"]},
615 None,
616 pull={"vdur": {"_id": vdu["_id"]}},
617 )
618 db_push = {}
619 if db_vdu_push_list:
620 db_push["vdur"] = db_vdu_push_list
621 if template_vdur:
622 db_push["vdur-template"] = template_vdur
623 if not db_push:
624 db_push = None
625 db_vnfr["vdur-template"] = template_vdur
626 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
627 # modify passed dictionary db_vnfr
628 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
629 db_vnfr["vdur"] = db_vnfr_["vdur"]
630
631 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
632 """
633 Updates database nsr with the RO info for the created vld
634 :param ns_update_nsr: dictionary to be filled with the updated info
635 :param db_nsr: content of db_nsr. This is also modified
636 :param nsr_desc_RO: nsr descriptor from RO
637 :return: Nothing, LcmException is raised on errors
638 """
639
640 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
641 for net_RO in get_iterable(nsr_desc_RO, "nets"):
642 if vld["id"] != net_RO.get("ns_net_osm_id"):
643 continue
644 vld["vim-id"] = net_RO.get("vim_net_id")
645 vld["name"] = net_RO.get("vim_name")
646 vld["status"] = net_RO.get("status")
647 vld["status-detailed"] = net_RO.get("error_msg")
648 ns_update_nsr["vld.{}".format(vld_index)] = vld
649 break
650 else:
651 raise LcmException(
652 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
653 )
654
655 def set_vnfr_at_error(self, db_vnfrs, error_text):
656 try:
657 for db_vnfr in db_vnfrs.values():
658 vnfr_update = {"status": "ERROR"}
659 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
660 if "status" not in vdur:
661 vdur["status"] = "ERROR"
662 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
663 if error_text:
664 vdur["status-detailed"] = str(error_text)
665 vnfr_update[
666 "vdur.{}.status-detailed".format(vdu_index)
667 ] = "ERROR"
668 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
669 except DbException as e:
670 self.logger.error("Cannot update vnf. {}".format(e))
671
672 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
673 """
674 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
675 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
676 :param nsr_desc_RO: nsr descriptor from RO
677 :return: Nothing, LcmException is raised on errors
678 """
679 for vnf_index, db_vnfr in db_vnfrs.items():
680 for vnf_RO in nsr_desc_RO["vnfs"]:
681 if vnf_RO["member_vnf_index"] != vnf_index:
682 continue
683 vnfr_update = {}
684 if vnf_RO.get("ip_address"):
685 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
686 "ip_address"
687 ].split(";")[0]
688 elif not db_vnfr.get("ip-address"):
689 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
690 raise LcmExceptionNoMgmtIP(
691 "ns member_vnf_index '{}' has no IP address".format(
692 vnf_index
693 )
694 )
695
696 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
697 vdur_RO_count_index = 0
698 if vdur.get("pdu-type"):
699 continue
700 for vdur_RO in get_iterable(vnf_RO, "vms"):
701 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
702 continue
703 if vdur["count-index"] != vdur_RO_count_index:
704 vdur_RO_count_index += 1
705 continue
706 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
707 if vdur_RO.get("ip_address"):
708 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
709 else:
710 vdur["ip-address"] = None
711 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
712 vdur["name"] = vdur_RO.get("vim_name")
713 vdur["status"] = vdur_RO.get("status")
714 vdur["status-detailed"] = vdur_RO.get("error_msg")
715 for ifacer in get_iterable(vdur, "interfaces"):
716 for interface_RO in get_iterable(vdur_RO, "interfaces"):
717 if ifacer["name"] == interface_RO.get("internal_name"):
718 ifacer["ip-address"] = interface_RO.get(
719 "ip_address"
720 )
721 ifacer["mac-address"] = interface_RO.get(
722 "mac_address"
723 )
724 break
725 else:
726 raise LcmException(
727 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
728 "from VIM info".format(
729 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
730 )
731 )
732 vnfr_update["vdur.{}".format(vdu_index)] = vdur
733 break
734 else:
735 raise LcmException(
736 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
737 "VIM info".format(
738 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
739 )
740 )
741
742 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
743 for net_RO in get_iterable(nsr_desc_RO, "nets"):
744 if vld["id"] != net_RO.get("vnf_net_osm_id"):
745 continue
746 vld["vim-id"] = net_RO.get("vim_net_id")
747 vld["name"] = net_RO.get("vim_name")
748 vld["status"] = net_RO.get("status")
749 vld["status-detailed"] = net_RO.get("error_msg")
750 vnfr_update["vld.{}".format(vld_index)] = vld
751 break
752 else:
753 raise LcmException(
754 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
755 vnf_index, vld["id"]
756 )
757 )
758
759 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
760 break
761
762 else:
763 raise LcmException(
764 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
765 vnf_index
766 )
767 )
768
769 def _get_ns_config_info(self, nsr_id):
770 """
771 Generates a mapping between vnf,vdu elements and the N2VC id
772 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
773 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
774 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
775 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
776 """
777 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
778 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
779 mapping = {}
780 ns_config_info = {"osm-config-mapping": mapping}
781 for vca in vca_deployed_list:
782 if not vca["member-vnf-index"]:
783 continue
784 if not vca["vdu_id"]:
785 mapping[vca["member-vnf-index"]] = vca["application"]
786 else:
787 mapping[
788 "{}.{}.{}".format(
789 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
790 )
791 ] = vca["application"]
792 return ns_config_info
793
794 async def _instantiate_ng_ro(
795 self,
796 logging_text,
797 nsr_id,
798 nsd,
799 db_nsr,
800 db_nslcmop,
801 db_vnfrs,
802 db_vnfds,
803 n2vc_key_list,
804 stage,
805 start_deploy,
806 timeout_ns_deploy,
807 ):
808
809 db_vims = {}
810
811 def get_vim_account(vim_account_id):
812 nonlocal db_vims
813 if vim_account_id in db_vims:
814 return db_vims[vim_account_id]
815 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
816 db_vims[vim_account_id] = db_vim
817 return db_vim
818
819 # modify target_vld info with instantiation parameters
820 def parse_vld_instantiation_params(
821 target_vim, target_vld, vld_params, target_sdn
822 ):
823 if vld_params.get("ip-profile"):
824 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
825 "ip-profile"
826 ]
827 if vld_params.get("provider-network"):
828 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
829 "provider-network"
830 ]
831 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
832 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
833 "provider-network"
834 ]["sdn-ports"]
835 if vld_params.get("wimAccountId"):
836 target_wim = "wim:{}".format(vld_params["wimAccountId"])
837 target_vld["vim_info"][target_wim] = {}
838 for param in ("vim-network-name", "vim-network-id"):
839 if vld_params.get(param):
840 if isinstance(vld_params[param], dict):
841 for vim, vim_net in vld_params[param].items():
842 other_target_vim = "vim:" + vim
843 populate_dict(
844 target_vld["vim_info"],
845 (other_target_vim, param.replace("-", "_")),
846 vim_net,
847 )
848 else: # isinstance str
849 target_vld["vim_info"][target_vim][
850 param.replace("-", "_")
851 ] = vld_params[param]
852 if vld_params.get("common_id"):
853 target_vld["common_id"] = vld_params.get("common_id")
854
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target, ns_params):
857 for vnf_params in ns_params.get("vnf", ()):
858 if vnf_params.get("vimAccountId"):
859 target_vnf = next(
860 (
861 vnfr
862 for vnfr in db_vnfrs.values()
863 if vnf_params["member-vnf-index"]
864 == vnfr["member-vnf-index-ref"]
865 ),
866 None,
867 )
868 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
869 for a_index, a_vld in enumerate(target["ns"]["vld"]):
870 target_vld = find_in_list(
871 get_iterable(vdur, "interfaces"),
872 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
873 )
874 if target_vld:
875 if vnf_params.get("vimAccountId") not in a_vld.get(
876 "vim_info", {}
877 ):
878 target["ns"]["vld"][a_index].get("vim_info").update(
879 {
880 "vim:{}".format(vnf_params["vimAccountId"]): {
881 "vim_network_name": ""
882 }
883 }
884 )
885
886 nslcmop_id = db_nslcmop["_id"]
887 target = {
888 "name": db_nsr["name"],
889 "ns": {"vld": []},
890 "vnf": [],
891 "image": deepcopy(db_nsr["image"]),
892 "flavor": deepcopy(db_nsr["flavor"]),
893 "action_id": nslcmop_id,
894 "cloud_init_content": {},
895 }
896 for image in target["image"]:
897 image["vim_info"] = {}
898 for flavor in target["flavor"]:
899 flavor["vim_info"] = {}
900 if db_nsr.get("affinity-or-anti-affinity-group"):
901 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
902 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
903 affinity_or_anti_affinity_group["vim_info"] = {}
904
905 if db_nslcmop.get("lcmOperationType") != "instantiate":
906 # get parameters of instantiation:
907 db_nslcmop_instantiate = self.db.get_list(
908 "nslcmops",
909 {
910 "nsInstanceId": db_nslcmop["nsInstanceId"],
911 "lcmOperationType": "instantiate",
912 },
913 )[-1]
914 ns_params = db_nslcmop_instantiate.get("operationParams")
915 else:
916 ns_params = db_nslcmop.get("operationParams")
917 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
918 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
919
920 cp2target = {}
921 for vld_index, vld in enumerate(db_nsr.get("vld")):
922 target_vim = "vim:{}".format(ns_params["vimAccountId"])
923 target_vld = {
924 "id": vld["id"],
925 "name": vld["name"],
926 "mgmt-network": vld.get("mgmt-network", False),
927 "type": vld.get("type"),
928 "vim_info": {
929 target_vim: {
930 "vim_network_name": vld.get("vim-network-name"),
931 "vim_account_id": ns_params["vimAccountId"],
932 }
933 },
934 }
935 # check if this network needs SDN assist
936 if vld.get("pci-interfaces"):
937 db_vim = get_vim_account(ns_params["vimAccountId"])
938 sdnc_id = db_vim["config"].get("sdn-controller")
939 if sdnc_id:
940 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
941 target_sdn = "sdn:{}".format(sdnc_id)
942 target_vld["vim_info"][target_sdn] = {
943 "sdn": True,
944 "target_vim": target_vim,
945 "vlds": [sdn_vld],
946 "type": vld.get("type"),
947 }
948
949 nsd_vnf_profiles = get_vnf_profiles(nsd)
950 for nsd_vnf_profile in nsd_vnf_profiles:
951 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
952 if cp["virtual-link-profile-id"] == vld["id"]:
953 cp2target[
954 "member_vnf:{}.{}".format(
955 cp["constituent-cpd-id"][0][
956 "constituent-base-element-id"
957 ],
958 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
959 )
960 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
961
962 # check at nsd descriptor, if there is an ip-profile
963 vld_params = {}
964 nsd_vlp = find_in_list(
965 get_virtual_link_profiles(nsd),
966 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
967 == vld["id"],
968 )
969 if (
970 nsd_vlp
971 and nsd_vlp.get("virtual-link-protocol-data")
972 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
973 ):
974 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
975 "l3-protocol-data"
976 ]
977 ip_profile_dest_data = {}
978 if "ip-version" in ip_profile_source_data:
979 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
980 "ip-version"
981 ]
982 if "cidr" in ip_profile_source_data:
983 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
984 "cidr"
985 ]
986 if "gateway-ip" in ip_profile_source_data:
987 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
988 "gateway-ip"
989 ]
990 if "dhcp-enabled" in ip_profile_source_data:
991 ip_profile_dest_data["dhcp-params"] = {
992 "enabled": ip_profile_source_data["dhcp-enabled"]
993 }
994 vld_params["ip-profile"] = ip_profile_dest_data
995
996 # update vld_params with instantiation params
997 vld_instantiation_params = find_in_list(
998 get_iterable(ns_params, "vld"),
999 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1000 )
1001 if vld_instantiation_params:
1002 vld_params.update(vld_instantiation_params)
1003 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1004 target["ns"]["vld"].append(target_vld)
1005 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1006 update_ns_vld_target(target, ns_params)
1007
1008 for vnfr in db_vnfrs.values():
1009 vnfd = find_in_list(
1010 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1011 )
1012 vnf_params = find_in_list(
1013 get_iterable(ns_params, "vnf"),
1014 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1015 )
1016 target_vnf = deepcopy(vnfr)
1017 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1018 for vld in target_vnf.get("vld", ()):
1019 # check if connected to a ns.vld, to fill target'
1020 vnf_cp = find_in_list(
1021 vnfd.get("int-virtual-link-desc", ()),
1022 lambda cpd: cpd.get("id") == vld["id"],
1023 )
1024 if vnf_cp:
1025 ns_cp = "member_vnf:{}.{}".format(
1026 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1027 )
1028 if cp2target.get(ns_cp):
1029 vld["target"] = cp2target[ns_cp]
1030
1031 vld["vim_info"] = {
1032 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1033 }
1034 # check if this network needs SDN assist
1035 target_sdn = None
1036 if vld.get("pci-interfaces"):
1037 db_vim = get_vim_account(vnfr["vim-account-id"])
1038 sdnc_id = db_vim["config"].get("sdn-controller")
1039 if sdnc_id:
1040 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1041 target_sdn = "sdn:{}".format(sdnc_id)
1042 vld["vim_info"][target_sdn] = {
1043 "sdn": True,
1044 "target_vim": target_vim,
1045 "vlds": [sdn_vld],
1046 "type": vld.get("type"),
1047 }
1048
1049 # check at vnfd descriptor, if there is an ip-profile
1050 vld_params = {}
1051 vnfd_vlp = find_in_list(
1052 get_virtual_link_profiles(vnfd),
1053 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1054 )
1055 if (
1056 vnfd_vlp
1057 and vnfd_vlp.get("virtual-link-protocol-data")
1058 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1059 ):
1060 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1061 "l3-protocol-data"
1062 ]
1063 ip_profile_dest_data = {}
1064 if "ip-version" in ip_profile_source_data:
1065 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1066 "ip-version"
1067 ]
1068 if "cidr" in ip_profile_source_data:
1069 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1070 "cidr"
1071 ]
1072 if "gateway-ip" in ip_profile_source_data:
1073 ip_profile_dest_data[
1074 "gateway-address"
1075 ] = ip_profile_source_data["gateway-ip"]
1076 if "dhcp-enabled" in ip_profile_source_data:
1077 ip_profile_dest_data["dhcp-params"] = {
1078 "enabled": ip_profile_source_data["dhcp-enabled"]
1079 }
1080
1081 vld_params["ip-profile"] = ip_profile_dest_data
1082 # update vld_params with instantiation params
1083 if vnf_params:
1084 vld_instantiation_params = find_in_list(
1085 get_iterable(vnf_params, "internal-vld"),
1086 lambda i_vld: i_vld["name"] == vld["id"],
1087 )
1088 if vld_instantiation_params:
1089 vld_params.update(vld_instantiation_params)
1090 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1091
1092 vdur_list = []
1093 for vdur in target_vnf.get("vdur", ()):
1094 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1095 continue # This vdu must not be created
1096 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1097
1098 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1099
1100 if ssh_keys_all:
1101 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1102 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1103 if (
1104 vdu_configuration
1105 and vdu_configuration.get("config-access")
1106 and vdu_configuration.get("config-access").get("ssh-access")
1107 ):
1108 vdur["ssh-keys"] = ssh_keys_all
1109 vdur["ssh-access-required"] = vdu_configuration[
1110 "config-access"
1111 ]["ssh-access"]["required"]
1112 elif (
1113 vnf_configuration
1114 and vnf_configuration.get("config-access")
1115 and vnf_configuration.get("config-access").get("ssh-access")
1116 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1117 ):
1118 vdur["ssh-keys"] = ssh_keys_all
1119 vdur["ssh-access-required"] = vnf_configuration[
1120 "config-access"
1121 ]["ssh-access"]["required"]
1122 elif ssh_keys_instantiation and find_in_list(
1123 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1124 ):
1125 vdur["ssh-keys"] = ssh_keys_instantiation
1126
1127 self.logger.debug("NS > vdur > {}".format(vdur))
1128
1129 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1130 # cloud-init
1131 if vdud.get("cloud-init-file"):
1132 vdur["cloud-init"] = "{}:file:{}".format(
1133 vnfd["_id"], vdud.get("cloud-init-file")
1134 )
1135 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1136 if vdur["cloud-init"] not in target["cloud_init_content"]:
1137 base_folder = vnfd["_admin"]["storage"]
1138 if base_folder["pkg-dir"]:
1139 cloud_init_file = "{}/{}/cloud_init/{}".format(
1140 base_folder["folder"],
1141 base_folder["pkg-dir"],
1142 vdud.get("cloud-init-file"),
1143 )
1144 else:
1145 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1146 base_folder["folder"],
1147 vdud.get("cloud-init-file"),
1148 )
1149 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1150 target["cloud_init_content"][
1151 vdur["cloud-init"]
1152 ] = ci_file.read()
1153 elif vdud.get("cloud-init"):
1154 vdur["cloud-init"] = "{}:vdu:{}".format(
1155 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1156 )
1157 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1158 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1159 "cloud-init"
1160 ]
1161 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1162 deploy_params_vdu = self._format_additional_params(
1163 vdur.get("additionalParams") or {}
1164 )
1165 deploy_params_vdu["OSM"] = get_osm_params(
1166 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1167 )
1168 vdur["additionalParams"] = deploy_params_vdu
1169
1170 # flavor
1171 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1172 if target_vim not in ns_flavor["vim_info"]:
1173 ns_flavor["vim_info"][target_vim] = {}
1174
1175 # deal with images
1176 # in case alternative images are provided we must check if they should be applied
1177 # for the vim_type, modify the vim_type taking into account
1178 ns_image_id = int(vdur["ns-image-id"])
1179 if vdur.get("alt-image-ids"):
1180 db_vim = get_vim_account(vnfr["vim-account-id"])
1181 vim_type = db_vim["vim_type"]
1182 for alt_image_id in vdur.get("alt-image-ids"):
1183 ns_alt_image = target["image"][int(alt_image_id)]
1184 if vim_type == ns_alt_image.get("vim-type"):
1185 # must use alternative image
1186 self.logger.debug(
1187 "use alternative image id: {}".format(alt_image_id)
1188 )
1189 ns_image_id = alt_image_id
1190 vdur["ns-image-id"] = ns_image_id
1191 break
1192 ns_image = target["image"][int(ns_image_id)]
1193 if target_vim not in ns_image["vim_info"]:
1194 ns_image["vim_info"][target_vim] = {}
1195
1196 # Affinity groups
1197 if vdur.get("affinity-or-anti-affinity-group-id"):
1198 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1199 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1200 if target_vim not in ns_ags["vim_info"]:
1201 ns_ags["vim_info"][target_vim] = {}
1202
1203 vdur["vim_info"] = {target_vim: {}}
1204 # instantiation parameters
1205 # if vnf_params:
1206 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1207 # vdud["id"]), None)
1208 vdur_list.append(vdur)
1209 target_vnf["vdur"] = vdur_list
1210 target["vnf"].append(target_vnf)
1211
1212 desc = await self.RO.deploy(nsr_id, target)
1213 self.logger.debug("RO return > {}".format(desc))
1214 action_id = desc["action_id"]
1215 await self._wait_ng_ro(
1216 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1217 )
1218
1219 # Updating NSR
1220 db_nsr_update = {
1221 "_admin.deployed.RO.operational-status": "running",
1222 "detailed-status": " ".join(stage),
1223 }
1224 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1225 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1226 self._write_op_status(nslcmop_id, stage)
1227 self.logger.debug(
1228 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1229 )
1230 return
1231
1232 async def _wait_ng_ro(
1233 self,
1234 nsr_id,
1235 action_id,
1236 nslcmop_id=None,
1237 start_time=None,
1238 timeout=600,
1239 stage=None,
1240 ):
1241 detailed_status_old = None
1242 db_nsr_update = {}
1243 start_time = start_time or time()
1244 while time() <= start_time + timeout:
1245 desc_status = await self.RO.status(nsr_id, action_id)
1246 self.logger.debug("Wait NG RO > {}".format(desc_status))
1247 if desc_status["status"] == "FAILED":
1248 raise NgRoException(desc_status["details"])
1249 elif desc_status["status"] == "BUILD":
1250 if stage:
1251 stage[2] = "VIM: ({})".format(desc_status["details"])
1252 elif desc_status["status"] == "DONE":
1253 if stage:
1254 stage[2] = "Deployed at VIM"
1255 break
1256 else:
1257 assert False, "ROclient.check_ns_status returns unknown {}".format(
1258 desc_status["status"]
1259 )
1260 if stage and nslcmop_id and stage[2] != detailed_status_old:
1261 detailed_status_old = stage[2]
1262 db_nsr_update["detailed-status"] = " ".join(stage)
1263 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1264 self._write_op_status(nslcmop_id, stage)
1265 await asyncio.sleep(15, loop=self.loop)
1266 else: # timeout_ns_deploy
1267 raise NgRoException("Timeout waiting ns to deploy")
1268
1269 async def _terminate_ng_ro(
1270 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1271 ):
1272 db_nsr_update = {}
1273 failed_detail = []
1274 action_id = None
1275 start_deploy = time()
1276 try:
1277 target = {
1278 "ns": {"vld": []},
1279 "vnf": [],
1280 "image": [],
1281 "flavor": [],
1282 "action_id": nslcmop_id,
1283 }
1284 desc = await self.RO.deploy(nsr_id, target)
1285 action_id = desc["action_id"]
1286 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1287 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1288 self.logger.debug(
1289 logging_text
1290 + "ns terminate action at RO. action_id={}".format(action_id)
1291 )
1292
1293 # wait until done
1294 delete_timeout = 20 * 60 # 20 minutes
1295 await self._wait_ng_ro(
1296 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1297 )
1298
1299 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1300 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1301 # delete all nsr
1302 await self.RO.delete(nsr_id)
1303 except Exception as e:
1304 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1305 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1306 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1307 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1308 self.logger.debug(
1309 logging_text + "RO_action_id={} already deleted".format(action_id)
1310 )
1311 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1312 failed_detail.append("delete conflict: {}".format(e))
1313 self.logger.debug(
1314 logging_text
1315 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1316 )
1317 else:
1318 failed_detail.append("delete error: {}".format(e))
1319 self.logger.error(
1320 logging_text
1321 + "RO_action_id={} delete error: {}".format(action_id, e)
1322 )
1323
1324 if failed_detail:
1325 stage[2] = "Error deleting from VIM"
1326 else:
1327 stage[2] = "Deleted from VIM"
1328 db_nsr_update["detailed-status"] = " ".join(stage)
1329 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1330 self._write_op_status(nslcmop_id, stage)
1331
1332 if failed_detail:
1333 raise LcmException("; ".join(failed_detail))
1334 return
1335
1336 async def instantiate_RO(
1337 self,
1338 logging_text,
1339 nsr_id,
1340 nsd,
1341 db_nsr,
1342 db_nslcmop,
1343 db_vnfrs,
1344 db_vnfds,
1345 n2vc_key_list,
1346 stage,
1347 ):
1348 """
1349 Instantiate at RO
1350 :param logging_text: preffix text to use at logging
1351 :param nsr_id: nsr identity
1352 :param nsd: database content of ns descriptor
1353 :param db_nsr: database content of ns record
1354 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1355 :param db_vnfrs:
1356 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1357 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1358 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1359 :return: None or exception
1360 """
1361 try:
1362 start_deploy = time()
1363 ns_params = db_nslcmop.get("operationParams")
1364 if ns_params and ns_params.get("timeout_ns_deploy"):
1365 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1366 else:
1367 timeout_ns_deploy = self.timeout.get(
1368 "ns_deploy", self.timeout_ns_deploy
1369 )
1370
1371 # Check for and optionally request placement optimization. Database will be updated if placement activated
1372 stage[2] = "Waiting for Placement."
1373 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1374 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1375 for vnfr in db_vnfrs.values():
1376 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1377 break
1378 else:
1379 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1380
1381 return await self._instantiate_ng_ro(
1382 logging_text,
1383 nsr_id,
1384 nsd,
1385 db_nsr,
1386 db_nslcmop,
1387 db_vnfrs,
1388 db_vnfds,
1389 n2vc_key_list,
1390 stage,
1391 start_deploy,
1392 timeout_ns_deploy,
1393 )
1394 except Exception as e:
1395 stage[2] = "ERROR deploying at VIM"
1396 self.set_vnfr_at_error(db_vnfrs, str(e))
1397 self.logger.error(
1398 "Error deploying at VIM {}".format(e),
1399 exc_info=not isinstance(
1400 e,
1401 (
1402 ROclient.ROClientException,
1403 LcmException,
1404 DbException,
1405 NgRoException,
1406 ),
1407 ),
1408 )
1409 raise
1410
1411 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1412 """
1413 Wait for kdu to be up, get ip address
1414 :param logging_text: prefix use for logging
1415 :param nsr_id:
1416 :param vnfr_id:
1417 :param kdu_name:
1418 :return: IP address
1419 """
1420
1421 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1422 nb_tries = 0
1423
1424 while nb_tries < 360:
1425 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1426 kdur = next(
1427 (
1428 x
1429 for x in get_iterable(db_vnfr, "kdur")
1430 if x.get("kdu-name") == kdu_name
1431 ),
1432 None,
1433 )
1434 if not kdur:
1435 raise LcmException(
1436 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1437 )
1438 if kdur.get("status"):
1439 if kdur["status"] in ("READY", "ENABLED"):
1440 return kdur.get("ip-address")
1441 else:
1442 raise LcmException(
1443 "target KDU={} is in error state".format(kdu_name)
1444 )
1445
1446 await asyncio.sleep(10, loop=self.loop)
1447 nb_tries += 1
1448 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1449
1450 async def wait_vm_up_insert_key_ro(
1451 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1452 ):
1453 """
1454 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1455 :param logging_text: prefix use for logging
1456 :param nsr_id:
1457 :param vnfr_id:
1458 :param vdu_id:
1459 :param vdu_index:
1460 :param pub_key: public ssh key to inject, None to skip
1461 :param user: user to apply the public ssh key
1462 :return: IP address
1463 """
1464
1465 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1466 ro_nsr_id = None
1467 ip_address = None
1468 nb_tries = 0
1469 target_vdu_id = None
1470 ro_retries = 0
1471
1472 while True:
1473
1474 ro_retries += 1
1475 if ro_retries >= 360: # 1 hour
1476 raise LcmException(
1477 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1478 )
1479
1480 await asyncio.sleep(10, loop=self.loop)
1481
1482 # get ip address
1483 if not target_vdu_id:
1484 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1485
1486 if not vdu_id: # for the VNF case
1487 if db_vnfr.get("status") == "ERROR":
1488 raise LcmException(
1489 "Cannot inject ssh-key because target VNF is in error state"
1490 )
1491 ip_address = db_vnfr.get("ip-address")
1492 if not ip_address:
1493 continue
1494 vdur = next(
1495 (
1496 x
1497 for x in get_iterable(db_vnfr, "vdur")
1498 if x.get("ip-address") == ip_address
1499 ),
1500 None,
1501 )
1502 else: # VDU case
1503 vdur = next(
1504 (
1505 x
1506 for x in get_iterable(db_vnfr, "vdur")
1507 if x.get("vdu-id-ref") == vdu_id
1508 and x.get("count-index") == vdu_index
1509 ),
1510 None,
1511 )
1512
1513 if (
1514 not vdur and len(db_vnfr.get("vdur", ())) == 1
1515 ): # If only one, this should be the target vdu
1516 vdur = db_vnfr["vdur"][0]
1517 if not vdur:
1518 raise LcmException(
1519 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1520 vnfr_id, vdu_id, vdu_index
1521 )
1522 )
1523 # New generation RO stores information at "vim_info"
1524 ng_ro_status = None
1525 target_vim = None
1526 if vdur.get("vim_info"):
1527 target_vim = next(
1528 t for t in vdur["vim_info"]
1529 ) # there should be only one key
1530 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1531 if (
1532 vdur.get("pdu-type")
1533 or vdur.get("status") == "ACTIVE"
1534 or ng_ro_status == "ACTIVE"
1535 ):
1536 ip_address = vdur.get("ip-address")
1537 if not ip_address:
1538 continue
1539 target_vdu_id = vdur["vdu-id-ref"]
1540 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1541 raise LcmException(
1542 "Cannot inject ssh-key because target VM is in error state"
1543 )
1544
1545 if not target_vdu_id:
1546 continue
1547
1548 # inject public key into machine
1549 if pub_key and user:
1550 self.logger.debug(logging_text + "Inserting RO key")
1551 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1552 if vdur.get("pdu-type"):
1553 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1554 return ip_address
1555 try:
1556 ro_vm_id = "{}-{}".format(
1557 db_vnfr["member-vnf-index-ref"], target_vdu_id
1558 ) # TODO add vdu_index
1559 if self.ng_ro:
1560 target = {
1561 "action": {
1562 "action": "inject_ssh_key",
1563 "key": pub_key,
1564 "user": user,
1565 },
1566 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1567 }
1568 desc = await self.RO.deploy(nsr_id, target)
1569 action_id = desc["action_id"]
1570 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1571 break
1572 else:
1573 # wait until NS is deployed at RO
1574 if not ro_nsr_id:
1575 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1576 ro_nsr_id = deep_get(
1577 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1578 )
1579 if not ro_nsr_id:
1580 continue
1581 result_dict = await self.RO.create_action(
1582 item="ns",
1583 item_id_name=ro_nsr_id,
1584 descriptor={
1585 "add_public_key": pub_key,
1586 "vms": [ro_vm_id],
1587 "user": user,
1588 },
1589 )
1590 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1591 if not result_dict or not isinstance(result_dict, dict):
1592 raise LcmException(
1593 "Unknown response from RO when injecting key"
1594 )
1595 for result in result_dict.values():
1596 if result.get("vim_result") == 200:
1597 break
1598 else:
1599 raise ROclient.ROClientException(
1600 "error injecting key: {}".format(
1601 result.get("description")
1602 )
1603 )
1604 break
1605 except NgRoException as e:
1606 raise LcmException(
1607 "Reaching max tries injecting key. Error: {}".format(e)
1608 )
1609 except ROclient.ROClientException as e:
1610 if not nb_tries:
1611 self.logger.debug(
1612 logging_text
1613 + "error injecting key: {}. Retrying until {} seconds".format(
1614 e, 20 * 10
1615 )
1616 )
1617 nb_tries += 1
1618 if nb_tries >= 20:
1619 raise LcmException(
1620 "Reaching max tries injecting key. Error: {}".format(e)
1621 )
1622 else:
1623 break
1624
1625 return ip_address
1626
1627 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1628 """
1629 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1630 """
1631 my_vca = vca_deployed_list[vca_index]
1632 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1633 # vdu or kdu: no dependencies
1634 return
1635 timeout = 300
1636 while timeout >= 0:
1637 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1638 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1639 configuration_status_list = db_nsr["configurationStatus"]
1640 for index, vca_deployed in enumerate(configuration_status_list):
1641 if index == vca_index:
1642 # myself
1643 continue
1644 if not my_vca.get("member-vnf-index") or (
1645 vca_deployed.get("member-vnf-index")
1646 == my_vca.get("member-vnf-index")
1647 ):
1648 internal_status = configuration_status_list[index].get("status")
1649 if internal_status == "READY":
1650 continue
1651 elif internal_status == "BROKEN":
1652 raise LcmException(
1653 "Configuration aborted because dependent charm/s has failed"
1654 )
1655 else:
1656 break
1657 else:
1658 # no dependencies, return
1659 return
1660 await asyncio.sleep(10)
1661 timeout -= 1
1662
1663 raise LcmException("Configuration aborted because dependent charm/s timeout")
1664
1665 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1666 vca_id = None
1667 if db_vnfr:
1668 vca_id = deep_get(db_vnfr, ("vca-id",))
1669 elif db_nsr:
1670 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1671 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1672 return vca_id
1673
1674 async def instantiate_N2VC(
1675 self,
1676 logging_text,
1677 vca_index,
1678 nsi_id,
1679 db_nsr,
1680 db_vnfr,
1681 vdu_id,
1682 kdu_name,
1683 vdu_index,
1684 config_descriptor,
1685 deploy_params,
1686 base_folder,
1687 nslcmop_id,
1688 stage,
1689 vca_type,
1690 vca_name,
1691 ee_config_descriptor,
1692 ):
1693 nsr_id = db_nsr["_id"]
1694 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1695 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1696 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1697 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1698 db_dict = {
1699 "collection": "nsrs",
1700 "filter": {"_id": nsr_id},
1701 "path": db_update_entry,
1702 }
1703 step = ""
1704 try:
1705
1706 element_type = "NS"
1707 element_under_configuration = nsr_id
1708
1709 vnfr_id = None
1710 if db_vnfr:
1711 vnfr_id = db_vnfr["_id"]
1712 osm_config["osm"]["vnf_id"] = vnfr_id
1713
1714 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1715
1716 if vca_type == "native_charm":
1717 index_number = 0
1718 else:
1719 index_number = vdu_index or 0
1720
1721 if vnfr_id:
1722 element_type = "VNF"
1723 element_under_configuration = vnfr_id
1724 namespace += ".{}-{}".format(vnfr_id, index_number)
1725 if vdu_id:
1726 namespace += ".{}-{}".format(vdu_id, index_number)
1727 element_type = "VDU"
1728 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1729 osm_config["osm"]["vdu_id"] = vdu_id
1730 elif kdu_name:
1731 namespace += ".{}".format(kdu_name)
1732 element_type = "KDU"
1733 element_under_configuration = kdu_name
1734 osm_config["osm"]["kdu_name"] = kdu_name
1735
1736 # Get artifact path
1737 if base_folder["pkg-dir"]:
1738 artifact_path = "{}/{}/{}/{}".format(
1739 base_folder["folder"],
1740 base_folder["pkg-dir"],
1741 "charms"
1742 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1743 else "helm-charts",
1744 vca_name,
1745 )
1746 else:
1747 artifact_path = "{}/Scripts/{}/{}/".format(
1748 base_folder["folder"],
1749 "charms"
1750 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1751 else "helm-charts",
1752 vca_name,
1753 )
1754
1755 self.logger.debug("Artifact path > {}".format(artifact_path))
1756
1757 # get initial_config_primitive_list that applies to this element
1758 initial_config_primitive_list = config_descriptor.get(
1759 "initial-config-primitive"
1760 )
1761
1762 self.logger.debug(
1763 "Initial config primitive list > {}".format(
1764 initial_config_primitive_list
1765 )
1766 )
1767
1768 # add config if not present for NS charm
1769 ee_descriptor_id = ee_config_descriptor.get("id")
1770 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1771 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1772 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1773 )
1774
1775 self.logger.debug(
1776 "Initial config primitive list #2 > {}".format(
1777 initial_config_primitive_list
1778 )
1779 )
1780 # n2vc_redesign STEP 3.1
1781 # find old ee_id if exists
1782 ee_id = vca_deployed.get("ee_id")
1783
1784 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1785 # create or register execution environment in VCA
1786 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1787
1788 self._write_configuration_status(
1789 nsr_id=nsr_id,
1790 vca_index=vca_index,
1791 status="CREATING",
1792 element_under_configuration=element_under_configuration,
1793 element_type=element_type,
1794 )
1795
1796 step = "create execution environment"
1797 self.logger.debug(logging_text + step)
1798
1799 ee_id = None
1800 credentials = None
1801 if vca_type == "k8s_proxy_charm":
1802 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1803 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1804 namespace=namespace,
1805 artifact_path=artifact_path,
1806 db_dict=db_dict,
1807 vca_id=vca_id,
1808 )
1809 elif vca_type == "helm" or vca_type == "helm-v3":
1810 ee_id, credentials = await self.vca_map[
1811 vca_type
1812 ].create_execution_environment(
1813 namespace=namespace,
1814 reuse_ee_id=ee_id,
1815 db_dict=db_dict,
1816 config=osm_config,
1817 artifact_path=artifact_path,
1818 vca_type=vca_type,
1819 )
1820 else:
1821 ee_id, credentials = await self.vca_map[
1822 vca_type
1823 ].create_execution_environment(
1824 namespace=namespace,
1825 reuse_ee_id=ee_id,
1826 db_dict=db_dict,
1827 vca_id=vca_id,
1828 )
1829
1830 elif vca_type == "native_charm":
1831 step = "Waiting to VM being up and getting IP address"
1832 self.logger.debug(logging_text + step)
1833 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1834 logging_text,
1835 nsr_id,
1836 vnfr_id,
1837 vdu_id,
1838 vdu_index,
1839 user=None,
1840 pub_key=None,
1841 )
1842 credentials = {"hostname": rw_mgmt_ip}
1843 # get username
1844 username = deep_get(
1845 config_descriptor, ("config-access", "ssh-access", "default-user")
1846 )
1847 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1848 # merged. Meanwhile let's get username from initial-config-primitive
1849 if not username and initial_config_primitive_list:
1850 for config_primitive in initial_config_primitive_list:
1851 for param in config_primitive.get("parameter", ()):
1852 if param["name"] == "ssh-username":
1853 username = param["value"]
1854 break
1855 if not username:
1856 raise LcmException(
1857 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1858 "'config-access.ssh-access.default-user'"
1859 )
1860 credentials["username"] = username
1861 # n2vc_redesign STEP 3.2
1862
1863 self._write_configuration_status(
1864 nsr_id=nsr_id,
1865 vca_index=vca_index,
1866 status="REGISTERING",
1867 element_under_configuration=element_under_configuration,
1868 element_type=element_type,
1869 )
1870
1871 step = "register execution environment {}".format(credentials)
1872 self.logger.debug(logging_text + step)
1873 ee_id = await self.vca_map[vca_type].register_execution_environment(
1874 credentials=credentials,
1875 namespace=namespace,
1876 db_dict=db_dict,
1877 vca_id=vca_id,
1878 )
1879
1880 # for compatibility with MON/POL modules, the need model and application name at database
1881 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1882 ee_id_parts = ee_id.split(".")
1883 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1884 if len(ee_id_parts) >= 2:
1885 model_name = ee_id_parts[0]
1886 application_name = ee_id_parts[1]
1887 db_nsr_update[db_update_entry + "model"] = model_name
1888 db_nsr_update[db_update_entry + "application"] = application_name
1889
1890 # n2vc_redesign STEP 3.3
1891 step = "Install configuration Software"
1892
1893 self._write_configuration_status(
1894 nsr_id=nsr_id,
1895 vca_index=vca_index,
1896 status="INSTALLING SW",
1897 element_under_configuration=element_under_configuration,
1898 element_type=element_type,
1899 other_update=db_nsr_update,
1900 )
1901
1902 # TODO check if already done
1903 self.logger.debug(logging_text + step)
1904 config = None
1905 if vca_type == "native_charm":
1906 config_primitive = next(
1907 (p for p in initial_config_primitive_list if p["name"] == "config"),
1908 None,
1909 )
1910 if config_primitive:
1911 config = self._map_primitive_params(
1912 config_primitive, {}, deploy_params
1913 )
1914 num_units = 1
1915 if vca_type == "lxc_proxy_charm":
1916 if element_type == "NS":
1917 num_units = db_nsr.get("config-units") or 1
1918 elif element_type == "VNF":
1919 num_units = db_vnfr.get("config-units") or 1
1920 elif element_type == "VDU":
1921 for v in db_vnfr["vdur"]:
1922 if vdu_id == v["vdu-id-ref"]:
1923 num_units = v.get("config-units") or 1
1924 break
1925 if vca_type != "k8s_proxy_charm":
1926 await self.vca_map[vca_type].install_configuration_sw(
1927 ee_id=ee_id,
1928 artifact_path=artifact_path,
1929 db_dict=db_dict,
1930 config=config,
1931 num_units=num_units,
1932 vca_id=vca_id,
1933 vca_type=vca_type,
1934 )
1935
1936 # write in db flag of configuration_sw already installed
1937 self.update_db_2(
1938 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1939 )
1940
1941 # add relations for this VCA (wait for other peers related with this VCA)
1942 await self._add_vca_relations(
1943 logging_text=logging_text,
1944 nsr_id=nsr_id,
1945 vca_type=vca_type,
1946 vca_index=vca_index,
1947 )
1948
1949 # if SSH access is required, then get execution environment SSH public
1950 # if native charm we have waited already to VM be UP
1951 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1952 pub_key = None
1953 user = None
1954 # self.logger.debug("get ssh key block")
1955 if deep_get(
1956 config_descriptor, ("config-access", "ssh-access", "required")
1957 ):
1958 # self.logger.debug("ssh key needed")
1959 # Needed to inject a ssh key
1960 user = deep_get(
1961 config_descriptor,
1962 ("config-access", "ssh-access", "default-user"),
1963 )
1964 step = "Install configuration Software, getting public ssh key"
1965 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1966 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1967 )
1968
1969 step = "Insert public key into VM user={} ssh_key={}".format(
1970 user, pub_key
1971 )
1972 else:
1973 # self.logger.debug("no need to get ssh key")
1974 step = "Waiting to VM being up and getting IP address"
1975 self.logger.debug(logging_text + step)
1976
1977 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1978 rw_mgmt_ip = None
1979
1980 # n2vc_redesign STEP 5.1
1981 # wait for RO (ip-address) Insert pub_key into VM
1982 if vnfr_id:
1983 if kdu_name:
1984 rw_mgmt_ip = await self.wait_kdu_up(
1985 logging_text, nsr_id, vnfr_id, kdu_name
1986 )
1987
1988 # This verification is needed in order to avoid trying to add a public key
1989 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
1990 # for a KNF and not for its KDUs, the previous verification gives False, and the code
1991 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
1992 # or it is a KNF)
1993 elif db_vnfr.get('vdur'):
1994 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1995 logging_text,
1996 nsr_id,
1997 vnfr_id,
1998 vdu_id,
1999 vdu_index,
2000 user=user,
2001 pub_key=pub_key,
2002 )
2003
2004 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
2005
2006 # store rw_mgmt_ip in deploy params for later replacement
2007 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2008
2009 # n2vc_redesign STEP 6 Execute initial config primitive
2010 step = "execute initial config primitive"
2011
2012 # wait for dependent primitives execution (NS -> VNF -> VDU)
2013 if initial_config_primitive_list:
2014 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2015
2016 # stage, in function of element type: vdu, kdu, vnf or ns
2017 my_vca = vca_deployed_list[vca_index]
2018 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2019 # VDU or KDU
2020 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2021 elif my_vca.get("member-vnf-index"):
2022 # VNF
2023 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2024 else:
2025 # NS
2026 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2027
2028 self._write_configuration_status(
2029 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2030 )
2031
2032 self._write_op_status(op_id=nslcmop_id, stage=stage)
2033
2034 check_if_terminated_needed = True
2035 for initial_config_primitive in initial_config_primitive_list:
2036 # adding information on the vca_deployed if it is a NS execution environment
2037 if not vca_deployed["member-vnf-index"]:
2038 deploy_params["ns_config_info"] = json.dumps(
2039 self._get_ns_config_info(nsr_id)
2040 )
2041 # TODO check if already done
2042 primitive_params_ = self._map_primitive_params(
2043 initial_config_primitive, {}, deploy_params
2044 )
2045
2046 step = "execute primitive '{}' params '{}'".format(
2047 initial_config_primitive["name"], primitive_params_
2048 )
2049 self.logger.debug(logging_text + step)
2050 await self.vca_map[vca_type].exec_primitive(
2051 ee_id=ee_id,
2052 primitive_name=initial_config_primitive["name"],
2053 params_dict=primitive_params_,
2054 db_dict=db_dict,
2055 vca_id=vca_id,
2056 vca_type=vca_type,
2057 )
2058 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2059 if check_if_terminated_needed:
2060 if config_descriptor.get("terminate-config-primitive"):
2061 self.update_db_2(
2062 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2063 )
2064 check_if_terminated_needed = False
2065
2066 # TODO register in database that primitive is done
2067
2068 # STEP 7 Configure metrics
2069 if vca_type == "helm" or vca_type == "helm-v3":
2070 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2071 ee_id=ee_id,
2072 artifact_path=artifact_path,
2073 ee_config_descriptor=ee_config_descriptor,
2074 vnfr_id=vnfr_id,
2075 nsr_id=nsr_id,
2076 target_ip=rw_mgmt_ip,
2077 )
2078 if prometheus_jobs:
2079 self.update_db_2(
2080 "nsrs",
2081 nsr_id,
2082 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2083 )
2084
2085 for job in prometheus_jobs:
2086 self.db.set_one(
2087 "prometheus_jobs",
2088 {
2089 "job_name": job["job_name"]
2090 },
2091 job,
2092 upsert=True,
2093 fail_on_empty=False,
2094 )
2095
2096 step = "instantiated at VCA"
2097 self.logger.debug(logging_text + step)
2098
2099 self._write_configuration_status(
2100 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2101 )
2102
2103 except Exception as e: # TODO not use Exception but N2VC exception
2104 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2105 if not isinstance(
2106 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2107 ):
2108 self.logger.error(
2109 "Exception while {} : {}".format(step, e), exc_info=True
2110 )
2111 self._write_configuration_status(
2112 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2113 )
2114 raise LcmException("{} {}".format(step, e)) from e
2115
2116 def _write_ns_status(
2117 self,
2118 nsr_id: str,
2119 ns_state: str,
2120 current_operation: str,
2121 current_operation_id: str,
2122 error_description: str = None,
2123 error_detail: str = None,
2124 other_update: dict = None,
2125 ):
2126 """
2127 Update db_nsr fields.
2128 :param nsr_id:
2129 :param ns_state:
2130 :param current_operation:
2131 :param current_operation_id:
2132 :param error_description:
2133 :param error_detail:
2134 :param other_update: Other required changes at database if provided, will be cleared
2135 :return:
2136 """
2137 try:
2138 db_dict = other_update or {}
2139 db_dict[
2140 "_admin.nslcmop"
2141 ] = current_operation_id # for backward compatibility
2142 db_dict["_admin.current-operation"] = current_operation_id
2143 db_dict["_admin.operation-type"] = (
2144 current_operation if current_operation != "IDLE" else None
2145 )
2146 db_dict["currentOperation"] = current_operation
2147 db_dict["currentOperationID"] = current_operation_id
2148 db_dict["errorDescription"] = error_description
2149 db_dict["errorDetail"] = error_detail
2150
2151 if ns_state:
2152 db_dict["nsState"] = ns_state
2153 self.update_db_2("nsrs", nsr_id, db_dict)
2154 except DbException as e:
2155 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2156
2157 def _write_op_status(
2158 self,
2159 op_id: str,
2160 stage: list = None,
2161 error_message: str = None,
2162 queuePosition: int = 0,
2163 operation_state: str = None,
2164 other_update: dict = None,
2165 ):
2166 try:
2167 db_dict = other_update or {}
2168 db_dict["queuePosition"] = queuePosition
2169 if isinstance(stage, list):
2170 db_dict["stage"] = stage[0]
2171 db_dict["detailed-status"] = " ".join(stage)
2172 elif stage is not None:
2173 db_dict["stage"] = str(stage)
2174
2175 if error_message is not None:
2176 db_dict["errorMessage"] = error_message
2177 if operation_state is not None:
2178 db_dict["operationState"] = operation_state
2179 db_dict["statusEnteredTime"] = time()
2180 self.update_db_2("nslcmops", op_id, db_dict)
2181 except DbException as e:
2182 self.logger.warn(
2183 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2184 )
2185
2186 def _write_all_config_status(self, db_nsr: dict, status: str):
2187 try:
2188 nsr_id = db_nsr["_id"]
2189 # configurationStatus
2190 config_status = db_nsr.get("configurationStatus")
2191 if config_status:
2192 db_nsr_update = {
2193 "configurationStatus.{}.status".format(index): status
2194 for index, v in enumerate(config_status)
2195 if v
2196 }
2197 # update status
2198 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2199
2200 except DbException as e:
2201 self.logger.warn(
2202 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2203 )
2204
2205 def _write_configuration_status(
2206 self,
2207 nsr_id: str,
2208 vca_index: int,
2209 status: str = None,
2210 element_under_configuration: str = None,
2211 element_type: str = None,
2212 other_update: dict = None,
2213 ):
2214
2215 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2216 # .format(vca_index, status))
2217
2218 try:
2219 db_path = "configurationStatus.{}.".format(vca_index)
2220 db_dict = other_update or {}
2221 if status:
2222 db_dict[db_path + "status"] = status
2223 if element_under_configuration:
2224 db_dict[
2225 db_path + "elementUnderConfiguration"
2226 ] = element_under_configuration
2227 if element_type:
2228 db_dict[db_path + "elementType"] = element_type
2229 self.update_db_2("nsrs", nsr_id, db_dict)
2230 except DbException as e:
2231 self.logger.warn(
2232 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2233 status, nsr_id, vca_index, e
2234 )
2235 )
2236
2237 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2238 """
2239 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2240 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2241 Database is used because the result can be obtained from a different LCM worker in case of HA.
2242 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2243 :param db_nslcmop: database content of nslcmop
2244 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2245 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2246 computed 'vim-account-id'
2247 """
2248 modified = False
2249 nslcmop_id = db_nslcmop["_id"]
2250 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2251 if placement_engine == "PLA":
2252 self.logger.debug(
2253 logging_text + "Invoke and wait for placement optimization"
2254 )
2255 await self.msg.aiowrite(
2256 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2257 )
2258 db_poll_interval = 5
2259 wait = db_poll_interval * 10
2260 pla_result = None
2261 while not pla_result and wait >= 0:
2262 await asyncio.sleep(db_poll_interval)
2263 wait -= db_poll_interval
2264 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2265 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2266
2267 if not pla_result:
2268 raise LcmException(
2269 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2270 )
2271
2272 for pla_vnf in pla_result["vnf"]:
2273 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2274 if not pla_vnf.get("vimAccountId") or not vnfr:
2275 continue
2276 modified = True
2277 self.db.set_one(
2278 "vnfrs",
2279 {"_id": vnfr["_id"]},
2280 {"vim-account-id": pla_vnf["vimAccountId"]},
2281 )
2282 # Modifies db_vnfrs
2283 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2284 return modified
2285
2286 def update_nsrs_with_pla_result(self, params):
2287 try:
2288 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2289 self.update_db_2(
2290 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2291 )
2292 except Exception as e:
2293 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2294
2295 async def instantiate(self, nsr_id, nslcmop_id):
2296 """
2297
2298 :param nsr_id: ns instance to deploy
2299 :param nslcmop_id: operation to run
2300 :return:
2301 """
2302
2303 # Try to lock HA task here
2304 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2305 if not task_is_locked_by_me:
2306 self.logger.debug(
2307 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2308 )
2309 return
2310
2311 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2312 self.logger.debug(logging_text + "Enter")
2313
2314 # get all needed from database
2315
2316 # database nsrs record
2317 db_nsr = None
2318
2319 # database nslcmops record
2320 db_nslcmop = None
2321
2322 # update operation on nsrs
2323 db_nsr_update = {}
2324 # update operation on nslcmops
2325 db_nslcmop_update = {}
2326
2327 nslcmop_operation_state = None
2328 db_vnfrs = {} # vnf's info indexed by member-index
2329 # n2vc_info = {}
2330 tasks_dict_info = {} # from task to info text
2331 exc = None
2332 error_list = []
2333 stage = [
2334 "Stage 1/5: preparation of the environment.",
2335 "Waiting for previous operations to terminate.",
2336 "",
2337 ]
2338 # ^ stage, step, VIM progress
2339 try:
2340 # wait for any previous tasks in process
2341 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2342
2343 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2344 stage[1] = "Reading from database."
2345 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2346 db_nsr_update["detailed-status"] = "creating"
2347 db_nsr_update["operational-status"] = "init"
2348 self._write_ns_status(
2349 nsr_id=nsr_id,
2350 ns_state="BUILDING",
2351 current_operation="INSTANTIATING",
2352 current_operation_id=nslcmop_id,
2353 other_update=db_nsr_update,
2354 )
2355 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2356
2357 # read from db: operation
2358 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2359 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2360 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2361 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2362 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2363 )
2364 ns_params = db_nslcmop.get("operationParams")
2365 if ns_params and ns_params.get("timeout_ns_deploy"):
2366 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2367 else:
2368 timeout_ns_deploy = self.timeout.get(
2369 "ns_deploy", self.timeout_ns_deploy
2370 )
2371
2372 # read from db: ns
2373 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2374 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2375 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2376 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2377 self.fs.sync(db_nsr["nsd-id"])
2378 db_nsr["nsd"] = nsd
2379 # nsr_name = db_nsr["name"] # TODO short-name??
2380
2381 # read from db: vnf's of this ns
2382 stage[1] = "Getting vnfrs from db."
2383 self.logger.debug(logging_text + stage[1])
2384 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2385
2386 # read from db: vnfd's for every vnf
2387 db_vnfds = [] # every vnfd data
2388
2389 # for each vnf in ns, read vnfd
2390 for vnfr in db_vnfrs_list:
2391 if vnfr.get("kdur"):
2392 kdur_list = []
2393 for kdur in vnfr["kdur"]:
2394 if kdur.get("additionalParams"):
2395 kdur["additionalParams"] = json.loads(
2396 kdur["additionalParams"]
2397 )
2398 kdur_list.append(kdur)
2399 vnfr["kdur"] = kdur_list
2400
2401 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2402 vnfd_id = vnfr["vnfd-id"]
2403 vnfd_ref = vnfr["vnfd-ref"]
2404 self.fs.sync(vnfd_id)
2405
2406 # if we haven't this vnfd, read it from db
2407 if vnfd_id not in db_vnfds:
2408 # read from db
2409 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2410 vnfd_id, vnfd_ref
2411 )
2412 self.logger.debug(logging_text + stage[1])
2413 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2414
2415 # store vnfd
2416 db_vnfds.append(vnfd)
2417
2418 # Get or generates the _admin.deployed.VCA list
2419 vca_deployed_list = None
2420 if db_nsr["_admin"].get("deployed"):
2421 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2422 if vca_deployed_list is None:
2423 vca_deployed_list = []
2424 configuration_status_list = []
2425 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2426 db_nsr_update["configurationStatus"] = configuration_status_list
2427 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2428 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2429 elif isinstance(vca_deployed_list, dict):
2430 # maintain backward compatibility. Change a dict to list at database
2431 vca_deployed_list = list(vca_deployed_list.values())
2432 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2433 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2434
2435 if not isinstance(
2436 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2437 ):
2438 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2439 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2440
2441 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2442 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2443 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2444 self.db.set_list(
2445 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2446 )
2447
2448 # n2vc_redesign STEP 2 Deploy Network Scenario
2449 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2450 self._write_op_status(op_id=nslcmop_id, stage=stage)
2451
2452 stage[1] = "Deploying KDUs."
2453 # self.logger.debug(logging_text + "Before deploy_kdus")
2454 # Call to deploy_kdus in case exists the "vdu:kdu" param
2455 await self.deploy_kdus(
2456 logging_text=logging_text,
2457 nsr_id=nsr_id,
2458 nslcmop_id=nslcmop_id,
2459 db_vnfrs=db_vnfrs,
2460 db_vnfds=db_vnfds,
2461 task_instantiation_info=tasks_dict_info,
2462 )
2463
2464 stage[1] = "Getting VCA public key."
2465 # n2vc_redesign STEP 1 Get VCA public ssh-key
2466 # feature 1429. Add n2vc public key to needed VMs
2467 n2vc_key = self.n2vc.get_public_key()
2468 n2vc_key_list = [n2vc_key]
2469 if self.vca_config.get("public_key"):
2470 n2vc_key_list.append(self.vca_config["public_key"])
2471
2472 stage[1] = "Deploying NS at VIM."
2473 task_ro = asyncio.ensure_future(
2474 self.instantiate_RO(
2475 logging_text=logging_text,
2476 nsr_id=nsr_id,
2477 nsd=nsd,
2478 db_nsr=db_nsr,
2479 db_nslcmop=db_nslcmop,
2480 db_vnfrs=db_vnfrs,
2481 db_vnfds=db_vnfds,
2482 n2vc_key_list=n2vc_key_list,
2483 stage=stage,
2484 )
2485 )
2486 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2487 tasks_dict_info[task_ro] = "Deploying at VIM"
2488
2489 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2490 stage[1] = "Deploying Execution Environments."
2491 self.logger.debug(logging_text + stage[1])
2492
2493 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2494 for vnf_profile in get_vnf_profiles(nsd):
2495 vnfd_id = vnf_profile["vnfd-id"]
2496 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2497 member_vnf_index = str(vnf_profile["id"])
2498 db_vnfr = db_vnfrs[member_vnf_index]
2499 base_folder = vnfd["_admin"]["storage"]
2500 vdu_id = None
2501 vdu_index = 0
2502 vdu_name = None
2503 kdu_name = None
2504
2505 # Get additional parameters
2506 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2507 if db_vnfr.get("additionalParamsForVnf"):
2508 deploy_params.update(
2509 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2510 )
2511
2512 descriptor_config = get_configuration(vnfd, vnfd["id"])
2513 if descriptor_config:
2514 self._deploy_n2vc(
2515 logging_text=logging_text
2516 + "member_vnf_index={} ".format(member_vnf_index),
2517 db_nsr=db_nsr,
2518 db_vnfr=db_vnfr,
2519 nslcmop_id=nslcmop_id,
2520 nsr_id=nsr_id,
2521 nsi_id=nsi_id,
2522 vnfd_id=vnfd_id,
2523 vdu_id=vdu_id,
2524 kdu_name=kdu_name,
2525 member_vnf_index=member_vnf_index,
2526 vdu_index=vdu_index,
2527 vdu_name=vdu_name,
2528 deploy_params=deploy_params,
2529 descriptor_config=descriptor_config,
2530 base_folder=base_folder,
2531 task_instantiation_info=tasks_dict_info,
2532 stage=stage,
2533 )
2534
2535 # Deploy charms for each VDU that supports one.
2536 for vdud in get_vdu_list(vnfd):
2537 vdu_id = vdud["id"]
2538 descriptor_config = get_configuration(vnfd, vdu_id)
2539 vdur = find_in_list(
2540 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2541 )
2542
2543 if vdur.get("additionalParams"):
2544 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2545 else:
2546 deploy_params_vdu = deploy_params
2547 deploy_params_vdu["OSM"] = get_osm_params(
2548 db_vnfr, vdu_id, vdu_count_index=0
2549 )
2550 vdud_count = get_number_of_instances(vnfd, vdu_id)
2551
2552 self.logger.debug("VDUD > {}".format(vdud))
2553 self.logger.debug(
2554 "Descriptor config > {}".format(descriptor_config)
2555 )
2556 if descriptor_config:
2557 vdu_name = None
2558 kdu_name = None
2559 for vdu_index in range(vdud_count):
2560 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2561 self._deploy_n2vc(
2562 logging_text=logging_text
2563 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2564 member_vnf_index, vdu_id, vdu_index
2565 ),
2566 db_nsr=db_nsr,
2567 db_vnfr=db_vnfr,
2568 nslcmop_id=nslcmop_id,
2569 nsr_id=nsr_id,
2570 nsi_id=nsi_id,
2571 vnfd_id=vnfd_id,
2572 vdu_id=vdu_id,
2573 kdu_name=kdu_name,
2574 member_vnf_index=member_vnf_index,
2575 vdu_index=vdu_index,
2576 vdu_name=vdu_name,
2577 deploy_params=deploy_params_vdu,
2578 descriptor_config=descriptor_config,
2579 base_folder=base_folder,
2580 task_instantiation_info=tasks_dict_info,
2581 stage=stage,
2582 )
2583 for kdud in get_kdu_list(vnfd):
2584 kdu_name = kdud["name"]
2585 descriptor_config = get_configuration(vnfd, kdu_name)
2586 if descriptor_config:
2587 vdu_id = None
2588 vdu_index = 0
2589 vdu_name = None
2590 kdur = next(
2591 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2592 )
2593 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2594 if kdur.get("additionalParams"):
2595 deploy_params_kdu.update(
2596 parse_yaml_strings(kdur["additionalParams"].copy())
2597 )
2598
2599 self._deploy_n2vc(
2600 logging_text=logging_text,
2601 db_nsr=db_nsr,
2602 db_vnfr=db_vnfr,
2603 nslcmop_id=nslcmop_id,
2604 nsr_id=nsr_id,
2605 nsi_id=nsi_id,
2606 vnfd_id=vnfd_id,
2607 vdu_id=vdu_id,
2608 kdu_name=kdu_name,
2609 member_vnf_index=member_vnf_index,
2610 vdu_index=vdu_index,
2611 vdu_name=vdu_name,
2612 deploy_params=deploy_params_kdu,
2613 descriptor_config=descriptor_config,
2614 base_folder=base_folder,
2615 task_instantiation_info=tasks_dict_info,
2616 stage=stage,
2617 )
2618
2619 # Check if this NS has a charm configuration
2620 descriptor_config = nsd.get("ns-configuration")
2621 if descriptor_config and descriptor_config.get("juju"):
2622 vnfd_id = None
2623 db_vnfr = None
2624 member_vnf_index = None
2625 vdu_id = None
2626 kdu_name = None
2627 vdu_index = 0
2628 vdu_name = None
2629
2630 # Get additional parameters
2631 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2632 if db_nsr.get("additionalParamsForNs"):
2633 deploy_params.update(
2634 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2635 )
2636 base_folder = nsd["_admin"]["storage"]
2637 self._deploy_n2vc(
2638 logging_text=logging_text,
2639 db_nsr=db_nsr,
2640 db_vnfr=db_vnfr,
2641 nslcmop_id=nslcmop_id,
2642 nsr_id=nsr_id,
2643 nsi_id=nsi_id,
2644 vnfd_id=vnfd_id,
2645 vdu_id=vdu_id,
2646 kdu_name=kdu_name,
2647 member_vnf_index=member_vnf_index,
2648 vdu_index=vdu_index,
2649 vdu_name=vdu_name,
2650 deploy_params=deploy_params,
2651 descriptor_config=descriptor_config,
2652 base_folder=base_folder,
2653 task_instantiation_info=tasks_dict_info,
2654 stage=stage,
2655 )
2656
2657 # rest of staff will be done at finally
2658
2659 except (
2660 ROclient.ROClientException,
2661 DbException,
2662 LcmException,
2663 N2VCException,
2664 ) as e:
2665 self.logger.error(
2666 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2667 )
2668 exc = e
2669 except asyncio.CancelledError:
2670 self.logger.error(
2671 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2672 )
2673 exc = "Operation was cancelled"
2674 except Exception as e:
2675 exc = traceback.format_exc()
2676 self.logger.critical(
2677 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2678 exc_info=True,
2679 )
2680 finally:
2681 if exc:
2682 error_list.append(str(exc))
2683 try:
2684 # wait for pending tasks
2685 if tasks_dict_info:
2686 stage[1] = "Waiting for instantiate pending tasks."
2687 self.logger.debug(logging_text + stage[1])
2688 error_list += await self._wait_for_tasks(
2689 logging_text,
2690 tasks_dict_info,
2691 timeout_ns_deploy,
2692 stage,
2693 nslcmop_id,
2694 nsr_id=nsr_id,
2695 )
2696 stage[1] = stage[2] = ""
2697 except asyncio.CancelledError:
2698 error_list.append("Cancelled")
2699 # TODO cancel all tasks
2700 except Exception as exc:
2701 error_list.append(str(exc))
2702
2703 # update operation-status
2704 db_nsr_update["operational-status"] = "running"
2705 # let's begin with VCA 'configured' status (later we can change it)
2706 db_nsr_update["config-status"] = "configured"
2707 for task, task_name in tasks_dict_info.items():
2708 if not task.done() or task.cancelled() or task.exception():
2709 if task_name.startswith(self.task_name_deploy_vca):
2710 # A N2VC task is pending
2711 db_nsr_update["config-status"] = "failed"
2712 else:
2713 # RO or KDU task is pending
2714 db_nsr_update["operational-status"] = "failed"
2715
2716 # update status at database
2717 if error_list:
2718 error_detail = ". ".join(error_list)
2719 self.logger.error(logging_text + error_detail)
2720 error_description_nslcmop = "{} Detail: {}".format(
2721 stage[0], error_detail
2722 )
2723 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2724 nslcmop_id, stage[0]
2725 )
2726
2727 db_nsr_update["detailed-status"] = (
2728 error_description_nsr + " Detail: " + error_detail
2729 )
2730 db_nslcmop_update["detailed-status"] = error_detail
2731 nslcmop_operation_state = "FAILED"
2732 ns_state = "BROKEN"
2733 else:
2734 error_detail = None
2735 error_description_nsr = error_description_nslcmop = None
2736 ns_state = "READY"
2737 db_nsr_update["detailed-status"] = "Done"
2738 db_nslcmop_update["detailed-status"] = "Done"
2739 nslcmop_operation_state = "COMPLETED"
2740
2741 if db_nsr:
2742 self._write_ns_status(
2743 nsr_id=nsr_id,
2744 ns_state=ns_state,
2745 current_operation="IDLE",
2746 current_operation_id=None,
2747 error_description=error_description_nsr,
2748 error_detail=error_detail,
2749 other_update=db_nsr_update,
2750 )
2751 self._write_op_status(
2752 op_id=nslcmop_id,
2753 stage="",
2754 error_message=error_description_nslcmop,
2755 operation_state=nslcmop_operation_state,
2756 other_update=db_nslcmop_update,
2757 )
2758
2759 if nslcmop_operation_state:
2760 try:
2761 await self.msg.aiowrite(
2762 "ns",
2763 "instantiated",
2764 {
2765 "nsr_id": nsr_id,
2766 "nslcmop_id": nslcmop_id,
2767 "operationState": nslcmop_operation_state,
2768 },
2769 loop=self.loop,
2770 )
2771 except Exception as e:
2772 self.logger.error(
2773 logging_text + "kafka_write notification Exception {}".format(e)
2774 )
2775
2776 self.logger.debug(logging_text + "Exit")
2777 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2778
2779 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2780 if vnfd_id not in cached_vnfds:
2781 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2782 return cached_vnfds[vnfd_id]
2783
2784 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2785 if vnf_profile_id not in cached_vnfrs:
2786 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2787 "vnfrs",
2788 {
2789 "member-vnf-index-ref": vnf_profile_id,
2790 "nsr-id-ref": nsr_id,
2791 },
2792 )
2793 return cached_vnfrs[vnf_profile_id]
2794
2795 def _is_deployed_vca_in_relation(
2796 self, vca: DeployedVCA, relation: Relation
2797 ) -> bool:
2798 found = False
2799 for endpoint in (relation.provider, relation.requirer):
2800 if endpoint["kdu-resource-profile-id"]:
2801 continue
2802 found = (
2803 vca.vnf_profile_id == endpoint.vnf_profile_id
2804 and vca.vdu_profile_id == endpoint.vdu_profile_id
2805 and vca.execution_environment_ref == endpoint.execution_environment_ref
2806 )
2807 if found:
2808 break
2809 return found
2810
2811 def _update_ee_relation_data_with_implicit_data(
2812 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2813 ):
2814 ee_relation_data = safe_get_ee_relation(
2815 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2816 )
2817 ee_relation_level = EELevel.get_level(ee_relation_data)
2818 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2819 "execution-environment-ref"
2820 ]:
2821 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2822 vnfd_id = vnf_profile["vnfd-id"]
2823 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2824 entity_id = (
2825 vnfd_id
2826 if ee_relation_level == EELevel.VNF
2827 else ee_relation_data["vdu-profile-id"]
2828 )
2829 ee = get_juju_ee_ref(db_vnfd, entity_id)
2830 if not ee:
2831 raise Exception(
2832 f"not execution environments found for ee_relation {ee_relation_data}"
2833 )
2834 ee_relation_data["execution-environment-ref"] = ee["id"]
2835 return ee_relation_data
2836
2837 def _get_ns_relations(
2838 self,
2839 nsr_id: str,
2840 nsd: Dict[str, Any],
2841 vca: DeployedVCA,
2842 cached_vnfds: Dict[str, Any],
2843 ) -> List[Relation]:
2844 relations = []
2845 db_ns_relations = get_ns_configuration_relation_list(nsd)
2846 for r in db_ns_relations:
2847 provider_dict = None
2848 requirer_dict = None
2849 if all(key in r for key in ("provider", "requirer")):
2850 provider_dict = r["provider"]
2851 requirer_dict = r["requirer"]
2852 elif "entities" in r:
2853 provider_id = r["entities"][0]["id"]
2854 provider_dict = {
2855 "nsr-id": nsr_id,
2856 "endpoint": r["entities"][0]["endpoint"],
2857 }
2858 if provider_id != nsd["id"]:
2859 provider_dict["vnf-profile-id"] = provider_id
2860 requirer_id = r["entities"][1]["id"]
2861 requirer_dict = {
2862 "nsr-id": nsr_id,
2863 "endpoint": r["entities"][1]["endpoint"],
2864 }
2865 if requirer_id != nsd["id"]:
2866 requirer_dict["vnf-profile-id"] = requirer_id
2867 else:
2868 raise Exception("provider/requirer or entities must be included in the relation.")
2869 relation_provider = self._update_ee_relation_data_with_implicit_data(
2870 nsr_id, nsd, provider_dict, cached_vnfds
2871 )
2872 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2873 nsr_id, nsd, requirer_dict, cached_vnfds
2874 )
2875 provider = EERelation(relation_provider)
2876 requirer = EERelation(relation_requirer)
2877 relation = Relation(r["name"], provider, requirer)
2878 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2879 if vca_in_relation:
2880 relations.append(relation)
2881 return relations
2882
2883 def _get_vnf_relations(
2884 self,
2885 nsr_id: str,
2886 nsd: Dict[str, Any],
2887 vca: DeployedVCA,
2888 cached_vnfds: Dict[str, Any],
2889 ) -> List[Relation]:
2890 relations = []
2891 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2892 vnf_profile_id = vnf_profile["id"]
2893 vnfd_id = vnf_profile["vnfd-id"]
2894 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2895 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2896 for r in db_vnf_relations:
2897 provider_dict = None
2898 requirer_dict = None
2899 if all(key in r for key in ("provider", "requirer")):
2900 provider_dict = r["provider"]
2901 requirer_dict = r["requirer"]
2902 elif "entities" in r:
2903 provider_id = r["entities"][0]["id"]
2904 provider_dict = {
2905 "nsr-id": nsr_id,
2906 "vnf-profile-id": vnf_profile_id,
2907 "endpoint": r["entities"][0]["endpoint"],
2908 }
2909 if provider_id != vnfd_id:
2910 provider_dict["vdu-profile-id"] = provider_id
2911 requirer_id = r["entities"][1]["id"]
2912 requirer_dict = {
2913 "nsr-id": nsr_id,
2914 "vnf-profile-id": vnf_profile_id,
2915 "endpoint": r["entities"][1]["endpoint"],
2916 }
2917 if requirer_id != vnfd_id:
2918 requirer_dict["vdu-profile-id"] = requirer_id
2919 else:
2920 raise Exception("provider/requirer or entities must be included in the relation.")
2921 relation_provider = self._update_ee_relation_data_with_implicit_data(
2922 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2923 )
2924 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2925 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2926 )
2927 provider = EERelation(relation_provider)
2928 requirer = EERelation(relation_requirer)
2929 relation = Relation(r["name"], provider, requirer)
2930 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2931 if vca_in_relation:
2932 relations.append(relation)
2933 return relations
2934
2935 def _get_kdu_resource_data(
2936 self,
2937 ee_relation: EERelation,
2938 db_nsr: Dict[str, Any],
2939 cached_vnfds: Dict[str, Any],
2940 ) -> DeployedK8sResource:
2941 nsd = get_nsd(db_nsr)
2942 vnf_profiles = get_vnf_profiles(nsd)
2943 vnfd_id = find_in_list(
2944 vnf_profiles,
2945 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2946 )["vnfd-id"]
2947 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2948 kdu_resource_profile = get_kdu_resource_profile(
2949 db_vnfd, ee_relation.kdu_resource_profile_id
2950 )
2951 kdu_name = kdu_resource_profile["kdu-name"]
2952 deployed_kdu, _ = get_deployed_kdu(
2953 db_nsr.get("_admin", ()).get("deployed", ()),
2954 kdu_name,
2955 ee_relation.vnf_profile_id,
2956 )
2957 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2958 return deployed_kdu
2959
2960 def _get_deployed_component(
2961 self,
2962 ee_relation: EERelation,
2963 db_nsr: Dict[str, Any],
2964 cached_vnfds: Dict[str, Any],
2965 ) -> DeployedComponent:
2966 nsr_id = db_nsr["_id"]
2967 deployed_component = None
2968 ee_level = EELevel.get_level(ee_relation)
2969 if ee_level == EELevel.NS:
2970 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2971 if vca:
2972 deployed_component = DeployedVCA(nsr_id, vca)
2973 elif ee_level == EELevel.VNF:
2974 vca = get_deployed_vca(
2975 db_nsr,
2976 {
2977 "vdu_id": None,
2978 "member-vnf-index": ee_relation.vnf_profile_id,
2979 "ee_descriptor_id": ee_relation.execution_environment_ref,
2980 },
2981 )
2982 if vca:
2983 deployed_component = DeployedVCA(nsr_id, vca)
2984 elif ee_level == EELevel.VDU:
2985 vca = get_deployed_vca(
2986 db_nsr,
2987 {
2988 "vdu_id": ee_relation.vdu_profile_id,
2989 "member-vnf-index": ee_relation.vnf_profile_id,
2990 "ee_descriptor_id": ee_relation.execution_environment_ref,
2991 },
2992 )
2993 if vca:
2994 deployed_component = DeployedVCA(nsr_id, vca)
2995 elif ee_level == EELevel.KDU:
2996 kdu_resource_data = self._get_kdu_resource_data(
2997 ee_relation, db_nsr, cached_vnfds
2998 )
2999 if kdu_resource_data:
3000 deployed_component = DeployedK8sResource(kdu_resource_data)
3001 return deployed_component
3002
3003 async def _add_relation(
3004 self,
3005 relation: Relation,
3006 vca_type: str,
3007 db_nsr: Dict[str, Any],
3008 cached_vnfds: Dict[str, Any],
3009 cached_vnfrs: Dict[str, Any],
3010 ) -> bool:
3011 deployed_provider = self._get_deployed_component(
3012 relation.provider, db_nsr, cached_vnfds
3013 )
3014 deployed_requirer = self._get_deployed_component(
3015 relation.requirer, db_nsr, cached_vnfds
3016 )
3017 if (
3018 deployed_provider
3019 and deployed_requirer
3020 and deployed_provider.config_sw_installed
3021 and deployed_requirer.config_sw_installed
3022 ):
3023 provider_db_vnfr = (
3024 self._get_vnfr(
3025 relation.provider.nsr_id,
3026 relation.provider.vnf_profile_id,
3027 cached_vnfrs,
3028 )
3029 if relation.provider.vnf_profile_id
3030 else None
3031 )
3032 requirer_db_vnfr = (
3033 self._get_vnfr(
3034 relation.requirer.nsr_id,
3035 relation.requirer.vnf_profile_id,
3036 cached_vnfrs,
3037 )
3038 if relation.requirer.vnf_profile_id
3039 else None
3040 )
3041 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3042 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3043 provider_relation_endpoint = RelationEndpoint(
3044 deployed_provider.ee_id,
3045 provider_vca_id,
3046 relation.provider.endpoint,
3047 )
3048 requirer_relation_endpoint = RelationEndpoint(
3049 deployed_requirer.ee_id,
3050 requirer_vca_id,
3051 relation.requirer.endpoint,
3052 )
3053 await self.vca_map[vca_type].add_relation(
3054 provider=provider_relation_endpoint,
3055 requirer=requirer_relation_endpoint,
3056 )
3057 # remove entry from relations list
3058 return True
3059 return False
3060
3061 async def _add_vca_relations(
3062 self,
3063 logging_text,
3064 nsr_id,
3065 vca_type: str,
3066 vca_index: int,
3067 timeout: int = 3600,
3068 ) -> bool:
3069
3070 # steps:
3071 # 1. find all relations for this VCA
3072 # 2. wait for other peers related
3073 # 3. add relations
3074
3075 try:
3076 # STEP 1: find all relations for this VCA
3077
3078 # read nsr record
3079 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3080 nsd = get_nsd(db_nsr)
3081
3082 # this VCA data
3083 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3084 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3085
3086 cached_vnfds = {}
3087 cached_vnfrs = {}
3088 relations = []
3089 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3090 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3091
3092 # if no relations, terminate
3093 if not relations:
3094 self.logger.debug(logging_text + " No relations")
3095 return True
3096
3097 self.logger.debug(logging_text + " adding relations {}".format(relations))
3098
3099 # add all relations
3100 start = time()
3101 while True:
3102 # check timeout
3103 now = time()
3104 if now - start >= timeout:
3105 self.logger.error(logging_text + " : timeout adding relations")
3106 return False
3107
3108 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3109 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3110
3111 # for each relation, find the VCA's related
3112 for relation in relations.copy():
3113 added = await self._add_relation(
3114 relation,
3115 vca_type,
3116 db_nsr,
3117 cached_vnfds,
3118 cached_vnfrs,
3119 )
3120 if added:
3121 relations.remove(relation)
3122
3123 if not relations:
3124 self.logger.debug("Relations added")
3125 break
3126 await asyncio.sleep(5.0)
3127
3128 return True
3129
3130 except Exception as e:
3131 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3132 return False
3133
3134 async def _install_kdu(
3135 self,
3136 nsr_id: str,
3137 nsr_db_path: str,
3138 vnfr_data: dict,
3139 kdu_index: int,
3140 kdud: dict,
3141 vnfd: dict,
3142 k8s_instance_info: dict,
3143 k8params: dict = None,
3144 timeout: int = 600,
3145 vca_id: str = None,
3146 ):
3147
3148 try:
3149 k8sclustertype = k8s_instance_info["k8scluster-type"]
3150 # Instantiate kdu
3151 db_dict_install = {
3152 "collection": "nsrs",
3153 "filter": {"_id": nsr_id},
3154 "path": nsr_db_path,
3155 }
3156
3157 if k8s_instance_info.get("kdu-deployment-name"):
3158 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3159 else:
3160 kdu_instance = self.k8scluster_map[
3161 k8sclustertype
3162 ].generate_kdu_instance_name(
3163 db_dict=db_dict_install,
3164 kdu_model=k8s_instance_info["kdu-model"],
3165 kdu_name=k8s_instance_info["kdu-name"],
3166 )
3167
3168 # Update the nsrs table with the kdu-instance value
3169 self.update_db_2(
3170 item="nsrs",
3171 _id=nsr_id,
3172 _desc={nsr_db_path + ".kdu-instance": kdu_instance},
3173 )
3174
3175 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3176 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3177 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3178 # namespace, this first verification could be removed, and the next step would be done for any kind
3179 # of KNF.
3180 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3181 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3182 if k8sclustertype in ("juju", "juju-bundle"):
3183 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3184 # that the user passed a namespace which he wants its KDU to be deployed in)
3185 if (
3186 self.db.count(
3187 table="nsrs",
3188 q_filter={
3189 "_id": nsr_id,
3190 "_admin.projects_write": k8s_instance_info["namespace"],
3191 "_admin.projects_read": k8s_instance_info["namespace"],
3192 },
3193 )
3194 > 0
3195 ):
3196 self.logger.debug(
3197 f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3198 )
3199 self.update_db_2(
3200 item="nsrs",
3201 _id=nsr_id,
3202 _desc={f"{nsr_db_path}.namespace": kdu_instance},
3203 )
3204 k8s_instance_info["namespace"] = kdu_instance
3205
3206 await self.k8scluster_map[k8sclustertype].install(
3207 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3208 kdu_model=k8s_instance_info["kdu-model"],
3209 atomic=True,
3210 params=k8params,
3211 db_dict=db_dict_install,
3212 timeout=timeout,
3213 kdu_name=k8s_instance_info["kdu-name"],
3214 namespace=k8s_instance_info["namespace"],
3215 kdu_instance=kdu_instance,
3216 vca_id=vca_id,
3217 )
3218
3219 # Obtain services to obtain management service ip
3220 services = await self.k8scluster_map[k8sclustertype].get_services(
3221 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3222 kdu_instance=kdu_instance,
3223 namespace=k8s_instance_info["namespace"],
3224 )
3225
3226 # Obtain management service info (if exists)
3227 vnfr_update_dict = {}
3228 kdu_config = get_configuration(vnfd, kdud["name"])
3229 if kdu_config:
3230 target_ee_list = kdu_config.get("execution-environment-list", [])
3231 else:
3232 target_ee_list = []
3233
3234 if services:
3235 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3236 mgmt_services = [
3237 service
3238 for service in kdud.get("service", [])
3239 if service.get("mgmt-service")
3240 ]
3241 for mgmt_service in mgmt_services:
3242 for service in services:
3243 if service["name"].startswith(mgmt_service["name"]):
3244 # Mgmt service found, Obtain service ip
3245 ip = service.get("external_ip", service.get("cluster_ip"))
3246 if isinstance(ip, list) and len(ip) == 1:
3247 ip = ip[0]
3248
3249 vnfr_update_dict[
3250 "kdur.{}.ip-address".format(kdu_index)
3251 ] = ip
3252
3253 # Check if must update also mgmt ip at the vnf
3254 service_external_cp = mgmt_service.get(
3255 "external-connection-point-ref"
3256 )
3257 if service_external_cp:
3258 if (
3259 deep_get(vnfd, ("mgmt-interface", "cp"))
3260 == service_external_cp
3261 ):
3262 vnfr_update_dict["ip-address"] = ip
3263
3264 if find_in_list(
3265 target_ee_list,
3266 lambda ee: ee.get(
3267 "external-connection-point-ref", ""
3268 )
3269 == service_external_cp,
3270 ):
3271 vnfr_update_dict[
3272 "kdur.{}.ip-address".format(kdu_index)
3273 ] = ip
3274 break
3275 else:
3276 self.logger.warn(
3277 "Mgmt service name: {} not found".format(
3278 mgmt_service["name"]
3279 )
3280 )
3281
3282 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3283 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3284
3285 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3286 if (
3287 kdu_config
3288 and kdu_config.get("initial-config-primitive")
3289 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3290 ):
3291 initial_config_primitive_list = kdu_config.get(
3292 "initial-config-primitive"
3293 )
3294 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3295
3296 for initial_config_primitive in initial_config_primitive_list:
3297 primitive_params_ = self._map_primitive_params(
3298 initial_config_primitive, {}, {}
3299 )
3300
3301 await asyncio.wait_for(
3302 self.k8scluster_map[k8sclustertype].exec_primitive(
3303 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3304 kdu_instance=kdu_instance,
3305 primitive_name=initial_config_primitive["name"],
3306 params=primitive_params_,
3307 db_dict=db_dict_install,
3308 vca_id=vca_id,
3309 ),
3310 timeout=timeout,
3311 )
3312
3313 except Exception as e:
3314 # Prepare update db with error and raise exception
3315 try:
3316 self.update_db_2(
3317 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3318 )
3319 self.update_db_2(
3320 "vnfrs",
3321 vnfr_data.get("_id"),
3322 {"kdur.{}.status".format(kdu_index): "ERROR"},
3323 )
3324 except Exception:
3325 # ignore to keep original exception
3326 pass
3327 # reraise original error
3328 raise
3329
3330 return kdu_instance
3331
3332 async def deploy_kdus(
3333 self,
3334 logging_text,
3335 nsr_id,
3336 nslcmop_id,
3337 db_vnfrs,
3338 db_vnfds,
3339 task_instantiation_info,
3340 ):
3341 # Launch kdus if present in the descriptor
3342
3343 k8scluster_id_2_uuic = {
3344 "helm-chart-v3": {},
3345 "helm-chart": {},
3346 "juju-bundle": {},
3347 }
3348
3349 async def _get_cluster_id(cluster_id, cluster_type):
3350 nonlocal k8scluster_id_2_uuic
3351 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3352 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3353
3354 # check if K8scluster is creating and wait look if previous tasks in process
3355 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3356 "k8scluster", cluster_id
3357 )
3358 if task_dependency:
3359 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3360 task_name, cluster_id
3361 )
3362 self.logger.debug(logging_text + text)
3363 await asyncio.wait(task_dependency, timeout=3600)
3364
3365 db_k8scluster = self.db.get_one(
3366 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3367 )
3368 if not db_k8scluster:
3369 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3370
3371 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3372 if not k8s_id:
3373 if cluster_type == "helm-chart-v3":
3374 try:
3375 # backward compatibility for existing clusters that have not been initialized for helm v3
3376 k8s_credentials = yaml.safe_dump(
3377 db_k8scluster.get("credentials")
3378 )
3379 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3380 k8s_credentials, reuse_cluster_uuid=cluster_id
3381 )
3382 db_k8scluster_update = {}
3383 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3384 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3385 db_k8scluster_update[
3386 "_admin.helm-chart-v3.created"
3387 ] = uninstall_sw
3388 db_k8scluster_update[
3389 "_admin.helm-chart-v3.operationalState"
3390 ] = "ENABLED"
3391 self.update_db_2(
3392 "k8sclusters", cluster_id, db_k8scluster_update
3393 )
3394 except Exception as e:
3395 self.logger.error(
3396 logging_text
3397 + "error initializing helm-v3 cluster: {}".format(str(e))
3398 )
3399 raise LcmException(
3400 "K8s cluster '{}' has not been initialized for '{}'".format(
3401 cluster_id, cluster_type
3402 )
3403 )
3404 else:
3405 raise LcmException(
3406 "K8s cluster '{}' has not been initialized for '{}'".format(
3407 cluster_id, cluster_type
3408 )
3409 )
3410 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3411 return k8s_id
3412
3413 logging_text += "Deploy kdus: "
3414 step = ""
3415 try:
3416 db_nsr_update = {"_admin.deployed.K8s": []}
3417 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3418
3419 index = 0
3420 updated_cluster_list = []
3421 updated_v3_cluster_list = []
3422
3423 for vnfr_data in db_vnfrs.values():
3424 vca_id = self.get_vca_id(vnfr_data, {})
3425 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3426 # Step 0: Prepare and set parameters
3427 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3428 vnfd_id = vnfr_data.get("vnfd-id")
3429 vnfd_with_id = find_in_list(
3430 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3431 )
3432 kdud = next(
3433 kdud
3434 for kdud in vnfd_with_id["kdu"]
3435 if kdud["name"] == kdur["kdu-name"]
3436 )
3437 namespace = kdur.get("k8s-namespace")
3438 kdu_deployment_name = kdur.get("kdu-deployment-name")
3439 if kdur.get("helm-chart"):
3440 kdumodel = kdur["helm-chart"]
3441 # Default version: helm3, if helm-version is v2 assign v2
3442 k8sclustertype = "helm-chart-v3"
3443 self.logger.debug("kdur: {}".format(kdur))
3444 if (
3445 kdur.get("helm-version")
3446 and kdur.get("helm-version") == "v2"
3447 ):
3448 k8sclustertype = "helm-chart"
3449 elif kdur.get("juju-bundle"):
3450 kdumodel = kdur["juju-bundle"]
3451 k8sclustertype = "juju-bundle"
3452 else:
3453 raise LcmException(
3454 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3455 "juju-bundle. Maybe an old NBI version is running".format(
3456 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3457 )
3458 )
3459 # check if kdumodel is a file and exists
3460 try:
3461 vnfd_with_id = find_in_list(
3462 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3463 )
3464 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3465 if storage: # may be not present if vnfd has not artifacts
3466 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3467 if storage["pkg-dir"]:
3468 filename = "{}/{}/{}s/{}".format(
3469 storage["folder"],
3470 storage["pkg-dir"],
3471 k8sclustertype,
3472 kdumodel,
3473 )
3474 else:
3475 filename = "{}/Scripts/{}s/{}".format(
3476 storage["folder"],
3477 k8sclustertype,
3478 kdumodel,
3479 )
3480 if self.fs.file_exists(
3481 filename, mode="file"
3482 ) or self.fs.file_exists(filename, mode="dir"):
3483 kdumodel = self.fs.path + filename
3484 except (asyncio.TimeoutError, asyncio.CancelledError):
3485 raise
3486 except Exception: # it is not a file
3487 pass
3488
3489 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3490 step = "Synchronize repos for k8s cluster '{}'".format(
3491 k8s_cluster_id
3492 )
3493 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3494
3495 # Synchronize repos
3496 if (
3497 k8sclustertype == "helm-chart"
3498 and cluster_uuid not in updated_cluster_list
3499 ) or (
3500 k8sclustertype == "helm-chart-v3"
3501 and cluster_uuid not in updated_v3_cluster_list
3502 ):
3503 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3504 self.k8scluster_map[k8sclustertype].synchronize_repos(
3505 cluster_uuid=cluster_uuid
3506 )
3507 )
3508 if del_repo_list or added_repo_dict:
3509 if k8sclustertype == "helm-chart":
3510 unset = {
3511 "_admin.helm_charts_added." + item: None
3512 for item in del_repo_list
3513 }
3514 updated = {
3515 "_admin.helm_charts_added." + item: name
3516 for item, name in added_repo_dict.items()
3517 }
3518 updated_cluster_list.append(cluster_uuid)
3519 elif k8sclustertype == "helm-chart-v3":
3520 unset = {
3521 "_admin.helm_charts_v3_added." + item: None
3522 for item in del_repo_list
3523 }
3524 updated = {
3525 "_admin.helm_charts_v3_added." + item: name
3526 for item, name in added_repo_dict.items()
3527 }
3528 updated_v3_cluster_list.append(cluster_uuid)
3529 self.logger.debug(
3530 logging_text + "repos synchronized on k8s cluster "
3531 "'{}' to_delete: {}, to_add: {}".format(
3532 k8s_cluster_id, del_repo_list, added_repo_dict
3533 )
3534 )
3535 self.db.set_one(
3536 "k8sclusters",
3537 {"_id": k8s_cluster_id},
3538 updated,
3539 unset=unset,
3540 )
3541
3542 # Instantiate kdu
3543 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3544 vnfr_data["member-vnf-index-ref"],
3545 kdur["kdu-name"],
3546 k8s_cluster_id,
3547 )
3548 k8s_instance_info = {
3549 "kdu-instance": None,
3550 "k8scluster-uuid": cluster_uuid,
3551 "k8scluster-type": k8sclustertype,
3552 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3553 "kdu-name": kdur["kdu-name"],
3554 "kdu-model": kdumodel,
3555 "namespace": namespace,
3556 "kdu-deployment-name": kdu_deployment_name,
3557 }
3558 db_path = "_admin.deployed.K8s.{}".format(index)
3559 db_nsr_update[db_path] = k8s_instance_info
3560 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3561 vnfd_with_id = find_in_list(
3562 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3563 )
3564 task = asyncio.ensure_future(
3565 self._install_kdu(
3566 nsr_id,
3567 db_path,
3568 vnfr_data,
3569 kdu_index,
3570 kdud,
3571 vnfd_with_id,
3572 k8s_instance_info,
3573 k8params=desc_params,
3574 timeout=1800,
3575 vca_id=vca_id,
3576 )
3577 )
3578 self.lcm_tasks.register(
3579 "ns",
3580 nsr_id,
3581 nslcmop_id,
3582 "instantiate_KDU-{}".format(index),
3583 task,
3584 )
3585 task_instantiation_info[task] = "Deploying KDU {}".format(
3586 kdur["kdu-name"]
3587 )
3588
3589 index += 1
3590
3591 except (LcmException, asyncio.CancelledError):
3592 raise
3593 except Exception as e:
3594 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3595 if isinstance(e, (N2VCException, DbException)):
3596 self.logger.error(logging_text + msg)
3597 else:
3598 self.logger.critical(logging_text + msg, exc_info=True)
3599 raise LcmException(msg)
3600 finally:
3601 if db_nsr_update:
3602 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3603
3604 def _deploy_n2vc(
3605 self,
3606 logging_text,
3607 db_nsr,
3608 db_vnfr,
3609 nslcmop_id,
3610 nsr_id,
3611 nsi_id,
3612 vnfd_id,
3613 vdu_id,
3614 kdu_name,
3615 member_vnf_index,
3616 vdu_index,
3617 vdu_name,
3618 deploy_params,
3619 descriptor_config,
3620 base_folder,
3621 task_instantiation_info,
3622 stage,
3623 ):
3624 # launch instantiate_N2VC in a asyncio task and register task object
3625 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3626 # if not found, create one entry and update database
3627 # fill db_nsr._admin.deployed.VCA.<index>
3628
3629 self.logger.debug(
3630 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3631 )
3632 if "execution-environment-list" in descriptor_config:
3633 ee_list = descriptor_config.get("execution-environment-list", [])
3634 elif "juju" in descriptor_config:
3635 ee_list = [descriptor_config] # ns charms
3636 else: # other types as script are not supported
3637 ee_list = []
3638
3639 for ee_item in ee_list:
3640 self.logger.debug(
3641 logging_text
3642 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3643 ee_item.get("juju"), ee_item.get("helm-chart")
3644 )
3645 )
3646 ee_descriptor_id = ee_item.get("id")
3647 if ee_item.get("juju"):
3648 vca_name = ee_item["juju"].get("charm")
3649 vca_type = (
3650 "lxc_proxy_charm"
3651 if ee_item["juju"].get("charm") is not None
3652 else "native_charm"
3653 )
3654 if ee_item["juju"].get("cloud") == "k8s":
3655 vca_type = "k8s_proxy_charm"
3656 elif ee_item["juju"].get("proxy") is False:
3657 vca_type = "native_charm"
3658 elif ee_item.get("helm-chart"):
3659 vca_name = ee_item["helm-chart"]
3660 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3661 vca_type = "helm"
3662 else:
3663 vca_type = "helm-v3"
3664 else:
3665 self.logger.debug(
3666 logging_text + "skipping non juju neither charm configuration"
3667 )
3668 continue
3669
3670 vca_index = -1
3671 for vca_index, vca_deployed in enumerate(
3672 db_nsr["_admin"]["deployed"]["VCA"]
3673 ):
3674 if not vca_deployed:
3675 continue
3676 if (
3677 vca_deployed.get("member-vnf-index") == member_vnf_index
3678 and vca_deployed.get("vdu_id") == vdu_id
3679 and vca_deployed.get("kdu_name") == kdu_name
3680 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3681 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3682 ):
3683 break
3684 else:
3685 # not found, create one.
3686 target = (
3687 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3688 )
3689 if vdu_id:
3690 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3691 elif kdu_name:
3692 target += "/kdu/{}".format(kdu_name)
3693 vca_deployed = {
3694 "target_element": target,
3695 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3696 "member-vnf-index": member_vnf_index,
3697 "vdu_id": vdu_id,
3698 "kdu_name": kdu_name,
3699 "vdu_count_index": vdu_index,
3700 "operational-status": "init", # TODO revise
3701 "detailed-status": "", # TODO revise
3702 "step": "initial-deploy", # TODO revise
3703 "vnfd_id": vnfd_id,
3704 "vdu_name": vdu_name,
3705 "type": vca_type,
3706 "ee_descriptor_id": ee_descriptor_id,
3707 }
3708 vca_index += 1
3709
3710 # create VCA and configurationStatus in db
3711 db_dict = {
3712 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3713 "configurationStatus.{}".format(vca_index): dict(),
3714 }
3715 self.update_db_2("nsrs", nsr_id, db_dict)
3716
3717 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3718
3719 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3720 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3721 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3722
3723 # Launch task
3724 task_n2vc = asyncio.ensure_future(
3725 self.instantiate_N2VC(
3726 logging_text=logging_text,
3727 vca_index=vca_index,
3728 nsi_id=nsi_id,
3729 db_nsr=db_nsr,
3730 db_vnfr=db_vnfr,
3731 vdu_id=vdu_id,
3732 kdu_name=kdu_name,
3733 vdu_index=vdu_index,
3734 deploy_params=deploy_params,
3735 config_descriptor=descriptor_config,
3736 base_folder=base_folder,
3737 nslcmop_id=nslcmop_id,
3738 stage=stage,
3739 vca_type=vca_type,
3740 vca_name=vca_name,
3741 ee_config_descriptor=ee_item,
3742 )
3743 )
3744 self.lcm_tasks.register(
3745 "ns",
3746 nsr_id,
3747 nslcmop_id,
3748 "instantiate_N2VC-{}".format(vca_index),
3749 task_n2vc,
3750 )
3751 task_instantiation_info[
3752 task_n2vc
3753 ] = self.task_name_deploy_vca + " {}.{}".format(
3754 member_vnf_index or "", vdu_id or ""
3755 )
3756
3757 @staticmethod
3758 def _create_nslcmop(nsr_id, operation, params):
3759 """
3760 Creates a ns-lcm-opp content to be stored at database.
3761 :param nsr_id: internal id of the instance
3762 :param operation: instantiate, terminate, scale, action, ...
3763 :param params: user parameters for the operation
3764 :return: dictionary following SOL005 format
3765 """
3766 # Raise exception if invalid arguments
3767 if not (nsr_id and operation and params):
3768 raise LcmException(
3769 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3770 )
3771 now = time()
3772 _id = str(uuid4())
3773 nslcmop = {
3774 "id": _id,
3775 "_id": _id,
3776 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3777 "operationState": "PROCESSING",
3778 "statusEnteredTime": now,
3779 "nsInstanceId": nsr_id,
3780 "lcmOperationType": operation,
3781 "startTime": now,
3782 "isAutomaticInvocation": False,
3783 "operationParams": params,
3784 "isCancelPending": False,
3785 "links": {
3786 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3787 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3788 },
3789 }
3790 return nslcmop
3791
3792 def _format_additional_params(self, params):
3793 params = params or {}
3794 for key, value in params.items():
3795 if str(value).startswith("!!yaml "):
3796 params[key] = yaml.safe_load(value[7:])
3797 return params
3798
3799 def _get_terminate_primitive_params(self, seq, vnf_index):
3800 primitive = seq.get("name")
3801 primitive_params = {}
3802 params = {
3803 "member_vnf_index": vnf_index,
3804 "primitive": primitive,
3805 "primitive_params": primitive_params,
3806 }
3807 desc_params = {}
3808 return self._map_primitive_params(seq, params, desc_params)
3809
3810 # sub-operations
3811
3812 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3813 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3814 if op.get("operationState") == "COMPLETED":
3815 # b. Skip sub-operation
3816 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3817 return self.SUBOPERATION_STATUS_SKIP
3818 else:
3819 # c. retry executing sub-operation
3820 # The sub-operation exists, and operationState != 'COMPLETED'
3821 # Update operationState = 'PROCESSING' to indicate a retry.
3822 operationState = "PROCESSING"
3823 detailed_status = "In progress"
3824 self._update_suboperation_status(
3825 db_nslcmop, op_index, operationState, detailed_status
3826 )
3827 # Return the sub-operation index
3828 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3829 # with arguments extracted from the sub-operation
3830 return op_index
3831
3832 # Find a sub-operation where all keys in a matching dictionary must match
3833 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3834 def _find_suboperation(self, db_nslcmop, match):
3835 if db_nslcmop and match:
3836 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3837 for i, op in enumerate(op_list):
3838 if all(op.get(k) == match[k] for k in match):
3839 return i
3840 return self.SUBOPERATION_STATUS_NOT_FOUND
3841
3842 # Update status for a sub-operation given its index
3843 def _update_suboperation_status(
3844 self, db_nslcmop, op_index, operationState, detailed_status
3845 ):
3846 # Update DB for HA tasks
3847 q_filter = {"_id": db_nslcmop["_id"]}
3848 update_dict = {
3849 "_admin.operations.{}.operationState".format(op_index): operationState,
3850 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3851 }
3852 self.db.set_one(
3853 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3854 )
3855
3856 # Add sub-operation, return the index of the added sub-operation
3857 # Optionally, set operationState, detailed-status, and operationType
3858 # Status and type are currently set for 'scale' sub-operations:
3859 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3860 # 'detailed-status' : status message
3861 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3862 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3863 def _add_suboperation(
3864 self,
3865 db_nslcmop,
3866 vnf_index,
3867 vdu_id,
3868 vdu_count_index,
3869 vdu_name,
3870 primitive,
3871 mapped_primitive_params,
3872 operationState=None,
3873 detailed_status=None,
3874 operationType=None,
3875 RO_nsr_id=None,
3876 RO_scaling_info=None,
3877 ):
3878 if not db_nslcmop:
3879 return self.SUBOPERATION_STATUS_NOT_FOUND
3880 # Get the "_admin.operations" list, if it exists
3881 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3882 op_list = db_nslcmop_admin.get("operations")
3883 # Create or append to the "_admin.operations" list
3884 new_op = {
3885 "member_vnf_index": vnf_index,
3886 "vdu_id": vdu_id,
3887 "vdu_count_index": vdu_count_index,
3888 "primitive": primitive,
3889 "primitive_params": mapped_primitive_params,
3890 }
3891 if operationState:
3892 new_op["operationState"] = operationState
3893 if detailed_status:
3894 new_op["detailed-status"] = detailed_status
3895 if operationType:
3896 new_op["lcmOperationType"] = operationType
3897 if RO_nsr_id:
3898 new_op["RO_nsr_id"] = RO_nsr_id
3899 if RO_scaling_info:
3900 new_op["RO_scaling_info"] = RO_scaling_info
3901 if not op_list:
3902 # No existing operations, create key 'operations' with current operation as first list element
3903 db_nslcmop_admin.update({"operations": [new_op]})
3904 op_list = db_nslcmop_admin.get("operations")
3905 else:
3906 # Existing operations, append operation to list
3907 op_list.append(new_op)
3908
3909 db_nslcmop_update = {"_admin.operations": op_list}
3910 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3911 op_index = len(op_list) - 1
3912 return op_index
3913
3914 # Helper methods for scale() sub-operations
3915
3916 # pre-scale/post-scale:
3917 # Check for 3 different cases:
3918 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3919 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3920 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3921 def _check_or_add_scale_suboperation(
3922 self,
3923 db_nslcmop,
3924 vnf_index,
3925 vnf_config_primitive,
3926 primitive_params,
3927 operationType,
3928 RO_nsr_id=None,
3929 RO_scaling_info=None,
3930 ):
3931 # Find this sub-operation
3932 if RO_nsr_id and RO_scaling_info:
3933 operationType = "SCALE-RO"
3934 match = {
3935 "member_vnf_index": vnf_index,
3936 "RO_nsr_id": RO_nsr_id,
3937 "RO_scaling_info": RO_scaling_info,
3938 }
3939 else:
3940 match = {
3941 "member_vnf_index": vnf_index,
3942 "primitive": vnf_config_primitive,
3943 "primitive_params": primitive_params,
3944 "lcmOperationType": operationType,
3945 }
3946 op_index = self._find_suboperation(db_nslcmop, match)
3947 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3948 # a. New sub-operation
3949 # The sub-operation does not exist, add it.
3950 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3951 # The following parameters are set to None for all kind of scaling:
3952 vdu_id = None
3953 vdu_count_index = None
3954 vdu_name = None
3955 if RO_nsr_id and RO_scaling_info:
3956 vnf_config_primitive = None
3957 primitive_params = None
3958 else:
3959 RO_nsr_id = None
3960 RO_scaling_info = None
3961 # Initial status for sub-operation
3962 operationState = "PROCESSING"
3963 detailed_status = "In progress"
3964 # Add sub-operation for pre/post-scaling (zero or more operations)
3965 self._add_suboperation(
3966 db_nslcmop,
3967 vnf_index,
3968 vdu_id,
3969 vdu_count_index,
3970 vdu_name,
3971 vnf_config_primitive,
3972 primitive_params,
3973 operationState,
3974 detailed_status,
3975 operationType,
3976 RO_nsr_id,
3977 RO_scaling_info,
3978 )
3979 return self.SUBOPERATION_STATUS_NEW
3980 else:
3981 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3982 # or op_index (operationState != 'COMPLETED')
3983 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3984
3985 # Function to return execution_environment id
3986
3987 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3988 # TODO vdu_index_count
3989 for vca in vca_deployed_list:
3990 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3991 return vca["ee_id"]
3992
3993 async def destroy_N2VC(
3994 self,
3995 logging_text,
3996 db_nslcmop,
3997 vca_deployed,
3998 config_descriptor,
3999 vca_index,
4000 destroy_ee=True,
4001 exec_primitives=True,
4002 scaling_in=False,
4003 vca_id: str = None,
4004 ):
4005 """
4006 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4007 :param logging_text:
4008 :param db_nslcmop:
4009 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4010 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4011 :param vca_index: index in the database _admin.deployed.VCA
4012 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4013 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4014 not executed properly
4015 :param scaling_in: True destroys the application, False destroys the model
4016 :return: None or exception
4017 """
4018
4019 self.logger.debug(
4020 logging_text
4021 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4022 vca_index, vca_deployed, config_descriptor, destroy_ee
4023 )
4024 )
4025
4026 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
4027
4028 # execute terminate_primitives
4029 if exec_primitives:
4030 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
4031 config_descriptor.get("terminate-config-primitive"),
4032 vca_deployed.get("ee_descriptor_id"),
4033 )
4034 vdu_id = vca_deployed.get("vdu_id")
4035 vdu_count_index = vca_deployed.get("vdu_count_index")
4036 vdu_name = vca_deployed.get("vdu_name")
4037 vnf_index = vca_deployed.get("member-vnf-index")
4038 if terminate_primitives and vca_deployed.get("needed_terminate"):
4039 for seq in terminate_primitives:
4040 # For each sequence in list, get primitive and call _ns_execute_primitive()
4041 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
4042 vnf_index, seq.get("name")
4043 )
4044 self.logger.debug(logging_text + step)
4045 # Create the primitive for each sequence, i.e. "primitive": "touch"
4046 primitive = seq.get("name")
4047 mapped_primitive_params = self._get_terminate_primitive_params(
4048 seq, vnf_index
4049 )
4050
4051 # Add sub-operation
4052 self._add_suboperation(
4053 db_nslcmop,
4054 vnf_index,
4055 vdu_id,
4056 vdu_count_index,
4057 vdu_name,
4058 primitive,
4059 mapped_primitive_params,
4060 )
4061 # Sub-operations: Call _ns_execute_primitive() instead of action()
4062 try:
4063 result, result_detail = await self._ns_execute_primitive(
4064 vca_deployed["ee_id"],
4065 primitive,
4066 mapped_primitive_params,
4067 vca_type=vca_type,
4068 vca_id=vca_id,
4069 )
4070 except LcmException:
4071 # this happens when VCA is not deployed. In this case it is not needed to terminate
4072 continue
4073 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
4074 if result not in result_ok:
4075 raise LcmException(
4076 "terminate_primitive {} for vnf_member_index={} fails with "
4077 "error {}".format(seq.get("name"), vnf_index, result_detail)
4078 )
4079 # set that this VCA do not need terminated
4080 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
4081 vca_index
4082 )
4083 self.update_db_2(
4084 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
4085 )
4086
4087 # Delete Prometheus Jobs if any
4088 # This uses NSR_ID, so it will destroy any jobs under this index
4089 self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
4090
4091 if destroy_ee:
4092 await self.vca_map[vca_type].delete_execution_environment(
4093 vca_deployed["ee_id"],
4094 scaling_in=scaling_in,
4095 vca_type=vca_type,
4096 vca_id=vca_id,
4097 )
4098
4099 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4100 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4101 namespace = "." + db_nsr["_id"]
4102 try:
4103 await self.n2vc.delete_namespace(
4104 namespace=namespace,
4105 total_timeout=self.timeout_charm_delete,
4106 vca_id=vca_id,
4107 )
4108 except N2VCNotFound: # already deleted. Skip
4109 pass
4110 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4111
4112 async def _terminate_RO(
4113 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4114 ):
4115 """
4116 Terminates a deployment from RO
4117 :param logging_text:
4118 :param nsr_deployed: db_nsr._admin.deployed
4119 :param nsr_id:
4120 :param nslcmop_id:
4121 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4122 this method will update only the index 2, but it will write on database the concatenated content of the list
4123 :return:
4124 """
4125 db_nsr_update = {}
4126 failed_detail = []
4127 ro_nsr_id = ro_delete_action = None
4128 if nsr_deployed and nsr_deployed.get("RO"):
4129 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
4130 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
4131 try:
4132 if ro_nsr_id:
4133 stage[2] = "Deleting ns from VIM."
4134 db_nsr_update["detailed-status"] = " ".join(stage)
4135 self._write_op_status(nslcmop_id, stage)
4136 self.logger.debug(logging_text + stage[2])
4137 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4138 self._write_op_status(nslcmop_id, stage)
4139 desc = await self.RO.delete("ns", ro_nsr_id)
4140 ro_delete_action = desc["action_id"]
4141 db_nsr_update[
4142 "_admin.deployed.RO.nsr_delete_action_id"
4143 ] = ro_delete_action
4144 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4145 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4146 if ro_delete_action:
4147 # wait until NS is deleted from VIM
4148 stage[2] = "Waiting ns deleted from VIM."
4149 detailed_status_old = None
4150 self.logger.debug(
4151 logging_text
4152 + stage[2]
4153 + " RO_id={} ro_delete_action={}".format(
4154 ro_nsr_id, ro_delete_action
4155 )
4156 )
4157 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4158 self._write_op_status(nslcmop_id, stage)
4159
4160 delete_timeout = 20 * 60 # 20 minutes
4161 while delete_timeout > 0:
4162 desc = await self.RO.show(
4163 "ns",
4164 item_id_name=ro_nsr_id,
4165 extra_item="action",
4166 extra_item_id=ro_delete_action,
4167 )
4168
4169 # deploymentStatus
4170 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4171
4172 ns_status, ns_status_info = self.RO.check_action_status(desc)
4173 if ns_status == "ERROR":
4174 raise ROclient.ROClientException(ns_status_info)
4175 elif ns_status == "BUILD":
4176 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4177 elif ns_status == "ACTIVE":
4178 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4179 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4180 break
4181 else:
4182 assert (
4183 False
4184 ), "ROclient.check_action_status returns unknown {}".format(
4185 ns_status
4186 )
4187 if stage[2] != detailed_status_old:
4188 detailed_status_old = stage[2]
4189 db_nsr_update["detailed-status"] = " ".join(stage)
4190 self._write_op_status(nslcmop_id, stage)
4191 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4192 await asyncio.sleep(5, loop=self.loop)
4193 delete_timeout -= 5
4194 else: # delete_timeout <= 0:
4195 raise ROclient.ROClientException(
4196 "Timeout waiting ns deleted from VIM"
4197 )
4198
4199 except Exception as e:
4200 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4201 if (
4202 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4203 ): # not found
4204 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4205 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4206 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4207 self.logger.debug(
4208 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4209 )
4210 elif (
4211 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4212 ): # conflict
4213 failed_detail.append("delete conflict: {}".format(e))
4214 self.logger.debug(
4215 logging_text
4216 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4217 )
4218 else:
4219 failed_detail.append("delete error: {}".format(e))
4220 self.logger.error(
4221 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4222 )
4223
4224 # Delete nsd
4225 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4226 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4227 try:
4228 stage[2] = "Deleting nsd from RO."
4229 db_nsr_update["detailed-status"] = " ".join(stage)
4230 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4231 self._write_op_status(nslcmop_id, stage)
4232 await self.RO.delete("nsd", ro_nsd_id)
4233 self.logger.debug(
4234 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4235 )
4236 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4237 except Exception as e:
4238 if (
4239 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4240 ): # not found
4241 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4242 self.logger.debug(
4243 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4244 )
4245 elif (
4246 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4247 ): # conflict
4248 failed_detail.append(
4249 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4250 )
4251 self.logger.debug(logging_text + failed_detail[-1])
4252 else:
4253 failed_detail.append(
4254 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4255 )
4256 self.logger.error(logging_text + failed_detail[-1])
4257
4258 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4259 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4260 if not vnf_deployed or not vnf_deployed["id"]:
4261 continue
4262 try:
4263 ro_vnfd_id = vnf_deployed["id"]
4264 stage[
4265 2
4266 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4267 vnf_deployed["member-vnf-index"], ro_vnfd_id
4268 )
4269 db_nsr_update["detailed-status"] = " ".join(stage)
4270 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4271 self._write_op_status(nslcmop_id, stage)
4272 await self.RO.delete("vnfd", ro_vnfd_id)
4273 self.logger.debug(
4274 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4275 )
4276 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4277 except Exception as e:
4278 if (
4279 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4280 ): # not found
4281 db_nsr_update[
4282 "_admin.deployed.RO.vnfd.{}.id".format(index)
4283 ] = None
4284 self.logger.debug(
4285 logging_text
4286 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4287 )
4288 elif (
4289 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4290 ): # conflict
4291 failed_detail.append(
4292 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4293 )
4294 self.logger.debug(logging_text + failed_detail[-1])
4295 else:
4296 failed_detail.append(
4297 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4298 )
4299 self.logger.error(logging_text + failed_detail[-1])
4300
4301 if failed_detail:
4302 stage[2] = "Error deleting from VIM"
4303 else:
4304 stage[2] = "Deleted from VIM"
4305 db_nsr_update["detailed-status"] = " ".join(stage)
4306 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4307 self._write_op_status(nslcmop_id, stage)
4308
4309 if failed_detail:
4310 raise LcmException("; ".join(failed_detail))
4311
4312 async def terminate(self, nsr_id, nslcmop_id):
4313 # Try to lock HA task here
4314 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4315 if not task_is_locked_by_me:
4316 return
4317
4318 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4319 self.logger.debug(logging_text + "Enter")
4320 timeout_ns_terminate = self.timeout_ns_terminate
4321 db_nsr = None
4322 db_nslcmop = None
4323 operation_params = None
4324 exc = None
4325 error_list = [] # annotates all failed error messages
4326 db_nslcmop_update = {}
4327 autoremove = False # autoremove after terminated
4328 tasks_dict_info = {}
4329 db_nsr_update = {}
4330 stage = [
4331 "Stage 1/3: Preparing task.",
4332 "Waiting for previous operations to terminate.",
4333 "",
4334 ]
4335 # ^ contains [stage, step, VIM-status]
4336 try:
4337 # wait for any previous tasks in process
4338 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4339
4340 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4341 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4342 operation_params = db_nslcmop.get("operationParams") or {}
4343 if operation_params.get("timeout_ns_terminate"):
4344 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4345 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4346 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4347
4348 db_nsr_update["operational-status"] = "terminating"
4349 db_nsr_update["config-status"] = "terminating"
4350 self._write_ns_status(
4351 nsr_id=nsr_id,
4352 ns_state="TERMINATING",
4353 current_operation="TERMINATING",
4354 current_operation_id=nslcmop_id,
4355 other_update=db_nsr_update,
4356 )
4357 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4358 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4359 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4360 return
4361
4362 stage[1] = "Getting vnf descriptors from db."
4363 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4364 db_vnfrs_dict = {
4365 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4366 }
4367 db_vnfds_from_id = {}
4368 db_vnfds_from_member_index = {}
4369 # Loop over VNFRs
4370 for vnfr in db_vnfrs_list:
4371 vnfd_id = vnfr["vnfd-id"]
4372 if vnfd_id not in db_vnfds_from_id:
4373 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4374 db_vnfds_from_id[vnfd_id] = vnfd
4375 db_vnfds_from_member_index[
4376 vnfr["member-vnf-index-ref"]
4377 ] = db_vnfds_from_id[vnfd_id]
4378
4379 # Destroy individual execution environments when there are terminating primitives.
4380 # Rest of EE will be deleted at once
4381 # TODO - check before calling _destroy_N2VC
4382 # if not operation_params.get("skip_terminate_primitives"):#
4383 # or not vca.get("needed_terminate"):
4384 stage[0] = "Stage 2/3 execute terminating primitives."
4385 self.logger.debug(logging_text + stage[0])
4386 stage[1] = "Looking execution environment that needs terminate."
4387 self.logger.debug(logging_text + stage[1])
4388
4389 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4390 config_descriptor = None
4391 vca_member_vnf_index = vca.get("member-vnf-index")
4392 vca_id = self.get_vca_id(
4393 db_vnfrs_dict.get(vca_member_vnf_index)
4394 if vca_member_vnf_index
4395 else None,
4396 db_nsr,
4397 )
4398 if not vca or not vca.get("ee_id"):
4399 continue
4400 if not vca.get("member-vnf-index"):
4401 # ns
4402 config_descriptor = db_nsr.get("ns-configuration")
4403 elif vca.get("vdu_id"):
4404 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4405 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4406 elif vca.get("kdu_name"):
4407 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4408 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4409 else:
4410 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4411 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4412 vca_type = vca.get("type")
4413 exec_terminate_primitives = not operation_params.get(
4414 "skip_terminate_primitives"
4415 ) and vca.get("needed_terminate")
4416 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4417 # pending native charms
4418 destroy_ee = (
4419 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4420 )
4421 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4422 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4423 task = asyncio.ensure_future(
4424 self.destroy_N2VC(
4425 logging_text,
4426 db_nslcmop,
4427 vca,
4428 config_descriptor,
4429 vca_index,
4430 destroy_ee,
4431 exec_terminate_primitives,
4432 vca_id=vca_id,
4433 )
4434 )
4435 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4436
4437 # wait for pending tasks of terminate primitives
4438 if tasks_dict_info:
4439 self.logger.debug(
4440 logging_text
4441 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4442 )
4443 error_list = await self._wait_for_tasks(
4444 logging_text,
4445 tasks_dict_info,
4446 min(self.timeout_charm_delete, timeout_ns_terminate),
4447 stage,
4448 nslcmop_id,
4449 )
4450 tasks_dict_info.clear()
4451 if error_list:
4452 return # raise LcmException("; ".join(error_list))
4453
4454 # remove All execution environments at once
4455 stage[0] = "Stage 3/3 delete all."
4456
4457 if nsr_deployed.get("VCA"):
4458 stage[1] = "Deleting all execution environments."
4459 self.logger.debug(logging_text + stage[1])
4460 vca_id = self.get_vca_id({}, db_nsr)
4461 task_delete_ee = asyncio.ensure_future(
4462 asyncio.wait_for(
4463 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4464 timeout=self.timeout_charm_delete,
4465 )
4466 )
4467 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4468 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4469
4470 # Delete from k8scluster
4471 stage[1] = "Deleting KDUs."
4472 self.logger.debug(logging_text + stage[1])
4473 # print(nsr_deployed)
4474 for kdu in get_iterable(nsr_deployed, "K8s"):
4475 if not kdu or not kdu.get("kdu-instance"):
4476 continue
4477 kdu_instance = kdu.get("kdu-instance")
4478 if kdu.get("k8scluster-type") in self.k8scluster_map:
4479 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4480 vca_id = self.get_vca_id({}, db_nsr)
4481 task_delete_kdu_instance = asyncio.ensure_future(
4482 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4483 cluster_uuid=kdu.get("k8scluster-uuid"),
4484 kdu_instance=kdu_instance,
4485 vca_id=vca_id,
4486 )
4487 )
4488 else:
4489 self.logger.error(
4490 logging_text
4491 + "Unknown k8s deployment type {}".format(
4492 kdu.get("k8scluster-type")
4493 )
4494 )
4495 continue
4496 tasks_dict_info[
4497 task_delete_kdu_instance
4498 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4499
4500 # remove from RO
4501 stage[1] = "Deleting ns from VIM."
4502 if self.ng_ro:
4503 task_delete_ro = asyncio.ensure_future(
4504 self._terminate_ng_ro(
4505 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4506 )
4507 )
4508 else:
4509 task_delete_ro = asyncio.ensure_future(
4510 self._terminate_RO(
4511 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4512 )
4513 )
4514 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4515
4516 # rest of staff will be done at finally
4517
4518 except (
4519 ROclient.ROClientException,
4520 DbException,
4521 LcmException,
4522 N2VCException,
4523 ) as e:
4524 self.logger.error(logging_text + "Exit Exception {}".format(e))
4525 exc = e
4526 except asyncio.CancelledError:
4527 self.logger.error(
4528 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4529 )
4530 exc = "Operation was cancelled"
4531 except Exception as e:
4532 exc = traceback.format_exc()
4533 self.logger.critical(
4534 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4535 exc_info=True,
4536 )
4537 finally:
4538 if exc:
4539 error_list.append(str(exc))
4540 try:
4541 # wait for pending tasks
4542 if tasks_dict_info:
4543 stage[1] = "Waiting for terminate pending tasks."
4544 self.logger.debug(logging_text + stage[1])
4545 error_list += await self._wait_for_tasks(
4546 logging_text,
4547 tasks_dict_info,
4548 timeout_ns_terminate,
4549 stage,
4550 nslcmop_id,
4551 )
4552 stage[1] = stage[2] = ""
4553 except asyncio.CancelledError:
4554 error_list.append("Cancelled")
4555 # TODO cancell all tasks
4556 except Exception as exc:
4557 error_list.append(str(exc))
4558 # update status at database
4559 if error_list:
4560 error_detail = "; ".join(error_list)
4561 # self.logger.error(logging_text + error_detail)
4562 error_description_nslcmop = "{} Detail: {}".format(
4563 stage[0], error_detail
4564 )
4565 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4566 nslcmop_id, stage[0]
4567 )
4568
4569 db_nsr_update["operational-status"] = "failed"
4570 db_nsr_update["detailed-status"] = (
4571 error_description_nsr + " Detail: " + error_detail
4572 )
4573 db_nslcmop_update["detailed-status"] = error_detail
4574 nslcmop_operation_state = "FAILED"
4575 ns_state = "BROKEN"
4576 else:
4577 error_detail = None
4578 error_description_nsr = error_description_nslcmop = None
4579 ns_state = "NOT_INSTANTIATED"
4580 db_nsr_update["operational-status"] = "terminated"
4581 db_nsr_update["detailed-status"] = "Done"
4582 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4583 db_nslcmop_update["detailed-status"] = "Done"
4584 nslcmop_operation_state = "COMPLETED"
4585
4586 if db_nsr:
4587 self._write_ns_status(
4588 nsr_id=nsr_id,
4589 ns_state=ns_state,
4590 current_operation="IDLE",
4591 current_operation_id=None,
4592 error_description=error_description_nsr,
4593 error_detail=error_detail,
4594 other_update=db_nsr_update,
4595 )
4596 self._write_op_status(
4597 op_id=nslcmop_id,
4598 stage="",
4599 error_message=error_description_nslcmop,
4600 operation_state=nslcmop_operation_state,
4601 other_update=db_nslcmop_update,
4602 )
4603 if ns_state == "NOT_INSTANTIATED":
4604 try:
4605 self.db.set_list(
4606 "vnfrs",
4607 {"nsr-id-ref": nsr_id},
4608 {"_admin.nsState": "NOT_INSTANTIATED"},
4609 )
4610 except DbException as e:
4611 self.logger.warn(
4612 logging_text
4613 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4614 nsr_id, e
4615 )
4616 )
4617 if operation_params:
4618 autoremove = operation_params.get("autoremove", False)
4619 if nslcmop_operation_state:
4620 try:
4621 await self.msg.aiowrite(
4622 "ns",
4623 "terminated",
4624 {
4625 "nsr_id": nsr_id,
4626 "nslcmop_id": nslcmop_id,
4627 "operationState": nslcmop_operation_state,
4628 "autoremove": autoremove,
4629 },
4630 loop=self.loop,
4631 )
4632 except Exception as e:
4633 self.logger.error(
4634 logging_text + "kafka_write notification Exception {}".format(e)
4635 )
4636
4637 self.logger.debug(logging_text + "Exit")
4638 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4639
4640 async def _wait_for_tasks(
4641 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4642 ):
4643 time_start = time()
4644 error_detail_list = []
4645 error_list = []
4646 pending_tasks = list(created_tasks_info.keys())
4647 num_tasks = len(pending_tasks)
4648 num_done = 0
4649 stage[1] = "{}/{}.".format(num_done, num_tasks)
4650 self._write_op_status(nslcmop_id, stage)
4651 while pending_tasks:
4652 new_error = None
4653 _timeout = timeout + time_start - time()
4654 done, pending_tasks = await asyncio.wait(
4655 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4656 )
4657 num_done += len(done)
4658 if not done: # Timeout
4659 for task in pending_tasks:
4660 new_error = created_tasks_info[task] + ": Timeout"
4661 error_detail_list.append(new_error)
4662 error_list.append(new_error)
4663 break
4664 for task in done:
4665 if task.cancelled():
4666 exc = "Cancelled"
4667 else:
4668 exc = task.exception()
4669 if exc:
4670 if isinstance(exc, asyncio.TimeoutError):
4671 exc = "Timeout"
4672 new_error = created_tasks_info[task] + ": {}".format(exc)
4673 error_list.append(created_tasks_info[task])
4674 error_detail_list.append(new_error)
4675 if isinstance(
4676 exc,
4677 (
4678 str,
4679 DbException,
4680 N2VCException,
4681 ROclient.ROClientException,
4682 LcmException,
4683 K8sException,
4684 NgRoException,
4685 ),
4686 ):
4687 self.logger.error(logging_text + new_error)
4688 else:
4689 exc_traceback = "".join(
4690 traceback.format_exception(None, exc, exc.__traceback__)
4691 )
4692 self.logger.error(
4693 logging_text
4694 + created_tasks_info[task]
4695 + " "
4696 + exc_traceback
4697 )
4698 else:
4699 self.logger.debug(
4700 logging_text + created_tasks_info[task] + ": Done"
4701 )
4702 stage[1] = "{}/{}.".format(num_done, num_tasks)
4703 if new_error:
4704 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4705 if nsr_id: # update also nsr
4706 self.update_db_2(
4707 "nsrs",
4708 nsr_id,
4709 {
4710 "errorDescription": "Error at: " + ", ".join(error_list),
4711 "errorDetail": ". ".join(error_detail_list),
4712 },
4713 )
4714 self._write_op_status(nslcmop_id, stage)
4715 return error_detail_list
4716
4717 @staticmethod
4718 def _map_primitive_params(primitive_desc, params, instantiation_params):
4719 """
4720 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4721 The default-value is used. If it is between < > it look for a value at instantiation_params
4722 :param primitive_desc: portion of VNFD/NSD that describes primitive
4723 :param params: Params provided by user
4724 :param instantiation_params: Instantiation params provided by user
4725 :return: a dictionary with the calculated params
4726 """
4727 calculated_params = {}
4728 for parameter in primitive_desc.get("parameter", ()):
4729 param_name = parameter["name"]
4730 if param_name in params:
4731 calculated_params[param_name] = params[param_name]
4732 elif "default-value" in parameter or "value" in parameter:
4733 if "value" in parameter:
4734 calculated_params[param_name] = parameter["value"]
4735 else:
4736 calculated_params[param_name] = parameter["default-value"]
4737 if (
4738 isinstance(calculated_params[param_name], str)
4739 and calculated_params[param_name].startswith("<")
4740 and calculated_params[param_name].endswith(">")
4741 ):
4742 if calculated_params[param_name][1:-1] in instantiation_params:
4743 calculated_params[param_name] = instantiation_params[
4744 calculated_params[param_name][1:-1]
4745 ]
4746 else:
4747 raise LcmException(
4748 "Parameter {} needed to execute primitive {} not provided".format(
4749 calculated_params[param_name], primitive_desc["name"]
4750 )
4751 )
4752 else:
4753 raise LcmException(
4754 "Parameter {} needed to execute primitive {} not provided".format(
4755 param_name, primitive_desc["name"]
4756 )
4757 )
4758
4759 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4760 calculated_params[param_name] = yaml.safe_dump(
4761 calculated_params[param_name], default_flow_style=True, width=256
4762 )
4763 elif isinstance(calculated_params[param_name], str) and calculated_params[
4764 param_name
4765 ].startswith("!!yaml "):
4766 calculated_params[param_name] = calculated_params[param_name][7:]
4767 if parameter.get("data-type") == "INTEGER":
4768 try:
4769 calculated_params[param_name] = int(calculated_params[param_name])
4770 except ValueError: # error converting string to int
4771 raise LcmException(
4772 "Parameter {} of primitive {} must be integer".format(
4773 param_name, primitive_desc["name"]
4774 )
4775 )
4776 elif parameter.get("data-type") == "BOOLEAN":
4777 calculated_params[param_name] = not (
4778 (str(calculated_params[param_name])).lower() == "false"
4779 )
4780
4781 # add always ns_config_info if primitive name is config
4782 if primitive_desc["name"] == "config":
4783 if "ns_config_info" in instantiation_params:
4784 calculated_params["ns_config_info"] = instantiation_params[
4785 "ns_config_info"
4786 ]
4787 return calculated_params
4788
4789 def _look_for_deployed_vca(
4790 self,
4791 deployed_vca,
4792 member_vnf_index,
4793 vdu_id,
4794 vdu_count_index,
4795 kdu_name=None,
4796 ee_descriptor_id=None,
4797 ):
4798 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4799 for vca in deployed_vca:
4800 if not vca:
4801 continue
4802 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4803 continue
4804 if (
4805 vdu_count_index is not None
4806 and vdu_count_index != vca["vdu_count_index"]
4807 ):
4808 continue
4809 if kdu_name and kdu_name != vca["kdu_name"]:
4810 continue
4811 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4812 continue
4813 break
4814 else:
4815 # vca_deployed not found
4816 raise LcmException(
4817 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4818 " is not deployed".format(
4819 member_vnf_index,
4820 vdu_id,
4821 vdu_count_index,
4822 kdu_name,
4823 ee_descriptor_id,
4824 )
4825 )
4826 # get ee_id
4827 ee_id = vca.get("ee_id")
4828 vca_type = vca.get(
4829 "type", "lxc_proxy_charm"
4830 ) # default value for backward compatibility - proxy charm
4831 if not ee_id:
4832 raise LcmException(
4833 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4834 "execution environment".format(
4835 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4836 )
4837 )
4838 return ee_id, vca_type
4839
4840 async def _ns_execute_primitive(
4841 self,
4842 ee_id,
4843 primitive,
4844 primitive_params,
4845 retries=0,
4846 retries_interval=30,
4847 timeout=None,
4848 vca_type=None,
4849 db_dict=None,
4850 vca_id: str = None,
4851 ) -> (str, str):
4852 try:
4853 if primitive == "config":
4854 primitive_params = {"params": primitive_params}
4855
4856 vca_type = vca_type or "lxc_proxy_charm"
4857
4858 while retries >= 0:
4859 try:
4860 output = await asyncio.wait_for(
4861 self.vca_map[vca_type].exec_primitive(
4862 ee_id=ee_id,
4863 primitive_name=primitive,
4864 params_dict=primitive_params,
4865 progress_timeout=self.timeout_progress_primitive,
4866 total_timeout=self.timeout_primitive,
4867 db_dict=db_dict,
4868 vca_id=vca_id,
4869 vca_type=vca_type,
4870 ),
4871 timeout=timeout or self.timeout_primitive,
4872 )
4873 # execution was OK
4874 break
4875 except asyncio.CancelledError:
4876 raise
4877 except Exception as e: # asyncio.TimeoutError
4878 if isinstance(e, asyncio.TimeoutError):
4879 e = "Timeout"
4880 retries -= 1
4881 if retries >= 0:
4882 self.logger.debug(
4883 "Error executing action {} on {} -> {}".format(
4884 primitive, ee_id, e
4885 )
4886 )
4887 # wait and retry
4888 await asyncio.sleep(retries_interval, loop=self.loop)
4889 else:
4890 return "FAILED", str(e)
4891
4892 return "COMPLETED", output
4893
4894 except (LcmException, asyncio.CancelledError):
4895 raise
4896 except Exception as e:
4897 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4898
4899 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4900 """
4901 Updating the vca_status with latest juju information in nsrs record
4902 :param: nsr_id: Id of the nsr
4903 :param: nslcmop_id: Id of the nslcmop
4904 :return: None
4905 """
4906
4907 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4908 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4909 vca_id = self.get_vca_id({}, db_nsr)
4910 if db_nsr["_admin"]["deployed"]["K8s"]:
4911 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4912 cluster_uuid, kdu_instance, cluster_type = (
4913 k8s["k8scluster-uuid"],
4914 k8s["kdu-instance"],
4915 k8s["k8scluster-type"],
4916 )
4917 await self._on_update_k8s_db(
4918 cluster_uuid=cluster_uuid,
4919 kdu_instance=kdu_instance,
4920 filter={"_id": nsr_id},
4921 vca_id=vca_id,
4922 cluster_type=cluster_type,
4923 )
4924 else:
4925 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4926 table, filter = "nsrs", {"_id": nsr_id}
4927 path = "_admin.deployed.VCA.{}.".format(vca_index)
4928 await self._on_update_n2vc_db(table, filter, path, {})
4929
4930 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4931 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4932
4933 async def action(self, nsr_id, nslcmop_id):
4934 # Try to lock HA task here
4935 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4936 if not task_is_locked_by_me:
4937 return
4938
4939 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4940 self.logger.debug(logging_text + "Enter")
4941 # get all needed from database
4942 db_nsr = None
4943 db_nslcmop = None
4944 db_nsr_update = {}
4945 db_nslcmop_update = {}
4946 nslcmop_operation_state = None
4947 error_description_nslcmop = None
4948 exc = None
4949 try:
4950 # wait for any previous tasks in process
4951 step = "Waiting for previous operations to terminate"
4952 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4953
4954 self._write_ns_status(
4955 nsr_id=nsr_id,
4956 ns_state=None,
4957 current_operation="RUNNING ACTION",
4958 current_operation_id=nslcmop_id,
4959 )
4960
4961 step = "Getting information from database"
4962 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4963 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4964 if db_nslcmop["operationParams"].get("primitive_params"):
4965 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4966 db_nslcmop["operationParams"]["primitive_params"]
4967 )
4968
4969 nsr_deployed = db_nsr["_admin"].get("deployed")
4970 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4971 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4972 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4973 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4974 primitive = db_nslcmop["operationParams"]["primitive"]
4975 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4976 timeout_ns_action = db_nslcmop["operationParams"].get(
4977 "timeout_ns_action", self.timeout_primitive
4978 )
4979
4980 if vnf_index:
4981 step = "Getting vnfr from database"
4982 db_vnfr = self.db.get_one(
4983 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4984 )
4985 if db_vnfr.get("kdur"):
4986 kdur_list = []
4987 for kdur in db_vnfr["kdur"]:
4988 if kdur.get("additionalParams"):
4989 kdur["additionalParams"] = json.loads(
4990 kdur["additionalParams"]
4991 )
4992 kdur_list.append(kdur)
4993 db_vnfr["kdur"] = kdur_list
4994 step = "Getting vnfd from database"
4995 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4996 else:
4997 step = "Getting nsd from database"
4998 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4999
5000 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5001 # for backward compatibility
5002 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5003 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5004 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5005 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5006
5007 # look for primitive
5008 config_primitive_desc = descriptor_configuration = None
5009 if vdu_id:
5010 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
5011 elif kdu_name:
5012 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
5013 elif vnf_index:
5014 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
5015 else:
5016 descriptor_configuration = db_nsd.get("ns-configuration")
5017
5018 if descriptor_configuration and descriptor_configuration.get(
5019 "config-primitive"
5020 ):
5021 for config_primitive in descriptor_configuration["config-primitive"]:
5022 if config_primitive["name"] == primitive:
5023 config_primitive_desc = config_primitive
5024 break
5025
5026 if not config_primitive_desc:
5027 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
5028 raise LcmException(
5029 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5030 primitive
5031 )
5032 )
5033 primitive_name = primitive
5034 ee_descriptor_id = None
5035 else:
5036 primitive_name = config_primitive_desc.get(
5037 "execution-environment-primitive", primitive
5038 )
5039 ee_descriptor_id = config_primitive_desc.get(
5040 "execution-environment-ref"
5041 )
5042
5043 if vnf_index:
5044 if vdu_id:
5045 vdur = next(
5046 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
5047 )
5048 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
5049 elif kdu_name:
5050 kdur = next(
5051 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
5052 )
5053 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
5054 else:
5055 desc_params = parse_yaml_strings(
5056 db_vnfr.get("additionalParamsForVnf")
5057 )
5058 else:
5059 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
5060 if kdu_name and get_configuration(db_vnfd, kdu_name):
5061 kdu_configuration = get_configuration(db_vnfd, kdu_name)
5062 actions = set()
5063 for primitive in kdu_configuration.get("initial-config-primitive", []):
5064 actions.add(primitive["name"])
5065 for primitive in kdu_configuration.get("config-primitive", []):
5066 actions.add(primitive["name"])
5067 kdu_action = True if primitive_name in actions else False
5068
5069 # TODO check if ns is in a proper status
5070 if kdu_name and (
5071 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5072 ):
5073 # kdur and desc_params already set from before
5074 if primitive_params:
5075 desc_params.update(primitive_params)
5076 # TODO Check if we will need something at vnf level
5077 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5078 if (
5079 kdu_name == kdu["kdu-name"]
5080 and kdu["member-vnf-index"] == vnf_index
5081 ):
5082 break
5083 else:
5084 raise LcmException(
5085 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5086 )
5087
5088 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5089 msg = "unknown k8scluster-type '{}'".format(
5090 kdu.get("k8scluster-type")
5091 )
5092 raise LcmException(msg)
5093
5094 db_dict = {
5095 "collection": "nsrs",
5096 "filter": {"_id": nsr_id},
5097 "path": "_admin.deployed.K8s.{}".format(index),
5098 }
5099 self.logger.debug(
5100 logging_text
5101 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5102 )
5103 step = "Executing kdu {}".format(primitive_name)
5104 if primitive_name == "upgrade":
5105 if desc_params.get("kdu_model"):
5106 kdu_model = desc_params.get("kdu_model")
5107 del desc_params["kdu_model"]
5108 else:
5109 kdu_model = kdu.get("kdu-model")
5110 parts = kdu_model.split(sep=":")
5111 if len(parts) == 2:
5112 kdu_model = parts[0]
5113
5114 detailed_status = await asyncio.wait_for(
5115 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5116 cluster_uuid=kdu.get("k8scluster-uuid"),
5117 kdu_instance=kdu.get("kdu-instance"),
5118 atomic=True,
5119 kdu_model=kdu_model,
5120 params=desc_params,
5121 db_dict=db_dict,
5122 timeout=timeout_ns_action,
5123 ),
5124 timeout=timeout_ns_action + 10,
5125 )
5126 self.logger.debug(
5127 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5128 )
5129 elif primitive_name == "rollback":
5130 detailed_status = await asyncio.wait_for(
5131 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5132 cluster_uuid=kdu.get("k8scluster-uuid"),
5133 kdu_instance=kdu.get("kdu-instance"),
5134 db_dict=db_dict,
5135 ),
5136 timeout=timeout_ns_action,
5137 )
5138 elif primitive_name == "status":
5139 detailed_status = await asyncio.wait_for(
5140 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5141 cluster_uuid=kdu.get("k8scluster-uuid"),
5142 kdu_instance=kdu.get("kdu-instance"),
5143 vca_id=vca_id,
5144 ),
5145 timeout=timeout_ns_action,
5146 )
5147 else:
5148 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5149 kdu["kdu-name"], nsr_id
5150 )
5151 params = self._map_primitive_params(
5152 config_primitive_desc, primitive_params, desc_params
5153 )
5154
5155 detailed_status = await asyncio.wait_for(
5156 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5157 cluster_uuid=kdu.get("k8scluster-uuid"),
5158 kdu_instance=kdu_instance,
5159 primitive_name=primitive_name,
5160 params=params,
5161 db_dict=db_dict,
5162 timeout=timeout_ns_action,
5163 vca_id=vca_id,
5164 ),
5165 timeout=timeout_ns_action,
5166 )
5167
5168 if detailed_status:
5169 nslcmop_operation_state = "COMPLETED"
5170 else:
5171 detailed_status = ""
5172 nslcmop_operation_state = "FAILED"
5173 else:
5174 ee_id, vca_type = self._look_for_deployed_vca(
5175 nsr_deployed["VCA"],
5176 member_vnf_index=vnf_index,
5177 vdu_id=vdu_id,
5178 vdu_count_index=vdu_count_index,
5179 ee_descriptor_id=ee_descriptor_id,
5180 )
5181 for vca_index, vca_deployed in enumerate(
5182 db_nsr["_admin"]["deployed"]["VCA"]
5183 ):
5184 if vca_deployed.get("member-vnf-index") == vnf_index:
5185 db_dict = {
5186 "collection": "nsrs",
5187 "filter": {"_id": nsr_id},
5188 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5189 }
5190 break
5191 (
5192 nslcmop_operation_state,
5193 detailed_status,
5194 ) = await self._ns_execute_primitive(
5195 ee_id,
5196 primitive=primitive_name,
5197 primitive_params=self._map_primitive_params(
5198 config_primitive_desc, primitive_params, desc_params
5199 ),
5200 timeout=timeout_ns_action,
5201 vca_type=vca_type,
5202 db_dict=db_dict,
5203 vca_id=vca_id,
5204 )
5205
5206 db_nslcmop_update["detailed-status"] = detailed_status
5207 error_description_nslcmop = (
5208 detailed_status if nslcmop_operation_state == "FAILED" else ""
5209 )
5210 self.logger.debug(
5211 logging_text
5212 + " task Done with result {} {}".format(
5213 nslcmop_operation_state, detailed_status
5214 )
5215 )
5216 return # database update is called inside finally
5217
5218 except (DbException, LcmException, N2VCException, K8sException) as e:
5219 self.logger.error(logging_text + "Exit Exception {}".format(e))
5220 exc = e
5221 except asyncio.CancelledError:
5222 self.logger.error(
5223 logging_text + "Cancelled Exception while '{}'".format(step)
5224 )
5225 exc = "Operation was cancelled"
5226 except asyncio.TimeoutError:
5227 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5228 exc = "Timeout"
5229 except Exception as e:
5230 exc = traceback.format_exc()
5231 self.logger.critical(
5232 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5233 exc_info=True,
5234 )
5235 finally:
5236 if exc:
5237 db_nslcmop_update[
5238 "detailed-status"
5239 ] = (
5240 detailed_status
5241 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5242 nslcmop_operation_state = "FAILED"
5243 if db_nsr:
5244 self._write_ns_status(
5245 nsr_id=nsr_id,
5246 ns_state=db_nsr[
5247 "nsState"
5248 ], # TODO check if degraded. For the moment use previous status
5249 current_operation="IDLE",
5250 current_operation_id=None,
5251 # error_description=error_description_nsr,
5252 # error_detail=error_detail,
5253 other_update=db_nsr_update,
5254 )
5255
5256 self._write_op_status(
5257 op_id=nslcmop_id,
5258 stage="",
5259 error_message=error_description_nslcmop,
5260 operation_state=nslcmop_operation_state,
5261 other_update=db_nslcmop_update,
5262 )
5263
5264 if nslcmop_operation_state:
5265 try:
5266 await self.msg.aiowrite(
5267 "ns",
5268 "actioned",
5269 {
5270 "nsr_id": nsr_id,
5271 "nslcmop_id": nslcmop_id,
5272 "operationState": nslcmop_operation_state,
5273 },
5274 loop=self.loop,
5275 )
5276 except Exception as e:
5277 self.logger.error(
5278 logging_text + "kafka_write notification Exception {}".format(e)
5279 )
5280 self.logger.debug(logging_text + "Exit")
5281 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5282 return nslcmop_operation_state, detailed_status
5283
5284 async def scale(self, nsr_id, nslcmop_id):
5285 # Try to lock HA task here
5286 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5287 if not task_is_locked_by_me:
5288 return
5289
5290 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5291 stage = ["", "", ""]
5292 tasks_dict_info = {}
5293 # ^ stage, step, VIM progress
5294 self.logger.debug(logging_text + "Enter")
5295 # get all needed from database
5296 db_nsr = None
5297 db_nslcmop_update = {}
5298 db_nsr_update = {}
5299 exc = None
5300 # in case of error, indicates what part of scale was failed to put nsr at error status
5301 scale_process = None
5302 old_operational_status = ""
5303 old_config_status = ""
5304 nsi_id = None
5305 try:
5306 # wait for any previous tasks in process
5307 step = "Waiting for previous operations to terminate"
5308 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5309 self._write_ns_status(
5310 nsr_id=nsr_id,
5311 ns_state=None,
5312 current_operation="SCALING",
5313 current_operation_id=nslcmop_id,
5314 )
5315
5316 step = "Getting nslcmop from database"
5317 self.logger.debug(
5318 step + " after having waited for previous tasks to be completed"
5319 )
5320 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5321
5322 step = "Getting nsr from database"
5323 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5324 old_operational_status = db_nsr["operational-status"]
5325 old_config_status = db_nsr["config-status"]
5326
5327 step = "Parsing scaling parameters"
5328 db_nsr_update["operational-status"] = "scaling"
5329 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5330 nsr_deployed = db_nsr["_admin"].get("deployed")
5331
5332 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5333 "scaleByStepData"
5334 ]["member-vnf-index"]
5335 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5336 "scaleByStepData"
5337 ]["scaling-group-descriptor"]
5338 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5339 # for backward compatibility
5340 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5341 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5342 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5343 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5344
5345 step = "Getting vnfr from database"
5346 db_vnfr = self.db.get_one(
5347 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5348 )
5349
5350 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5351
5352 step = "Getting vnfd from database"
5353 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5354
5355 base_folder = db_vnfd["_admin"]["storage"]
5356
5357 step = "Getting scaling-group-descriptor"
5358 scaling_descriptor = find_in_list(
5359 get_scaling_aspect(db_vnfd),
5360 lambda scale_desc: scale_desc["name"] == scaling_group,
5361 )
5362 if not scaling_descriptor:
5363 raise LcmException(
5364 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5365 "at vnfd:scaling-group-descriptor".format(scaling_group)
5366 )
5367
5368 step = "Sending scale order to VIM"
5369 # TODO check if ns is in a proper status
5370 nb_scale_op = 0
5371 if not db_nsr["_admin"].get("scaling-group"):
5372 self.update_db_2(
5373 "nsrs",
5374 nsr_id,
5375 {
5376 "_admin.scaling-group": [
5377 {"name": scaling_group, "nb-scale-op": 0}
5378 ]
5379 },
5380 )
5381 admin_scale_index = 0
5382 else:
5383 for admin_scale_index, admin_scale_info in enumerate(
5384 db_nsr["_admin"]["scaling-group"]
5385 ):
5386 if admin_scale_info["name"] == scaling_group:
5387 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5388 break
5389 else: # not found, set index one plus last element and add new entry with the name
5390 admin_scale_index += 1
5391 db_nsr_update[
5392 "_admin.scaling-group.{}.name".format(admin_scale_index)
5393 ] = scaling_group
5394
5395 vca_scaling_info = []
5396 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5397 if scaling_type == "SCALE_OUT":
5398 if "aspect-delta-details" not in scaling_descriptor:
5399 raise LcmException(
5400 "Aspect delta details not fount in scaling descriptor {}".format(
5401 scaling_descriptor["name"]
5402 )
5403 )
5404 # count if max-instance-count is reached
5405 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5406
5407 scaling_info["scaling_direction"] = "OUT"
5408 scaling_info["vdu-create"] = {}
5409 scaling_info["kdu-create"] = {}
5410 for delta in deltas:
5411 for vdu_delta in delta.get("vdu-delta", {}):
5412 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5413 # vdu_index also provides the number of instance of the targeted vdu
5414 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5415 cloud_init_text = self._get_vdu_cloud_init_content(
5416 vdud, db_vnfd
5417 )
5418 if cloud_init_text:
5419 additional_params = (
5420 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5421 or {}
5422 )
5423 cloud_init_list = []
5424
5425 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5426 max_instance_count = 10
5427 if vdu_profile and "max-number-of-instances" in vdu_profile:
5428 max_instance_count = vdu_profile.get(
5429 "max-number-of-instances", 10
5430 )
5431
5432 default_instance_num = get_number_of_instances(
5433 db_vnfd, vdud["id"]
5434 )
5435 instances_number = vdu_delta.get("number-of-instances", 1)
5436 nb_scale_op += instances_number
5437
5438 new_instance_count = nb_scale_op + default_instance_num
5439 # Control if new count is over max and vdu count is less than max.
5440 # Then assign new instance count
5441 if new_instance_count > max_instance_count > vdu_count:
5442 instances_number = new_instance_count - max_instance_count
5443 else:
5444 instances_number = instances_number
5445
5446 if new_instance_count > max_instance_count:
5447 raise LcmException(
5448 "reached the limit of {} (max-instance-count) "
5449 "scaling-out operations for the "
5450 "scaling-group-descriptor '{}'".format(
5451 nb_scale_op, scaling_group
5452 )
5453 )
5454 for x in range(vdu_delta.get("number-of-instances", 1)):
5455 if cloud_init_text:
5456 # TODO Information of its own ip is not available because db_vnfr is not updated.
5457 additional_params["OSM"] = get_osm_params(
5458 db_vnfr, vdu_delta["id"], vdu_index + x
5459 )
5460 cloud_init_list.append(
5461 self._parse_cloud_init(
5462 cloud_init_text,
5463 additional_params,
5464 db_vnfd["id"],
5465 vdud["id"],
5466 )
5467 )
5468 vca_scaling_info.append(
5469 {
5470 "osm_vdu_id": vdu_delta["id"],
5471 "member-vnf-index": vnf_index,
5472 "type": "create",
5473 "vdu_index": vdu_index + x,
5474 }
5475 )
5476 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5477 for kdu_delta in delta.get("kdu-resource-delta", {}):
5478 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5479 kdu_name = kdu_profile["kdu-name"]
5480 resource_name = kdu_profile["resource-name"]
5481
5482 # Might have different kdus in the same delta
5483 # Should have list for each kdu
5484 if not scaling_info["kdu-create"].get(kdu_name, None):
5485 scaling_info["kdu-create"][kdu_name] = []
5486
5487 kdur = get_kdur(db_vnfr, kdu_name)
5488 if kdur.get("helm-chart"):
5489 k8s_cluster_type = "helm-chart-v3"
5490 self.logger.debug("kdur: {}".format(kdur))
5491 if (
5492 kdur.get("helm-version")
5493 and kdur.get("helm-version") == "v2"
5494 ):
5495 k8s_cluster_type = "helm-chart"
5496 raise NotImplementedError
5497 elif kdur.get("juju-bundle"):
5498 k8s_cluster_type = "juju-bundle"
5499 else:
5500 raise LcmException(
5501 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5502 "juju-bundle. Maybe an old NBI version is running".format(
5503 db_vnfr["member-vnf-index-ref"], kdu_name
5504 )
5505 )
5506
5507 max_instance_count = 10
5508 if kdu_profile and "max-number-of-instances" in kdu_profile:
5509 max_instance_count = kdu_profile.get(
5510 "max-number-of-instances", 10
5511 )
5512
5513 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5514 deployed_kdu, _ = get_deployed_kdu(
5515 nsr_deployed, kdu_name, vnf_index
5516 )
5517 if deployed_kdu is None:
5518 raise LcmException(
5519 "KDU '{}' for vnf '{}' not deployed".format(
5520 kdu_name, vnf_index
5521 )
5522 )
5523 kdu_instance = deployed_kdu.get("kdu-instance")
5524 instance_num = await self.k8scluster_map[
5525 k8s_cluster_type
5526 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5527 kdu_replica_count = instance_num + kdu_delta.get(
5528 "number-of-instances", 1
5529 )
5530
5531 # Control if new count is over max and instance_num is less than max.
5532 # Then assign max instance number to kdu replica count
5533 if kdu_replica_count > max_instance_count > instance_num:
5534 kdu_replica_count = max_instance_count
5535 if kdu_replica_count > max_instance_count:
5536 raise LcmException(
5537 "reached the limit of {} (max-instance-count) "
5538 "scaling-out operations for the "
5539 "scaling-group-descriptor '{}'".format(
5540 instance_num, scaling_group
5541 )
5542 )
5543
5544 for x in range(kdu_delta.get("number-of-instances", 1)):
5545 vca_scaling_info.append(
5546 {
5547 "osm_kdu_id": kdu_name,
5548 "member-vnf-index": vnf_index,
5549 "type": "create",
5550 "kdu_index": instance_num + x - 1,
5551 }
5552 )
5553 scaling_info["kdu-create"][kdu_name].append(
5554 {
5555 "member-vnf-index": vnf_index,
5556 "type": "create",
5557 "k8s-cluster-type": k8s_cluster_type,
5558 "resource-name": resource_name,
5559 "scale": kdu_replica_count,
5560 }
5561 )
5562 elif scaling_type == "SCALE_IN":
5563 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5564
5565 scaling_info["scaling_direction"] = "IN"
5566 scaling_info["vdu-delete"] = {}
5567 scaling_info["kdu-delete"] = {}
5568
5569 for delta in deltas:
5570 for vdu_delta in delta.get("vdu-delta", {}):
5571 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5572 min_instance_count = 0
5573 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5574 if vdu_profile and "min-number-of-instances" in vdu_profile:
5575 min_instance_count = vdu_profile["min-number-of-instances"]
5576
5577 default_instance_num = get_number_of_instances(
5578 db_vnfd, vdu_delta["id"]
5579 )
5580 instance_num = vdu_delta.get("number-of-instances", 1)
5581 nb_scale_op -= instance_num
5582
5583 new_instance_count = nb_scale_op + default_instance_num
5584
5585 if new_instance_count < min_instance_count < vdu_count:
5586 instances_number = min_instance_count - new_instance_count
5587 else:
5588 instances_number = instance_num
5589
5590 if new_instance_count < min_instance_count:
5591 raise LcmException(
5592 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5593 "scaling-group-descriptor '{}'".format(
5594 nb_scale_op, scaling_group
5595 )
5596 )
5597 for x in range(vdu_delta.get("number-of-instances", 1)):
5598 vca_scaling_info.append(
5599 {
5600 "osm_vdu_id": vdu_delta["id"],
5601 "member-vnf-index": vnf_index,
5602 "type": "delete",
5603 "vdu_index": vdu_index - 1 - x,
5604 }
5605 )
5606 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5607 for kdu_delta in delta.get("kdu-resource-delta", {}):
5608 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5609 kdu_name = kdu_profile["kdu-name"]
5610 resource_name = kdu_profile["resource-name"]
5611
5612 if not scaling_info["kdu-delete"].get(kdu_name, None):
5613 scaling_info["kdu-delete"][kdu_name] = []
5614
5615 kdur = get_kdur(db_vnfr, kdu_name)
5616 if kdur.get("helm-chart"):
5617 k8s_cluster_type = "helm-chart-v3"
5618 self.logger.debug("kdur: {}".format(kdur))
5619 if (
5620 kdur.get("helm-version")
5621 and kdur.get("helm-version") == "v2"
5622 ):
5623 k8s_cluster_type = "helm-chart"
5624 raise NotImplementedError
5625 elif kdur.get("juju-bundle"):
5626 k8s_cluster_type = "juju-bundle"
5627 else:
5628 raise LcmException(
5629 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5630 "juju-bundle. Maybe an old NBI version is running".format(
5631 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5632 )
5633 )
5634
5635 min_instance_count = 0
5636 if kdu_profile and "min-number-of-instances" in kdu_profile:
5637 min_instance_count = kdu_profile["min-number-of-instances"]
5638
5639 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5640 deployed_kdu, _ = get_deployed_kdu(
5641 nsr_deployed, kdu_name, vnf_index
5642 )
5643 if deployed_kdu is None:
5644 raise LcmException(
5645 "KDU '{}' for vnf '{}' not deployed".format(
5646 kdu_name, vnf_index
5647 )
5648 )
5649 kdu_instance = deployed_kdu.get("kdu-instance")
5650 instance_num = await self.k8scluster_map[
5651 k8s_cluster_type
5652 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5653 kdu_replica_count = instance_num - kdu_delta.get(
5654 "number-of-instances", 1
5655 )
5656
5657 if kdu_replica_count < min_instance_count < instance_num:
5658 kdu_replica_count = min_instance_count
5659 if kdu_replica_count < min_instance_count:
5660 raise LcmException(
5661 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5662 "scaling-group-descriptor '{}'".format(
5663 instance_num, scaling_group
5664 )
5665 )
5666
5667 for x in range(kdu_delta.get("number-of-instances", 1)):
5668 vca_scaling_info.append(
5669 {
5670 "osm_kdu_id": kdu_name,
5671 "member-vnf-index": vnf_index,
5672 "type": "delete",
5673 "kdu_index": instance_num - x - 1,
5674 }
5675 )
5676 scaling_info["kdu-delete"][kdu_name].append(
5677 {
5678 "member-vnf-index": vnf_index,
5679 "type": "delete",
5680 "k8s-cluster-type": k8s_cluster_type,
5681 "resource-name": resource_name,
5682 "scale": kdu_replica_count,
5683 }
5684 )
5685
5686 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5687 vdu_delete = copy(scaling_info.get("vdu-delete"))
5688 if scaling_info["scaling_direction"] == "IN":
5689 for vdur in reversed(db_vnfr["vdur"]):
5690 if vdu_delete.get(vdur["vdu-id-ref"]):
5691 vdu_delete[vdur["vdu-id-ref"]] -= 1
5692 scaling_info["vdu"].append(
5693 {
5694 "name": vdur.get("name") or vdur.get("vdu-name"),
5695 "vdu_id": vdur["vdu-id-ref"],
5696 "interface": [],
5697 }
5698 )
5699 for interface in vdur["interfaces"]:
5700 scaling_info["vdu"][-1]["interface"].append(
5701 {
5702 "name": interface["name"],
5703 "ip_address": interface["ip-address"],
5704 "mac_address": interface.get("mac-address"),
5705 }
5706 )
5707 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5708
5709 # PRE-SCALE BEGIN
5710 step = "Executing pre-scale vnf-config-primitive"
5711 if scaling_descriptor.get("scaling-config-action"):
5712 for scaling_config_action in scaling_descriptor[
5713 "scaling-config-action"
5714 ]:
5715 if (
5716 scaling_config_action.get("trigger") == "pre-scale-in"
5717 and scaling_type == "SCALE_IN"
5718 ) or (
5719 scaling_config_action.get("trigger") == "pre-scale-out"
5720 and scaling_type == "SCALE_OUT"
5721 ):
5722 vnf_config_primitive = scaling_config_action[
5723 "vnf-config-primitive-name-ref"
5724 ]
5725 step = db_nslcmop_update[
5726 "detailed-status"
5727 ] = "executing pre-scale scaling-config-action '{}'".format(
5728 vnf_config_primitive
5729 )
5730
5731 # look for primitive
5732 for config_primitive in (
5733 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5734 ).get("config-primitive", ()):
5735 if config_primitive["name"] == vnf_config_primitive:
5736 break
5737 else:
5738 raise LcmException(
5739 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5740 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5741 "primitive".format(scaling_group, vnf_config_primitive)
5742 )
5743
5744 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5745 if db_vnfr.get("additionalParamsForVnf"):
5746 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5747
5748 scale_process = "VCA"
5749 db_nsr_update["config-status"] = "configuring pre-scaling"
5750 primitive_params = self._map_primitive_params(
5751 config_primitive, {}, vnfr_params
5752 )
5753
5754 # Pre-scale retry check: Check if this sub-operation has been executed before
5755 op_index = self._check_or_add_scale_suboperation(
5756 db_nslcmop,
5757 vnf_index,
5758 vnf_config_primitive,
5759 primitive_params,
5760 "PRE-SCALE",
5761 )
5762 if op_index == self.SUBOPERATION_STATUS_SKIP:
5763 # Skip sub-operation
5764 result = "COMPLETED"
5765 result_detail = "Done"
5766 self.logger.debug(
5767 logging_text
5768 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5769 vnf_config_primitive, result, result_detail
5770 )
5771 )
5772 else:
5773 if op_index == self.SUBOPERATION_STATUS_NEW:
5774 # New sub-operation: Get index of this sub-operation
5775 op_index = (
5776 len(db_nslcmop.get("_admin", {}).get("operations"))
5777 - 1
5778 )
5779 self.logger.debug(
5780 logging_text
5781 + "vnf_config_primitive={} New sub-operation".format(
5782 vnf_config_primitive
5783 )
5784 )
5785 else:
5786 # retry: Get registered params for this existing sub-operation
5787 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5788 op_index
5789 ]
5790 vnf_index = op.get("member_vnf_index")
5791 vnf_config_primitive = op.get("primitive")
5792 primitive_params = op.get("primitive_params")
5793 self.logger.debug(
5794 logging_text
5795 + "vnf_config_primitive={} Sub-operation retry".format(
5796 vnf_config_primitive
5797 )
5798 )
5799 # Execute the primitive, either with new (first-time) or registered (reintent) args
5800 ee_descriptor_id = config_primitive.get(
5801 "execution-environment-ref"
5802 )
5803 primitive_name = config_primitive.get(
5804 "execution-environment-primitive", vnf_config_primitive
5805 )
5806 ee_id, vca_type = self._look_for_deployed_vca(
5807 nsr_deployed["VCA"],
5808 member_vnf_index=vnf_index,
5809 vdu_id=None,
5810 vdu_count_index=None,
5811 ee_descriptor_id=ee_descriptor_id,
5812 )
5813 result, result_detail = await self._ns_execute_primitive(
5814 ee_id,
5815 primitive_name,
5816 primitive_params,
5817 vca_type=vca_type,
5818 vca_id=vca_id,
5819 )
5820 self.logger.debug(
5821 logging_text
5822 + "vnf_config_primitive={} Done with result {} {}".format(
5823 vnf_config_primitive, result, result_detail
5824 )
5825 )
5826 # Update operationState = COMPLETED | FAILED
5827 self._update_suboperation_status(
5828 db_nslcmop, op_index, result, result_detail
5829 )
5830
5831 if result == "FAILED":
5832 raise LcmException(result_detail)
5833 db_nsr_update["config-status"] = old_config_status
5834 scale_process = None
5835 # PRE-SCALE END
5836
5837 db_nsr_update[
5838 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5839 ] = nb_scale_op
5840 db_nsr_update[
5841 "_admin.scaling-group.{}.time".format(admin_scale_index)
5842 ] = time()
5843
5844 # SCALE-IN VCA - BEGIN
5845 if vca_scaling_info:
5846 step = db_nslcmop_update[
5847 "detailed-status"
5848 ] = "Deleting the execution environments"
5849 scale_process = "VCA"
5850 for vca_info in vca_scaling_info:
5851 if vca_info["type"] == "delete":
5852 member_vnf_index = str(vca_info["member-vnf-index"])
5853 self.logger.debug(
5854 logging_text + "vdu info: {}".format(vca_info)
5855 )
5856 if vca_info.get("osm_vdu_id"):
5857 vdu_id = vca_info["osm_vdu_id"]
5858 vdu_index = int(vca_info["vdu_index"])
5859 stage[
5860 1
5861 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5862 member_vnf_index, vdu_id, vdu_index
5863 )
5864 else:
5865 vdu_index = 0
5866 kdu_id = vca_info["osm_kdu_id"]
5867 stage[
5868 1
5869 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5870 member_vnf_index, kdu_id, vdu_index
5871 )
5872 stage[2] = step = "Scaling in VCA"
5873 self._write_op_status(op_id=nslcmop_id, stage=stage)
5874 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5875 config_update = db_nsr["configurationStatus"]
5876 for vca_index, vca in enumerate(vca_update):
5877 if (
5878 (vca or vca.get("ee_id"))
5879 and vca["member-vnf-index"] == member_vnf_index
5880 and vca["vdu_count_index"] == vdu_index
5881 ):
5882 if vca.get("vdu_id"):
5883 config_descriptor = get_configuration(
5884 db_vnfd, vca.get("vdu_id")
5885 )
5886 elif vca.get("kdu_name"):
5887 config_descriptor = get_configuration(
5888 db_vnfd, vca.get("kdu_name")
5889 )
5890 else:
5891 config_descriptor = get_configuration(
5892 db_vnfd, db_vnfd["id"]
5893 )
5894 operation_params = (
5895 db_nslcmop.get("operationParams") or {}
5896 )
5897 exec_terminate_primitives = not operation_params.get(
5898 "skip_terminate_primitives"
5899 ) and vca.get("needed_terminate")
5900 task = asyncio.ensure_future(
5901 asyncio.wait_for(
5902 self.destroy_N2VC(
5903 logging_text,
5904 db_nslcmop,
5905 vca,
5906 config_descriptor,
5907 vca_index,
5908 destroy_ee=True,
5909 exec_primitives=exec_terminate_primitives,
5910 scaling_in=True,
5911 vca_id=vca_id,
5912 ),
5913 timeout=self.timeout_charm_delete,
5914 )
5915 )
5916 tasks_dict_info[task] = "Terminating VCA {}".format(
5917 vca.get("ee_id")
5918 )
5919 del vca_update[vca_index]
5920 del config_update[vca_index]
5921 # wait for pending tasks of terminate primitives
5922 if tasks_dict_info:
5923 self.logger.debug(
5924 logging_text
5925 + "Waiting for tasks {}".format(
5926 list(tasks_dict_info.keys())
5927 )
5928 )
5929 error_list = await self._wait_for_tasks(
5930 logging_text,
5931 tasks_dict_info,
5932 min(
5933 self.timeout_charm_delete, self.timeout_ns_terminate
5934 ),
5935 stage,
5936 nslcmop_id,
5937 )
5938 tasks_dict_info.clear()
5939 if error_list:
5940 raise LcmException("; ".join(error_list))
5941
5942 db_vca_and_config_update = {
5943 "_admin.deployed.VCA": vca_update,
5944 "configurationStatus": config_update,
5945 }
5946 self.update_db_2(
5947 "nsrs", db_nsr["_id"], db_vca_and_config_update
5948 )
5949 scale_process = None
5950 # SCALE-IN VCA - END
5951
5952 # SCALE RO - BEGIN
5953 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5954 scale_process = "RO"
5955 if self.ro_config.get("ng"):
5956 await self._scale_ng_ro(
5957 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5958 )
5959 scaling_info.pop("vdu-create", None)
5960 scaling_info.pop("vdu-delete", None)
5961
5962 scale_process = None
5963 # SCALE RO - END
5964
5965 # SCALE KDU - BEGIN
5966 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5967 scale_process = "KDU"
5968 await self._scale_kdu(
5969 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5970 )
5971 scaling_info.pop("kdu-create", None)
5972 scaling_info.pop("kdu-delete", None)
5973
5974 scale_process = None
5975 # SCALE KDU - END
5976
5977 if db_nsr_update:
5978 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5979
5980 # SCALE-UP VCA - BEGIN
5981 if vca_scaling_info:
5982 step = db_nslcmop_update[
5983 "detailed-status"
5984 ] = "Creating new execution environments"
5985 scale_process = "VCA"
5986 for vca_info in vca_scaling_info:
5987 if vca_info["type"] == "create":
5988 member_vnf_index = str(vca_info["member-vnf-index"])
5989 self.logger.debug(
5990 logging_text + "vdu info: {}".format(vca_info)
5991 )
5992 vnfd_id = db_vnfr["vnfd-ref"]
5993 if vca_info.get("osm_vdu_id"):
5994 vdu_index = int(vca_info["vdu_index"])
5995 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5996 if db_vnfr.get("additionalParamsForVnf"):
5997 deploy_params.update(
5998 parse_yaml_strings(
5999 db_vnfr["additionalParamsForVnf"].copy()
6000 )
6001 )
6002 descriptor_config = get_configuration(
6003 db_vnfd, db_vnfd["id"]
6004 )
6005 if descriptor_config:
6006 vdu_id = None
6007 vdu_name = None
6008 kdu_name = None
6009 self._deploy_n2vc(
6010 logging_text=logging_text
6011 + "member_vnf_index={} ".format(member_vnf_index),
6012 db_nsr=db_nsr,
6013 db_vnfr=db_vnfr,
6014 nslcmop_id=nslcmop_id,
6015 nsr_id=nsr_id,
6016 nsi_id=nsi_id,
6017 vnfd_id=vnfd_id,
6018 vdu_id=vdu_id,
6019 kdu_name=kdu_name,
6020 member_vnf_index=member_vnf_index,
6021 vdu_index=vdu_index,
6022 vdu_name=vdu_name,
6023 deploy_params=deploy_params,
6024 descriptor_config=descriptor_config,
6025 base_folder=base_folder,
6026 task_instantiation_info=tasks_dict_info,
6027 stage=stage,
6028 )
6029 vdu_id = vca_info["osm_vdu_id"]
6030 vdur = find_in_list(
6031 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6032 )
6033 descriptor_config = get_configuration(db_vnfd, vdu_id)
6034 if vdur.get("additionalParams"):
6035 deploy_params_vdu = parse_yaml_strings(
6036 vdur["additionalParams"]
6037 )
6038 else:
6039 deploy_params_vdu = deploy_params
6040 deploy_params_vdu["OSM"] = get_osm_params(
6041 db_vnfr, vdu_id, vdu_count_index=vdu_index
6042 )
6043 if descriptor_config:
6044 vdu_name = None
6045 kdu_name = None
6046 stage[
6047 1
6048 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6049 member_vnf_index, vdu_id, vdu_index
6050 )
6051 stage[2] = step = "Scaling out VCA"
6052 self._write_op_status(op_id=nslcmop_id, stage=stage)
6053 self._deploy_n2vc(
6054 logging_text=logging_text
6055 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6056 member_vnf_index, vdu_id, vdu_index
6057 ),
6058 db_nsr=db_nsr,
6059 db_vnfr=db_vnfr,
6060 nslcmop_id=nslcmop_id,
6061 nsr_id=nsr_id,
6062 nsi_id=nsi_id,
6063 vnfd_id=vnfd_id,
6064 vdu_id=vdu_id,
6065 kdu_name=kdu_name,
6066 member_vnf_index=member_vnf_index,
6067 vdu_index=vdu_index,
6068 vdu_name=vdu_name,
6069 deploy_params=deploy_params_vdu,
6070 descriptor_config=descriptor_config,
6071 base_folder=base_folder,
6072 task_instantiation_info=tasks_dict_info,
6073 stage=stage,
6074 )
6075 else:
6076 kdu_name = vca_info["osm_kdu_id"]
6077 descriptor_config = get_configuration(db_vnfd, kdu_name)
6078 if descriptor_config:
6079 vdu_id = None
6080 kdu_index = int(vca_info["kdu_index"])
6081 vdu_name = None
6082 kdur = next(
6083 x
6084 for x in db_vnfr["kdur"]
6085 if x["kdu-name"] == kdu_name
6086 )
6087 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
6088 if kdur.get("additionalParams"):
6089 deploy_params_kdu = parse_yaml_strings(
6090 kdur["additionalParams"]
6091 )
6092
6093 self._deploy_n2vc(
6094 logging_text=logging_text,
6095 db_nsr=db_nsr,
6096 db_vnfr=db_vnfr,
6097 nslcmop_id=nslcmop_id,
6098 nsr_id=nsr_id,
6099 nsi_id=nsi_id,
6100 vnfd_id=vnfd_id,
6101 vdu_id=vdu_id,
6102 kdu_name=kdu_name,
6103 member_vnf_index=member_vnf_index,
6104 vdu_index=kdu_index,
6105 vdu_name=vdu_name,
6106 deploy_params=deploy_params_kdu,
6107 descriptor_config=descriptor_config,
6108 base_folder=base_folder,
6109 task_instantiation_info=tasks_dict_info,
6110 stage=stage,
6111 )
6112 # SCALE-UP VCA - END
6113 scale_process = None
6114
6115 # POST-SCALE BEGIN
6116 # execute primitive service POST-SCALING
6117 step = "Executing post-scale vnf-config-primitive"
6118 if scaling_descriptor.get("scaling-config-action"):
6119 for scaling_config_action in scaling_descriptor[
6120 "scaling-config-action"
6121 ]:
6122 if (
6123 scaling_config_action.get("trigger") == "post-scale-in"
6124 and scaling_type == "SCALE_IN"
6125 ) or (
6126 scaling_config_action.get("trigger") == "post-scale-out"
6127 and scaling_type == "SCALE_OUT"
6128 ):
6129 vnf_config_primitive = scaling_config_action[
6130 "vnf-config-primitive-name-ref"
6131 ]
6132 step = db_nslcmop_update[
6133 "detailed-status"
6134 ] = "executing post-scale scaling-config-action '{}'".format(
6135 vnf_config_primitive
6136 )
6137
6138 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6139 if db_vnfr.get("additionalParamsForVnf"):
6140 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6141
6142 # look for primitive
6143 for config_primitive in (
6144 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6145 ).get("config-primitive", ()):
6146 if config_primitive["name"] == vnf_config_primitive:
6147 break
6148 else:
6149 raise LcmException(
6150 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6151 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6152 "config-primitive".format(
6153 scaling_group, vnf_config_primitive
6154 )
6155 )
6156 scale_process = "VCA"
6157 db_nsr_update["config-status"] = "configuring post-scaling"
6158 primitive_params = self._map_primitive_params(
6159 config_primitive, {}, vnfr_params
6160 )
6161
6162 # Post-scale retry check: Check if this sub-operation has been executed before
6163 op_index = self._check_or_add_scale_suboperation(
6164 db_nslcmop,
6165 vnf_index,
6166 vnf_config_primitive,
6167 primitive_params,
6168 "POST-SCALE",
6169 )
6170 if op_index == self.SUBOPERATION_STATUS_SKIP:
6171 # Skip sub-operation
6172 result = "COMPLETED"
6173 result_detail = "Done"
6174 self.logger.debug(
6175 logging_text
6176 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6177 vnf_config_primitive, result, result_detail
6178 )
6179 )
6180 else:
6181 if op_index == self.SUBOPERATION_STATUS_NEW:
6182 # New sub-operation: Get index of this sub-operation
6183 op_index = (
6184 len(db_nslcmop.get("_admin", {}).get("operations"))
6185 - 1
6186 )
6187 self.logger.debug(
6188 logging_text
6189 + "vnf_config_primitive={} New sub-operation".format(
6190 vnf_config_primitive
6191 )
6192 )
6193 else:
6194 # retry: Get registered params for this existing sub-operation
6195 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6196 op_index
6197 ]
6198 vnf_index = op.get("member_vnf_index")
6199 vnf_config_primitive = op.get("primitive")
6200 primitive_params = op.get("primitive_params")
6201 self.logger.debug(
6202 logging_text
6203 + "vnf_config_primitive={} Sub-operation retry".format(
6204 vnf_config_primitive
6205 )
6206 )
6207 # Execute the primitive, either with new (first-time) or registered (reintent) args
6208 ee_descriptor_id = config_primitive.get(
6209 "execution-environment-ref"
6210 )
6211 primitive_name = config_primitive.get(
6212 "execution-environment-primitive", vnf_config_primitive
6213 )
6214 ee_id, vca_type = self._look_for_deployed_vca(
6215 nsr_deployed["VCA"],
6216 member_vnf_index=vnf_index,
6217 vdu_id=None,
6218 vdu_count_index=None,
6219 ee_descriptor_id=ee_descriptor_id,
6220 )
6221 result, result_detail = await self._ns_execute_primitive(
6222 ee_id,
6223 primitive_name,
6224 primitive_params,
6225 vca_type=vca_type,
6226 vca_id=vca_id,
6227 )
6228 self.logger.debug(
6229 logging_text
6230 + "vnf_config_primitive={} Done with result {} {}".format(
6231 vnf_config_primitive, result, result_detail
6232 )
6233 )
6234 # Update operationState = COMPLETED | FAILED
6235 self._update_suboperation_status(
6236 db_nslcmop, op_index, result, result_detail
6237 )
6238
6239 if result == "FAILED":
6240 raise LcmException(result_detail)
6241 db_nsr_update["config-status"] = old_config_status
6242 scale_process = None
6243 # POST-SCALE END
6244
6245 db_nsr_update[
6246 "detailed-status"
6247 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6248 db_nsr_update["operational-status"] = (
6249 "running"
6250 if old_operational_status == "failed"
6251 else old_operational_status
6252 )
6253 db_nsr_update["config-status"] = old_config_status
6254 return
6255 except (
6256 ROclient.ROClientException,
6257 DbException,
6258 LcmException,
6259 NgRoException,
6260 ) as e:
6261 self.logger.error(logging_text + "Exit Exception {}".format(e))
6262 exc = e
6263 except asyncio.CancelledError:
6264 self.logger.error(
6265 logging_text + "Cancelled Exception while '{}'".format(step)
6266 )
6267 exc = "Operation was cancelled"
6268 except Exception as e:
6269 exc = traceback.format_exc()
6270 self.logger.critical(
6271 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6272 exc_info=True,
6273 )
6274 finally:
6275 self._write_ns_status(
6276 nsr_id=nsr_id,
6277 ns_state=None,
6278 current_operation="IDLE",
6279 current_operation_id=None,
6280 )
6281 if tasks_dict_info:
6282 stage[1] = "Waiting for instantiate pending tasks."
6283 self.logger.debug(logging_text + stage[1])
6284 exc = await self._wait_for_tasks(
6285 logging_text,
6286 tasks_dict_info,
6287 self.timeout_ns_deploy,
6288 stage,
6289 nslcmop_id,
6290 nsr_id=nsr_id,
6291 )
6292 if exc:
6293 db_nslcmop_update[
6294 "detailed-status"
6295 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6296 nslcmop_operation_state = "FAILED"
6297 if db_nsr:
6298 db_nsr_update["operational-status"] = old_operational_status
6299 db_nsr_update["config-status"] = old_config_status
6300 db_nsr_update["detailed-status"] = ""
6301 if scale_process:
6302 if "VCA" in scale_process:
6303 db_nsr_update["config-status"] = "failed"
6304 if "RO" in scale_process:
6305 db_nsr_update["operational-status"] = "failed"
6306 db_nsr_update[
6307 "detailed-status"
6308 ] = "FAILED scaling nslcmop={} {}: {}".format(
6309 nslcmop_id, step, exc
6310 )
6311 else:
6312 error_description_nslcmop = None
6313 nslcmop_operation_state = "COMPLETED"
6314 db_nslcmop_update["detailed-status"] = "Done"
6315
6316 self._write_op_status(
6317 op_id=nslcmop_id,
6318 stage="",
6319 error_message=error_description_nslcmop,
6320 operation_state=nslcmop_operation_state,
6321 other_update=db_nslcmop_update,
6322 )
6323 if db_nsr:
6324 self._write_ns_status(
6325 nsr_id=nsr_id,
6326 ns_state=None,
6327 current_operation="IDLE",
6328 current_operation_id=None,
6329 other_update=db_nsr_update,
6330 )
6331
6332 if nslcmop_operation_state:
6333 try:
6334 msg = {
6335 "nsr_id": nsr_id,
6336 "nslcmop_id": nslcmop_id,
6337 "operationState": nslcmop_operation_state,
6338 }
6339 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6340 except Exception as e:
6341 self.logger.error(
6342 logging_text + "kafka_write notification Exception {}".format(e)
6343 )
6344 self.logger.debug(logging_text + "Exit")
6345 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6346
6347 async def _scale_kdu(
6348 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6349 ):
6350 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6351 for kdu_name in _scaling_info:
6352 for kdu_scaling_info in _scaling_info[kdu_name]:
6353 deployed_kdu, index = get_deployed_kdu(
6354 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6355 )
6356 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6357 kdu_instance = deployed_kdu["kdu-instance"]
6358 scale = int(kdu_scaling_info["scale"])
6359 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6360
6361 db_dict = {
6362 "collection": "nsrs",
6363 "filter": {"_id": nsr_id},
6364 "path": "_admin.deployed.K8s.{}".format(index),
6365 }
6366
6367 step = "scaling application {}".format(
6368 kdu_scaling_info["resource-name"]
6369 )
6370 self.logger.debug(logging_text + step)
6371
6372 if kdu_scaling_info["type"] == "delete":
6373 kdu_config = get_configuration(db_vnfd, kdu_name)
6374 if (
6375 kdu_config
6376 and kdu_config.get("terminate-config-primitive")
6377 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6378 ):
6379 terminate_config_primitive_list = kdu_config.get(
6380 "terminate-config-primitive"
6381 )
6382 terminate_config_primitive_list.sort(
6383 key=lambda val: int(val["seq"])
6384 )
6385
6386 for (
6387 terminate_config_primitive
6388 ) in terminate_config_primitive_list:
6389 primitive_params_ = self._map_primitive_params(
6390 terminate_config_primitive, {}, {}
6391 )
6392 step = "execute terminate config primitive"
6393 self.logger.debug(logging_text + step)
6394 await asyncio.wait_for(
6395 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6396 cluster_uuid=cluster_uuid,
6397 kdu_instance=kdu_instance,
6398 primitive_name=terminate_config_primitive["name"],
6399 params=primitive_params_,
6400 db_dict=db_dict,
6401 vca_id=vca_id,
6402 ),
6403 timeout=600,
6404 )
6405
6406 await asyncio.wait_for(
6407 self.k8scluster_map[k8s_cluster_type].scale(
6408 kdu_instance,
6409 scale,
6410 kdu_scaling_info["resource-name"],
6411 vca_id=vca_id,
6412 ),
6413 timeout=self.timeout_vca_on_error,
6414 )
6415
6416 if kdu_scaling_info["type"] == "create":
6417 kdu_config = get_configuration(db_vnfd, kdu_name)
6418 if (
6419 kdu_config
6420 and kdu_config.get("initial-config-primitive")
6421 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6422 ):
6423 initial_config_primitive_list = kdu_config.get(
6424 "initial-config-primitive"
6425 )
6426 initial_config_primitive_list.sort(
6427 key=lambda val: int(val["seq"])
6428 )
6429
6430 for initial_config_primitive in initial_config_primitive_list:
6431 primitive_params_ = self._map_primitive_params(
6432 initial_config_primitive, {}, {}
6433 )
6434 step = "execute initial config primitive"
6435 self.logger.debug(logging_text + step)
6436 await asyncio.wait_for(
6437 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6438 cluster_uuid=cluster_uuid,
6439 kdu_instance=kdu_instance,
6440 primitive_name=initial_config_primitive["name"],
6441 params=primitive_params_,
6442 db_dict=db_dict,
6443 vca_id=vca_id,
6444 ),
6445 timeout=600,
6446 )
6447
6448 async def _scale_ng_ro(
6449 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6450 ):
6451 nsr_id = db_nslcmop["nsInstanceId"]
6452 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6453 db_vnfrs = {}
6454
6455 # read from db: vnfd's for every vnf
6456 db_vnfds = []
6457
6458 # for each vnf in ns, read vnfd
6459 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6460 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6461 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6462 # if we haven't this vnfd, read it from db
6463 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6464 # read from db
6465 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6466 db_vnfds.append(vnfd)
6467 n2vc_key = self.n2vc.get_public_key()
6468 n2vc_key_list = [n2vc_key]
6469 self.scale_vnfr(
6470 db_vnfr,
6471 vdu_scaling_info.get("vdu-create"),
6472 vdu_scaling_info.get("vdu-delete"),
6473 mark_delete=True,
6474 )
6475 # db_vnfr has been updated, update db_vnfrs to use it
6476 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6477 await self._instantiate_ng_ro(
6478 logging_text,
6479 nsr_id,
6480 db_nsd,
6481 db_nsr,
6482 db_nslcmop,
6483 db_vnfrs,
6484 db_vnfds,
6485 n2vc_key_list,
6486 stage=stage,
6487 start_deploy=time(),
6488 timeout_ns_deploy=self.timeout_ns_deploy,
6489 )
6490 if vdu_scaling_info.get("vdu-delete"):
6491 self.scale_vnfr(
6492 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6493 )
6494
6495 async def extract_prometheus_scrape_jobs(
6496 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6497 ):
6498 # look if exist a file called 'prometheus*.j2' and
6499 artifact_content = self.fs.dir_ls(artifact_path)
6500 job_file = next(
6501 (
6502 f
6503 for f in artifact_content
6504 if f.startswith("prometheus") and f.endswith(".j2")
6505 ),
6506 None,
6507 )
6508 if not job_file:
6509 return
6510 with self.fs.file_open((artifact_path, job_file), "r") as f:
6511 job_data = f.read()
6512
6513 # TODO get_service
6514 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6515 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6516 host_port = "80"
6517 vnfr_id = vnfr_id.replace("-", "")
6518 variables = {
6519 "JOB_NAME": vnfr_id,
6520 "TARGET_IP": target_ip,
6521 "EXPORTER_POD_IP": host_name,
6522 "EXPORTER_POD_PORT": host_port,
6523 }
6524 job_list = parse_job(job_data, variables)
6525 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6526 for job in job_list:
6527 if (
6528 not isinstance(job.get("job_name"), str)
6529 or vnfr_id not in job["job_name"]
6530 ):
6531 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6532 job["nsr_id"] = nsr_id
6533 job["vnfr_id"] = vnfr_id
6534 return job_list
6535
6536 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6537 """
6538 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6539
6540 :param: vim_account_id: VIM Account ID
6541
6542 :return: (cloud_name, cloud_credential)
6543 """
6544 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6545 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6546
6547 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6548 """
6549 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6550
6551 :param: vim_account_id: VIM Account ID
6552
6553 :return: (cloud_name, cloud_credential)
6554 """
6555 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6556 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")