Add implicit osm-config configuration parameter to charm
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_kdu,
66 get_kdu_services,
67 get_relation_list,
68 get_vdu_list,
69 get_vdu_profile,
70 get_ee_sorted_initial_config_primitive_list,
71 get_ee_sorted_terminate_config_primitive_list,
72 get_kdu_list,
73 get_virtual_link_profiles,
74 get_vdu,
75 get_configuration,
76 get_vdu_index,
77 get_scaling_aspect,
78 get_number_of_instances,
79 get_juju_ee_ref,
80 get_kdu_resource_profile,
81 )
82 from osm_lcm.data_utils.list_utils import find_in_list
83 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
84 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
85 from osm_lcm.data_utils.database.vim_account import VimAccountDB
86 from n2vc.definitions import RelationEndpoint
87 from n2vc.k8s_helm_conn import K8sHelmConnector
88 from n2vc.k8s_helm3_conn import K8sHelm3Connector
89 from n2vc.k8s_juju_conn import K8sJujuConnector
90
91 from osm_common.dbbase import DbException
92 from osm_common.fsbase import FsException
93
94 from osm_lcm.data_utils.database.database import Database
95 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
96
97 from n2vc.n2vc_juju_conn import N2VCJujuConnector
98 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
99
100 from osm_lcm.lcm_helm_conn import LCMHelmConn
101 from osm_lcm.osm_config import OsmConfigBuilder
102 from osm_lcm.prometheus import parse_job
103
104 from copy import copy, deepcopy
105 from time import time
106 from uuid import uuid4
107
108 from random import randint
109
110 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
111
112
113 class NsLcm(LcmBase):
114 timeout_vca_on_error = (
115 5 * 60
116 ) # Time for a charm to be marked as failed after it first enters blocked/error status
117 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
118 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
119 timeout_charm_delete = 10 * 60
120 timeout_primitive = 30 * 60 # timeout for primitive execution
121 timeout_progress_primitive = (
122 10 * 60
123 ) # timeout for some progress in a primitive execution
124
125 SUBOPERATION_STATUS_NOT_FOUND = -1
126 SUBOPERATION_STATUS_NEW = -2
127 SUBOPERATION_STATUS_SKIP = -3
128 task_name_deploy_vca = "Deploying VCA"
129
130 def __init__(self, msg, lcm_tasks, config, loop):
131 """
132 Init, Connect to database, filesystem storage, and messaging
133 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
134 :return: None
135 """
136 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
137
138 self.db = Database().instance.db
139 self.fs = Filesystem().instance.fs
140 self.loop = loop
141 self.lcm_tasks = lcm_tasks
142 self.timeout = config["timeout"]
143 self.ro_config = config["ro_config"]
144 self.ng_ro = config["ro_config"].get("ng")
145 self.vca_config = config["VCA"].copy()
146
147 # create N2VC connector
148 self.n2vc = N2VCJujuConnector(
149 log=self.logger,
150 loop=self.loop,
151 on_update_db=self._on_update_n2vc_db,
152 fs=self.fs,
153 db=self.db,
154 )
155
156 self.conn_helm_ee = LCMHelmConn(
157 log=self.logger,
158 loop=self.loop,
159 vca_config=self.vca_config,
160 on_update_db=self._on_update_n2vc_db,
161 )
162
163 self.k8sclusterhelm2 = K8sHelmConnector(
164 kubectl_command=self.vca_config.get("kubectlpath"),
165 helm_command=self.vca_config.get("helmpath"),
166 log=self.logger,
167 on_update_db=None,
168 fs=self.fs,
169 db=self.db,
170 )
171
172 self.k8sclusterhelm3 = K8sHelm3Connector(
173 kubectl_command=self.vca_config.get("kubectlpath"),
174 helm_command=self.vca_config.get("helm3path"),
175 fs=self.fs,
176 log=self.logger,
177 db=self.db,
178 on_update_db=None,
179 )
180
181 self.k8sclusterjuju = K8sJujuConnector(
182 kubectl_command=self.vca_config.get("kubectlpath"),
183 juju_command=self.vca_config.get("jujupath"),
184 log=self.logger,
185 loop=self.loop,
186 on_update_db=self._on_update_k8s_db,
187 fs=self.fs,
188 db=self.db,
189 )
190
191 self.k8scluster_map = {
192 "helm-chart": self.k8sclusterhelm2,
193 "helm-chart-v3": self.k8sclusterhelm3,
194 "chart": self.k8sclusterhelm3,
195 "juju-bundle": self.k8sclusterjuju,
196 "juju": self.k8sclusterjuju,
197 }
198
199 self.vca_map = {
200 "lxc_proxy_charm": self.n2vc,
201 "native_charm": self.n2vc,
202 "k8s_proxy_charm": self.n2vc,
203 "helm": self.conn_helm_ee,
204 "helm-v3": self.conn_helm_ee,
205 }
206
207 # create RO client
208 self.RO = NgRoClient(self.loop, **self.ro_config)
209
210 @staticmethod
211 def increment_ip_mac(ip_mac, vm_index=1):
212 if not isinstance(ip_mac, str):
213 return ip_mac
214 try:
215 # try with ipv4: look for the last dot
216 i = ip_mac.rfind(".")
217 if i > 0:
218 i += 1
219 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
220 # try with ipv6 or mac: look for the last colon and operate in hex
221 i = ip_mac.rfind(":")
222 if i > 0:
223 i += 1
224 # format in hex, len can be 2 for mac or 4 for ipv6
225 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
226 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
227 )
228 except Exception:
229 pass
230 return None
231
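# Illustrative examples of the helper above (values are arbitrary):
#   increment_ip_mac("10.0.0.5", vm_index=2) -> "10.0.0.7" (last IPv4 octet incremented)
#   increment_ip_mac("aa:bb:cc:dd:ee:0a", vm_index=1) -> "aa:bb:cc:dd:ee:0b" (hex increment after the last colon)
# A non-string input is returned unchanged; an unparsable string returns None.
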
232 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
233
234 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
235
236 try:
237 # TODO filter RO descriptor fields...
238
239 # write to database
240 db_dict = dict()
241 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
242 db_dict["deploymentStatus"] = ro_descriptor
243 self.update_db_2("nsrs", nsrs_id, db_dict)
244
245 except Exception as e:
246 self.logger.warn(
247 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
248 )
249
250 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
251
252 # remove last dot from path (if exists)
253 if path.endswith("."):
254 path = path[:-1]
255
256 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
257 # .format(table, filter, path, updated_data))
258 try:
259
260 nsr_id = filter.get("_id")
261
262 # read ns record from database
263 nsr = self.db.get_one(table="nsrs", q_filter=filter)
264 current_ns_status = nsr.get("nsState")
265
266 # get vca status for NS
267 status_dict = await self.n2vc.get_status(
268 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
269 )
270
271 # vcaStatus
272 db_dict = dict()
273 db_dict["vcaStatus"] = status_dict
274 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
275
276 # update configurationStatus for this VCA
277 try:
278 vca_index = int(path[path.rfind(".") + 1 :])
279
280 vca_list = deep_get(
281 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
282 )
283 vca_status = vca_list[vca_index].get("status")
284
285 configuration_status_list = nsr.get("configurationStatus")
286 config_status = configuration_status_list[vca_index].get("status")
287
288 if config_status == "BROKEN" and vca_status != "failed":
289 db_dict["configurationStatus"][vca_index] = "READY"
290 elif config_status != "BROKEN" and vca_status == "failed":
291 db_dict["configurationStatus"][vca_index] = "BROKEN"
292 except Exception as e:
293 # not update configurationStatus
294 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
295
296 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
297 # if nsState = 'DEGRADED' check if all is OK
298 is_degraded = False
299 if current_ns_status in ("READY", "DEGRADED"):
300 error_description = ""
301 # check machines
302 if status_dict.get("machines"):
303 for machine_id in status_dict.get("machines"):
304 machine = status_dict.get("machines").get(machine_id)
305 # check machine agent-status
306 if machine.get("agent-status"):
307 s = machine.get("agent-status").get("status")
308 if s != "started":
309 is_degraded = True
310 error_description += (
311 "machine {} agent-status={} ; ".format(
312 machine_id, s
313 )
314 )
315 # check machine instance status
316 if machine.get("instance-status"):
317 s = machine.get("instance-status").get("status")
318 if s != "running":
319 is_degraded = True
320 error_description += (
321 "machine {} instance-status={} ; ".format(
322 machine_id, s
323 )
324 )
325 # check applications
326 if status_dict.get("applications"):
327 for app_id in status_dict.get("applications"):
328 app = status_dict.get("applications").get(app_id)
329 # check application status
330 if app.get("status"):
331 s = app.get("status").get("status")
332 if s != "active":
333 is_degraded = True
334 error_description += (
335 "application {} status={} ; ".format(app_id, s)
336 )
337
338 if error_description:
339 db_dict["errorDescription"] = error_description
340 if current_ns_status == "READY" and is_degraded:
341 db_dict["nsState"] = "DEGRADED"
342 if current_ns_status == "DEGRADED" and not is_degraded:
343 db_dict["nsState"] = "READY"
344
345 # write to database
346 self.update_db_2("nsrs", nsr_id, db_dict)
347
348 except (asyncio.CancelledError, asyncio.TimeoutError):
349 raise
350 except Exception as e:
351 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
352
353 async def _on_update_k8s_db(
354 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
355 ):
356 """
357 Updating vca status in NSR record
358 :param cluster_uuid: UUID of a k8s cluster
359 :param kdu_instance: The unique name of the KDU instance
360 :param filter: To get nsr_id
361 :param cluster_type: The cluster type (juju, k8s)
362 :return: none
363 """
364
365 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
366 # .format(cluster_uuid, kdu_instance, filter))
367
368 nsr_id = filter.get("_id")
369 try:
370 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
371 cluster_uuid=cluster_uuid,
372 kdu_instance=kdu_instance,
373 yaml_format=False,
374 complete_status=True,
375 vca_id=vca_id,
376 )
377
378 # vcaStatus
379 db_dict = dict()
380 db_dict["vcaStatus"] = {nsr_id: vca_status}
381
382 if cluster_type in ("juju-bundle", "juju"):
383 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
384 # status in a similar way between Juju Bundles and Helm Charts on this side
385 await self.k8sclusterjuju.update_vca_status(
386 db_dict["vcaStatus"],
387 kdu_instance,
388 vca_id=vca_id,
389 )
390
391 self.logger.debug(
392 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
393 )
394
395 # write to database
396 self.update_db_2("nsrs", nsr_id, db_dict)
397 except (asyncio.CancelledError, asyncio.TimeoutError):
398 raise
399 except Exception as e:
400 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
401
402 @staticmethod
403 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
404 try:
405 env = Environment(undefined=StrictUndefined)
406 template = env.from_string(cloud_init_text)
407 return template.render(additional_params or {})
408 except UndefinedError as e:
409 raise LcmException(
410 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
411 "file, must be provided in the instantiation parameters inside the "
412 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
413 )
414 except (TemplateError, TemplateNotFound) as e:
415 raise LcmException(
416 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
417 vnfd_id, vdu_id, e
418 )
419 )
420
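# Illustrative example: because StrictUndefined is used, every variable referenced in the cloud-init
# text must be provided through additional_params (typically from 'additionalParamsForVnf/Vdu'), e.g.
#   _parse_cloud_init("hostname: {{ host_name }}", {"host_name": "vnf1"}, "myvnfd", "vdu1")
# would return "hostname: vnf1", while passing additional_params=None would raise the LcmException above.
# The vnfd/vdu ids used here are arbitrary placeholders.
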
421 def _get_vdu_cloud_init_content(self, vdu, vnfd):
422 cloud_init_content = cloud_init_file = None
423 try:
424 if vdu.get("cloud-init-file"):
425 base_folder = vnfd["_admin"]["storage"]
426 if base_folder["pkg-dir"]:
427 cloud_init_file = "{}/{}/cloud_init/{}".format(
428 base_folder["folder"],
429 base_folder["pkg-dir"],
430 vdu["cloud-init-file"],
431 )
432 else:
433 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
434 base_folder["folder"],
435 vdu["cloud-init-file"],
436 )
437 with self.fs.file_open(cloud_init_file, "r") as ci_file:
438 cloud_init_content = ci_file.read()
439 elif vdu.get("cloud-init"):
440 cloud_init_content = vdu["cloud-init"]
441
442 return cloud_init_content
443 except FsException as e:
444 raise LcmException(
445 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
446 vnfd["id"], vdu["id"], cloud_init_file, e
447 )
448 )
449
450 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
451 vdur = next(
452 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
453 {}
454 )
455 additional_params = vdur.get("additionalParams")
456 return parse_yaml_strings(additional_params)
457
458 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
459 """
460 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
461 :param vnfd: input vnfd
462 :param new_id: overrides vnf id if provided
463 :param additionalParams: Instantiation params for VNFs provided
464 :param nsrId: Id of the NSR
465 :return: copy of vnfd
466 """
467 vnfd_RO = deepcopy(vnfd)
468 # remove keys not used by RO: configuration, monitoring, scaling and internal keys
469 vnfd_RO.pop("_id", None)
470 vnfd_RO.pop("_admin", None)
471 vnfd_RO.pop("monitoring-param", None)
472 vnfd_RO.pop("scaling-group-descriptor", None)
473 vnfd_RO.pop("kdu", None)
474 vnfd_RO.pop("k8s-cluster", None)
475 if new_id:
476 vnfd_RO["id"] = new_id
477
478 # cloud-init and cloud-init-file are handled by LCM (parsed with Jinja2); remove them from the descriptor sent to RO
479 for vdu in get_iterable(vnfd_RO, "vdu"):
480 vdu.pop("cloud-init-file", None)
481 vdu.pop("cloud-init", None)
482 return vnfd_RO
483
484 @staticmethod
485 def ip_profile_2_RO(ip_profile):
486 RO_ip_profile = deepcopy(ip_profile)
487 if "dns-server" in RO_ip_profile:
488 if isinstance(RO_ip_profile["dns-server"], list):
489 RO_ip_profile["dns-address"] = []
490 for ds in RO_ip_profile.pop("dns-server"):
491 RO_ip_profile["dns-address"].append(ds["address"])
492 else:
493 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
494 if RO_ip_profile.get("ip-version") == "ipv4":
495 RO_ip_profile["ip-version"] = "IPv4"
496 if RO_ip_profile.get("ip-version") == "ipv6":
497 RO_ip_profile["ip-version"] = "IPv6"
498 if "dhcp-params" in RO_ip_profile:
499 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
500 return RO_ip_profile
501
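# Illustrative example of the conversion above (values are arbitrary): an OSM ip-profile such as
#   {"ip-version": "ipv4", "subnet-address": "10.0.0.0/24",
#    "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
# becomes the RO format
#   {"ip-version": "IPv4", "subnet-address": "10.0.0.0/24",
#    "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}
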
502 def _get_ro_vim_id_for_vim_account(self, vim_account):
503 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
504 if db_vim["_admin"]["operationalState"] != "ENABLED":
505 raise LcmException(
506 "VIM={} is not available. operationalState={}".format(
507 vim_account, db_vim["_admin"]["operationalState"]
508 )
509 )
510 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
511 return RO_vim_id
512
513 def get_ro_wim_id_for_wim_account(self, wim_account):
514 if isinstance(wim_account, str):
515 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
516 if db_wim["_admin"]["operationalState"] != "ENABLED":
517 raise LcmException(
518 "WIM={} is not available. operationalState={}".format(
519 wim_account, db_wim["_admin"]["operationalState"]
520 )
521 )
522 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
523 return RO_wim_id
524 else:
525 return wim_account
526
527 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
528
529 db_vdu_push_list = []
530 template_vdur = []
531 db_update = {"_admin.modified": time()}
532 if vdu_create:
533 for vdu_id, vdu_count in vdu_create.items():
534 vdur = next(
535 (
536 vdur
537 for vdur in reversed(db_vnfr["vdur"])
538 if vdur["vdu-id-ref"] == vdu_id
539 ),
540 None,
541 )
542 if not vdur:
543 # Read the template saved in the db:
544 self.logger.debug("No vdur in the database. Using the vdur-template to scale")
545 vdur_template = db_vnfr.get("vdur-template")
546 if not vdur_template:
547 raise LcmException(
548 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
549 vdu_id
550 )
551 )
552 vdur = vdur_template[0]
553 # Delete the template from the database after using it
554 self.db.set_one("vnfrs",
555 {"_id": db_vnfr["_id"]},
556 None,
557 pull={"vdur-template": {"_id": vdur["_id"]}},
558 )
559 for count in range(vdu_count):
560 vdur_copy = deepcopy(vdur)
561 vdur_copy["status"] = "BUILD"
562 vdur_copy["status-detailed"] = None
563 vdur_copy["ip-address"] = None
564 vdur_copy["_id"] = str(uuid4())
565 vdur_copy["count-index"] += count + 1
566 vdur_copy["id"] = "{}-{}".format(
567 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
568 )
569 vdur_copy.pop("vim_info", None)
570 for iface in vdur_copy["interfaces"]:
571 if iface.get("fixed-ip"):
572 iface["ip-address"] = self.increment_ip_mac(
573 iface["ip-address"], count + 1
574 )
575 else:
576 iface.pop("ip-address", None)
577 if iface.get("fixed-mac"):
578 iface["mac-address"] = self.increment_ip_mac(
579 iface["mac-address"], count + 1
580 )
581 else:
582 iface.pop("mac-address", None)
583 if db_vnfr["vdur"]:
584 iface.pop(
585 "mgmt_vnf", None
586 ) # only the first vdu can be the management vdu of the vnf
587 db_vdu_push_list.append(vdur_copy)
588 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
589 if vdu_delete:
590 if len(db_vnfr["vdur"]) == 1:
591 # The scale will move to 0 instances
592 self.logger.debug("Scaling to 0! Creating the template with the last vdur")
593 template_vdur = [db_vnfr["vdur"][0]]
594 for vdu_id, vdu_count in vdu_delete.items():
595 if mark_delete:
596 indexes_to_delete = [
597 iv[0]
598 for iv in enumerate(db_vnfr["vdur"])
599 if iv[1]["vdu-id-ref"] == vdu_id
600 ]
601 db_update.update(
602 {
603 "vdur.{}.status".format(i): "DELETING"
604 for i in indexes_to_delete[-vdu_count:]
605 }
606 )
607 else:
608 # they must be deleted one by one because common.db does not allow otherwise
609 vdus_to_delete = [
610 v
611 for v in reversed(db_vnfr["vdur"])
612 if v["vdu-id-ref"] == vdu_id
613 ]
614 for vdu in vdus_to_delete[:vdu_count]:
615 self.db.set_one(
616 "vnfrs",
617 {"_id": db_vnfr["_id"]},
618 None,
619 pull={"vdur": {"_id": vdu["_id"]}},
620 )
621 db_push = {}
622 if db_vdu_push_list:
623 db_push["vdur"] = db_vdu_push_list
624 if template_vdur:
625 db_push["vdur-template"] = template_vdur
626 if not db_push:
627 db_push = None
628 db_vnfr["vdur-template"] = template_vdur
629 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
630 # modify passed dictionary db_vnfr
631 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
632 db_vnfr["vdur"] = db_vnfr_["vdur"]
633
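# Illustrative calls (arbitrary vdu ids): vdu_create/vdu_delete map a vdu-id-ref to an instance count, e.g.
#   self.scale_vnfr(db_vnfr, vdu_create={"dataVM": 2})  # pushes two new vdur copies in BUILD state
#   self.scale_vnfr(db_vnfr, vdu_delete={"dataVM": 1}, mark_delete=True)  # marks the last replica DELETING
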
634 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
635 """
636 Updates database nsr with the RO info for the created vld
637 :param ns_update_nsr: dictionary to be filled with the updated info
638 :param db_nsr: content of db_nsr. This is also modified
639 :param nsr_desc_RO: nsr descriptor from RO
640 :return: Nothing, LcmException is raised on errors
641 """
642
643 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
644 for net_RO in get_iterable(nsr_desc_RO, "nets"):
645 if vld["id"] != net_RO.get("ns_net_osm_id"):
646 continue
647 vld["vim-id"] = net_RO.get("vim_net_id")
648 vld["name"] = net_RO.get("vim_name")
649 vld["status"] = net_RO.get("status")
650 vld["status-detailed"] = net_RO.get("error_msg")
651 ns_update_nsr["vld.{}".format(vld_index)] = vld
652 break
653 else:
654 raise LcmException(
655 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
656 )
657
658 def set_vnfr_at_error(self, db_vnfrs, error_text):
659 try:
660 for db_vnfr in db_vnfrs.values():
661 vnfr_update = {"status": "ERROR"}
662 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
663 if "status" not in vdur:
664 vdur["status"] = "ERROR"
665 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
666 if error_text:
667 vdur["status-detailed"] = str(error_text)
668 vnfr_update[
669 "vdur.{}.status-detailed".format(vdu_index)
670 ] = "ERROR"
671 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
672 except DbException as e:
673 self.logger.error("Cannot update vnf. {}".format(e))
674
675 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
676 """
677 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
678 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
679 :param nsr_desc_RO: nsr descriptor from RO
680 :return: Nothing, LcmException is raised on errors
681 """
682 for vnf_index, db_vnfr in db_vnfrs.items():
683 for vnf_RO in nsr_desc_RO["vnfs"]:
684 if vnf_RO["member_vnf_index"] != vnf_index:
685 continue
686 vnfr_update = {}
687 if vnf_RO.get("ip_address"):
688 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
689 "ip_address"
690 ].split(";")[0]
691 elif not db_vnfr.get("ip-address"):
692 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
693 raise LcmExceptionNoMgmtIP(
694 "ns member_vnf_index '{}' has no IP address".format(
695 vnf_index
696 )
697 )
698
699 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
700 vdur_RO_count_index = 0
701 if vdur.get("pdu-type"):
702 continue
703 for vdur_RO in get_iterable(vnf_RO, "vms"):
704 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
705 continue
706 if vdur["count-index"] != vdur_RO_count_index:
707 vdur_RO_count_index += 1
708 continue
709 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
710 if vdur_RO.get("ip_address"):
711 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
712 else:
713 vdur["ip-address"] = None
714 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
715 vdur["name"] = vdur_RO.get("vim_name")
716 vdur["status"] = vdur_RO.get("status")
717 vdur["status-detailed"] = vdur_RO.get("error_msg")
718 for ifacer in get_iterable(vdur, "interfaces"):
719 for interface_RO in get_iterable(vdur_RO, "interfaces"):
720 if ifacer["name"] == interface_RO.get("internal_name"):
721 ifacer["ip-address"] = interface_RO.get(
722 "ip_address"
723 )
724 ifacer["mac-address"] = interface_RO.get(
725 "mac_address"
726 )
727 break
728 else:
729 raise LcmException(
730 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
731 "from VIM info".format(
732 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
733 )
734 )
735 vnfr_update["vdur.{}".format(vdu_index)] = vdur
736 break
737 else:
738 raise LcmException(
739 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
740 "VIM info".format(
741 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
742 )
743 )
744
745 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
746 for net_RO in get_iterable(nsr_desc_RO, "nets"):
747 if vld["id"] != net_RO.get("vnf_net_osm_id"):
748 continue
749 vld["vim-id"] = net_RO.get("vim_net_id")
750 vld["name"] = net_RO.get("vim_name")
751 vld["status"] = net_RO.get("status")
752 vld["status-detailed"] = net_RO.get("error_msg")
753 vnfr_update["vld.{}".format(vld_index)] = vld
754 break
755 else:
756 raise LcmException(
757 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
758 vnf_index, vld["id"]
759 )
760 )
761
762 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
763 break
764
765 else:
766 raise LcmException(
767 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
768 vnf_index
769 )
770 )
771
772 def _get_ns_config_info(self, nsr_id):
773 """
774 Generates a mapping between vnf,vdu elements and the N2VC id
775 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
776 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
777 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
778 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
779 """
780 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
781 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
782 mapping = {}
783 ns_config_info = {"osm-config-mapping": mapping}
784 for vca in vca_deployed_list:
785 if not vca["member-vnf-index"]:
786 continue
787 if not vca["vdu_id"]:
788 mapping[vca["member-vnf-index"]] = vca["application"]
789 else:
790 mapping[
791 "{}.{}.{}".format(
792 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
793 )
794 ] = vca["application"]
795 return ns_config_info
796
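# Illustrative result (application names are arbitrary): for a VNF-level charm on member-vnf-index "1"
# and a VDU-level charm on vdu "mgmtVM", replica 0, the returned structure would look like
#   {"osm-config-mapping": {"1": "app-vnf-1", "1.mgmtVM.0": "app-vnf-1-vdu-mgmtvm"}}
# where the values are the N2VC application names stored at _admin.deployed.VCA.
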
797 async def _instantiate_ng_ro(
798 self,
799 logging_text,
800 nsr_id,
801 nsd,
802 db_nsr,
803 db_nslcmop,
804 db_vnfrs,
805 db_vnfds,
806 n2vc_key_list,
807 stage,
808 start_deploy,
809 timeout_ns_deploy,
810 ):
811
812 db_vims = {}
813
814 def get_vim_account(vim_account_id):
815 nonlocal db_vims
816 if vim_account_id in db_vims:
817 return db_vims[vim_account_id]
818 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
819 db_vims[vim_account_id] = db_vim
820 return db_vim
821
822 # modify target_vld info with instantiation parameters
823 def parse_vld_instantiation_params(
824 target_vim, target_vld, vld_params, target_sdn
825 ):
826 if vld_params.get("ip-profile"):
827 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
828 "ip-profile"
829 ]
830 if vld_params.get("provider-network"):
831 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
832 "provider-network"
833 ]
834 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
835 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
836 "provider-network"
837 ]["sdn-ports"]
838 if vld_params.get("wimAccountId"):
839 target_wim = "wim:{}".format(vld_params["wimAccountId"])
840 target_vld["vim_info"][target_wim] = {}
841 for param in ("vim-network-name", "vim-network-id"):
842 if vld_params.get(param):
843 if isinstance(vld_params[param], dict):
844 for vim, vim_net in vld_params[param].items():
845 other_target_vim = "vim:" + vim
846 populate_dict(
847 target_vld["vim_info"],
848 (other_target_vim, param.replace("-", "_")),
849 vim_net,
850 )
851 else: # isinstance str
852 target_vld["vim_info"][target_vim][
853 param.replace("-", "_")
854 ] = vld_params[param]
855 if vld_params.get("common_id"):
856 target_vld["common_id"] = vld_params.get("common_id")
857
858 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
859 def update_ns_vld_target(target, ns_params):
860 for vnf_params in ns_params.get("vnf", ()):
861 if vnf_params.get("vimAccountId"):
862 target_vnf = next(
863 (
864 vnfr
865 for vnfr in db_vnfrs.values()
866 if vnf_params["member-vnf-index"]
867 == vnfr["member-vnf-index-ref"]
868 ),
869 None,
870 )
871 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
872 for a_index, a_vld in enumerate(target["ns"]["vld"]):
873 target_vld = find_in_list(
874 get_iterable(vdur, "interfaces"),
875 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
876 )
877 if target_vld:
878 if vnf_params.get("vimAccountId") not in a_vld.get(
879 "vim_info", {}
880 ):
881 target["ns"]["vld"][a_index].get("vim_info").update(
882 {
883 "vim:{}".format(vnf_params["vimAccountId"]): {
884 "vim_network_name": ""
885 }
886 }
887 )
888
889 nslcmop_id = db_nslcmop["_id"]
890 target = {
891 "name": db_nsr["name"],
892 "ns": {"vld": []},
893 "vnf": [],
894 "image": deepcopy(db_nsr["image"]),
895 "flavor": deepcopy(db_nsr["flavor"]),
896 "action_id": nslcmop_id,
897 "cloud_init_content": {},
898 }
899 for image in target["image"]:
900 image["vim_info"] = {}
901 for flavor in target["flavor"]:
902 flavor["vim_info"] = {}
903 if db_nsr.get("affinity-or-anti-affinity-group"):
904 target["affinity-or-anti-affinity-group"] = deepcopy(
905 db_nsr["affinity-or-anti-affinity-group"]
906 )
907 for affinity_or_anti_affinity_group in target[
908 "affinity-or-anti-affinity-group"
909 ]:
910 affinity_or_anti_affinity_group["vim_info"] = {}
911
912 if db_nslcmop.get("lcmOperationType") != "instantiate":
913 # get parameters of instantiation:
914 db_nslcmop_instantiate = self.db.get_list(
915 "nslcmops",
916 {
917 "nsInstanceId": db_nslcmop["nsInstanceId"],
918 "lcmOperationType": "instantiate",
919 },
920 )[-1]
921 ns_params = db_nslcmop_instantiate.get("operationParams")
922 else:
923 ns_params = db_nslcmop.get("operationParams")
924 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
925 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
926
927 cp2target = {}
928 for vld_index, vld in enumerate(db_nsr.get("vld")):
929 target_vim = "vim:{}".format(ns_params["vimAccountId"])
930 target_vld = {
931 "id": vld["id"],
932 "name": vld["name"],
933 "mgmt-network": vld.get("mgmt-network", False),
934 "type": vld.get("type"),
935 "vim_info": {
936 target_vim: {
937 "vim_network_name": vld.get("vim-network-name"),
938 "vim_account_id": ns_params["vimAccountId"],
939 }
940 },
941 }
942 # check if this network needs SDN assist
943 if vld.get("pci-interfaces"):
944 db_vim = get_vim_account(ns_params["vimAccountId"])
945 sdnc_id = db_vim["config"].get("sdn-controller")
946 if sdnc_id:
947 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
948 target_sdn = "sdn:{}".format(sdnc_id)
949 target_vld["vim_info"][target_sdn] = {
950 "sdn": True,
951 "target_vim": target_vim,
952 "vlds": [sdn_vld],
953 "type": vld.get("type"),
954 }
955
956 nsd_vnf_profiles = get_vnf_profiles(nsd)
957 for nsd_vnf_profile in nsd_vnf_profiles:
958 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
959 if cp["virtual-link-profile-id"] == vld["id"]:
960 cp2target[
961 "member_vnf:{}.{}".format(
962 cp["constituent-cpd-id"][0][
963 "constituent-base-element-id"
964 ],
965 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
966 )
967 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
968
969 # check at nsd descriptor, if there is an ip-profile
970 vld_params = {}
971 nsd_vlp = find_in_list(
972 get_virtual_link_profiles(nsd),
973 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
974 == vld["id"],
975 )
976 if (
977 nsd_vlp
978 and nsd_vlp.get("virtual-link-protocol-data")
979 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
980 ):
981 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
982 "l3-protocol-data"
983 ]
984 ip_profile_dest_data = {}
985 if "ip-version" in ip_profile_source_data:
986 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
987 "ip-version"
988 ]
989 if "cidr" in ip_profile_source_data:
990 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
991 "cidr"
992 ]
993 if "gateway-ip" in ip_profile_source_data:
994 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
995 "gateway-ip"
996 ]
997 if "dhcp-enabled" in ip_profile_source_data:
998 ip_profile_dest_data["dhcp-params"] = {
999 "enabled": ip_profile_source_data["dhcp-enabled"]
1000 }
1001 vld_params["ip-profile"] = ip_profile_dest_data
1002
1003 # update vld_params with instantiation params
1004 vld_instantiation_params = find_in_list(
1005 get_iterable(ns_params, "vld"),
1006 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1007 )
1008 if vld_instantiation_params:
1009 vld_params.update(vld_instantiation_params)
1010 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1011 target["ns"]["vld"].append(target_vld)
1012 # Update the target ns_vld if the vnf vim_account is overridden by instantiation params
1013 update_ns_vld_target(target, ns_params)
1014
1015 for vnfr in db_vnfrs.values():
1016 vnfd = find_in_list(
1017 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1018 )
1019 vnf_params = find_in_list(
1020 get_iterable(ns_params, "vnf"),
1021 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1022 )
1023 target_vnf = deepcopy(vnfr)
1024 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1025 for vld in target_vnf.get("vld", ()):
1026 # check if connected to a ns.vld, to fill target
1027 vnf_cp = find_in_list(
1028 vnfd.get("int-virtual-link-desc", ()),
1029 lambda cpd: cpd.get("id") == vld["id"],
1030 )
1031 if vnf_cp:
1032 ns_cp = "member_vnf:{}.{}".format(
1033 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1034 )
1035 if cp2target.get(ns_cp):
1036 vld["target"] = cp2target[ns_cp]
1037
1038 vld["vim_info"] = {
1039 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1040 }
1041 # check if this network needs SDN assist
1042 target_sdn = None
1043 if vld.get("pci-interfaces"):
1044 db_vim = get_vim_account(vnfr["vim-account-id"])
1045 sdnc_id = db_vim["config"].get("sdn-controller")
1046 if sdnc_id:
1047 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1048 target_sdn = "sdn:{}".format(sdnc_id)
1049 vld["vim_info"][target_sdn] = {
1050 "sdn": True,
1051 "target_vim": target_vim,
1052 "vlds": [sdn_vld],
1053 "type": vld.get("type"),
1054 }
1055
1056 # check at vnfd descriptor, if there is an ip-profile
1057 vld_params = {}
1058 vnfd_vlp = find_in_list(
1059 get_virtual_link_profiles(vnfd),
1060 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1061 )
1062 if (
1063 vnfd_vlp
1064 and vnfd_vlp.get("virtual-link-protocol-data")
1065 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1066 ):
1067 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1068 "l3-protocol-data"
1069 ]
1070 ip_profile_dest_data = {}
1071 if "ip-version" in ip_profile_source_data:
1072 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1073 "ip-version"
1074 ]
1075 if "cidr" in ip_profile_source_data:
1076 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1077 "cidr"
1078 ]
1079 if "gateway-ip" in ip_profile_source_data:
1080 ip_profile_dest_data[
1081 "gateway-address"
1082 ] = ip_profile_source_data["gateway-ip"]
1083 if "dhcp-enabled" in ip_profile_source_data:
1084 ip_profile_dest_data["dhcp-params"] = {
1085 "enabled": ip_profile_source_data["dhcp-enabled"]
1086 }
1087
1088 vld_params["ip-profile"] = ip_profile_dest_data
1089 # update vld_params with instantiation params
1090 if vnf_params:
1091 vld_instantiation_params = find_in_list(
1092 get_iterable(vnf_params, "internal-vld"),
1093 lambda i_vld: i_vld["name"] == vld["id"],
1094 )
1095 if vld_instantiation_params:
1096 vld_params.update(vld_instantiation_params)
1097 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1098
1099 vdur_list = []
1100 for vdur in target_vnf.get("vdur", ()):
1101 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1102 continue # This vdu must not be created
1103 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1104
1105 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1106
1107 if ssh_keys_all:
1108 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1109 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1110 if (
1111 vdu_configuration
1112 and vdu_configuration.get("config-access")
1113 and vdu_configuration.get("config-access").get("ssh-access")
1114 ):
1115 vdur["ssh-keys"] = ssh_keys_all
1116 vdur["ssh-access-required"] = vdu_configuration[
1117 "config-access"
1118 ]["ssh-access"]["required"]
1119 elif (
1120 vnf_configuration
1121 and vnf_configuration.get("config-access")
1122 and vnf_configuration.get("config-access").get("ssh-access")
1123 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1124 ):
1125 vdur["ssh-keys"] = ssh_keys_all
1126 vdur["ssh-access-required"] = vnf_configuration[
1127 "config-access"
1128 ]["ssh-access"]["required"]
1129 elif ssh_keys_instantiation and find_in_list(
1130 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1131 ):
1132 vdur["ssh-keys"] = ssh_keys_instantiation
1133
1134 self.logger.debug("NS > vdur > {}".format(vdur))
1135
1136 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1137 # cloud-init
1138 if vdud.get("cloud-init-file"):
1139 vdur["cloud-init"] = "{}:file:{}".format(
1140 vnfd["_id"], vdud.get("cloud-init-file")
1141 )
1142 # read the file and put its content at target.cloud_init_content. Avoids ng_ro having to use the shared package system
1143 if vdur["cloud-init"] not in target["cloud_init_content"]:
1144 base_folder = vnfd["_admin"]["storage"]
1145 if base_folder["pkg-dir"]:
1146 cloud_init_file = "{}/{}/cloud_init/{}".format(
1147 base_folder["folder"],
1148 base_folder["pkg-dir"],
1149 vdud.get("cloud-init-file"),
1150 )
1151 else:
1152 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1153 base_folder["folder"],
1154 vdud.get("cloud-init-file"),
1155 )
1156 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1157 target["cloud_init_content"][
1158 vdur["cloud-init"]
1159 ] = ci_file.read()
1160 elif vdud.get("cloud-init"):
1161 vdur["cloud-init"] = "{}:vdu:{}".format(
1162 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1163 )
1164 # put the content at target.cloud_init_content. Avoids ng_ro having to read the vnfd descriptor
1165 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1166 "cloud-init"
1167 ]
1168 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1169 deploy_params_vdu = self._format_additional_params(
1170 vdur.get("additionalParams") or {}
1171 )
1172 deploy_params_vdu["OSM"] = get_osm_params(
1173 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1174 )
1175 vdur["additionalParams"] = deploy_params_vdu
1176
1177 # flavor
1178 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1179 if target_vim not in ns_flavor["vim_info"]:
1180 ns_flavor["vim_info"][target_vim] = {}
1181
1182 # deal with images
1183 # in case alternative images are provided, check whether one of them applies
1184 # to this vim_type and, if so, use that alternative image
1185 ns_image_id = int(vdur["ns-image-id"])
1186 if vdur.get("alt-image-ids"):
1187 db_vim = get_vim_account(vnfr["vim-account-id"])
1188 vim_type = db_vim["vim_type"]
1189 for alt_image_id in vdur.get("alt-image-ids"):
1190 ns_alt_image = target["image"][int(alt_image_id)]
1191 if vim_type == ns_alt_image.get("vim-type"):
1192 # must use alternative image
1193 self.logger.debug(
1194 "use alternative image id: {}".format(alt_image_id)
1195 )
1196 ns_image_id = alt_image_id
1197 vdur["ns-image-id"] = ns_image_id
1198 break
1199 ns_image = target["image"][int(ns_image_id)]
1200 if target_vim not in ns_image["vim_info"]:
1201 ns_image["vim_info"][target_vim] = {}
1202
1203 # Affinity groups
1204 if vdur.get("affinity-or-anti-affinity-group-id"):
1205 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1206 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1207 if target_vim not in ns_ags["vim_info"]:
1208 ns_ags["vim_info"][target_vim] = {}
1209
1210 vdur["vim_info"] = {target_vim: {}}
1211 # instantiation parameters
1212 # if vnf_params:
1213 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1214 # vdud["id"]), None)
1215 vdur_list.append(vdur)
1216 target_vnf["vdur"] = vdur_list
1217 target["vnf"].append(target_vnf)
1218
1219 desc = await self.RO.deploy(nsr_id, target)
1220 self.logger.debug("RO return > {}".format(desc))
1221 action_id = desc["action_id"]
1222 await self._wait_ng_ro(
1223 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1224 )
1225
1226 # Updating NSR
1227 db_nsr_update = {
1228 "_admin.deployed.RO.operational-status": "running",
1229 "detailed-status": " ".join(stage),
1230 }
1231 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1232 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1233 self._write_op_status(nslcmop_id, stage)
1234 self.logger.debug(
1235 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1236 )
1237 return
1238
1239 async def _wait_ng_ro(
1240 self,
1241 nsr_id,
1242 action_id,
1243 nslcmop_id=None,
1244 start_time=None,
1245 timeout=600,
1246 stage=None,
1247 ):
1248 detailed_status_old = None
1249 db_nsr_update = {}
1250 start_time = start_time or time()
1251 while time() <= start_time + timeout:
1252 desc_status = await self.RO.status(nsr_id, action_id)
1253 self.logger.debug("Wait NG RO > {}".format(desc_status))
1254 if desc_status["status"] == "FAILED":
1255 raise NgRoException(desc_status["details"])
1256 elif desc_status["status"] == "BUILD":
1257 if stage:
1258 stage[2] = "VIM: ({})".format(desc_status["details"])
1259 elif desc_status["status"] == "DONE":
1260 if stage:
1261 stage[2] = "Deployed at VIM"
1262 break
1263 else:
1264 assert False, "ROclient.check_ns_status returns unknown {}".format(
1265 desc_status["status"]
1266 )
1267 if stage and nslcmop_id and stage[2] != detailed_status_old:
1268 detailed_status_old = stage[2]
1269 db_nsr_update["detailed-status"] = " ".join(stage)
1270 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1271 self._write_op_status(nslcmop_id, stage)
1272 await asyncio.sleep(15, loop=self.loop)
1273 else: # timeout_ns_deploy
1274 raise NgRoException("Timeout waiting for ns to deploy")
1275
1276 async def _terminate_ng_ro(
1277 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1278 ):
1279 db_nsr_update = {}
1280 failed_detail = []
1281 action_id = None
1282 start_deploy = time()
1283 try:
1284 target = {
1285 "ns": {"vld": []},
1286 "vnf": [],
1287 "image": [],
1288 "flavor": [],
1289 "action_id": nslcmop_id,
1290 }
1291 desc = await self.RO.deploy(nsr_id, target)
1292 action_id = desc["action_id"]
1293 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1294 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1295 self.logger.debug(
1296 logging_text
1297 + "ns terminate action at RO. action_id={}".format(action_id)
1298 )
1299
1300 # wait until done
1301 delete_timeout = 20 * 60 # 20 minutes
1302 await self._wait_ng_ro(
1303 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1304 )
1305
1306 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1307 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1308 # delete all nsr
1309 await self.RO.delete(nsr_id)
1310 except Exception as e:
1311 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1312 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1313 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1314 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1315 self.logger.debug(
1316 logging_text + "RO_action_id={} already deleted".format(action_id)
1317 )
1318 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1319 failed_detail.append("delete conflict: {}".format(e))
1320 self.logger.debug(
1321 logging_text
1322 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1323 )
1324 else:
1325 failed_detail.append("delete error: {}".format(e))
1326 self.logger.error(
1327 logging_text
1328 + "RO_action_id={} delete error: {}".format(action_id, e)
1329 )
1330
1331 if failed_detail:
1332 stage[2] = "Error deleting from VIM"
1333 else:
1334 stage[2] = "Deleted from VIM"
1335 db_nsr_update["detailed-status"] = " ".join(stage)
1336 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1337 self._write_op_status(nslcmop_id, stage)
1338
1339 if failed_detail:
1340 raise LcmException("; ".join(failed_detail))
1341 return
1342
1343 async def instantiate_RO(
1344 self,
1345 logging_text,
1346 nsr_id,
1347 nsd,
1348 db_nsr,
1349 db_nslcmop,
1350 db_vnfrs,
1351 db_vnfds,
1352 n2vc_key_list,
1353 stage,
1354 ):
1355 """
1356 Instantiate at RO
1357 :param logging_text: prefix text to use at logging
1358 :param nsr_id: nsr identity
1359 :param nsd: database content of ns descriptor
1360 :param db_nsr: database content of ns record
1361 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1362 :param db_vnfrs:
1363 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1364 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1365 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1366 :return: None or exception
1367 """
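# Illustrative shape of 'stage' (texts are arbitrary): a 3-item list such as
#   ["Stage 2/5: deployment of KDUs, VMs and execution environments.", "Waiting for tasks", ""]
# where only stage[2] (the vim_specific slot) is overwritten here and inside _wait_ng_ro().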
1368 try:
1369 start_deploy = time()
1370 ns_params = db_nslcmop.get("operationParams")
1371 if ns_params and ns_params.get("timeout_ns_deploy"):
1372 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1373 else:
1374 timeout_ns_deploy = self.timeout.get(
1375 "ns_deploy", self.timeout_ns_deploy
1376 )
1377
1378 # Check for and optionally request placement optimization. Database will be updated if placement activated
1379 stage[2] = "Waiting for Placement."
1380 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1381 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1382 for vnfr in db_vnfrs.values():
1383 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1384 break
1385 else:
1386 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1387
1388 return await self._instantiate_ng_ro(
1389 logging_text,
1390 nsr_id,
1391 nsd,
1392 db_nsr,
1393 db_nslcmop,
1394 db_vnfrs,
1395 db_vnfds,
1396 n2vc_key_list,
1397 stage,
1398 start_deploy,
1399 timeout_ns_deploy,
1400 )
1401 except Exception as e:
1402 stage[2] = "ERROR deploying at VIM"
1403 self.set_vnfr_at_error(db_vnfrs, str(e))
1404 self.logger.error(
1405 "Error deploying at VIM {}".format(e),
1406 exc_info=not isinstance(
1407 e,
1408 (
1409 ROclient.ROClientException,
1410 LcmException,
1411 DbException,
1412 NgRoException,
1413 ),
1414 ),
1415 )
1416 raise
1417
1418 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1419 """
1420 Wait for kdu to be up, get ip address
1421 :param logging_text: prefix use for logging
1422 :param nsr_id:
1423 :param vnfr_id:
1424 :param kdu_name:
1425 :return: IP address, K8s services
1426 """
1427
1428 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1429 nb_tries = 0
1430
1431 while nb_tries < 360:
1432 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1433 kdur = next(
1434 (
1435 x
1436 for x in get_iterable(db_vnfr, "kdur")
1437 if x.get("kdu-name") == kdu_name
1438 ),
1439 None,
1440 )
1441 if not kdur:
1442 raise LcmException(
1443 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1444 )
1445 if kdur.get("status"):
1446 if kdur["status"] in ("READY", "ENABLED"):
1447 return kdur.get("ip-address"), kdur.get("services")
1448 else:
1449 raise LcmException(
1450 "target KDU={} is in error state".format(kdu_name)
1451 )
1452
1453 await asyncio.sleep(10, loop=self.loop)
1454 nb_tries += 1
1455 raise LcmException("Timeout waiting for KDU={} to be instantiated".format(kdu_name))
1456
1457 async def wait_vm_up_insert_key_ro(
1458 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1459 ):
1460 """
1461 Wait for the ip address at RO and, optionally, insert a public key in the virtual machine
1462 :param logging_text: prefix use for logging
1463 :param nsr_id:
1464 :param vnfr_id:
1465 :param vdu_id:
1466 :param vdu_index:
1467 :param pub_key: public ssh key to inject, None to skip
1468 :param user: user to apply the public ssh key
1469 :return: IP address
1470 """
1471
1472 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1473 ro_nsr_id = None
1474 ip_address = None
1475 nb_tries = 0
1476 target_vdu_id = None
1477 ro_retries = 0
1478
1479 while True:
1480
1481 ro_retries += 1
1482 if ro_retries >= 360: # 1 hour
1483 raise LcmException(
1484 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1485 )
1486
1487 await asyncio.sleep(10, loop=self.loop)
1488
1489 # get ip address
1490 if not target_vdu_id:
1491 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1492
1493 if not vdu_id: # for the VNF case
1494 if db_vnfr.get("status") == "ERROR":
1495 raise LcmException(
1496 "Cannot inject ssh-key because target VNF is in error state"
1497 )
1498 ip_address = db_vnfr.get("ip-address")
1499 if not ip_address:
1500 continue
1501 vdur = next(
1502 (
1503 x
1504 for x in get_iterable(db_vnfr, "vdur")
1505 if x.get("ip-address") == ip_address
1506 ),
1507 None,
1508 )
1509 else: # VDU case
1510 vdur = next(
1511 (
1512 x
1513 for x in get_iterable(db_vnfr, "vdur")
1514 if x.get("vdu-id-ref") == vdu_id
1515 and x.get("count-index") == vdu_index
1516 ),
1517 None,
1518 )
1519
1520 if (
1521 not vdur and len(db_vnfr.get("vdur", ())) == 1
1522 ): # If only one, this should be the target vdu
1523 vdur = db_vnfr["vdur"][0]
1524 if not vdur:
1525 raise LcmException(
1526 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1527 vnfr_id, vdu_id, vdu_index
1528 )
1529 )
1530 # New generation RO stores information at "vim_info"
1531 ng_ro_status = None
1532 target_vim = None
1533 if vdur.get("vim_info"):
1534 target_vim = next(
1535 t for t in vdur["vim_info"]
1536 ) # there should be only one key
1537 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1538 if (
1539 vdur.get("pdu-type")
1540 or vdur.get("status") == "ACTIVE"
1541 or ng_ro_status == "ACTIVE"
1542 ):
1543 ip_address = vdur.get("ip-address")
1544 if not ip_address:
1545 continue
1546 target_vdu_id = vdur["vdu-id-ref"]
1547 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1548 raise LcmException(
1549 "Cannot inject ssh-key because target VM is in error state"
1550 )
1551
1552 if not target_vdu_id:
1553 continue
1554
1555 # inject public key into machine
1556 if pub_key and user:
1557 self.logger.debug(logging_text + "Inserting RO key")
1558 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1559 if vdur.get("pdu-type"):
1560 self.logger.error(logging_text + "Cannot inject ssh-key into a PDU")
1561 return ip_address
1562 try:
1563 ro_vm_id = "{}-{}".format(
1564 db_vnfr["member-vnf-index-ref"], target_vdu_id
1565 ) # TODO add vdu_index
1566 if self.ng_ro:
1567 target = {
1568 "action": {
1569 "action": "inject_ssh_key",
1570 "key": pub_key,
1571 "user": user,
1572 },
1573 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1574 }
1575 desc = await self.RO.deploy(nsr_id, target)
1576 action_id = desc["action_id"]
1577 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1578 break
1579 else:
1580 # wait until NS is deployed at RO
1581 if not ro_nsr_id:
1582 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1583 ro_nsr_id = deep_get(
1584 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1585 )
1586 if not ro_nsr_id:
1587 continue
1588 result_dict = await self.RO.create_action(
1589 item="ns",
1590 item_id_name=ro_nsr_id,
1591 descriptor={
1592 "add_public_key": pub_key,
1593 "vms": [ro_vm_id],
1594 "user": user,
1595 },
1596 )
1597 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1598 if not result_dict or not isinstance(result_dict, dict):
1599 raise LcmException(
1600 "Unknown response from RO when injecting key"
1601 )
1602 for result in result_dict.values():
1603 if result.get("vim_result") == 200:
1604 break
1605 else:
1606 raise ROclient.ROClientException(
1607 "error injecting key: {}".format(
1608 result.get("description")
1609 )
1610 )
1611 break
1612 except NgRoException as e:
1613 raise LcmException(
1614 "Reaching max tries injecting key. Error: {}".format(e)
1615 )
1616 except ROclient.ROClientException as e:
1617 if not nb_tries:
1618 self.logger.debug(
1619 logging_text
1620 + "error injecting key: {}. Retrying until {} seconds".format(
1621 e, 20 * 10
1622 )
1623 )
1624 nb_tries += 1
1625 if nb_tries >= 20:
1626 raise LcmException(
1627 "Reaching max tries injecting key. Error: {}".format(e)
1628 )
1629 else:
1630 break
1631
1632 return ip_address
1633
1634 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1635 """
1636 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1637 """
1638 my_vca = vca_deployed_list[vca_index]
1639 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1640 # vdu or kdu: no dependencies
1641 return
1642 timeout = 300
1643 while timeout >= 0:
1644 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1645 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1646 configuration_status_list = db_nsr["configurationStatus"]
1647 for index, vca_deployed in enumerate(configuration_status_list):
1648 if index == vca_index:
1649 # myself
1650 continue
1651 if not my_vca.get("member-vnf-index") or (
1652 vca_deployed.get("member-vnf-index")
1653 == my_vca.get("member-vnf-index")
1654 ):
1655 internal_status = configuration_status_list[index].get("status")
1656 if internal_status == "READY":
1657 continue
1658 elif internal_status == "BROKEN":
1659 raise LcmException(
1660 "Configuration aborted because dependent charm/s has failed"
1661 )
1662 else:
1663 break
1664 else:
1665 # no dependencies, return
1666 return
1667 await asyncio.sleep(10)
1668 timeout -= 1
1669
1670 raise LcmException("Configuration aborted because dependent charm/s timeout")
1671
1672 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1673 vca_id = None
1674 if db_vnfr:
1675 vca_id = deep_get(db_vnfr, ("vca-id",))
1676 elif db_nsr:
1677 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1678 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1679 return vca_id
1680
1681 async def instantiate_N2VC(
1682 self,
1683 logging_text,
1684 vca_index,
1685 nsi_id,
1686 db_nsr,
1687 db_vnfr,
1688 vdu_id,
1689 kdu_name,
1690 vdu_index,
1691 config_descriptor,
1692 deploy_params,
1693 base_folder,
1694 nslcmop_id,
1695 stage,
1696 vca_type,
1697 vca_name,
1698 ee_config_descriptor,
1699 ):
1700 nsr_id = db_nsr["_id"]
1701 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1702 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1703 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1704 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1705 db_dict = {
1706 "collection": "nsrs",
1707 "filter": {"_id": nsr_id},
1708 "path": db_update_entry,
1709 }
1710 step = ""
1711 try:
1712
1713 element_type = "NS"
1714 element_under_configuration = nsr_id
1715
1716 vnfr_id = None
1717 if db_vnfr:
1718 vnfr_id = db_vnfr["_id"]
1719 osm_config["osm"]["vnf_id"] = vnfr_id
1720
1721 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1722
1723 if vca_type == "native_charm":
1724 index_number = 0
1725 else:
1726 index_number = vdu_index or 0
1727
1728 if vnfr_id:
1729 element_type = "VNF"
1730 element_under_configuration = vnfr_id
1731 namespace += ".{}-{}".format(vnfr_id, index_number)
1732 if vdu_id:
1733 namespace += ".{}-{}".format(vdu_id, index_number)
1734 element_type = "VDU"
1735 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1736 osm_config["osm"]["vdu_id"] = vdu_id
1737 elif kdu_name:
1738 namespace += ".{}".format(kdu_name)
1739 element_type = "KDU"
1740 element_under_configuration = kdu_name
1741 osm_config["osm"]["kdu_name"] = kdu_name
1742
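# Illustrative content of osm_config at this point (ids are placeholders): for a VDU-level charm it
# would look like {"osm": {"ns_id": "<nsr_id>", "vnf_id": "<vnfr_id>", "vdu_id": "<vdu-id>"}}, and for a
# KDU it would carry "kdu_name" instead of "vdu_id". This dict is the implicit osm-config that is later
# passed as config=osm_config when creating helm-based execution environments.
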
1743 # Get artifact path
1744 if base_folder["pkg-dir"]:
1745 artifact_path = "{}/{}/{}/{}".format(
1746 base_folder["folder"],
1747 base_folder["pkg-dir"],
1748 "charms"
1749 if vca_type
1750 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1751 else "helm-charts",
1752 vca_name,
1753 )
1754 else:
1755 artifact_path = "{}/Scripts/{}/{}/".format(
1756 base_folder["folder"],
1757 "charms"
1758 if vca_type
1759 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1760 else "helm-charts",
1761 vca_name,
1762 )
1763
1764 self.logger.debug("Artifact path > {}".format(artifact_path))
1765
1766 # get initial_config_primitive_list that applies to this element
1767 initial_config_primitive_list = config_descriptor.get(
1768 "initial-config-primitive"
1769 )
1770
1771 self.logger.debug(
1772 "Initial config primitive list > {}".format(
1773 initial_config_primitive_list
1774 )
1775 )
1776
1777 # add config if not present for NS charm
1778 ee_descriptor_id = ee_config_descriptor.get("id")
1779 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1780 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1781 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1782 )
1783
1784 self.logger.debug(
1785 "Initial config primitive list #2 > {}".format(
1786 initial_config_primitive_list
1787 )
1788 )
1789 # n2vc_redesign STEP 3.1
1790 # find old ee_id if exists
1791 ee_id = vca_deployed.get("ee_id")
1792
1793 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1794 # create or register execution environment in VCA
1795 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1796
1797 self._write_configuration_status(
1798 nsr_id=nsr_id,
1799 vca_index=vca_index,
1800 status="CREATING",
1801 element_under_configuration=element_under_configuration,
1802 element_type=element_type,
1803 )
1804
1805 step = "create execution environment"
1806 self.logger.debug(logging_text + step)
1807
1808 ee_id = None
1809 credentials = None
1810 if vca_type == "k8s_proxy_charm":
1811 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1812 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1813 namespace=namespace,
1814 artifact_path=artifact_path,
1815 db_dict=db_dict,
1816 vca_id=vca_id,
1817 )
1818 elif vca_type == "helm" or vca_type == "helm-v3":
1819 ee_id, credentials = await self.vca_map[
1820 vca_type
1821 ].create_execution_environment(
1822 namespace=namespace,
1823 reuse_ee_id=ee_id,
1824 db_dict=db_dict,
1825 config=osm_config,
1826 artifact_path=artifact_path,
1827 vca_type=vca_type,
1828 )
1829 else:
1830 ee_id, credentials = await self.vca_map[
1831 vca_type
1832 ].create_execution_environment(
1833 namespace=namespace,
1834 reuse_ee_id=ee_id,
1835 db_dict=db_dict,
1836 vca_id=vca_id,
1837 )
1838
1839 elif vca_type == "native_charm":
1840 step = "Waiting to VM being up and getting IP address"
1841 self.logger.debug(logging_text + step)
1842 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1843 logging_text,
1844 nsr_id,
1845 vnfr_id,
1846 vdu_id,
1847 vdu_index,
1848 user=None,
1849 pub_key=None,
1850 )
1851 credentials = {"hostname": rw_mgmt_ip}
1852 # get username
1853 username = deep_get(
1854 config_descriptor, ("config-access", "ssh-access", "default-user")
1855 )
1856 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1857 # merged. Meanwhile, get the username from initial-config-primitive
1858 if not username and initial_config_primitive_list:
1859 for config_primitive in initial_config_primitive_list:
1860 for param in config_primitive.get("parameter", ()):
1861 if param["name"] == "ssh-username":
1862 username = param["value"]
1863 break
1864 if not username:
1865 raise LcmException(
1866 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1867 "'config-access.ssh-access.default-user'"
1868 )
1869 credentials["username"] = username
1870 # n2vc_redesign STEP 3.2
1871
1872 self._write_configuration_status(
1873 nsr_id=nsr_id,
1874 vca_index=vca_index,
1875 status="REGISTERING",
1876 element_under_configuration=element_under_configuration,
1877 element_type=element_type,
1878 )
1879
1880 step = "register execution environment {}".format(credentials)
1881 self.logger.debug(logging_text + step)
1882 ee_id = await self.vca_map[vca_type].register_execution_environment(
1883 credentials=credentials,
1884 namespace=namespace,
1885 db_dict=db_dict,
1886 vca_id=vca_id,
1887 )
1888
1889 # for compatibility with MON/POL modules, which need the model and application name at database
1890 # TODO ask MON/POL whether they still need to assume the format "model_name.application_name"
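# Illustrative example (hypothetical ee_id): 'model-1234.app-vnf-1-mgmt' is split into
# model 'model-1234' and application 'app-vnf-1-mgmt'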
1891 ee_id_parts = ee_id.split(".")
1892 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1893 if len(ee_id_parts) >= 2:
1894 model_name = ee_id_parts[0]
1895 application_name = ee_id_parts[1]
1896 db_nsr_update[db_update_entry + "model"] = model_name
1897 db_nsr_update[db_update_entry + "application"] = application_name
1898
1899 # n2vc_redesign STEP 3.3
1900 step = "Install configuration Software"
1901
1902 self._write_configuration_status(
1903 nsr_id=nsr_id,
1904 vca_index=vca_index,
1905 status="INSTALLING SW",
1906 element_under_configuration=element_under_configuration,
1907 element_type=element_type,
1908 other_update=db_nsr_update,
1909 )
1910
1911 # TODO check if already done
1912 self.logger.debug(logging_text + step)
1913 config = None
1914 if vca_type == "native_charm":
1915 config_primitive = next(
1916 (p for p in initial_config_primitive_list if p["name"] == "config"),
1917 None,
1918 )
1919 if config_primitive:
1920 config = self._map_primitive_params(
1921 config_primitive, {}, deploy_params
1922 )
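# For lxc proxy charms, the number of units is taken from the 'config-units' field of the
# NS, VNF or VDU record (defaulting to 1 when not present)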
1923 num_units = 1
1924 if vca_type == "lxc_proxy_charm":
1925 if element_type == "NS":
1926 num_units = db_nsr.get("config-units") or 1
1927 elif element_type == "VNF":
1928 num_units = db_vnfr.get("config-units") or 1
1929 elif element_type == "VDU":
1930 for v in db_vnfr["vdur"]:
1931 if vdu_id == v["vdu-id-ref"]:
1932 num_units = v.get("config-units") or 1
1933 break
1934 if vca_type != "k8s_proxy_charm":
1935 await self.vca_map[vca_type].install_configuration_sw(
1936 ee_id=ee_id,
1937 artifact_path=artifact_path,
1938 db_dict=db_dict,
1939 config=config,
1940 num_units=num_units,
1941 vca_id=vca_id,
1942 vca_type=vca_type,
1943 )
1944
1945 # write in db flag of configuration_sw already installed
1946 self.update_db_2(
1947 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1948 )
1949
1950 # add relations for this VCA (wait for other peers related with this VCA)
1951 await self._add_vca_relations(
1952 logging_text=logging_text,
1953 nsr_id=nsr_id,
1954 vca_type=vca_type,
1955 vca_index=vca_index,
1956 )
1957
1958 # if SSH access is required, then get the execution environment SSH public key
1959 # for native charms we have already waited for the VM to be up
1960 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1961 pub_key = None
1962 user = None
1963 # self.logger.debug("get ssh key block")
1964 if deep_get(
1965 config_descriptor, ("config-access", "ssh-access", "required")
1966 ):
1967 # self.logger.debug("ssh key needed")
1968 # Need to inject an ssh key
1969 user = deep_get(
1970 config_descriptor,
1971 ("config-access", "ssh-access", "default-user"),
1972 )
1973 step = "Install configuration Software, getting public ssh key"
1974 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1975 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1976 )
1977
1978 step = "Insert public key into VM user={} ssh_key={}".format(
1979 user, pub_key
1980 )
1981 else:
1982 # self.logger.debug("no need to get ssh key")
1983 step = "Waiting to VM being up and getting IP address"
1984 self.logger.debug(logging_text + step)
1985
1986 # n2vc_redesign STEP 5.1
1987 # wait for RO (ip-address) Insert pub_key into VM
1988 if vnfr_id:
1989 if kdu_name:
1990 rw_mgmt_ip, services = await self.wait_kdu_up(
1991 logging_text, nsr_id, vnfr_id, kdu_name
1992 )
1993 vnfd = self.db.get_one(
1994 "vnfds_revisions",
1995 {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
1996 )
1997 kdu = get_kdu(vnfd, kdu_name)
1998 kdu_services = [
1999 service["name"] for service in get_kdu_services(kdu)
2000 ]
2001 exposed_services = []
2002 for service in services:
2003 if any(s in service["name"] for s in kdu_services):
2004 exposed_services.append(service)
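# Pass the exposed k8s services to the charm through the implicit 'osm-config'
# configuration parameter, serialized as JSON by OsmConfigBuilder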
2005 await self.vca_map[vca_type].exec_primitive(
2006 ee_id=ee_id,
2007 primitive_name="config",
2008 params_dict={
2009 "osm-config": json.dumps(
2010 OsmConfigBuilder(
2011 k8s={"services": exposed_services}
2012 ).build()
2013 )
2014 },
2015 vca_id=vca_id,
2016 )
2017 else:
2018 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
2019 logging_text,
2020 nsr_id,
2021 vnfr_id,
2022 vdu_id,
2023 vdu_index,
2024 user=user,
2025 pub_key=pub_key,
2026 )
2027
2028 else:
2029 rw_mgmt_ip = None # This is for a NS configuration
2030
2031 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
2032
2033 # store rw_mgmt_ip in deploy params for later replacement
2034 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2035
2036 # n2vc_redesign STEP 6 Execute initial config primitive
2037 step = "execute initial config primitive"
2038
2039 # wait for dependent primitives execution (NS -> VNF -> VDU)
2040 if initial_config_primitive_list:
2041 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2042
2043 # stage, depending on the element type: vdu, kdu, vnf or ns
2044 my_vca = vca_deployed_list[vca_index]
2045 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2046 # VDU or KDU
2047 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2048 elif my_vca.get("member-vnf-index"):
2049 # VNF
2050 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2051 else:
2052 # NS
2053 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2054
2055 self._write_configuration_status(
2056 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2057 )
2058
2059 self._write_op_status(op_id=nslcmop_id, stage=stage)
2060
2061 check_if_terminated_needed = True
2062 for initial_config_primitive in initial_config_primitive_list:
2063 # add ns_config_info to the deploy params if this is a NS execution environment
2064 if not vca_deployed["member-vnf-index"]:
2065 deploy_params["ns_config_info"] = json.dumps(
2066 self._get_ns_config_info(nsr_id)
2067 )
2068 # TODO check if already done
2069 primitive_params_ = self._map_primitive_params(
2070 initial_config_primitive, {}, deploy_params
2071 )
2072
2073 step = "execute primitive '{}' params '{}'".format(
2074 initial_config_primitive["name"], primitive_params_
2075 )
2076 self.logger.debug(logging_text + step)
2077 await self.vca_map[vca_type].exec_primitive(
2078 ee_id=ee_id,
2079 primitive_name=initial_config_primitive["name"],
2080 params_dict=primitive_params_,
2081 db_dict=db_dict,
2082 vca_id=vca_id,
2083 vca_type=vca_type,
2084 )
2085 # Once some primitive has been executed, check and write at db whether terminate primitives will need to be executed
2086 if check_if_terminated_needed:
2087 if config_descriptor.get("terminate-config-primitive"):
2088 self.update_db_2(
2089 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2090 )
2091 check_if_terminated_needed = False
2092
2093 # TODO register in database that primitive is done
2094
2095 # STEP 7 Configure metrics
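# For helm-based execution environments, Prometheus scrape jobs are derived from the EE config
# descriptor and stored both in the nsr record and in the 'prometheus_jobs' collection (upsert by job_name)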
2096 if vca_type == "helm" or vca_type == "helm-v3":
2097 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2098 ee_id=ee_id,
2099 artifact_path=artifact_path,
2100 ee_config_descriptor=ee_config_descriptor,
2101 vnfr_id=vnfr_id,
2102 nsr_id=nsr_id,
2103 target_ip=rw_mgmt_ip,
2104 )
2105 if prometheus_jobs:
2106 self.update_db_2(
2107 "nsrs",
2108 nsr_id,
2109 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2110 )
2111
2112 for job in prometheus_jobs:
2113 self.db.set_one(
2114 "prometheus_jobs",
2115 {"job_name": job["job_name"]},
2116 job,
2117 upsert=True,
2118 fail_on_empty=False,
2119 )
2120
2121 step = "instantiated at VCA"
2122 self.logger.debug(logging_text + step)
2123
2124 self._write_configuration_status(
2125 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2126 )
2127
2128 except Exception as e: # TODO not use Exception but N2VC exception
2129 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2130 if not isinstance(
2131 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2132 ):
2133 self.logger.error(
2134 "Exception while {} : {}".format(step, e), exc_info=True
2135 )
2136 self._write_configuration_status(
2137 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2138 )
2139 raise LcmException("{} {}".format(step, e)) from e
2140
2141 def _write_ns_status(
2142 self,
2143 nsr_id: str,
2144 ns_state: str,
2145 current_operation: str,
2146 current_operation_id: str,
2147 error_description: str = None,
2148 error_detail: str = None,
2149 other_update: dict = None,
2150 ):
2151 """
2152 Update db_nsr fields.
2153 :param nsr_id:
2154 :param ns_state:
2155 :param current_operation:
2156 :param current_operation_id:
2157 :param error_description:
2158 :param error_detail:
2159 :param other_update: Other required changes at database, applied together with the status update, if provided
2160 :return:
2161 """
2162 try:
2163 db_dict = other_update or {}
2164 db_dict[
2165 "_admin.nslcmop"
2166 ] = current_operation_id # for backward compatibility
2167 db_dict["_admin.current-operation"] = current_operation_id
2168 db_dict["_admin.operation-type"] = (
2169 current_operation if current_operation != "IDLE" else None
2170 )
2171 db_dict["currentOperation"] = current_operation
2172 db_dict["currentOperationID"] = current_operation_id
2173 db_dict["errorDescription"] = error_description
2174 db_dict["errorDetail"] = error_detail
2175
2176 if ns_state:
2177 db_dict["nsState"] = ns_state
2178 self.update_db_2("nsrs", nsr_id, db_dict)
2179 except DbException as e:
2180 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2181
2182 def _write_op_status(
2183 self,
2184 op_id: str,
2185 stage: list = None,
2186 error_message: str = None,
2187 queuePosition: int = 0,
2188 operation_state: str = None,
2189 other_update: dict = None,
2190 ):
2191 try:
2192 db_dict = other_update or {}
2193 db_dict["queuePosition"] = queuePosition
2194 if isinstance(stage, list):
2195 db_dict["stage"] = stage[0]
2196 db_dict["detailed-status"] = " ".join(stage)
2197 elif stage is not None:
2198 db_dict["stage"] = str(stage)
2199
2200 if error_message is not None:
2201 db_dict["errorMessage"] = error_message
2202 if operation_state is not None:
2203 db_dict["operationState"] = operation_state
2204 db_dict["statusEnteredTime"] = time()
2205 self.update_db_2("nslcmops", op_id, db_dict)
2206 except DbException as e:
2207 self.logger.warn(
2208 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2209 )
2210
2211 def _write_all_config_status(self, db_nsr: dict, status: str):
2212 try:
2213 nsr_id = db_nsr["_id"]
2214 # configurationStatus
2215 config_status = db_nsr.get("configurationStatus")
2216 if config_status:
2217 db_nsr_update = {
2218 "configurationStatus.{}.status".format(index): status
2219 for index, v in enumerate(config_status)
2220 if v
2221 }
2222 # update status
2223 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2224
2225 except DbException as e:
2226 self.logger.warn(
2227 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2228 )
2229
2230 def _write_configuration_status(
2231 self,
2232 nsr_id: str,
2233 vca_index: int,
2234 status: str = None,
2235 element_under_configuration: str = None,
2236 element_type: str = None,
2237 other_update: dict = None,
2238 ):
2239
2240 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2241 # .format(vca_index, status))
2242
2243 try:
2244 db_path = "configurationStatus.{}.".format(vca_index)
2245 db_dict = other_update or {}
2246 if status:
2247 db_dict[db_path + "status"] = status
2248 if element_under_configuration:
2249 db_dict[
2250 db_path + "elementUnderConfiguration"
2251 ] = element_under_configuration
2252 if element_type:
2253 db_dict[db_path + "elementType"] = element_type
2254 self.update_db_2("nsrs", nsr_id, db_dict)
2255 except DbException as e:
2256 self.logger.warn(
2257 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2258 status, nsr_id, vca_index, e
2259 )
2260 )
2261
2262 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2263 """
2264 Checks and computes the placement (the VIM account where to deploy). If it is decided by an external tool, it
2265 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2266 Database is used because the result can be obtained from a different LCM worker in case of HA.
2267 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2268 :param db_nslcmop: database content of nslcmop
2269 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2270 :return: True if some modification is done. Modifies database vnfrs and the db_vnfrs parameter with the
2271 computed 'vim-account-id'
2272 """
2273 modified = False
2274 nslcmop_id = db_nslcmop["_id"]
2275 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2276 if placement_engine == "PLA":
2277 self.logger.debug(
2278 logging_text + "Invoke and wait for placement optimization"
2279 )
2280 await self.msg.aiowrite(
2281 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2282 )
2283 db_poll_interval = 5
2284 wait = db_poll_interval * 10
2285 pla_result = None
2286 while not pla_result and wait >= 0:
2287 await asyncio.sleep(db_poll_interval)
2288 wait -= db_poll_interval
2289 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2290 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2291
2292 if not pla_result:
2293 raise LcmException(
2294 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2295 )
2296
2297 for pla_vnf in pla_result["vnf"]:
2298 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2299 if not pla_vnf.get("vimAccountId") or not vnfr:
2300 continue
2301 modified = True
2302 self.db.set_one(
2303 "vnfrs",
2304 {"_id": vnfr["_id"]},
2305 {"vim-account-id": pla_vnf["vimAccountId"]},
2306 )
2307 # Modifies db_vnfrs
2308 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2309 return modified
2310
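# update_nsrs_with_pla_result stores the placement result received from PLA at nslcmops
# _admin.pla, which is what the polling loop in _do_placement waits for (also across LCM workers in HA)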
2311 def update_nsrs_with_pla_result(self, params):
2312 try:
2313 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2314 self.update_db_2(
2315 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2316 )
2317 except Exception as e:
2318 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2319
2320 async def instantiate(self, nsr_id, nslcmop_id):
2321 """
2322
2323 :param nsr_id: ns instance to deploy
2324 :param nslcmop_id: operation to run
2325 :return:
2326 """
2327
2328 # Try to lock HA task here
2329 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2330 if not task_is_locked_by_me:
2331 self.logger.debug(
2332 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2333 )
2334 return
2335
2336 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2337 self.logger.debug(logging_text + "Enter")
2338
2339 # get all needed from database
2340
2341 # database nsrs record
2342 db_nsr = None
2343
2344 # database nslcmops record
2345 db_nslcmop = None
2346
2347 # update operation on nsrs
2348 db_nsr_update = {}
2349 # update operation on nslcmops
2350 db_nslcmop_update = {}
2351
2352 nslcmop_operation_state = None
2353 db_vnfrs = {} # vnf's info indexed by member-index
2354 # n2vc_info = {}
2355 tasks_dict_info = {} # from task to info text
2356 exc = None
2357 error_list = []
2358 stage = [
2359 "Stage 1/5: preparation of the environment.",
2360 "Waiting for previous operations to terminate.",
2361 "",
2362 ]
2363 # ^ stage, step, VIM progress
2364 try:
2365 # wait for any previous tasks in process
2366 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2367
2368 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2369 stage[1] = "Reading from database."
2370 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2371 db_nsr_update["detailed-status"] = "creating"
2372 db_nsr_update["operational-status"] = "init"
2373 self._write_ns_status(
2374 nsr_id=nsr_id,
2375 ns_state="BUILDING",
2376 current_operation="INSTANTIATING",
2377 current_operation_id=nslcmop_id,
2378 other_update=db_nsr_update,
2379 )
2380 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2381
2382 # read from db: operation
2383 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2384 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2385 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2386 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2387 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2388 )
2389 ns_params = db_nslcmop.get("operationParams")
2390 if ns_params and ns_params.get("timeout_ns_deploy"):
2391 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2392 else:
2393 timeout_ns_deploy = self.timeout.get(
2394 "ns_deploy", self.timeout_ns_deploy
2395 )
2396
2397 # read from db: ns
2398 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2399 self.logger.debug(logging_text + stage[1])
2400 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2401 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2402 self.logger.debug(logging_text + stage[1])
2403 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2404 self.fs.sync(db_nsr["nsd-id"])
2405 db_nsr["nsd"] = nsd
2406 # nsr_name = db_nsr["name"] # TODO short-name??
2407
2408 # read from db: vnf's of this ns
2409 stage[1] = "Getting vnfrs from db."
2410 self.logger.debug(logging_text + stage[1])
2411 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2412
2413 # read from db: vnfd's for every vnf
2414 db_vnfds = [] # every vnfd data
2415
2416 # for each vnf in ns, read vnfd
2417 for vnfr in db_vnfrs_list:
2418 if vnfr.get("kdur"):
2419 kdur_list = []
2420 for kdur in vnfr["kdur"]:
2421 if kdur.get("additionalParams"):
2422 kdur["additionalParams"] = json.loads(
2423 kdur["additionalParams"]
2424 )
2425 kdur_list.append(kdur)
2426 vnfr["kdur"] = kdur_list
2427
2428 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2429 vnfd_id = vnfr["vnfd-id"]
2430 vnfd_ref = vnfr["vnfd-ref"]
2431 self.fs.sync(vnfd_id)
2432
2433 # if we do not have this vnfd yet, read it from db (db_vnfds holds full vnfd records, so compare by _id)
2434 if all(db_vnfd["_id"] != vnfd_id for db_vnfd in db_vnfds):
2435 # read from db
2436 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2437 vnfd_id, vnfd_ref
2438 )
2439 self.logger.debug(logging_text + stage[1])
2440 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2441
2442 # store vnfd
2443 db_vnfds.append(vnfd)
2444
2445 # Get or generate the _admin.deployed.VCA list
2446 vca_deployed_list = None
2447 if db_nsr["_admin"].get("deployed"):
2448 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2449 if vca_deployed_list is None:
2450 vca_deployed_list = []
2451 configuration_status_list = []
2452 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2453 db_nsr_update["configurationStatus"] = configuration_status_list
2454 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2455 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2456 elif isinstance(vca_deployed_list, dict):
2457 # maintain backward compatibility. Change a dict to list at database
2458 vca_deployed_list = list(vca_deployed_list.values())
2459 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2460 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2461
2462 if not isinstance(
2463 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2464 ):
2465 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2466 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2467
2468 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2469 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2470 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2471 self.db.set_list(
2472 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2473 )
2474
2475 # n2vc_redesign STEP 2 Deploy Network Scenario
2476 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2477 self._write_op_status(op_id=nslcmop_id, stage=stage)
2478
2479 stage[1] = "Deploying KDUs."
2480 # self.logger.debug(logging_text + "Before deploy_kdus")
2481 # Call deploy_kdus in case the "vdu:kdu" param exists
2482 await self.deploy_kdus(
2483 logging_text=logging_text,
2484 nsr_id=nsr_id,
2485 nslcmop_id=nslcmop_id,
2486 db_vnfrs=db_vnfrs,
2487 db_vnfds=db_vnfds,
2488 task_instantiation_info=tasks_dict_info,
2489 )
2490
2491 stage[1] = "Getting VCA public key."
2492 # n2vc_redesign STEP 1 Get VCA public ssh-key
2493 # feature 1429. Add n2vc public key to needed VMs
2494 n2vc_key = self.n2vc.get_public_key()
2495 n2vc_key_list = [n2vc_key]
2496 if self.vca_config.get("public_key"):
2497 n2vc_key_list.append(self.vca_config["public_key"])
2498
2499 stage[1] = "Deploying NS at VIM."
2500 task_ro = asyncio.ensure_future(
2501 self.instantiate_RO(
2502 logging_text=logging_text,
2503 nsr_id=nsr_id,
2504 nsd=nsd,
2505 db_nsr=db_nsr,
2506 db_nslcmop=db_nslcmop,
2507 db_vnfrs=db_vnfrs,
2508 db_vnfds=db_vnfds,
2509 n2vc_key_list=n2vc_key_list,
2510 stage=stage,
2511 )
2512 )
2513 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2514 tasks_dict_info[task_ro] = "Deploying at VIM"
2515
2516 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2517 stage[1] = "Deploying Execution Environments."
2518 self.logger.debug(logging_text + stage[1])
2519
2520 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2521 for vnf_profile in get_vnf_profiles(nsd):
2522 vnfd_id = vnf_profile["vnfd-id"]
2523 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2524 member_vnf_index = str(vnf_profile["id"])
2525 db_vnfr = db_vnfrs[member_vnf_index]
2526 base_folder = vnfd["_admin"]["storage"]
2527 vdu_id = None
2528 vdu_index = 0
2529 vdu_name = None
2530 kdu_name = None
2531
2532 # Get additional parameters
2533 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2534 if db_vnfr.get("additionalParamsForVnf"):
2535 deploy_params.update(
2536 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2537 )
2538
2539 descriptor_config = get_configuration(vnfd, vnfd["id"])
2540 if descriptor_config:
2541 self._deploy_n2vc(
2542 logging_text=logging_text
2543 + "member_vnf_index={} ".format(member_vnf_index),
2544 db_nsr=db_nsr,
2545 db_vnfr=db_vnfr,
2546 nslcmop_id=nslcmop_id,
2547 nsr_id=nsr_id,
2548 nsi_id=nsi_id,
2549 vnfd_id=vnfd_id,
2550 vdu_id=vdu_id,
2551 kdu_name=kdu_name,
2552 member_vnf_index=member_vnf_index,
2553 vdu_index=vdu_index,
2554 vdu_name=vdu_name,
2555 deploy_params=deploy_params,
2556 descriptor_config=descriptor_config,
2557 base_folder=base_folder,
2558 task_instantiation_info=tasks_dict_info,
2559 stage=stage,
2560 )
2561
2562 # Deploy charms for each VDU that supports one.
2563 for vdud in get_vdu_list(vnfd):
2564 vdu_id = vdud["id"]
2565 descriptor_config = get_configuration(vnfd, vdu_id)
2566 vdur = find_in_list(
2567 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2568 )
2569
2570 if vdur.get("additionalParams"):
2571 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2572 else:
2573 deploy_params_vdu = deploy_params
2574 deploy_params_vdu["OSM"] = get_osm_params(
2575 db_vnfr, vdu_id, vdu_count_index=0
2576 )
2577 vdud_count = get_number_of_instances(vnfd, vdu_id)
2578
2579 self.logger.debug("VDUD > {}".format(vdud))
2580 self.logger.debug(
2581 "Descriptor config > {}".format(descriptor_config)
2582 )
2583 if descriptor_config:
2584 vdu_name = None
2585 kdu_name = None
2586 for vdu_index in range(vdud_count):
2587 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2588 self._deploy_n2vc(
2589 logging_text=logging_text
2590 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2591 member_vnf_index, vdu_id, vdu_index
2592 ),
2593 db_nsr=db_nsr,
2594 db_vnfr=db_vnfr,
2595 nslcmop_id=nslcmop_id,
2596 nsr_id=nsr_id,
2597 nsi_id=nsi_id,
2598 vnfd_id=vnfd_id,
2599 vdu_id=vdu_id,
2600 kdu_name=kdu_name,
2601 member_vnf_index=member_vnf_index,
2602 vdu_index=vdu_index,
2603 vdu_name=vdu_name,
2604 deploy_params=deploy_params_vdu,
2605 descriptor_config=descriptor_config,
2606 base_folder=base_folder,
2607 task_instantiation_info=tasks_dict_info,
2608 stage=stage,
2609 )
2610 for kdud in get_kdu_list(vnfd):
2611 kdu_name = kdud["name"]
2612 descriptor_config = get_configuration(vnfd, kdu_name)
2613 if descriptor_config:
2614 vdu_id = None
2615 vdu_index = 0
2616 vdu_name = None
2617 kdur = next(
2618 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2619 )
2620 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2621 if kdur.get("additionalParams"):
2622 deploy_params_kdu.update(
2623 parse_yaml_strings(kdur["additionalParams"].copy())
2624 )
2625
2626 self._deploy_n2vc(
2627 logging_text=logging_text,
2628 db_nsr=db_nsr,
2629 db_vnfr=db_vnfr,
2630 nslcmop_id=nslcmop_id,
2631 nsr_id=nsr_id,
2632 nsi_id=nsi_id,
2633 vnfd_id=vnfd_id,
2634 vdu_id=vdu_id,
2635 kdu_name=kdu_name,
2636 member_vnf_index=member_vnf_index,
2637 vdu_index=vdu_index,
2638 vdu_name=vdu_name,
2639 deploy_params=deploy_params_kdu,
2640 descriptor_config=descriptor_config,
2641 base_folder=base_folder,
2642 task_instantiation_info=tasks_dict_info,
2643 stage=stage,
2644 )
2645
2646 # Check if this NS has a charm configuration
2647 descriptor_config = nsd.get("ns-configuration")
2648 if descriptor_config and descriptor_config.get("juju"):
2649 vnfd_id = None
2650 db_vnfr = None
2651 member_vnf_index = None
2652 vdu_id = None
2653 kdu_name = None
2654 vdu_index = 0
2655 vdu_name = None
2656
2657 # Get additional parameters
2658 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2659 if db_nsr.get("additionalParamsForNs"):
2660 deploy_params.update(
2661 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2662 )
2663 base_folder = nsd["_admin"]["storage"]
2664 self._deploy_n2vc(
2665 logging_text=logging_text,
2666 db_nsr=db_nsr,
2667 db_vnfr=db_vnfr,
2668 nslcmop_id=nslcmop_id,
2669 nsr_id=nsr_id,
2670 nsi_id=nsi_id,
2671 vnfd_id=vnfd_id,
2672 vdu_id=vdu_id,
2673 kdu_name=kdu_name,
2674 member_vnf_index=member_vnf_index,
2675 vdu_index=vdu_index,
2676 vdu_name=vdu_name,
2677 deploy_params=deploy_params,
2678 descriptor_config=descriptor_config,
2679 base_folder=base_folder,
2680 task_instantiation_info=tasks_dict_info,
2681 stage=stage,
2682 )
2683
2684 # the rest of the work is done at the finally block
2685
2686 except (
2687 ROclient.ROClientException,
2688 DbException,
2689 LcmException,
2690 N2VCException,
2691 ) as e:
2692 self.logger.error(
2693 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2694 )
2695 exc = e
2696 except asyncio.CancelledError:
2697 self.logger.error(
2698 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2699 )
2700 exc = "Operation was cancelled"
2701 except Exception as e:
2702 exc = traceback.format_exc()
2703 self.logger.critical(
2704 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2705 exc_info=True,
2706 )
2707 finally:
2708 if exc:
2709 error_list.append(str(exc))
2710 try:
2711 # wait for pending tasks
2712 if tasks_dict_info:
2713 stage[1] = "Waiting for instantiate pending tasks."
2714 self.logger.debug(logging_text + stage[1])
2715 error_list += await self._wait_for_tasks(
2716 logging_text,
2717 tasks_dict_info,
2718 timeout_ns_deploy,
2719 stage,
2720 nslcmop_id,
2721 nsr_id=nsr_id,
2722 )
2723 stage[1] = stage[2] = ""
2724 except asyncio.CancelledError:
2725 error_list.append("Cancelled")
2726 # TODO cancel all tasks
2727 except Exception as exc:
2728 error_list.append(str(exc))
2729
2730 # update operation-status
2731 db_nsr_update["operational-status"] = "running"
2732 # let's begin with VCA 'configured' status (later we can change it)
2733 db_nsr_update["config-status"] = "configured"
2734 for task, task_name in tasks_dict_info.items():
2735 if not task.done() or task.cancelled() or task.exception():
2736 if task_name.startswith(self.task_name_deploy_vca):
2737 # A N2VC task is pending
2738 db_nsr_update["config-status"] = "failed"
2739 else:
2740 # RO or KDU task is pending
2741 db_nsr_update["operational-status"] = "failed"
2742
2743 # update status at database
2744 if error_list:
2745 error_detail = ". ".join(error_list)
2746 self.logger.error(logging_text + error_detail)
2747 error_description_nslcmop = "{} Detail: {}".format(
2748 stage[0], error_detail
2749 )
2750 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2751 nslcmop_id, stage[0]
2752 )
2753
2754 db_nsr_update["detailed-status"] = (
2755 error_description_nsr + " Detail: " + error_detail
2756 )
2757 db_nslcmop_update["detailed-status"] = error_detail
2758 nslcmop_operation_state = "FAILED"
2759 ns_state = "BROKEN"
2760 else:
2761 error_detail = None
2762 error_description_nsr = error_description_nslcmop = None
2763 ns_state = "READY"
2764 db_nsr_update["detailed-status"] = "Done"
2765 db_nslcmop_update["detailed-status"] = "Done"
2766 nslcmop_operation_state = "COMPLETED"
2767
2768 if db_nsr:
2769 self._write_ns_status(
2770 nsr_id=nsr_id,
2771 ns_state=ns_state,
2772 current_operation="IDLE",
2773 current_operation_id=None,
2774 error_description=error_description_nsr,
2775 error_detail=error_detail,
2776 other_update=db_nsr_update,
2777 )
2778 self._write_op_status(
2779 op_id=nslcmop_id,
2780 stage="",
2781 error_message=error_description_nslcmop,
2782 operation_state=nslcmop_operation_state,
2783 other_update=db_nslcmop_update,
2784 )
2785
2786 if nslcmop_operation_state:
2787 try:
2788 await self.msg.aiowrite(
2789 "ns",
2790 "instantiated",
2791 {
2792 "nsr_id": nsr_id,
2793 "nslcmop_id": nslcmop_id,
2794 "operationState": nslcmop_operation_state,
2795 },
2796 loop=self.loop,
2797 )
2798 except Exception as e:
2799 self.logger.error(
2800 logging_text + "kafka_write notification Exception {}".format(e)
2801 )
2802
2803 self.logger.debug(logging_text + "Exit")
2804 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2805
2806 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2807 if vnfd_id not in cached_vnfds:
2808 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2809 return cached_vnfds[vnfd_id]
2810
2811 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2812 if vnf_profile_id not in cached_vnfrs:
2813 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2814 "vnfrs",
2815 {
2816 "member-vnf-index-ref": vnf_profile_id,
2817 "nsr-id-ref": nsr_id,
2818 },
2819 )
2820 return cached_vnfrs[vnf_profile_id]
2821
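# A deployed VCA belongs to a relation when one of the endpoints (provider or requirer) matches its
# vnf-profile, vdu-profile and execution-environment reference; KDU-resource endpoints are skipped here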
2822 def _is_deployed_vca_in_relation(
2823 self, vca: DeployedVCA, relation: Relation
2824 ) -> bool:
2825 found = False
2826 for endpoint in (relation.provider, relation.requirer):
2827 if endpoint["kdu-resource-profile-id"]:
2828 continue
2829 found = (
2830 vca.vnf_profile_id == endpoint.vnf_profile_id
2831 and vca.vdu_profile_id == endpoint.vdu_profile_id
2832 and vca.execution_environment_ref == endpoint.execution_environment_ref
2833 )
2834 if found:
2835 break
2836 return found
2837
2838 def _update_ee_relation_data_with_implicit_data(
2839 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2840 ):
2841 ee_relation_data = safe_get_ee_relation(
2842 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2843 )
2844 ee_relation_level = EELevel.get_level(ee_relation_data)
2845 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2846 "execution-environment-ref"
2847 ]:
2848 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2849 vnfd_id = vnf_profile["vnfd-id"]
2850 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2851 entity_id = (
2852 vnfd_id
2853 if ee_relation_level == EELevel.VNF
2854 else ee_relation_data["vdu-profile-id"]
2855 )
2856 ee = get_juju_ee_ref(db_vnfd, entity_id)
2857 if not ee:
2858 raise Exception(
2859 f"not execution environments found for ee_relation {ee_relation_data}"
2860 )
2861 ee_relation_data["execution-environment-ref"] = ee["id"]
2862 return ee_relation_data
2863
2864 def _get_ns_relations(
2865 self,
2866 nsr_id: str,
2867 nsd: Dict[str, Any],
2868 vca: DeployedVCA,
2869 cached_vnfds: Dict[str, Any],
2870 ) -> List[Relation]:
2871 relations = []
2872 db_ns_relations = get_ns_configuration_relation_list(nsd)
2873 for r in db_ns_relations:
2874 provider_dict = None
2875 requirer_dict = None
2876 if all(key in r for key in ("provider", "requirer")):
2877 provider_dict = r["provider"]
2878 requirer_dict = r["requirer"]
2879 elif "entities" in r:
2880 provider_id = r["entities"][0]["id"]
2881 provider_dict = {
2882 "nsr-id": nsr_id,
2883 "endpoint": r["entities"][0]["endpoint"],
2884 }
2885 if provider_id != nsd["id"]:
2886 provider_dict["vnf-profile-id"] = provider_id
2887 requirer_id = r["entities"][1]["id"]
2888 requirer_dict = {
2889 "nsr-id": nsr_id,
2890 "endpoint": r["entities"][1]["endpoint"],
2891 }
2892 if requirer_id != nsd["id"]:
2893 requirer_dict["vnf-profile-id"] = requirer_id
2894 else:
2895 raise Exception(
2896 "provider/requirer or entities must be included in the relation."
2897 )
2898 relation_provider = self._update_ee_relation_data_with_implicit_data(
2899 nsr_id, nsd, provider_dict, cached_vnfds
2900 )
2901 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2902 nsr_id, nsd, requirer_dict, cached_vnfds
2903 )
2904 provider = EERelation(relation_provider)
2905 requirer = EERelation(relation_requirer)
2906 relation = Relation(r["name"], provider, requirer)
2907 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2908 if vca_in_relation:
2909 relations.append(relation)
2910 return relations
2911
2912 def _get_vnf_relations(
2913 self,
2914 nsr_id: str,
2915 nsd: Dict[str, Any],
2916 vca: DeployedVCA,
2917 cached_vnfds: Dict[str, Any],
2918 ) -> List[Relation]:
2919 relations = []
2920 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2921 vnf_profile_id = vnf_profile["id"]
2922 vnfd_id = vnf_profile["vnfd-id"]
2923 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2924 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2925 for r in db_vnf_relations:
2926 provider_dict = None
2927 requirer_dict = None
2928 if all(key in r for key in ("provider", "requirer")):
2929 provider_dict = r["provider"]
2930 requirer_dict = r["requirer"]
2931 elif "entities" in r:
2932 provider_id = r["entities"][0]["id"]
2933 provider_dict = {
2934 "nsr-id": nsr_id,
2935 "vnf-profile-id": vnf_profile_id,
2936 "endpoint": r["entities"][0]["endpoint"],
2937 }
2938 if provider_id != vnfd_id:
2939 provider_dict["vdu-profile-id"] = provider_id
2940 requirer_id = r["entities"][1]["id"]
2941 requirer_dict = {
2942 "nsr-id": nsr_id,
2943 "vnf-profile-id": vnf_profile_id,
2944 "endpoint": r["entities"][1]["endpoint"],
2945 }
2946 if requirer_id != vnfd_id:
2947 requirer_dict["vdu-profile-id"] = requirer_id
2948 else:
2949 raise Exception(
2950 "provider/requirer or entities must be included in the relation."
2951 )
2952 relation_provider = self._update_ee_relation_data_with_implicit_data(
2953 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2954 )
2955 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2956 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2957 )
2958 provider = EERelation(relation_provider)
2959 requirer = EERelation(relation_requirer)
2960 relation = Relation(r["name"], provider, requirer)
2961 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2962 if vca_in_relation:
2963 relations.append(relation)
2964 return relations
2965
2966 def _get_kdu_resource_data(
2967 self,
2968 ee_relation: EERelation,
2969 db_nsr: Dict[str, Any],
2970 cached_vnfds: Dict[str, Any],
2971 ) -> DeployedK8sResource:
2972 nsd = get_nsd(db_nsr)
2973 vnf_profiles = get_vnf_profiles(nsd)
2974 vnfd_id = find_in_list(
2975 vnf_profiles,
2976 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2977 )["vnfd-id"]
2978 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2979 kdu_resource_profile = get_kdu_resource_profile(
2980 db_vnfd, ee_relation.kdu_resource_profile_id
2981 )
2982 kdu_name = kdu_resource_profile["kdu-name"]
2983 deployed_kdu, _ = get_deployed_kdu(
2984 db_nsr.get("_admin", ()).get("deployed", ()),
2985 kdu_name,
2986 ee_relation.vnf_profile_id,
2987 )
2988 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2989 return deployed_kdu
2990
2991 def _get_deployed_component(
2992 self,
2993 ee_relation: EERelation,
2994 db_nsr: Dict[str, Any],
2995 cached_vnfds: Dict[str, Any],
2996 ) -> DeployedComponent:
2997 nsr_id = db_nsr["_id"]
2998 deployed_component = None
2999 ee_level = EELevel.get_level(ee_relation)
3000 if ee_level == EELevel.NS:
3001 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
3002 if vca:
3003 deployed_component = DeployedVCA(nsr_id, vca)
3004 elif ee_level == EELevel.VNF:
3005 vca = get_deployed_vca(
3006 db_nsr,
3007 {
3008 "vdu_id": None,
3009 "member-vnf-index": ee_relation.vnf_profile_id,
3010 "ee_descriptor_id": ee_relation.execution_environment_ref,
3011 },
3012 )
3013 if vca:
3014 deployed_component = DeployedVCA(nsr_id, vca)
3015 elif ee_level == EELevel.VDU:
3016 vca = get_deployed_vca(
3017 db_nsr,
3018 {
3019 "vdu_id": ee_relation.vdu_profile_id,
3020 "member-vnf-index": ee_relation.vnf_profile_id,
3021 "ee_descriptor_id": ee_relation.execution_environment_ref,
3022 },
3023 )
3024 if vca:
3025 deployed_component = DeployedVCA(nsr_id, vca)
3026 elif ee_level == EELevel.KDU:
3027 kdu_resource_data = self._get_kdu_resource_data(
3028 ee_relation, db_nsr, cached_vnfds
3029 )
3030 if kdu_resource_data:
3031 deployed_component = DeployedK8sResource(kdu_resource_data)
3032 return deployed_component
3033
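# _add_relation adds a single relation as soon as both endpoints are deployed and have their
# configuration software installed; it returns True so the caller can remove it from the pending list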
3034 async def _add_relation(
3035 self,
3036 relation: Relation,
3037 vca_type: str,
3038 db_nsr: Dict[str, Any],
3039 cached_vnfds: Dict[str, Any],
3040 cached_vnfrs: Dict[str, Any],
3041 ) -> bool:
3042 deployed_provider = self._get_deployed_component(
3043 relation.provider, db_nsr, cached_vnfds
3044 )
3045 deployed_requirer = self._get_deployed_component(
3046 relation.requirer, db_nsr, cached_vnfds
3047 )
3048 if (
3049 deployed_provider
3050 and deployed_requirer
3051 and deployed_provider.config_sw_installed
3052 and deployed_requirer.config_sw_installed
3053 ):
3054 provider_db_vnfr = (
3055 self._get_vnfr(
3056 relation.provider.nsr_id,
3057 relation.provider.vnf_profile_id,
3058 cached_vnfrs,
3059 )
3060 if relation.provider.vnf_profile_id
3061 else None
3062 )
3063 requirer_db_vnfr = (
3064 self._get_vnfr(
3065 relation.requirer.nsr_id,
3066 relation.requirer.vnf_profile_id,
3067 cached_vnfrs,
3068 )
3069 if relation.requirer.vnf_profile_id
3070 else None
3071 )
3072 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3073 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3074 provider_relation_endpoint = RelationEndpoint(
3075 deployed_provider.ee_id,
3076 provider_vca_id,
3077 relation.provider.endpoint,
3078 )
3079 requirer_relation_endpoint = RelationEndpoint(
3080 deployed_requirer.ee_id,
3081 requirer_vca_id,
3082 relation.requirer.endpoint,
3083 )
3084 await self.vca_map[vca_type].add_relation(
3085 provider=provider_relation_endpoint,
3086 requirer=requirer_relation_endpoint,
3087 )
3088 # remove entry from relations list
3089 return True
3090 return False
3091
3092 async def _add_vca_relations(
3093 self,
3094 logging_text,
3095 nsr_id,
3096 vca_type: str,
3097 vca_index: int,
3098 timeout: int = 3600,
3099 ) -> bool:
3100
3101 # steps:
3102 # 1. find all relations for this VCA
3103 # 2. wait for other peers related
3104 # 3. add relations
3105
3106 try:
3107 # STEP 1: find all relations for this VCA
3108
3109 # read nsr record
3110 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3111 nsd = get_nsd(db_nsr)
3112
3113 # this VCA data
3114 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3115 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3116
3117 cached_vnfds = {}
3118 cached_vnfrs = {}
3119 relations = []
3120 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3121 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3122
3123 # if no relations, terminate
3124 if not relations:
3125 self.logger.debug(logging_text + " No relations")
3126 return True
3127
3128 self.logger.debug(logging_text + " adding relations {}".format(relations))
3129
3130 # add all relations
3131 start = time()
3132 while True:
3133 # check timeout
3134 now = time()
3135 if now - start >= timeout:
3136 self.logger.error(logging_text + " : timeout adding relations")
3137 return False
3138
3139 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3140 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3141
3142 # for each relation, find the VCA's related
3143 for relation in relations.copy():
3144 added = await self._add_relation(
3145 relation,
3146 vca_type,
3147 db_nsr,
3148 cached_vnfds,
3149 cached_vnfrs,
3150 )
3151 if added:
3152 relations.remove(relation)
3153
3154 if not relations:
3155 self.logger.debug("Relations added")
3156 break
3157 await asyncio.sleep(5.0)
3158
3159 return True
3160
3161 except Exception as e:
3162 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3163 return False
3164
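# _install_kdu instantiates one KDU on its k8s cluster, stores the kdu-instance name in the nsr,
# collects the deployed services (and the management IP) into the vnfr, and runs the KDU
# initial-config-primitives when no juju execution environment is declared for it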
3165 async def _install_kdu(
3166 self,
3167 nsr_id: str,
3168 nsr_db_path: str,
3169 vnfr_data: dict,
3170 kdu_index: int,
3171 kdud: dict,
3172 vnfd: dict,
3173 k8s_instance_info: dict,
3174 k8params: dict = None,
3175 timeout: int = 600,
3176 vca_id: str = None,
3177 ):
3178
3179 try:
3180 k8sclustertype = k8s_instance_info["k8scluster-type"]
3181 # Instantiate kdu
3182 db_dict_install = {
3183 "collection": "nsrs",
3184 "filter": {"_id": nsr_id},
3185 "path": nsr_db_path,
3186 }
3187
3188 if k8s_instance_info.get("kdu-deployment-name"):
3189 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3190 else:
3191 kdu_instance = self.k8scluster_map[
3192 k8sclustertype
3193 ].generate_kdu_instance_name(
3194 db_dict=db_dict_install,
3195 kdu_model=k8s_instance_info["kdu-model"],
3196 kdu_name=k8s_instance_info["kdu-name"],
3197 )
3198 self.update_db_2(
3199 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3200 )
3201 await self.k8scluster_map[k8sclustertype].install(
3202 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3203 kdu_model=k8s_instance_info["kdu-model"],
3204 atomic=True,
3205 params=k8params,
3206 db_dict=db_dict_install,
3207 timeout=timeout,
3208 kdu_name=k8s_instance_info["kdu-name"],
3209 namespace=k8s_instance_info["namespace"],
3210 kdu_instance=kdu_instance,
3211 vca_id=vca_id,
3212 )
3213 self.update_db_2(
3214 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3215 )
3216
3217 # Obtain services to obtain management service ip
3218 services = await self.k8scluster_map[k8sclustertype].get_services(
3219 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3220 kdu_instance=kdu_instance,
3221 namespace=k8s_instance_info["namespace"],
3222 )
3223
3224 # Obtain management service info (if exists)
3225 vnfr_update_dict = {}
3226 kdu_config = get_configuration(vnfd, kdud["name"])
3227 if kdu_config:
3228 target_ee_list = kdu_config.get("execution-environment-list", [])
3229 else:
3230 target_ee_list = []
3231
3232 if services:
3233 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3234 mgmt_services = [
3235 service
3236 for service in kdud.get("service", [])
3237 if service.get("mgmt-service")
3238 ]
3239 for mgmt_service in mgmt_services:
3240 for service in services:
3241 if service["name"].startswith(mgmt_service["name"]):
3242 # Mgmt service found, Obtain service ip
3243 ip = service.get("external_ip", service.get("cluster_ip"))
3244 if isinstance(ip, list) and len(ip) == 1:
3245 ip = ip[0]
3246
3247 vnfr_update_dict[
3248 "kdur.{}.ip-address".format(kdu_index)
3249 ] = ip
3250
3251 # Check whether the mgmt ip at the vnf must also be updated
3252 service_external_cp = mgmt_service.get(
3253 "external-connection-point-ref"
3254 )
3255 if service_external_cp:
3256 if (
3257 deep_get(vnfd, ("mgmt-interface", "cp"))
3258 == service_external_cp
3259 ):
3260 vnfr_update_dict["ip-address"] = ip
3261
3262 if find_in_list(
3263 target_ee_list,
3264 lambda ee: ee.get(
3265 "external-connection-point-ref", ""
3266 )
3267 == service_external_cp,
3268 ):
3269 vnfr_update_dict[
3270 "kdur.{}.ip-address".format(kdu_index)
3271 ] = ip
3272 break
3273 else:
3274 self.logger.warn(
3275 "Mgmt service name: {} not found".format(
3276 mgmt_service["name"]
3277 )
3278 )
3279
3280 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3281 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3282
3283 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3284 if (
3285 kdu_config
3286 and kdu_config.get("initial-config-primitive")
3287 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3288 ):
3289 initial_config_primitive_list = kdu_config.get(
3290 "initial-config-primitive"
3291 )
3292 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3293
3294 for initial_config_primitive in initial_config_primitive_list:
3295 primitive_params_ = self._map_primitive_params(
3296 initial_config_primitive, {}, {}
3297 )
3298
3299 await asyncio.wait_for(
3300 self.k8scluster_map[k8sclustertype].exec_primitive(
3301 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3302 kdu_instance=kdu_instance,
3303 primitive_name=initial_config_primitive["name"],
3304 params=primitive_params_,
3305 db_dict=db_dict_install,
3306 vca_id=vca_id,
3307 ),
3308 timeout=timeout,
3309 )
3310
3311 except Exception as e:
3312 # Prepare update db with error and raise exception
3313 try:
3314 self.update_db_2(
3315 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3316 )
3317 self.update_db_2(
3318 "vnfrs",
3319 vnfr_data.get("_id"),
3320 {"kdur.{}.status".format(kdu_index): "ERROR"},
3321 )
3322 except Exception:
3323 # ignore to keep original exception
3324 pass
3325 # reraise original error
3326 raise
3327
3328 return kdu_instance
3329
3330 async def deploy_kdus(
3331 self,
3332 logging_text,
3333 nsr_id,
3334 nslcmop_id,
3335 db_vnfrs,
3336 db_vnfds,
3337 task_instantiation_info,
3338 ):
3339 # Launch kdus if present in the descriptor
3340
3341 k8scluster_id_2_uuic = {
3342 "helm-chart-v3": {},
3343 "helm-chart": {},
3344 "juju-bundle": {},
3345 }
3346
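# _get_cluster_id resolves (and caches) the internal cluster uuid for a given k8s cluster id and
# cluster type, waiting for related k8scluster tasks and, for helm-chart-v3, initializing legacy
# clusters that were never set up for helm v3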
3347 async def _get_cluster_id(cluster_id, cluster_type):
3348 nonlocal k8scluster_id_2_uuic
3349 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3350 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3351
3352 # check if the K8s cluster is being created; wait for previous related tasks in process
3353 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3354 "k8scluster", cluster_id
3355 )
3356 if task_dependency:
3357 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3358 task_name, cluster_id
3359 )
3360 self.logger.debug(logging_text + text)
3361 await asyncio.wait(task_dependency, timeout=3600)
3362
3363 db_k8scluster = self.db.get_one(
3364 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3365 )
3366 if not db_k8scluster:
3367 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3368
3369 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3370 if not k8s_id:
3371 if cluster_type == "helm-chart-v3":
3372 try:
3373 # backward compatibility for existing clusters that have not been initialized for helm v3
3374 k8s_credentials = yaml.safe_dump(
3375 db_k8scluster.get("credentials")
3376 )
3377 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3378 k8s_credentials, reuse_cluster_uuid=cluster_id
3379 )
3380 db_k8scluster_update = {}
3381 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3382 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3383 db_k8scluster_update[
3384 "_admin.helm-chart-v3.created"
3385 ] = uninstall_sw
3386 db_k8scluster_update[
3387 "_admin.helm-chart-v3.operationalState"
3388 ] = "ENABLED"
3389 self.update_db_2(
3390 "k8sclusters", cluster_id, db_k8scluster_update
3391 )
3392 except Exception as e:
3393 self.logger.error(
3394 logging_text
3395 + "error initializing helm-v3 cluster: {}".format(str(e))
3396 )
3397 raise LcmException(
3398 "K8s cluster '{}' has not been initialized for '{}'".format(
3399 cluster_id, cluster_type
3400 )
3401 )
3402 else:
3403 raise LcmException(
3404 "K8s cluster '{}' has not been initialized for '{}'".format(
3405 cluster_id, cluster_type
3406 )
3407 )
3408 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3409 return k8s_id
3410
3411 logging_text += "Deploy kdus: "
3412 step = ""
3413 try:
3414 db_nsr_update = {"_admin.deployed.K8s": []}
3415 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3416
3417 index = 0
3418 updated_cluster_list = []
3419 updated_v3_cluster_list = []
3420
3421 for vnfr_data in db_vnfrs.values():
3422 vca_id = self.get_vca_id(vnfr_data, {})
3423 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3424 # Step 0: Prepare and set parameters
3425 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3426 vnfd_id = vnfr_data.get("vnfd-id")
3427 vnfd_with_id = find_in_list(
3428 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3429 )
3430 kdud = next(
3431 kdud
3432 for kdud in vnfd_with_id["kdu"]
3433 if kdud["name"] == kdur["kdu-name"]
3434 )
3435 namespace = kdur.get("k8s-namespace")
3436 kdu_deployment_name = kdur.get("kdu-deployment-name")
3437 if kdur.get("helm-chart"):
3438 kdumodel = kdur["helm-chart"]
3439 # Default version: helm3, if helm-version is v2 assign v2
3440 k8sclustertype = "helm-chart-v3"
3441 self.logger.debug("kdur: {}".format(kdur))
3442 if (
3443 kdur.get("helm-version")
3444 and kdur.get("helm-version") == "v2"
3445 ):
3446 k8sclustertype = "helm-chart"
3447 elif kdur.get("juju-bundle"):
3448 kdumodel = kdur["juju-bundle"]
3449 k8sclustertype = "juju-bundle"
3450 else:
3451 raise LcmException(
3452 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3453 "juju-bundle. Maybe an old NBI version is running".format(
3454 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3455 )
3456 )
3457 # check if kdumodel is a file and exists
3458 try:
3459 vnfd_with_id = find_in_list(
3460 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3461 )
3462 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3463 if storage: # may be not present if vnfd has not artifacts
3464 # path format: /vnfdid/pkg-dir/helm-charts|juju-bundles/kdumodel
3465 if storage["pkg-dir"]:
3466 filename = "{}/{}/{}s/{}".format(
3467 storage["folder"],
3468 storage["pkg-dir"],
3469 k8sclustertype,
3470 kdumodel,
3471 )
3472 else:
3473 filename = "{}/Scripts/{}s/{}".format(
3474 storage["folder"],
3475 k8sclustertype,
3476 kdumodel,
3477 )
3478 if self.fs.file_exists(
3479 filename, mode="file"
3480 ) or self.fs.file_exists(filename, mode="dir"):
3481 kdumodel = self.fs.path + filename
3482 except (asyncio.TimeoutError, asyncio.CancelledError):
3483 raise
3484 except Exception: # it is not a file
3485 pass
3486
3487 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3488 step = "Synchronize repos for k8s cluster '{}'".format(
3489 k8s_cluster_id
3490 )
3491 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3492
3493 # Synchronize repos
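# Repos are synchronized at most once per cluster and helm version in this operation, tracked
# through updated_cluster_list / updated_v3_cluster_list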
3494 if (
3495 k8sclustertype == "helm-chart"
3496 and cluster_uuid not in updated_cluster_list
3497 ) or (
3498 k8sclustertype == "helm-chart-v3"
3499 and cluster_uuid not in updated_v3_cluster_list
3500 ):
3501 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3502 self.k8scluster_map[k8sclustertype].synchronize_repos(
3503 cluster_uuid=cluster_uuid
3504 )
3505 )
3506 if del_repo_list or added_repo_dict:
3507 if k8sclustertype == "helm-chart":
3508 unset = {
3509 "_admin.helm_charts_added." + item: None
3510 for item in del_repo_list
3511 }
3512 updated = {
3513 "_admin.helm_charts_added." + item: name
3514 for item, name in added_repo_dict.items()
3515 }
3516 updated_cluster_list.append(cluster_uuid)
3517 elif k8sclustertype == "helm-chart-v3":
3518 unset = {
3519 "_admin.helm_charts_v3_added." + item: None
3520 for item in del_repo_list
3521 }
3522 updated = {
3523 "_admin.helm_charts_v3_added." + item: name
3524 for item, name in added_repo_dict.items()
3525 }
3526 updated_v3_cluster_list.append(cluster_uuid)
3527 self.logger.debug(
3528 logging_text + "repos synchronized on k8s cluster "
3529 "'{}' to_delete: {}, to_add: {}".format(
3530 k8s_cluster_id, del_repo_list, added_repo_dict
3531 )
3532 )
3533 self.db.set_one(
3534 "k8sclusters",
3535 {"_id": k8s_cluster_id},
3536 updated,
3537 unset=unset,
3538 )
3539
3540 # Instantiate kdu
3541 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3542 vnfr_data["member-vnf-index-ref"],
3543 kdur["kdu-name"],
3544 k8s_cluster_id,
3545 )
3546 k8s_instance_info = {
3547 "kdu-instance": None,
3548 "k8scluster-uuid": cluster_uuid,
3549 "k8scluster-type": k8sclustertype,
3550 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3551 "kdu-name": kdur["kdu-name"],
3552 "kdu-model": kdumodel,
3553 "namespace": namespace,
3554 "kdu-deployment-name": kdu_deployment_name,
3555 }
3556 db_path = "_admin.deployed.K8s.{}".format(index)
3557 db_nsr_update[db_path] = k8s_instance_info
3558 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3559 vnfd_with_id = find_in_list(
3560 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3561 )
3562 task = asyncio.ensure_future(
3563 self._install_kdu(
3564 nsr_id,
3565 db_path,
3566 vnfr_data,
3567 kdu_index,
3568 kdud,
3569 vnfd_with_id,
3570 k8s_instance_info,
3571 k8params=desc_params,
3572 timeout=600,
3573 vca_id=vca_id,
3574 )
3575 )
3576 self.lcm_tasks.register(
3577 "ns",
3578 nsr_id,
3579 nslcmop_id,
3580 "instantiate_KDU-{}".format(index),
3581 task,
3582 )
3583 task_instantiation_info[task] = "Deploying KDU {}".format(
3584 kdur["kdu-name"]
3585 )
3586
3587 index += 1
3588
3589 except (LcmException, asyncio.CancelledError):
3590 raise
3591 except Exception as e:
3592 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3593 if isinstance(e, (N2VCException, DbException)):
3594 self.logger.error(logging_text + msg)
3595 else:
3596 self.logger.critical(logging_text + msg, exc_info=True)
3597 raise LcmException(msg)
3598 finally:
3599 if db_nsr_update:
3600 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3601
3602 def _deploy_n2vc(
3603 self,
3604 logging_text,
3605 db_nsr,
3606 db_vnfr,
3607 nslcmop_id,
3608 nsr_id,
3609 nsi_id,
3610 vnfd_id,
3611 vdu_id,
3612 kdu_name,
3613 member_vnf_index,
3614 vdu_index,
3615 vdu_name,
3616 deploy_params,
3617 descriptor_config,
3618 base_folder,
3619 task_instantiation_info,
3620 stage,
3621 ):
3622 # launch instantiate_N2VC in an asyncio task and register the task object
3623 # Look up where the information of this charm is stored in the database (<nsrs>._admin.deployed.VCA);
3624 # if not found, create one entry and update the database
3625 # fill db_nsr._admin.deployed.VCA.<index>
3626
3627 self.logger.debug(
3628 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3629 )
3630 if "execution-environment-list" in descriptor_config:
3631 ee_list = descriptor_config.get("execution-environment-list", [])
3632 elif "juju" in descriptor_config:
3633 ee_list = [descriptor_config] # ns charms
3634 else:  # other types, such as script, are not supported
3635 ee_list = []
3636
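# How vca_type is derived in the loop below (a summary of the checks as written, not an exhaustive spec):
#   juju with "cloud" == "k8s"                      -> k8s_proxy_charm
#   juju with "proxy" is False (and non-k8s cloud)  -> native_charm
#   juju with a "charm" entry otherwise             -> lxc_proxy_charm (no charm -> native_charm)
#   helm-chart with "helm-version" == "v2"          -> helm
#   helm-chart otherwise                            -> helm-v3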
3637 for ee_item in ee_list:
3638 self.logger.debug(
3639 logging_text
3640 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3641 ee_item.get("juju"), ee_item.get("helm-chart")
3642 )
3643 )
3644 ee_descriptor_id = ee_item.get("id")
3645 if ee_item.get("juju"):
3646 vca_name = ee_item["juju"].get("charm")
3647 vca_type = (
3648 "lxc_proxy_charm"
3649 if ee_item["juju"].get("charm") is not None
3650 else "native_charm"
3651 )
3652 if ee_item["juju"].get("cloud") == "k8s":
3653 vca_type = "k8s_proxy_charm"
3654 elif ee_item["juju"].get("proxy") is False:
3655 vca_type = "native_charm"
3656 elif ee_item.get("helm-chart"):
3657 vca_name = ee_item["helm-chart"]
3658 if ee_item.get("helm-version") == "v2":
3659 vca_type = "helm"
3660 else:
3661 vca_type = "helm-v3"
3662 else:
3663 self.logger.debug(
3664 logging_text + "skipping: configuration is neither a juju charm nor a helm chart"
3665 )
3666 continue
3667
3668 vca_index = -1
3669 for vca_index, vca_deployed in enumerate(
3670 db_nsr["_admin"]["deployed"]["VCA"]
3671 ):
3672 if not vca_deployed:
3673 continue
3674 if (
3675 vca_deployed.get("member-vnf-index") == member_vnf_index
3676 and vca_deployed.get("vdu_id") == vdu_id
3677 and vca_deployed.get("kdu_name") == kdu_name
3678 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3679 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3680 ):
3681 break
3682 else:
3683 # not found, create one.
3684 target = (
3685 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3686 )
3687 if vdu_id:
3688 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3689 elif kdu_name:
3690 target += "/kdu/{}".format(kdu_name)
3691 vca_deployed = {
3692 "target_element": target,
3693 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3694 "member-vnf-index": member_vnf_index,
3695 "vdu_id": vdu_id,
3696 "kdu_name": kdu_name,
3697 "vdu_count_index": vdu_index,
3698 "operational-status": "init", # TODO revise
3699 "detailed-status": "", # TODO revise
3700 "step": "initial-deploy", # TODO revise
3701 "vnfd_id": vnfd_id,
3702 "vdu_name": vdu_name,
3703 "type": vca_type,
3704 "ee_descriptor_id": ee_descriptor_id,
3705 }
3706 vca_index += 1
3707
3708 # create VCA and configurationStatus in db
3709 db_dict = {
3710 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3711 "configurationStatus.{}".format(vca_index): dict(),
3712 }
3713 self.update_db_2("nsrs", nsr_id, db_dict)
3714
3715 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3716
3717 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3718 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3719 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3720
3721 # Launch task
3722 task_n2vc = asyncio.ensure_future(
3723 self.instantiate_N2VC(
3724 logging_text=logging_text,
3725 vca_index=vca_index,
3726 nsi_id=nsi_id,
3727 db_nsr=db_nsr,
3728 db_vnfr=db_vnfr,
3729 vdu_id=vdu_id,
3730 kdu_name=kdu_name,
3731 vdu_index=vdu_index,
3732 deploy_params=deploy_params,
3733 config_descriptor=descriptor_config,
3734 base_folder=base_folder,
3735 nslcmop_id=nslcmop_id,
3736 stage=stage,
3737 vca_type=vca_type,
3738 vca_name=vca_name,
3739 ee_config_descriptor=ee_item,
3740 )
3741 )
3742 self.lcm_tasks.register(
3743 "ns",
3744 nsr_id,
3745 nslcmop_id,
3746 "instantiate_N2VC-{}".format(vca_index),
3747 task_n2vc,
3748 )
3749 task_instantiation_info[
3750 task_n2vc
3751 ] = self.task_name_deploy_vca + " {}.{}".format(
3752 member_vnf_index or "", vdu_id or ""
3753 )
3754
3755 @staticmethod
3756 def _create_nslcmop(nsr_id, operation, params):
3757 """
3758 Creates an ns-lcm-op content to be stored in the database.
3759 :param nsr_id: internal id of the instance
3760 :param operation: instantiate, terminate, scale, action, ...
3761 :param params: user parameters for the operation
3762 :return: dictionary following SOL005 format
3763 """
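# Illustrative usage (a sketch, not taken from an actual caller; the persistence step is hypothetical):
#   op = NsLcm._create_nslcmop(nsr_id, "terminate", {"autoremove": True})
#   self.db.create("nslcmops", op)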
3764 # Raise exception if invalid arguments
3765 if not (nsr_id and operation and params):
3766 raise LcmException(
3767 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3768 )
3769 now = time()
3770 _id = str(uuid4())
3771 nslcmop = {
3772 "id": _id,
3773 "_id": _id,
3774 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3775 "operationState": "PROCESSING",
3776 "statusEnteredTime": now,
3777 "nsInstanceId": nsr_id,
3778 "lcmOperationType": operation,
3779 "startTime": now,
3780 "isAutomaticInvocation": False,
3781 "operationParams": params,
3782 "isCancelPending": False,
3783 "links": {
3784 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3785 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3786 },
3787 }
3788 return nslcmop
3789
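# _format_additional_params parses values prefixed with "!!yaml " in place, e.g. (made-up keys):
#   {"replicas": "!!yaml 3", "tags": "!!yaml [a, b]"}  ->  {"replicas": 3, "tags": ["a", "b"]}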
3790 def _format_additional_params(self, params):
3791 params = params or {}
3792 for key, value in params.items():
3793 if str(value).startswith("!!yaml "):
3794 params[key] = yaml.safe_load(value[7:])
3795 return params
3796
3797 def _get_terminate_primitive_params(self, seq, vnf_index):
3798 primitive = seq.get("name")
3799 primitive_params = {}
3800 params = {
3801 "member_vnf_index": vnf_index,
3802 "primitive": primitive,
3803 "primitive_params": primitive_params,
3804 }
3805 desc_params = {}
3806 return self._map_primitive_params(seq, params, desc_params)
3807
3808 # sub-operations
3809
3810 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3811 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3812 if op.get("operationState") == "COMPLETED":
3813 # b. Skip sub-operation
3814 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3815 return self.SUBOPERATION_STATUS_SKIP
3816 else:
3817 # c. retry executing sub-operation
3818 # The sub-operation exists, and operationState != 'COMPLETED'
3819 # Update operationState = 'PROCESSING' to indicate a retry.
3820 operationState = "PROCESSING"
3821 detailed_status = "In progress"
3822 self._update_suboperation_status(
3823 db_nslcmop, op_index, operationState, detailed_status
3824 )
3825 # Return the sub-operation index
3826 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3827 # with arguments extracted from the sub-operation
3828 return op_index
3829
3830 # Find a sub-operation where all keys in a matching dictionary must match
3831 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
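# Illustrative match dict (keys mirror those stored by _add_suboperation; values are examples):
#   {"member_vnf_index": "1", "primitive": "touch", "lcmOperationType": "PRE-SCALE"}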
3832 def _find_suboperation(self, db_nslcmop, match):
3833 if db_nslcmop and match:
3834 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3835 for i, op in enumerate(op_list):
3836 if all(op.get(k) == match[k] for k in match):
3837 return i
3838 return self.SUBOPERATION_STATUS_NOT_FOUND
3839
3840 # Update status for a sub-operation given its index
3841 def _update_suboperation_status(
3842 self, db_nslcmop, op_index, operationState, detailed_status
3843 ):
3844 # Update DB for HA tasks
3845 q_filter = {"_id": db_nslcmop["_id"]}
3846 update_dict = {
3847 "_admin.operations.{}.operationState".format(op_index): operationState,
3848 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3849 }
3850 self.db.set_one(
3851 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3852 )
3853
3854 # Add sub-operation, return the index of the added sub-operation
3855 # Optionally, set operationState, detailed-status, and operationType
3856 # Status and type are currently set for 'scale' sub-operations:
3857 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3858 # 'detailed-status' : status message
3859 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3860 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
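# Illustrative entry appended to _admin.operations (example values only):
#   {"member_vnf_index": "1", "vdu_id": None, "vdu_count_index": None,
#    "primitive": "touch", "primitive_params": {"filename": "/home/ubuntu/touched"},
#    "operationState": "PROCESSING", "detailed-status": "In progress", "lcmOperationType": "PRE-SCALE"}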
3861 def _add_suboperation(
3862 self,
3863 db_nslcmop,
3864 vnf_index,
3865 vdu_id,
3866 vdu_count_index,
3867 vdu_name,
3868 primitive,
3869 mapped_primitive_params,
3870 operationState=None,
3871 detailed_status=None,
3872 operationType=None,
3873 RO_nsr_id=None,
3874 RO_scaling_info=None,
3875 ):
3876 if not db_nslcmop:
3877 return self.SUBOPERATION_STATUS_NOT_FOUND
3878 # Get the "_admin.operations" list, if it exists
3879 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3880 op_list = db_nslcmop_admin.get("operations")
3881 # Create or append to the "_admin.operations" list
3882 new_op = {
3883 "member_vnf_index": vnf_index,
3884 "vdu_id": vdu_id,
3885 "vdu_count_index": vdu_count_index,
3886 "primitive": primitive,
3887 "primitive_params": mapped_primitive_params,
3888 }
3889 if operationState:
3890 new_op["operationState"] = operationState
3891 if detailed_status:
3892 new_op["detailed-status"] = detailed_status
3893 if operationType:
3894 new_op["lcmOperationType"] = operationType
3895 if RO_nsr_id:
3896 new_op["RO_nsr_id"] = RO_nsr_id
3897 if RO_scaling_info:
3898 new_op["RO_scaling_info"] = RO_scaling_info
3899 if not op_list:
3900 # No existing operations, create key 'operations' with current operation as first list element
3901 db_nslcmop_admin.update({"operations": [new_op]})
3902 op_list = db_nslcmop_admin.get("operations")
3903 else:
3904 # Existing operations, append operation to list
3905 op_list.append(new_op)
3906
3907 db_nslcmop_update = {"_admin.operations": op_list}
3908 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3909 op_index = len(op_list) - 1
3910 return op_index
3911
3912 # Helper methods for scale() sub-operations
3913
3914 # pre-scale/post-scale:
3915 # Check for 3 different cases:
3916 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3917 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3918 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3919 def _check_or_add_scale_suboperation(
3920 self,
3921 db_nslcmop,
3922 vnf_index,
3923 vnf_config_primitive,
3924 primitive_params,
3925 operationType,
3926 RO_nsr_id=None,
3927 RO_scaling_info=None,
3928 ):
3929 # Find this sub-operation
3930 if RO_nsr_id and RO_scaling_info:
3931 operationType = "SCALE-RO"
3932 match = {
3933 "member_vnf_index": vnf_index,
3934 "RO_nsr_id": RO_nsr_id,
3935 "RO_scaling_info": RO_scaling_info,
3936 }
3937 else:
3938 match = {
3939 "member_vnf_index": vnf_index,
3940 "primitive": vnf_config_primitive,
3941 "primitive_params": primitive_params,
3942 "lcmOperationType": operationType,
3943 }
3944 op_index = self._find_suboperation(db_nslcmop, match)
3945 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3946 # a. New sub-operation
3947 # The sub-operation does not exist, add it.
3948 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3949 # The following parameters are set to None for all kinds of scaling:
3950 vdu_id = None
3951 vdu_count_index = None
3952 vdu_name = None
3953 if RO_nsr_id and RO_scaling_info:
3954 vnf_config_primitive = None
3955 primitive_params = None
3956 else:
3957 RO_nsr_id = None
3958 RO_scaling_info = None
3959 # Initial status for sub-operation
3960 operationState = "PROCESSING"
3961 detailed_status = "In progress"
3962 # Add sub-operation for pre/post-scaling (zero or more operations)
3963 self._add_suboperation(
3964 db_nslcmop,
3965 vnf_index,
3966 vdu_id,
3967 vdu_count_index,
3968 vdu_name,
3969 vnf_config_primitive,
3970 primitive_params,
3971 operationState,
3972 detailed_status,
3973 operationType,
3974 RO_nsr_id,
3975 RO_scaling_info,
3976 )
3977 return self.SUBOPERATION_STATUS_NEW
3978 else:
3979 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3980 # or op_index (operationState != 'COMPLETED')
3981 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3982
3983 # Function to return execution_environment id
3984
3985 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3986 # TODO vdu_index_count
3987 for vca in vca_deployed_list:
3988 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3989 return vca["ee_id"]
3990
3991 async def destroy_N2VC(
3992 self,
3993 logging_text,
3994 db_nslcmop,
3995 vca_deployed,
3996 config_descriptor,
3997 vca_index,
3998 destroy_ee=True,
3999 exec_primitives=True,
4000 scaling_in=False,
4001 vca_id: str = None,
4002 ):
4003 """
4004 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
4005 :param logging_text:
4006 :param db_nslcmop:
4007 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
4008 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4009 :param vca_index: index in the database _admin.deployed.VCA
4010 :param destroy_ee: False to skip destroying the execution environment here, because all of them will be destroyed at once later
4011 :param exec_primitives: False to skip executing the terminate primitives, because the configuration was not
4012 completed or did not execute properly
4013 :param scaling_in: True destroys the application, False destroys the model
4014 :return: None or exception
4015 """
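# Illustrative config_descriptor fragment (a sketch; the sorting helper may consider additional fields):
#   {"terminate-config-primitive": [
#       {"name": "cleanup", "parameter": [{"name": "filename", "value": "/tmp/touched"}]}]}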
4016
4017 self.logger.debug(
4018 logging_text
4019 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4020 vca_index, vca_deployed, config_descriptor, destroy_ee
4021 )
4022 )
4023
4024 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
4025
4026 # execute terminate_primitives
4027 if exec_primitives:
4028 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
4029 config_descriptor.get("terminate-config-primitive"),
4030 vca_deployed.get("ee_descriptor_id"),
4031 )
4032 vdu_id = vca_deployed.get("vdu_id")
4033 vdu_count_index = vca_deployed.get("vdu_count_index")
4034 vdu_name = vca_deployed.get("vdu_name")
4035 vnf_index = vca_deployed.get("member-vnf-index")
4036 if terminate_primitives and vca_deployed.get("needed_terminate"):
4037 for seq in terminate_primitives:
4038 # For each sequence in list, get primitive and call _ns_execute_primitive()
4039 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
4040 vnf_index, seq.get("name")
4041 )
4042 self.logger.debug(logging_text + step)
4043 # Create the primitive for each sequence, i.e. "primitive": "touch"
4044 primitive = seq.get("name")
4045 mapped_primitive_params = self._get_terminate_primitive_params(
4046 seq, vnf_index
4047 )
4048
4049 # Add sub-operation
4050 self._add_suboperation(
4051 db_nslcmop,
4052 vnf_index,
4053 vdu_id,
4054 vdu_count_index,
4055 vdu_name,
4056 primitive,
4057 mapped_primitive_params,
4058 )
4059 # Sub-operations: Call _ns_execute_primitive() instead of action()
4060 try:
4061 result, result_detail = await self._ns_execute_primitive(
4062 vca_deployed["ee_id"],
4063 primitive,
4064 mapped_primitive_params,
4065 vca_type=vca_type,
4066 vca_id=vca_id,
4067 )
4068 except LcmException:
4069 # this happens when the VCA is not deployed; in that case there is nothing to terminate
4070 continue
4071 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
4072 if result not in result_ok:
4073 raise LcmException(
4074 "terminate_primitive {} for vnf_member_index={} fails with "
4075 "error {}".format(seq.get("name"), vnf_index, result_detail)
4076 )
4077 # mark that this VCA no longer needs to be terminated
4078 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
4079 vca_index
4080 )
4081 self.update_db_2(
4082 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
4083 )
4084
4085 # Delete Prometheus Jobs if any
4086 # This uses NSR_ID, so it will destroy any jobs under this index
4087 self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
4088
4089 if destroy_ee:
4090 await self.vca_map[vca_type].delete_execution_environment(
4091 vca_deployed["ee_id"],
4092 scaling_in=scaling_in,
4093 vca_type=vca_type,
4094 vca_id=vca_id,
4095 )
4096
4097 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4098 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4099 namespace = "." + db_nsr["_id"]
4100 try:
4101 await self.n2vc.delete_namespace(
4102 namespace=namespace,
4103 total_timeout=self.timeout_charm_delete,
4104 vca_id=vca_id,
4105 )
4106 except N2VCNotFound: # already deleted. Skip
4107 pass
4108 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4109
4110 async def _terminate_RO(
4111 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4112 ):
4113 """
4114 Terminates a deployment from RO
4115 :param logging_text:
4116 :param nsr_deployed: db_nsr._admin.deployed
4117 :param nsr_id:
4118 :param nslcmop_id:
4119 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
4120 This method only updates index 2, but it writes the concatenated content of the whole list to the database
4121 :return:
4122 """
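# e.g. stage == ["Stage 3/3 delete all.", "Deleting ns from VIM.", ""] as built by terminate() below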
4123 db_nsr_update = {}
4124 failed_detail = []
4125 ro_nsr_id = ro_delete_action = None
4126 if nsr_deployed and nsr_deployed.get("RO"):
4127 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
4128 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
4129 try:
4130 if ro_nsr_id:
4131 stage[2] = "Deleting ns from VIM."
4132 db_nsr_update["detailed-status"] = " ".join(stage)
4133 self._write_op_status(nslcmop_id, stage)
4134 self.logger.debug(logging_text + stage[2])
4135 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4136 self._write_op_status(nslcmop_id, stage)
4137 desc = await self.RO.delete("ns", ro_nsr_id)
4138 ro_delete_action = desc["action_id"]
4139 db_nsr_update[
4140 "_admin.deployed.RO.nsr_delete_action_id"
4141 ] = ro_delete_action
4142 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4143 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4144 if ro_delete_action:
4145 # wait until NS is deleted from VIM
4146 stage[2] = "Waiting ns deleted from VIM."
4147 detailed_status_old = None
4148 self.logger.debug(
4149 logging_text
4150 + stage[2]
4151 + " RO_id={} ro_delete_action={}".format(
4152 ro_nsr_id, ro_delete_action
4153 )
4154 )
4155 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4156 self._write_op_status(nslcmop_id, stage)
4157
4158 delete_timeout = 20 * 60 # 20 minutes
4159 while delete_timeout > 0:
4160 desc = await self.RO.show(
4161 "ns",
4162 item_id_name=ro_nsr_id,
4163 extra_item="action",
4164 extra_item_id=ro_delete_action,
4165 )
4166
4167 # deploymentStatus
4168 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4169
4170 ns_status, ns_status_info = self.RO.check_action_status(desc)
4171 if ns_status == "ERROR":
4172 raise ROclient.ROClientException(ns_status_info)
4173 elif ns_status == "BUILD":
4174 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4175 elif ns_status == "ACTIVE":
4176 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4177 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4178 break
4179 else:
4180 assert (
4181 False
4182 ), "ROclient.check_action_status returns unknown {}".format(
4183 ns_status
4184 )
4185 if stage[2] != detailed_status_old:
4186 detailed_status_old = stage[2]
4187 db_nsr_update["detailed-status"] = " ".join(stage)
4188 self._write_op_status(nslcmop_id, stage)
4189 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4190 await asyncio.sleep(5, loop=self.loop)
4191 delete_timeout -= 5
4192 else: # delete_timeout <= 0:
4193 raise ROclient.ROClientException(
4194 "Timeout waiting ns deleted from VIM"
4195 )
4196
4197 except Exception as e:
4198 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4199 if (
4200 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4201 ): # not found
4202 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4203 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4204 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4205 self.logger.debug(
4206 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4207 )
4208 elif (
4209 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4210 ): # conflict
4211 failed_detail.append("delete conflict: {}".format(e))
4212 self.logger.debug(
4213 logging_text
4214 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4215 )
4216 else:
4217 failed_detail.append("delete error: {}".format(e))
4218 self.logger.error(
4219 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4220 )
4221
4222 # Delete nsd
4223 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4224 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4225 try:
4226 stage[2] = "Deleting nsd from RO."
4227 db_nsr_update["detailed-status"] = " ".join(stage)
4228 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4229 self._write_op_status(nslcmop_id, stage)
4230 await self.RO.delete("nsd", ro_nsd_id)
4231 self.logger.debug(
4232 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4233 )
4234 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4235 except Exception as e:
4236 if (
4237 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4238 ): # not found
4239 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4240 self.logger.debug(
4241 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4242 )
4243 elif (
4244 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4245 ): # conflict
4246 failed_detail.append(
4247 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4248 )
4249 self.logger.debug(logging_text + failed_detail[-1])
4250 else:
4251 failed_detail.append(
4252 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4253 )
4254 self.logger.error(logging_text + failed_detail[-1])
4255
4256 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4257 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4258 if not vnf_deployed or not vnf_deployed["id"]:
4259 continue
4260 try:
4261 ro_vnfd_id = vnf_deployed["id"]
4262 stage[
4263 2
4264 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4265 vnf_deployed["member-vnf-index"], ro_vnfd_id
4266 )
4267 db_nsr_update["detailed-status"] = " ".join(stage)
4268 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4269 self._write_op_status(nslcmop_id, stage)
4270 await self.RO.delete("vnfd", ro_vnfd_id)
4271 self.logger.debug(
4272 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4273 )
4274 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4275 except Exception as e:
4276 if (
4277 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4278 ): # not found
4279 db_nsr_update[
4280 "_admin.deployed.RO.vnfd.{}.id".format(index)
4281 ] = None
4282 self.logger.debug(
4283 logging_text
4284 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4285 )
4286 elif (
4287 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4288 ): # conflict
4289 failed_detail.append(
4290 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4291 )
4292 self.logger.debug(logging_text + failed_detail[-1])
4293 else:
4294 failed_detail.append(
4295 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4296 )
4297 self.logger.error(logging_text + failed_detail[-1])
4298
4299 if failed_detail:
4300 stage[2] = "Error deleting from VIM"
4301 else:
4302 stage[2] = "Deleted from VIM"
4303 db_nsr_update["detailed-status"] = " ".join(stage)
4304 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4305 self._write_op_status(nslcmop_id, stage)
4306
4307 if failed_detail:
4308 raise LcmException("; ".join(failed_detail))
4309
4310 async def terminate(self, nsr_id, nslcmop_id):
4311 # Try to lock HA task here
4312 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4313 if not task_is_locked_by_me:
4314 return
4315
4316 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4317 self.logger.debug(logging_text + "Enter")
4318 timeout_ns_terminate = self.timeout_ns_terminate
4319 db_nsr = None
4320 db_nslcmop = None
4321 operation_params = None
4322 exc = None
4323 error_list = []  # accumulates all error messages
4324 db_nslcmop_update = {}
4325 autoremove = False  # autoremove after termination
4326 tasks_dict_info = {}
4327 db_nsr_update = {}
4328 stage = [
4329 "Stage 1/3: Preparing task.",
4330 "Waiting for previous operations to terminate.",
4331 "",
4332 ]
4333 # ^ contains [stage, step, VIM-status]
4334 try:
4335 # wait for any previous tasks in process
4336 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4337
4338 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4339 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4340 operation_params = db_nslcmop.get("operationParams") or {}
4341 if operation_params.get("timeout_ns_terminate"):
4342 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4343 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4344 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4345
4346 db_nsr_update["operational-status"] = "terminating"
4347 db_nsr_update["config-status"] = "terminating"
4348 self._write_ns_status(
4349 nsr_id=nsr_id,
4350 ns_state="TERMINATING",
4351 current_operation="TERMINATING",
4352 current_operation_id=nslcmop_id,
4353 other_update=db_nsr_update,
4354 )
4355 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4356 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4357 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4358 return
4359
4360 stage[1] = "Getting vnf descriptors from db."
4361 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4362 db_vnfrs_dict = {
4363 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4364 }
4365 db_vnfds_from_id = {}
4366 db_vnfds_from_member_index = {}
4367 # Loop over VNFRs
4368 for vnfr in db_vnfrs_list:
4369 vnfd_id = vnfr["vnfd-id"]
4370 if vnfd_id not in db_vnfds_from_id:
4371 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4372 db_vnfds_from_id[vnfd_id] = vnfd
4373 db_vnfds_from_member_index[
4374 vnfr["member-vnf-index-ref"]
4375 ] = db_vnfds_from_id[vnfd_id]
4376
4377 # Destroy individual execution environments when there are terminating primitives.
4378 # Rest of EE will be deleted at once
4379 # TODO - check before calling _destroy_N2VC
4380 # if not operation_params.get("skip_terminate_primitives"):#
4381 # or not vca.get("needed_terminate"):
4382 stage[0] = "Stage 2/3 execute terminating primitives."
4383 self.logger.debug(logging_text + stage[0])
4384 stage[1] = "Looking execution environment that needs terminate."
4385 self.logger.debug(logging_text + stage[1])
4386
4387 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4388 config_descriptor = None
4389 vca_member_vnf_index = vca.get("member-vnf-index")
4390 vca_id = self.get_vca_id(
4391 db_vnfrs_dict.get(vca_member_vnf_index)
4392 if vca_member_vnf_index
4393 else None,
4394 db_nsr,
4395 )
4396 if not vca or not vca.get("ee_id"):
4397 continue
4398 if not vca.get("member-vnf-index"):
4399 # ns
4400 config_descriptor = db_nsr.get("ns-configuration")
4401 elif vca.get("vdu_id"):
4402 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4403 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4404 elif vca.get("kdu_name"):
4405 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4406 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4407 else:
4408 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4409 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4410 vca_type = vca.get("type")
4411 exec_terminate_primitives = not operation_params.get(
4412 "skip_terminate_primitives"
4413 ) and vca.get("needed_terminate")
4414 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4415 # pending native charms
4416 destroy_ee = (
4417 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4418 )
4419 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4420 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4421 task = asyncio.ensure_future(
4422 self.destroy_N2VC(
4423 logging_text,
4424 db_nslcmop,
4425 vca,
4426 config_descriptor,
4427 vca_index,
4428 destroy_ee,
4429 exec_terminate_primitives,
4430 vca_id=vca_id,
4431 )
4432 )
4433 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4434
4435 # wait for pending tasks of terminate primitives
4436 if tasks_dict_info:
4437 self.logger.debug(
4438 logging_text
4439 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4440 )
4441 error_list = await self._wait_for_tasks(
4442 logging_text,
4443 tasks_dict_info,
4444 min(self.timeout_charm_delete, timeout_ns_terminate),
4445 stage,
4446 nslcmop_id,
4447 )
4448 tasks_dict_info.clear()
4449 if error_list:
4450 return # raise LcmException("; ".join(error_list))
4451
4452 # remove All execution environments at once
4453 stage[0] = "Stage 3/3 delete all."
4454
4455 if nsr_deployed.get("VCA"):
4456 stage[1] = "Deleting all execution environments."
4457 self.logger.debug(logging_text + stage[1])
4458 vca_id = self.get_vca_id({}, db_nsr)
4459 task_delete_ee = asyncio.ensure_future(
4460 asyncio.wait_for(
4461 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4462 timeout=self.timeout_charm_delete,
4463 )
4464 )
4465 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4466 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4467
4468 # Delete from k8scluster
4469 stage[1] = "Deleting KDUs."
4470 self.logger.debug(logging_text + stage[1])
4471 # print(nsr_deployed)
4472 for kdu in get_iterable(nsr_deployed, "K8s"):
4473 if not kdu or not kdu.get("kdu-instance"):
4474 continue
4475 kdu_instance = kdu.get("kdu-instance")
4476 if kdu.get("k8scluster-type") in self.k8scluster_map:
4477 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4478 vca_id = self.get_vca_id({}, db_nsr)
4479 task_delete_kdu_instance = asyncio.ensure_future(
4480 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4481 cluster_uuid=kdu.get("k8scluster-uuid"),
4482 kdu_instance=kdu_instance,
4483 vca_id=vca_id,
4484 )
4485 )
4486 else:
4487 self.logger.error(
4488 logging_text
4489 + "Unknown k8s deployment type {}".format(
4490 kdu.get("k8scluster-type")
4491 )
4492 )
4493 continue
4494 tasks_dict_info[
4495 task_delete_kdu_instance
4496 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4497
4498 # remove from RO
4499 stage[1] = "Deleting ns from VIM."
4500 if self.ng_ro:
4501 task_delete_ro = asyncio.ensure_future(
4502 self._terminate_ng_ro(
4503 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4504 )
4505 )
4506 else:
4507 task_delete_ro = asyncio.ensure_future(
4508 self._terminate_RO(
4509 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4510 )
4511 )
4512 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4513
4514 # the rest of the cleanup will be done in the finally block
4515
4516 except (
4517 ROclient.ROClientException,
4518 DbException,
4519 LcmException,
4520 N2VCException,
4521 ) as e:
4522 self.logger.error(logging_text + "Exit Exception {}".format(e))
4523 exc = e
4524 except asyncio.CancelledError:
4525 self.logger.error(
4526 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4527 )
4528 exc = "Operation was cancelled"
4529 except Exception as e:
4530 exc = traceback.format_exc()
4531 self.logger.critical(
4532 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4533 exc_info=True,
4534 )
4535 finally:
4536 if exc:
4537 error_list.append(str(exc))
4538 try:
4539 # wait for pending tasks
4540 if tasks_dict_info:
4541 stage[1] = "Waiting for terminate pending tasks."
4542 self.logger.debug(logging_text + stage[1])
4543 error_list += await self._wait_for_tasks(
4544 logging_text,
4545 tasks_dict_info,
4546 timeout_ns_terminate,
4547 stage,
4548 nslcmop_id,
4549 )
4550 stage[1] = stage[2] = ""
4551 except asyncio.CancelledError:
4552 error_list.append("Cancelled")
4553 # TODO cancel all tasks
4554 except Exception as exc:
4555 error_list.append(str(exc))
4556 # update status at database
4557 if error_list:
4558 error_detail = "; ".join(error_list)
4559 # self.logger.error(logging_text + error_detail)
4560 error_description_nslcmop = "{} Detail: {}".format(
4561 stage[0], error_detail
4562 )
4563 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4564 nslcmop_id, stage[0]
4565 )
4566
4567 db_nsr_update["operational-status"] = "failed"
4568 db_nsr_update["detailed-status"] = (
4569 error_description_nsr + " Detail: " + error_detail
4570 )
4571 db_nslcmop_update["detailed-status"] = error_detail
4572 nslcmop_operation_state = "FAILED"
4573 ns_state = "BROKEN"
4574 else:
4575 error_detail = None
4576 error_description_nsr = error_description_nslcmop = None
4577 ns_state = "NOT_INSTANTIATED"
4578 db_nsr_update["operational-status"] = "terminated"
4579 db_nsr_update["detailed-status"] = "Done"
4580 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4581 db_nslcmop_update["detailed-status"] = "Done"
4582 nslcmop_operation_state = "COMPLETED"
4583
4584 if db_nsr:
4585 self._write_ns_status(
4586 nsr_id=nsr_id,
4587 ns_state=ns_state,
4588 current_operation="IDLE",
4589 current_operation_id=None,
4590 error_description=error_description_nsr,
4591 error_detail=error_detail,
4592 other_update=db_nsr_update,
4593 )
4594 self._write_op_status(
4595 op_id=nslcmop_id,
4596 stage="",
4597 error_message=error_description_nslcmop,
4598 operation_state=nslcmop_operation_state,
4599 other_update=db_nslcmop_update,
4600 )
4601 if ns_state == "NOT_INSTANTIATED":
4602 try:
4603 self.db.set_list(
4604 "vnfrs",
4605 {"nsr-id-ref": nsr_id},
4606 {"_admin.nsState": "NOT_INSTANTIATED"},
4607 )
4608 except DbException as e:
4609 self.logger.warn(
4610 logging_text
4611 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4612 nsr_id, e
4613 )
4614 )
4615 if operation_params:
4616 autoremove = operation_params.get("autoremove", False)
4617 if nslcmop_operation_state:
4618 try:
4619 await self.msg.aiowrite(
4620 "ns",
4621 "terminated",
4622 {
4623 "nsr_id": nsr_id,
4624 "nslcmop_id": nslcmop_id,
4625 "operationState": nslcmop_operation_state,
4626 "autoremove": autoremove,
4627 },
4628 loop=self.loop,
4629 )
4630 except Exception as e:
4631 self.logger.error(
4632 logging_text + "kafka_write notification Exception {}".format(e)
4633 )
4634
4635 self.logger.debug(logging_text + "Exit")
4636 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4637
4638 async def _wait_for_tasks(
4639 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4640 ):
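# created_tasks_info maps each asyncio task to a human-readable label used in status/error messages,
# e.g. {<task>: "Terminating VCA <ee_id>", <task>: "Removing deployment from VIM"}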
4641 time_start = time()
4642 error_detail_list = []
4643 error_list = []
4644 pending_tasks = list(created_tasks_info.keys())
4645 num_tasks = len(pending_tasks)
4646 num_done = 0
4647 stage[1] = "{}/{}.".format(num_done, num_tasks)
4648 self._write_op_status(nslcmop_id, stage)
4649 while pending_tasks:
4650 new_error = None
4651 _timeout = timeout + time_start - time()
4652 done, pending_tasks = await asyncio.wait(
4653 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4654 )
4655 num_done += len(done)
4656 if not done: # Timeout
4657 for task in pending_tasks:
4658 new_error = created_tasks_info[task] + ": Timeout"
4659 error_detail_list.append(new_error)
4660 error_list.append(new_error)
4661 break
4662 for task in done:
4663 if task.cancelled():
4664 exc = "Cancelled"
4665 else:
4666 exc = task.exception()
4667 if exc:
4668 if isinstance(exc, asyncio.TimeoutError):
4669 exc = "Timeout"
4670 new_error = created_tasks_info[task] + ": {}".format(exc)
4671 error_list.append(created_tasks_info[task])
4672 error_detail_list.append(new_error)
4673 if isinstance(
4674 exc,
4675 (
4676 str,
4677 DbException,
4678 N2VCException,
4679 ROclient.ROClientException,
4680 LcmException,
4681 K8sException,
4682 NgRoException,
4683 ),
4684 ):
4685 self.logger.error(logging_text + new_error)
4686 else:
4687 exc_traceback = "".join(
4688 traceback.format_exception(None, exc, exc.__traceback__)
4689 )
4690 self.logger.error(
4691 logging_text
4692 + created_tasks_info[task]
4693 + " "
4694 + exc_traceback
4695 )
4696 else:
4697 self.logger.debug(
4698 logging_text + created_tasks_info[task] + ": Done"
4699 )
4700 stage[1] = "{}/{}.".format(num_done, num_tasks)
4701 if new_error:
4702 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4703 if nsr_id: # update also nsr
4704 self.update_db_2(
4705 "nsrs",
4706 nsr_id,
4707 {
4708 "errorDescription": "Error at: " + ", ".join(error_list),
4709 "errorDetail": ". ".join(error_detail_list),
4710 },
4711 )
4712 self._write_op_status(nslcmop_id, stage)
4713 return error_detail_list
4714
4715 @staticmethod
4716 def _map_primitive_params(primitive_desc, params, instantiation_params):
4717 """
4718 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
4719 parameter, the default-value is used. If a value is enclosed in < >, it is looked up in instantiation_params
4720 :param primitive_desc: portion of VNFD/NSD that describes primitive
4721 :param params: Params provided by user
4722 :param instantiation_params: Instantiation params provided by user
4723 :return: a dictionary with the calculated params
4724 """
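# Illustrative mapping (made-up values):
#   primitive_desc = {"name": "touch",
#                     "parameter": [{"name": "filename", "default-value": "<touch_filename>"}]}
#   params = {}, instantiation_params = {"touch_filename": "/home/ubuntu/first-touch"}
#   -> returns {"filename": "/home/ubuntu/first-touch"}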
4725 calculated_params = {}
4726 for parameter in primitive_desc.get("parameter", ()):
4727 param_name = parameter["name"]
4728 if param_name in params:
4729 calculated_params[param_name] = params[param_name]
4730 elif "default-value" in parameter or "value" in parameter:
4731 if "value" in parameter:
4732 calculated_params[param_name] = parameter["value"]
4733 else:
4734 calculated_params[param_name] = parameter["default-value"]
4735 if (
4736 isinstance(calculated_params[param_name], str)
4737 and calculated_params[param_name].startswith("<")
4738 and calculated_params[param_name].endswith(">")
4739 ):
4740 if calculated_params[param_name][1:-1] in instantiation_params:
4741 calculated_params[param_name] = instantiation_params[
4742 calculated_params[param_name][1:-1]
4743 ]
4744 else:
4745 raise LcmException(
4746 "Parameter {} needed to execute primitive {} not provided".format(
4747 calculated_params[param_name], primitive_desc["name"]
4748 )
4749 )
4750 else:
4751 raise LcmException(
4752 "Parameter {} needed to execute primitive {} not provided".format(
4753 param_name, primitive_desc["name"]
4754 )
4755 )
4756
4757 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4758 calculated_params[param_name] = yaml.safe_dump(
4759 calculated_params[param_name], default_flow_style=True, width=256
4760 )
4761 elif isinstance(calculated_params[param_name], str) and calculated_params[
4762 param_name
4763 ].startswith("!!yaml "):
4764 calculated_params[param_name] = calculated_params[param_name][7:]
4765 if parameter.get("data-type") == "INTEGER":
4766 try:
4767 calculated_params[param_name] = int(calculated_params[param_name])
4768 except ValueError: # error converting string to int
4769 raise LcmException(
4770 "Parameter {} of primitive {} must be integer".format(
4771 param_name, primitive_desc["name"]
4772 )
4773 )
4774 elif parameter.get("data-type") == "BOOLEAN":
4775 calculated_params[param_name] = not (
4776 (str(calculated_params[param_name])).lower() == "false"
4777 )
4778
4779 # always add ns_config_info if the primitive name is config
4780 if primitive_desc["name"] == "config":
4781 if "ns_config_info" in instantiation_params:
4782 calculated_params["ns_config_info"] = instantiation_params[
4783 "ns_config_info"
4784 ]
4785 return calculated_params
4786
4787 def _look_for_deployed_vca(
4788 self,
4789 deployed_vca,
4790 member_vnf_index,
4791 vdu_id,
4792 vdu_count_index,
4793 kdu_name=None,
4794 ee_descriptor_id=None,
4795 ):
4796 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id
4797 for vca in deployed_vca:
4798 if not vca:
4799 continue
4800 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4801 continue
4802 if (
4803 vdu_count_index is not None
4804 and vdu_count_index != vca["vdu_count_index"]
4805 ):
4806 continue
4807 if kdu_name and kdu_name != vca["kdu_name"]:
4808 continue
4809 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4810 continue
4811 break
4812 else:
4813 # vca_deployed not found
4814 raise LcmException(
4815 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4816 " is not deployed".format(
4817 member_vnf_index,
4818 vdu_id,
4819 vdu_count_index,
4820 kdu_name,
4821 ee_descriptor_id,
4822 )
4823 )
4824 # get ee_id
4825 ee_id = vca.get("ee_id")
4826 vca_type = vca.get(
4827 "type", "lxc_proxy_charm"
4828 ) # default value for backward compatibility - proxy charm
4829 if not ee_id:
4830 raise LcmException(
4831 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4832 "execution environment".format(
4833 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4834 )
4835 )
4836 return ee_id, vca_type
4837
4838 async def _ns_execute_primitive(
4839 self,
4840 ee_id,
4841 primitive,
4842 primitive_params,
4843 retries=0,
4844 retries_interval=30,
4845 timeout=None,
4846 vca_type=None,
4847 db_dict=None,
4848 vca_id: str = None,
4849 ) -> (str, str):
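# Returns a (state, detail) tuple: ("COMPLETED", <primitive output>) on success, otherwise a failure
# state with an error string once `retries` attempts (spaced `retries_interval` seconds apart) are exhausted.
# Note: when primitive == "config", primitive_params is wrapped as {"params": primitive_params}.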
4850 try:
4851 if primitive == "config":
4852 primitive_params = {"params": primitive_params}
4853
4854 vca_type = vca_type or "lxc_proxy_charm"
4855
4856 while retries >= 0:
4857 try:
4858 output = await asyncio.wait_for(
4859 self.vca_map[vca_type].exec_primitive(
4860 ee_id=ee_id,
4861 primitive_name=primitive,
4862 params_dict=primitive_params,
4863 progress_timeout=self.timeout_progress_primitive,
4864 total_timeout=self.timeout_primitive,
4865 db_dict=db_dict,
4866 vca_id=vca_id,
4867 vca_type=vca_type,
4868 ),
4869 timeout=timeout or self.timeout_primitive,
4870 )
4871 # execution was OK
4872 break
4873 except asyncio.CancelledError:
4874 raise
4875 except Exception as e: # asyncio.TimeoutError
4876 if isinstance(e, asyncio.TimeoutError):
4877 e = "Timeout"
4878 retries -= 1
4879 if retries >= 0:
4880 self.logger.debug(
4881 "Error executing action {} on {} -> {}".format(
4882 primitive, ee_id, e
4883 )
4884 )
4885 # wait and retry
4886 await asyncio.sleep(retries_interval, loop=self.loop)
4887 else:
4888 return "FAILED", str(e)
4889
4890 return "COMPLETED", output
4891
4892 except (LcmException, asyncio.CancelledError):
4893 raise
4894 except Exception as e:
4895 return "FAILED", "Error executing action {}: {}".format(primitive, e)
4896
4897 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4898 """
4899 Updating the vca_status with latest juju information in nsrs record
4900 :param nsr_id: Id of the nsr
4901 :param nslcmop_id: Id of the nslcmop
4902 :return: None
4903 """
4904
4905 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4906 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4907 vca_id = self.get_vca_id({}, db_nsr)
4908 if db_nsr["_admin"]["deployed"]["K8s"]:
4909 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4910 cluster_uuid, kdu_instance, cluster_type = (
4911 k8s["k8scluster-uuid"],
4912 k8s["kdu-instance"],
4913 k8s["k8scluster-type"],
4914 )
4915 await self._on_update_k8s_db(
4916 cluster_uuid=cluster_uuid,
4917 kdu_instance=kdu_instance,
4918 filter={"_id": nsr_id},
4919 vca_id=vca_id,
4920 cluster_type=cluster_type,
4921 )
4922 else:
4923 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4924 table, filter = "nsrs", {"_id": nsr_id}
4925 path = "_admin.deployed.VCA.{}.".format(vca_index)
4926 await self._on_update_n2vc_db(table, filter, path, {})
4927
4928 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4929 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4930
4931 async def action(self, nsr_id, nslcmop_id):
4932 # Try to lock HA task here
4933 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4934 if not task_is_locked_by_me:
4935 return
4936
4937 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4938 self.logger.debug(logging_text + "Enter")
4939 # get all needed from database
4940 db_nsr = None
4941 db_nslcmop = None
4942 db_nsr_update = {}
4943 db_nslcmop_update = {}
4944 nslcmop_operation_state = None
4945 error_description_nslcmop = None
4946 exc = None
4947 try:
4948 # wait for any previous tasks in process
4949 step = "Waiting for previous operations to terminate"
4950 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4951
4952 self._write_ns_status(
4953 nsr_id=nsr_id,
4954 ns_state=None,
4955 current_operation="RUNNING ACTION",
4956 current_operation_id=nslcmop_id,
4957 )
4958
4959 step = "Getting information from database"
4960 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4961 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4962 if db_nslcmop["operationParams"].get("primitive_params"):
4963 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4964 db_nslcmop["operationParams"]["primitive_params"]
4965 )
4966
4967 nsr_deployed = db_nsr["_admin"].get("deployed")
4968 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4969 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4970 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4971 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4972 primitive = db_nslcmop["operationParams"]["primitive"]
4973 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4974 timeout_ns_action = db_nslcmop["operationParams"].get(
4975 "timeout_ns_action", self.timeout_primitive
4976 )
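# Illustrative operationParams for an NS action (example values; these are the keys read above):
#   {"member_vnf_index": "1", "vdu_id": None, "kdu_name": None, "vdu_count_index": None,
#    "primitive": "touch", "primitive_params": {"filename": "/tmp/touched"}, "timeout_ns_action": 600}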
4977
4978 if vnf_index:
4979 step = "Getting vnfr from database"
4980 db_vnfr = self.db.get_one(
4981 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4982 )
4983 if db_vnfr.get("kdur"):
4984 kdur_list = []
4985 for kdur in db_vnfr["kdur"]:
4986 if kdur.get("additionalParams"):
4987 kdur["additionalParams"] = json.loads(
4988 kdur["additionalParams"]
4989 )
4990 kdur_list.append(kdur)
4991 db_vnfr["kdur"] = kdur_list
4992 step = "Getting vnfd from database"
4993 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4994
4995 # Sync filesystem before running a primitive
4996 self.fs.sync(db_vnfr["vnfd-id"])
4997 else:
4998 step = "Getting nsd from database"
4999 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
5000
5001 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5002 # for backward compatibility
5003 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5004 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5005 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5006 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5007
5008 # look for primitive
5009 config_primitive_desc = descriptor_configuration = None
5010 if vdu_id:
5011 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
5012 elif kdu_name:
5013 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
5014 elif vnf_index:
5015 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
5016 else:
5017 descriptor_configuration = db_nsd.get("ns-configuration")
5018
5019 if descriptor_configuration and descriptor_configuration.get(
5020 "config-primitive"
5021 ):
5022 for config_primitive in descriptor_configuration["config-primitive"]:
5023 if config_primitive["name"] == primitive:
5024 config_primitive_desc = config_primitive
5025 break
5026
5027 if not config_primitive_desc:
5028 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
5029 raise LcmException(
5030 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5031 primitive
5032 )
5033 )
5034 primitive_name = primitive
5035 ee_descriptor_id = None
5036 else:
5037 primitive_name = config_primitive_desc.get(
5038 "execution-environment-primitive", primitive
5039 )
5040 ee_descriptor_id = config_primitive_desc.get(
5041 "execution-environment-ref"
5042 )
5043
5044 if vnf_index:
5045 if vdu_id:
5046 vdur = next(
5047 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
5048 )
5049 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
5050 elif kdu_name:
5051 kdur = next(
5052 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
5053 )
5054 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
5055 else:
5056 desc_params = parse_yaml_strings(
5057 db_vnfr.get("additionalParamsForVnf")
5058 )
5059 else:
5060 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
5061 if kdu_name and get_configuration(db_vnfd, kdu_name):
5062 kdu_configuration = get_configuration(db_vnfd, kdu_name)
5063 actions = set()
5064 for primitive in kdu_configuration.get("initial-config-primitive", []):
5065 actions.add(primitive["name"])
5066 for primitive in kdu_configuration.get("config-primitive", []):
5067 actions.add(primitive["name"])
5068 kdu_action = primitive_name in actions
5069
5070 # TODO check if ns is in a proper status
5071 if kdu_name and (
5072 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5073 ):
5074 # kdur and desc_params already set from before
5075 if primitive_params:
5076 desc_params.update(primitive_params)
5077 # TODO Check if we will need something at vnf level
5078 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5079 if (
5080 kdu_name == kdu["kdu-name"]
5081 and kdu["member-vnf-index"] == vnf_index
5082 ):
5083 break
5084 else:
5085 raise LcmException(
5086 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5087 )
5088
5089 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5090 msg = "unknown k8scluster-type '{}'".format(
5091 kdu.get("k8scluster-type")
5092 )
5093 raise LcmException(msg)
5094
5095 db_dict = {
5096 "collection": "nsrs",
5097 "filter": {"_id": nsr_id},
5098 "path": "_admin.deployed.K8s.{}".format(index),
5099 }
5100 self.logger.debug(
5101 logging_text
5102 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5103 )
5104 step = "Executing kdu {}".format(primitive_name)
5105 if primitive_name == "upgrade":
5106 if desc_params.get("kdu_model"):
5107 kdu_model = desc_params.get("kdu_model")
5108 del desc_params["kdu_model"]
5109 else:
5110 kdu_model = kdu.get("kdu-model")
5111 parts = kdu_model.split(sep=":")
5112 if len(parts) == 2:
5113 kdu_model = parts[0]
5114
5115 detailed_status = await asyncio.wait_for(
5116 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5117 cluster_uuid=kdu.get("k8scluster-uuid"),
5118 kdu_instance=kdu.get("kdu-instance"),
5119 atomic=True,
5120 kdu_model=kdu_model,
5121 params=desc_params,
5122 db_dict=db_dict,
5123 timeout=timeout_ns_action,
5124 ),
5125 timeout=timeout_ns_action + 10,
5126 )
5127 self.logger.debug(
5128 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5129 )
5130 elif primitive_name == "rollback":
5131 detailed_status = await asyncio.wait_for(
5132 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5133 cluster_uuid=kdu.get("k8scluster-uuid"),
5134 kdu_instance=kdu.get("kdu-instance"),
5135 db_dict=db_dict,
5136 ),
5137 timeout=timeout_ns_action,
5138 )
5139 elif primitive_name == "status":
5140 detailed_status = await asyncio.wait_for(
5141 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5142 cluster_uuid=kdu.get("k8scluster-uuid"),
5143 kdu_instance=kdu.get("kdu-instance"),
5144 vca_id=vca_id,
5145 ),
5146 timeout=timeout_ns_action,
5147 )
5148 else:
5149 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5150 kdu["kdu-name"], nsr_id
5151 )
5152 params = self._map_primitive_params(
5153 config_primitive_desc, primitive_params, desc_params
5154 )
5155
5156 detailed_status = await asyncio.wait_for(
5157 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5158 cluster_uuid=kdu.get("k8scluster-uuid"),
5159 kdu_instance=kdu_instance,
5160 primitive_name=primitive_name,
5161 params=params,
5162 db_dict=db_dict,
5163 timeout=timeout_ns_action,
5164 vca_id=vca_id,
5165 ),
5166 timeout=timeout_ns_action,
5167 )
5168
5169 if detailed_status:
5170 nslcmop_operation_state = "COMPLETED"
5171 else:
5172 detailed_status = ""
5173 nslcmop_operation_state = "FAILED"
5174 else:
5175 ee_id, vca_type = self._look_for_deployed_vca(
5176 nsr_deployed["VCA"],
5177 member_vnf_index=vnf_index,
5178 vdu_id=vdu_id,
5179 vdu_count_index=vdu_count_index,
5180 ee_descriptor_id=ee_descriptor_id,
5181 )
5182 for vca_index, vca_deployed in enumerate(
5183 db_nsr["_admin"]["deployed"]["VCA"]
5184 ):
5185 if vca_deployed.get("member-vnf-index") == vnf_index:
5186 db_dict = {
5187 "collection": "nsrs",
5188 "filter": {"_id": nsr_id},
5189 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5190 }
5191 break
5192 (
5193 nslcmop_operation_state,
5194 detailed_status,
5195 ) = await self._ns_execute_primitive(
5196 ee_id,
5197 primitive=primitive_name,
5198 primitive_params=self._map_primitive_params(
5199 config_primitive_desc, primitive_params, desc_params
5200 ),
5201 timeout=timeout_ns_action,
5202 vca_type=vca_type,
5203 db_dict=db_dict,
5204 vca_id=vca_id,
5205 )
5206
5207 db_nslcmop_update["detailed-status"] = detailed_status
5208 error_description_nslcmop = (
5209 detailed_status if nslcmop_operation_state == "FAILED" else ""
5210 )
5211 self.logger.debug(
5212 logging_text
5213 + " task Done with result {} {}".format(
5214 nslcmop_operation_state, detailed_status
5215 )
5216 )
5217 return # database update is called inside finally
5218
5219 except (DbException, LcmException, N2VCException, K8sException) as e:
5220 self.logger.error(logging_text + "Exit Exception {}".format(e))
5221 exc = e
5222 except asyncio.CancelledError:
5223 self.logger.error(
5224 logging_text + "Cancelled Exception while '{}'".format(step)
5225 )
5226 exc = "Operation was cancelled"
5227 except asyncio.TimeoutError:
5228 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5229 exc = "Timeout"
5230 except Exception as e:
5231 exc = traceback.format_exc()
5232 self.logger.critical(
5233 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5234 exc_info=True,
5235 )
5236 finally:
5237 if exc:
5238 db_nslcmop_update[
5239 "detailed-status"
5240 ] = (
5241 detailed_status
5242 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5243 nslcmop_operation_state = "FAILED"
5244 if db_nsr:
5245 self._write_ns_status(
5246 nsr_id=nsr_id,
5247 ns_state=db_nsr[
5248 "nsState"
5249 ], # TODO check if degraded. For the moment use previous status
5250 current_operation="IDLE",
5251 current_operation_id=None,
5252 # error_description=error_description_nsr,
5253 # error_detail=error_detail,
5254 other_update=db_nsr_update,
5255 )
5256
5257 self._write_op_status(
5258 op_id=nslcmop_id,
5259 stage="",
5260 error_message=error_description_nslcmop,
5261 operation_state=nslcmop_operation_state,
5262 other_update=db_nslcmop_update,
5263 )
5264
5265 if nslcmop_operation_state:
5266 try:
5267 await self.msg.aiowrite(
5268 "ns",
5269 "actioned",
5270 {
5271 "nsr_id": nsr_id,
5272 "nslcmop_id": nslcmop_id,
5273 "operationState": nslcmop_operation_state,
5274 },
5275 loop=self.loop,
5276 )
5277 except Exception as e:
5278 self.logger.error(
5279 logging_text + "kafka_write notification Exception {}".format(e)
5280 )
5281 self.logger.debug(logging_text + "Exit")
5282 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5283 return nslcmop_operation_state, detailed_status
5284
5285 async def scale(self, nsr_id, nslcmop_id):
5286 # Try to lock HA task here
5287 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5288 if not task_is_locked_by_me:
5289 return
5290
5291 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5292 stage = ["", "", ""]
5293 tasks_dict_info = {}
5294 # ^ the 'stage' list above holds: stage, step, VIM progress
5295 self.logger.debug(logging_text + "Enter")
5296 # get all needed from database
5297 db_nsr = None
5298 db_nslcmop_update = {}
5299 db_nsr_update = {}
5300 exc = None
5301 # in case of error, indicates which part of the scale operation failed, so the nsr can be put at error status
5302 scale_process = None
5303 old_operational_status = ""
5304 old_config_status = ""
5305 nsi_id = None
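# Overall flow of scale(): parse the scaling request, run pre-scale config
# primitives, remove/create VCA execution environments, scale VDUs through RO
# and KDUs through the k8s connectors, then run post-scale config primitives.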
5306 try:
5307 # wait for any previous tasks in process
5308 step = "Waiting for previous operations to terminate"
5309 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5310 self._write_ns_status(
5311 nsr_id=nsr_id,
5312 ns_state=None,
5313 current_operation="SCALING",
5314 current_operation_id=nslcmop_id,
5315 )
5316
5317 step = "Getting nslcmop from database"
5318 self.logger.debug(
5319 step + " after having waited for previous tasks to be completed"
5320 )
5321 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5322
5323 step = "Getting nsr from database"
5324 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5325 old_operational_status = db_nsr["operational-status"]
5326 old_config_status = db_nsr["config-status"]
5327
5328 step = "Parsing scaling parameters"
5329 db_nsr_update["operational-status"] = "scaling"
5330 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5331 nsr_deployed = db_nsr["_admin"].get("deployed")
5332
5333 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5334 "scaleByStepData"
5335 ]["member-vnf-index"]
5336 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5337 "scaleByStepData"
5338 ]["scaling-group-descriptor"]
5339 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
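# Illustrative shape of operationParams["scaleVnfData"] (field names as read
# above; the values shown here are only examples):
#   {"scaleVnfType": "SCALE_OUT",
#    "scaleByStepData": {"member-vnf-index": "1",
#                        "scaling-group-descriptor": "<aspect-name>"}}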
5340 # for backward compatibility
5341 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5342 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5343 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5344 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5345
5346 step = "Getting vnfr from database"
5347 db_vnfr = self.db.get_one(
5348 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5349 )
5350
5351 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5352
5353 step = "Getting vnfd from database"
5354 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5355
5356 base_folder = db_vnfd["_admin"]["storage"]
5357
5358 step = "Getting scaling-group-descriptor"
5359 scaling_descriptor = find_in_list(
5360 get_scaling_aspect(db_vnfd),
5361 lambda scale_desc: scale_desc["name"] == scaling_group,
5362 )
5363 if not scaling_descriptor:
5364 raise LcmException(
5365 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5366 "at vnfd:scaling-group-descriptor".format(scaling_group)
5367 )
5368
5369 step = "Sending scale order to VIM"
5370 # TODO check if ns is in a proper status
5371 nb_scale_op = 0
5372 if not db_nsr["_admin"].get("scaling-group"):
5373 self.update_db_2(
5374 "nsrs",
5375 nsr_id,
5376 {
5377 "_admin.scaling-group": [
5378 {"name": scaling_group, "nb-scale-op": 0}
5379 ]
5380 },
5381 )
5382 admin_scale_index = 0
5383 else:
5384 for admin_scale_index, admin_scale_info in enumerate(
5385 db_nsr["_admin"]["scaling-group"]
5386 ):
5387 if admin_scale_info["name"] == scaling_group:
5388 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5389 break
5390 else: # not found: set index to one past the last element and add a new entry with this name
5391 admin_scale_index += 1
5392 db_nsr_update[
5393 "_admin.scaling-group.{}.name".format(admin_scale_index)
5394 ] = scaling_group
5395
5396 vca_scaling_info = []
5397 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
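# vca_scaling_info collects per-instance create/delete entries used later to
# deploy or destroy VCA execution environments; scaling_info drives the RO
# scaling (vdu-create/vdu-delete) and the KDU scaling (kdu-create/kdu-delete).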
5398 if scaling_type == "SCALE_OUT":
5399 if "aspect-delta-details" not in scaling_descriptor:
5400 raise LcmException(
5401 "Aspect delta details not fount in scaling descriptor {}".format(
5402 scaling_descriptor["name"]
5403 )
5404 )
5405 # check whether max-instance-count is reached
5406 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5407
5408 scaling_info["scaling_direction"] = "OUT"
5409 scaling_info["vdu-create"] = {}
5410 scaling_info["kdu-create"] = {}
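# For each delta in the aspect, compute how many VDU instances to create and
# the target replica count for each KDU, enforcing max-number-of-instances.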
5411 for delta in deltas:
5412 for vdu_delta in delta.get("vdu-delta", {}):
5413 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5414 # vdu_index also provides the number of instances of the targeted vdu
5415 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5416 cloud_init_text = self._get_vdu_cloud_init_content(
5417 vdud, db_vnfd
5418 )
5419 if cloud_init_text:
5420 additional_params = (
5421 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5422 or {}
5423 )
5424 cloud_init_list = []
5425
5426 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5427 max_instance_count = 10
5428 if vdu_profile and "max-number-of-instances" in vdu_profile:
5429 max_instance_count = vdu_profile.get(
5430 "max-number-of-instances", 10
5431 )
5432
5433 default_instance_num = get_number_of_instances(
5434 db_vnfd, vdud["id"]
5435 )
5436 instances_number = vdu_delta.get("number-of-instances", 1)
5437 nb_scale_op += instances_number
5438
5439 new_instance_count = nb_scale_op + default_instance_num
5440 # Check whether the new count exceeds the maximum while the current vdu count is below it;
5441 # then adjust the number of instances to create accordingly
5442 if new_instance_count > max_instance_count > vdu_count:
5443 instances_number = new_instance_count - max_instance_count
5444 else:
5445 instances_number = instances_number
5446
5447 if new_instance_count > max_instance_count:
5448 raise LcmException(
5449 "reached the limit of {} (max-instance-count) "
5450 "scaling-out operations for the "
5451 "scaling-group-descriptor '{}'".format(
5452 nb_scale_op, scaling_group
5453 )
5454 )
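# Each new VDU instance gets its own rendered cloud-init (when defined) and a
# VCA scaling entry, so per-instance charms can be deployed in the scale-out
# VCA phase below.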
5455 for x in range(vdu_delta.get("number-of-instances", 1)):
5456 if cloud_init_text:
5457 # TODO The instance's own IP information is not available because db_vnfr is not updated yet.
5458 additional_params["OSM"] = get_osm_params(
5459 db_vnfr, vdu_delta["id"], vdu_index + x
5460 )
5461 cloud_init_list.append(
5462 self._parse_cloud_init(
5463 cloud_init_text,
5464 additional_params,
5465 db_vnfd["id"],
5466 vdud["id"],
5467 )
5468 )
5469 vca_scaling_info.append(
5470 {
5471 "osm_vdu_id": vdu_delta["id"],
5472 "member-vnf-index": vnf_index,
5473 "type": "create",
5474 "vdu_index": vdu_index + x,
5475 }
5476 )
5477 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5478 for kdu_delta in delta.get("kdu-resource-delta", {}):
5479 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5480 kdu_name = kdu_profile["kdu-name"]
5481 resource_name = kdu_profile.get("resource-name", "")
5482
5483 # The same delta might reference different kdus,
5484 # so keep a separate list for each kdu
5485 if not scaling_info["kdu-create"].get(kdu_name, None):
5486 scaling_info["kdu-create"][kdu_name] = []
5487
5488 kdur = get_kdur(db_vnfr, kdu_name)
5489 if kdur.get("helm-chart"):
5490 k8s_cluster_type = "helm-chart-v3"
5491 self.logger.debug("kdur: {}".format(kdur))
5492 if (
5493 kdur.get("helm-version")
5494 and kdur.get("helm-version") == "v2"
5495 ):
5496 k8s_cluster_type = "helm-chart"
5497 elif kdur.get("juju-bundle"):
5498 k8s_cluster_type = "juju-bundle"
5499 else:
5500 raise LcmException(
5501 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5502 "juju-bundle. Maybe an old NBI version is running".format(
5503 db_vnfr["member-vnf-index-ref"], kdu_name
5504 )
5505 )
5506
5507 max_instance_count = 10
5508 if kdu_profile and "max-number-of-instances" in kdu_profile:
5509 max_instance_count = kdu_profile.get(
5510 "max-number-of-instances", 10
5511 )
5512
5513 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5514 deployed_kdu, _ = get_deployed_kdu(
5515 nsr_deployed, kdu_name, vnf_index
5516 )
5517 if deployed_kdu is None:
5518 raise LcmException(
5519 "KDU '{}' for vnf '{}' not deployed".format(
5520 kdu_name, vnf_index
5521 )
5522 )
5523 kdu_instance = deployed_kdu.get("kdu-instance")
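# Ask the k8s connector for the current replica count so the new target
# scale can be computed from the requested number-of-instances.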
5524 instance_num = await self.k8scluster_map[
5525 k8s_cluster_type
5526 ].get_scale_count(
5527 resource_name,
5528 kdu_instance,
5529 vca_id=vca_id,
5530 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5531 kdu_model=deployed_kdu.get("kdu-model"),
5532 )
5533 kdu_replica_count = instance_num + kdu_delta.get(
5534 "number-of-instances", 1
5535 )
5536
5537 # Check whether the new replica count exceeds the maximum while instance_num is below it;
5538 # then cap the kdu replica count at max-number-of-instances
5539 if kdu_replica_count > max_instance_count > instance_num:
5540 kdu_replica_count = max_instance_count
5541 if kdu_replica_count > max_instance_count:
5542 raise LcmException(
5543 "reached the limit of {} (max-instance-count) "
5544 "scaling-out operations for the "
5545 "scaling-group-descriptor '{}'".format(
5546 instance_num, scaling_group
5547 )
5548 )
5549
5550 for x in range(kdu_delta.get("number-of-instances", 1)):
5551 vca_scaling_info.append(
5552 {
5553 "osm_kdu_id": kdu_name,
5554 "member-vnf-index": vnf_index,
5555 "type": "create",
5556 "kdu_index": instance_num + x - 1,
5557 }
5558 )
5559 scaling_info["kdu-create"][kdu_name].append(
5560 {
5561 "member-vnf-index": vnf_index,
5562 "type": "create",
5563 "k8s-cluster-type": k8s_cluster_type,
5564 "resource-name": resource_name,
5565 "scale": kdu_replica_count,
5566 }
5567 )
5568 elif scaling_type == "SCALE_IN":
5569 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5570
5571 scaling_info["scaling_direction"] = "IN"
5572 scaling_info["vdu-delete"] = {}
5573 scaling_info["kdu-delete"] = {}
5574
5575 for delta in deltas:
5576 for vdu_delta in delta.get("vdu-delta", {}):
5577 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5578 min_instance_count = 0
5579 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5580 if vdu_profile and "min-number-of-instances" in vdu_profile:
5581 min_instance_count = vdu_profile["min-number-of-instances"]
5582
5583 default_instance_num = get_number_of_instances(
5584 db_vnfd, vdu_delta["id"]
5585 )
5586 instance_num = vdu_delta.get("number-of-instances", 1)
5587 nb_scale_op -= instance_num
5588
5589 new_instance_count = nb_scale_op + default_instance_num
5590
5591 if new_instance_count < min_instance_count < vdu_count:
5592 instances_number = min_instance_count - new_instance_count
5593 else:
5594 instances_number = instance_num
5595
5596 if new_instance_count < min_instance_count:
5597 raise LcmException(
5598 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5599 "scaling-group-descriptor '{}'".format(
5600 nb_scale_op, scaling_group
5601 )
5602 )
5603 for x in range(vdu_delta.get("number-of-instances", 1)):
5604 vca_scaling_info.append(
5605 {
5606 "osm_vdu_id": vdu_delta["id"],
5607 "member-vnf-index": vnf_index,
5608 "type": "delete",
5609 "vdu_index": vdu_index - 1 - x,
5610 }
5611 )
5612 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5613 for kdu_delta in delta.get("kdu-resource-delta", {}):
5614 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5615 kdu_name = kdu_profile["kdu-name"]
5616 resource_name = kdu_profile.get("resource-name", "")
5617
5618 if not scaling_info["kdu-delete"].get(kdu_name, None):
5619 scaling_info["kdu-delete"][kdu_name] = []
5620
5621 kdur = get_kdur(db_vnfr, kdu_name)
5622 if kdur.get("helm-chart"):
5623 k8s_cluster_type = "helm-chart-v3"
5624 self.logger.debug("kdur: {}".format(kdur))
5625 if (
5626 kdur.get("helm-version")
5627 and kdur.get("helm-version") == "v2"
5628 ):
5629 k8s_cluster_type = "helm-chart"
5630 elif kdur.get("juju-bundle"):
5631 k8s_cluster_type = "juju-bundle"
5632 else:
5633 raise LcmException(
5634 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5635 "juju-bundle. Maybe an old NBI version is running".format(
5636 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5637 )
5638 )
5639
5640 min_instance_count = 0
5641 if kdu_profile and "min-number-of-instances" in kdu_profile:
5642 min_instance_count = kdu_profile["min-number-of-instances"]
5643
5644 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5645 deployed_kdu, _ = get_deployed_kdu(
5646 nsr_deployed, kdu_name, vnf_index
5647 )
5648 if deployed_kdu is None:
5649 raise LcmException(
5650 "KDU '{}' for vnf '{}' not deployed".format(
5651 kdu_name, vnf_index
5652 )
5653 )
5654 kdu_instance = deployed_kdu.get("kdu-instance")
5655 instance_num = await self.k8scluster_map[
5656 k8s_cluster_type
5657 ].get_scale_count(
5658 resource_name,
5659 kdu_instance,
5660 vca_id=vca_id,
5661 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5662 kdu_model=deployed_kdu.get("kdu-model"),
5663 )
5664 kdu_replica_count = instance_num - kdu_delta.get(
5665 "number-of-instances", 1
5666 )
5667
5668 if kdu_replica_count < min_instance_count < instance_num:
5669 kdu_replica_count = min_instance_count
5670 if kdu_replica_count < min_instance_count:
5671 raise LcmException(
5672 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5673 "scaling-group-descriptor '{}'".format(
5674 instance_num, scaling_group
5675 )
5676 )
5677
5678 for x in range(kdu_delta.get("number-of-instances", 1)):
5679 vca_scaling_info.append(
5680 {
5681 "osm_kdu_id": kdu_name,
5682 "member-vnf-index": vnf_index,
5683 "type": "delete",
5684 "kdu_index": instance_num - x - 1,
5685 }
5686 )
5687 scaling_info["kdu-delete"][kdu_name].append(
5688 {
5689 "member-vnf-index": vnf_index,
5690 "type": "delete",
5691 "k8s-cluster-type": k8s_cluster_type,
5692 "resource-name": resource_name,
5693 "scale": kdu_replica_count,
5694 }
5695 )
5696
5697 # update scaling_info with the IP addresses of the VDUs to be deleted
5698 vdu_delete = copy(scaling_info.get("vdu-delete"))
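# Iterate the vdur list in reverse so the most recently created instances are
# the ones selected for deletion, and record their interface addresses so they
# are available to the scaling config primitives via VDU_SCALE_INFO.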
5699 if scaling_info["scaling_direction"] == "IN":
5700 for vdur in reversed(db_vnfr["vdur"]):
5701 if vdu_delete.get(vdur["vdu-id-ref"]):
5702 vdu_delete[vdur["vdu-id-ref"]] -= 1
5703 scaling_info["vdu"].append(
5704 {
5705 "name": vdur.get("name") or vdur.get("vdu-name"),
5706 "vdu_id": vdur["vdu-id-ref"],
5707 "interface": [],
5708 }
5709 )
5710 for interface in vdur["interfaces"]:
5711 scaling_info["vdu"][-1]["interface"].append(
5712 {
5713 "name": interface["name"],
5714 "ip_address": interface["ip-address"],
5715 "mac_address": interface.get("mac-address"),
5716 }
5717 )
5718 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5719
5720 # PRE-SCALE BEGIN
5721 step = "Executing pre-scale vnf-config-primitive"
5722 if scaling_descriptor.get("scaling-config-action"):
5723 for scaling_config_action in scaling_descriptor[
5724 "scaling-config-action"
5725 ]:
5726 if (
5727 scaling_config_action.get("trigger") == "pre-scale-in"
5728 and scaling_type == "SCALE_IN"
5729 ) or (
5730 scaling_config_action.get("trigger") == "pre-scale-out"
5731 and scaling_type == "SCALE_OUT"
5732 ):
5733 vnf_config_primitive = scaling_config_action[
5734 "vnf-config-primitive-name-ref"
5735 ]
5736 step = db_nslcmop_update[
5737 "detailed-status"
5738 ] = "executing pre-scale scaling-config-action '{}'".format(
5739 vnf_config_primitive
5740 )
5741
5742 # look for primitive
5743 for config_primitive in (
5744 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5745 ).get("config-primitive", ()):
5746 if config_primitive["name"] == vnf_config_primitive:
5747 break
5748 else:
5749 raise LcmException(
5750 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5751 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5752 "primitive".format(scaling_group, vnf_config_primitive)
5753 )
5754
5755 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5756 if db_vnfr.get("additionalParamsForVnf"):
5757 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5758
5759 scale_process = "VCA"
5760 db_nsr_update["config-status"] = "configuring pre-scaling"
5761 primitive_params = self._map_primitive_params(
5762 config_primitive, {}, vnfr_params
5763 )
5764
5765 # Pre-scale retry check: Check if this sub-operation has been executed before
5766 op_index = self._check_or_add_scale_suboperation(
5767 db_nslcmop,
5768 vnf_index,
5769 vnf_config_primitive,
5770 primitive_params,
5771 "PRE-SCALE",
5772 )
5773 if op_index == self.SUBOPERATION_STATUS_SKIP:
5774 # Skip sub-operation
5775 result = "COMPLETED"
5776 result_detail = "Done"
5777 self.logger.debug(
5778 logging_text
5779 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5780 vnf_config_primitive, result, result_detail
5781 )
5782 )
5783 else:
5784 if op_index == self.SUBOPERATION_STATUS_NEW:
5785 # New sub-operation: Get index of this sub-operation
5786 op_index = (
5787 len(db_nslcmop.get("_admin", {}).get("operations"))
5788 - 1
5789 )
5790 self.logger.debug(
5791 logging_text
5792 + "vnf_config_primitive={} New sub-operation".format(
5793 vnf_config_primitive
5794 )
5795 )
5796 else:
5797 # retry: Get registered params for this existing sub-operation
5798 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5799 op_index
5800 ]
5801 vnf_index = op.get("member_vnf_index")
5802 vnf_config_primitive = op.get("primitive")
5803 primitive_params = op.get("primitive_params")
5804 self.logger.debug(
5805 logging_text
5806 + "vnf_config_primitive={} Sub-operation retry".format(
5807 vnf_config_primitive
5808 )
5809 )
5810 # Execute the primitive, either with new (first-time) or registered (retry) args
5811 ee_descriptor_id = config_primitive.get(
5812 "execution-environment-ref"
5813 )
5814 primitive_name = config_primitive.get(
5815 "execution-environment-primitive", vnf_config_primitive
5816 )
5817 ee_id, vca_type = self._look_for_deployed_vca(
5818 nsr_deployed["VCA"],
5819 member_vnf_index=vnf_index,
5820 vdu_id=None,
5821 vdu_count_index=None,
5822 ee_descriptor_id=ee_descriptor_id,
5823 )
5824 result, result_detail = await self._ns_execute_primitive(
5825 ee_id,
5826 primitive_name,
5827 primitive_params,
5828 vca_type=vca_type,
5829 vca_id=vca_id,
5830 )
5831 self.logger.debug(
5832 logging_text
5833 + "vnf_config_primitive={} Done with result {} {}".format(
5834 vnf_config_primitive, result, result_detail
5835 )
5836 )
5837 # Update operationState = COMPLETED | FAILED
5838 self._update_suboperation_status(
5839 db_nslcmop, op_index, result, result_detail
5840 )
5841
5842 if result == "FAILED":
5843 raise LcmException(result_detail)
5844 db_nsr_update["config-status"] = old_config_status
5845 scale_process = None
5846 # PRE-SCALE END
5847
5848 db_nsr_update[
5849 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5850 ] = nb_scale_op
5851 db_nsr_update[
5852 "_admin.scaling-group.{}.time".format(admin_scale_index)
5853 ] = time()
5854
5855 # SCALE-IN VCA - BEGIN
5856 if vca_scaling_info:
5857 step = db_nslcmop_update[
5858 "detailed-status"
5859 ] = "Deleting the execution environments"
5860 scale_process = "VCA"
5861 for vca_info in vca_scaling_info:
5862 if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
5863 member_vnf_index = str(vca_info["member-vnf-index"])
5864 self.logger.debug(
5865 logging_text + "vdu info: {}".format(vca_info)
5866 )
5867 if vca_info.get("osm_vdu_id"):
5868 vdu_id = vca_info["osm_vdu_id"]
5869 vdu_index = int(vca_info["vdu_index"])
5870 stage[
5871 1
5872 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5873 member_vnf_index, vdu_id, vdu_index
5874 )
5875 stage[2] = step = "Scaling in VCA"
5876 self._write_op_status(op_id=nslcmop_id, stage=stage)
5877 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5878 config_update = db_nsr["configurationStatus"]
5879 for vca_index, vca in enumerate(vca_update):
5880 if (
5881 (vca or vca.get("ee_id"))
5882 and vca["member-vnf-index"] == member_vnf_index
5883 and vca["vdu_count_index"] == vdu_index
5884 ):
5885 if vca.get("vdu_id"):
5886 config_descriptor = get_configuration(
5887 db_vnfd, vca.get("vdu_id")
5888 )
5889 elif vca.get("kdu_name"):
5890 config_descriptor = get_configuration(
5891 db_vnfd, vca.get("kdu_name")
5892 )
5893 else:
5894 config_descriptor = get_configuration(
5895 db_vnfd, db_vnfd["id"]
5896 )
5897 operation_params = (
5898 db_nslcmop.get("operationParams") or {}
5899 )
5900 exec_terminate_primitives = not operation_params.get(
5901 "skip_terminate_primitives"
5902 ) and vca.get("needed_terminate")
5903 task = asyncio.ensure_future(
5904 asyncio.wait_for(
5905 self.destroy_N2VC(
5906 logging_text,
5907 db_nslcmop,
5908 vca,
5909 config_descriptor,
5910 vca_index,
5911 destroy_ee=True,
5912 exec_primitives=exec_terminate_primitives,
5913 scaling_in=True,
5914 vca_id=vca_id,
5915 ),
5916 timeout=self.timeout_charm_delete,
5917 )
5918 )
5919 tasks_dict_info[task] = "Terminating VCA {}".format(
5920 vca.get("ee_id")
5921 )
5922 del vca_update[vca_index]
5923 del config_update[vca_index]
5924 # wait for pending tasks of terminate primitives
5925 if tasks_dict_info:
5926 self.logger.debug(
5927 logging_text
5928 + "Waiting for tasks {}".format(
5929 list(tasks_dict_info.keys())
5930 )
5931 )
5932 error_list = await self._wait_for_tasks(
5933 logging_text,
5934 tasks_dict_info,
5935 min(
5936 self.timeout_charm_delete, self.timeout_ns_terminate
5937 ),
5938 stage,
5939 nslcmop_id,
5940 )
5941 tasks_dict_info.clear()
5942 if error_list:
5943 raise LcmException("; ".join(error_list))
5944
5945 db_vca_and_config_update = {
5946 "_admin.deployed.VCA": vca_update,
5947 "configurationStatus": config_update,
5948 }
5949 self.update_db_2(
5950 "nsrs", db_nsr["_id"], db_vca_and_config_update
5951 )
5952 scale_process = None
5953 # SCALE-IN VCA - END
5954
5955 # SCALE RO - BEGIN
5956 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5957 scale_process = "RO"
5958 if self.ro_config.get("ng"):
5959 await self._scale_ng_ro(
5960 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5961 )
5962 scaling_info.pop("vdu-create", None)
5963 scaling_info.pop("vdu-delete", None)
5964
5965 scale_process = None
5966 # SCALE RO - END
5967
5968 # SCALE KDU - BEGIN
5969 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5970 scale_process = "KDU"
5971 await self._scale_kdu(
5972 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5973 )
5974 scaling_info.pop("kdu-create", None)
5975 scaling_info.pop("kdu-delete", None)
5976
5977 scale_process = None
5978 # SCALE KDU - END
5979
5980 if db_nsr_update:
5981 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5982
5983 # SCALE-UP VCA - BEGIN
5984 if vca_scaling_info:
5985 step = db_nslcmop_update[
5986 "detailed-status"
5987 ] = "Creating new execution environments"
5988 scale_process = "VCA"
5989 for vca_info in vca_scaling_info:
5990 if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
5991 member_vnf_index = str(vca_info["member-vnf-index"])
5992 self.logger.debug(
5993 logging_text + "vdu info: {}".format(vca_info)
5994 )
5995 vnfd_id = db_vnfr["vnfd-ref"]
5996 if vca_info.get("osm_vdu_id"):
5997 vdu_index = int(vca_info["vdu_index"])
5998 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5999 if db_vnfr.get("additionalParamsForVnf"):
6000 deploy_params.update(
6001 parse_yaml_strings(
6002 db_vnfr["additionalParamsForVnf"].copy()
6003 )
6004 )
6005 descriptor_config = get_configuration(
6006 db_vnfd, db_vnfd["id"]
6007 )
6008 if descriptor_config:
6009 vdu_id = None
6010 vdu_name = None
6011 kdu_name = None
6012 self._deploy_n2vc(
6013 logging_text=logging_text
6014 + "member_vnf_index={} ".format(member_vnf_index),
6015 db_nsr=db_nsr,
6016 db_vnfr=db_vnfr,
6017 nslcmop_id=nslcmop_id,
6018 nsr_id=nsr_id,
6019 nsi_id=nsi_id,
6020 vnfd_id=vnfd_id,
6021 vdu_id=vdu_id,
6022 kdu_name=kdu_name,
6023 member_vnf_index=member_vnf_index,
6024 vdu_index=vdu_index,
6025 vdu_name=vdu_name,
6026 deploy_params=deploy_params,
6027 descriptor_config=descriptor_config,
6028 base_folder=base_folder,
6029 task_instantiation_info=tasks_dict_info,
6030 stage=stage,
6031 )
6032 vdu_id = vca_info["osm_vdu_id"]
6033 vdur = find_in_list(
6034 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6035 )
6036 descriptor_config = get_configuration(db_vnfd, vdu_id)
6037 if vdur.get("additionalParams"):
6038 deploy_params_vdu = parse_yaml_strings(
6039 vdur["additionalParams"]
6040 )
6041 else:
6042 deploy_params_vdu = deploy_params
6043 deploy_params_vdu["OSM"] = get_osm_params(
6044 db_vnfr, vdu_id, vdu_count_index=vdu_index
6045 )
6046 if descriptor_config:
6047 vdu_name = None
6048 kdu_name = None
6049 stage[
6050 1
6051 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6052 member_vnf_index, vdu_id, vdu_index
6053 )
6054 stage[2] = step = "Scaling out VCA"
6055 self._write_op_status(op_id=nslcmop_id, stage=stage)
6056 self._deploy_n2vc(
6057 logging_text=logging_text
6058 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6059 member_vnf_index, vdu_id, vdu_index
6060 ),
6061 db_nsr=db_nsr,
6062 db_vnfr=db_vnfr,
6063 nslcmop_id=nslcmop_id,
6064 nsr_id=nsr_id,
6065 nsi_id=nsi_id,
6066 vnfd_id=vnfd_id,
6067 vdu_id=vdu_id,
6068 kdu_name=kdu_name,
6069 member_vnf_index=member_vnf_index,
6070 vdu_index=vdu_index,
6071 vdu_name=vdu_name,
6072 deploy_params=deploy_params_vdu,
6073 descriptor_config=descriptor_config,
6074 base_folder=base_folder,
6075 task_instantiation_info=tasks_dict_info,
6076 stage=stage,
6077 )
6078 # SCALE-UP VCA - END
6079 scale_process = None
6080
6081 # POST-SCALE BEGIN
6082 # execute primitive service POST-SCALING
6083 step = "Executing post-scale vnf-config-primitive"
6084 if scaling_descriptor.get("scaling-config-action"):
6085 for scaling_config_action in scaling_descriptor[
6086 "scaling-config-action"
6087 ]:
6088 if (
6089 scaling_config_action.get("trigger") == "post-scale-in"
6090 and scaling_type == "SCALE_IN"
6091 ) or (
6092 scaling_config_action.get("trigger") == "post-scale-out"
6093 and scaling_type == "SCALE_OUT"
6094 ):
6095 vnf_config_primitive = scaling_config_action[
6096 "vnf-config-primitive-name-ref"
6097 ]
6098 step = db_nslcmop_update[
6099 "detailed-status"
6100 ] = "executing post-scale scaling-config-action '{}'".format(
6101 vnf_config_primitive
6102 )
6103
6104 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6105 if db_vnfr.get("additionalParamsForVnf"):
6106 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6107
6108 # look for primitive
6109 for config_primitive in (
6110 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6111 ).get("config-primitive", ()):
6112 if config_primitive["name"] == vnf_config_primitive:
6113 break
6114 else:
6115 raise LcmException(
6116 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6117 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6118 "config-primitive".format(
6119 scaling_group, vnf_config_primitive
6120 )
6121 )
6122 scale_process = "VCA"
6123 db_nsr_update["config-status"] = "configuring post-scaling"
6124 primitive_params = self._map_primitive_params(
6125 config_primitive, {}, vnfr_params
6126 )
6127
6128 # Post-scale retry check: Check if this sub-operation has been executed before
6129 op_index = self._check_or_add_scale_suboperation(
6130 db_nslcmop,
6131 vnf_index,
6132 vnf_config_primitive,
6133 primitive_params,
6134 "POST-SCALE",
6135 )
6136 if op_index == self.SUBOPERATION_STATUS_SKIP:
6137 # Skip sub-operation
6138 result = "COMPLETED"
6139 result_detail = "Done"
6140 self.logger.debug(
6141 logging_text
6142 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6143 vnf_config_primitive, result, result_detail
6144 )
6145 )
6146 else:
6147 if op_index == self.SUBOPERATION_STATUS_NEW:
6148 # New sub-operation: Get index of this sub-operation
6149 op_index = (
6150 len(db_nslcmop.get("_admin", {}).get("operations"))
6151 - 1
6152 )
6153 self.logger.debug(
6154 logging_text
6155 + "vnf_config_primitive={} New sub-operation".format(
6156 vnf_config_primitive
6157 )
6158 )
6159 else:
6160 # retry: Get registered params for this existing sub-operation
6161 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6162 op_index
6163 ]
6164 vnf_index = op.get("member_vnf_index")
6165 vnf_config_primitive = op.get("primitive")
6166 primitive_params = op.get("primitive_params")
6167 self.logger.debug(
6168 logging_text
6169 + "vnf_config_primitive={} Sub-operation retry".format(
6170 vnf_config_primitive
6171 )
6172 )
6173 # Execute the primitive, either with new (first-time) or registered (retry) args
6174 ee_descriptor_id = config_primitive.get(
6175 "execution-environment-ref"
6176 )
6177 primitive_name = config_primitive.get(
6178 "execution-environment-primitive", vnf_config_primitive
6179 )
6180 ee_id, vca_type = self._look_for_deployed_vca(
6181 nsr_deployed["VCA"],
6182 member_vnf_index=vnf_index,
6183 vdu_id=None,
6184 vdu_count_index=None,
6185 ee_descriptor_id=ee_descriptor_id,
6186 )
6187 result, result_detail = await self._ns_execute_primitive(
6188 ee_id,
6189 primitive_name,
6190 primitive_params,
6191 vca_type=vca_type,
6192 vca_id=vca_id,
6193 )
6194 self.logger.debug(
6195 logging_text
6196 + "vnf_config_primitive={} Done with result {} {}".format(
6197 vnf_config_primitive, result, result_detail
6198 )
6199 )
6200 # Update operationState = COMPLETED | FAILED
6201 self._update_suboperation_status(
6202 db_nslcmop, op_index, result, result_detail
6203 )
6204
6205 if result == "FAILED":
6206 raise LcmException(result_detail)
6207 db_nsr_update["config-status"] = old_config_status
6208 scale_process = None
6209 # POST-SCALE END
6210
6211 db_nsr_update[
6212 "detailed-status"
6213 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6214 db_nsr_update["operational-status"] = (
6215 "running"
6216 if old_operational_status == "failed"
6217 else old_operational_status
6218 )
6219 db_nsr_update["config-status"] = old_config_status
6220 return
6221 except (
6222 ROclient.ROClientException,
6223 DbException,
6224 LcmException,
6225 NgRoException,
6226 ) as e:
6227 self.logger.error(logging_text + "Exit Exception {}".format(e))
6228 exc = e
6229 except asyncio.CancelledError:
6230 self.logger.error(
6231 logging_text + "Cancelled Exception while '{}'".format(step)
6232 )
6233 exc = "Operation was cancelled"
6234 except Exception as e:
6235 exc = traceback.format_exc()
6236 self.logger.critical(
6237 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6238 exc_info=True,
6239 )
6240 finally:
6241 self._write_ns_status(
6242 nsr_id=nsr_id,
6243 ns_state=None,
6244 current_operation="IDLE",
6245 current_operation_id=None,
6246 )
6247 if tasks_dict_info:
6248 stage[1] = "Waiting for instantiate pending tasks."
6249 self.logger.debug(logging_text + stage[1])
6250 exc = await self._wait_for_tasks(
6251 logging_text,
6252 tasks_dict_info,
6253 self.timeout_ns_deploy,
6254 stage,
6255 nslcmop_id,
6256 nsr_id=nsr_id,
6257 )
6258 if exc:
6259 db_nslcmop_update[
6260 "detailed-status"
6261 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6262 nslcmop_operation_state = "FAILED"
6263 if db_nsr:
6264 db_nsr_update["operational-status"] = old_operational_status
6265 db_nsr_update["config-status"] = old_config_status
6266 db_nsr_update["detailed-status"] = ""
6267 if scale_process:
6268 if "VCA" in scale_process:
6269 db_nsr_update["config-status"] = "failed"
6270 if "RO" in scale_process:
6271 db_nsr_update["operational-status"] = "failed"
6272 db_nsr_update[
6273 "detailed-status"
6274 ] = "FAILED scaling nslcmop={} {}: {}".format(
6275 nslcmop_id, step, exc
6276 )
6277 else:
6278 error_description_nslcmop = None
6279 nslcmop_operation_state = "COMPLETED"
6280 db_nslcmop_update["detailed-status"] = "Done"
6281
6282 self._write_op_status(
6283 op_id=nslcmop_id,
6284 stage="",
6285 error_message=error_description_nslcmop,
6286 operation_state=nslcmop_operation_state,
6287 other_update=db_nslcmop_update,
6288 )
6289 if db_nsr:
6290 self._write_ns_status(
6291 nsr_id=nsr_id,
6292 ns_state=None,
6293 current_operation="IDLE",
6294 current_operation_id=None,
6295 other_update=db_nsr_update,
6296 )
6297
6298 if nslcmop_operation_state:
6299 try:
6300 msg = {
6301 "nsr_id": nsr_id,
6302 "nslcmop_id": nslcmop_id,
6303 "operationState": nslcmop_operation_state,
6304 }
6305 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6306 except Exception as e:
6307 self.logger.error(
6308 logging_text + "kafka_write notification Exception {}".format(e)
6309 )
6310 self.logger.debug(logging_text + "Exit")
6311 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6312
6313 async def _scale_kdu(
6314 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6315 ):
6316 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
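# Only one of the two lists is present for a given operation: "kdu-create"
# when scaling out, "kdu-delete" when scaling in (see scale() above).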
6317 for kdu_name in _scaling_info:
6318 for kdu_scaling_info in _scaling_info[kdu_name]:
6319 deployed_kdu, index = get_deployed_kdu(
6320 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6321 )
6322 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6323 kdu_instance = deployed_kdu["kdu-instance"]
6324 kdu_model = deployed_kdu.get("kdu-model")
6325 scale = int(kdu_scaling_info["scale"])
6326 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6327
6328 db_dict = {
6329 "collection": "nsrs",
6330 "filter": {"_id": nsr_id},
6331 "path": "_admin.deployed.K8s.{}".format(index),
6332 }
6333
6334 step = "scaling application {}".format(
6335 kdu_scaling_info["resource-name"]
6336 )
6337 self.logger.debug(logging_text + step)
6338
6339 if kdu_scaling_info["type"] == "delete":
6340 kdu_config = get_configuration(db_vnfd, kdu_name)
6341 if (
6342 kdu_config
6343 and kdu_config.get("terminate-config-primitive")
6344 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6345 ):
6346 terminate_config_primitive_list = kdu_config.get(
6347 "terminate-config-primitive"
6348 )
6349 terminate_config_primitive_list.sort(
6350 key=lambda val: int(val["seq"])
6351 )
6352
6353 for (
6354 terminate_config_primitive
6355 ) in terminate_config_primitive_list:
6356 primitive_params_ = self._map_primitive_params(
6357 terminate_config_primitive, {}, {}
6358 )
6359 step = "execute terminate config primitive"
6360 self.logger.debug(logging_text + step)
6361 await asyncio.wait_for(
6362 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6363 cluster_uuid=cluster_uuid,
6364 kdu_instance=kdu_instance,
6365 primitive_name=terminate_config_primitive["name"],
6366 params=primitive_params_,
6367 db_dict=db_dict,
6368 vca_id=vca_id,
6369 ),
6370 timeout=600,
6371 )
6372
6373 await asyncio.wait_for(
6374 self.k8scluster_map[k8s_cluster_type].scale(
6375 kdu_instance,
6376 scale,
6377 kdu_scaling_info["resource-name"],
6378 vca_id=vca_id,
6379 cluster_uuid=cluster_uuid,
6380 kdu_model=kdu_model,
6381 atomic=True,
6382 db_dict=db_dict,
6383 ),
6384 timeout=self.timeout_vca_on_error,
6385 )
6386
6387 if kdu_scaling_info["type"] == "create":
6388 kdu_config = get_configuration(db_vnfd, kdu_name)
6389 if (
6390 kdu_config
6391 and kdu_config.get("initial-config-primitive")
6392 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6393 ):
6394 initial_config_primitive_list = kdu_config.get(
6395 "initial-config-primitive"
6396 )
6397 initial_config_primitive_list.sort(
6398 key=lambda val: int(val["seq"])
6399 )
6400
6401 for initial_config_primitive in initial_config_primitive_list:
6402 primitive_params_ = self._map_primitive_params(
6403 initial_config_primitive, {}, {}
6404 )
6405 step = "execute initial config primitive"
6406 self.logger.debug(logging_text + step)
6407 await asyncio.wait_for(
6408 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6409 cluster_uuid=cluster_uuid,
6410 kdu_instance=kdu_instance,
6411 primitive_name=initial_config_primitive["name"],
6412 params=primitive_params_,
6413 db_dict=db_dict,
6414 vca_id=vca_id,
6415 ),
6416 timeout=600,
6417 )
6418
6419 async def _scale_ng_ro(
6420 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6421 ):
6422 nsr_id = db_nslcmop["nsInstanceId"]
6423 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6424 db_vnfrs = {}
6425
6426 # read from db: vnfds for every vnf
6427 db_vnfds = []
6428
6429 # for each vnf in ns, read vnfd
6430 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6431 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6432 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6433 # if we do not have this vnfd yet, read it from db
6434 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6435 # read from db
6436 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6437 db_vnfds.append(vnfd)
6438 n2vc_key = self.n2vc.get_public_key()
6439 n2vc_key_list = [n2vc_key]
6440 self.scale_vnfr(
6441 db_vnfr,
6442 vdu_scaling_info.get("vdu-create"),
6443 vdu_scaling_info.get("vdu-delete"),
6444 mark_delete=True,
6445 )
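# scale_vnfr() marks the VDU records to add/remove before calling NG-RO; for
# scale-in, the marked records are cleaned up after RO completes (see below).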
6446 # db_vnfr has been updated, update db_vnfrs to use it
6447 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6448 await self._instantiate_ng_ro(
6449 logging_text,
6450 nsr_id,
6451 db_nsd,
6452 db_nsr,
6453 db_nslcmop,
6454 db_vnfrs,
6455 db_vnfds,
6456 n2vc_key_list,
6457 stage=stage,
6458 start_deploy=time(),
6459 timeout_ns_deploy=self.timeout_ns_deploy,
6460 )
6461 if vdu_scaling_info.get("vdu-delete"):
6462 self.scale_vnfr(
6463 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6464 )
6465
6466 async def extract_prometheus_scrape_jobs(
6467 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6468 ):
6469 # look for a file called 'prometheus*.j2' and read it if present
6470 artifact_content = self.fs.dir_ls(artifact_path)
6471 job_file = next(
6472 (
6473 f
6474 for f in artifact_content
6475 if f.startswith("prometheus") and f.endswith(".j2")
6476 ),
6477 None,
6478 )
6479 if not job_file:
6480 return
6481 with self.fs.file_open((artifact_path, job_file), "r") as f:
6482 job_data = f.read()
6483
6484 # TODO get_service
6485 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6486 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6487 host_port = "80"
6488 vnfr_id = vnfr_id.replace("-", "")
6489 variables = {
6490 "JOB_NAME": vnfr_id,
6491 "TARGET_IP": target_ip,
6492 "EXPORTER_POD_IP": host_name,
6493 "EXPORTER_POD_PORT": host_port,
6494 }
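# parse_job() renders the Jinja2 job template with these variables and
# returns the resulting list of Prometheus scrape jobs.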
6495 job_list = parse_job(job_data, variables)
6496 # ensure job_name uses the vnfr_id, and add nsr_id and vnfr_id as metadata
6497 for job in job_list:
6498 if (
6499 not isinstance(job.get("job_name"), str)
6500 or vnfr_id not in job["job_name"]
6501 ):
6502 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6503 job["nsr_id"] = nsr_id
6504 job["vnfr_id"] = vnfr_id
6505 return job_list
6506
6507 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6508 """
6509 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6510
6511 :param: vim_account_id: VIM Account ID
6512
6513 :return: (cloud_name, cloud_credential)
6514 """
6515 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6516 return config.get("vca_cloud"), config.get("vca_cloud_credential")
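# Illustrative (not prescriptive) VIM account config carrying these keys:
#   {"vca_cloud": "lxd-cloud", "vca_cloud_credential": "lxd-cloud-credential"}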
6517
6518 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6519 """
6520 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6521
6522 :param: vim_account_id: VIM Account ID
6523
6524 :return: (cloud_name, cloud_credential)
6525 """
6526 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6527 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")