Fixes bug 2031: Increasing KDU instantiation timeout from 10 to 30 minutes.
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
class NsLcm(LcmBase):
    """NS lifecycle manager: handles NS instantiation, termination, scaling and the
    related database updates, delegating VCA/K8s work to the configured connectors."""

    # Time (seconds) for a charm from the first time it is seen at blocked/error
    # status until it is marked as failed.
    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout (s) for deploying a ns
    timeout_ns_terminate = 1800  # default global timeout (s) for un-deploying a ns
    timeout_charm_delete = 10 * 60  # timeout (s) for deleting a charm
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution

    # Sentinel values used when searching the sub-operation history of an operation.
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    # Task label under which the VCA deployment task is registered.
    task_name_deploy_vca = "Deploying VCA"

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus handler, passed through to LcmBase
        :param lcm_tasks: shared task registry used to track LCM asyncio tasks
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
            'timeout', 'ro_config' and 'VCA' (only the last three are read here)
        :param loop: asyncio event loop shared by all connectors
        :return: None
        """
        super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

        # handles to the shared database / filesystem singletons
        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        self.ng_ro = config["ro_config"].get("ng")
        # copied so later local mutation does not alter the global config dict
        self.vca_config = config["VCA"].copy()

        # create N2VC connector (juju based charms); status changes are pushed
        # back into the nsrs records through _on_update_n2vc_db
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db,
        )

        # execution-environment connector used for helm type charms (see vca_map)
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            loop=self.loop,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
        )

        # K8s connector for helm v2 charts (no DB status callback)
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            fs=self.fs,
            db=self.db,
        )

        # K8s connector for helm v3 charts (no DB status callback)
        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        # K8s connector for juju bundles; status changes go through _on_update_k8s_db
        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_k8s_db,
            fs=self.fs,
            db=self.db,
        )

        # KDU deployment type -> k8s connector. NOTE: plain "chart" is served by
        # the helm v3 connector, not v2.
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # VCA (charm) type -> connector used to manage it
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee,
        }

        # create RO client
        self.RO = NgRoClient(self.loop, **self.ro_config)

207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with ipv4 look for last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """Callback fired by N2VC on juju model changes: refresh NS status in the DB.

        Re-reads the nsr record, queries the current VCA status, updates the
        configurationStatus entry addressed by the last dotted component of *path*
        and, for NS in READY/DEGRADED state, recomputes degradation from the juju
        machines/applications status. Errors (other than cancellation/timeout) are
        logged and swallowed.

        :param table: database table that triggered the update (unused; nsrs is read)
        :param filter: query filter; its "_id" is taken as the nsr id
        :param path: dotted path of the changed data; last component is the VCA index
        :param updated_data: changed data (not used directly; status is re-queried)
        :param vca_id: VCA id used to query status, or None for the default
        """

        # remove last dot from path (if exists)
        if path.endswith("."):
            path = path[:-1]

        # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
        #     .format(table, filter, path, updated_data))
        try:

            nsr_id = filter.get("_id")

            # read ns record from database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            current_ns_status = nsr.get("nsState")

            # get vca status for NS
            status_dict = await self.n2vc.get_status(
                namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
            )

            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = status_dict
            await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # the VCA index is the last dotted component of path
                vca_index = int(path[path.rfind(".") + 1 :])

                vca_list = deep_get(
                    target_dict=nsr, key_list=("_admin", "deployed", "VCA")
                )
                vca_status = vca_list[vca_index].get("status")

                configuration_status_list = nsr.get("configurationStatus")
                config_status = configuration_status_list[vca_index].get("status")

                # reconcile stored config status with the live vca status
                if config_status == "BROKEN" and vca_status != "failed":
                    db_dict["configurationStatus"][vca_index] = "READY"
                elif config_status != "BROKEN" and vca_status == "failed":
                    db_dict["configurationStatus"][vca_index] = "BROKEN"
            except Exception as e:
                # not update configurationStatus
                self.logger.debug("Error updating vca_index (ignore): {}".format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ("READY", "DEGRADED"):
                error_description = ""
                # check machines
                if status_dict.get("machines"):
                    for machine_id in status_dict.get("machines"):
                        machine = status_dict.get("machines").get(machine_id)
                        # check machine agent-status
                        if machine.get("agent-status"):
                            s = machine.get("agent-status").get("status")
                            if s != "started":
                                is_degraded = True
                                error_description += (
                                    "machine {} agent-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                        # check machine instance status
                        if machine.get("instance-status"):
                            s = machine.get("instance-status").get("status")
                            if s != "running":
                                is_degraded = True
                                error_description += (
                                    "machine {} instance-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                # check applications
                if status_dict.get("applications"):
                    for app_id in status_dict.get("applications"):
                        app = status_dict.get("applications").get(app_id)
                        # check application status
                        if app.get("status"):
                            s = app.get("status").get("status")
                            if s != "active":
                                is_degraded = True
                                error_description += (
                                    "application {} status={} ; ".format(app_id, s)
                                )

                if error_description:
                    db_dict["errorDescription"] = error_description
                # only transition between READY and DEGRADED, never out of them
                if current_ns_status == "READY" and is_degraded:
                    db_dict["nsState"] = "DEGRADED"
                if current_ns_status == "DEGRADED" and not is_degraded:
                    db_dict["nsState"] = "READY"

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))

    async def _on_update_k8s_db(
        self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
    ):
        """
        Updating vca status in NSR record
        :param cluster_uuid: UUID of a k8s cluster
        :param kdu_instance: The unique name of the KDU instance
        :param filter: To get nsr_id
        :param vca_id: VCA id passed through to the connector status query
        :param cluster_type: The cluster type (juju, k8s); selects the connector
            from k8scluster_map
        :return: none
        """

        # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
        #     .format(cluster_uuid, kdu_instance, filter))

        nsr_id = filter.get("_id")
        try:
            # query the live KDU status from the matching k8s connector
            vca_status = await self.k8scluster_map[cluster_type].status_kdu(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                yaml_format=False,
                complete_status=True,
                vca_id=vca_id,
            )

            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = {nsr_id: vca_status}

            if cluster_type in ("juju-bundle", "juju"):
                # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
                # status in a similar way between Juju Bundles and Helm Charts on this side
                await self.k8sclusterjuju.update_vca_status(
                    db_dict["vcaStatus"],
                    kdu_instance,
                    vca_id=vca_id,
                )

            self.logger.debug(
                f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
            )

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))

399 @staticmethod
400 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
401 try:
402 env = Environment(undefined=StrictUndefined)
403 template = env.from_string(cloud_init_text)
404 return template.render(additional_params or {})
405 except UndefinedError as e:
406 raise LcmException(
407 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
408 "file, must be provided in the instantiation parameters inside the "
409 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
410 )
411 except (TemplateError, TemplateNotFound) as e:
412 raise LcmException(
413 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
414 vnfd_id, vdu_id, e
415 )
416 )
417
418 def _get_vdu_cloud_init_content(self, vdu, vnfd):
419 cloud_init_content = cloud_init_file = None
420 try:
421 if vdu.get("cloud-init-file"):
422 base_folder = vnfd["_admin"]["storage"]
423 if base_folder["pkg-dir"]:
424 cloud_init_file = "{}/{}/cloud_init/{}".format(
425 base_folder["folder"],
426 base_folder["pkg-dir"],
427 vdu["cloud-init-file"],
428 )
429 else:
430 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
431 base_folder["folder"],
432 vdu["cloud-init-file"],
433 )
434 with self.fs.file_open(cloud_init_file, "r") as ci_file:
435 cloud_init_content = ci_file.read()
436 elif vdu.get("cloud-init"):
437 cloud_init_content = vdu["cloud-init"]
438
439 return cloud_init_content
440 except FsException as e:
441 raise LcmException(
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd["id"], vdu["id"], cloud_init_file, e
444 )
445 )
446
447 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
448 vdur = next(
449 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
450 {}
451 )
452 additional_params = vdur.get("additionalParams")
453 return parse_yaml_strings(additional_params)
454
455 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
456 """
457 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
458 :param vnfd: input vnfd
459 :param new_id: overrides vnf id if provided
460 :param additionalParams: Instantiation params for VNFs provided
461 :param nsrId: Id of the NSR
462 :return: copy of vnfd
463 """
464 vnfd_RO = deepcopy(vnfd)
465 # remove unused by RO configuration, monitoring, scaling and internal keys
466 vnfd_RO.pop("_id", None)
467 vnfd_RO.pop("_admin", None)
468 vnfd_RO.pop("monitoring-param", None)
469 vnfd_RO.pop("scaling-group-descriptor", None)
470 vnfd_RO.pop("kdu", None)
471 vnfd_RO.pop("k8s-cluster", None)
472 if new_id:
473 vnfd_RO["id"] = new_id
474
475 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
476 for vdu in get_iterable(vnfd_RO, "vdu"):
477 vdu.pop("cloud-init-file", None)
478 vdu.pop("cloud-init", None)
479 return vnfd_RO
480
481 @staticmethod
482 def ip_profile_2_RO(ip_profile):
483 RO_ip_profile = deepcopy(ip_profile)
484 if "dns-server" in RO_ip_profile:
485 if isinstance(RO_ip_profile["dns-server"], list):
486 RO_ip_profile["dns-address"] = []
487 for ds in RO_ip_profile.pop("dns-server"):
488 RO_ip_profile["dns-address"].append(ds["address"])
489 else:
490 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
491 if RO_ip_profile.get("ip-version") == "ipv4":
492 RO_ip_profile["ip-version"] = "IPv4"
493 if RO_ip_profile.get("ip-version") == "ipv6":
494 RO_ip_profile["ip-version"] = "IPv6"
495 if "dhcp-params" in RO_ip_profile:
496 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
497 return RO_ip_profile
498
499 def _get_ro_vim_id_for_vim_account(self, vim_account):
500 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
501 if db_vim["_admin"]["operationalState"] != "ENABLED":
502 raise LcmException(
503 "VIM={} is not available. operationalState={}".format(
504 vim_account, db_vim["_admin"]["operationalState"]
505 )
506 )
507 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
508 return RO_vim_id
509
510 def get_ro_wim_id_for_wim_account(self, wim_account):
511 if isinstance(wim_account, str):
512 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
513 if db_wim["_admin"]["operationalState"] != "ENABLED":
514 raise LcmException(
515 "WIM={} is not available. operationalState={}".format(
516 wim_account, db_wim["_admin"]["operationalState"]
517 )
518 )
519 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
520 return RO_wim_id
521 else:
522 return wim_account
523
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """Apply a scale-out/scale-in to a VNF record, in the DB and in *db_vnfr*.

        :param db_vnfr: vnfr content (dict); its "vdur" list is re-read from the
            database at the end so the caller sees the final state
        :param vdu_create: dict vdu-id -> number of instances to add
        :param vdu_delete: dict vdu-id -> number of instances to remove
        :param mark_delete: when True, matching vdurs are only marked DELETING;
            when False they are pulled from the record one by one
        :return: None
        """

        db_vdu_push_list = []
        template_vdur = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # use the most recently added vdur of this vdu as the copy source
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    # Read the template saved in the db:
                    self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
                    vdur_template = db_vnfr.get("vdur-template")
                    if not vdur_template:
                        raise LcmException(
                            "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                                vdu_id
                            )
                        )
                    vdur = vdur_template[0]
                    # Delete a template from the database after using it
                    self.db.set_one("vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur-template": {"_id": vdur['_id']}}
                    )
                for count in range(vdu_count):
                    # each new instance is a copy of the source vdur with fresh
                    # identity and reset runtime state
                    vdur_copy = deepcopy(vdur)
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        # fixed addresses are incremented per instance, dynamic
                        # ones are cleared so the VIM assigns them again
                        if iface.get("fixed-ip"):
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        if db_vnfr["vdur"]:
                            iface.pop(
                                "mgmt_vnf", None
                            )  # only first vdu can be managment of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            if len(db_vnfr["vdur"]) == 1:
                # The scale will move to 0 instances; keep the last vdur as template
                self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
                template_vdur = [db_vnfr["vdur"][0]]
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    # mark only the last vdu_count matching vdurs as DELETING
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        db_push = {}
        if db_vdu_push_list:
            db_push["vdur"] = db_vdu_push_list
        if template_vdur:
            db_push["vdur-template"] = template_vdur
        if not db_push:
            db_push = None
        db_vnfr["vdur-template"] = template_vdur
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]

631 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
632 """
633 Updates database nsr with the RO info for the created vld
634 :param ns_update_nsr: dictionary to be filled with the updated info
635 :param db_nsr: content of db_nsr. This is also modified
636 :param nsr_desc_RO: nsr descriptor from RO
637 :return: Nothing, LcmException is raised on errors
638 """
639
640 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
641 for net_RO in get_iterable(nsr_desc_RO, "nets"):
642 if vld["id"] != net_RO.get("ns_net_osm_id"):
643 continue
644 vld["vim-id"] = net_RO.get("vim_net_id")
645 vld["name"] = net_RO.get("vim_name")
646 vld["status"] = net_RO.get("status")
647 vld["status-detailed"] = net_RO.get("error_msg")
648 ns_update_nsr["vld.{}".format(vld_index)] = vld
649 break
650 else:
651 raise LcmException(
652 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
653 )
654
655 def set_vnfr_at_error(self, db_vnfrs, error_text):
656 try:
657 for db_vnfr in db_vnfrs.values():
658 vnfr_update = {"status": "ERROR"}
659 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
660 if "status" not in vdur:
661 vdur["status"] = "ERROR"
662 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
663 if error_text:
664 vdur["status-detailed"] = str(error_text)
665 vnfr_update[
666 "vdur.{}.status-detailed".format(vdu_index)
667 ] = "ERROR"
668 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
669 except DbException as e:
670 self.logger.error("Cannot update vnf. {}".format(e))
671
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        # NOTE: the nested for/else blocks below rely on Python's for-else
        # semantics: the else (raise) runs only when no break happened, i.e.
        # when the expected element was not found in the RO descriptor.
        for vnf_index, db_vnfr in db_vnfrs.items():
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # keep only the first address when RO reports several (";"-separated)
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                        "ip_address"
                    ].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP(
                            "ns member_vnf_index '{}' has no IP address".format(
                                vnf_index
                            )
                        )

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # PDUs are not deployed by RO; skip them
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        # skip RO vms until the one with the matching replica index
                        if vdur["count-index"] != vdur_RO_count_index:
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get(
                                        "ip_address"
                                    )
                                    ifacer["mac-address"] = interface_RO.get(
                                        "mac_address"
                                    )
                                    break
                            else:
                                raise LcmException(
                                    "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                    "from VIM info".format(
                                        vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                    )
                                )
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                            "VIM info".format(
                                vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                            )
                        )

                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                                vnf_index, vld["id"]
                            )
                        )

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException(
                    "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                        vnf_index
                    )
                )

769 def _get_ns_config_info(self, nsr_id):
770 """
771 Generates a mapping between vnf,vdu elements and the N2VC id
772 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
773 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
774 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
775 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
776 """
777 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
778 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
779 mapping = {}
780 ns_config_info = {"osm-config-mapping": mapping}
781 for vca in vca_deployed_list:
782 if not vca["member-vnf-index"]:
783 continue
784 if not vca["vdu_id"]:
785 mapping[vca["member-vnf-index"]] = vca["application"]
786 else:
787 mapping[
788 "{}.{}.{}".format(
789 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
790 )
791 ] = vca["application"]
792 return ns_config_info
793
794 async def _instantiate_ng_ro(
795 self,
796 logging_text,
797 nsr_id,
798 nsd,
799 db_nsr,
800 db_nslcmop,
801 db_vnfrs,
802 db_vnfds,
803 n2vc_key_list,
804 stage,
805 start_deploy,
806 timeout_ns_deploy,
807 ):
808
809 db_vims = {}
810
811 def get_vim_account(vim_account_id):
812 nonlocal db_vims
813 if vim_account_id in db_vims:
814 return db_vims[vim_account_id]
815 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
816 db_vims[vim_account_id] = db_vim
817 return db_vim
818
819 # modify target_vld info with instantiation parameters
820 def parse_vld_instantiation_params(
821 target_vim, target_vld, vld_params, target_sdn
822 ):
823 if vld_params.get("ip-profile"):
824 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
825 "ip-profile"
826 ]
827 if vld_params.get("provider-network"):
828 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
829 "provider-network"
830 ]
831 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
832 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
833 "provider-network"
834 ]["sdn-ports"]
835 if vld_params.get("wimAccountId"):
836 target_wim = "wim:{}".format(vld_params["wimAccountId"])
837 target_vld["vim_info"][target_wim] = {}
838 for param in ("vim-network-name", "vim-network-id"):
839 if vld_params.get(param):
840 if isinstance(vld_params[param], dict):
841 for vim, vim_net in vld_params[param].items():
842 other_target_vim = "vim:" + vim
843 populate_dict(
844 target_vld["vim_info"],
845 (other_target_vim, param.replace("-", "_")),
846 vim_net,
847 )
848 else: # isinstance str
849 target_vld["vim_info"][target_vim][
850 param.replace("-", "_")
851 ] = vld_params[param]
852 if vld_params.get("common_id"):
853 target_vld["common_id"] = vld_params.get("common_id")
854
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target, ns_params):
857 for vnf_params in ns_params.get("vnf", ()):
858 if vnf_params.get("vimAccountId"):
859 target_vnf = next(
860 (
861 vnfr
862 for vnfr in db_vnfrs.values()
863 if vnf_params["member-vnf-index"]
864 == vnfr["member-vnf-index-ref"]
865 ),
866 None,
867 )
868 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
869 for a_index, a_vld in enumerate(target["ns"]["vld"]):
870 target_vld = find_in_list(
871 get_iterable(vdur, "interfaces"),
872 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
873 )
874 if target_vld:
875 if vnf_params.get("vimAccountId") not in a_vld.get(
876 "vim_info", {}
877 ):
878 target["ns"]["vld"][a_index].get("vim_info").update(
879 {
880 "vim:{}".format(vnf_params["vimAccountId"]): {
881 "vim_network_name": ""
882 }
883 }
884 )
885
886 nslcmop_id = db_nslcmop["_id"]
887 target = {
888 "name": db_nsr["name"],
889 "ns": {"vld": []},
890 "vnf": [],
891 "image": deepcopy(db_nsr["image"]),
892 "flavor": deepcopy(db_nsr["flavor"]),
893 "action_id": nslcmop_id,
894 "cloud_init_content": {},
895 }
896 for image in target["image"]:
897 image["vim_info"] = {}
898 for flavor in target["flavor"]:
899 flavor["vim_info"] = {}
900 if db_nsr.get("affinity-or-anti-affinity-group"):
901 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
902 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
903 affinity_or_anti_affinity_group["vim_info"] = {}
904
905 if db_nslcmop.get("lcmOperationType") != "instantiate":
906 # get parameters of instantiation:
907 db_nslcmop_instantiate = self.db.get_list(
908 "nslcmops",
909 {
910 "nsInstanceId": db_nslcmop["nsInstanceId"],
911 "lcmOperationType": "instantiate",
912 },
913 )[-1]
914 ns_params = db_nslcmop_instantiate.get("operationParams")
915 else:
916 ns_params = db_nslcmop.get("operationParams")
917 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
918 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
919
920 cp2target = {}
921 for vld_index, vld in enumerate(db_nsr.get("vld")):
922 target_vim = "vim:{}".format(ns_params["vimAccountId"])
923 target_vld = {
924 "id": vld["id"],
925 "name": vld["name"],
926 "mgmt-network": vld.get("mgmt-network", False),
927 "type": vld.get("type"),
928 "vim_info": {
929 target_vim: {
930 "vim_network_name": vld.get("vim-network-name"),
931 "vim_account_id": ns_params["vimAccountId"],
932 }
933 },
934 }
935 # check if this network needs SDN assist
936 if vld.get("pci-interfaces"):
937 db_vim = get_vim_account(ns_params["vimAccountId"])
938 sdnc_id = db_vim["config"].get("sdn-controller")
939 if sdnc_id:
940 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
941 target_sdn = "sdn:{}".format(sdnc_id)
942 target_vld["vim_info"][target_sdn] = {
943 "sdn": True,
944 "target_vim": target_vim,
945 "vlds": [sdn_vld],
946 "type": vld.get("type"),
947 }
948
949 nsd_vnf_profiles = get_vnf_profiles(nsd)
950 for nsd_vnf_profile in nsd_vnf_profiles:
951 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
952 if cp["virtual-link-profile-id"] == vld["id"]:
953 cp2target[
954 "member_vnf:{}.{}".format(
955 cp["constituent-cpd-id"][0][
956 "constituent-base-element-id"
957 ],
958 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
959 )
960 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
961
962 # check at nsd descriptor, if there is an ip-profile
963 vld_params = {}
964 nsd_vlp = find_in_list(
965 get_virtual_link_profiles(nsd),
966 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
967 == vld["id"],
968 )
969 if (
970 nsd_vlp
971 and nsd_vlp.get("virtual-link-protocol-data")
972 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
973 ):
974 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
975 "l3-protocol-data"
976 ]
977 ip_profile_dest_data = {}
978 if "ip-version" in ip_profile_source_data:
979 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
980 "ip-version"
981 ]
982 if "cidr" in ip_profile_source_data:
983 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
984 "cidr"
985 ]
986 if "gateway-ip" in ip_profile_source_data:
987 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
988 "gateway-ip"
989 ]
990 if "dhcp-enabled" in ip_profile_source_data:
991 ip_profile_dest_data["dhcp-params"] = {
992 "enabled": ip_profile_source_data["dhcp-enabled"]
993 }
994 vld_params["ip-profile"] = ip_profile_dest_data
995
996 # update vld_params with instantiation params
997 vld_instantiation_params = find_in_list(
998 get_iterable(ns_params, "vld"),
999 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1000 )
1001 if vld_instantiation_params:
1002 vld_params.update(vld_instantiation_params)
1003 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1004 target["ns"]["vld"].append(target_vld)
1005 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1006 update_ns_vld_target(target, ns_params)
1007
1008 for vnfr in db_vnfrs.values():
1009 vnfd = find_in_list(
1010 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1011 )
1012 vnf_params = find_in_list(
1013 get_iterable(ns_params, "vnf"),
1014 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1015 )
1016 target_vnf = deepcopy(vnfr)
1017 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1018 for vld in target_vnf.get("vld", ()):
1019 # check if connected to a ns.vld, to fill target'
1020 vnf_cp = find_in_list(
1021 vnfd.get("int-virtual-link-desc", ()),
1022 lambda cpd: cpd.get("id") == vld["id"],
1023 )
1024 if vnf_cp:
1025 ns_cp = "member_vnf:{}.{}".format(
1026 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1027 )
1028 if cp2target.get(ns_cp):
1029 vld["target"] = cp2target[ns_cp]
1030
1031 vld["vim_info"] = {
1032 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1033 }
1034 # check if this network needs SDN assist
1035 target_sdn = None
1036 if vld.get("pci-interfaces"):
1037 db_vim = get_vim_account(vnfr["vim-account-id"])
1038 sdnc_id = db_vim["config"].get("sdn-controller")
1039 if sdnc_id:
1040 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1041 target_sdn = "sdn:{}".format(sdnc_id)
1042 vld["vim_info"][target_sdn] = {
1043 "sdn": True,
1044 "target_vim": target_vim,
1045 "vlds": [sdn_vld],
1046 "type": vld.get("type"),
1047 }
1048
1049 # check at vnfd descriptor, if there is an ip-profile
1050 vld_params = {}
1051 vnfd_vlp = find_in_list(
1052 get_virtual_link_profiles(vnfd),
1053 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1054 )
1055 if (
1056 vnfd_vlp
1057 and vnfd_vlp.get("virtual-link-protocol-data")
1058 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1059 ):
1060 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1061 "l3-protocol-data"
1062 ]
1063 ip_profile_dest_data = {}
1064 if "ip-version" in ip_profile_source_data:
1065 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1066 "ip-version"
1067 ]
1068 if "cidr" in ip_profile_source_data:
1069 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1070 "cidr"
1071 ]
1072 if "gateway-ip" in ip_profile_source_data:
1073 ip_profile_dest_data[
1074 "gateway-address"
1075 ] = ip_profile_source_data["gateway-ip"]
1076 if "dhcp-enabled" in ip_profile_source_data:
1077 ip_profile_dest_data["dhcp-params"] = {
1078 "enabled": ip_profile_source_data["dhcp-enabled"]
1079 }
1080
1081 vld_params["ip-profile"] = ip_profile_dest_data
1082 # update vld_params with instantiation params
1083 if vnf_params:
1084 vld_instantiation_params = find_in_list(
1085 get_iterable(vnf_params, "internal-vld"),
1086 lambda i_vld: i_vld["name"] == vld["id"],
1087 )
1088 if vld_instantiation_params:
1089 vld_params.update(vld_instantiation_params)
1090 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1091
1092 vdur_list = []
1093 for vdur in target_vnf.get("vdur", ()):
1094 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1095 continue # This vdu must not be created
1096 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1097
1098 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1099
1100 if ssh_keys_all:
1101 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1102 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1103 if (
1104 vdu_configuration
1105 and vdu_configuration.get("config-access")
1106 and vdu_configuration.get("config-access").get("ssh-access")
1107 ):
1108 vdur["ssh-keys"] = ssh_keys_all
1109 vdur["ssh-access-required"] = vdu_configuration[
1110 "config-access"
1111 ]["ssh-access"]["required"]
1112 elif (
1113 vnf_configuration
1114 and vnf_configuration.get("config-access")
1115 and vnf_configuration.get("config-access").get("ssh-access")
1116 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1117 ):
1118 vdur["ssh-keys"] = ssh_keys_all
1119 vdur["ssh-access-required"] = vnf_configuration[
1120 "config-access"
1121 ]["ssh-access"]["required"]
1122 elif ssh_keys_instantiation and find_in_list(
1123 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1124 ):
1125 vdur["ssh-keys"] = ssh_keys_instantiation
1126
1127 self.logger.debug("NS > vdur > {}".format(vdur))
1128
1129 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1130 # cloud-init
1131 if vdud.get("cloud-init-file"):
1132 vdur["cloud-init"] = "{}:file:{}".format(
1133 vnfd["_id"], vdud.get("cloud-init-file")
1134 )
1135 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1136 if vdur["cloud-init"] not in target["cloud_init_content"]:
1137 base_folder = vnfd["_admin"]["storage"]
1138 if base_folder["pkg-dir"]:
1139 cloud_init_file = "{}/{}/cloud_init/{}".format(
1140 base_folder["folder"],
1141 base_folder["pkg-dir"],
1142 vdud.get("cloud-init-file"),
1143 )
1144 else:
1145 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1146 base_folder["folder"],
1147 vdud.get("cloud-init-file"),
1148 )
1149 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1150 target["cloud_init_content"][
1151 vdur["cloud-init"]
1152 ] = ci_file.read()
1153 elif vdud.get("cloud-init"):
1154 vdur["cloud-init"] = "{}:vdu:{}".format(
1155 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1156 )
1157 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1158 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1159 "cloud-init"
1160 ]
1161 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1162 deploy_params_vdu = self._format_additional_params(
1163 vdur.get("additionalParams") or {}
1164 )
1165 deploy_params_vdu["OSM"] = get_osm_params(
1166 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1167 )
1168 vdur["additionalParams"] = deploy_params_vdu
1169
1170 # flavor
1171 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1172 if target_vim not in ns_flavor["vim_info"]:
1173 ns_flavor["vim_info"][target_vim] = {}
1174
1175 # deal with images
1176 # in case alternative images are provided we must check if they should be applied
1177 # for the vim_type, modify the vim_type taking into account
1178 ns_image_id = int(vdur["ns-image-id"])
1179 if vdur.get("alt-image-ids"):
1180 db_vim = get_vim_account(vnfr["vim-account-id"])
1181 vim_type = db_vim["vim_type"]
1182 for alt_image_id in vdur.get("alt-image-ids"):
1183 ns_alt_image = target["image"][int(alt_image_id)]
1184 if vim_type == ns_alt_image.get("vim-type"):
1185 # must use alternative image
1186 self.logger.debug(
1187 "use alternative image id: {}".format(alt_image_id)
1188 )
1189 ns_image_id = alt_image_id
1190 vdur["ns-image-id"] = ns_image_id
1191 break
1192 ns_image = target["image"][int(ns_image_id)]
1193 if target_vim not in ns_image["vim_info"]:
1194 ns_image["vim_info"][target_vim] = {}
1195
1196 # Affinity groups
1197 if vdur.get("affinity-or-anti-affinity-group-id"):
1198 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1199 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1200 if target_vim not in ns_ags["vim_info"]:
1201 ns_ags["vim_info"][target_vim] = {}
1202
1203 vdur["vim_info"] = {target_vim: {}}
1204 # instantiation parameters
1205 # if vnf_params:
1206 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1207 # vdud["id"]), None)
1208 vdur_list.append(vdur)
1209 target_vnf["vdur"] = vdur_list
1210 target["vnf"].append(target_vnf)
1211
1212 desc = await self.RO.deploy(nsr_id, target)
1213 self.logger.debug("RO return > {}".format(desc))
1214 action_id = desc["action_id"]
1215 await self._wait_ng_ro(
1216 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1217 )
1218
1219 # Updating NSR
1220 db_nsr_update = {
1221 "_admin.deployed.RO.operational-status": "running",
1222 "detailed-status": " ".join(stage),
1223 }
1224 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1225 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1226 self._write_op_status(nslcmop_id, stage)
1227 self.logger.debug(
1228 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1229 )
1230 return
1231
1232 async def _wait_ng_ro(
1233 self,
1234 nsr_id,
1235 action_id,
1236 nslcmop_id=None,
1237 start_time=None,
1238 timeout=600,
1239 stage=None,
1240 ):
1241 detailed_status_old = None
1242 db_nsr_update = {}
1243 start_time = start_time or time()
1244 while time() <= start_time + timeout:
1245 desc_status = await self.RO.status(nsr_id, action_id)
1246 self.logger.debug("Wait NG RO > {}".format(desc_status))
1247 if desc_status["status"] == "FAILED":
1248 raise NgRoException(desc_status["details"])
1249 elif desc_status["status"] == "BUILD":
1250 if stage:
1251 stage[2] = "VIM: ({})".format(desc_status["details"])
1252 elif desc_status["status"] == "DONE":
1253 if stage:
1254 stage[2] = "Deployed at VIM"
1255 break
1256 else:
1257 assert False, "ROclient.check_ns_status returns unknown {}".format(
1258 desc_status["status"]
1259 )
1260 if stage and nslcmop_id and stage[2] != detailed_status_old:
1261 detailed_status_old = stage[2]
1262 db_nsr_update["detailed-status"] = " ".join(stage)
1263 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1264 self._write_op_status(nslcmop_id, stage)
1265 await asyncio.sleep(15, loop=self.loop)
1266 else: # timeout_ns_deploy
1267 raise NgRoException("Timeout waiting ns to deploy")
1268
    async def _terminate_ng_ro(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminate an NS at NG-RO: request deletion of all deployed resources,
        wait until done, then delete the NS record at RO.

        :param logging_text: prefix used for logging
        :param nsr_deployed: deployed info of the NS record
            (NOTE(review): not used in this body — presumably kept for
            signature compatibility with callers; confirm)
        :param nsr_id: NS record identifier
        :param nslcmop_id: NS LCM operation identifier (also used as RO action_id)
        :param stage: 3-item status list; stage[2] is updated with the outcome
        :raises LcmException: if any non-404 error happened while deleting
        """
        db_nsr_update = {}
        failed_detail = []
        action_id = None
        start_deploy = time()
        try:
            # An empty target (no vlds, vnfs, images or flavors) tells NG-RO to
            # delete everything currently deployed for this NS.
            target = {
                "ns": {"vld": []},
                "vnf": [],
                "image": [],
                "flavor": [],
                "action_id": nslcmop_id,
            }
            desc = await self.RO.deploy(nsr_id, target)
            action_id = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
            self.logger.debug(
                logging_text
                + "ns terminate action at RO. action_id={}".format(action_id)
            )

            # wait until done
            delete_timeout = 20 * 60  # 20 minutes
            await self._wait_ng_ro(
                nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
            )

            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # delete all nsr
            await self.RO.delete(nsr_id)
        except Exception as e:
            # 404 means RO no longer knows this NS: treat as already deleted.
            if isinstance(e, NgRoException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_action_id={} already deleted".format(action_id)
                )
            elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_action_id={} delete conflict: {}".format(action_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text
                    + "RO_action_id={} delete error: {}".format(action_id, e)
                )

        # Record the final outcome in the NS record and the operation record,
        # regardless of whether deletion succeeded.
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
        return
1335
1336 async def instantiate_RO(
1337 self,
1338 logging_text,
1339 nsr_id,
1340 nsd,
1341 db_nsr,
1342 db_nslcmop,
1343 db_vnfrs,
1344 db_vnfds,
1345 n2vc_key_list,
1346 stage,
1347 ):
1348 """
1349 Instantiate at RO
1350 :param logging_text: preffix text to use at logging
1351 :param nsr_id: nsr identity
1352 :param nsd: database content of ns descriptor
1353 :param db_nsr: database content of ns record
1354 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1355 :param db_vnfrs:
1356 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1357 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1358 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1359 :return: None or exception
1360 """
1361 try:
1362 start_deploy = time()
1363 ns_params = db_nslcmop.get("operationParams")
1364 if ns_params and ns_params.get("timeout_ns_deploy"):
1365 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1366 else:
1367 timeout_ns_deploy = self.timeout.get(
1368 "ns_deploy", self.timeout_ns_deploy
1369 )
1370
1371 # Check for and optionally request placement optimization. Database will be updated if placement activated
1372 stage[2] = "Waiting for Placement."
1373 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1374 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1375 for vnfr in db_vnfrs.values():
1376 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1377 break
1378 else:
1379 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1380
1381 return await self._instantiate_ng_ro(
1382 logging_text,
1383 nsr_id,
1384 nsd,
1385 db_nsr,
1386 db_nslcmop,
1387 db_vnfrs,
1388 db_vnfds,
1389 n2vc_key_list,
1390 stage,
1391 start_deploy,
1392 timeout_ns_deploy,
1393 )
1394 except Exception as e:
1395 stage[2] = "ERROR deploying at VIM"
1396 self.set_vnfr_at_error(db_vnfrs, str(e))
1397 self.logger.error(
1398 "Error deploying at VIM {}".format(e),
1399 exc_info=not isinstance(
1400 e,
1401 (
1402 ROclient.ROClientException,
1403 LcmException,
1404 DbException,
1405 NgRoException,
1406 ),
1407 ),
1408 )
1409 raise
1410
1411 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1412 """
1413 Wait for kdu to be up, get ip address
1414 :param logging_text: prefix use for logging
1415 :param nsr_id:
1416 :param vnfr_id:
1417 :param kdu_name:
1418 :return: IP address
1419 """
1420
1421 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1422 nb_tries = 0
1423
1424 while nb_tries < 360:
1425 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1426 kdur = next(
1427 (
1428 x
1429 for x in get_iterable(db_vnfr, "kdur")
1430 if x.get("kdu-name") == kdu_name
1431 ),
1432 None,
1433 )
1434 if not kdur:
1435 raise LcmException(
1436 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1437 )
1438 if kdur.get("status"):
1439 if kdur["status"] in ("READY", "ENABLED"):
1440 return kdur.get("ip-address")
1441 else:
1442 raise LcmException(
1443 "target KDU={} is in error state".format(kdu_name)
1444 )
1445
1446 await asyncio.sleep(10, loop=self.loop)
1447 nb_tries += 1
1448 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1449
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip address at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix used for logging
        :param nsr_id:
        :param vnfr_id:
        :param vdu_id: VDU identifier; None targets the whole VNF (management VDU)
        :param vdu_index: count-index of the VDU instance
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0  # key-injection retry counter (classic RO path only)
        target_vdu_id = None
        ro_retries = 0  # overall poll iterations; each iteration sleeps 10s

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    # no ip yet: re-read the record on the next iteration
                    if not ip_address:
                        continue
                    # locate the vdur that owns the VNF management ip
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # PDUs are pre-provisioned, so they count as "up" immediately
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            # VM not ACTIVE yet: keep polling
            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                # keys cannot be injected into physical deployment units
                if vdur.get("pdu-type"):
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # new-generation RO: injection is an RO "action" deploy
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                        else:
                            # no VM reported success: retried by outer loop
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # classic-RO injection failure: retry up to 20 times
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                # no key to inject: the ip address is all we needed
                break

        return ip_address
1626
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

        :param nsr_id: NS record identifier
        :param vca_deployed_list: list of deployed VCAs of this NS
        :param vca_index: index of the VCA whose dependencies must be waited for
        :raises LcmException: if a dependent charm is BROKEN or on timeout
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        # NOTE(review): "timeout" is decremented by 1 per 10-second sleep, so
        # this waits up to ~50 minutes, not 300 seconds — confirm intended.
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # a dependency is any other VCA of the same VNF, or, for an
                # NS-level VCA (no member-vnf-index), every other VCA
                if not my_vca.get("member-vnf-index") or (
                    vca_deployed.get("member-vnf-index")
                    == my_vca.get("member-vnf-index")
                ):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == "READY":
                        continue
                    elif internal_status == "BROKEN":
                        raise LcmException(
                            "Configuration aborted because dependent charm/s has failed"
                        )
                    else:
                        # still in progress: break to sleep and re-check
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1

        raise LcmException("Configuration aborted because dependent charm/s timeout")
1664
1665 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1666 vca_id = None
1667 if db_vnfr:
1668 vca_id = deep_get(db_vnfr, ("vca-id",))
1669 elif db_nsr:
1670 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1671 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1672 return vca_id
1673
1674 async def instantiate_N2VC(
1675 self,
1676 logging_text,
1677 vca_index,
1678 nsi_id,
1679 db_nsr,
1680 db_vnfr,
1681 vdu_id,
1682 kdu_name,
1683 vdu_index,
1684 config_descriptor,
1685 deploy_params,
1686 base_folder,
1687 nslcmop_id,
1688 stage,
1689 vca_type,
1690 vca_name,
1691 ee_config_descriptor,
1692 ):
1693 nsr_id = db_nsr["_id"]
1694 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1695 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1696 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1697 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1698 db_dict = {
1699 "collection": "nsrs",
1700 "filter": {"_id": nsr_id},
1701 "path": db_update_entry,
1702 }
1703 step = ""
1704 try:
1705
1706 element_type = "NS"
1707 element_under_configuration = nsr_id
1708
1709 vnfr_id = None
1710 if db_vnfr:
1711 vnfr_id = db_vnfr["_id"]
1712 osm_config["osm"]["vnf_id"] = vnfr_id
1713
1714 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1715
1716 if vca_type == "native_charm":
1717 index_number = 0
1718 else:
1719 index_number = vdu_index or 0
1720
1721 if vnfr_id:
1722 element_type = "VNF"
1723 element_under_configuration = vnfr_id
1724 namespace += ".{}-{}".format(vnfr_id, index_number)
1725 if vdu_id:
1726 namespace += ".{}-{}".format(vdu_id, index_number)
1727 element_type = "VDU"
1728 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1729 osm_config["osm"]["vdu_id"] = vdu_id
1730 elif kdu_name:
1731 namespace += ".{}".format(kdu_name)
1732 element_type = "KDU"
1733 element_under_configuration = kdu_name
1734 osm_config["osm"]["kdu_name"] = kdu_name
1735
1736 # Get artifact path
1737 if base_folder["pkg-dir"]:
1738 artifact_path = "{}/{}/{}/{}".format(
1739 base_folder["folder"],
1740 base_folder["pkg-dir"],
1741 "charms"
1742 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1743 else "helm-charts",
1744 vca_name,
1745 )
1746 else:
1747 artifact_path = "{}/Scripts/{}/{}/".format(
1748 base_folder["folder"],
1749 "charms"
1750 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1751 else "helm-charts",
1752 vca_name,
1753 )
1754
1755 self.logger.debug("Artifact path > {}".format(artifact_path))
1756
1757 # get initial_config_primitive_list that applies to this element
1758 initial_config_primitive_list = config_descriptor.get(
1759 "initial-config-primitive"
1760 )
1761
1762 self.logger.debug(
1763 "Initial config primitive list > {}".format(
1764 initial_config_primitive_list
1765 )
1766 )
1767
1768 # add config if not present for NS charm
1769 ee_descriptor_id = ee_config_descriptor.get("id")
1770 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1771 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1772 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1773 )
1774
1775 self.logger.debug(
1776 "Initial config primitive list #2 > {}".format(
1777 initial_config_primitive_list
1778 )
1779 )
1780 # n2vc_redesign STEP 3.1
1781 # find old ee_id if exists
1782 ee_id = vca_deployed.get("ee_id")
1783
1784 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1785 # create or register execution environment in VCA
1786 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1787
1788 self._write_configuration_status(
1789 nsr_id=nsr_id,
1790 vca_index=vca_index,
1791 status="CREATING",
1792 element_under_configuration=element_under_configuration,
1793 element_type=element_type,
1794 )
1795
1796 step = "create execution environment"
1797 self.logger.debug(logging_text + step)
1798
1799 ee_id = None
1800 credentials = None
1801 if vca_type == "k8s_proxy_charm":
1802 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1803 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1804 namespace=namespace,
1805 artifact_path=artifact_path,
1806 db_dict=db_dict,
1807 vca_id=vca_id,
1808 )
1809 elif vca_type == "helm" or vca_type == "helm-v3":
1810 ee_id, credentials = await self.vca_map[
1811 vca_type
1812 ].create_execution_environment(
1813 namespace=namespace,
1814 reuse_ee_id=ee_id,
1815 db_dict=db_dict,
1816 config=osm_config,
1817 artifact_path=artifact_path,
1818 vca_type=vca_type,
1819 )
1820 else:
1821 ee_id, credentials = await self.vca_map[
1822 vca_type
1823 ].create_execution_environment(
1824 namespace=namespace,
1825 reuse_ee_id=ee_id,
1826 db_dict=db_dict,
1827 vca_id=vca_id,
1828 )
1829
1830 elif vca_type == "native_charm":
1831 step = "Waiting to VM being up and getting IP address"
1832 self.logger.debug(logging_text + step)
1833 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1834 logging_text,
1835 nsr_id,
1836 vnfr_id,
1837 vdu_id,
1838 vdu_index,
1839 user=None,
1840 pub_key=None,
1841 )
1842 credentials = {"hostname": rw_mgmt_ip}
1843 # get username
1844 username = deep_get(
1845 config_descriptor, ("config-access", "ssh-access", "default-user")
1846 )
1847 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1848 # merged. Meanwhile let's get username from initial-config-primitive
1849 if not username and initial_config_primitive_list:
1850 for config_primitive in initial_config_primitive_list:
1851 for param in config_primitive.get("parameter", ()):
1852 if param["name"] == "ssh-username":
1853 username = param["value"]
1854 break
1855 if not username:
1856 raise LcmException(
1857 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1858 "'config-access.ssh-access.default-user'"
1859 )
1860 credentials["username"] = username
1861 # n2vc_redesign STEP 3.2
1862
1863 self._write_configuration_status(
1864 nsr_id=nsr_id,
1865 vca_index=vca_index,
1866 status="REGISTERING",
1867 element_under_configuration=element_under_configuration,
1868 element_type=element_type,
1869 )
1870
1871 step = "register execution environment {}".format(credentials)
1872 self.logger.debug(logging_text + step)
1873 ee_id = await self.vca_map[vca_type].register_execution_environment(
1874 credentials=credentials,
1875 namespace=namespace,
1876 db_dict=db_dict,
1877 vca_id=vca_id,
1878 )
1879
1880 # for compatibility with MON/POL modules, the need model and application name at database
1881 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1882 ee_id_parts = ee_id.split(".")
1883 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1884 if len(ee_id_parts) >= 2:
1885 model_name = ee_id_parts[0]
1886 application_name = ee_id_parts[1]
1887 db_nsr_update[db_update_entry + "model"] = model_name
1888 db_nsr_update[db_update_entry + "application"] = application_name
1889
1890 # n2vc_redesign STEP 3.3
1891 step = "Install configuration Software"
1892
1893 self._write_configuration_status(
1894 nsr_id=nsr_id,
1895 vca_index=vca_index,
1896 status="INSTALLING SW",
1897 element_under_configuration=element_under_configuration,
1898 element_type=element_type,
1899 other_update=db_nsr_update,
1900 )
1901
1902 # TODO check if already done
1903 self.logger.debug(logging_text + step)
1904 config = None
1905 if vca_type == "native_charm":
1906 config_primitive = next(
1907 (p for p in initial_config_primitive_list if p["name"] == "config"),
1908 None,
1909 )
1910 if config_primitive:
1911 config = self._map_primitive_params(
1912 config_primitive, {}, deploy_params
1913 )
1914 num_units = 1
1915 if vca_type == "lxc_proxy_charm":
1916 if element_type == "NS":
1917 num_units = db_nsr.get("config-units") or 1
1918 elif element_type == "VNF":
1919 num_units = db_vnfr.get("config-units") or 1
1920 elif element_type == "VDU":
1921 for v in db_vnfr["vdur"]:
1922 if vdu_id == v["vdu-id-ref"]:
1923 num_units = v.get("config-units") or 1
1924 break
1925 if vca_type != "k8s_proxy_charm":
1926 await self.vca_map[vca_type].install_configuration_sw(
1927 ee_id=ee_id,
1928 artifact_path=artifact_path,
1929 db_dict=db_dict,
1930 config=config,
1931 num_units=num_units,
1932 vca_id=vca_id,
1933 vca_type=vca_type,
1934 )
1935
1936 # write in db flag of configuration_sw already installed
1937 self.update_db_2(
1938 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1939 )
1940
1941 # add relations for this VCA (wait for other peers related with this VCA)
1942 await self._add_vca_relations(
1943 logging_text=logging_text,
1944 nsr_id=nsr_id,
1945 vca_type=vca_type,
1946 vca_index=vca_index,
1947 )
1948
1949 # if SSH access is required, then get execution environment SSH public
1950 # if native charm we have waited already to VM be UP
1951 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1952 pub_key = None
1953 user = None
1954 # self.logger.debug("get ssh key block")
1955 if deep_get(
1956 config_descriptor, ("config-access", "ssh-access", "required")
1957 ):
1958 # self.logger.debug("ssh key needed")
1959 # Needed to inject a ssh key
1960 user = deep_get(
1961 config_descriptor,
1962 ("config-access", "ssh-access", "default-user"),
1963 )
1964 step = "Install configuration Software, getting public ssh key"
1965 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1966 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1967 )
1968
1969 step = "Insert public key into VM user={} ssh_key={}".format(
1970 user, pub_key
1971 )
1972 else:
1973 # self.logger.debug("no need to get ssh key")
1974 step = "Waiting to VM being up and getting IP address"
1975 self.logger.debug(logging_text + step)
1976
1977 # n2vc_redesign STEP 5.1
1978 # wait for RO (ip-address) Insert pub_key into VM
1979 if vnfr_id:
1980 if kdu_name:
1981 rw_mgmt_ip = await self.wait_kdu_up(
1982 logging_text, nsr_id, vnfr_id, kdu_name
1983 )
1984 else:
1985 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1986 logging_text,
1987 nsr_id,
1988 vnfr_id,
1989 vdu_id,
1990 vdu_index,
1991 user=user,
1992 pub_key=pub_key,
1993 )
1994 else:
1995 rw_mgmt_ip = None # This is for a NS configuration
1996
1997 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1998
1999 # store rw_mgmt_ip in deploy params for later replacement
2000 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2001
2002 # n2vc_redesign STEP 6 Execute initial config primitive
2003 step = "execute initial config primitive"
2004
2005 # wait for dependent primitives execution (NS -> VNF -> VDU)
2006 if initial_config_primitive_list:
2007 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2008
2009 # stage, in function of element type: vdu, kdu, vnf or ns
2010 my_vca = vca_deployed_list[vca_index]
2011 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2012 # VDU or KDU
2013 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2014 elif my_vca.get("member-vnf-index"):
2015 # VNF
2016 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2017 else:
2018 # NS
2019 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2020
2021 self._write_configuration_status(
2022 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2023 )
2024
2025 self._write_op_status(op_id=nslcmop_id, stage=stage)
2026
2027 check_if_terminated_needed = True
2028 for initial_config_primitive in initial_config_primitive_list:
2029 # adding information on the vca_deployed if it is a NS execution environment
2030 if not vca_deployed["member-vnf-index"]:
2031 deploy_params["ns_config_info"] = json.dumps(
2032 self._get_ns_config_info(nsr_id)
2033 )
2034 # TODO check if already done
2035 primitive_params_ = self._map_primitive_params(
2036 initial_config_primitive, {}, deploy_params
2037 )
2038
2039 step = "execute primitive '{}' params '{}'".format(
2040 initial_config_primitive["name"], primitive_params_
2041 )
2042 self.logger.debug(logging_text + step)
2043 await self.vca_map[vca_type].exec_primitive(
2044 ee_id=ee_id,
2045 primitive_name=initial_config_primitive["name"],
2046 params_dict=primitive_params_,
2047 db_dict=db_dict,
2048 vca_id=vca_id,
2049 vca_type=vca_type,
2050 )
2051 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2052 if check_if_terminated_needed:
2053 if config_descriptor.get("terminate-config-primitive"):
2054 self.update_db_2(
2055 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2056 )
2057 check_if_terminated_needed = False
2058
2059 # TODO register in database that primitive is done
2060
2061 # STEP 7 Configure metrics
2062 if vca_type == "helm" or vca_type == "helm-v3":
2063 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2064 ee_id=ee_id,
2065 artifact_path=artifact_path,
2066 ee_config_descriptor=ee_config_descriptor,
2067 vnfr_id=vnfr_id,
2068 nsr_id=nsr_id,
2069 target_ip=rw_mgmt_ip,
2070 )
2071 if prometheus_jobs:
2072 self.update_db_2(
2073 "nsrs",
2074 nsr_id,
2075 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2076 )
2077
2078 for job in prometheus_jobs:
2079 self.db.set_one(
2080 "prometheus_jobs",
2081 {
2082 "job_name": job["job_name"]
2083 },
2084 job,
2085 upsert=True,
2086 fail_on_empty=False,
2087 )
2088
2089 step = "instantiated at VCA"
2090 self.logger.debug(logging_text + step)
2091
2092 self._write_configuration_status(
2093 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2094 )
2095
2096 except Exception as e: # TODO not use Exception but N2VC exception
2097 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2098 if not isinstance(
2099 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2100 ):
2101 self.logger.error(
2102 "Exception while {} : {}".format(step, e), exc_info=True
2103 )
2104 self._write_configuration_status(
2105 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2106 )
2107 raise LcmException("{} {}".format(step, e)) from e
2108
2109 def _write_ns_status(
2110 self,
2111 nsr_id: str,
2112 ns_state: str,
2113 current_operation: str,
2114 current_operation_id: str,
2115 error_description: str = None,
2116 error_detail: str = None,
2117 other_update: dict = None,
2118 ):
2119 """
2120 Update db_nsr fields.
2121 :param nsr_id:
2122 :param ns_state:
2123 :param current_operation:
2124 :param current_operation_id:
2125 :param error_description:
2126 :param error_detail:
2127 :param other_update: Other required changes at database if provided, will be cleared
2128 :return:
2129 """
2130 try:
2131 db_dict = other_update or {}
2132 db_dict[
2133 "_admin.nslcmop"
2134 ] = current_operation_id # for backward compatibility
2135 db_dict["_admin.current-operation"] = current_operation_id
2136 db_dict["_admin.operation-type"] = (
2137 current_operation if current_operation != "IDLE" else None
2138 )
2139 db_dict["currentOperation"] = current_operation
2140 db_dict["currentOperationID"] = current_operation_id
2141 db_dict["errorDescription"] = error_description
2142 db_dict["errorDetail"] = error_detail
2143
2144 if ns_state:
2145 db_dict["nsState"] = ns_state
2146 self.update_db_2("nsrs", nsr_id, db_dict)
2147 except DbException as e:
2148 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2149
2150 def _write_op_status(
2151 self,
2152 op_id: str,
2153 stage: list = None,
2154 error_message: str = None,
2155 queuePosition: int = 0,
2156 operation_state: str = None,
2157 other_update: dict = None,
2158 ):
2159 try:
2160 db_dict = other_update or {}
2161 db_dict["queuePosition"] = queuePosition
2162 if isinstance(stage, list):
2163 db_dict["stage"] = stage[0]
2164 db_dict["detailed-status"] = " ".join(stage)
2165 elif stage is not None:
2166 db_dict["stage"] = str(stage)
2167
2168 if error_message is not None:
2169 db_dict["errorMessage"] = error_message
2170 if operation_state is not None:
2171 db_dict["operationState"] = operation_state
2172 db_dict["statusEnteredTime"] = time()
2173 self.update_db_2("nslcmops", op_id, db_dict)
2174 except DbException as e:
2175 self.logger.warn(
2176 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2177 )
2178
2179 def _write_all_config_status(self, db_nsr: dict, status: str):
2180 try:
2181 nsr_id = db_nsr["_id"]
2182 # configurationStatus
2183 config_status = db_nsr.get("configurationStatus")
2184 if config_status:
2185 db_nsr_update = {
2186 "configurationStatus.{}.status".format(index): status
2187 for index, v in enumerate(config_status)
2188 if v
2189 }
2190 # update status
2191 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2192
2193 except DbException as e:
2194 self.logger.warn(
2195 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2196 )
2197
2198 def _write_configuration_status(
2199 self,
2200 nsr_id: str,
2201 vca_index: int,
2202 status: str = None,
2203 element_under_configuration: str = None,
2204 element_type: str = None,
2205 other_update: dict = None,
2206 ):
2207
2208 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2209 # .format(vca_index, status))
2210
2211 try:
2212 db_path = "configurationStatus.{}.".format(vca_index)
2213 db_dict = other_update or {}
2214 if status:
2215 db_dict[db_path + "status"] = status
2216 if element_under_configuration:
2217 db_dict[
2218 db_path + "elementUnderConfiguration"
2219 ] = element_under_configuration
2220 if element_type:
2221 db_dict[db_path + "elementType"] = element_type
2222 self.update_db_2("nsrs", nsr_id, db_dict)
2223 except DbException as e:
2224 self.logger.warn(
2225 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2226 status, nsr_id, vca_index, e
2227 )
2228 )
2229
2230 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2231 """
2232 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2233 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2234 Database is used because the result can be obtained from a different LCM worker in case of HA.
2235 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2236 :param db_nslcmop: database content of nslcmop
2237 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2238 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2239 computed 'vim-account-id'
2240 """
2241 modified = False
2242 nslcmop_id = db_nslcmop["_id"]
2243 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2244 if placement_engine == "PLA":
2245 self.logger.debug(
2246 logging_text + "Invoke and wait for placement optimization"
2247 )
2248 await self.msg.aiowrite(
2249 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2250 )
2251 db_poll_interval = 5
2252 wait = db_poll_interval * 10
2253 pla_result = None
2254 while not pla_result and wait >= 0:
2255 await asyncio.sleep(db_poll_interval)
2256 wait -= db_poll_interval
2257 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2258 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2259
2260 if not pla_result:
2261 raise LcmException(
2262 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2263 )
2264
2265 for pla_vnf in pla_result["vnf"]:
2266 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2267 if not pla_vnf.get("vimAccountId") or not vnfr:
2268 continue
2269 modified = True
2270 self.db.set_one(
2271 "vnfrs",
2272 {"_id": vnfr["_id"]},
2273 {"vim-account-id": pla_vnf["vimAccountId"]},
2274 )
2275 # Modifies db_vnfrs
2276 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2277 return modified
2278
2279 def update_nsrs_with_pla_result(self, params):
2280 try:
2281 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2282 self.update_db_2(
2283 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2284 )
2285 except Exception as e:
2286 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2287
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Instantiate a network service: read descriptors/records from database,
        deploy KDUs, deploy VMs at VIM (through RO) and deploy VCA execution
        environments, then wait for all launched tasks and persist the result.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None. The outcome is written at database (nsrs, nslcmops) and
            notified through kafka; exceptions are handled here, not propagated
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM worker of a HA deployment owns this operation
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            # operation parameter overrides the configured/default deploy timeout
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"]   # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        # KDU additionalParams are stored JSON-encoded; decode in place
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds vnfd dicts, so "vnfd_id not in db_vnfds"
                # is always True and the vnfd is re-read for every vnfr — confirm intended
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            stage[1] = "Deploying NS at VIM."
            # RO deployment runs concurrently; its completion is awaited at the
            # finally block together with the rest of tasks_dict_info
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                # VNF-level charm, if any
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one execution environment per VDU instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                # KDU-level charms, if any
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu.update(
                                parse_yaml_strings(kdur["additionalParams"].copy())
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            # Always executed: wait for launched tasks, compute the final
            # operation/config status and persist it at database and kafka
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2771
2772 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2773 if vnfd_id not in cached_vnfds:
2774 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2775 return cached_vnfds[vnfd_id]
2776
2777 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2778 if vnf_profile_id not in cached_vnfrs:
2779 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2780 "vnfrs",
2781 {
2782 "member-vnf-index-ref": vnf_profile_id,
2783 "nsr-id-ref": nsr_id,
2784 },
2785 )
2786 return cached_vnfrs[vnf_profile_id]
2787
2788 def _is_deployed_vca_in_relation(
2789 self, vca: DeployedVCA, relation: Relation
2790 ) -> bool:
2791 found = False
2792 for endpoint in (relation.provider, relation.requirer):
2793 if endpoint["kdu-resource-profile-id"]:
2794 continue
2795 found = (
2796 vca.vnf_profile_id == endpoint.vnf_profile_id
2797 and vca.vdu_profile_id == endpoint.vdu_profile_id
2798 and vca.execution_environment_ref == endpoint.execution_environment_ref
2799 )
2800 if found:
2801 break
2802 return found
2803
2804 def _update_ee_relation_data_with_implicit_data(
2805 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2806 ):
2807 ee_relation_data = safe_get_ee_relation(
2808 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2809 )
2810 ee_relation_level = EELevel.get_level(ee_relation_data)
2811 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2812 "execution-environment-ref"
2813 ]:
2814 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2815 vnfd_id = vnf_profile["vnfd-id"]
2816 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2817 entity_id = (
2818 vnfd_id
2819 if ee_relation_level == EELevel.VNF
2820 else ee_relation_data["vdu-profile-id"]
2821 )
2822 ee = get_juju_ee_ref(db_vnfd, entity_id)
2823 if not ee:
2824 raise Exception(
2825 f"not execution environments found for ee_relation {ee_relation_data}"
2826 )
2827 ee_relation_data["execution-environment-ref"] = ee["id"]
2828 return ee_relation_data
2829
2830 def _get_ns_relations(
2831 self,
2832 nsr_id: str,
2833 nsd: Dict[str, Any],
2834 vca: DeployedVCA,
2835 cached_vnfds: Dict[str, Any],
2836 ) -> List[Relation]:
2837 relations = []
2838 db_ns_relations = get_ns_configuration_relation_list(nsd)
2839 for r in db_ns_relations:
2840 provider_dict = None
2841 requirer_dict = None
2842 if all(key in r for key in ("provider", "requirer")):
2843 provider_dict = r["provider"]
2844 requirer_dict = r["requirer"]
2845 elif "entities" in r:
2846 provider_id = r["entities"][0]["id"]
2847 provider_dict = {
2848 "nsr-id": nsr_id,
2849 "endpoint": r["entities"][0]["endpoint"],
2850 }
2851 if provider_id != nsd["id"]:
2852 provider_dict["vnf-profile-id"] = provider_id
2853 requirer_id = r["entities"][1]["id"]
2854 requirer_dict = {
2855 "nsr-id": nsr_id,
2856 "endpoint": r["entities"][1]["endpoint"],
2857 }
2858 if requirer_id != nsd["id"]:
2859 requirer_dict["vnf-profile-id"] = requirer_id
2860 else:
2861 raise Exception("provider/requirer or entities must be included in the relation.")
2862 relation_provider = self._update_ee_relation_data_with_implicit_data(
2863 nsr_id, nsd, provider_dict, cached_vnfds
2864 )
2865 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2866 nsr_id, nsd, requirer_dict, cached_vnfds
2867 )
2868 provider = EERelation(relation_provider)
2869 requirer = EERelation(relation_requirer)
2870 relation = Relation(r["name"], provider, requirer)
2871 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2872 if vca_in_relation:
2873 relations.append(relation)
2874 return relations
2875
2876 def _get_vnf_relations(
2877 self,
2878 nsr_id: str,
2879 nsd: Dict[str, Any],
2880 vca: DeployedVCA,
2881 cached_vnfds: Dict[str, Any],
2882 ) -> List[Relation]:
2883 relations = []
2884 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2885 vnf_profile_id = vnf_profile["id"]
2886 vnfd_id = vnf_profile["vnfd-id"]
2887 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2888 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2889 for r in db_vnf_relations:
2890 provider_dict = None
2891 requirer_dict = None
2892 if all(key in r for key in ("provider", "requirer")):
2893 provider_dict = r["provider"]
2894 requirer_dict = r["requirer"]
2895 elif "entities" in r:
2896 provider_id = r["entities"][0]["id"]
2897 provider_dict = {
2898 "nsr-id": nsr_id,
2899 "vnf-profile-id": vnf_profile_id,
2900 "endpoint": r["entities"][0]["endpoint"],
2901 }
2902 if provider_id != vnfd_id:
2903 provider_dict["vdu-profile-id"] = provider_id
2904 requirer_id = r["entities"][1]["id"]
2905 requirer_dict = {
2906 "nsr-id": nsr_id,
2907 "vnf-profile-id": vnf_profile_id,
2908 "endpoint": r["entities"][1]["endpoint"],
2909 }
2910 if requirer_id != vnfd_id:
2911 requirer_dict["vdu-profile-id"] = requirer_id
2912 else:
2913 raise Exception("provider/requirer or entities must be included in the relation.")
2914 relation_provider = self._update_ee_relation_data_with_implicit_data(
2915 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2916 )
2917 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2918 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2919 )
2920 provider = EERelation(relation_provider)
2921 requirer = EERelation(relation_requirer)
2922 relation = Relation(r["name"], provider, requirer)
2923 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2924 if vca_in_relation:
2925 relations.append(relation)
2926 return relations
2927
2928 def _get_kdu_resource_data(
2929 self,
2930 ee_relation: EERelation,
2931 db_nsr: Dict[str, Any],
2932 cached_vnfds: Dict[str, Any],
2933 ) -> DeployedK8sResource:
2934 nsd = get_nsd(db_nsr)
2935 vnf_profiles = get_vnf_profiles(nsd)
2936 vnfd_id = find_in_list(
2937 vnf_profiles,
2938 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2939 )["vnfd-id"]
2940 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2941 kdu_resource_profile = get_kdu_resource_profile(
2942 db_vnfd, ee_relation.kdu_resource_profile_id
2943 )
2944 kdu_name = kdu_resource_profile["kdu-name"]
2945 deployed_kdu, _ = get_deployed_kdu(
2946 db_nsr.get("_admin", ()).get("deployed", ()),
2947 kdu_name,
2948 ee_relation.vnf_profile_id,
2949 )
2950 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2951 return deployed_kdu
2952
2953 def _get_deployed_component(
2954 self,
2955 ee_relation: EERelation,
2956 db_nsr: Dict[str, Any],
2957 cached_vnfds: Dict[str, Any],
2958 ) -> DeployedComponent:
2959 nsr_id = db_nsr["_id"]
2960 deployed_component = None
2961 ee_level = EELevel.get_level(ee_relation)
2962 if ee_level == EELevel.NS:
2963 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2964 if vca:
2965 deployed_component = DeployedVCA(nsr_id, vca)
2966 elif ee_level == EELevel.VNF:
2967 vca = get_deployed_vca(
2968 db_nsr,
2969 {
2970 "vdu_id": None,
2971 "member-vnf-index": ee_relation.vnf_profile_id,
2972 "ee_descriptor_id": ee_relation.execution_environment_ref,
2973 },
2974 )
2975 if vca:
2976 deployed_component = DeployedVCA(nsr_id, vca)
2977 elif ee_level == EELevel.VDU:
2978 vca = get_deployed_vca(
2979 db_nsr,
2980 {
2981 "vdu_id": ee_relation.vdu_profile_id,
2982 "member-vnf-index": ee_relation.vnf_profile_id,
2983 "ee_descriptor_id": ee_relation.execution_environment_ref,
2984 },
2985 )
2986 if vca:
2987 deployed_component = DeployedVCA(nsr_id, vca)
2988 elif ee_level == EELevel.KDU:
2989 kdu_resource_data = self._get_kdu_resource_data(
2990 ee_relation, db_nsr, cached_vnfds
2991 )
2992 if kdu_resource_data:
2993 deployed_component = DeployedK8sResource(kdu_resource_data)
2994 return deployed_component
2995
2996 async def _add_relation(
2997 self,
2998 relation: Relation,
2999 vca_type: str,
3000 db_nsr: Dict[str, Any],
3001 cached_vnfds: Dict[str, Any],
3002 cached_vnfrs: Dict[str, Any],
3003 ) -> bool:
3004 deployed_provider = self._get_deployed_component(
3005 relation.provider, db_nsr, cached_vnfds
3006 )
3007 deployed_requirer = self._get_deployed_component(
3008 relation.requirer, db_nsr, cached_vnfds
3009 )
3010 if (
3011 deployed_provider
3012 and deployed_requirer
3013 and deployed_provider.config_sw_installed
3014 and deployed_requirer.config_sw_installed
3015 ):
3016 provider_db_vnfr = (
3017 self._get_vnfr(
3018 relation.provider.nsr_id,
3019 relation.provider.vnf_profile_id,
3020 cached_vnfrs,
3021 )
3022 if relation.provider.vnf_profile_id
3023 else None
3024 )
3025 requirer_db_vnfr = (
3026 self._get_vnfr(
3027 relation.requirer.nsr_id,
3028 relation.requirer.vnf_profile_id,
3029 cached_vnfrs,
3030 )
3031 if relation.requirer.vnf_profile_id
3032 else None
3033 )
3034 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3035 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3036 provider_relation_endpoint = RelationEndpoint(
3037 deployed_provider.ee_id,
3038 provider_vca_id,
3039 relation.provider.endpoint,
3040 )
3041 requirer_relation_endpoint = RelationEndpoint(
3042 deployed_requirer.ee_id,
3043 requirer_vca_id,
3044 relation.requirer.endpoint,
3045 )
3046 await self.vca_map[vca_type].add_relation(
3047 provider=provider_relation_endpoint,
3048 requirer=requirer_relation_endpoint,
3049 )
3050 # remove entry from relations list
3051 return True
3052 return False
3053
    async def _add_vca_relations(
        self,
        logging_text,
        nsr_id,
        vca_type: str,
        vca_index: int,
        timeout: int = 3600,
    ) -> bool:
        """Find and establish all relations that involve one deployed VCA.

        Polls until every relation could be added (peers deployed and
        configured) or the timeout expires.

        :param logging_text: prefix for log messages
        :param nsr_id: NS record id in the "nsrs" collection
        :param vca_type: VCA type, used as key of self.vca_map
        :param vca_index: position of this VCA in _admin.deployed.VCA
        :param timeout: seconds to keep polling for peer VCAs to be ready
        :return: True when all relations were added (or there are none),
            False on timeout or on any error
        """
        # steps:
        # 1. find all relations for this VCA
        # 2. wait for other peers related
        # 3. add relations

        try:
            # STEP 1: find all relations for this VCA

            # read nsr record
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = get_nsd(db_nsr)

            # this VCA data
            deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
            my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

            # caches shared by all relation lookups in this call
            cached_vnfds = {}
            cached_vnfrs = {}
            relations = []
            relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
            relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

            # if no relations, terminate
            if not relations:
                self.logger.debug(logging_text + " No relations")
                return True

            self.logger.debug(logging_text + " adding relations {}".format(relations))

            # add all relations
            start = time()
            while True:
                # check timeout
                now = time()
                if now - start >= timeout:
                    self.logger.error(logging_text + " : timeout adding relations")
                    return False

                # reload nsr from database (we need to update record: _admin.deployed.VCA)
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

                # for each relation, find the VCA's related
                # (iterate a copy: successfully added relations are removed)
                for relation in relations.copy():
                    added = await self._add_relation(
                        relation,
                        vca_type,
                        db_nsr,
                        cached_vnfds,
                        cached_vnfrs,
                    )
                    if added:
                        relations.remove(relation)

                if not relations:
                    self.logger.debug("Relations added")
                    break
                # poll again after 5 seconds
                await asyncio.sleep(5.0)

            return True

        except Exception as e:
            # NOTE(review): logger.warn is deprecated in favor of logger.warning
            self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
            return False
3126
3127 async def _install_kdu(
3128 self,
3129 nsr_id: str,
3130 nsr_db_path: str,
3131 vnfr_data: dict,
3132 kdu_index: int,
3133 kdud: dict,
3134 vnfd: dict,
3135 k8s_instance_info: dict,
3136 k8params: dict = None,
3137 timeout: int = 600,
3138 vca_id: str = None,
3139 ):
3140
3141 try:
3142 k8sclustertype = k8s_instance_info["k8scluster-type"]
3143 # Instantiate kdu
3144 db_dict_install = {
3145 "collection": "nsrs",
3146 "filter": {"_id": nsr_id},
3147 "path": nsr_db_path,
3148 }
3149
3150 if k8s_instance_info.get("kdu-deployment-name"):
3151 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3152 else:
3153 kdu_instance = self.k8scluster_map[
3154 k8sclustertype
3155 ].generate_kdu_instance_name(
3156 db_dict=db_dict_install,
3157 kdu_model=k8s_instance_info["kdu-model"],
3158 kdu_name=k8s_instance_info["kdu-name"],
3159 )
3160
3161 # Update the nsrs table with the kdu-instance value
3162 self.update_db_2(
3163 item="nsrs",
3164 _id=nsr_id,
3165 _desc={nsr_db_path + ".kdu-instance": kdu_instance},
3166 )
3167
3168 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3169 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3170 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3171 # namespace, this first verification could be removed, and the next step would be done for any kind
3172 # of KNF.
3173 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3174 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3175 if k8sclustertype in ("juju", "juju-bundle"):
3176 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3177 # that the user passed a namespace which he wants its KDU to be deployed in)
3178 if (
3179 self.db.count(
3180 table="nsrs",
3181 q_filter={
3182 "_id": nsr_id,
3183 "_admin.projects_write": k8s_instance_info["namespace"],
3184 "_admin.projects_read": k8s_instance_info["namespace"],
3185 },
3186 )
3187 > 0
3188 ):
3189 self.logger.debug(
3190 f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3191 )
3192 self.update_db_2(
3193 item="nsrs",
3194 _id=nsr_id,
3195 _desc={f"{nsr_db_path}.namespace": kdu_instance},
3196 )
3197 k8s_instance_info["namespace"] = kdu_instance
3198
3199 await self.k8scluster_map[k8sclustertype].install(
3200 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3201 kdu_model=k8s_instance_info["kdu-model"],
3202 atomic=True,
3203 params=k8params,
3204 db_dict=db_dict_install,
3205 timeout=timeout,
3206 kdu_name=k8s_instance_info["kdu-name"],
3207 namespace=k8s_instance_info["namespace"],
3208 kdu_instance=kdu_instance,
3209 vca_id=vca_id,
3210 )
3211
3212 # Obtain services to obtain management service ip
3213 services = await self.k8scluster_map[k8sclustertype].get_services(
3214 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3215 kdu_instance=kdu_instance,
3216 namespace=k8s_instance_info["namespace"],
3217 )
3218
3219 # Obtain management service info (if exists)
3220 vnfr_update_dict = {}
3221 kdu_config = get_configuration(vnfd, kdud["name"])
3222 if kdu_config:
3223 target_ee_list = kdu_config.get("execution-environment-list", [])
3224 else:
3225 target_ee_list = []
3226
3227 if services:
3228 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3229 mgmt_services = [
3230 service
3231 for service in kdud.get("service", [])
3232 if service.get("mgmt-service")
3233 ]
3234 for mgmt_service in mgmt_services:
3235 for service in services:
3236 if service["name"].startswith(mgmt_service["name"]):
3237 # Mgmt service found, Obtain service ip
3238 ip = service.get("external_ip", service.get("cluster_ip"))
3239 if isinstance(ip, list) and len(ip) == 1:
3240 ip = ip[0]
3241
3242 vnfr_update_dict[
3243 "kdur.{}.ip-address".format(kdu_index)
3244 ] = ip
3245
3246 # Check if must update also mgmt ip at the vnf
3247 service_external_cp = mgmt_service.get(
3248 "external-connection-point-ref"
3249 )
3250 if service_external_cp:
3251 if (
3252 deep_get(vnfd, ("mgmt-interface", "cp"))
3253 == service_external_cp
3254 ):
3255 vnfr_update_dict["ip-address"] = ip
3256
3257 if find_in_list(
3258 target_ee_list,
3259 lambda ee: ee.get(
3260 "external-connection-point-ref", ""
3261 )
3262 == service_external_cp,
3263 ):
3264 vnfr_update_dict[
3265 "kdur.{}.ip-address".format(kdu_index)
3266 ] = ip
3267 break
3268 else:
3269 self.logger.warn(
3270 "Mgmt service name: {} not found".format(
3271 mgmt_service["name"]
3272 )
3273 )
3274
3275 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3276 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3277
3278 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3279 if (
3280 kdu_config
3281 and kdu_config.get("initial-config-primitive")
3282 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3283 ):
3284 initial_config_primitive_list = kdu_config.get(
3285 "initial-config-primitive"
3286 )
3287 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3288
3289 for initial_config_primitive in initial_config_primitive_list:
3290 primitive_params_ = self._map_primitive_params(
3291 initial_config_primitive, {}, {}
3292 )
3293
3294 await asyncio.wait_for(
3295 self.k8scluster_map[k8sclustertype].exec_primitive(
3296 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3297 kdu_instance=kdu_instance,
3298 primitive_name=initial_config_primitive["name"],
3299 params=primitive_params_,
3300 db_dict=db_dict_install,
3301 vca_id=vca_id,
3302 ),
3303 timeout=timeout,
3304 )
3305
3306 except Exception as e:
3307 # Prepare update db with error and raise exception
3308 try:
3309 self.update_db_2(
3310 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3311 )
3312 self.update_db_2(
3313 "vnfrs",
3314 vnfr_data.get("_id"),
3315 {"kdur.{}.status".format(kdu_index): "ERROR"},
3316 )
3317 except Exception:
3318 # ignore to keep original exception
3319 pass
3320 # reraise original error
3321 raise
3322
3323 return kdu_instance
3324
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Deploy every KDU found in the given VNF records.

        For each "kdur" entry it resolves the K8s cluster, synchronizes the
        helm repos once per cluster, records the deployment in
        _admin.deployed.K8s of the NS record, and spawns one asyncio task
        running self._install_kdu. The tasks are registered in self.lcm_tasks
        and annotated in task_instantiation_info; they are awaited by the
        caller, not here.

        :param logging_text: prefix for log messages
        :param nsr_id: NS record id
        :param nslcmop_id: NS LCM operation id (used for task registration)
        :param db_vnfrs: dict whose values are the VNF records to scan
        :param db_vnfds: list of VNF descriptors
        :param task_instantiation_info: dict mapping created tasks to a
            human-readable description
        :raises LcmException: on any preparation/deployment error
        """
        # Launch kdus if present in the descriptor

        # cache: cluster type -> {cluster id -> connector cluster uuid}
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            # Resolve (and cache) the connector-level uuid of a k8s cluster,
            # initializing helm-v3 on legacy clusters when needed.
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            # clusters whose helm repos were already synchronized in this run
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage:  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            if storage["pkg-dir"]:
                                filename = "{}/{}/{}s/{}".format(
                                    storage["folder"],
                                    storage["pkg-dir"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            else:
                                filename = "{}/Scripts/{}s/{}".format(
                                    storage["folder"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            # 30-minute KDU instantiation timeout (bug 2031)
                            timeout=1800,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            # persist whatever deployment info was gathered, even on failure
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3596
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Create/reuse VCA entries for the execution environments of a
        descriptor and launch one instantiate_N2VC task per environment.

        For each execution environment of descriptor_config, the matching
        entry of db_nsr._admin.deployed.VCA is looked up; if absent, a new
        one is created and persisted. Each created asyncio task is registered
        in self.lcm_tasks and annotated in task_instantiation_info.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            # Derive the VCA type and name from the EE descriptor
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            # Look for an existing deployed-VCA entry; the for/else creates
            # a new one when the loop finishes without a break
            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
            self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
            self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3749
3750 @staticmethod
3751 def _create_nslcmop(nsr_id, operation, params):
3752 """
3753 Creates a ns-lcm-opp content to be stored at database.
3754 :param nsr_id: internal id of the instance
3755 :param operation: instantiate, terminate, scale, action, ...
3756 :param params: user parameters for the operation
3757 :return: dictionary following SOL005 format
3758 """
3759 # Raise exception if invalid arguments
3760 if not (nsr_id and operation and params):
3761 raise LcmException(
3762 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3763 )
3764 now = time()
3765 _id = str(uuid4())
3766 nslcmop = {
3767 "id": _id,
3768 "_id": _id,
3769 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3770 "operationState": "PROCESSING",
3771 "statusEnteredTime": now,
3772 "nsInstanceId": nsr_id,
3773 "lcmOperationType": operation,
3774 "startTime": now,
3775 "isAutomaticInvocation": False,
3776 "operationParams": params,
3777 "isCancelPending": False,
3778 "links": {
3779 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3780 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3781 },
3782 }
3783 return nslcmop
3784
3785 def _format_additional_params(self, params):
3786 params = params or {}
3787 for key, value in params.items():
3788 if str(value).startswith("!!yaml "):
3789 params[key] = yaml.safe_load(value[7:])
3790 return params
3791
3792 def _get_terminate_primitive_params(self, seq, vnf_index):
3793 primitive = seq.get("name")
3794 primitive_params = {}
3795 params = {
3796 "member_vnf_index": vnf_index,
3797 "primitive": primitive,
3798 "primitive_params": primitive_params,
3799 }
3800 desc_params = {}
3801 return self._map_primitive_params(seq, params, desc_params)
3802
3803 # sub-operations
3804
3805 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3806 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3807 if op.get("operationState") == "COMPLETED":
3808 # b. Skip sub-operation
3809 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3810 return self.SUBOPERATION_STATUS_SKIP
3811 else:
3812 # c. retry executing sub-operation
3813 # The sub-operation exists, and operationState != 'COMPLETED'
3814 # Update operationState = 'PROCESSING' to indicate a retry.
3815 operationState = "PROCESSING"
3816 detailed_status = "In progress"
3817 self._update_suboperation_status(
3818 db_nslcmop, op_index, operationState, detailed_status
3819 )
3820 # Return the sub-operation index
3821 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3822 # with arguments extracted from the sub-operation
3823 return op_index
3824
3825 # Find a sub-operation where all keys in a matching dictionary must match
3826 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3827 def _find_suboperation(self, db_nslcmop, match):
3828 if db_nslcmop and match:
3829 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3830 for i, op in enumerate(op_list):
3831 if all(op.get(k) == match[k] for k in match):
3832 return i
3833 return self.SUBOPERATION_STATUS_NOT_FOUND
3834
3835 # Update status for a sub-operation given its index
3836 def _update_suboperation_status(
3837 self, db_nslcmop, op_index, operationState, detailed_status
3838 ):
3839 # Update DB for HA tasks
3840 q_filter = {"_id": db_nslcmop["_id"]}
3841 update_dict = {
3842 "_admin.operations.{}.operationState".format(op_index): operationState,
3843 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3844 }
3845 self.db.set_one(
3846 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3847 )
3848
3849 # Add sub-operation, return the index of the added sub-operation
3850 # Optionally, set operationState, detailed-status, and operationType
3851 # Status and type are currently set for 'scale' sub-operations:
3852 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3853 # 'detailed-status' : status message
3854 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3855 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3856 def _add_suboperation(
3857 self,
3858 db_nslcmop,
3859 vnf_index,
3860 vdu_id,
3861 vdu_count_index,
3862 vdu_name,
3863 primitive,
3864 mapped_primitive_params,
3865 operationState=None,
3866 detailed_status=None,
3867 operationType=None,
3868 RO_nsr_id=None,
3869 RO_scaling_info=None,
3870 ):
3871 if not db_nslcmop:
3872 return self.SUBOPERATION_STATUS_NOT_FOUND
3873 # Get the "_admin.operations" list, if it exists
3874 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3875 op_list = db_nslcmop_admin.get("operations")
3876 # Create or append to the "_admin.operations" list
3877 new_op = {
3878 "member_vnf_index": vnf_index,
3879 "vdu_id": vdu_id,
3880 "vdu_count_index": vdu_count_index,
3881 "primitive": primitive,
3882 "primitive_params": mapped_primitive_params,
3883 }
3884 if operationState:
3885 new_op["operationState"] = operationState
3886 if detailed_status:
3887 new_op["detailed-status"] = detailed_status
3888 if operationType:
3889 new_op["lcmOperationType"] = operationType
3890 if RO_nsr_id:
3891 new_op["RO_nsr_id"] = RO_nsr_id
3892 if RO_scaling_info:
3893 new_op["RO_scaling_info"] = RO_scaling_info
3894 if not op_list:
3895 # No existing operations, create key 'operations' with current operation as first list element
3896 db_nslcmop_admin.update({"operations": [new_op]})
3897 op_list = db_nslcmop_admin.get("operations")
3898 else:
3899 # Existing operations, append operation to list
3900 op_list.append(new_op)
3901
3902 db_nslcmop_update = {"_admin.operations": op_list}
3903 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3904 op_index = len(op_list) - 1
3905 return op_index
3906
3907 # Helper methods for scale() sub-operations
3908
3909 # pre-scale/post-scale:
3910 # Check for 3 different cases:
3911 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3912 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3913 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3914 def _check_or_add_scale_suboperation(
3915 self,
3916 db_nslcmop,
3917 vnf_index,
3918 vnf_config_primitive,
3919 primitive_params,
3920 operationType,
3921 RO_nsr_id=None,
3922 RO_scaling_info=None,
3923 ):
3924 # Find this sub-operation
3925 if RO_nsr_id and RO_scaling_info:
3926 operationType = "SCALE-RO"
3927 match = {
3928 "member_vnf_index": vnf_index,
3929 "RO_nsr_id": RO_nsr_id,
3930 "RO_scaling_info": RO_scaling_info,
3931 }
3932 else:
3933 match = {
3934 "member_vnf_index": vnf_index,
3935 "primitive": vnf_config_primitive,
3936 "primitive_params": primitive_params,
3937 "lcmOperationType": operationType,
3938 }
3939 op_index = self._find_suboperation(db_nslcmop, match)
3940 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3941 # a. New sub-operation
3942 # The sub-operation does not exist, add it.
3943 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3944 # The following parameters are set to None for all kind of scaling:
3945 vdu_id = None
3946 vdu_count_index = None
3947 vdu_name = None
3948 if RO_nsr_id and RO_scaling_info:
3949 vnf_config_primitive = None
3950 primitive_params = None
3951 else:
3952 RO_nsr_id = None
3953 RO_scaling_info = None
3954 # Initial status for sub-operation
3955 operationState = "PROCESSING"
3956 detailed_status = "In progress"
3957 # Add sub-operation for pre/post-scaling (zero or more operations)
3958 self._add_suboperation(
3959 db_nslcmop,
3960 vnf_index,
3961 vdu_id,
3962 vdu_count_index,
3963 vdu_name,
3964 vnf_config_primitive,
3965 primitive_params,
3966 operationState,
3967 detailed_status,
3968 operationType,
3969 RO_nsr_id,
3970 RO_scaling_info,
3971 )
3972 return self.SUBOPERATION_STATUS_NEW
3973 else:
3974 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3975 # or op_index (operationState != 'COMPLETED')
3976 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3977
3978 # Function to return execution_environment id
3979
3980 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3981 # TODO vdu_index_count
3982 for vca in vca_deployed_list:
3983 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3984 return vca["ee_id"]
3985
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True).
        :param logging_text: prefix prepended to every log message
        :param db_nslcmop: database content of the current nslcmop
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
        :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
        not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA (controller) identifier, or None for the default one
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default value kept for backward compatibility with older records
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            # primitives come from the descriptor, filtered/sorted for this EE
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            # only run primitives when this VCA was flagged as needing terminate
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            # delete the execution environment for this single VCA
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4091
4092 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4093 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4094 namespace = "." + db_nsr["_id"]
4095 try:
4096 await self.n2vc.delete_namespace(
4097 namespace=namespace,
4098 total_timeout=self.timeout_charm_delete,
4099 vca_id=vca_id,
4100 )
4101 except N2VCNotFound: # already deleted. Skip
4102 pass
4103 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4104
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO (classic, non-NG flow): deletes the ns
        from the VIM, then the nsd and the vnfds registered at RO.
        :param logging_text: prefix prepended to every log message
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id:
        :param nslcmop_id:
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None; raises LcmException when any deletion fails
        """
        db_nsr_update = {}
        failed_detail = []
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            # a pending delete action from a previous attempt may be resumed
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                # persist the action id so a restarted LCM can resume the wait
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                # poll RO every 5s until deletion completes or the timeout expires
                delete_timeout = 20 * 60  # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        # deletion still in progress at the VIM
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # action finished: the ns is gone from the VIM
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # only write to database when the status text actually changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                # already deleted at RO: treat as success and clear the refs
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Delete nsd
        # only attempted when the ns deletion above fully succeeded
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete the vnfds registered at RO, one by one
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4304
    async def terminate(self, nsr_id, nslcmop_id):
        """Terminate a network service: run terminate primitives, delete all
        execution environments and KDUs, and remove the deployment from RO/VIM.
        Status of nsr and nslcmop records is updated along the way; on exit a
        kafka "terminated" message is produced.
        :param nsr_id: ns instance id to terminate
        :param nslcmop_id: operation id driving this termination
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM instance owns this operation
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # maps asyncio task -> human-readable description
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            # the operation may override the default terminate timeout
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                # nothing was ever deployed; finalization happens in "finally"
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs
            # build vnfd lookups by id and by member-vnf-index, fetching each
            # vnfd from the database only once
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # select the configuration descriptor matching this VCA level:
                # ns-level, vdu-level, kdu-level or vnf-level
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    # errors during primitives: stop here, "finally" reports them
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    # unknown cluster type: log and carry on with the others
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            # pick the deletion flow matching the RO flavour in use (NG or classic)
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            # always runs: collect pending tasks, compute the final operation
            # state and write it to the database, then notify over kafka
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )
                if ns_state == "NOT_INSTANTIATED":
                    try:
                        # propagate the final state to all VNFRs of this NS
                        self.db.set_list(
                            "vnfrs",
                            {"nsr-id-ref": nsr_id},
                            {"_admin.nsState": "NOT_INSTANTIATED"},
                        )
                    except DbException as e:
                        self.logger.warn(
                            logging_text
                            + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                                nsr_id, e
                            )
                        )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    # notify subscribers (e.g. NBI) that termination finished
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4632
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """Await a set of asyncio tasks, reporting progress and collecting errors.

        :param logging_text: prefix prepended to every log message
        :param created_tasks_info: dict mapping asyncio task -> description text
        :param timeout: overall budget (seconds) shared by all tasks
        :param stage: 3-element status list; index 1 is updated with "done/total"
        :param nslcmop_id: operation whose status is written on every progress step
        :param nsr_id: when provided, errorDescription/errorDetail are also
            written to this nsr record
        :return: list of error-detail strings (empty when all tasks succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining time from the shared budget
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                # every still-pending task is reported as timed out
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # expected exception types are logged briefly; anything else
                    # gets a full traceback
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4709
4710 @staticmethod
4711 def _map_primitive_params(primitive_desc, params, instantiation_params):
4712 """
4713 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4714 The default-value is used. If it is between < > it look for a value at instantiation_params
4715 :param primitive_desc: portion of VNFD/NSD that describes primitive
4716 :param params: Params provided by user
4717 :param instantiation_params: Instantiation params provided by user
4718 :return: a dictionary with the calculated params
4719 """
4720 calculated_params = {}
4721 for parameter in primitive_desc.get("parameter", ()):
4722 param_name = parameter["name"]
4723 if param_name in params:
4724 calculated_params[param_name] = params[param_name]
4725 elif "default-value" in parameter or "value" in parameter:
4726 if "value" in parameter:
4727 calculated_params[param_name] = parameter["value"]
4728 else:
4729 calculated_params[param_name] = parameter["default-value"]
4730 if (
4731 isinstance(calculated_params[param_name], str)
4732 and calculated_params[param_name].startswith("<")
4733 and calculated_params[param_name].endswith(">")
4734 ):
4735 if calculated_params[param_name][1:-1] in instantiation_params:
4736 calculated_params[param_name] = instantiation_params[
4737 calculated_params[param_name][1:-1]
4738 ]
4739 else:
4740 raise LcmException(
4741 "Parameter {} needed to execute primitive {} not provided".format(
4742 calculated_params[param_name], primitive_desc["name"]
4743 )
4744 )
4745 else:
4746 raise LcmException(
4747 "Parameter {} needed to execute primitive {} not provided".format(
4748 param_name, primitive_desc["name"]
4749 )
4750 )
4751
4752 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4753 calculated_params[param_name] = yaml.safe_dump(
4754 calculated_params[param_name], default_flow_style=True, width=256
4755 )
4756 elif isinstance(calculated_params[param_name], str) and calculated_params[
4757 param_name
4758 ].startswith("!!yaml "):
4759 calculated_params[param_name] = calculated_params[param_name][7:]
4760 if parameter.get("data-type") == "INTEGER":
4761 try:
4762 calculated_params[param_name] = int(calculated_params[param_name])
4763 except ValueError: # error converting string to int
4764 raise LcmException(
4765 "Parameter {} of primitive {} must be integer".format(
4766 param_name, primitive_desc["name"]
4767 )
4768 )
4769 elif parameter.get("data-type") == "BOOLEAN":
4770 calculated_params[param_name] = not (
4771 (str(calculated_params[param_name])).lower() == "false"
4772 )
4773
4774 # add always ns_config_info if primitive name is config
4775 if primitive_desc["name"] == "config":
4776 if "ns_config_info" in instantiation_params:
4777 calculated_params["ns_config_info"] = instantiation_params[
4778 "ns_config_info"
4779 ]
4780 return calculated_params
4781
4782 def _look_for_deployed_vca(
4783 self,
4784 deployed_vca,
4785 member_vnf_index,
4786 vdu_id,
4787 vdu_count_index,
4788 kdu_name=None,
4789 ee_descriptor_id=None,
4790 ):
4791 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4792 for vca in deployed_vca:
4793 if not vca:
4794 continue
4795 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4796 continue
4797 if (
4798 vdu_count_index is not None
4799 and vdu_count_index != vca["vdu_count_index"]
4800 ):
4801 continue
4802 if kdu_name and kdu_name != vca["kdu_name"]:
4803 continue
4804 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4805 continue
4806 break
4807 else:
4808 # vca_deployed not found
4809 raise LcmException(
4810 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4811 " is not deployed".format(
4812 member_vnf_index,
4813 vdu_id,
4814 vdu_count_index,
4815 kdu_name,
4816 ee_descriptor_id,
4817 )
4818 )
4819 # get ee_id
4820 ee_id = vca.get("ee_id")
4821 vca_type = vca.get(
4822 "type", "lxc_proxy_charm"
4823 ) # default value for backward compatibility - proxy charm
4824 if not ee_id:
4825 raise LcmException(
4826 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4827 "execution environment".format(
4828 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4829 )
4830 )
4831 return ee_id, vca_type
4832
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """Execute a primitive on the given execution environment, retrying on
        failure up to `retries` times with `retries_interval` seconds in between.

        :param ee_id: execution environment id where the primitive runs
        :param primitive: primitive name; "config" wraps the params in {"params": ...}
        :param primitive_params: parameters passed to the primitive
        :param retries: number of extra attempts after a failed one
        :param retries_interval: seconds to sleep between attempts
        :param timeout: per-attempt timeout; defaults to self.timeout_primitive
        :param vca_type: key into self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: database info forwarded to the VCA connector
        :param vca_id: VCA (controller) identifier, or None for the default one
        :return: tuple (status, detail); "COMPLETED" with the primitive output,
            or "FAILED"/"FAIL" with an error text
        """
        try:
            if primitive == "config":
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # retries exhausted
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            # NOTE(review): returns "FAIL" here but "FAILED" above — callers seen
            # in this file only test membership in ["COMPLETED",
            # "PARTIALLY_COMPLETED"], so both work; confirm before unifying.
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4891
4892 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4893 """
4894 Updating the vca_status with latest juju information in nsrs record
4895 :param: nsr_id: Id of the nsr
4896 :param: nslcmop_id: Id of the nslcmop
4897 :return: None
4898 """
4899
4900 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4901 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4902 vca_id = self.get_vca_id({}, db_nsr)
4903 if db_nsr["_admin"]["deployed"]["K8s"]:
4904 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4905 cluster_uuid, kdu_instance, cluster_type = (
4906 k8s["k8scluster-uuid"],
4907 k8s["kdu-instance"],
4908 k8s["k8scluster-type"],
4909 )
4910 await self._on_update_k8s_db(
4911 cluster_uuid=cluster_uuid,
4912 kdu_instance=kdu_instance,
4913 filter={"_id": nsr_id},
4914 vca_id=vca_id,
4915 cluster_type=cluster_type,
4916 )
4917 else:
4918 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4919 table, filter = "nsrs", {"_id": nsr_id}
4920 path = "_admin.deployed.VCA.{}.".format(vca_index)
4921 await self._on_update_n2vc_db(table, filter, path, {})
4922
4923 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4924 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4925
    async def action(self, nsr_id, nslcmop_id):
        """
        Execute a config primitive (action) on a NS, VNF, VDU or KDU.

        Reads the nslcmop operationParams to locate the target (member_vnf_index,
        vdu_id, kdu_name, vdu_count_index) and the primitive to run, resolves the
        primitive in the corresponding descriptor configuration, and dispatches
        either to the K8s connector (for KDU upgrade/rollback/status or KDU
        config primitives) or to the deployed VCA execution environment.
        The nslcmop/nsr records are always updated in the ``finally`` block and a
        kafka "actioned" message is written when an operation state was reached.

        :param nsr_id: Id of the nsr
        :param nslcmop_id: Id of the nslcmop
        :return: (nslcmop_operation_state, detailed_status) on the error path;
            the success path returns None and relies on the ``finally`` block
            for database updates.
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nsr = None
        db_nslcmop = None
        db_nsr_update = {}
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        error_description_nslcmop = None
        exc = None
        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="RUNNING ACTION",
                current_operation_id=nslcmop_id,
            )

            step = "Getting information from database"
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            # primitive_params arrive json-encoded from NBI; decode in place
            if db_nslcmop["operationParams"].get("primitive_params"):
                db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                    db_nslcmop["operationParams"]["primitive_params"]
                )

            nsr_deployed = db_nsr["_admin"].get("deployed")
            vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            kdu_name = db_nslcmop["operationParams"].get("kdu_name")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            primitive = db_nslcmop["operationParams"]["primitive"]
            primitive_params = db_nslcmop["operationParams"]["primitive_params"]
            timeout_ns_action = db_nslcmop["operationParams"].get(
                "timeout_ns_action", self.timeout_primitive
            )

            if vnf_index:
                step = "Getting vnfr from database"
                db_vnfr = self.db.get_one(
                    "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
                )
                if db_vnfr.get("kdur"):
                    # kdur additionalParams are stored json-encoded; decode each
                    kdur_list = []
                    for kdur in db_vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    db_vnfr["kdur"] = kdur_list
                step = "Getting vnfd from database"
                db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # NOTE(review): db_vnfr is only assigned in the vnf_index branch
            # above; for a pure NS-level action this would raise NameError here
            # — confirm NBI always provides member_vnf_index when needed.
            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # for backward compatibility
            if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            # look for primitive in the configuration of the addressed entity
            # (vdu > kdu > vnf > ns, in that precedence order)
            config_primitive_desc = descriptor_configuration = None
            if vdu_id:
                descriptor_configuration = get_configuration(db_vnfd, vdu_id)
            elif kdu_name:
                descriptor_configuration = get_configuration(db_vnfd, kdu_name)
            elif vnf_index:
                descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
            else:
                descriptor_configuration = db_nsd.get("ns-configuration")

            if descriptor_configuration and descriptor_configuration.get(
                "config-primitive"
            ):
                for config_primitive in descriptor_configuration["config-primitive"]:
                    if config_primitive["name"] == primitive:
                        config_primitive_desc = config_primitive
                        break

            if not config_primitive_desc:
                # KDUs accept the implicit upgrade/rollback/status actions even
                # when not declared in the descriptor
                if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                    raise LcmException(
                        "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                            primitive
                        )
                    )
                primitive_name = primitive
                ee_descriptor_id = None
            else:
                # the descriptor may map the user-facing primitive name to a
                # different execution-environment primitive
                primitive_name = config_primitive_desc.get(
                    "execution-environment-primitive", primitive
                )
                ee_descriptor_id = config_primitive_desc.get(
                    "execution-environment-ref"
                )

            # resolve additional params of the addressed entity (yaml-encoded
            # strings are parsed into dicts)
            if vnf_index:
                if vdu_id:
                    vdur = next(
                        (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                    )
                    desc_params = parse_yaml_strings(vdur.get("additionalParams"))
                elif kdu_name:
                    kdur = next(
                        (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                    )
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                else:
                    desc_params = parse_yaml_strings(
                        db_vnfr.get("additionalParamsForVnf")
                    )
            else:
                desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
            if kdu_name and get_configuration(db_vnfd, kdu_name):
                kdu_configuration = get_configuration(db_vnfd, kdu_name)
                actions = set()
                # NOTE(review): these loops rebind the local "primitive"
                # (the requested primitive name); it is not read again after
                # this point so the clobber is currently benign.
                for primitive in kdu_configuration.get("initial-config-primitive", []):
                    actions.add(primitive["name"])
                for primitive in kdu_configuration.get("config-primitive", []):
                    actions.add(primitive["name"])
                kdu_action = True if primitive_name in actions else False

            # TODO check if ns is in a proper status
            # NOTE(review): kdu_action is only assigned when the kdu has a
            # configuration block; for a kdu without one and a non-implicit
            # primitive this condition would raise NameError — confirm.
            if kdu_name and (
                primitive_name in ("upgrade", "rollback", "status") or kdu_action
            ):
                # kdur and desc_params already set from before
                if primitive_params:
                    desc_params.update(primitive_params)
                # TODO Check if we will need something at vnf level
                # for/else: "index" and "kdu" stay bound after the break and
                # are used below
                for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                    if (
                        kdu_name == kdu["kdu-name"]
                        and kdu["member-vnf-index"] == vnf_index
                    ):
                        break
                else:
                    raise LcmException(
                        "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                    )

                if kdu.get("k8scluster-type") not in self.k8scluster_map:
                    msg = "unknown k8scluster-type '{}'".format(
                        kdu.get("k8scluster-type")
                    )
                    raise LcmException(msg)

                # db_dict tells the connector where to write status updates
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }
                self.logger.debug(
                    logging_text
                    + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
                )
                step = "Executing kdu {}".format(primitive_name)
                if primitive_name == "upgrade":
                    # kdu_model may be overridden via params; otherwise reuse
                    # the deployed model, stripping a ":version" suffix
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.get("kdu_model")
                        del desc_params["kdu_model"]
                    else:
                        kdu_model = kdu.get("kdu-model")
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                    # outer wait_for gets 10 extra seconds so the connector's
                    # own timeout can fire first with a better error
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            atomic=True,
                            kdu_model=kdu_model,
                            params=desc_params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                        ),
                        timeout=timeout_ns_action + 10,
                    )
                    self.logger.debug(
                        logging_text + " Upgrade of kdu {} done".format(detailed_status)
                    )
                elif primitive_name == "rollback":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            db_dict=db_dict,
                        ),
                        timeout=timeout_ns_action,
                    )
                elif primitive_name == "status":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )
                else:
                    # generic kdu config primitive declared in the descriptor
                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                        kdu["kdu-name"], nsr_id
                    )
                    params = self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    )

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            primitive_name=primitive_name,
                            params=params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )

                # empty/falsy connector output is treated as failure
                if detailed_status:
                    nslcmop_operation_state = "COMPLETED"
                else:
                    detailed_status = ""
                    nslcmop_operation_state = "FAILED"
            else:
                # non-KDU path: run the primitive on the deployed VCA
                ee_id, vca_type = self._look_for_deployed_vca(
                    nsr_deployed["VCA"],
                    member_vnf_index=vnf_index,
                    vdu_id=vdu_id,
                    vdu_count_index=vdu_count_index,
                    ee_descriptor_id=ee_descriptor_id,
                )
                for vca_index, vca_deployed in enumerate(
                    db_nsr["_admin"]["deployed"]["VCA"]
                ):
                    if vca_deployed.get("member-vnf-index") == vnf_index:
                        db_dict = {
                            "collection": "nsrs",
                            "filter": {"_id": nsr_id},
                            "path": "_admin.deployed.VCA.{}.".format(vca_index),
                        }
                        break
                (
                    nslcmop_operation_state,
                    detailed_status,
                ) = await self._ns_execute_primitive(
                    ee_id,
                    primitive=primitive_name,
                    primitive_params=self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    ),
                    timeout=timeout_ns_action,
                    vca_type=vca_type,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

            db_nslcmop_update["detailed-status"] = detailed_status
            error_description_nslcmop = (
                detailed_status if nslcmop_operation_state == "FAILED" else ""
            )
            self.logger.debug(
                logging_text
                + " task Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
            return  # database update is called inside finally

        except (DbException, LcmException, N2VCException, K8sException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(step)
            )
            exc = "Operation was cancelled"
        except asyncio.TimeoutError:
            self.logger.error(logging_text + "Timeout while '{}'".format(step))
            exc = "Timeout"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
                exc_info=True,
            )
        finally:
            if exc:
                # "step" carries the last stage reached, giving the error
                # message its context
                db_nslcmop_update[
                    "detailed-status"
                ] = (
                    detailed_status
                ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr[
                        "nsState"
                    ],  # TODO check if degraded. For the moment use previous status
                    current_operation="IDLE",
                    current_operation_id=None,
                    # error_description=error_description_nsr,
                    # error_detail=error_detail,
                    other_update=db_nsr_update,
                )

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                # notify result on kafka; failures here are logged, not raised
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "actioned",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
            return nslcmop_operation_state, detailed_status
5276
5277 async def scale(self, nsr_id, nslcmop_id):
5278 # Try to lock HA task here
5279 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5280 if not task_is_locked_by_me:
5281 return
5282
5283 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5284 stage = ["", "", ""]
5285 tasks_dict_info = {}
5286 # ^ stage, step, VIM progress
5287 self.logger.debug(logging_text + "Enter")
5288 # get all needed from database
5289 db_nsr = None
5290 db_nslcmop_update = {}
5291 db_nsr_update = {}
5292 exc = None
5293 # in case of error, indicates what part of scale was failed to put nsr at error status
5294 scale_process = None
5295 old_operational_status = ""
5296 old_config_status = ""
5297 nsi_id = None
5298 try:
5299 # wait for any previous tasks in process
5300 step = "Waiting for previous operations to terminate"
5301 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5302 self._write_ns_status(
5303 nsr_id=nsr_id,
5304 ns_state=None,
5305 current_operation="SCALING",
5306 current_operation_id=nslcmop_id,
5307 )
5308
5309 step = "Getting nslcmop from database"
5310 self.logger.debug(
5311 step + " after having waited for previous tasks to be completed"
5312 )
5313 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5314
5315 step = "Getting nsr from database"
5316 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5317 old_operational_status = db_nsr["operational-status"]
5318 old_config_status = db_nsr["config-status"]
5319
5320 step = "Parsing scaling parameters"
5321 db_nsr_update["operational-status"] = "scaling"
5322 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5323 nsr_deployed = db_nsr["_admin"].get("deployed")
5324
5325 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5326 "scaleByStepData"
5327 ]["member-vnf-index"]
5328 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5329 "scaleByStepData"
5330 ]["scaling-group-descriptor"]
5331 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5332 # for backward compatibility
5333 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5334 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5335 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5336 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5337
5338 step = "Getting vnfr from database"
5339 db_vnfr = self.db.get_one(
5340 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5341 )
5342
5343 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5344
5345 step = "Getting vnfd from database"
5346 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5347
5348 base_folder = db_vnfd["_admin"]["storage"]
5349
5350 step = "Getting scaling-group-descriptor"
5351 scaling_descriptor = find_in_list(
5352 get_scaling_aspect(db_vnfd),
5353 lambda scale_desc: scale_desc["name"] == scaling_group,
5354 )
5355 if not scaling_descriptor:
5356 raise LcmException(
5357 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5358 "at vnfd:scaling-group-descriptor".format(scaling_group)
5359 )
5360
5361 step = "Sending scale order to VIM"
5362 # TODO check if ns is in a proper status
5363 nb_scale_op = 0
5364 if not db_nsr["_admin"].get("scaling-group"):
5365 self.update_db_2(
5366 "nsrs",
5367 nsr_id,
5368 {
5369 "_admin.scaling-group": [
5370 {"name": scaling_group, "nb-scale-op": 0}
5371 ]
5372 },
5373 )
5374 admin_scale_index = 0
5375 else:
5376 for admin_scale_index, admin_scale_info in enumerate(
5377 db_nsr["_admin"]["scaling-group"]
5378 ):
5379 if admin_scale_info["name"] == scaling_group:
5380 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5381 break
5382 else: # not found, set index one plus last element and add new entry with the name
5383 admin_scale_index += 1
5384 db_nsr_update[
5385 "_admin.scaling-group.{}.name".format(admin_scale_index)
5386 ] = scaling_group
5387
5388 vca_scaling_info = []
5389 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5390 if scaling_type == "SCALE_OUT":
5391 if "aspect-delta-details" not in scaling_descriptor:
5392 raise LcmException(
5393 "Aspect delta details not fount in scaling descriptor {}".format(
5394 scaling_descriptor["name"]
5395 )
5396 )
5397 # count if max-instance-count is reached
5398 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5399
5400 scaling_info["scaling_direction"] = "OUT"
5401 scaling_info["vdu-create"] = {}
5402 scaling_info["kdu-create"] = {}
5403 for delta in deltas:
5404 for vdu_delta in delta.get("vdu-delta", {}):
5405 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5406 # vdu_index also provides the number of instance of the targeted vdu
5407 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5408 cloud_init_text = self._get_vdu_cloud_init_content(
5409 vdud, db_vnfd
5410 )
5411 if cloud_init_text:
5412 additional_params = (
5413 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5414 or {}
5415 )
5416 cloud_init_list = []
5417
5418 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5419 max_instance_count = 10
5420 if vdu_profile and "max-number-of-instances" in vdu_profile:
5421 max_instance_count = vdu_profile.get(
5422 "max-number-of-instances", 10
5423 )
5424
5425 default_instance_num = get_number_of_instances(
5426 db_vnfd, vdud["id"]
5427 )
5428 instances_number = vdu_delta.get("number-of-instances", 1)
5429 nb_scale_op += instances_number
5430
5431 new_instance_count = nb_scale_op + default_instance_num
5432 # Control if new count is over max and vdu count is less than max.
5433 # Then assign new instance count
5434 if new_instance_count > max_instance_count > vdu_count:
5435 instances_number = new_instance_count - max_instance_count
5436 else:
5437 instances_number = instances_number
5438
5439 if new_instance_count > max_instance_count:
5440 raise LcmException(
5441 "reached the limit of {} (max-instance-count) "
5442 "scaling-out operations for the "
5443 "scaling-group-descriptor '{}'".format(
5444 nb_scale_op, scaling_group
5445 )
5446 )
5447 for x in range(vdu_delta.get("number-of-instances", 1)):
5448 if cloud_init_text:
5449 # TODO Information of its own ip is not available because db_vnfr is not updated.
5450 additional_params["OSM"] = get_osm_params(
5451 db_vnfr, vdu_delta["id"], vdu_index + x
5452 )
5453 cloud_init_list.append(
5454 self._parse_cloud_init(
5455 cloud_init_text,
5456 additional_params,
5457 db_vnfd["id"],
5458 vdud["id"],
5459 )
5460 )
5461 vca_scaling_info.append(
5462 {
5463 "osm_vdu_id": vdu_delta["id"],
5464 "member-vnf-index": vnf_index,
5465 "type": "create",
5466 "vdu_index": vdu_index + x,
5467 }
5468 )
5469 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5470 for kdu_delta in delta.get("kdu-resource-delta", {}):
5471 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5472 kdu_name = kdu_profile["kdu-name"]
5473 resource_name = kdu_profile["resource-name"]
5474
5475 # Might have different kdus in the same delta
5476 # Should have list for each kdu
5477 if not scaling_info["kdu-create"].get(kdu_name, None):
5478 scaling_info["kdu-create"][kdu_name] = []
5479
5480 kdur = get_kdur(db_vnfr, kdu_name)
5481 if kdur.get("helm-chart"):
5482 k8s_cluster_type = "helm-chart-v3"
5483 self.logger.debug("kdur: {}".format(kdur))
5484 if (
5485 kdur.get("helm-version")
5486 and kdur.get("helm-version") == "v2"
5487 ):
5488 k8s_cluster_type = "helm-chart"
5489 raise NotImplementedError
5490 elif kdur.get("juju-bundle"):
5491 k8s_cluster_type = "juju-bundle"
5492 else:
5493 raise LcmException(
5494 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5495 "juju-bundle. Maybe an old NBI version is running".format(
5496 db_vnfr["member-vnf-index-ref"], kdu_name
5497 )
5498 )
5499
5500 max_instance_count = 10
5501 if kdu_profile and "max-number-of-instances" in kdu_profile:
5502 max_instance_count = kdu_profile.get(
5503 "max-number-of-instances", 10
5504 )
5505
5506 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5507 deployed_kdu, _ = get_deployed_kdu(
5508 nsr_deployed, kdu_name, vnf_index
5509 )
5510 if deployed_kdu is None:
5511 raise LcmException(
5512 "KDU '{}' for vnf '{}' not deployed".format(
5513 kdu_name, vnf_index
5514 )
5515 )
5516 kdu_instance = deployed_kdu.get("kdu-instance")
5517 instance_num = await self.k8scluster_map[
5518 k8s_cluster_type
5519 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5520 kdu_replica_count = instance_num + kdu_delta.get(
5521 "number-of-instances", 1
5522 )
5523
5524 # Control if new count is over max and instance_num is less than max.
5525 # Then assign max instance number to kdu replica count
5526 if kdu_replica_count > max_instance_count > instance_num:
5527 kdu_replica_count = max_instance_count
5528 if kdu_replica_count > max_instance_count:
5529 raise LcmException(
5530 "reached the limit of {} (max-instance-count) "
5531 "scaling-out operations for the "
5532 "scaling-group-descriptor '{}'".format(
5533 instance_num, scaling_group
5534 )
5535 )
5536
5537 for x in range(kdu_delta.get("number-of-instances", 1)):
5538 vca_scaling_info.append(
5539 {
5540 "osm_kdu_id": kdu_name,
5541 "member-vnf-index": vnf_index,
5542 "type": "create",
5543 "kdu_index": instance_num + x - 1,
5544 }
5545 )
5546 scaling_info["kdu-create"][kdu_name].append(
5547 {
5548 "member-vnf-index": vnf_index,
5549 "type": "create",
5550 "k8s-cluster-type": k8s_cluster_type,
5551 "resource-name": resource_name,
5552 "scale": kdu_replica_count,
5553 }
5554 )
5555 elif scaling_type == "SCALE_IN":
5556 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5557
5558 scaling_info["scaling_direction"] = "IN"
5559 scaling_info["vdu-delete"] = {}
5560 scaling_info["kdu-delete"] = {}
5561
5562 for delta in deltas:
5563 for vdu_delta in delta.get("vdu-delta", {}):
5564 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5565 min_instance_count = 0
5566 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5567 if vdu_profile and "min-number-of-instances" in vdu_profile:
5568 min_instance_count = vdu_profile["min-number-of-instances"]
5569
5570 default_instance_num = get_number_of_instances(
5571 db_vnfd, vdu_delta["id"]
5572 )
5573 instance_num = vdu_delta.get("number-of-instances", 1)
5574 nb_scale_op -= instance_num
5575
5576 new_instance_count = nb_scale_op + default_instance_num
5577
5578 if new_instance_count < min_instance_count < vdu_count:
5579 instances_number = min_instance_count - new_instance_count
5580 else:
5581 instances_number = instance_num
5582
5583 if new_instance_count < min_instance_count:
5584 raise LcmException(
5585 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5586 "scaling-group-descriptor '{}'".format(
5587 nb_scale_op, scaling_group
5588 )
5589 )
5590 for x in range(vdu_delta.get("number-of-instances", 1)):
5591 vca_scaling_info.append(
5592 {
5593 "osm_vdu_id": vdu_delta["id"],
5594 "member-vnf-index": vnf_index,
5595 "type": "delete",
5596 "vdu_index": vdu_index - 1 - x,
5597 }
5598 )
5599 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5600 for kdu_delta in delta.get("kdu-resource-delta", {}):
5601 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5602 kdu_name = kdu_profile["kdu-name"]
5603 resource_name = kdu_profile["resource-name"]
5604
5605 if not scaling_info["kdu-delete"].get(kdu_name, None):
5606 scaling_info["kdu-delete"][kdu_name] = []
5607
5608 kdur = get_kdur(db_vnfr, kdu_name)
5609 if kdur.get("helm-chart"):
5610 k8s_cluster_type = "helm-chart-v3"
5611 self.logger.debug("kdur: {}".format(kdur))
5612 if (
5613 kdur.get("helm-version")
5614 and kdur.get("helm-version") == "v2"
5615 ):
5616 k8s_cluster_type = "helm-chart"
5617 raise NotImplementedError
5618 elif kdur.get("juju-bundle"):
5619 k8s_cluster_type = "juju-bundle"
5620 else:
5621 raise LcmException(
5622 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5623 "juju-bundle. Maybe an old NBI version is running".format(
5624 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5625 )
5626 )
5627
5628 min_instance_count = 0
5629 if kdu_profile and "min-number-of-instances" in kdu_profile:
5630 min_instance_count = kdu_profile["min-number-of-instances"]
5631
5632 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5633 deployed_kdu, _ = get_deployed_kdu(
5634 nsr_deployed, kdu_name, vnf_index
5635 )
5636 if deployed_kdu is None:
5637 raise LcmException(
5638 "KDU '{}' for vnf '{}' not deployed".format(
5639 kdu_name, vnf_index
5640 )
5641 )
5642 kdu_instance = deployed_kdu.get("kdu-instance")
5643 instance_num = await self.k8scluster_map[
5644 k8s_cluster_type
5645 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5646 kdu_replica_count = instance_num - kdu_delta.get(
5647 "number-of-instances", 1
5648 )
5649
5650 if kdu_replica_count < min_instance_count < instance_num:
5651 kdu_replica_count = min_instance_count
5652 if kdu_replica_count < min_instance_count:
5653 raise LcmException(
5654 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5655 "scaling-group-descriptor '{}'".format(
5656 instance_num, scaling_group
5657 )
5658 )
5659
5660 for x in range(kdu_delta.get("number-of-instances", 1)):
5661 vca_scaling_info.append(
5662 {
5663 "osm_kdu_id": kdu_name,
5664 "member-vnf-index": vnf_index,
5665 "type": "delete",
5666 "kdu_index": instance_num - x - 1,
5667 }
5668 )
5669 scaling_info["kdu-delete"][kdu_name].append(
5670 {
5671 "member-vnf-index": vnf_index,
5672 "type": "delete",
5673 "k8s-cluster-type": k8s_cluster_type,
5674 "resource-name": resource_name,
5675 "scale": kdu_replica_count,
5676 }
5677 )
5678
5679 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5680 vdu_delete = copy(scaling_info.get("vdu-delete"))
5681 if scaling_info["scaling_direction"] == "IN":
5682 for vdur in reversed(db_vnfr["vdur"]):
5683 if vdu_delete.get(vdur["vdu-id-ref"]):
5684 vdu_delete[vdur["vdu-id-ref"]] -= 1
5685 scaling_info["vdu"].append(
5686 {
5687 "name": vdur.get("name") or vdur.get("vdu-name"),
5688 "vdu_id": vdur["vdu-id-ref"],
5689 "interface": [],
5690 }
5691 )
5692 for interface in vdur["interfaces"]:
5693 scaling_info["vdu"][-1]["interface"].append(
5694 {
5695 "name": interface["name"],
5696 "ip_address": interface["ip-address"],
5697 "mac_address": interface.get("mac-address"),
5698 }
5699 )
5700 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5701
5702 # PRE-SCALE BEGIN
5703 step = "Executing pre-scale vnf-config-primitive"
5704 if scaling_descriptor.get("scaling-config-action"):
5705 for scaling_config_action in scaling_descriptor[
5706 "scaling-config-action"
5707 ]:
5708 if (
5709 scaling_config_action.get("trigger") == "pre-scale-in"
5710 and scaling_type == "SCALE_IN"
5711 ) or (
5712 scaling_config_action.get("trigger") == "pre-scale-out"
5713 and scaling_type == "SCALE_OUT"
5714 ):
5715 vnf_config_primitive = scaling_config_action[
5716 "vnf-config-primitive-name-ref"
5717 ]
5718 step = db_nslcmop_update[
5719 "detailed-status"
5720 ] = "executing pre-scale scaling-config-action '{}'".format(
5721 vnf_config_primitive
5722 )
5723
5724 # look for primitive
5725 for config_primitive in (
5726 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5727 ).get("config-primitive", ()):
5728 if config_primitive["name"] == vnf_config_primitive:
5729 break
5730 else:
5731 raise LcmException(
5732 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5733 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5734 "primitive".format(scaling_group, vnf_config_primitive)
5735 )
5736
5737 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5738 if db_vnfr.get("additionalParamsForVnf"):
5739 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5740
5741 scale_process = "VCA"
5742 db_nsr_update["config-status"] = "configuring pre-scaling"
5743 primitive_params = self._map_primitive_params(
5744 config_primitive, {}, vnfr_params
5745 )
5746
5747 # Pre-scale retry check: Check if this sub-operation has been executed before
5748 op_index = self._check_or_add_scale_suboperation(
5749 db_nslcmop,
5750 vnf_index,
5751 vnf_config_primitive,
5752 primitive_params,
5753 "PRE-SCALE",
5754 )
5755 if op_index == self.SUBOPERATION_STATUS_SKIP:
5756 # Skip sub-operation
5757 result = "COMPLETED"
5758 result_detail = "Done"
5759 self.logger.debug(
5760 logging_text
5761 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5762 vnf_config_primitive, result, result_detail
5763 )
5764 )
5765 else:
5766 if op_index == self.SUBOPERATION_STATUS_NEW:
5767 # New sub-operation: Get index of this sub-operation
5768 op_index = (
5769 len(db_nslcmop.get("_admin", {}).get("operations"))
5770 - 1
5771 )
5772 self.logger.debug(
5773 logging_text
5774 + "vnf_config_primitive={} New sub-operation".format(
5775 vnf_config_primitive
5776 )
5777 )
5778 else:
5779 # retry: Get registered params for this existing sub-operation
5780 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5781 op_index
5782 ]
5783 vnf_index = op.get("member_vnf_index")
5784 vnf_config_primitive = op.get("primitive")
5785 primitive_params = op.get("primitive_params")
5786 self.logger.debug(
5787 logging_text
5788 + "vnf_config_primitive={} Sub-operation retry".format(
5789 vnf_config_primitive
5790 )
5791 )
5792 # Execute the primitive, either with new (first-time) or registered (reintent) args
5793 ee_descriptor_id = config_primitive.get(
5794 "execution-environment-ref"
5795 )
5796 primitive_name = config_primitive.get(
5797 "execution-environment-primitive", vnf_config_primitive
5798 )
5799 ee_id, vca_type = self._look_for_deployed_vca(
5800 nsr_deployed["VCA"],
5801 member_vnf_index=vnf_index,
5802 vdu_id=None,
5803 vdu_count_index=None,
5804 ee_descriptor_id=ee_descriptor_id,
5805 )
5806 result, result_detail = await self._ns_execute_primitive(
5807 ee_id,
5808 primitive_name,
5809 primitive_params,
5810 vca_type=vca_type,
5811 vca_id=vca_id,
5812 )
5813 self.logger.debug(
5814 logging_text
5815 + "vnf_config_primitive={} Done with result {} {}".format(
5816 vnf_config_primitive, result, result_detail
5817 )
5818 )
5819 # Update operationState = COMPLETED | FAILED
5820 self._update_suboperation_status(
5821 db_nslcmop, op_index, result, result_detail
5822 )
5823
5824 if result == "FAILED":
5825 raise LcmException(result_detail)
5826 db_nsr_update["config-status"] = old_config_status
5827 scale_process = None
5828 # PRE-SCALE END
5829
5830 db_nsr_update[
5831 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5832 ] = nb_scale_op
5833 db_nsr_update[
5834 "_admin.scaling-group.{}.time".format(admin_scale_index)
5835 ] = time()
5836
5837 # SCALE-IN VCA - BEGIN
5838 if vca_scaling_info:
5839 step = db_nslcmop_update[
5840 "detailed-status"
5841 ] = "Deleting the execution environments"
5842 scale_process = "VCA"
5843 for vca_info in vca_scaling_info:
5844 if vca_info["type"] == "delete":
5845 member_vnf_index = str(vca_info["member-vnf-index"])
5846 self.logger.debug(
5847 logging_text + "vdu info: {}".format(vca_info)
5848 )
5849 if vca_info.get("osm_vdu_id"):
5850 vdu_id = vca_info["osm_vdu_id"]
5851 vdu_index = int(vca_info["vdu_index"])
5852 stage[
5853 1
5854 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5855 member_vnf_index, vdu_id, vdu_index
5856 )
5857 else:
5858 vdu_index = 0
5859 kdu_id = vca_info["osm_kdu_id"]
5860 stage[
5861 1
5862 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5863 member_vnf_index, kdu_id, vdu_index
5864 )
5865 stage[2] = step = "Scaling in VCA"
5866 self._write_op_status(op_id=nslcmop_id, stage=stage)
5867 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5868 config_update = db_nsr["configurationStatus"]
5869 for vca_index, vca in enumerate(vca_update):
5870 if (
5871 (vca or vca.get("ee_id"))
5872 and vca["member-vnf-index"] == member_vnf_index
5873 and vca["vdu_count_index"] == vdu_index
5874 ):
5875 if vca.get("vdu_id"):
5876 config_descriptor = get_configuration(
5877 db_vnfd, vca.get("vdu_id")
5878 )
5879 elif vca.get("kdu_name"):
5880 config_descriptor = get_configuration(
5881 db_vnfd, vca.get("kdu_name")
5882 )
5883 else:
5884 config_descriptor = get_configuration(
5885 db_vnfd, db_vnfd["id"]
5886 )
5887 operation_params = (
5888 db_nslcmop.get("operationParams") or {}
5889 )
5890 exec_terminate_primitives = not operation_params.get(
5891 "skip_terminate_primitives"
5892 ) and vca.get("needed_terminate")
5893 task = asyncio.ensure_future(
5894 asyncio.wait_for(
5895 self.destroy_N2VC(
5896 logging_text,
5897 db_nslcmop,
5898 vca,
5899 config_descriptor,
5900 vca_index,
5901 destroy_ee=True,
5902 exec_primitives=exec_terminate_primitives,
5903 scaling_in=True,
5904 vca_id=vca_id,
5905 ),
5906 timeout=self.timeout_charm_delete,
5907 )
5908 )
5909 tasks_dict_info[task] = "Terminating VCA {}".format(
5910 vca.get("ee_id")
5911 )
5912 del vca_update[vca_index]
5913 del config_update[vca_index]
5914 # wait for pending tasks of terminate primitives
5915 if tasks_dict_info:
5916 self.logger.debug(
5917 logging_text
5918 + "Waiting for tasks {}".format(
5919 list(tasks_dict_info.keys())
5920 )
5921 )
5922 error_list = await self._wait_for_tasks(
5923 logging_text,
5924 tasks_dict_info,
5925 min(
5926 self.timeout_charm_delete, self.timeout_ns_terminate
5927 ),
5928 stage,
5929 nslcmop_id,
5930 )
5931 tasks_dict_info.clear()
5932 if error_list:
5933 raise LcmException("; ".join(error_list))
5934
5935 db_vca_and_config_update = {
5936 "_admin.deployed.VCA": vca_update,
5937 "configurationStatus": config_update,
5938 }
5939 self.update_db_2(
5940 "nsrs", db_nsr["_id"], db_vca_and_config_update
5941 )
5942 scale_process = None
5943 # SCALE-IN VCA - END
5944
5945 # SCALE RO - BEGIN
5946 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5947 scale_process = "RO"
5948 if self.ro_config.get("ng"):
5949 await self._scale_ng_ro(
5950 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5951 )
5952 scaling_info.pop("vdu-create", None)
5953 scaling_info.pop("vdu-delete", None)
5954
5955 scale_process = None
5956 # SCALE RO - END
5957
5958 # SCALE KDU - BEGIN
5959 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5960 scale_process = "KDU"
5961 await self._scale_kdu(
5962 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5963 )
5964 scaling_info.pop("kdu-create", None)
5965 scaling_info.pop("kdu-delete", None)
5966
5967 scale_process = None
5968 # SCALE KDU - END
5969
5970 if db_nsr_update:
5971 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5972
5973 # SCALE-UP VCA - BEGIN
5974 if vca_scaling_info:
5975 step = db_nslcmop_update[
5976 "detailed-status"
5977 ] = "Creating new execution environments"
5978 scale_process = "VCA"
5979 for vca_info in vca_scaling_info:
5980 if vca_info["type"] == "create":
5981 member_vnf_index = str(vca_info["member-vnf-index"])
5982 self.logger.debug(
5983 logging_text + "vdu info: {}".format(vca_info)
5984 )
5985 vnfd_id = db_vnfr["vnfd-ref"]
5986 if vca_info.get("osm_vdu_id"):
5987 vdu_index = int(vca_info["vdu_index"])
5988 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5989 if db_vnfr.get("additionalParamsForVnf"):
5990 deploy_params.update(
5991 parse_yaml_strings(
5992 db_vnfr["additionalParamsForVnf"].copy()
5993 )
5994 )
5995 descriptor_config = get_configuration(
5996 db_vnfd, db_vnfd["id"]
5997 )
5998 if descriptor_config:
5999 vdu_id = None
6000 vdu_name = None
6001 kdu_name = None
6002 self._deploy_n2vc(
6003 logging_text=logging_text
6004 + "member_vnf_index={} ".format(member_vnf_index),
6005 db_nsr=db_nsr,
6006 db_vnfr=db_vnfr,
6007 nslcmop_id=nslcmop_id,
6008 nsr_id=nsr_id,
6009 nsi_id=nsi_id,
6010 vnfd_id=vnfd_id,
6011 vdu_id=vdu_id,
6012 kdu_name=kdu_name,
6013 member_vnf_index=member_vnf_index,
6014 vdu_index=vdu_index,
6015 vdu_name=vdu_name,
6016 deploy_params=deploy_params,
6017 descriptor_config=descriptor_config,
6018 base_folder=base_folder,
6019 task_instantiation_info=tasks_dict_info,
6020 stage=stage,
6021 )
6022 vdu_id = vca_info["osm_vdu_id"]
6023 vdur = find_in_list(
6024 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6025 )
6026 descriptor_config = get_configuration(db_vnfd, vdu_id)
6027 if vdur.get("additionalParams"):
6028 deploy_params_vdu = parse_yaml_strings(
6029 vdur["additionalParams"]
6030 )
6031 else:
6032 deploy_params_vdu = deploy_params
6033 deploy_params_vdu["OSM"] = get_osm_params(
6034 db_vnfr, vdu_id, vdu_count_index=vdu_index
6035 )
6036 if descriptor_config:
6037 vdu_name = None
6038 kdu_name = None
6039 stage[
6040 1
6041 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6042 member_vnf_index, vdu_id, vdu_index
6043 )
6044 stage[2] = step = "Scaling out VCA"
6045 self._write_op_status(op_id=nslcmop_id, stage=stage)
6046 self._deploy_n2vc(
6047 logging_text=logging_text
6048 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6049 member_vnf_index, vdu_id, vdu_index
6050 ),
6051 db_nsr=db_nsr,
6052 db_vnfr=db_vnfr,
6053 nslcmop_id=nslcmop_id,
6054 nsr_id=nsr_id,
6055 nsi_id=nsi_id,
6056 vnfd_id=vnfd_id,
6057 vdu_id=vdu_id,
6058 kdu_name=kdu_name,
6059 member_vnf_index=member_vnf_index,
6060 vdu_index=vdu_index,
6061 vdu_name=vdu_name,
6062 deploy_params=deploy_params_vdu,
6063 descriptor_config=descriptor_config,
6064 base_folder=base_folder,
6065 task_instantiation_info=tasks_dict_info,
6066 stage=stage,
6067 )
6068 else:
6069 kdu_name = vca_info["osm_kdu_id"]
6070 descriptor_config = get_configuration(db_vnfd, kdu_name)
6071 if descriptor_config:
6072 vdu_id = None
6073 kdu_index = int(vca_info["kdu_index"])
6074 vdu_name = None
6075 kdur = next(
6076 x
6077 for x in db_vnfr["kdur"]
6078 if x["kdu-name"] == kdu_name
6079 )
6080 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
6081 if kdur.get("additionalParams"):
6082 deploy_params_kdu = parse_yaml_strings(
6083 kdur["additionalParams"]
6084 )
6085
6086 self._deploy_n2vc(
6087 logging_text=logging_text,
6088 db_nsr=db_nsr,
6089 db_vnfr=db_vnfr,
6090 nslcmop_id=nslcmop_id,
6091 nsr_id=nsr_id,
6092 nsi_id=nsi_id,
6093 vnfd_id=vnfd_id,
6094 vdu_id=vdu_id,
6095 kdu_name=kdu_name,
6096 member_vnf_index=member_vnf_index,
6097 vdu_index=kdu_index,
6098 vdu_name=vdu_name,
6099 deploy_params=deploy_params_kdu,
6100 descriptor_config=descriptor_config,
6101 base_folder=base_folder,
6102 task_instantiation_info=tasks_dict_info,
6103 stage=stage,
6104 )
6105 # SCALE-UP VCA - END
6106 scale_process = None
6107
6108 # POST-SCALE BEGIN
6109 # execute primitive service POST-SCALING
6110 step = "Executing post-scale vnf-config-primitive"
6111 if scaling_descriptor.get("scaling-config-action"):
6112 for scaling_config_action in scaling_descriptor[
6113 "scaling-config-action"
6114 ]:
6115 if (
6116 scaling_config_action.get("trigger") == "post-scale-in"
6117 and scaling_type == "SCALE_IN"
6118 ) or (
6119 scaling_config_action.get("trigger") == "post-scale-out"
6120 and scaling_type == "SCALE_OUT"
6121 ):
6122 vnf_config_primitive = scaling_config_action[
6123 "vnf-config-primitive-name-ref"
6124 ]
6125 step = db_nslcmop_update[
6126 "detailed-status"
6127 ] = "executing post-scale scaling-config-action '{}'".format(
6128 vnf_config_primitive
6129 )
6130
6131 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6132 if db_vnfr.get("additionalParamsForVnf"):
6133 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6134
6135 # look for primitive
6136 for config_primitive in (
6137 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6138 ).get("config-primitive", ()):
6139 if config_primitive["name"] == vnf_config_primitive:
6140 break
6141 else:
6142 raise LcmException(
6143 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6144 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6145 "config-primitive".format(
6146 scaling_group, vnf_config_primitive
6147 )
6148 )
6149 scale_process = "VCA"
6150 db_nsr_update["config-status"] = "configuring post-scaling"
6151 primitive_params = self._map_primitive_params(
6152 config_primitive, {}, vnfr_params
6153 )
6154
6155 # Post-scale retry check: Check if this sub-operation has been executed before
6156 op_index = self._check_or_add_scale_suboperation(
6157 db_nslcmop,
6158 vnf_index,
6159 vnf_config_primitive,
6160 primitive_params,
6161 "POST-SCALE",
6162 )
6163 if op_index == self.SUBOPERATION_STATUS_SKIP:
6164 # Skip sub-operation
6165 result = "COMPLETED"
6166 result_detail = "Done"
6167 self.logger.debug(
6168 logging_text
6169 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6170 vnf_config_primitive, result, result_detail
6171 )
6172 )
6173 else:
6174 if op_index == self.SUBOPERATION_STATUS_NEW:
6175 # New sub-operation: Get index of this sub-operation
6176 op_index = (
6177 len(db_nslcmop.get("_admin", {}).get("operations"))
6178 - 1
6179 )
6180 self.logger.debug(
6181 logging_text
6182 + "vnf_config_primitive={} New sub-operation".format(
6183 vnf_config_primitive
6184 )
6185 )
6186 else:
6187 # retry: Get registered params for this existing sub-operation
6188 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6189 op_index
6190 ]
6191 vnf_index = op.get("member_vnf_index")
6192 vnf_config_primitive = op.get("primitive")
6193 primitive_params = op.get("primitive_params")
6194 self.logger.debug(
6195 logging_text
6196 + "vnf_config_primitive={} Sub-operation retry".format(
6197 vnf_config_primitive
6198 )
6199 )
6200 # Execute the primitive, either with new (first-time) or registered (reintent) args
6201 ee_descriptor_id = config_primitive.get(
6202 "execution-environment-ref"
6203 )
6204 primitive_name = config_primitive.get(
6205 "execution-environment-primitive", vnf_config_primitive
6206 )
6207 ee_id, vca_type = self._look_for_deployed_vca(
6208 nsr_deployed["VCA"],
6209 member_vnf_index=vnf_index,
6210 vdu_id=None,
6211 vdu_count_index=None,
6212 ee_descriptor_id=ee_descriptor_id,
6213 )
6214 result, result_detail = await self._ns_execute_primitive(
6215 ee_id,
6216 primitive_name,
6217 primitive_params,
6218 vca_type=vca_type,
6219 vca_id=vca_id,
6220 )
6221 self.logger.debug(
6222 logging_text
6223 + "vnf_config_primitive={} Done with result {} {}".format(
6224 vnf_config_primitive, result, result_detail
6225 )
6226 )
6227 # Update operationState = COMPLETED | FAILED
6228 self._update_suboperation_status(
6229 db_nslcmop, op_index, result, result_detail
6230 )
6231
6232 if result == "FAILED":
6233 raise LcmException(result_detail)
6234 db_nsr_update["config-status"] = old_config_status
6235 scale_process = None
6236 # POST-SCALE END
6237
6238 db_nsr_update[
6239 "detailed-status"
6240 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6241 db_nsr_update["operational-status"] = (
6242 "running"
6243 if old_operational_status == "failed"
6244 else old_operational_status
6245 )
6246 db_nsr_update["config-status"] = old_config_status
6247 return
6248 except (
6249 ROclient.ROClientException,
6250 DbException,
6251 LcmException,
6252 NgRoException,
6253 ) as e:
6254 self.logger.error(logging_text + "Exit Exception {}".format(e))
6255 exc = e
6256 except asyncio.CancelledError:
6257 self.logger.error(
6258 logging_text + "Cancelled Exception while '{}'".format(step)
6259 )
6260 exc = "Operation was cancelled"
6261 except Exception as e:
6262 exc = traceback.format_exc()
6263 self.logger.critical(
6264 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6265 exc_info=True,
6266 )
6267 finally:
6268 self._write_ns_status(
6269 nsr_id=nsr_id,
6270 ns_state=None,
6271 current_operation="IDLE",
6272 current_operation_id=None,
6273 )
6274 if tasks_dict_info:
6275 stage[1] = "Waiting for instantiate pending tasks."
6276 self.logger.debug(logging_text + stage[1])
6277 exc = await self._wait_for_tasks(
6278 logging_text,
6279 tasks_dict_info,
6280 self.timeout_ns_deploy,
6281 stage,
6282 nslcmop_id,
6283 nsr_id=nsr_id,
6284 )
6285 if exc:
6286 db_nslcmop_update[
6287 "detailed-status"
6288 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6289 nslcmop_operation_state = "FAILED"
6290 if db_nsr:
6291 db_nsr_update["operational-status"] = old_operational_status
6292 db_nsr_update["config-status"] = old_config_status
6293 db_nsr_update["detailed-status"] = ""
6294 if scale_process:
6295 if "VCA" in scale_process:
6296 db_nsr_update["config-status"] = "failed"
6297 if "RO" in scale_process:
6298 db_nsr_update["operational-status"] = "failed"
6299 db_nsr_update[
6300 "detailed-status"
6301 ] = "FAILED scaling nslcmop={} {}: {}".format(
6302 nslcmop_id, step, exc
6303 )
6304 else:
6305 error_description_nslcmop = None
6306 nslcmop_operation_state = "COMPLETED"
6307 db_nslcmop_update["detailed-status"] = "Done"
6308
6309 self._write_op_status(
6310 op_id=nslcmop_id,
6311 stage="",
6312 error_message=error_description_nslcmop,
6313 operation_state=nslcmop_operation_state,
6314 other_update=db_nslcmop_update,
6315 )
6316 if db_nsr:
6317 self._write_ns_status(
6318 nsr_id=nsr_id,
6319 ns_state=None,
6320 current_operation="IDLE",
6321 current_operation_id=None,
6322 other_update=db_nsr_update,
6323 )
6324
6325 if nslcmop_operation_state:
6326 try:
6327 msg = {
6328 "nsr_id": nsr_id,
6329 "nslcmop_id": nslcmop_id,
6330 "operationState": nslcmop_operation_state,
6331 }
6332 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6333 except Exception as e:
6334 self.logger.error(
6335 logging_text + "kafka_write notification Exception {}".format(e)
6336 )
6337 self.logger.debug(logging_text + "Exit")
6338 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6339
6340 async def _scale_kdu(
6341 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6342 ):
6343 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6344 for kdu_name in _scaling_info:
6345 for kdu_scaling_info in _scaling_info[kdu_name]:
6346 deployed_kdu, index = get_deployed_kdu(
6347 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6348 )
6349 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6350 kdu_instance = deployed_kdu["kdu-instance"]
6351 scale = int(kdu_scaling_info["scale"])
6352 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6353
6354 db_dict = {
6355 "collection": "nsrs",
6356 "filter": {"_id": nsr_id},
6357 "path": "_admin.deployed.K8s.{}".format(index),
6358 }
6359
6360 step = "scaling application {}".format(
6361 kdu_scaling_info["resource-name"]
6362 )
6363 self.logger.debug(logging_text + step)
6364
6365 if kdu_scaling_info["type"] == "delete":
6366 kdu_config = get_configuration(db_vnfd, kdu_name)
6367 if (
6368 kdu_config
6369 and kdu_config.get("terminate-config-primitive")
6370 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6371 ):
6372 terminate_config_primitive_list = kdu_config.get(
6373 "terminate-config-primitive"
6374 )
6375 terminate_config_primitive_list.sort(
6376 key=lambda val: int(val["seq"])
6377 )
6378
6379 for (
6380 terminate_config_primitive
6381 ) in terminate_config_primitive_list:
6382 primitive_params_ = self._map_primitive_params(
6383 terminate_config_primitive, {}, {}
6384 )
6385 step = "execute terminate config primitive"
6386 self.logger.debug(logging_text + step)
6387 await asyncio.wait_for(
6388 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6389 cluster_uuid=cluster_uuid,
6390 kdu_instance=kdu_instance,
6391 primitive_name=terminate_config_primitive["name"],
6392 params=primitive_params_,
6393 db_dict=db_dict,
6394 vca_id=vca_id,
6395 ),
6396 timeout=600,
6397 )
6398
6399 await asyncio.wait_for(
6400 self.k8scluster_map[k8s_cluster_type].scale(
6401 kdu_instance,
6402 scale,
6403 kdu_scaling_info["resource-name"],
6404 vca_id=vca_id,
6405 ),
6406 timeout=self.timeout_vca_on_error,
6407 )
6408
6409 if kdu_scaling_info["type"] == "create":
6410 kdu_config = get_configuration(db_vnfd, kdu_name)
6411 if (
6412 kdu_config
6413 and kdu_config.get("initial-config-primitive")
6414 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6415 ):
6416 initial_config_primitive_list = kdu_config.get(
6417 "initial-config-primitive"
6418 )
6419 initial_config_primitive_list.sort(
6420 key=lambda val: int(val["seq"])
6421 )
6422
6423 for initial_config_primitive in initial_config_primitive_list:
6424 primitive_params_ = self._map_primitive_params(
6425 initial_config_primitive, {}, {}
6426 )
6427 step = "execute initial config primitive"
6428 self.logger.debug(logging_text + step)
6429 await asyncio.wait_for(
6430 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6431 cluster_uuid=cluster_uuid,
6432 kdu_instance=kdu_instance,
6433 primitive_name=initial_config_primitive["name"],
6434 params=primitive_params_,
6435 db_dict=db_dict,
6436 vca_id=vca_id,
6437 ),
6438 timeout=600,
6439 )
6440
6441 async def _scale_ng_ro(
6442 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6443 ):
6444 nsr_id = db_nslcmop["nsInstanceId"]
6445 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6446 db_vnfrs = {}
6447
6448 # read from db: vnfd's for every vnf
6449 db_vnfds = []
6450
6451 # for each vnf in ns, read vnfd
6452 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6453 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6454 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6455 # if we haven't this vnfd, read it from db
6456 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6457 # read from db
6458 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6459 db_vnfds.append(vnfd)
6460 n2vc_key = self.n2vc.get_public_key()
6461 n2vc_key_list = [n2vc_key]
6462 self.scale_vnfr(
6463 db_vnfr,
6464 vdu_scaling_info.get("vdu-create"),
6465 vdu_scaling_info.get("vdu-delete"),
6466 mark_delete=True,
6467 )
6468 # db_vnfr has been updated, update db_vnfrs to use it
6469 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6470 await self._instantiate_ng_ro(
6471 logging_text,
6472 nsr_id,
6473 db_nsd,
6474 db_nsr,
6475 db_nslcmop,
6476 db_vnfrs,
6477 db_vnfds,
6478 n2vc_key_list,
6479 stage=stage,
6480 start_deploy=time(),
6481 timeout_ns_deploy=self.timeout_ns_deploy,
6482 )
6483 if vdu_scaling_info.get("vdu-delete"):
6484 self.scale_vnfr(
6485 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6486 )
6487
6488 async def extract_prometheus_scrape_jobs(
6489 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6490 ):
6491 # look if exist a file called 'prometheus*.j2' and
6492 artifact_content = self.fs.dir_ls(artifact_path)
6493 job_file = next(
6494 (
6495 f
6496 for f in artifact_content
6497 if f.startswith("prometheus") and f.endswith(".j2")
6498 ),
6499 None,
6500 )
6501 if not job_file:
6502 return
6503 with self.fs.file_open((artifact_path, job_file), "r") as f:
6504 job_data = f.read()
6505
6506 # TODO get_service
6507 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6508 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6509 host_port = "80"
6510 vnfr_id = vnfr_id.replace("-", "")
6511 variables = {
6512 "JOB_NAME": vnfr_id,
6513 "TARGET_IP": target_ip,
6514 "EXPORTER_POD_IP": host_name,
6515 "EXPORTER_POD_PORT": host_port,
6516 }
6517 job_list = parse_job(job_data, variables)
6518 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6519 for job in job_list:
6520 if (
6521 not isinstance(job.get("job_name"), str)
6522 or vnfr_id not in job["job_name"]
6523 ):
6524 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6525 job["nsr_id"] = nsr_id
6526 job["vnfr_id"] = vnfr_id
6527 return job_list
6528
6529 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6530 """
6531 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6532
6533 :param: vim_account_id: VIM Account ID
6534
6535 :return: (cloud_name, cloud_credential)
6536 """
6537 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6538 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6539
6540 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6541 """
6542 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6543
6544 :param: vim_account_id: VIM Account ID
6545
6546 :return: (cloud_name, cloud_credential)
6547 """
6548 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6549 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")