Fix flake8 issues in ns.py
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
class NsLcm(LcmBase):
    """NS lifecycle manager: handles deployment/termination/scaling of network services."""

    # Time for charm from first time at blocked,error status to mark as failed
    timeout_vca_on_error = (
        5 * 60
    )
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60
    timeout_primitive = 30 * 60  # timeout for primitive execution
    # timeout for some progress in a primitive execution
    timeout_progress_primitive = (
        10 * 60
    )

    # Sentinel return codes used when searching for sub-operations
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
126
    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus client, forwarded to LcmBase
        :param lcm_tasks: registry that tracks the asyncio tasks of each LCM operation
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :param loop: asyncio event loop shared by every connector created here
        :return: None
        """
        super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

        # db/fs are process-wide singletons provided by the Database/Filesystem wrappers
        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        self.ng_ro = config["ro_config"].get("ng")
        # copy so later local mutations do not leak into the shared config dict
        self.vca_config = config["VCA"].copy()

        # create N2VC connector (juju charms); updates are pushed back via _on_update_n2vc_db
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db,
        )

        # connector for helm-based execution environments
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            loop=self.loop,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
        )

        # k8s connectors: helm v2, helm v3 and juju bundles
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            fs=self.fs,
            db=self.db,
        )

        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_k8s_db,
            fs=self.fs,
            db=self.db,
        )

        # dispatch table: kdu deployment type -> k8s connector
        # NOTE(review): plain "chart" maps to the helm *v3* connector — confirm intended
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # dispatch table: vca (charm) type -> connector used to drive it
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee,
        }

        # create RO client
        self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with ipv4 look for last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """Callback fired by N2VC when juju-side data changes; refreshes the
        vcaStatus / configurationStatus / nsState fields of the nsr record.

        :param table: table name reported by N2VC (the nsr itself is read from "nsrs")
        :param filter: db filter; must contain "_id" with the nsr id
        :param path: dotted path of the changed data; its last component is the
            VCA index inside _admin.deployed.VCA
        :param updated_data: changed data (unused here; full status is re-read)
        :param vca_id: optional VCA (credentials) id forwarded to N2VC calls
        """

        # remove last dot from path (if exists)
        if path.endswith("."):
            path = path[:-1]

        try:

            nsr_id = filter.get("_id")

            # read ns record from database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            current_ns_status = nsr.get("nsState")

            # get vca status for NS
            status_dict = await self.n2vc.get_status(
                namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
            )

            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = status_dict
            await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # VCA index is the last component of the dotted path
                vca_index = int(path[path.rfind(".") + 1 :])

                vca_list = deep_get(
                    target_dict=nsr, key_list=("_admin", "deployed", "VCA")
                )
                vca_status = vca_list[vca_index].get("status")

                configuration_status_list = nsr.get("configurationStatus")
                config_status = configuration_status_list[vca_index].get("status")

                # NOTE(review): db_dict has no "configurationStatus" key at this
                # point, so these assignments raise KeyError, which is swallowed
                # by the except below — confirm whether this branch ever persists
                if config_status == "BROKEN" and vca_status != "failed":
                    db_dict["configurationStatus"][vca_index] = "READY"
                elif config_status != "BROKEN" and vca_status == "failed":
                    db_dict["configurationStatus"][vca_index] = "BROKEN"
            except Exception as e:
                # not update configurationStatus
                self.logger.debug("Error updating vca_index (ignore): {}".format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ("READY", "DEGRADED"):
                error_description = ""
                # check machines
                if status_dict.get("machines"):
                    for machine_id in status_dict.get("machines"):
                        machine = status_dict.get("machines").get(machine_id)
                        # check machine agent-status
                        if machine.get("agent-status"):
                            s = machine.get("agent-status").get("status")
                            if s != "started":
                                is_degraded = True
                                error_description += (
                                    "machine {} agent-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                        # check machine instance status
                        if machine.get("instance-status"):
                            s = machine.get("instance-status").get("status")
                            if s != "running":
                                is_degraded = True
                                error_description += (
                                    "machine {} instance-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                # check applications
                if status_dict.get("applications"):
                    for app_id in status_dict.get("applications"):
                        app = status_dict.get("applications").get(app_id)
                        # check application status
                        if app.get("status"):
                            s = app.get("status").get("status")
                            if s != "active":
                                is_degraded = True
                                error_description += (
                                    "application {} status={} ; ".format(app_id, s)
                                )

                if error_description:
                    db_dict["errorDescription"] = error_description
                if current_ns_status == "READY" and is_degraded:
                    db_dict["nsState"] = "DEGRADED"
                if current_ns_status == "DEGRADED" and not is_degraded:
                    db_dict["nsState"] = "READY"

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
350 async def _on_update_k8s_db(
351 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
352 ):
353 """
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :cluster_type: The cluster type (juju, k8s)
359 :return: none
360 """
361
362 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
363 # .format(cluster_uuid, kdu_instance, filter))
364
365 nsr_id = filter.get("_id")
366 try:
367 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
368 cluster_uuid=cluster_uuid,
369 kdu_instance=kdu_instance,
370 yaml_format=False,
371 complete_status=True,
372 vca_id=vca_id,
373 )
374
375 # vcaStatus
376 db_dict = dict()
377 db_dict["vcaStatus"] = {nsr_id: vca_status}
378
379 if cluster_type in ("juju-bundle", "juju"):
380 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
381 # status in a similar way between Juju Bundles and Helm Charts on this side
382 await self.k8sclusterjuju.update_vca_status(
383 db_dict["vcaStatus"],
384 kdu_instance,
385 vca_id=vca_id,
386 )
387
388 self.logger.debug(
389 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
390 )
391
392 # write to database
393 self.update_db_2("nsrs", nsr_id, db_dict)
394 except (asyncio.CancelledError, asyncio.TimeoutError):
395 raise
396 except Exception as e:
397 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
398
399 @staticmethod
400 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
401 try:
402 env = Environment(undefined=StrictUndefined)
403 template = env.from_string(cloud_init_text)
404 return template.render(additional_params or {})
405 except UndefinedError as e:
406 raise LcmException(
407 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
408 "file, must be provided in the instantiation parameters inside the "
409 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
410 )
411 except (TemplateError, TemplateNotFound) as e:
412 raise LcmException(
413 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
414 vnfd_id, vdu_id, e
415 )
416 )
417
418 def _get_vdu_cloud_init_content(self, vdu, vnfd):
419 cloud_init_content = cloud_init_file = None
420 try:
421 if vdu.get("cloud-init-file"):
422 base_folder = vnfd["_admin"]["storage"]
423 if base_folder["pkg-dir"]:
424 cloud_init_file = "{}/{}/cloud_init/{}".format(
425 base_folder["folder"],
426 base_folder["pkg-dir"],
427 vdu["cloud-init-file"],
428 )
429 else:
430 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
431 base_folder["folder"],
432 vdu["cloud-init-file"],
433 )
434 with self.fs.file_open(cloud_init_file, "r") as ci_file:
435 cloud_init_content = ci_file.read()
436 elif vdu.get("cloud-init"):
437 cloud_init_content = vdu["cloud-init"]
438
439 return cloud_init_content
440 except FsException as e:
441 raise LcmException(
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd["id"], vdu["id"], cloud_init_file, e
444 )
445 )
446
447 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
448 vdur = next(
449 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
450 {}
451 )
452 additional_params = vdur.get("additionalParams")
453 return parse_yaml_strings(additional_params)
454
455 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
456 """
457 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
458 :param vnfd: input vnfd
459 :param new_id: overrides vnf id if provided
460 :param additionalParams: Instantiation params for VNFs provided
461 :param nsrId: Id of the NSR
462 :return: copy of vnfd
463 """
464 vnfd_RO = deepcopy(vnfd)
465 # remove unused by RO configuration, monitoring, scaling and internal keys
466 vnfd_RO.pop("_id", None)
467 vnfd_RO.pop("_admin", None)
468 vnfd_RO.pop("monitoring-param", None)
469 vnfd_RO.pop("scaling-group-descriptor", None)
470 vnfd_RO.pop("kdu", None)
471 vnfd_RO.pop("k8s-cluster", None)
472 if new_id:
473 vnfd_RO["id"] = new_id
474
475 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
476 for vdu in get_iterable(vnfd_RO, "vdu"):
477 vdu.pop("cloud-init-file", None)
478 vdu.pop("cloud-init", None)
479 return vnfd_RO
480
481 @staticmethod
482 def ip_profile_2_RO(ip_profile):
483 RO_ip_profile = deepcopy(ip_profile)
484 if "dns-server" in RO_ip_profile:
485 if isinstance(RO_ip_profile["dns-server"], list):
486 RO_ip_profile["dns-address"] = []
487 for ds in RO_ip_profile.pop("dns-server"):
488 RO_ip_profile["dns-address"].append(ds["address"])
489 else:
490 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
491 if RO_ip_profile.get("ip-version") == "ipv4":
492 RO_ip_profile["ip-version"] = "IPv4"
493 if RO_ip_profile.get("ip-version") == "ipv6":
494 RO_ip_profile["ip-version"] = "IPv6"
495 if "dhcp-params" in RO_ip_profile:
496 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
497 return RO_ip_profile
498
499 def _get_ro_vim_id_for_vim_account(self, vim_account):
500 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
501 if db_vim["_admin"]["operationalState"] != "ENABLED":
502 raise LcmException(
503 "VIM={} is not available. operationalState={}".format(
504 vim_account, db_vim["_admin"]["operationalState"]
505 )
506 )
507 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
508 return RO_vim_id
509
510 def get_ro_wim_id_for_wim_account(self, wim_account):
511 if isinstance(wim_account, str):
512 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
513 if db_wim["_admin"]["operationalState"] != "ENABLED":
514 raise LcmException(
515 "WIM={} is not available. operationalState={}".format(
516 wim_account, db_wim["_admin"]["operationalState"]
517 )
518 )
519 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
520 return RO_wim_id
521 else:
522 return wim_account
523
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """Add or remove vdur entries of a vnfr record for a scaling operation.

        :param db_vnfr: vnfr record; its "vdur" list is refreshed in place at the end
        :param vdu_create: dict vdu-id-ref -> number of instances to add
        :param vdu_delete: dict vdu-id-ref -> number of instances to remove
        :param mark_delete: when True, vdurs are only flagged status=DELETING in the
            db update; when False they are pulled from the db one by one
        """

        db_vdu_push_list = []
        template_vdur = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # clone the newest existing vdur of this vdu-id as the base copy
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    # Read the template saved in the db:
                    self.logger.debug("No vdur in the database. Using the vdur-template to scale")
                    vdur_template = db_vnfr.get("vdur-template")
                    if not vdur_template:
                        raise LcmException(
                            "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                                vdu_id
                            )
                        )
                    vdur = vdur_template[0]
                    # Delete a template from the database after using it
                    self.db.set_one(
                        "vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur-template": {"_id": vdur['_id']}}
                    )
                for count in range(vdu_count):
                    vdur_copy = deepcopy(vdur)
                    # reset runtime state for the new replica
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        # fixed ip/mac: derive the replica address by incrementing;
                        # otherwise drop it so the VIM assigns a fresh one
                        if iface.get("fixed-ip"):
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        if db_vnfr["vdur"]:
                            iface.pop(
                                "mgmt_vnf", None
                            )  # only first vdu can be managment of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            if len(db_vnfr["vdur"]) == 1:
                # The scale will move to 0 instances
                self.logger.debug("Scaling to 0 !, creating the template with the last vdur")
                template_vdur = [db_vnfr["vdur"][0]]
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    # flag the newest vdu_count replicas of this vdu-id as DELETING
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        db_push = {}
        if db_vdu_push_list:
            db_push["vdur"] = db_vdu_push_list
        if template_vdur:
            db_push["vdur-template"] = template_vdur
        if not db_push:
            db_push = None
        db_vnfr["vdur-template"] = template_vdur
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]
631
632 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
633 """
634 Updates database nsr with the RO info for the created vld
635 :param ns_update_nsr: dictionary to be filled with the updated info
636 :param db_nsr: content of db_nsr. This is also modified
637 :param nsr_desc_RO: nsr descriptor from RO
638 :return: Nothing, LcmException is raised on errors
639 """
640
641 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
642 for net_RO in get_iterable(nsr_desc_RO, "nets"):
643 if vld["id"] != net_RO.get("ns_net_osm_id"):
644 continue
645 vld["vim-id"] = net_RO.get("vim_net_id")
646 vld["name"] = net_RO.get("vim_name")
647 vld["status"] = net_RO.get("status")
648 vld["status-detailed"] = net_RO.get("error_msg")
649 ns_update_nsr["vld.{}".format(vld_index)] = vld
650 break
651 else:
652 raise LcmException(
653 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
654 )
655
656 def set_vnfr_at_error(self, db_vnfrs, error_text):
657 try:
658 for db_vnfr in db_vnfrs.values():
659 vnfr_update = {"status": "ERROR"}
660 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
661 if "status" not in vdur:
662 vdur["status"] = "ERROR"
663 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
664 if error_text:
665 vdur["status-detailed"] = str(error_text)
666 vnfr_update[
667 "vdur.{}.status-detailed".format(vdu_index)
668 ] = "ERROR"
669 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
670 except DbException as e:
671 self.logger.error("Cannot update vnf. {}".format(e))
672
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may return several addresses separated by ';' — keep the first
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                        "ip_address"
                    ].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP(
                            "ns member_vnf_index '{}' has no IP address".format(
                                vnf_index
                            )
                        )

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    # match each vdur against the RO vm with same vdu id and count-index
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # physical deployment units are not managed by RO
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        if vdur["count-index"] != vdur_RO_count_index:
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        # copy per-interface addresses, matched by internal name
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get(
                                        "ip_address"
                                    )
                                    ifacer["mac-address"] = interface_RO.get(
                                        "mac_address"
                                    )
                                    break
                            else:
                                raise LcmException(
                                    "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                    "from VIM info".format(
                                        vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                    )
                                )
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                            "VIM info".format(
                                vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                            )
                        )

                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    # update internal vlds with the RO net info
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                                vnf_index, vld["id"]
                            )
                        )

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException(
                    "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                        vnf_index
                    )
                )
769
770 def _get_ns_config_info(self, nsr_id):
771 """
772 Generates a mapping between vnf,vdu elements and the N2VC id
773 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
774 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
775 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
776 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
777 """
778 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
779 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
780 mapping = {}
781 ns_config_info = {"osm-config-mapping": mapping}
782 for vca in vca_deployed_list:
783 if not vca["member-vnf-index"]:
784 continue
785 if not vca["vdu_id"]:
786 mapping[vca["member-vnf-index"]] = vca["application"]
787 else:
788 mapping[
789 "{}.{}.{}".format(
790 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
791 )
792 ] = vca["application"]
793 return ns_config_info
794
795 async def _instantiate_ng_ro(
796 self,
797 logging_text,
798 nsr_id,
799 nsd,
800 db_nsr,
801 db_nslcmop,
802 db_vnfrs,
803 db_vnfds,
804 n2vc_key_list,
805 stage,
806 start_deploy,
807 timeout_ns_deploy,
808 ):
809
810 db_vims = {}
811
812 def get_vim_account(vim_account_id):
813 nonlocal db_vims
814 if vim_account_id in db_vims:
815 return db_vims[vim_account_id]
816 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
817 db_vims[vim_account_id] = db_vim
818 return db_vim
819
820 # modify target_vld info with instantiation parameters
821 def parse_vld_instantiation_params(
822 target_vim, target_vld, vld_params, target_sdn
823 ):
824 if vld_params.get("ip-profile"):
825 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
826 "ip-profile"
827 ]
828 if vld_params.get("provider-network"):
829 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
830 "provider-network"
831 ]
832 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
833 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
834 "provider-network"
835 ]["sdn-ports"]
836 if vld_params.get("wimAccountId"):
837 target_wim = "wim:{}".format(vld_params["wimAccountId"])
838 target_vld["vim_info"][target_wim] = {}
839 for param in ("vim-network-name", "vim-network-id"):
840 if vld_params.get(param):
841 if isinstance(vld_params[param], dict):
842 for vim, vim_net in vld_params[param].items():
843 other_target_vim = "vim:" + vim
844 populate_dict(
845 target_vld["vim_info"],
846 (other_target_vim, param.replace("-", "_")),
847 vim_net,
848 )
849 else: # isinstance str
850 target_vld["vim_info"][target_vim][
851 param.replace("-", "_")
852 ] = vld_params[param]
853 if vld_params.get("common_id"):
854 target_vld["common_id"] = vld_params.get("common_id")
855
856 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
857 def update_ns_vld_target(target, ns_params):
858 for vnf_params in ns_params.get("vnf", ()):
859 if vnf_params.get("vimAccountId"):
860 target_vnf = next(
861 (
862 vnfr
863 for vnfr in db_vnfrs.values()
864 if vnf_params["member-vnf-index"]
865 == vnfr["member-vnf-index-ref"]
866 ),
867 None,
868 )
869 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
870 for a_index, a_vld in enumerate(target["ns"]["vld"]):
871 target_vld = find_in_list(
872 get_iterable(vdur, "interfaces"),
873 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
874 )
875 if target_vld:
876 if vnf_params.get("vimAccountId") not in a_vld.get(
877 "vim_info", {}
878 ):
879 target["ns"]["vld"][a_index].get("vim_info").update(
880 {
881 "vim:{}".format(vnf_params["vimAccountId"]): {
882 "vim_network_name": ""
883 }
884 }
885 )
886
887 nslcmop_id = db_nslcmop["_id"]
888 target = {
889 "name": db_nsr["name"],
890 "ns": {"vld": []},
891 "vnf": [],
892 "image": deepcopy(db_nsr["image"]),
893 "flavor": deepcopy(db_nsr["flavor"]),
894 "action_id": nslcmop_id,
895 "cloud_init_content": {},
896 }
897 for image in target["image"]:
898 image["vim_info"] = {}
899 for flavor in target["flavor"]:
900 flavor["vim_info"] = {}
901 if db_nsr.get("affinity-or-anti-affinity-group"):
902 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
903 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
904 affinity_or_anti_affinity_group["vim_info"] = {}
905
906 if db_nslcmop.get("lcmOperationType") != "instantiate":
907 # get parameters of instantiation:
908 db_nslcmop_instantiate = self.db.get_list(
909 "nslcmops",
910 {
911 "nsInstanceId": db_nslcmop["nsInstanceId"],
912 "lcmOperationType": "instantiate",
913 },
914 )[-1]
915 ns_params = db_nslcmop_instantiate.get("operationParams")
916 else:
917 ns_params = db_nslcmop.get("operationParams")
918 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
919 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
920
921 cp2target = {}
922 for vld_index, vld in enumerate(db_nsr.get("vld")):
923 target_vim = "vim:{}".format(ns_params["vimAccountId"])
924 target_vld = {
925 "id": vld["id"],
926 "name": vld["name"],
927 "mgmt-network": vld.get("mgmt-network", False),
928 "type": vld.get("type"),
929 "vim_info": {
930 target_vim: {
931 "vim_network_name": vld.get("vim-network-name"),
932 "vim_account_id": ns_params["vimAccountId"],
933 }
934 },
935 }
936 # check if this network needs SDN assist
937 if vld.get("pci-interfaces"):
938 db_vim = get_vim_account(ns_params["vimAccountId"])
939 sdnc_id = db_vim["config"].get("sdn-controller")
940 if sdnc_id:
941 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
942 target_sdn = "sdn:{}".format(sdnc_id)
943 target_vld["vim_info"][target_sdn] = {
944 "sdn": True,
945 "target_vim": target_vim,
946 "vlds": [sdn_vld],
947 "type": vld.get("type"),
948 }
949
950 nsd_vnf_profiles = get_vnf_profiles(nsd)
951 for nsd_vnf_profile in nsd_vnf_profiles:
952 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
953 if cp["virtual-link-profile-id"] == vld["id"]:
954 cp2target[
955 "member_vnf:{}.{}".format(
956 cp["constituent-cpd-id"][0][
957 "constituent-base-element-id"
958 ],
959 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
960 )
961 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
962
963 # check at nsd descriptor, if there is an ip-profile
964 vld_params = {}
965 nsd_vlp = find_in_list(
966 get_virtual_link_profiles(nsd),
967 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
968 == vld["id"],
969 )
970 if (
971 nsd_vlp
972 and nsd_vlp.get("virtual-link-protocol-data")
973 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
974 ):
975 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
976 "l3-protocol-data"
977 ]
978 ip_profile_dest_data = {}
979 if "ip-version" in ip_profile_source_data:
980 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
981 "ip-version"
982 ]
983 if "cidr" in ip_profile_source_data:
984 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
985 "cidr"
986 ]
987 if "gateway-ip" in ip_profile_source_data:
988 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
989 "gateway-ip"
990 ]
991 if "dhcp-enabled" in ip_profile_source_data:
992 ip_profile_dest_data["dhcp-params"] = {
993 "enabled": ip_profile_source_data["dhcp-enabled"]
994 }
995 vld_params["ip-profile"] = ip_profile_dest_data
996
997 # update vld_params with instantiation params
998 vld_instantiation_params = find_in_list(
999 get_iterable(ns_params, "vld"),
1000 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1001 )
1002 if vld_instantiation_params:
1003 vld_params.update(vld_instantiation_params)
1004 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1005 target["ns"]["vld"].append(target_vld)
1006 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1007 update_ns_vld_target(target, ns_params)
1008
1009 for vnfr in db_vnfrs.values():
1010 vnfd = find_in_list(
1011 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1012 )
1013 vnf_params = find_in_list(
1014 get_iterable(ns_params, "vnf"),
1015 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1016 )
1017 target_vnf = deepcopy(vnfr)
1018 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1019 for vld in target_vnf.get("vld", ()):
1020 # check if connected to a ns.vld, to fill target'
1021 vnf_cp = find_in_list(
1022 vnfd.get("int-virtual-link-desc", ()),
1023 lambda cpd: cpd.get("id") == vld["id"],
1024 )
1025 if vnf_cp:
1026 ns_cp = "member_vnf:{}.{}".format(
1027 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1028 )
1029 if cp2target.get(ns_cp):
1030 vld["target"] = cp2target[ns_cp]
1031
1032 vld["vim_info"] = {
1033 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1034 }
1035 # check if this network needs SDN assist
1036 target_sdn = None
1037 if vld.get("pci-interfaces"):
1038 db_vim = get_vim_account(vnfr["vim-account-id"])
1039 sdnc_id = db_vim["config"].get("sdn-controller")
1040 if sdnc_id:
1041 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1042 target_sdn = "sdn:{}".format(sdnc_id)
1043 vld["vim_info"][target_sdn] = {
1044 "sdn": True,
1045 "target_vim": target_vim,
1046 "vlds": [sdn_vld],
1047 "type": vld.get("type"),
1048 }
1049
1050 # check at vnfd descriptor, if there is an ip-profile
1051 vld_params = {}
1052 vnfd_vlp = find_in_list(
1053 get_virtual_link_profiles(vnfd),
1054 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1055 )
1056 if (
1057 vnfd_vlp
1058 and vnfd_vlp.get("virtual-link-protocol-data")
1059 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1060 ):
1061 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1062 "l3-protocol-data"
1063 ]
1064 ip_profile_dest_data = {}
1065 if "ip-version" in ip_profile_source_data:
1066 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1067 "ip-version"
1068 ]
1069 if "cidr" in ip_profile_source_data:
1070 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1071 "cidr"
1072 ]
1073 if "gateway-ip" in ip_profile_source_data:
1074 ip_profile_dest_data[
1075 "gateway-address"
1076 ] = ip_profile_source_data["gateway-ip"]
1077 if "dhcp-enabled" in ip_profile_source_data:
1078 ip_profile_dest_data["dhcp-params"] = {
1079 "enabled": ip_profile_source_data["dhcp-enabled"]
1080 }
1081
1082 vld_params["ip-profile"] = ip_profile_dest_data
1083 # update vld_params with instantiation params
1084 if vnf_params:
1085 vld_instantiation_params = find_in_list(
1086 get_iterable(vnf_params, "internal-vld"),
1087 lambda i_vld: i_vld["name"] == vld["id"],
1088 )
1089 if vld_instantiation_params:
1090 vld_params.update(vld_instantiation_params)
1091 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1092
1093 vdur_list = []
1094 for vdur in target_vnf.get("vdur", ()):
1095 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1096 continue # This vdu must not be created
1097 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1098
1099 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1100
1101 if ssh_keys_all:
1102 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1103 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1104 if (
1105 vdu_configuration
1106 and vdu_configuration.get("config-access")
1107 and vdu_configuration.get("config-access").get("ssh-access")
1108 ):
1109 vdur["ssh-keys"] = ssh_keys_all
1110 vdur["ssh-access-required"] = vdu_configuration[
1111 "config-access"
1112 ]["ssh-access"]["required"]
1113 elif (
1114 vnf_configuration
1115 and vnf_configuration.get("config-access")
1116 and vnf_configuration.get("config-access").get("ssh-access")
1117 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1118 ):
1119 vdur["ssh-keys"] = ssh_keys_all
1120 vdur["ssh-access-required"] = vnf_configuration[
1121 "config-access"
1122 ]["ssh-access"]["required"]
1123 elif ssh_keys_instantiation and find_in_list(
1124 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1125 ):
1126 vdur["ssh-keys"] = ssh_keys_instantiation
1127
1128 self.logger.debug("NS > vdur > {}".format(vdur))
1129
1130 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1131 # cloud-init
1132 if vdud.get("cloud-init-file"):
1133 vdur["cloud-init"] = "{}:file:{}".format(
1134 vnfd["_id"], vdud.get("cloud-init-file")
1135 )
1136 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1137 if vdur["cloud-init"] not in target["cloud_init_content"]:
1138 base_folder = vnfd["_admin"]["storage"]
1139 if base_folder["pkg-dir"]:
1140 cloud_init_file = "{}/{}/cloud_init/{}".format(
1141 base_folder["folder"],
1142 base_folder["pkg-dir"],
1143 vdud.get("cloud-init-file"),
1144 )
1145 else:
1146 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1147 base_folder["folder"],
1148 vdud.get("cloud-init-file"),
1149 )
1150 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1151 target["cloud_init_content"][
1152 vdur["cloud-init"]
1153 ] = ci_file.read()
1154 elif vdud.get("cloud-init"):
1155 vdur["cloud-init"] = "{}:vdu:{}".format(
1156 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1157 )
1158 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1159 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1160 "cloud-init"
1161 ]
1162 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1163 deploy_params_vdu = self._format_additional_params(
1164 vdur.get("additionalParams") or {}
1165 )
1166 deploy_params_vdu["OSM"] = get_osm_params(
1167 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1168 )
1169 vdur["additionalParams"] = deploy_params_vdu
1170
1171 # flavor
1172 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1173 if target_vim not in ns_flavor["vim_info"]:
1174 ns_flavor["vim_info"][target_vim] = {}
1175
1176 # deal with images
1177 # in case alternative images are provided we must check if they should be applied
1178 # for the vim_type, modify the vim_type taking into account
1179 ns_image_id = int(vdur["ns-image-id"])
1180 if vdur.get("alt-image-ids"):
1181 db_vim = get_vim_account(vnfr["vim-account-id"])
1182 vim_type = db_vim["vim_type"]
1183 for alt_image_id in vdur.get("alt-image-ids"):
1184 ns_alt_image = target["image"][int(alt_image_id)]
1185 if vim_type == ns_alt_image.get("vim-type"):
1186 # must use alternative image
1187 self.logger.debug(
1188 "use alternative image id: {}".format(alt_image_id)
1189 )
1190 ns_image_id = alt_image_id
1191 vdur["ns-image-id"] = ns_image_id
1192 break
1193 ns_image = target["image"][int(ns_image_id)]
1194 if target_vim not in ns_image["vim_info"]:
1195 ns_image["vim_info"][target_vim] = {}
1196
1197 # Affinity groups
1198 if vdur.get("affinity-or-anti-affinity-group-id"):
1199 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1200 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1201 if target_vim not in ns_ags["vim_info"]:
1202 ns_ags["vim_info"][target_vim] = {}
1203
1204 vdur["vim_info"] = {target_vim: {}}
1205 # instantiation parameters
1206 # if vnf_params:
1207 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1208 # vdud["id"]), None)
1209 vdur_list.append(vdur)
1210 target_vnf["vdur"] = vdur_list
1211 target["vnf"].append(target_vnf)
1212
1213 desc = await self.RO.deploy(nsr_id, target)
1214 self.logger.debug("RO return > {}".format(desc))
1215 action_id = desc["action_id"]
1216 await self._wait_ng_ro(
1217 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1218 )
1219
1220 # Updating NSR
1221 db_nsr_update = {
1222 "_admin.deployed.RO.operational-status": "running",
1223 "detailed-status": " ".join(stage),
1224 }
1225 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1226 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1227 self._write_op_status(nslcmop_id, stage)
1228 self.logger.debug(
1229 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1230 )
1231 return
1232
    async def _wait_ng_ro(
        self,
        nsr_id,
        action_id,
        nslcmop_id=None,
        start_time=None,
        timeout=600,
        stage=None,
    ):
        """
        Poll NG-RO for the status of an action until it finishes or times out.
        :param nsr_id: ns record id the action belongs to
        :param action_id: NG-RO action id (as returned by RO.deploy)
        :param nslcmop_id: if provided together with stage, progress is persisted
            to the database on every detailed-status change
        :param start_time: epoch seconds when waiting started; defaults to now
        :param timeout: maximum seconds to wait (default 600)
        :param stage: list with 3 items; stage[2] is overwritten with VIM details
        :return: None when the action reaches DONE
        :raises NgRoException: on FAILED status or when the timeout expires
        """
        detailed_status_old = None
        db_nsr_update = {}
        start_time = start_time or time()
        while time() <= start_time + timeout:
            desc_status = await self.RO.status(nsr_id, action_id)
            self.logger.debug("Wait NG RO > {}".format(desc_status))
            if desc_status["status"] == "FAILED":
                raise NgRoException(desc_status["details"])
            elif desc_status["status"] == "BUILD":
                if stage:
                    stage[2] = "VIM: ({})".format(desc_status["details"])
            elif desc_status["status"] == "DONE":
                if stage:
                    stage[2] = "Deployed at VIM"
                break
            else:
                assert False, "ROclient.check_ns_status returns unknown {}".format(
                    desc_status["status"]
                )
            # persist progress only when the detailed status actually changed
            if stage and nslcmop_id and stage[2] != detailed_status_old:
                detailed_status_old = stage[2]
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
            await asyncio.sleep(15, loop=self.loop)
        else:  # timeout_ns_deploy
            # while-else: the loop condition expired without hitting "break" (DONE)
            raise NgRoException("Timeout waiting ns to deploy")
1269
    async def _terminate_ng_ro(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Remove a deployed ns from VIM through NG-RO: deploy an "empty" target
        (no vlds/vnfs/images/flavors), wait for completion, then delete the nsr
        at RO. Updates nsrs detailed-status and the operation status in the db.
        :param logging_text: prefix text to use at logging
        :param nsr_deployed: content of db_nsr _admin.deployed (not read here)
        :param nsr_id: ns record id
        :param nslcmop_id: ns operation id; also passed to RO as the action_id
        :param stage: list with 3 items; stage[2] is written with the VIM result
        :return: None
        :raises LcmException: when deletion at RO fails (404 "not found" and the
            initial deploy errors are handled; 409 conflicts are reported)
        """
        db_nsr_update = {}
        failed_detail = []
        action_id = None
        start_deploy = time()
        try:
            # an empty target makes NG-RO remove every resource of this ns at VIM
            target = {
                "ns": {"vld": []},
                "vnf": [],
                "image": [],
                "flavor": [],
                "action_id": nslcmop_id,
            }
            desc = await self.RO.deploy(nsr_id, target)
            action_id = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
            self.logger.debug(
                logging_text
                + "ns terminate action at RO. action_id={}".format(action_id)
            )

            # wait until done
            delete_timeout = 20 * 60  # 20 minutes
            await self._wait_ng_ro(
                nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
            )

            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # delete all nsr
            await self.RO.delete(nsr_id)
        except Exception as e:
            if isinstance(e, NgRoException) and e.http_code == 404:  # not found
                # already removed at RO: treat as a successful deletion
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_action_id={} already deleted".format(action_id)
                )
            elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_action_id={} delete conflict: {}".format(action_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text
                    + "RO_action_id={} delete error: {}".format(action_id, e)
                )

        # reflect the outcome in the database even when deletion failed
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
        return
1336
1337 async def instantiate_RO(
1338 self,
1339 logging_text,
1340 nsr_id,
1341 nsd,
1342 db_nsr,
1343 db_nslcmop,
1344 db_vnfrs,
1345 db_vnfds,
1346 n2vc_key_list,
1347 stage,
1348 ):
1349 """
1350 Instantiate at RO
1351 :param logging_text: preffix text to use at logging
1352 :param nsr_id: nsr identity
1353 :param nsd: database content of ns descriptor
1354 :param db_nsr: database content of ns record
1355 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1356 :param db_vnfrs:
1357 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1358 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1359 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1360 :return: None or exception
1361 """
1362 try:
1363 start_deploy = time()
1364 ns_params = db_nslcmop.get("operationParams")
1365 if ns_params and ns_params.get("timeout_ns_deploy"):
1366 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1367 else:
1368 timeout_ns_deploy = self.timeout.get(
1369 "ns_deploy", self.timeout_ns_deploy
1370 )
1371
1372 # Check for and optionally request placement optimization. Database will be updated if placement activated
1373 stage[2] = "Waiting for Placement."
1374 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1375 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1376 for vnfr in db_vnfrs.values():
1377 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1378 break
1379 else:
1380 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1381
1382 return await self._instantiate_ng_ro(
1383 logging_text,
1384 nsr_id,
1385 nsd,
1386 db_nsr,
1387 db_nslcmop,
1388 db_vnfrs,
1389 db_vnfds,
1390 n2vc_key_list,
1391 stage,
1392 start_deploy,
1393 timeout_ns_deploy,
1394 )
1395 except Exception as e:
1396 stage[2] = "ERROR deploying at VIM"
1397 self.set_vnfr_at_error(db_vnfrs, str(e))
1398 self.logger.error(
1399 "Error deploying at VIM {}".format(e),
1400 exc_info=not isinstance(
1401 e,
1402 (
1403 ROclient.ROClientException,
1404 LcmException,
1405 DbException,
1406 NgRoException,
1407 ),
1408 ),
1409 )
1410 raise
1411
1412 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1413 """
1414 Wait for kdu to be up, get ip address
1415 :param logging_text: prefix use for logging
1416 :param nsr_id:
1417 :param vnfr_id:
1418 :param kdu_name:
1419 :return: IP address
1420 """
1421
1422 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1423 nb_tries = 0
1424
1425 while nb_tries < 360:
1426 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1427 kdur = next(
1428 (
1429 x
1430 for x in get_iterable(db_vnfr, "kdur")
1431 if x.get("kdu-name") == kdu_name
1432 ),
1433 None,
1434 )
1435 if not kdur:
1436 raise LcmException(
1437 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1438 )
1439 if kdur.get("status"):
1440 if kdur["status"] in ("READY", "ENABLED"):
1441 return kdur.get("ip-address")
1442 else:
1443 raise LcmException(
1444 "target KDU={} is in error state".format(kdu_name)
1445 )
1446
1447 await asyncio.sleep(10, loop=self.loop)
1448 nb_tries += 1
1449 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1450
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip address at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id:
        :param vnfr_id:
        :param vdu_id: target vdu; None means the VNF management VDU
        :param vdu_index: count-index of the vdu (used together with vdu_id)
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        :raises LcmException: VNF/VM in error state, vdu not found, or retries exhausted
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0
        target_vdu_id = None
        ro_retries = 0

        # outer retry loop: each iteration sleeps 10s; state (target_vdu_id,
        # ip_address, ro_nsr_id) is carried across iterations
        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # the management vdu is located by its ip address
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # a PDU is considered up by definition; otherwise either the old
                # "status" field or the NG-RO "vim_status" must be ACTIVE
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # a PDU is not managed by RO: nothing to inject
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # NG-RO path: key injection is requested as a deploy action
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # legacy RO path
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                        else:
                            # for-else: no VM reported vim_result == 200
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # legacy RO may not be ready yet: retry up to 20 times
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                # no key to inject: the ip address is all that was needed
                break

        return ip_address
1627
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
        :param nsr_id: ns record id
        :param vca_deployed_list: content of db_nsr _admin.deployed.VCA
        :param vca_index: index of the VCA whose dependencies must be waited for
        :raises LcmException: when a dependent charm is BROKEN, or after 300 polls
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # NS-level VCA (no member-vnf-index) depends on everything;
                # VNF-level VCA depends only on entries of the same vnf
                if not my_vca.get("member-vnf-index") or (
                    vca_deployed.get("member-vnf-index")
                    == my_vca.get("member-vnf-index")
                ):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == "READY":
                        continue
                    elif internal_status == "BROKEN":
                        raise LcmException(
                            "Configuration aborted because dependent charm/s has failed"
                        )
                    else:
                        # dependency not ready yet: stop scanning and poll again
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1

        raise LcmException("Configuration aborted because dependent charm/s timeout")
1665
1666 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1667 vca_id = None
1668 if db_vnfr:
1669 vca_id = deep_get(db_vnfr, ("vca-id",))
1670 elif db_nsr:
1671 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1672 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1673 return vca_id
1674
1675 async def instantiate_N2VC(
1676 self,
1677 logging_text,
1678 vca_index,
1679 nsi_id,
1680 db_nsr,
1681 db_vnfr,
1682 vdu_id,
1683 kdu_name,
1684 vdu_index,
1685 config_descriptor,
1686 deploy_params,
1687 base_folder,
1688 nslcmop_id,
1689 stage,
1690 vca_type,
1691 vca_name,
1692 ee_config_descriptor,
1693 ):
1694 nsr_id = db_nsr["_id"]
1695 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1696 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1697 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1698 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1699 db_dict = {
1700 "collection": "nsrs",
1701 "filter": {"_id": nsr_id},
1702 "path": db_update_entry,
1703 }
1704 step = ""
1705 try:
1706
1707 element_type = "NS"
1708 element_under_configuration = nsr_id
1709
1710 vnfr_id = None
1711 if db_vnfr:
1712 vnfr_id = db_vnfr["_id"]
1713 osm_config["osm"]["vnf_id"] = vnfr_id
1714
1715 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1716
1717 if vca_type == "native_charm":
1718 index_number = 0
1719 else:
1720 index_number = vdu_index or 0
1721
1722 if vnfr_id:
1723 element_type = "VNF"
1724 element_under_configuration = vnfr_id
1725 namespace += ".{}-{}".format(vnfr_id, index_number)
1726 if vdu_id:
1727 namespace += ".{}-{}".format(vdu_id, index_number)
1728 element_type = "VDU"
1729 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1730 osm_config["osm"]["vdu_id"] = vdu_id
1731 elif kdu_name:
1732 namespace += ".{}".format(kdu_name)
1733 element_type = "KDU"
1734 element_under_configuration = kdu_name
1735 osm_config["osm"]["kdu_name"] = kdu_name
1736
1737 # Get artifact path
1738 if base_folder["pkg-dir"]:
1739 artifact_path = "{}/{}/{}/{}".format(
1740 base_folder["folder"],
1741 base_folder["pkg-dir"],
1742 "charms"
1743 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1744 else "helm-charts",
1745 vca_name,
1746 )
1747 else:
1748 artifact_path = "{}/Scripts/{}/{}/".format(
1749 base_folder["folder"],
1750 "charms"
1751 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1752 else "helm-charts",
1753 vca_name,
1754 )
1755
1756 self.logger.debug("Artifact path > {}".format(artifact_path))
1757
1758 # get initial_config_primitive_list that applies to this element
1759 initial_config_primitive_list = config_descriptor.get(
1760 "initial-config-primitive"
1761 )
1762
1763 self.logger.debug(
1764 "Initial config primitive list > {}".format(
1765 initial_config_primitive_list
1766 )
1767 )
1768
1769 # add config if not present for NS charm
1770 ee_descriptor_id = ee_config_descriptor.get("id")
1771 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1772 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1773 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1774 )
1775
1776 self.logger.debug(
1777 "Initial config primitive list #2 > {}".format(
1778 initial_config_primitive_list
1779 )
1780 )
1781 # n2vc_redesign STEP 3.1
1782 # find old ee_id if exists
1783 ee_id = vca_deployed.get("ee_id")
1784
1785 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1786 # create or register execution environment in VCA
1787 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1788
1789 self._write_configuration_status(
1790 nsr_id=nsr_id,
1791 vca_index=vca_index,
1792 status="CREATING",
1793 element_under_configuration=element_under_configuration,
1794 element_type=element_type,
1795 )
1796
1797 step = "create execution environment"
1798 self.logger.debug(logging_text + step)
1799
1800 ee_id = None
1801 credentials = None
1802 if vca_type == "k8s_proxy_charm":
1803 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1804 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1805 namespace=namespace,
1806 artifact_path=artifact_path,
1807 db_dict=db_dict,
1808 vca_id=vca_id,
1809 )
1810 elif vca_type == "helm" or vca_type == "helm-v3":
1811 ee_id, credentials = await self.vca_map[
1812 vca_type
1813 ].create_execution_environment(
1814 namespace=namespace,
1815 reuse_ee_id=ee_id,
1816 db_dict=db_dict,
1817 config=osm_config,
1818 artifact_path=artifact_path,
1819 vca_type=vca_type,
1820 )
1821 else:
1822 ee_id, credentials = await self.vca_map[
1823 vca_type
1824 ].create_execution_environment(
1825 namespace=namespace,
1826 reuse_ee_id=ee_id,
1827 db_dict=db_dict,
1828 vca_id=vca_id,
1829 )
1830
1831 elif vca_type == "native_charm":
1832 step = "Waiting to VM being up and getting IP address"
1833 self.logger.debug(logging_text + step)
1834 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1835 logging_text,
1836 nsr_id,
1837 vnfr_id,
1838 vdu_id,
1839 vdu_index,
1840 user=None,
1841 pub_key=None,
1842 )
1843 credentials = {"hostname": rw_mgmt_ip}
1844 # get username
1845 username = deep_get(
1846 config_descriptor, ("config-access", "ssh-access", "default-user")
1847 )
1848 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1849 # merged. Meanwhile let's get username from initial-config-primitive
1850 if not username and initial_config_primitive_list:
1851 for config_primitive in initial_config_primitive_list:
1852 for param in config_primitive.get("parameter", ()):
1853 if param["name"] == "ssh-username":
1854 username = param["value"]
1855 break
1856 if not username:
1857 raise LcmException(
1858 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1859 "'config-access.ssh-access.default-user'"
1860 )
1861 credentials["username"] = username
1862 # n2vc_redesign STEP 3.2
1863
1864 self._write_configuration_status(
1865 nsr_id=nsr_id,
1866 vca_index=vca_index,
1867 status="REGISTERING",
1868 element_under_configuration=element_under_configuration,
1869 element_type=element_type,
1870 )
1871
1872 step = "register execution environment {}".format(credentials)
1873 self.logger.debug(logging_text + step)
1874 ee_id = await self.vca_map[vca_type].register_execution_environment(
1875 credentials=credentials,
1876 namespace=namespace,
1877 db_dict=db_dict,
1878 vca_id=vca_id,
1879 )
1880
1881 # for compatibility with MON/POL modules, the need model and application name at database
1882 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1883 ee_id_parts = ee_id.split(".")
1884 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1885 if len(ee_id_parts) >= 2:
1886 model_name = ee_id_parts[0]
1887 application_name = ee_id_parts[1]
1888 db_nsr_update[db_update_entry + "model"] = model_name
1889 db_nsr_update[db_update_entry + "application"] = application_name
1890
1891 # n2vc_redesign STEP 3.3
1892 step = "Install configuration Software"
1893
1894 self._write_configuration_status(
1895 nsr_id=nsr_id,
1896 vca_index=vca_index,
1897 status="INSTALLING SW",
1898 element_under_configuration=element_under_configuration,
1899 element_type=element_type,
1900 other_update=db_nsr_update,
1901 )
1902
1903 # TODO check if already done
1904 self.logger.debug(logging_text + step)
1905 config = None
1906 if vca_type == "native_charm":
1907 config_primitive = next(
1908 (p for p in initial_config_primitive_list if p["name"] == "config"),
1909 None,
1910 )
1911 if config_primitive:
1912 config = self._map_primitive_params(
1913 config_primitive, {}, deploy_params
1914 )
1915 num_units = 1
1916 if vca_type == "lxc_proxy_charm":
1917 if element_type == "NS":
1918 num_units = db_nsr.get("config-units") or 1
1919 elif element_type == "VNF":
1920 num_units = db_vnfr.get("config-units") or 1
1921 elif element_type == "VDU":
1922 for v in db_vnfr["vdur"]:
1923 if vdu_id == v["vdu-id-ref"]:
1924 num_units = v.get("config-units") or 1
1925 break
1926 if vca_type != "k8s_proxy_charm":
1927 await self.vca_map[vca_type].install_configuration_sw(
1928 ee_id=ee_id,
1929 artifact_path=artifact_path,
1930 db_dict=db_dict,
1931 config=config,
1932 num_units=num_units,
1933 vca_id=vca_id,
1934 vca_type=vca_type,
1935 )
1936
1937 # write in db flag of configuration_sw already installed
1938 self.update_db_2(
1939 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1940 )
1941
1942 # add relations for this VCA (wait for other peers related with this VCA)
1943 await self._add_vca_relations(
1944 logging_text=logging_text,
1945 nsr_id=nsr_id,
1946 vca_type=vca_type,
1947 vca_index=vca_index,
1948 )
1949
1950 # if SSH access is required, then get execution environment SSH public
1951 # if native charm we have waited already to VM be UP
1952 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1953 pub_key = None
1954 user = None
1955 # self.logger.debug("get ssh key block")
1956 if deep_get(
1957 config_descriptor, ("config-access", "ssh-access", "required")
1958 ):
1959 # self.logger.debug("ssh key needed")
1960 # Needed to inject a ssh key
1961 user = deep_get(
1962 config_descriptor,
1963 ("config-access", "ssh-access", "default-user"),
1964 )
1965 step = "Install configuration Software, getting public ssh key"
1966 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1967 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1968 )
1969
1970 step = "Insert public key into VM user={} ssh_key={}".format(
1971 user, pub_key
1972 )
1973 else:
1974 # self.logger.debug("no need to get ssh key")
1975 step = "Waiting to VM being up and getting IP address"
1976 self.logger.debug(logging_text + step)
1977
1978 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1979 rw_mgmt_ip = None
1980
1981 # n2vc_redesign STEP 5.1
1982 # wait for RO (ip-address) Insert pub_key into VM
1983 if vnfr_id:
1984 if kdu_name:
1985 rw_mgmt_ip = await self.wait_kdu_up(
1986 logging_text, nsr_id, vnfr_id, kdu_name
1987 )
1988
1989 # This verification is needed in order to avoid trying to add a public key
1990 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
1991 # for a KNF and not for its KDUs, the previous verification gives False, and the code
1992 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
1993 # or it is a KNF)
1994 elif db_vnfr.get('vdur'):
1995 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1996 logging_text,
1997 nsr_id,
1998 vnfr_id,
1999 vdu_id,
2000 vdu_index,
2001 user=user,
2002 pub_key=pub_key,
2003 )
2004
2005 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
2006
2007 # store rw_mgmt_ip in deploy params for later replacement
2008 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2009
2010 # n2vc_redesign STEP 6 Execute initial config primitive
2011 step = "execute initial config primitive"
2012
2013 # wait for dependent primitives execution (NS -> VNF -> VDU)
2014 if initial_config_primitive_list:
2015 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2016
2017 # stage, in function of element type: vdu, kdu, vnf or ns
2018 my_vca = vca_deployed_list[vca_index]
2019 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2020 # VDU or KDU
2021 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2022 elif my_vca.get("member-vnf-index"):
2023 # VNF
2024 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2025 else:
2026 # NS
2027 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2028
2029 self._write_configuration_status(
2030 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2031 )
2032
2033 self._write_op_status(op_id=nslcmop_id, stage=stage)
2034
2035 check_if_terminated_needed = True
2036 for initial_config_primitive in initial_config_primitive_list:
2037 # adding information on the vca_deployed if it is a NS execution environment
2038 if not vca_deployed["member-vnf-index"]:
2039 deploy_params["ns_config_info"] = json.dumps(
2040 self._get_ns_config_info(nsr_id)
2041 )
2042 # TODO check if already done
2043 primitive_params_ = self._map_primitive_params(
2044 initial_config_primitive, {}, deploy_params
2045 )
2046
2047 step = "execute primitive '{}' params '{}'".format(
2048 initial_config_primitive["name"], primitive_params_
2049 )
2050 self.logger.debug(logging_text + step)
2051 await self.vca_map[vca_type].exec_primitive(
2052 ee_id=ee_id,
2053 primitive_name=initial_config_primitive["name"],
2054 params_dict=primitive_params_,
2055 db_dict=db_dict,
2056 vca_id=vca_id,
2057 vca_type=vca_type,
2058 )
2059 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2060 if check_if_terminated_needed:
2061 if config_descriptor.get("terminate-config-primitive"):
2062 self.update_db_2(
2063 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2064 )
2065 check_if_terminated_needed = False
2066
2067 # TODO register in database that primitive is done
2068
2069 # STEP 7 Configure metrics
2070 if vca_type == "helm" or vca_type == "helm-v3":
2071 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2072 ee_id=ee_id,
2073 artifact_path=artifact_path,
2074 ee_config_descriptor=ee_config_descriptor,
2075 vnfr_id=vnfr_id,
2076 nsr_id=nsr_id,
2077 target_ip=rw_mgmt_ip,
2078 )
2079 if prometheus_jobs:
2080 self.update_db_2(
2081 "nsrs",
2082 nsr_id,
2083 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2084 )
2085
2086 for job in prometheus_jobs:
2087 self.db.set_one(
2088 "prometheus_jobs",
2089 {
2090 "job_name": job["job_name"]
2091 },
2092 job,
2093 upsert=True,
2094 fail_on_empty=False,
2095 )
2096
2097 step = "instantiated at VCA"
2098 self.logger.debug(logging_text + step)
2099
2100 self._write_configuration_status(
2101 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2102 )
2103
2104 except Exception as e: # TODO not use Exception but N2VC exception
2105 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2106 if not isinstance(
2107 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2108 ):
2109 self.logger.error(
2110 "Exception while {} : {}".format(step, e), exc_info=True
2111 )
2112 self._write_configuration_status(
2113 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2114 )
2115 raise LcmException("{} {}".format(step, e)) from e
2116
2117 def _write_ns_status(
2118 self,
2119 nsr_id: str,
2120 ns_state: str,
2121 current_operation: str,
2122 current_operation_id: str,
2123 error_description: str = None,
2124 error_detail: str = None,
2125 other_update: dict = None,
2126 ):
2127 """
2128 Update db_nsr fields.
2129 :param nsr_id:
2130 :param ns_state:
2131 :param current_operation:
2132 :param current_operation_id:
2133 :param error_description:
2134 :param error_detail:
2135 :param other_update: Other required changes at database if provided, will be cleared
2136 :return:
2137 """
2138 try:
2139 db_dict = other_update or {}
2140 db_dict[
2141 "_admin.nslcmop"
2142 ] = current_operation_id # for backward compatibility
2143 db_dict["_admin.current-operation"] = current_operation_id
2144 db_dict["_admin.operation-type"] = (
2145 current_operation if current_operation != "IDLE" else None
2146 )
2147 db_dict["currentOperation"] = current_operation
2148 db_dict["currentOperationID"] = current_operation_id
2149 db_dict["errorDescription"] = error_description
2150 db_dict["errorDetail"] = error_detail
2151
2152 if ns_state:
2153 db_dict["nsState"] = ns_state
2154 self.update_db_2("nsrs", nsr_id, db_dict)
2155 except DbException as e:
2156 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2157
2158 def _write_op_status(
2159 self,
2160 op_id: str,
2161 stage: list = None,
2162 error_message: str = None,
2163 queuePosition: int = 0,
2164 operation_state: str = None,
2165 other_update: dict = None,
2166 ):
2167 try:
2168 db_dict = other_update or {}
2169 db_dict["queuePosition"] = queuePosition
2170 if isinstance(stage, list):
2171 db_dict["stage"] = stage[0]
2172 db_dict["detailed-status"] = " ".join(stage)
2173 elif stage is not None:
2174 db_dict["stage"] = str(stage)
2175
2176 if error_message is not None:
2177 db_dict["errorMessage"] = error_message
2178 if operation_state is not None:
2179 db_dict["operationState"] = operation_state
2180 db_dict["statusEnteredTime"] = time()
2181 self.update_db_2("nslcmops", op_id, db_dict)
2182 except DbException as e:
2183 self.logger.warn(
2184 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2185 )
2186
2187 def _write_all_config_status(self, db_nsr: dict, status: str):
2188 try:
2189 nsr_id = db_nsr["_id"]
2190 # configurationStatus
2191 config_status = db_nsr.get("configurationStatus")
2192 if config_status:
2193 db_nsr_update = {
2194 "configurationStatus.{}.status".format(index): status
2195 for index, v in enumerate(config_status)
2196 if v
2197 }
2198 # update status
2199 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2200
2201 except DbException as e:
2202 self.logger.warn(
2203 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2204 )
2205
2206 def _write_configuration_status(
2207 self,
2208 nsr_id: str,
2209 vca_index: int,
2210 status: str = None,
2211 element_under_configuration: str = None,
2212 element_type: str = None,
2213 other_update: dict = None,
2214 ):
2215
2216 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2217 # .format(vca_index, status))
2218
2219 try:
2220 db_path = "configurationStatus.{}.".format(vca_index)
2221 db_dict = other_update or {}
2222 if status:
2223 db_dict[db_path + "status"] = status
2224 if element_under_configuration:
2225 db_dict[
2226 db_path + "elementUnderConfiguration"
2227 ] = element_under_configuration
2228 if element_type:
2229 db_dict[db_path + "elementType"] = element_type
2230 self.update_db_2("nsrs", nsr_id, db_dict)
2231 except DbException as e:
2232 self.logger.warn(
2233 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2234 status, nsr_id, vca_index, e
2235 )
2236 )
2237
2238 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2239 """
2240 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2241 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2242 Database is used because the result can be obtained from a different LCM worker in case of HA.
2243 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2244 :param db_nslcmop: database content of nslcmop
2245 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2246 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2247 computed 'vim-account-id'
2248 """
2249 modified = False
2250 nslcmop_id = db_nslcmop["_id"]
2251 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2252 if placement_engine == "PLA":
2253 self.logger.debug(
2254 logging_text + "Invoke and wait for placement optimization"
2255 )
2256 await self.msg.aiowrite(
2257 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2258 )
2259 db_poll_interval = 5
2260 wait = db_poll_interval * 10
2261 pla_result = None
2262 while not pla_result and wait >= 0:
2263 await asyncio.sleep(db_poll_interval)
2264 wait -= db_poll_interval
2265 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2266 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2267
2268 if not pla_result:
2269 raise LcmException(
2270 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2271 )
2272
2273 for pla_vnf in pla_result["vnf"]:
2274 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2275 if not pla_vnf.get("vimAccountId") or not vnfr:
2276 continue
2277 modified = True
2278 self.db.set_one(
2279 "vnfrs",
2280 {"_id": vnfr["_id"]},
2281 {"vim-account-id": pla_vnf["vimAccountId"]},
2282 )
2283 # Modifies db_vnfrs
2284 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2285 return modified
2286
2287 def update_nsrs_with_pla_result(self, params):
2288 try:
2289 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2290 self.update_db_2(
2291 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2292 )
2293 except Exception as e:
2294 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2295
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Instantiate a network service: deploys KDUs, the NS at the VIM
        (through RO) and the execution environments (N2VC), then reports the
        result at the nsrs/nslcmops database records and through kafka.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None. The operation result is written at database
        """

        # Try to lock HA task here; another LCM worker may already own it
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                # additionalParamsForVnf is stored as a JSON string; decode it
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"]   # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            # kdu additionalParams is stored as JSON; decode it
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds vnfd dicts, so this membership
                # test against the id string never matches and the vnfd is
                # re-read for every vnfr sharing it — confirm intent
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            stage[1] = "Deploying NS at VIM."
            # VIM deployment runs concurrently with the EE deployments below;
            # completion is awaited at the finally block via tasks_dict_info
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    # VNF level charm
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one charm deployment per VDU instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu.update(
                                parse_yaml_strings(kdur["additionalParams"].copy())
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of stuff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            # NOTE: "exc" here shadows the outer variable; only used for the message
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            # notify the operation result through kafka
            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2779
2780 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2781 if vnfd_id not in cached_vnfds:
2782 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2783 return cached_vnfds[vnfd_id]
2784
2785 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2786 if vnf_profile_id not in cached_vnfrs:
2787 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2788 "vnfrs",
2789 {
2790 "member-vnf-index-ref": vnf_profile_id,
2791 "nsr-id-ref": nsr_id,
2792 },
2793 )
2794 return cached_vnfrs[vnf_profile_id]
2795
2796 def _is_deployed_vca_in_relation(
2797 self, vca: DeployedVCA, relation: Relation
2798 ) -> bool:
2799 found = False
2800 for endpoint in (relation.provider, relation.requirer):
2801 if endpoint["kdu-resource-profile-id"]:
2802 continue
2803 found = (
2804 vca.vnf_profile_id == endpoint.vnf_profile_id
2805 and vca.vdu_profile_id == endpoint.vdu_profile_id
2806 and vca.execution_environment_ref == endpoint.execution_environment_ref
2807 )
2808 if found:
2809 break
2810 return found
2811
2812 def _update_ee_relation_data_with_implicit_data(
2813 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2814 ):
2815 ee_relation_data = safe_get_ee_relation(
2816 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2817 )
2818 ee_relation_level = EELevel.get_level(ee_relation_data)
2819 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2820 "execution-environment-ref"
2821 ]:
2822 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2823 vnfd_id = vnf_profile["vnfd-id"]
2824 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2825 entity_id = (
2826 vnfd_id
2827 if ee_relation_level == EELevel.VNF
2828 else ee_relation_data["vdu-profile-id"]
2829 )
2830 ee = get_juju_ee_ref(db_vnfd, entity_id)
2831 if not ee:
2832 raise Exception(
2833 f"not execution environments found for ee_relation {ee_relation_data}"
2834 )
2835 ee_relation_data["execution-environment-ref"] = ee["id"]
2836 return ee_relation_data
2837
2838 def _get_ns_relations(
2839 self,
2840 nsr_id: str,
2841 nsd: Dict[str, Any],
2842 vca: DeployedVCA,
2843 cached_vnfds: Dict[str, Any],
2844 ) -> List[Relation]:
2845 relations = []
2846 db_ns_relations = get_ns_configuration_relation_list(nsd)
2847 for r in db_ns_relations:
2848 provider_dict = None
2849 requirer_dict = None
2850 if all(key in r for key in ("provider", "requirer")):
2851 provider_dict = r["provider"]
2852 requirer_dict = r["requirer"]
2853 elif "entities" in r:
2854 provider_id = r["entities"][0]["id"]
2855 provider_dict = {
2856 "nsr-id": nsr_id,
2857 "endpoint": r["entities"][0]["endpoint"],
2858 }
2859 if provider_id != nsd["id"]:
2860 provider_dict["vnf-profile-id"] = provider_id
2861 requirer_id = r["entities"][1]["id"]
2862 requirer_dict = {
2863 "nsr-id": nsr_id,
2864 "endpoint": r["entities"][1]["endpoint"],
2865 }
2866 if requirer_id != nsd["id"]:
2867 requirer_dict["vnf-profile-id"] = requirer_id
2868 else:
2869 raise Exception("provider/requirer or entities must be included in the relation.")
2870 relation_provider = self._update_ee_relation_data_with_implicit_data(
2871 nsr_id, nsd, provider_dict, cached_vnfds
2872 )
2873 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2874 nsr_id, nsd, requirer_dict, cached_vnfds
2875 )
2876 provider = EERelation(relation_provider)
2877 requirer = EERelation(relation_requirer)
2878 relation = Relation(r["name"], provider, requirer)
2879 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2880 if vca_in_relation:
2881 relations.append(relation)
2882 return relations
2883
2884 def _get_vnf_relations(
2885 self,
2886 nsr_id: str,
2887 nsd: Dict[str, Any],
2888 vca: DeployedVCA,
2889 cached_vnfds: Dict[str, Any],
2890 ) -> List[Relation]:
2891 relations = []
2892 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2893 vnf_profile_id = vnf_profile["id"]
2894 vnfd_id = vnf_profile["vnfd-id"]
2895 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2896 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2897 for r in db_vnf_relations:
2898 provider_dict = None
2899 requirer_dict = None
2900 if all(key in r for key in ("provider", "requirer")):
2901 provider_dict = r["provider"]
2902 requirer_dict = r["requirer"]
2903 elif "entities" in r:
2904 provider_id = r["entities"][0]["id"]
2905 provider_dict = {
2906 "nsr-id": nsr_id,
2907 "vnf-profile-id": vnf_profile_id,
2908 "endpoint": r["entities"][0]["endpoint"],
2909 }
2910 if provider_id != vnfd_id:
2911 provider_dict["vdu-profile-id"] = provider_id
2912 requirer_id = r["entities"][1]["id"]
2913 requirer_dict = {
2914 "nsr-id": nsr_id,
2915 "vnf-profile-id": vnf_profile_id,
2916 "endpoint": r["entities"][1]["endpoint"],
2917 }
2918 if requirer_id != vnfd_id:
2919 requirer_dict["vdu-profile-id"] = requirer_id
2920 else:
2921 raise Exception("provider/requirer or entities must be included in the relation.")
2922 relation_provider = self._update_ee_relation_data_with_implicit_data(
2923 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2924 )
2925 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2926 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2927 )
2928 provider = EERelation(relation_provider)
2929 requirer = EERelation(relation_requirer)
2930 relation = Relation(r["name"], provider, requirer)
2931 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2932 if vca_in_relation:
2933 relations.append(relation)
2934 return relations
2935
2936 def _get_kdu_resource_data(
2937 self,
2938 ee_relation: EERelation,
2939 db_nsr: Dict[str, Any],
2940 cached_vnfds: Dict[str, Any],
2941 ) -> DeployedK8sResource:
2942 nsd = get_nsd(db_nsr)
2943 vnf_profiles = get_vnf_profiles(nsd)
2944 vnfd_id = find_in_list(
2945 vnf_profiles,
2946 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2947 )["vnfd-id"]
2948 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2949 kdu_resource_profile = get_kdu_resource_profile(
2950 db_vnfd, ee_relation.kdu_resource_profile_id
2951 )
2952 kdu_name = kdu_resource_profile["kdu-name"]
2953 deployed_kdu, _ = get_deployed_kdu(
2954 db_nsr.get("_admin", ()).get("deployed", ()),
2955 kdu_name,
2956 ee_relation.vnf_profile_id,
2957 )
2958 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2959 return deployed_kdu
2960
2961 def _get_deployed_component(
2962 self,
2963 ee_relation: EERelation,
2964 db_nsr: Dict[str, Any],
2965 cached_vnfds: Dict[str, Any],
2966 ) -> DeployedComponent:
2967 nsr_id = db_nsr["_id"]
2968 deployed_component = None
2969 ee_level = EELevel.get_level(ee_relation)
2970 if ee_level == EELevel.NS:
2971 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2972 if vca:
2973 deployed_component = DeployedVCA(nsr_id, vca)
2974 elif ee_level == EELevel.VNF:
2975 vca = get_deployed_vca(
2976 db_nsr,
2977 {
2978 "vdu_id": None,
2979 "member-vnf-index": ee_relation.vnf_profile_id,
2980 "ee_descriptor_id": ee_relation.execution_environment_ref,
2981 },
2982 )
2983 if vca:
2984 deployed_component = DeployedVCA(nsr_id, vca)
2985 elif ee_level == EELevel.VDU:
2986 vca = get_deployed_vca(
2987 db_nsr,
2988 {
2989 "vdu_id": ee_relation.vdu_profile_id,
2990 "member-vnf-index": ee_relation.vnf_profile_id,
2991 "ee_descriptor_id": ee_relation.execution_environment_ref,
2992 },
2993 )
2994 if vca:
2995 deployed_component = DeployedVCA(nsr_id, vca)
2996 elif ee_level == EELevel.KDU:
2997 kdu_resource_data = self._get_kdu_resource_data(
2998 ee_relation, db_nsr, cached_vnfds
2999 )
3000 if kdu_resource_data:
3001 deployed_component = DeployedK8sResource(kdu_resource_data)
3002 return deployed_component
3003
3004 async def _add_relation(
3005 self,
3006 relation: Relation,
3007 vca_type: str,
3008 db_nsr: Dict[str, Any],
3009 cached_vnfds: Dict[str, Any],
3010 cached_vnfrs: Dict[str, Any],
3011 ) -> bool:
3012 deployed_provider = self._get_deployed_component(
3013 relation.provider, db_nsr, cached_vnfds
3014 )
3015 deployed_requirer = self._get_deployed_component(
3016 relation.requirer, db_nsr, cached_vnfds
3017 )
3018 if (
3019 deployed_provider
3020 and deployed_requirer
3021 and deployed_provider.config_sw_installed
3022 and deployed_requirer.config_sw_installed
3023 ):
3024 provider_db_vnfr = (
3025 self._get_vnfr(
3026 relation.provider.nsr_id,
3027 relation.provider.vnf_profile_id,
3028 cached_vnfrs,
3029 )
3030 if relation.provider.vnf_profile_id
3031 else None
3032 )
3033 requirer_db_vnfr = (
3034 self._get_vnfr(
3035 relation.requirer.nsr_id,
3036 relation.requirer.vnf_profile_id,
3037 cached_vnfrs,
3038 )
3039 if relation.requirer.vnf_profile_id
3040 else None
3041 )
3042 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3043 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3044 provider_relation_endpoint = RelationEndpoint(
3045 deployed_provider.ee_id,
3046 provider_vca_id,
3047 relation.provider.endpoint,
3048 )
3049 requirer_relation_endpoint = RelationEndpoint(
3050 deployed_requirer.ee_id,
3051 requirer_vca_id,
3052 relation.requirer.endpoint,
3053 )
3054 await self.vca_map[vca_type].add_relation(
3055 provider=provider_relation_endpoint,
3056 requirer=requirer_relation_endpoint,
3057 )
3058 # remove entry from relations list
3059 return True
3060 return False
3061
3062 async def _add_vca_relations(
3063 self,
3064 logging_text,
3065 nsr_id,
3066 vca_type: str,
3067 vca_index: int,
3068 timeout: int = 3600,
3069 ) -> bool:
3070
3071 # steps:
3072 # 1. find all relations for this VCA
3073 # 2. wait for other peers related
3074 # 3. add relations
3075
3076 try:
3077 # STEP 1: find all relations for this VCA
3078
3079 # read nsr record
3080 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3081 nsd = get_nsd(db_nsr)
3082
3083 # this VCA data
3084 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3085 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3086
3087 cached_vnfds = {}
3088 cached_vnfrs = {}
3089 relations = []
3090 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3091 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3092
3093 # if no relations, terminate
3094 if not relations:
3095 self.logger.debug(logging_text + " No relations")
3096 return True
3097
3098 self.logger.debug(logging_text + " adding relations {}".format(relations))
3099
3100 # add all relations
3101 start = time()
3102 while True:
3103 # check timeout
3104 now = time()
3105 if now - start >= timeout:
3106 self.logger.error(logging_text + " : timeout adding relations")
3107 return False
3108
3109 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3110 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3111
3112 # for each relation, find the VCA's related
3113 for relation in relations.copy():
3114 added = await self._add_relation(
3115 relation,
3116 vca_type,
3117 db_nsr,
3118 cached_vnfds,
3119 cached_vnfrs,
3120 )
3121 if added:
3122 relations.remove(relation)
3123
3124 if not relations:
3125 self.logger.debug("Relations added")
3126 break
3127 await asyncio.sleep(5.0)
3128
3129 return True
3130
3131 except Exception as e:
3132 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3133 return False
3134
3135 async def _install_kdu(
3136 self,
3137 nsr_id: str,
3138 nsr_db_path: str,
3139 vnfr_data: dict,
3140 kdu_index: int,
3141 kdud: dict,
3142 vnfd: dict,
3143 k8s_instance_info: dict,
3144 k8params: dict = None,
3145 timeout: int = 600,
3146 vca_id: str = None,
3147 ):
3148
3149 try:
3150 k8sclustertype = k8s_instance_info["k8scluster-type"]
3151 # Instantiate kdu
3152 db_dict_install = {
3153 "collection": "nsrs",
3154 "filter": {"_id": nsr_id},
3155 "path": nsr_db_path,
3156 }
3157
3158 if k8s_instance_info.get("kdu-deployment-name"):
3159 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3160 else:
3161 kdu_instance = self.k8scluster_map[
3162 k8sclustertype
3163 ].generate_kdu_instance_name(
3164 db_dict=db_dict_install,
3165 kdu_model=k8s_instance_info["kdu-model"],
3166 kdu_name=k8s_instance_info["kdu-name"],
3167 )
3168
3169 # Update the nsrs table with the kdu-instance value
3170 self.update_db_2(
3171 item="nsrs",
3172 _id=nsr_id,
3173 _desc={nsr_db_path + ".kdu-instance": kdu_instance},
3174 )
3175
3176 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3177 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3178 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3179 # namespace, this first verification could be removed, and the next step would be done for any kind
3180 # of KNF.
3181 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3182 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3183 if k8sclustertype in ("juju", "juju-bundle"):
3184 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3185 # that the user passed a namespace which he wants its KDU to be deployed in)
3186 if (
3187 self.db.count(
3188 table="nsrs",
3189 q_filter={
3190 "_id": nsr_id,
3191 "_admin.projects_write": k8s_instance_info["namespace"],
3192 "_admin.projects_read": k8s_instance_info["namespace"],
3193 },
3194 )
3195 > 0
3196 ):
3197 self.logger.debug(
3198 f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3199 )
3200 self.update_db_2(
3201 item="nsrs",
3202 _id=nsr_id,
3203 _desc={f"{nsr_db_path}.namespace": kdu_instance},
3204 )
3205 k8s_instance_info["namespace"] = kdu_instance
3206
3207 await self.k8scluster_map[k8sclustertype].install(
3208 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3209 kdu_model=k8s_instance_info["kdu-model"],
3210 atomic=True,
3211 params=k8params,
3212 db_dict=db_dict_install,
3213 timeout=timeout,
3214 kdu_name=k8s_instance_info["kdu-name"],
3215 namespace=k8s_instance_info["namespace"],
3216 kdu_instance=kdu_instance,
3217 vca_id=vca_id,
3218 )
3219
3220 # Obtain services to obtain management service ip
3221 services = await self.k8scluster_map[k8sclustertype].get_services(
3222 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3223 kdu_instance=kdu_instance,
3224 namespace=k8s_instance_info["namespace"],
3225 )
3226
3227 # Obtain management service info (if exists)
3228 vnfr_update_dict = {}
3229 kdu_config = get_configuration(vnfd, kdud["name"])
3230 if kdu_config:
3231 target_ee_list = kdu_config.get("execution-environment-list", [])
3232 else:
3233 target_ee_list = []
3234
3235 if services:
3236 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3237 mgmt_services = [
3238 service
3239 for service in kdud.get("service", [])
3240 if service.get("mgmt-service")
3241 ]
3242 for mgmt_service in mgmt_services:
3243 for service in services:
3244 if service["name"].startswith(mgmt_service["name"]):
3245 # Mgmt service found, Obtain service ip
3246 ip = service.get("external_ip", service.get("cluster_ip"))
3247 if isinstance(ip, list) and len(ip) == 1:
3248 ip = ip[0]
3249
3250 vnfr_update_dict[
3251 "kdur.{}.ip-address".format(kdu_index)
3252 ] = ip
3253
3254 # Check if must update also mgmt ip at the vnf
3255 service_external_cp = mgmt_service.get(
3256 "external-connection-point-ref"
3257 )
3258 if service_external_cp:
3259 if (
3260 deep_get(vnfd, ("mgmt-interface", "cp"))
3261 == service_external_cp
3262 ):
3263 vnfr_update_dict["ip-address"] = ip
3264
3265 if find_in_list(
3266 target_ee_list,
3267 lambda ee: ee.get(
3268 "external-connection-point-ref", ""
3269 )
3270 == service_external_cp,
3271 ):
3272 vnfr_update_dict[
3273 "kdur.{}.ip-address".format(kdu_index)
3274 ] = ip
3275 break
3276 else:
3277 self.logger.warn(
3278 "Mgmt service name: {} not found".format(
3279 mgmt_service["name"]
3280 )
3281 )
3282
3283 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3284 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3285
3286 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3287 if (
3288 kdu_config
3289 and kdu_config.get("initial-config-primitive")
3290 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3291 ):
3292 initial_config_primitive_list = kdu_config.get(
3293 "initial-config-primitive"
3294 )
3295 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3296
3297 for initial_config_primitive in initial_config_primitive_list:
3298 primitive_params_ = self._map_primitive_params(
3299 initial_config_primitive, {}, {}
3300 )
3301
3302 await asyncio.wait_for(
3303 self.k8scluster_map[k8sclustertype].exec_primitive(
3304 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3305 kdu_instance=kdu_instance,
3306 primitive_name=initial_config_primitive["name"],
3307 params=primitive_params_,
3308 db_dict=db_dict_install,
3309 vca_id=vca_id,
3310 ),
3311 timeout=timeout,
3312 )
3313
3314 except Exception as e:
3315 # Prepare update db with error and raise exception
3316 try:
3317 self.update_db_2(
3318 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3319 )
3320 self.update_db_2(
3321 "vnfrs",
3322 vnfr_data.get("_id"),
3323 {"kdur.{}.status".format(kdu_index): "ERROR"},
3324 )
3325 except Exception:
3326 # ignore to keep original exception
3327 pass
3328 # reraise original error
3329 raise
3330
3331 return kdu_instance
3332
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Deploy every KDU declared in the vnfrs of this NS.

        For each kdur of each vnfr: resolve the target k8s cluster (waiting for
        any ongoing k8scluster task), synchronize helm repos once per cluster,
        and launch an asynchronous _install_kdu task registered in lcm_tasks.

        :param logging_text: prefix for log messages
        :param nsr_id: NS record id
        :param nslcmop_id: NS LCM operation id (for task registration)
        :param db_vnfrs: map of member-vnf-index -> vnfr record
        :param db_vnfds: list of vnfd records referenced by the vnfrs
        :param task_instantiation_info: dict filled with task -> description
        :raises LcmException: when a cluster is missing/uninitialized or on any
            unexpected error
        """
        # Launch kdus if present in the descriptor

        # cache of cluster-id -> cluster uuid, per cluster type
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            """Return the cluster uuid for cluster_id/cluster_type, caching it.

            Waits for any in-flight k8scluster task and, for helm-chart-v3,
            initializes legacy clusters on the fly for backward compatibility.
            """
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage:  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            if storage["pkg-dir"]:
                                filename = "{}/{}/{}s/{}".format(
                                    storage["folder"],
                                    storage["pkg-dir"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            else:
                                filename = "{}/Scripts/{}s/{}".format(
                                    storage["folder"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=1800,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            # persist whatever was deployed before a failure as well
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3604
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Launch an instantiate_N2VC task for each execution environment of a charm.

        For every execution environment found in descriptor_config, look up (or
        create) the matching VCA entry at <nsrs>._admin.deployed.VCA, then launch
        an asyncio instantiate_N2VC task registered in lcm_tasks and recorded in
        task_instantiation_info.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            # derive the VCA type and name from the EE descriptor
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            # reuse an existing VCA entry when one matches this EE exactly
            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

                self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
                self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
                self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3757
3758 @staticmethod
3759 def _create_nslcmop(nsr_id, operation, params):
3760 """
3761 Creates a ns-lcm-opp content to be stored at database.
3762 :param nsr_id: internal id of the instance
3763 :param operation: instantiate, terminate, scale, action, ...
3764 :param params: user parameters for the operation
3765 :return: dictionary following SOL005 format
3766 """
3767 # Raise exception if invalid arguments
3768 if not (nsr_id and operation and params):
3769 raise LcmException(
3770 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3771 )
3772 now = time()
3773 _id = str(uuid4())
3774 nslcmop = {
3775 "id": _id,
3776 "_id": _id,
3777 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3778 "operationState": "PROCESSING",
3779 "statusEnteredTime": now,
3780 "nsInstanceId": nsr_id,
3781 "lcmOperationType": operation,
3782 "startTime": now,
3783 "isAutomaticInvocation": False,
3784 "operationParams": params,
3785 "isCancelPending": False,
3786 "links": {
3787 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3788 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3789 },
3790 }
3791 return nslcmop
3792
3793 def _format_additional_params(self, params):
3794 params = params or {}
3795 for key, value in params.items():
3796 if str(value).startswith("!!yaml "):
3797 params[key] = yaml.safe_load(value[7:])
3798 return params
3799
3800 def _get_terminate_primitive_params(self, seq, vnf_index):
3801 primitive = seq.get("name")
3802 primitive_params = {}
3803 params = {
3804 "member_vnf_index": vnf_index,
3805 "primitive": primitive,
3806 "primitive_params": primitive_params,
3807 }
3808 desc_params = {}
3809 return self._map_primitive_params(seq, params, desc_params)
3810
3811 # sub-operations
3812
3813 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3814 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3815 if op.get("operationState") == "COMPLETED":
3816 # b. Skip sub-operation
3817 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3818 return self.SUBOPERATION_STATUS_SKIP
3819 else:
3820 # c. retry executing sub-operation
3821 # The sub-operation exists, and operationState != 'COMPLETED'
3822 # Update operationState = 'PROCESSING' to indicate a retry.
3823 operationState = "PROCESSING"
3824 detailed_status = "In progress"
3825 self._update_suboperation_status(
3826 db_nslcmop, op_index, operationState, detailed_status
3827 )
3828 # Return the sub-operation index
3829 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3830 # with arguments extracted from the sub-operation
3831 return op_index
3832
3833 # Find a sub-operation where all keys in a matching dictionary must match
3834 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3835 def _find_suboperation(self, db_nslcmop, match):
3836 if db_nslcmop and match:
3837 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3838 for i, op in enumerate(op_list):
3839 if all(op.get(k) == match[k] for k in match):
3840 return i
3841 return self.SUBOPERATION_STATUS_NOT_FOUND
3842
3843 # Update status for a sub-operation given its index
3844 def _update_suboperation_status(
3845 self, db_nslcmop, op_index, operationState, detailed_status
3846 ):
3847 # Update DB for HA tasks
3848 q_filter = {"_id": db_nslcmop["_id"]}
3849 update_dict = {
3850 "_admin.operations.{}.operationState".format(op_index): operationState,
3851 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3852 }
3853 self.db.set_one(
3854 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3855 )
3856
3857 # Add sub-operation, return the index of the added sub-operation
3858 # Optionally, set operationState, detailed-status, and operationType
3859 # Status and type are currently set for 'scale' sub-operations:
3860 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3861 # 'detailed-status' : status message
3862 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3863 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3864 def _add_suboperation(
3865 self,
3866 db_nslcmop,
3867 vnf_index,
3868 vdu_id,
3869 vdu_count_index,
3870 vdu_name,
3871 primitive,
3872 mapped_primitive_params,
3873 operationState=None,
3874 detailed_status=None,
3875 operationType=None,
3876 RO_nsr_id=None,
3877 RO_scaling_info=None,
3878 ):
3879 if not db_nslcmop:
3880 return self.SUBOPERATION_STATUS_NOT_FOUND
3881 # Get the "_admin.operations" list, if it exists
3882 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3883 op_list = db_nslcmop_admin.get("operations")
3884 # Create or append to the "_admin.operations" list
3885 new_op = {
3886 "member_vnf_index": vnf_index,
3887 "vdu_id": vdu_id,
3888 "vdu_count_index": vdu_count_index,
3889 "primitive": primitive,
3890 "primitive_params": mapped_primitive_params,
3891 }
3892 if operationState:
3893 new_op["operationState"] = operationState
3894 if detailed_status:
3895 new_op["detailed-status"] = detailed_status
3896 if operationType:
3897 new_op["lcmOperationType"] = operationType
3898 if RO_nsr_id:
3899 new_op["RO_nsr_id"] = RO_nsr_id
3900 if RO_scaling_info:
3901 new_op["RO_scaling_info"] = RO_scaling_info
3902 if not op_list:
3903 # No existing operations, create key 'operations' with current operation as first list element
3904 db_nslcmop_admin.update({"operations": [new_op]})
3905 op_list = db_nslcmop_admin.get("operations")
3906 else:
3907 # Existing operations, append operation to list
3908 op_list.append(new_op)
3909
3910 db_nslcmop_update = {"_admin.operations": op_list}
3911 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3912 op_index = len(op_list) - 1
3913 return op_index
3914
3915 # Helper methods for scale() sub-operations
3916
3917 # pre-scale/post-scale:
3918 # Check for 3 different cases:
3919 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3920 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3921 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3922 def _check_or_add_scale_suboperation(
3923 self,
3924 db_nslcmop,
3925 vnf_index,
3926 vnf_config_primitive,
3927 primitive_params,
3928 operationType,
3929 RO_nsr_id=None,
3930 RO_scaling_info=None,
3931 ):
3932 # Find this sub-operation
3933 if RO_nsr_id and RO_scaling_info:
3934 operationType = "SCALE-RO"
3935 match = {
3936 "member_vnf_index": vnf_index,
3937 "RO_nsr_id": RO_nsr_id,
3938 "RO_scaling_info": RO_scaling_info,
3939 }
3940 else:
3941 match = {
3942 "member_vnf_index": vnf_index,
3943 "primitive": vnf_config_primitive,
3944 "primitive_params": primitive_params,
3945 "lcmOperationType": operationType,
3946 }
3947 op_index = self._find_suboperation(db_nslcmop, match)
3948 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3949 # a. New sub-operation
3950 # The sub-operation does not exist, add it.
3951 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3952 # The following parameters are set to None for all kind of scaling:
3953 vdu_id = None
3954 vdu_count_index = None
3955 vdu_name = None
3956 if RO_nsr_id and RO_scaling_info:
3957 vnf_config_primitive = None
3958 primitive_params = None
3959 else:
3960 RO_nsr_id = None
3961 RO_scaling_info = None
3962 # Initial status for sub-operation
3963 operationState = "PROCESSING"
3964 detailed_status = "In progress"
3965 # Add sub-operation for pre/post-scaling (zero or more operations)
3966 self._add_suboperation(
3967 db_nslcmop,
3968 vnf_index,
3969 vdu_id,
3970 vdu_count_index,
3971 vdu_name,
3972 vnf_config_primitive,
3973 primitive_params,
3974 operationState,
3975 detailed_status,
3976 operationType,
3977 RO_nsr_id,
3978 RO_scaling_info,
3979 )
3980 return self.SUBOPERATION_STATUS_NEW
3981 else:
3982 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3983 # or op_index (operationState != 'COMPLETED')
3984 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3985
3986 # Function to return execution_environment id
3987
3988 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3989 # TODO vdu_index_count
3990 for vca in vca_deployed_list:
3991 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3992 return vca["ee_id"]
3993
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True).

        :param logging_text: prefix for every log message produced here
        :param db_nslcmop: nslcmop database record of the current operation
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
        :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: identifier of the VCA to operate on (optional)
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default type kept for backward compatibility with pre-"type" records
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4099
4100 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4101 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4102 namespace = "." + db_nsr["_id"]
4103 try:
4104 await self.n2vc.delete_namespace(
4105 namespace=namespace,
4106 total_timeout=self.timeout_charm_delete,
4107 vca_id=vca_id,
4108 )
4109 except N2VCNotFound: # already deleted. Skip
4110 pass
4111 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4112
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO
        :param logging_text: prefix for every log message produced here
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id:
        :param nslcmop_id:
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
            this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None; raises LcmException when any of the deletions fails
        """
        db_nsr_update = {}
        failed_detail = []  # accumulates error strings; raised joined at the end
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                # Ask RO to delete the ns instance, then remember the returned
                # action id so a crashed LCM can resume polling on retry
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                # poll RO every 5 seconds until the delete action finishes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # deletion finished: clear the pending action id
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # persist the status only when it actually changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Delete nsd (only attempted when the ns deletion above succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete each vnfd pushed to RO (only when no previous step failed)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4312
    async def terminate(self, nsr_id, nslcmop_id):
        """Terminate an NS instance: run terminate primitives, destroy the
        execution environments, uninstall KDUs and delete the deployment from
        the VIM (RO or NG-RO), finally updating nsr/nslcmop records and
        notifying the result through kafka.

        :param nsr_id: id of the nsr record to terminate
        :param nslcmop_id: id of the nslcmop record describing this operation
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # maps asyncio task -> human readable description
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                # nothing was deployed; finally-block will mark it terminated
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs, fetching each distinct vnfd only once
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # select the configuration descriptor matching the VCA level:
                # ns, vdu, kdu or vnf
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if ns_state == "NOT_INSTANTIATED":
                try:
                    self.db.set_list(
                        "vnfrs",
                        {"nsr-id-ref": nsr_id},
                        {"_admin.nsState": "NOT_INSTANTIATED"},
                    )
                except DbException as e:
                    self.logger.warn(
                        logging_text
                        + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                            nsr_id, e
                        )
                    )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    # notify operation result so NBI can autoremove the record
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4640
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """
        Wait for all tasks in created_tasks_info, updating the operation status
        (stage index 1) as they complete and collecting failures.
        :param logging_text: prefix for every log message produced here
        :param created_tasks_info: dict mapping asyncio task -> description text
        :param timeout: overall timeout in seconds for the whole set of tasks
        :param stage: list written to db_nslcmop.detailed-status; index 1 is updated here
        :param nslcmop_id: operation id whose status is updated while waiting
        :param nsr_id: if provided, errorDescription/errorDetail are also written to this nsr
        :return: list of detailed error strings (empty when every task succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining time from the global budget, shared by all waits
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # expected exception types are logged briefly; anything
                    # else gets its full traceback for debugging
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4717
4718 @staticmethod
4719 def _map_primitive_params(primitive_desc, params, instantiation_params):
4720 """
4721 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4722 The default-value is used. If it is between < > it look for a value at instantiation_params
4723 :param primitive_desc: portion of VNFD/NSD that describes primitive
4724 :param params: Params provided by user
4725 :param instantiation_params: Instantiation params provided by user
4726 :return: a dictionary with the calculated params
4727 """
4728 calculated_params = {}
4729 for parameter in primitive_desc.get("parameter", ()):
4730 param_name = parameter["name"]
4731 if param_name in params:
4732 calculated_params[param_name] = params[param_name]
4733 elif "default-value" in parameter or "value" in parameter:
4734 if "value" in parameter:
4735 calculated_params[param_name] = parameter["value"]
4736 else:
4737 calculated_params[param_name] = parameter["default-value"]
4738 if (
4739 isinstance(calculated_params[param_name], str)
4740 and calculated_params[param_name].startswith("<")
4741 and calculated_params[param_name].endswith(">")
4742 ):
4743 if calculated_params[param_name][1:-1] in instantiation_params:
4744 calculated_params[param_name] = instantiation_params[
4745 calculated_params[param_name][1:-1]
4746 ]
4747 else:
4748 raise LcmException(
4749 "Parameter {} needed to execute primitive {} not provided".format(
4750 calculated_params[param_name], primitive_desc["name"]
4751 )
4752 )
4753 else:
4754 raise LcmException(
4755 "Parameter {} needed to execute primitive {} not provided".format(
4756 param_name, primitive_desc["name"]
4757 )
4758 )
4759
4760 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4761 calculated_params[param_name] = yaml.safe_dump(
4762 calculated_params[param_name], default_flow_style=True, width=256
4763 )
4764 elif isinstance(calculated_params[param_name], str) and calculated_params[
4765 param_name
4766 ].startswith("!!yaml "):
4767 calculated_params[param_name] = calculated_params[param_name][7:]
4768 if parameter.get("data-type") == "INTEGER":
4769 try:
4770 calculated_params[param_name] = int(calculated_params[param_name])
4771 except ValueError: # error converting string to int
4772 raise LcmException(
4773 "Parameter {} of primitive {} must be integer".format(
4774 param_name, primitive_desc["name"]
4775 )
4776 )
4777 elif parameter.get("data-type") == "BOOLEAN":
4778 calculated_params[param_name] = not (
4779 (str(calculated_params[param_name])).lower() == "false"
4780 )
4781
4782 # add always ns_config_info if primitive name is config
4783 if primitive_desc["name"] == "config":
4784 if "ns_config_info" in instantiation_params:
4785 calculated_params["ns_config_info"] = instantiation_params[
4786 "ns_config_info"
4787 ]
4788 return calculated_params
4789
4790 def _look_for_deployed_vca(
4791 self,
4792 deployed_vca,
4793 member_vnf_index,
4794 vdu_id,
4795 vdu_count_index,
4796 kdu_name=None,
4797 ee_descriptor_id=None,
4798 ):
4799 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4800 for vca in deployed_vca:
4801 if not vca:
4802 continue
4803 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4804 continue
4805 if (
4806 vdu_count_index is not None
4807 and vdu_count_index != vca["vdu_count_index"]
4808 ):
4809 continue
4810 if kdu_name and kdu_name != vca["kdu_name"]:
4811 continue
4812 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4813 continue
4814 break
4815 else:
4816 # vca_deployed not found
4817 raise LcmException(
4818 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4819 " is not deployed".format(
4820 member_vnf_index,
4821 vdu_id,
4822 vdu_count_index,
4823 kdu_name,
4824 ee_descriptor_id,
4825 )
4826 )
4827 # get ee_id
4828 ee_id = vca.get("ee_id")
4829 vca_type = vca.get(
4830 "type", "lxc_proxy_charm"
4831 ) # default value for backward compatibility - proxy charm
4832 if not ee_id:
4833 raise LcmException(
4834 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4835 "execution environment".format(
4836 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4837 )
4838 )
4839 return ee_id, vca_type
4840
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """
        Execute a primitive in the given execution environment, retrying on failure.
        :param ee_id: execution environment id where the primitive runs
        :param primitive: primitive name; "config" gets its params wrapped in {"params": ...}
        :param primitive_params: parameters already mapped for the charm
        :param retries: number of additional attempts after a failure
        :param retries_interval: seconds to sleep between attempts
        :param timeout: per-attempt timeout; defaults to self.timeout_primitive
        :param vca_type: VCA connector key in self.vca_map; defaults to lxc_proxy_charm
        :param db_dict: database info passed through to the connector for status updates
        :param vca_id: identifier of the VCA to operate on (optional)
        :return: tuple (state, detail) where state is "COMPLETED", "FAILED" or "FAIL"
        """
        try:
            if primitive == "config":
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # retries exhausted: report the last error
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4899
4900 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4901 """
4902 Updating the vca_status with latest juju information in nsrs record
4903 :param: nsr_id: Id of the nsr
4904 :param: nslcmop_id: Id of the nslcmop
4905 :return: None
4906 """
4907
4908 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4909 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4910 vca_id = self.get_vca_id({}, db_nsr)
4911 if db_nsr["_admin"]["deployed"]["K8s"]:
4912 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4913 cluster_uuid, kdu_instance, cluster_type = (
4914 k8s["k8scluster-uuid"],
4915 k8s["kdu-instance"],
4916 k8s["k8scluster-type"],
4917 )
4918 await self._on_update_k8s_db(
4919 cluster_uuid=cluster_uuid,
4920 kdu_instance=kdu_instance,
4921 filter={"_id": nsr_id},
4922 vca_id=vca_id,
4923 cluster_type=cluster_type,
4924 )
4925 else:
4926 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4927 table, filter = "nsrs", {"_id": nsr_id}
4928 path = "_admin.deployed.VCA.{}.".format(vca_index)
4929 await self._on_update_n2vc_db(table, filter, path, {})
4930
4931 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4932 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4933
    async def action(self, nsr_id, nslcmop_id):
        """Execute a primitive (day-2 action) over a NS, VNF, VDU or KDU.

        Reads the operation parameters from the nslcmop record, locates the
        requested primitive in the matching descriptor configuration, and runs
        it either through the K8s cluster connector (KDU primitives and the
        built-in "upgrade"/"rollback"/"status" actions) or through the
        deployed VCA execution environment. The operation result is always
        written back to the database in the ``finally`` block and notified
        over kafka.
        :param nsr_id: Id of the nsr
        :param nslcmop_id: Id of the nslcmop
        :return: (nslcmop_operation_state, detailed_status); returns None
                 early if the HA lock could not be acquired
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nsr = None
        db_nslcmop = None
        db_nsr_update = {}
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        error_description_nslcmop = None
        exc = None
        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="RUNNING ACTION",
                current_operation_id=nslcmop_id,
            )

            step = "Getting information from database"
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            # primitive_params arrive serialized as a JSON string
            if db_nslcmop["operationParams"].get("primitive_params"):
                db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                    db_nslcmop["operationParams"]["primitive_params"]
                )

            nsr_deployed = db_nsr["_admin"].get("deployed")
            vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            kdu_name = db_nslcmop["operationParams"].get("kdu_name")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            primitive = db_nslcmop["operationParams"]["primitive"]
            primitive_params = db_nslcmop["operationParams"]["primitive_params"]
            timeout_ns_action = db_nslcmop["operationParams"].get(
                "timeout_ns_action", self.timeout_primitive
            )

            if vnf_index:
                step = "Getting vnfr from database"
                db_vnfr = self.db.get_one(
                    "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
                )
                if db_vnfr.get("kdur"):
                    # kdur additionalParams are stored as JSON strings; decode
                    # them in place before they are used below
                    kdur_list = []
                    for kdur in db_vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    db_vnfr["kdur"] = kdur_list
                step = "Getting vnfd from database"
                db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # NOTE(review): db_vnfr is only bound when vnf_index is set; an
            # NS-level action (no member_vnf_index) would hit an unbound name
            # here — verify get_vca_id usage for NS-scoped actions
            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # for backward compatibility
            if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            # look for primitive
            config_primitive_desc = descriptor_configuration = None
            if vdu_id:
                descriptor_configuration = get_configuration(db_vnfd, vdu_id)
            elif kdu_name:
                descriptor_configuration = get_configuration(db_vnfd, kdu_name)
            elif vnf_index:
                descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
            else:
                descriptor_configuration = db_nsd.get("ns-configuration")

            if descriptor_configuration and descriptor_configuration.get(
                "config-primitive"
            ):
                for config_primitive in descriptor_configuration["config-primitive"]:
                    if config_primitive["name"] == primitive:
                        config_primitive_desc = config_primitive
                        break

            if not config_primitive_desc:
                # the K8s built-in actions do not need a descriptor entry
                if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                    raise LcmException(
                        "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                            primitive
                        )
                    )
                primitive_name = primitive
                ee_descriptor_id = None
            else:
                # the descriptor may map the external primitive name to a
                # different name/EE inside the execution environment
                primitive_name = config_primitive_desc.get(
                    "execution-environment-primitive", primitive
                )
                ee_descriptor_id = config_primitive_desc.get(
                    "execution-environment-ref"
                )

            # resolve the additionalParams applicable at the requested scope
            if vnf_index:
                if vdu_id:
                    vdur = next(
                        (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                    )
                    desc_params = parse_yaml_strings(vdur.get("additionalParams"))
                elif kdu_name:
                    kdur = next(
                        (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                    )
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                else:
                    desc_params = parse_yaml_strings(
                        db_vnfr.get("additionalParamsForVnf")
                    )
            else:
                desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
            # NOTE(review): kdu_action is only assigned inside this branch; if
            # kdu_name has no configuration and the primitive is not one of
            # upgrade/rollback/status, the check below would reference an
            # unbound name — confirm callers cannot reach that combination
            if kdu_name and get_configuration(db_vnfd, kdu_name):
                kdu_configuration = get_configuration(db_vnfd, kdu_name)
                actions = set()
                # collect every primitive name declared for this KDU; note the
                # loop reuses (and clobbers) the 'primitive' variable
                for primitive in kdu_configuration.get("initial-config-primitive", []):
                    actions.add(primitive["name"])
                for primitive in kdu_configuration.get("config-primitive", []):
                    actions.add(primitive["name"])
                kdu_action = True if primitive_name in actions else False

            # TODO check if ns is in a proper status
            if kdu_name and (
                primitive_name in ("upgrade", "rollback", "status") or kdu_action
            ):
                # ---- KDU path: run the action through the K8s connector ----
                # kdur and desc_params already set from before
                if primitive_params:
                    desc_params.update(primitive_params)
                # TODO Check if we will need something at vnf level
                # for/else: 'index' and 'kdu' keep the matched entry after break
                for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                    if (
                        kdu_name == kdu["kdu-name"]
                        and kdu["member-vnf-index"] == vnf_index
                    ):
                        break
                else:
                    raise LcmException(
                        "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                    )

                if kdu.get("k8scluster-type") not in self.k8scluster_map:
                    msg = "unknown k8scluster-type '{}'".format(
                        kdu.get("k8scluster-type")
                    )
                    raise LcmException(msg)

                # db path the connector uses to report progress/status
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }
                self.logger.debug(
                    logging_text
                    + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
                )
                step = "Executing kdu {}".format(primitive_name)
                if primitive_name == "upgrade":
                    # an explicit kdu_model in the params overrides the
                    # deployed one; otherwise strip a ':version' suffix
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.get("kdu_model")
                        del desc_params["kdu_model"]
                    else:
                        kdu_model = kdu.get("kdu-model")
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            atomic=True,
                            kdu_model=kdu_model,
                            params=desc_params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                        ),
                        # outer timeout slightly larger so the inner one fires first
                        timeout=timeout_ns_action + 10,
                    )
                    self.logger.debug(
                        logging_text + " Upgrade of kdu {} done".format(detailed_status)
                    )
                elif primitive_name == "rollback":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            db_dict=db_dict,
                        ),
                        timeout=timeout_ns_action,
                    )
                elif primitive_name == "status":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )
                else:
                    # descriptor-declared KDU primitive
                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                        kdu["kdu-name"], nsr_id
                    )
                    params = self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    )

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            primitive_name=primitive_name,
                            params=params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )

                # a falsy connector result is treated as failure
                if detailed_status:
                    nslcmop_operation_state = "COMPLETED"
                else:
                    detailed_status = ""
                    nslcmop_operation_state = "FAILED"
            else:
                # ---- VCA path: run the primitive in the execution environment ----
                ee_id, vca_type = self._look_for_deployed_vca(
                    nsr_deployed["VCA"],
                    member_vnf_index=vnf_index,
                    vdu_id=vdu_id,
                    vdu_count_index=vdu_count_index,
                    ee_descriptor_id=ee_descriptor_id,
                )
                for vca_index, vca_deployed in enumerate(
                    db_nsr["_admin"]["deployed"]["VCA"]
                ):
                    if vca_deployed.get("member-vnf-index") == vnf_index:
                        db_dict = {
                            "collection": "nsrs",
                            "filter": {"_id": nsr_id},
                            "path": "_admin.deployed.VCA.{}.".format(vca_index),
                        }
                        break
                (
                    nslcmop_operation_state,
                    detailed_status,
                ) = await self._ns_execute_primitive(
                    ee_id,
                    primitive=primitive_name,
                    primitive_params=self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    ),
                    timeout=timeout_ns_action,
                    vca_type=vca_type,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

            db_nslcmop_update["detailed-status"] = detailed_status
            error_description_nslcmop = (
                detailed_status if nslcmop_operation_state == "FAILED" else ""
            )
            self.logger.debug(
                logging_text
                + " task Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
            return  # database update is called inside finally

        except (DbException, LcmException, N2VCException, K8sException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(step)
            )
            exc = "Operation was cancelled"
        except asyncio.TimeoutError:
            self.logger.error(logging_text + "Timeout while '{}'".format(step))
            exc = "Timeout"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
                exc_info=True,
            )
        finally:
            # single exit point: persist ns/op status and notify kafka, both
            # on success (early return above) and on any failure
            if exc:
                # chained assignment: the same FAILED message becomes the
                # detailed status, the op error description and the db update
                db_nslcmop_update[
                    "detailed-status"
                ] = (
                    detailed_status
                ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr[
                        "nsState"
                    ],  # TODO check if degraded. For the moment use previous status
                    current_operation="IDLE",
                    current_operation_id=None,
                    # error_description=error_description_nsr,
                    # error_detail=error_detail,
                    other_update=db_nsr_update,
                )

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "actioned",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    # notification failure must not mask the operation result
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
            return nslcmop_operation_state, detailed_status
5284
5285 async def scale(self, nsr_id, nslcmop_id):
5286 # Try to lock HA task here
5287 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5288 if not task_is_locked_by_me:
5289 return
5290
5291 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5292 stage = ["", "", ""]
5293 tasks_dict_info = {}
5294 # ^ stage, step, VIM progress
5295 self.logger.debug(logging_text + "Enter")
5296 # get all needed from database
5297 db_nsr = None
5298 db_nslcmop_update = {}
5299 db_nsr_update = {}
5300 exc = None
5301 # in case of error, indicates what part of scale was failed to put nsr at error status
5302 scale_process = None
5303 old_operational_status = ""
5304 old_config_status = ""
5305 nsi_id = None
5306 try:
5307 # wait for any previous tasks in process
5308 step = "Waiting for previous operations to terminate"
5309 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5310 self._write_ns_status(
5311 nsr_id=nsr_id,
5312 ns_state=None,
5313 current_operation="SCALING",
5314 current_operation_id=nslcmop_id,
5315 )
5316
5317 step = "Getting nslcmop from database"
5318 self.logger.debug(
5319 step + " after having waited for previous tasks to be completed"
5320 )
5321 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5322
5323 step = "Getting nsr from database"
5324 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5325 old_operational_status = db_nsr["operational-status"]
5326 old_config_status = db_nsr["config-status"]
5327
5328 step = "Parsing scaling parameters"
5329 db_nsr_update["operational-status"] = "scaling"
5330 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5331 nsr_deployed = db_nsr["_admin"].get("deployed")
5332
5333 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5334 "scaleByStepData"
5335 ]["member-vnf-index"]
5336 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5337 "scaleByStepData"
5338 ]["scaling-group-descriptor"]
5339 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5340 # for backward compatibility
5341 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5342 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5343 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5344 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5345
5346 step = "Getting vnfr from database"
5347 db_vnfr = self.db.get_one(
5348 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5349 )
5350
5351 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5352
5353 step = "Getting vnfd from database"
5354 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5355
5356 base_folder = db_vnfd["_admin"]["storage"]
5357
5358 step = "Getting scaling-group-descriptor"
5359 scaling_descriptor = find_in_list(
5360 get_scaling_aspect(db_vnfd),
5361 lambda scale_desc: scale_desc["name"] == scaling_group,
5362 )
5363 if not scaling_descriptor:
5364 raise LcmException(
5365 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5366 "at vnfd:scaling-group-descriptor".format(scaling_group)
5367 )
5368
5369 step = "Sending scale order to VIM"
5370 # TODO check if ns is in a proper status
5371 nb_scale_op = 0
5372 if not db_nsr["_admin"].get("scaling-group"):
5373 self.update_db_2(
5374 "nsrs",
5375 nsr_id,
5376 {
5377 "_admin.scaling-group": [
5378 {"name": scaling_group, "nb-scale-op": 0}
5379 ]
5380 },
5381 )
5382 admin_scale_index = 0
5383 else:
5384 for admin_scale_index, admin_scale_info in enumerate(
5385 db_nsr["_admin"]["scaling-group"]
5386 ):
5387 if admin_scale_info["name"] == scaling_group:
5388 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5389 break
5390 else: # not found, set index one plus last element and add new entry with the name
5391 admin_scale_index += 1
5392 db_nsr_update[
5393 "_admin.scaling-group.{}.name".format(admin_scale_index)
5394 ] = scaling_group
5395
5396 vca_scaling_info = []
5397 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5398 if scaling_type == "SCALE_OUT":
5399 if "aspect-delta-details" not in scaling_descriptor:
5400 raise LcmException(
5401 "Aspect delta details not fount in scaling descriptor {}".format(
5402 scaling_descriptor["name"]
5403 )
5404 )
5405 # count if max-instance-count is reached
5406 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5407
5408 scaling_info["scaling_direction"] = "OUT"
5409 scaling_info["vdu-create"] = {}
5410 scaling_info["kdu-create"] = {}
5411 for delta in deltas:
5412 for vdu_delta in delta.get("vdu-delta", {}):
5413 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5414 # vdu_index also provides the number of instance of the targeted vdu
5415 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5416 cloud_init_text = self._get_vdu_cloud_init_content(
5417 vdud, db_vnfd
5418 )
5419 if cloud_init_text:
5420 additional_params = (
5421 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5422 or {}
5423 )
5424 cloud_init_list = []
5425
5426 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5427 max_instance_count = 10
5428 if vdu_profile and "max-number-of-instances" in vdu_profile:
5429 max_instance_count = vdu_profile.get(
5430 "max-number-of-instances", 10
5431 )
5432
5433 default_instance_num = get_number_of_instances(
5434 db_vnfd, vdud["id"]
5435 )
5436 instances_number = vdu_delta.get("number-of-instances", 1)
5437 nb_scale_op += instances_number
5438
5439 new_instance_count = nb_scale_op + default_instance_num
5440 # Control if new count is over max and vdu count is less than max.
5441 # Then assign new instance count
5442 if new_instance_count > max_instance_count > vdu_count:
5443 instances_number = new_instance_count - max_instance_count
5444 else:
5445 instances_number = instances_number
5446
5447 if new_instance_count > max_instance_count:
5448 raise LcmException(
5449 "reached the limit of {} (max-instance-count) "
5450 "scaling-out operations for the "
5451 "scaling-group-descriptor '{}'".format(
5452 nb_scale_op, scaling_group
5453 )
5454 )
5455 for x in range(vdu_delta.get("number-of-instances", 1)):
5456 if cloud_init_text:
5457 # TODO Information of its own ip is not available because db_vnfr is not updated.
5458 additional_params["OSM"] = get_osm_params(
5459 db_vnfr, vdu_delta["id"], vdu_index + x
5460 )
5461 cloud_init_list.append(
5462 self._parse_cloud_init(
5463 cloud_init_text,
5464 additional_params,
5465 db_vnfd["id"],
5466 vdud["id"],
5467 )
5468 )
5469 vca_scaling_info.append(
5470 {
5471 "osm_vdu_id": vdu_delta["id"],
5472 "member-vnf-index": vnf_index,
5473 "type": "create",
5474 "vdu_index": vdu_index + x,
5475 }
5476 )
5477 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5478 for kdu_delta in delta.get("kdu-resource-delta", {}):
5479 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5480 kdu_name = kdu_profile["kdu-name"]
5481 resource_name = kdu_profile["resource-name"]
5482
5483 # Might have different kdus in the same delta
5484 # Should have list for each kdu
5485 if not scaling_info["kdu-create"].get(kdu_name, None):
5486 scaling_info["kdu-create"][kdu_name] = []
5487
5488 kdur = get_kdur(db_vnfr, kdu_name)
5489 if kdur.get("helm-chart"):
5490 k8s_cluster_type = "helm-chart-v3"
5491 self.logger.debug("kdur: {}".format(kdur))
5492 if (
5493 kdur.get("helm-version")
5494 and kdur.get("helm-version") == "v2"
5495 ):
5496 k8s_cluster_type = "helm-chart"
5497 raise NotImplementedError
5498 elif kdur.get("juju-bundle"):
5499 k8s_cluster_type = "juju-bundle"
5500 else:
5501 raise LcmException(
5502 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5503 "juju-bundle. Maybe an old NBI version is running".format(
5504 db_vnfr["member-vnf-index-ref"], kdu_name
5505 )
5506 )
5507
5508 max_instance_count = 10
5509 if kdu_profile and "max-number-of-instances" in kdu_profile:
5510 max_instance_count = kdu_profile.get(
5511 "max-number-of-instances", 10
5512 )
5513
5514 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5515 deployed_kdu, _ = get_deployed_kdu(
5516 nsr_deployed, kdu_name, vnf_index
5517 )
5518 if deployed_kdu is None:
5519 raise LcmException(
5520 "KDU '{}' for vnf '{}' not deployed".format(
5521 kdu_name, vnf_index
5522 )
5523 )
5524 kdu_instance = deployed_kdu.get("kdu-instance")
5525 instance_num = await self.k8scluster_map[
5526 k8s_cluster_type
5527 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5528 kdu_replica_count = instance_num + kdu_delta.get(
5529 "number-of-instances", 1
5530 )
5531
5532 # Control if new count is over max and instance_num is less than max.
5533 # Then assign max instance number to kdu replica count
5534 if kdu_replica_count > max_instance_count > instance_num:
5535 kdu_replica_count = max_instance_count
5536 if kdu_replica_count > max_instance_count:
5537 raise LcmException(
5538 "reached the limit of {} (max-instance-count) "
5539 "scaling-out operations for the "
5540 "scaling-group-descriptor '{}'".format(
5541 instance_num, scaling_group
5542 )
5543 )
5544
5545 for x in range(kdu_delta.get("number-of-instances", 1)):
5546 vca_scaling_info.append(
5547 {
5548 "osm_kdu_id": kdu_name,
5549 "member-vnf-index": vnf_index,
5550 "type": "create",
5551 "kdu_index": instance_num + x - 1,
5552 }
5553 )
5554 scaling_info["kdu-create"][kdu_name].append(
5555 {
5556 "member-vnf-index": vnf_index,
5557 "type": "create",
5558 "k8s-cluster-type": k8s_cluster_type,
5559 "resource-name": resource_name,
5560 "scale": kdu_replica_count,
5561 }
5562 )
5563 elif scaling_type == "SCALE_IN":
5564 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5565
5566 scaling_info["scaling_direction"] = "IN"
5567 scaling_info["vdu-delete"] = {}
5568 scaling_info["kdu-delete"] = {}
5569
5570 for delta in deltas:
5571 for vdu_delta in delta.get("vdu-delta", {}):
5572 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5573 min_instance_count = 0
5574 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5575 if vdu_profile and "min-number-of-instances" in vdu_profile:
5576 min_instance_count = vdu_profile["min-number-of-instances"]
5577
5578 default_instance_num = get_number_of_instances(
5579 db_vnfd, vdu_delta["id"]
5580 )
5581 instance_num = vdu_delta.get("number-of-instances", 1)
5582 nb_scale_op -= instance_num
5583
5584 new_instance_count = nb_scale_op + default_instance_num
5585
5586 if new_instance_count < min_instance_count < vdu_count:
5587 instances_number = min_instance_count - new_instance_count
5588 else:
5589 instances_number = instance_num
5590
5591 if new_instance_count < min_instance_count:
5592 raise LcmException(
5593 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5594 "scaling-group-descriptor '{}'".format(
5595 nb_scale_op, scaling_group
5596 )
5597 )
5598 for x in range(vdu_delta.get("number-of-instances", 1)):
5599 vca_scaling_info.append(
5600 {
5601 "osm_vdu_id": vdu_delta["id"],
5602 "member-vnf-index": vnf_index,
5603 "type": "delete",
5604 "vdu_index": vdu_index - 1 - x,
5605 }
5606 )
5607 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5608 for kdu_delta in delta.get("kdu-resource-delta", {}):
5609 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5610 kdu_name = kdu_profile["kdu-name"]
5611 resource_name = kdu_profile["resource-name"]
5612
5613 if not scaling_info["kdu-delete"].get(kdu_name, None):
5614 scaling_info["kdu-delete"][kdu_name] = []
5615
5616 kdur = get_kdur(db_vnfr, kdu_name)
5617 if kdur.get("helm-chart"):
5618 k8s_cluster_type = "helm-chart-v3"
5619 self.logger.debug("kdur: {}".format(kdur))
5620 if (
5621 kdur.get("helm-version")
5622 and kdur.get("helm-version") == "v2"
5623 ):
5624 k8s_cluster_type = "helm-chart"
5625 raise NotImplementedError
5626 elif kdur.get("juju-bundle"):
5627 k8s_cluster_type = "juju-bundle"
5628 else:
5629 raise LcmException(
5630 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5631 "juju-bundle. Maybe an old NBI version is running".format(
5632 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5633 )
5634 )
5635
5636 min_instance_count = 0
5637 if kdu_profile and "min-number-of-instances" in kdu_profile:
5638 min_instance_count = kdu_profile["min-number-of-instances"]
5639
5640 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5641 deployed_kdu, _ = get_deployed_kdu(
5642 nsr_deployed, kdu_name, vnf_index
5643 )
5644 if deployed_kdu is None:
5645 raise LcmException(
5646 "KDU '{}' for vnf '{}' not deployed".format(
5647 kdu_name, vnf_index
5648 )
5649 )
5650 kdu_instance = deployed_kdu.get("kdu-instance")
5651 instance_num = await self.k8scluster_map[
5652 k8s_cluster_type
5653 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5654 kdu_replica_count = instance_num - kdu_delta.get(
5655 "number-of-instances", 1
5656 )
5657
5658 if kdu_replica_count < min_instance_count < instance_num:
5659 kdu_replica_count = min_instance_count
5660 if kdu_replica_count < min_instance_count:
5661 raise LcmException(
5662 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5663 "scaling-group-descriptor '{}'".format(
5664 instance_num, scaling_group
5665 )
5666 )
5667
5668 for x in range(kdu_delta.get("number-of-instances", 1)):
5669 vca_scaling_info.append(
5670 {
5671 "osm_kdu_id": kdu_name,
5672 "member-vnf-index": vnf_index,
5673 "type": "delete",
5674 "kdu_index": instance_num - x - 1,
5675 }
5676 )
5677 scaling_info["kdu-delete"][kdu_name].append(
5678 {
5679 "member-vnf-index": vnf_index,
5680 "type": "delete",
5681 "k8s-cluster-type": k8s_cluster_type,
5682 "resource-name": resource_name,
5683 "scale": kdu_replica_count,
5684 }
5685 )
5686
5687 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5688 vdu_delete = copy(scaling_info.get("vdu-delete"))
5689 if scaling_info["scaling_direction"] == "IN":
5690 for vdur in reversed(db_vnfr["vdur"]):
5691 if vdu_delete.get(vdur["vdu-id-ref"]):
5692 vdu_delete[vdur["vdu-id-ref"]] -= 1
5693 scaling_info["vdu"].append(
5694 {
5695 "name": vdur.get("name") or vdur.get("vdu-name"),
5696 "vdu_id": vdur["vdu-id-ref"],
5697 "interface": [],
5698 }
5699 )
5700 for interface in vdur["interfaces"]:
5701 scaling_info["vdu"][-1]["interface"].append(
5702 {
5703 "name": interface["name"],
5704 "ip_address": interface["ip-address"],
5705 "mac_address": interface.get("mac-address"),
5706 }
5707 )
5708 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5709
5710 # PRE-SCALE BEGIN
5711 step = "Executing pre-scale vnf-config-primitive"
5712 if scaling_descriptor.get("scaling-config-action"):
5713 for scaling_config_action in scaling_descriptor[
5714 "scaling-config-action"
5715 ]:
5716 if (
5717 scaling_config_action.get("trigger") == "pre-scale-in"
5718 and scaling_type == "SCALE_IN"
5719 ) or (
5720 scaling_config_action.get("trigger") == "pre-scale-out"
5721 and scaling_type == "SCALE_OUT"
5722 ):
5723 vnf_config_primitive = scaling_config_action[
5724 "vnf-config-primitive-name-ref"
5725 ]
5726 step = db_nslcmop_update[
5727 "detailed-status"
5728 ] = "executing pre-scale scaling-config-action '{}'".format(
5729 vnf_config_primitive
5730 )
5731
5732 # look for primitive
5733 for config_primitive in (
5734 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5735 ).get("config-primitive", ()):
5736 if config_primitive["name"] == vnf_config_primitive:
5737 break
5738 else:
5739 raise LcmException(
5740 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5741 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5742 "primitive".format(scaling_group, vnf_config_primitive)
5743 )
5744
5745 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5746 if db_vnfr.get("additionalParamsForVnf"):
5747 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5748
5749 scale_process = "VCA"
5750 db_nsr_update["config-status"] = "configuring pre-scaling"
5751 primitive_params = self._map_primitive_params(
5752 config_primitive, {}, vnfr_params
5753 )
5754
5755 # Pre-scale retry check: Check if this sub-operation has been executed before
5756 op_index = self._check_or_add_scale_suboperation(
5757 db_nslcmop,
5758 vnf_index,
5759 vnf_config_primitive,
5760 primitive_params,
5761 "PRE-SCALE",
5762 )
5763 if op_index == self.SUBOPERATION_STATUS_SKIP:
5764 # Skip sub-operation
5765 result = "COMPLETED"
5766 result_detail = "Done"
5767 self.logger.debug(
5768 logging_text
5769 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5770 vnf_config_primitive, result, result_detail
5771 )
5772 )
5773 else:
5774 if op_index == self.SUBOPERATION_STATUS_NEW:
5775 # New sub-operation: Get index of this sub-operation
5776 op_index = (
5777 len(db_nslcmop.get("_admin", {}).get("operations"))
5778 - 1
5779 )
5780 self.logger.debug(
5781 logging_text
5782 + "vnf_config_primitive={} New sub-operation".format(
5783 vnf_config_primitive
5784 )
5785 )
5786 else:
5787 # retry: Get registered params for this existing sub-operation
5788 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5789 op_index
5790 ]
5791 vnf_index = op.get("member_vnf_index")
5792 vnf_config_primitive = op.get("primitive")
5793 primitive_params = op.get("primitive_params")
5794 self.logger.debug(
5795 logging_text
5796 + "vnf_config_primitive={} Sub-operation retry".format(
5797 vnf_config_primitive
5798 )
5799 )
5800 # Execute the primitive, either with new (first-time) or registered (reintent) args
5801 ee_descriptor_id = config_primitive.get(
5802 "execution-environment-ref"
5803 )
5804 primitive_name = config_primitive.get(
5805 "execution-environment-primitive", vnf_config_primitive
5806 )
5807 ee_id, vca_type = self._look_for_deployed_vca(
5808 nsr_deployed["VCA"],
5809 member_vnf_index=vnf_index,
5810 vdu_id=None,
5811 vdu_count_index=None,
5812 ee_descriptor_id=ee_descriptor_id,
5813 )
5814 result, result_detail = await self._ns_execute_primitive(
5815 ee_id,
5816 primitive_name,
5817 primitive_params,
5818 vca_type=vca_type,
5819 vca_id=vca_id,
5820 )
5821 self.logger.debug(
5822 logging_text
5823 + "vnf_config_primitive={} Done with result {} {}".format(
5824 vnf_config_primitive, result, result_detail
5825 )
5826 )
5827 # Update operationState = COMPLETED | FAILED
5828 self._update_suboperation_status(
5829 db_nslcmop, op_index, result, result_detail
5830 )
5831
5832 if result == "FAILED":
5833 raise LcmException(result_detail)
5834 db_nsr_update["config-status"] = old_config_status
5835 scale_process = None
5836 # PRE-SCALE END
5837
5838 db_nsr_update[
5839 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5840 ] = nb_scale_op
5841 db_nsr_update[
5842 "_admin.scaling-group.{}.time".format(admin_scale_index)
5843 ] = time()
5844
5845 # SCALE-IN VCA - BEGIN
5846 if vca_scaling_info:
5847 step = db_nslcmop_update[
5848 "detailed-status"
5849 ] = "Deleting the execution environments"
5850 scale_process = "VCA"
5851 for vca_info in vca_scaling_info:
5852 if vca_info["type"] == "delete":
5853 member_vnf_index = str(vca_info["member-vnf-index"])
5854 self.logger.debug(
5855 logging_text + "vdu info: {}".format(vca_info)
5856 )
5857 if vca_info.get("osm_vdu_id"):
5858 vdu_id = vca_info["osm_vdu_id"]
5859 vdu_index = int(vca_info["vdu_index"])
5860 stage[
5861 1
5862 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5863 member_vnf_index, vdu_id, vdu_index
5864 )
5865 else:
5866 vdu_index = 0
5867 kdu_id = vca_info["osm_kdu_id"]
5868 stage[
5869 1
5870 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5871 member_vnf_index, kdu_id, vdu_index
5872 )
5873 stage[2] = step = "Scaling in VCA"
5874 self._write_op_status(op_id=nslcmop_id, stage=stage)
5875 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5876 config_update = db_nsr["configurationStatus"]
5877 for vca_index, vca in enumerate(vca_update):
5878 if (
5879 (vca or vca.get("ee_id"))
5880 and vca["member-vnf-index"] == member_vnf_index
5881 and vca["vdu_count_index"] == vdu_index
5882 ):
5883 if vca.get("vdu_id"):
5884 config_descriptor = get_configuration(
5885 db_vnfd, vca.get("vdu_id")
5886 )
5887 elif vca.get("kdu_name"):
5888 config_descriptor = get_configuration(
5889 db_vnfd, vca.get("kdu_name")
5890 )
5891 else:
5892 config_descriptor = get_configuration(
5893 db_vnfd, db_vnfd["id"]
5894 )
5895 operation_params = (
5896 db_nslcmop.get("operationParams") or {}
5897 )
5898 exec_terminate_primitives = not operation_params.get(
5899 "skip_terminate_primitives"
5900 ) and vca.get("needed_terminate")
5901 task = asyncio.ensure_future(
5902 asyncio.wait_for(
5903 self.destroy_N2VC(
5904 logging_text,
5905 db_nslcmop,
5906 vca,
5907 config_descriptor,
5908 vca_index,
5909 destroy_ee=True,
5910 exec_primitives=exec_terminate_primitives,
5911 scaling_in=True,
5912 vca_id=vca_id,
5913 ),
5914 timeout=self.timeout_charm_delete,
5915 )
5916 )
5917 tasks_dict_info[task] = "Terminating VCA {}".format(
5918 vca.get("ee_id")
5919 )
5920 del vca_update[vca_index]
5921 del config_update[vca_index]
5922 # wait for pending tasks of terminate primitives
5923 if tasks_dict_info:
5924 self.logger.debug(
5925 logging_text
5926 + "Waiting for tasks {}".format(
5927 list(tasks_dict_info.keys())
5928 )
5929 )
5930 error_list = await self._wait_for_tasks(
5931 logging_text,
5932 tasks_dict_info,
5933 min(
5934 self.timeout_charm_delete, self.timeout_ns_terminate
5935 ),
5936 stage,
5937 nslcmop_id,
5938 )
5939 tasks_dict_info.clear()
5940 if error_list:
5941 raise LcmException("; ".join(error_list))
5942
5943 db_vca_and_config_update = {
5944 "_admin.deployed.VCA": vca_update,
5945 "configurationStatus": config_update,
5946 }
5947 self.update_db_2(
5948 "nsrs", db_nsr["_id"], db_vca_and_config_update
5949 )
5950 scale_process = None
5951 # SCALE-IN VCA - END
5952
5953 # SCALE RO - BEGIN
5954 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5955 scale_process = "RO"
5956 if self.ro_config.get("ng"):
5957 await self._scale_ng_ro(
5958 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5959 )
5960 scaling_info.pop("vdu-create", None)
5961 scaling_info.pop("vdu-delete", None)
5962
5963 scale_process = None
5964 # SCALE RO - END
5965
5966 # SCALE KDU - BEGIN
5967 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5968 scale_process = "KDU"
5969 await self._scale_kdu(
5970 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5971 )
5972 scaling_info.pop("kdu-create", None)
5973 scaling_info.pop("kdu-delete", None)
5974
5975 scale_process = None
5976 # SCALE KDU - END
5977
5978 if db_nsr_update:
5979 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5980
5981 # SCALE-UP VCA - BEGIN
5982 if vca_scaling_info:
5983 step = db_nslcmop_update[
5984 "detailed-status"
5985 ] = "Creating new execution environments"
5986 scale_process = "VCA"
5987 for vca_info in vca_scaling_info:
5988 if vca_info["type"] == "create":
5989 member_vnf_index = str(vca_info["member-vnf-index"])
5990 self.logger.debug(
5991 logging_text + "vdu info: {}".format(vca_info)
5992 )
5993 vnfd_id = db_vnfr["vnfd-ref"]
5994 if vca_info.get("osm_vdu_id"):
5995 vdu_index = int(vca_info["vdu_index"])
5996 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5997 if db_vnfr.get("additionalParamsForVnf"):
5998 deploy_params.update(
5999 parse_yaml_strings(
6000 db_vnfr["additionalParamsForVnf"].copy()
6001 )
6002 )
6003 descriptor_config = get_configuration(
6004 db_vnfd, db_vnfd["id"]
6005 )
6006 if descriptor_config:
6007 vdu_id = None
6008 vdu_name = None
6009 kdu_name = None
6010 self._deploy_n2vc(
6011 logging_text=logging_text
6012 + "member_vnf_index={} ".format(member_vnf_index),
6013 db_nsr=db_nsr,
6014 db_vnfr=db_vnfr,
6015 nslcmop_id=nslcmop_id,
6016 nsr_id=nsr_id,
6017 nsi_id=nsi_id,
6018 vnfd_id=vnfd_id,
6019 vdu_id=vdu_id,
6020 kdu_name=kdu_name,
6021 member_vnf_index=member_vnf_index,
6022 vdu_index=vdu_index,
6023 vdu_name=vdu_name,
6024 deploy_params=deploy_params,
6025 descriptor_config=descriptor_config,
6026 base_folder=base_folder,
6027 task_instantiation_info=tasks_dict_info,
6028 stage=stage,
6029 )
6030 vdu_id = vca_info["osm_vdu_id"]
6031 vdur = find_in_list(
6032 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6033 )
6034 descriptor_config = get_configuration(db_vnfd, vdu_id)
6035 if vdur.get("additionalParams"):
6036 deploy_params_vdu = parse_yaml_strings(
6037 vdur["additionalParams"]
6038 )
6039 else:
6040 deploy_params_vdu = deploy_params
6041 deploy_params_vdu["OSM"] = get_osm_params(
6042 db_vnfr, vdu_id, vdu_count_index=vdu_index
6043 )
6044 if descriptor_config:
6045 vdu_name = None
6046 kdu_name = None
6047 stage[
6048 1
6049 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6050 member_vnf_index, vdu_id, vdu_index
6051 )
6052 stage[2] = step = "Scaling out VCA"
6053 self._write_op_status(op_id=nslcmop_id, stage=stage)
6054 self._deploy_n2vc(
6055 logging_text=logging_text
6056 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6057 member_vnf_index, vdu_id, vdu_index
6058 ),
6059 db_nsr=db_nsr,
6060 db_vnfr=db_vnfr,
6061 nslcmop_id=nslcmop_id,
6062 nsr_id=nsr_id,
6063 nsi_id=nsi_id,
6064 vnfd_id=vnfd_id,
6065 vdu_id=vdu_id,
6066 kdu_name=kdu_name,
6067 member_vnf_index=member_vnf_index,
6068 vdu_index=vdu_index,
6069 vdu_name=vdu_name,
6070 deploy_params=deploy_params_vdu,
6071 descriptor_config=descriptor_config,
6072 base_folder=base_folder,
6073 task_instantiation_info=tasks_dict_info,
6074 stage=stage,
6075 )
6076 else:
6077 kdu_name = vca_info["osm_kdu_id"]
6078 descriptor_config = get_configuration(db_vnfd, kdu_name)
6079 if descriptor_config:
6080 vdu_id = None
6081 kdu_index = int(vca_info["kdu_index"])
6082 vdu_name = None
6083 kdur = next(
6084 x
6085 for x in db_vnfr["kdur"]
6086 if x["kdu-name"] == kdu_name
6087 )
6088 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
6089 if kdur.get("additionalParams"):
6090 deploy_params_kdu = parse_yaml_strings(
6091 kdur["additionalParams"]
6092 )
6093
6094 self._deploy_n2vc(
6095 logging_text=logging_text,
6096 db_nsr=db_nsr,
6097 db_vnfr=db_vnfr,
6098 nslcmop_id=nslcmop_id,
6099 nsr_id=nsr_id,
6100 nsi_id=nsi_id,
6101 vnfd_id=vnfd_id,
6102 vdu_id=vdu_id,
6103 kdu_name=kdu_name,
6104 member_vnf_index=member_vnf_index,
6105 vdu_index=kdu_index,
6106 vdu_name=vdu_name,
6107 deploy_params=deploy_params_kdu,
6108 descriptor_config=descriptor_config,
6109 base_folder=base_folder,
6110 task_instantiation_info=tasks_dict_info,
6111 stage=stage,
6112 )
6113 # SCALE-UP VCA - END
6114 scale_process = None
6115
6116 # POST-SCALE BEGIN
6117 # execute primitive service POST-SCALING
6118 step = "Executing post-scale vnf-config-primitive"
6119 if scaling_descriptor.get("scaling-config-action"):
6120 for scaling_config_action in scaling_descriptor[
6121 "scaling-config-action"
6122 ]:
6123 if (
6124 scaling_config_action.get("trigger") == "post-scale-in"
6125 and scaling_type == "SCALE_IN"
6126 ) or (
6127 scaling_config_action.get("trigger") == "post-scale-out"
6128 and scaling_type == "SCALE_OUT"
6129 ):
6130 vnf_config_primitive = scaling_config_action[
6131 "vnf-config-primitive-name-ref"
6132 ]
6133 step = db_nslcmop_update[
6134 "detailed-status"
6135 ] = "executing post-scale scaling-config-action '{}'".format(
6136 vnf_config_primitive
6137 )
6138
6139 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6140 if db_vnfr.get("additionalParamsForVnf"):
6141 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6142
6143 # look for primitive
6144 for config_primitive in (
6145 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6146 ).get("config-primitive", ()):
6147 if config_primitive["name"] == vnf_config_primitive:
6148 break
6149 else:
6150 raise LcmException(
6151 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6152 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6153 "config-primitive".format(
6154 scaling_group, vnf_config_primitive
6155 )
6156 )
6157 scale_process = "VCA"
6158 db_nsr_update["config-status"] = "configuring post-scaling"
6159 primitive_params = self._map_primitive_params(
6160 config_primitive, {}, vnfr_params
6161 )
6162
6163 # Post-scale retry check: Check if this sub-operation has been executed before
6164 op_index = self._check_or_add_scale_suboperation(
6165 db_nslcmop,
6166 vnf_index,
6167 vnf_config_primitive,
6168 primitive_params,
6169 "POST-SCALE",
6170 )
6171 if op_index == self.SUBOPERATION_STATUS_SKIP:
6172 # Skip sub-operation
6173 result = "COMPLETED"
6174 result_detail = "Done"
6175 self.logger.debug(
6176 logging_text
6177 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6178 vnf_config_primitive, result, result_detail
6179 )
6180 )
6181 else:
6182 if op_index == self.SUBOPERATION_STATUS_NEW:
6183 # New sub-operation: Get index of this sub-operation
6184 op_index = (
6185 len(db_nslcmop.get("_admin", {}).get("operations"))
6186 - 1
6187 )
6188 self.logger.debug(
6189 logging_text
6190 + "vnf_config_primitive={} New sub-operation".format(
6191 vnf_config_primitive
6192 )
6193 )
6194 else:
6195 # retry: Get registered params for this existing sub-operation
6196 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6197 op_index
6198 ]
6199 vnf_index = op.get("member_vnf_index")
6200 vnf_config_primitive = op.get("primitive")
6201 primitive_params = op.get("primitive_params")
6202 self.logger.debug(
6203 logging_text
6204 + "vnf_config_primitive={} Sub-operation retry".format(
6205 vnf_config_primitive
6206 )
6207 )
6208 # Execute the primitive, either with new (first-time) or registered (reintent) args
6209 ee_descriptor_id = config_primitive.get(
6210 "execution-environment-ref"
6211 )
6212 primitive_name = config_primitive.get(
6213 "execution-environment-primitive", vnf_config_primitive
6214 )
6215 ee_id, vca_type = self._look_for_deployed_vca(
6216 nsr_deployed["VCA"],
6217 member_vnf_index=vnf_index,
6218 vdu_id=None,
6219 vdu_count_index=None,
6220 ee_descriptor_id=ee_descriptor_id,
6221 )
6222 result, result_detail = await self._ns_execute_primitive(
6223 ee_id,
6224 primitive_name,
6225 primitive_params,
6226 vca_type=vca_type,
6227 vca_id=vca_id,
6228 )
6229 self.logger.debug(
6230 logging_text
6231 + "vnf_config_primitive={} Done with result {} {}".format(
6232 vnf_config_primitive, result, result_detail
6233 )
6234 )
6235 # Update operationState = COMPLETED | FAILED
6236 self._update_suboperation_status(
6237 db_nslcmop, op_index, result, result_detail
6238 )
6239
6240 if result == "FAILED":
6241 raise LcmException(result_detail)
6242 db_nsr_update["config-status"] = old_config_status
6243 scale_process = None
6244 # POST-SCALE END
6245
6246 db_nsr_update[
6247 "detailed-status"
6248 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6249 db_nsr_update["operational-status"] = (
6250 "running"
6251 if old_operational_status == "failed"
6252 else old_operational_status
6253 )
6254 db_nsr_update["config-status"] = old_config_status
6255 return
6256 except (
6257 ROclient.ROClientException,
6258 DbException,
6259 LcmException,
6260 NgRoException,
6261 ) as e:
6262 self.logger.error(logging_text + "Exit Exception {}".format(e))
6263 exc = e
6264 except asyncio.CancelledError:
6265 self.logger.error(
6266 logging_text + "Cancelled Exception while '{}'".format(step)
6267 )
6268 exc = "Operation was cancelled"
6269 except Exception as e:
6270 exc = traceback.format_exc()
6271 self.logger.critical(
6272 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6273 exc_info=True,
6274 )
6275 finally:
6276 self._write_ns_status(
6277 nsr_id=nsr_id,
6278 ns_state=None,
6279 current_operation="IDLE",
6280 current_operation_id=None,
6281 )
6282 if tasks_dict_info:
6283 stage[1] = "Waiting for instantiate pending tasks."
6284 self.logger.debug(logging_text + stage[1])
6285 exc = await self._wait_for_tasks(
6286 logging_text,
6287 tasks_dict_info,
6288 self.timeout_ns_deploy,
6289 stage,
6290 nslcmop_id,
6291 nsr_id=nsr_id,
6292 )
6293 if exc:
6294 db_nslcmop_update[
6295 "detailed-status"
6296 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6297 nslcmop_operation_state = "FAILED"
6298 if db_nsr:
6299 db_nsr_update["operational-status"] = old_operational_status
6300 db_nsr_update["config-status"] = old_config_status
6301 db_nsr_update["detailed-status"] = ""
6302 if scale_process:
6303 if "VCA" in scale_process:
6304 db_nsr_update["config-status"] = "failed"
6305 if "RO" in scale_process:
6306 db_nsr_update["operational-status"] = "failed"
6307 db_nsr_update[
6308 "detailed-status"
6309 ] = "FAILED scaling nslcmop={} {}: {}".format(
6310 nslcmop_id, step, exc
6311 )
6312 else:
6313 error_description_nslcmop = None
6314 nslcmop_operation_state = "COMPLETED"
6315 db_nslcmop_update["detailed-status"] = "Done"
6316
6317 self._write_op_status(
6318 op_id=nslcmop_id,
6319 stage="",
6320 error_message=error_description_nslcmop,
6321 operation_state=nslcmop_operation_state,
6322 other_update=db_nslcmop_update,
6323 )
6324 if db_nsr:
6325 self._write_ns_status(
6326 nsr_id=nsr_id,
6327 ns_state=None,
6328 current_operation="IDLE",
6329 current_operation_id=None,
6330 other_update=db_nsr_update,
6331 )
6332
6333 if nslcmop_operation_state:
6334 try:
6335 msg = {
6336 "nsr_id": nsr_id,
6337 "nslcmop_id": nslcmop_id,
6338 "operationState": nslcmop_operation_state,
6339 }
6340 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6341 except Exception as e:
6342 self.logger.error(
6343 logging_text + "kafka_write notification Exception {}".format(e)
6344 )
6345 self.logger.debug(logging_text + "Exit")
6346 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6347
    async def _scale_kdu(
        self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
    ):
        """Scale the KDUs listed in scaling_info through the K8s connectors.

        For each KDU resource entry: on a "delete" operation the KDU's
        terminate-config-primitives are executed first (only when there is no
        juju execution environment handling them), then the K8s connector is
        asked to scale the resource, and on a "create" operation the KDU's
        initial-config-primitives are executed afterwards.

        :param logging_text: prefix for every log line
        :param nsr_id: NS record _id (used to build the db status path)
        :param nsr_deployed: nsr "_admin.deployed" content, to locate the KDU
        :param db_vnfd: VNF descriptor providing the KDU configuration
        :param vca_id: VCA id forwarded to the K8s connector calls
        :param scaling_info: dict with "kdu-create" or "kdu-delete" keys,
            each mapping kdu_name -> list of per-resource scaling entries
        :raises asyncio.TimeoutError: if a primitive or the scale operation
            exceeds its timeout
        """
        # NOTE(review): only one of kdu-create/kdu-delete is processed per
        # call; if both keys were present, "kdu-delete" would be ignored.
        _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
        for kdu_name in _scaling_info:
            for kdu_scaling_info in _scaling_info[kdu_name]:
                deployed_kdu, index = get_deployed_kdu(
                    nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
                )
                cluster_uuid = deployed_kdu["k8scluster-uuid"]
                kdu_instance = deployed_kdu["kdu-instance"]
                scale = int(kdu_scaling_info["scale"])
                # selects the connector (helm/juju) used for this KDU
                k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

                # db location where the connector writes operation status
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }

                step = "scaling application {}".format(
                    kdu_scaling_info["resource-name"]
                )
                self.logger.debug(logging_text + step)

                if kdu_scaling_info["type"] == "delete":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run terminate primitives only when there is no juju EE
                    # in charge of them (otherwise the EE already handles it)
                    if (
                        kdu_config
                        and kdu_config.get("terminate-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        terminate_config_primitive_list = kdu_config.get(
                            "terminate-config-primitive"
                        )
                        # primitives must run in ascending "seq" order
                        terminate_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for (
                            terminate_config_primitive
                        ) in terminate_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                terminate_config_primitive, {}, {}
                            )
                            step = "execute terminate config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=terminate_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )

                # the actual scale operation, common to create and delete
                await asyncio.wait_for(
                    self.k8scluster_map[k8s_cluster_type].scale(
                        kdu_instance,
                        scale,
                        kdu_scaling_info["resource-name"],
                        vca_id=vca_id,
                    ),
                    timeout=self.timeout_vca_on_error,
                )

                if kdu_scaling_info["type"] == "create":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # same rule as above: skip when a juju EE owns the primitives
                    if (
                        kdu_config
                        and kdu_config.get("initial-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        initial_config_primitive_list = kdu_config.get(
                            "initial-config-primitive"
                        )
                        # primitives must run in ascending "seq" order
                        initial_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for initial_config_primitive in initial_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                initial_config_primitive, {}, {}
                            )
                            step = "execute initial config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=initial_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )
6448
6449 async def _scale_ng_ro(
6450 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6451 ):
6452 nsr_id = db_nslcmop["nsInstanceId"]
6453 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6454 db_vnfrs = {}
6455
6456 # read from db: vnfd's for every vnf
6457 db_vnfds = []
6458
6459 # for each vnf in ns, read vnfd
6460 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6461 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6462 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6463 # if we haven't this vnfd, read it from db
6464 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6465 # read from db
6466 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6467 db_vnfds.append(vnfd)
6468 n2vc_key = self.n2vc.get_public_key()
6469 n2vc_key_list = [n2vc_key]
6470 self.scale_vnfr(
6471 db_vnfr,
6472 vdu_scaling_info.get("vdu-create"),
6473 vdu_scaling_info.get("vdu-delete"),
6474 mark_delete=True,
6475 )
6476 # db_vnfr has been updated, update db_vnfrs to use it
6477 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6478 await self._instantiate_ng_ro(
6479 logging_text,
6480 nsr_id,
6481 db_nsd,
6482 db_nsr,
6483 db_nslcmop,
6484 db_vnfrs,
6485 db_vnfds,
6486 n2vc_key_list,
6487 stage=stage,
6488 start_deploy=time(),
6489 timeout_ns_deploy=self.timeout_ns_deploy,
6490 )
6491 if vdu_scaling_info.get("vdu-delete"):
6492 self.scale_vnfr(
6493 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6494 )
6495
6496 async def extract_prometheus_scrape_jobs(
6497 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6498 ):
6499 # look if exist a file called 'prometheus*.j2' and
6500 artifact_content = self.fs.dir_ls(artifact_path)
6501 job_file = next(
6502 (
6503 f
6504 for f in artifact_content
6505 if f.startswith("prometheus") and f.endswith(".j2")
6506 ),
6507 None,
6508 )
6509 if not job_file:
6510 return
6511 with self.fs.file_open((artifact_path, job_file), "r") as f:
6512 job_data = f.read()
6513
6514 # TODO get_service
6515 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6516 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6517 host_port = "80"
6518 vnfr_id = vnfr_id.replace("-", "")
6519 variables = {
6520 "JOB_NAME": vnfr_id,
6521 "TARGET_IP": target_ip,
6522 "EXPORTER_POD_IP": host_name,
6523 "EXPORTER_POD_PORT": host_port,
6524 }
6525 job_list = parse_job(job_data, variables)
6526 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6527 for job in job_list:
6528 if (
6529 not isinstance(job.get("job_name"), str)
6530 or vnfr_id not in job["job_name"]
6531 ):
6532 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6533 job["nsr_id"] = nsr_id
6534 job["vnfr_id"] = vnfr_id
6535 return job_list
6536
6537 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6538 """
6539 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6540
6541 :param: vim_account_id: VIM Account ID
6542
6543 :return: (cloud_name, cloud_credential)
6544 """
6545 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6546 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6547
6548 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6549 """
6550 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6551
6552 :param: vim_account_id: VIM Account ID
6553
6554 :return: (cloud_name, cloud_credential)
6555 """
6556 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6557 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")