Feature 10916 Remove VNF Instance from NS - NS Update
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import shutil
21 from typing import Any, Dict, List
22 import yaml
23 import logging
24 import logging.handlers
25 import traceback
26 import json
27 from jinja2 import (
28 Environment,
29 TemplateError,
30 TemplateNotFound,
31 StrictUndefined,
32 UndefinedError,
33 )
34
35 from osm_lcm import ROclient
36 from osm_lcm.data_utils.nsr import (
37 get_deployed_kdu,
38 get_deployed_vca,
39 get_deployed_vca_list,
40 get_nsd,
41 )
42 from osm_lcm.data_utils.vca import (
43 DeployedComponent,
44 DeployedK8sResource,
45 DeployedVCA,
46 EELevel,
47 Relation,
48 EERelation,
49 safe_get_ee_relation,
50 )
51 from osm_lcm.ng_ro import NgRoClient, NgRoException
52 from osm_lcm.lcm_utils import (
53 LcmException,
54 LcmExceptionNoMgmtIP,
55 LcmBase,
56 deep_get,
57 get_iterable,
58 populate_dict,
59 check_juju_bundle_existence,
60 get_charm_artifact_path,
61 )
62 from osm_lcm.data_utils.nsd import (
63 get_ns_configuration_relation_list,
64 get_vnf_profile,
65 get_vnf_profiles,
66 )
67 from osm_lcm.data_utils.vnfd import (
68 get_kdu,
69 get_kdu_services,
70 get_relation_list,
71 get_vdu_list,
72 get_vdu_profile,
73 get_ee_sorted_initial_config_primitive_list,
74 get_ee_sorted_terminate_config_primitive_list,
75 get_kdu_list,
76 get_virtual_link_profiles,
77 get_vdu,
78 get_configuration,
79 get_vdu_index,
80 get_scaling_aspect,
81 get_number_of_instances,
82 get_juju_ee_ref,
83 get_kdu_resource_profile,
84 find_software_version,
85 )
86 from osm_lcm.data_utils.list_utils import find_in_list
87 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
88 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
89 from osm_lcm.data_utils.database.vim_account import VimAccountDB
90 from n2vc.definitions import RelationEndpoint
91 from n2vc.k8s_helm_conn import K8sHelmConnector
92 from n2vc.k8s_helm3_conn import K8sHelm3Connector
93 from n2vc.k8s_juju_conn import K8sJujuConnector
94
95 from osm_common.dbbase import DbException
96 from osm_common.fsbase import FsException
97
98 from osm_lcm.data_utils.database.database import Database
99 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
100
101 from n2vc.n2vc_juju_conn import N2VCJujuConnector
102 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
103
104 from osm_lcm.lcm_helm_conn import LCMHelmConn
105 from osm_lcm.osm_config import OsmConfigBuilder
106 from osm_lcm.prometheus import parse_job
107
108 from copy import copy, deepcopy
109 from time import time
110 from uuid import uuid4
111
112 from random import randint
113
114 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
115
116
117 class NsLcm(LcmBase):
118 timeout_vca_on_error = (
119 5 * 60
120 ) # Time for charm from first time at blocked,error status to mark as failed
121 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
122 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
123 timeout_charm_delete = 10 * 60
124 timeout_primitive = 30 * 60 # timeout for primitive execution
125 timeout_ns_update = 30 * 60 # timeout for ns update
126 timeout_progress_primitive = (
127 10 * 60
128 ) # timeout for some progress in a primitive execution
129 timeout_migrate = 1800 # default global timeout for migrating vnfs
130
131 SUBOPERATION_STATUS_NOT_FOUND = -1
132 SUBOPERATION_STATUS_NEW = -2
133 SUBOPERATION_STATUS_SKIP = -3
134 task_name_deploy_vca = "Deploying VCA"
135
136 def __init__(self, msg, lcm_tasks, config, loop):
137 """
138 Init, Connect to database, filesystem storage, and messaging
139 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
140 :return: None
141 """
142 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
143
144 self.db = Database().instance.db
145 self.fs = Filesystem().instance.fs
146 self.loop = loop
147 self.lcm_tasks = lcm_tasks
148 self.timeout = config["timeout"]
149 self.ro_config = config["ro_config"]
150 self.ng_ro = config["ro_config"].get("ng")
151 self.vca_config = config["VCA"].copy()
152
153 # create N2VC connector
154 self.n2vc = N2VCJujuConnector(
155 log=self.logger,
156 loop=self.loop,
157 on_update_db=self._on_update_n2vc_db,
158 fs=self.fs,
159 db=self.db,
160 )
161
162 self.conn_helm_ee = LCMHelmConn(
163 log=self.logger,
164 loop=self.loop,
165 vca_config=self.vca_config,
166 on_update_db=self._on_update_n2vc_db,
167 )
168
169 self.k8sclusterhelm2 = K8sHelmConnector(
170 kubectl_command=self.vca_config.get("kubectlpath"),
171 helm_command=self.vca_config.get("helmpath"),
172 log=self.logger,
173 on_update_db=None,
174 fs=self.fs,
175 db=self.db,
176 )
177
178 self.k8sclusterhelm3 = K8sHelm3Connector(
179 kubectl_command=self.vca_config.get("kubectlpath"),
180 helm_command=self.vca_config.get("helm3path"),
181 fs=self.fs,
182 log=self.logger,
183 db=self.db,
184 on_update_db=None,
185 )
186
187 self.k8sclusterjuju = K8sJujuConnector(
188 kubectl_command=self.vca_config.get("kubectlpath"),
189 juju_command=self.vca_config.get("jujupath"),
190 log=self.logger,
191 loop=self.loop,
192 on_update_db=self._on_update_k8s_db,
193 fs=self.fs,
194 db=self.db,
195 )
196
197 self.k8scluster_map = {
198 "helm-chart": self.k8sclusterhelm2,
199 "helm-chart-v3": self.k8sclusterhelm3,
200 "chart": self.k8sclusterhelm3,
201 "juju-bundle": self.k8sclusterjuju,
202 "juju": self.k8sclusterjuju,
203 }
204
205 self.vca_map = {
206 "lxc_proxy_charm": self.n2vc,
207 "native_charm": self.n2vc,
208 "k8s_proxy_charm": self.n2vc,
209 "helm": self.conn_helm_ee,
210 "helm-v3": self.conn_helm_ee,
211 }
212
213 # create RO client
214 self.RO = NgRoClient(self.loop, **self.ro_config)
215
216 @staticmethod
217 def increment_ip_mac(ip_mac, vm_index=1):
218 if not isinstance(ip_mac, str):
219 return ip_mac
220 try:
221 # try with ipv4 look for last dot
222 i = ip_mac.rfind(".")
223 if i > 0:
224 i += 1
225 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
226 # try with ipv6 or mac look for last colon. Operate in hex
227 i = ip_mac.rfind(":")
228 if i > 0:
229 i += 1
230 # format in hex, len can be 2 for mac or 4 for ipv6
231 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
232 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
233 )
234 except Exception:
235 pass
236 return None
237
238 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
239
240 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
241
242 try:
243 # TODO filter RO descriptor fields...
244
245 # write to database
246 db_dict = dict()
247 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
248 db_dict["deploymentStatus"] = ro_descriptor
249 self.update_db_2("nsrs", nsrs_id, db_dict)
250
251 except Exception as e:
252 self.logger.warn(
253 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
254 )
255
256 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
257
258 # remove last dot from path (if exists)
259 if path.endswith("."):
260 path = path[:-1]
261
262 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
263 # .format(table, filter, path, updated_data))
264 try:
265
266 nsr_id = filter.get("_id")
267
268 # read ns record from database
269 nsr = self.db.get_one(table="nsrs", q_filter=filter)
270 current_ns_status = nsr.get("nsState")
271
272 # get vca status for NS
273 status_dict = await self.n2vc.get_status(
274 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
275 )
276
277 # vcaStatus
278 db_dict = dict()
279 db_dict["vcaStatus"] = status_dict
280 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
281
282 # update configurationStatus for this VCA
283 try:
284 vca_index = int(path[path.rfind(".") + 1 :])
285
286 vca_list = deep_get(
287 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
288 )
289 vca_status = vca_list[vca_index].get("status")
290
291 configuration_status_list = nsr.get("configurationStatus")
292 config_status = configuration_status_list[vca_index].get("status")
293
294 if config_status == "BROKEN" and vca_status != "failed":
295 db_dict["configurationStatus"][vca_index] = "READY"
296 elif config_status != "BROKEN" and vca_status == "failed":
297 db_dict["configurationStatus"][vca_index] = "BROKEN"
298 except Exception as e:
299 # not update configurationStatus
300 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
301
302 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
303 # if nsState = 'DEGRADED' check if all is OK
304 is_degraded = False
305 if current_ns_status in ("READY", "DEGRADED"):
306 error_description = ""
307 # check machines
308 if status_dict.get("machines"):
309 for machine_id in status_dict.get("machines"):
310 machine = status_dict.get("machines").get(machine_id)
311 # check machine agent-status
312 if machine.get("agent-status"):
313 s = machine.get("agent-status").get("status")
314 if s != "started":
315 is_degraded = True
316 error_description += (
317 "machine {} agent-status={} ; ".format(
318 machine_id, s
319 )
320 )
321 # check machine instance status
322 if machine.get("instance-status"):
323 s = machine.get("instance-status").get("status")
324 if s != "running":
325 is_degraded = True
326 error_description += (
327 "machine {} instance-status={} ; ".format(
328 machine_id, s
329 )
330 )
331 # check applications
332 if status_dict.get("applications"):
333 for app_id in status_dict.get("applications"):
334 app = status_dict.get("applications").get(app_id)
335 # check application status
336 if app.get("status"):
337 s = app.get("status").get("status")
338 if s != "active":
339 is_degraded = True
340 error_description += (
341 "application {} status={} ; ".format(app_id, s)
342 )
343
344 if error_description:
345 db_dict["errorDescription"] = error_description
346 if current_ns_status == "READY" and is_degraded:
347 db_dict["nsState"] = "DEGRADED"
348 if current_ns_status == "DEGRADED" and not is_degraded:
349 db_dict["nsState"] = "READY"
350
351 # write to database
352 self.update_db_2("nsrs", nsr_id, db_dict)
353
354 except (asyncio.CancelledError, asyncio.TimeoutError):
355 raise
356 except Exception as e:
357 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
358
359 async def _on_update_k8s_db(
360 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
361 ):
362 """
363 Updating vca status in NSR record
364 :param cluster_uuid: UUID of a k8s cluster
365 :param kdu_instance: The unique name of the KDU instance
366 :param filter: To get nsr_id
367 :cluster_type: The cluster type (juju, k8s)
368 :return: none
369 """
370
371 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
372 # .format(cluster_uuid, kdu_instance, filter))
373
374 nsr_id = filter.get("_id")
375 try:
376 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
377 cluster_uuid=cluster_uuid,
378 kdu_instance=kdu_instance,
379 yaml_format=False,
380 complete_status=True,
381 vca_id=vca_id,
382 )
383
384 # vcaStatus
385 db_dict = dict()
386 db_dict["vcaStatus"] = {nsr_id: vca_status}
387
388 if cluster_type in ("juju-bundle", "juju"):
389 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
390 # status in a similar way between Juju Bundles and Helm Charts on this side
391 await self.k8sclusterjuju.update_vca_status(
392 db_dict["vcaStatus"],
393 kdu_instance,
394 vca_id=vca_id,
395 )
396
397 self.logger.debug(
398 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
399 )
400
401 # write to database
402 self.update_db_2("nsrs", nsr_id, db_dict)
403 except (asyncio.CancelledError, asyncio.TimeoutError):
404 raise
405 except Exception as e:
406 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
407
408 @staticmethod
409 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
410 try:
411 env = Environment(undefined=StrictUndefined)
412 template = env.from_string(cloud_init_text)
413 return template.render(additional_params or {})
414 except UndefinedError as e:
415 raise LcmException(
416 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
417 "file, must be provided in the instantiation parameters inside the "
418 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
419 )
420 except (TemplateError, TemplateNotFound) as e:
421 raise LcmException(
422 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
423 vnfd_id, vdu_id, e
424 )
425 )
426
427 def _get_vdu_cloud_init_content(self, vdu, vnfd):
428 cloud_init_content = cloud_init_file = None
429 try:
430 if vdu.get("cloud-init-file"):
431 base_folder = vnfd["_admin"]["storage"]
432 if base_folder["pkg-dir"]:
433 cloud_init_file = "{}/{}/cloud_init/{}".format(
434 base_folder["folder"],
435 base_folder["pkg-dir"],
436 vdu["cloud-init-file"],
437 )
438 else:
439 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
440 base_folder["folder"],
441 vdu["cloud-init-file"],
442 )
443 with self.fs.file_open(cloud_init_file, "r") as ci_file:
444 cloud_init_content = ci_file.read()
445 elif vdu.get("cloud-init"):
446 cloud_init_content = vdu["cloud-init"]
447
448 return cloud_init_content
449 except FsException as e:
450 raise LcmException(
451 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
452 vnfd["id"], vdu["id"], cloud_init_file, e
453 )
454 )
455
456 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
457 vdur = next(
458 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
459 {}
460 )
461 additional_params = vdur.get("additionalParams")
462 return parse_yaml_strings(additional_params)
463
464 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
465 """
466 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
467 :param vnfd: input vnfd
468 :param new_id: overrides vnf id if provided
469 :param additionalParams: Instantiation params for VNFs provided
470 :param nsrId: Id of the NSR
471 :return: copy of vnfd
472 """
473 vnfd_RO = deepcopy(vnfd)
474 # remove unused by RO configuration, monitoring, scaling and internal keys
475 vnfd_RO.pop("_id", None)
476 vnfd_RO.pop("_admin", None)
477 vnfd_RO.pop("monitoring-param", None)
478 vnfd_RO.pop("scaling-group-descriptor", None)
479 vnfd_RO.pop("kdu", None)
480 vnfd_RO.pop("k8s-cluster", None)
481 if new_id:
482 vnfd_RO["id"] = new_id
483
484 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
485 for vdu in get_iterable(vnfd_RO, "vdu"):
486 vdu.pop("cloud-init-file", None)
487 vdu.pop("cloud-init", None)
488 return vnfd_RO
489
490 @staticmethod
491 def ip_profile_2_RO(ip_profile):
492 RO_ip_profile = deepcopy(ip_profile)
493 if "dns-server" in RO_ip_profile:
494 if isinstance(RO_ip_profile["dns-server"], list):
495 RO_ip_profile["dns-address"] = []
496 for ds in RO_ip_profile.pop("dns-server"):
497 RO_ip_profile["dns-address"].append(ds["address"])
498 else:
499 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
500 if RO_ip_profile.get("ip-version") == "ipv4":
501 RO_ip_profile["ip-version"] = "IPv4"
502 if RO_ip_profile.get("ip-version") == "ipv6":
503 RO_ip_profile["ip-version"] = "IPv6"
504 if "dhcp-params" in RO_ip_profile:
505 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
506 return RO_ip_profile
507
508 def _get_ro_vim_id_for_vim_account(self, vim_account):
509 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
510 if db_vim["_admin"]["operationalState"] != "ENABLED":
511 raise LcmException(
512 "VIM={} is not available. operationalState={}".format(
513 vim_account, db_vim["_admin"]["operationalState"]
514 )
515 )
516 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
517 return RO_vim_id
518
519 def get_ro_wim_id_for_wim_account(self, wim_account):
520 if isinstance(wim_account, str):
521 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
522 if db_wim["_admin"]["operationalState"] != "ENABLED":
523 raise LcmException(
524 "WIM={} is not available. operationalState={}".format(
525 wim_account, db_wim["_admin"]["operationalState"]
526 )
527 )
528 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
529 return RO_wim_id
530 else:
531 return wim_account
532
533 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
534
535 db_vdu_push_list = []
536 template_vdur = []
537 db_update = {"_admin.modified": time()}
538 if vdu_create:
539 for vdu_id, vdu_count in vdu_create.items():
540 vdur = next(
541 (
542 vdur
543 for vdur in reversed(db_vnfr["vdur"])
544 if vdur["vdu-id-ref"] == vdu_id
545 ),
546 None,
547 )
548 if not vdur:
549 # Read the template saved in the db:
550 self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
551 vdur_template = db_vnfr.get("vdur-template")
552 if not vdur_template:
553 raise LcmException(
554 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
555 vdu_id
556 )
557 )
558 vdur = vdur_template[0]
559 #Delete a template from the database after using it
560 self.db.set_one("vnfrs",
561 {"_id": db_vnfr["_id"]},
562 None,
563 pull={"vdur-template": {"_id": vdur['_id']}}
564 )
565 for count in range(vdu_count):
566 vdur_copy = deepcopy(vdur)
567 vdur_copy["status"] = "BUILD"
568 vdur_copy["status-detailed"] = None
569 vdur_copy["ip-address"] = None
570 vdur_copy["_id"] = str(uuid4())
571 vdur_copy["count-index"] += count + 1
572 vdur_copy["id"] = "{}-{}".format(
573 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
574 )
575 vdur_copy.pop("vim_info", None)
576 for iface in vdur_copy["interfaces"]:
577 if iface.get("fixed-ip"):
578 iface["ip-address"] = self.increment_ip_mac(
579 iface["ip-address"], count + 1
580 )
581 else:
582 iface.pop("ip-address", None)
583 if iface.get("fixed-mac"):
584 iface["mac-address"] = self.increment_ip_mac(
585 iface["mac-address"], count + 1
586 )
587 else:
588 iface.pop("mac-address", None)
589 if db_vnfr["vdur"]:
590 iface.pop(
591 "mgmt_vnf", None
592 ) # only first vdu can be managment of vnf
593 db_vdu_push_list.append(vdur_copy)
594 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
595 if vdu_delete:
596 if len(db_vnfr["vdur"]) == 1:
597 # The scale will move to 0 instances
598 self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
599 template_vdur = [db_vnfr["vdur"][0]]
600 for vdu_id, vdu_count in vdu_delete.items():
601 if mark_delete:
602 indexes_to_delete = [
603 iv[0]
604 for iv in enumerate(db_vnfr["vdur"])
605 if iv[1]["vdu-id-ref"] == vdu_id
606 ]
607 db_update.update(
608 {
609 "vdur.{}.status".format(i): "DELETING"
610 for i in indexes_to_delete[-vdu_count:]
611 }
612 )
613 else:
614 # it must be deleted one by one because common.db does not allow otherwise
615 vdus_to_delete = [
616 v
617 for v in reversed(db_vnfr["vdur"])
618 if v["vdu-id-ref"] == vdu_id
619 ]
620 for vdu in vdus_to_delete[:vdu_count]:
621 self.db.set_one(
622 "vnfrs",
623 {"_id": db_vnfr["_id"]},
624 None,
625 pull={"vdur": {"_id": vdu["_id"]}},
626 )
627 db_push = {}
628 if db_vdu_push_list:
629 db_push["vdur"] = db_vdu_push_list
630 if template_vdur:
631 db_push["vdur-template"] = template_vdur
632 if not db_push:
633 db_push = None
634 db_vnfr["vdur-template"] = template_vdur
635 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
636 # modify passed dictionary db_vnfr
637 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
638 db_vnfr["vdur"] = db_vnfr_["vdur"]
639
640 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
641 """
642 Updates database nsr with the RO info for the created vld
643 :param ns_update_nsr: dictionary to be filled with the updated info
644 :param db_nsr: content of db_nsr. This is also modified
645 :param nsr_desc_RO: nsr descriptor from RO
646 :return: Nothing, LcmException is raised on errors
647 """
648
649 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
650 for net_RO in get_iterable(nsr_desc_RO, "nets"):
651 if vld["id"] != net_RO.get("ns_net_osm_id"):
652 continue
653 vld["vim-id"] = net_RO.get("vim_net_id")
654 vld["name"] = net_RO.get("vim_name")
655 vld["status"] = net_RO.get("status")
656 vld["status-detailed"] = net_RO.get("error_msg")
657 ns_update_nsr["vld.{}".format(vld_index)] = vld
658 break
659 else:
660 raise LcmException(
661 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
662 )
663
664 def set_vnfr_at_error(self, db_vnfrs, error_text):
665 try:
666 for db_vnfr in db_vnfrs.values():
667 vnfr_update = {"status": "ERROR"}
668 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
669 if "status" not in vdur:
670 vdur["status"] = "ERROR"
671 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
672 if error_text:
673 vdur["status-detailed"] = str(error_text)
674 vnfr_update[
675 "vdur.{}.status-detailed".format(vdu_index)
676 ] = "ERROR"
677 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
678 except DbException as e:
679 self.logger.error("Cannot update vnf. {}".format(e))
680
681 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
682 """
683 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
684 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
685 :param nsr_desc_RO: nsr descriptor from RO
686 :return: Nothing, LcmException is raised on errors
687 """
688 for vnf_index, db_vnfr in db_vnfrs.items():
689 for vnf_RO in nsr_desc_RO["vnfs"]:
690 if vnf_RO["member_vnf_index"] != vnf_index:
691 continue
692 vnfr_update = {}
693 if vnf_RO.get("ip_address"):
694 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
695 "ip_address"
696 ].split(";")[0]
697 elif not db_vnfr.get("ip-address"):
698 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
699 raise LcmExceptionNoMgmtIP(
700 "ns member_vnf_index '{}' has no IP address".format(
701 vnf_index
702 )
703 )
704
705 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
706 vdur_RO_count_index = 0
707 if vdur.get("pdu-type"):
708 continue
709 for vdur_RO in get_iterable(vnf_RO, "vms"):
710 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
711 continue
712 if vdur["count-index"] != vdur_RO_count_index:
713 vdur_RO_count_index += 1
714 continue
715 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
716 if vdur_RO.get("ip_address"):
717 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
718 else:
719 vdur["ip-address"] = None
720 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
721 vdur["name"] = vdur_RO.get("vim_name")
722 vdur["status"] = vdur_RO.get("status")
723 vdur["status-detailed"] = vdur_RO.get("error_msg")
724 for ifacer in get_iterable(vdur, "interfaces"):
725 for interface_RO in get_iterable(vdur_RO, "interfaces"):
726 if ifacer["name"] == interface_RO.get("internal_name"):
727 ifacer["ip-address"] = interface_RO.get(
728 "ip_address"
729 )
730 ifacer["mac-address"] = interface_RO.get(
731 "mac_address"
732 )
733 break
734 else:
735 raise LcmException(
736 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
737 "from VIM info".format(
738 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
739 )
740 )
741 vnfr_update["vdur.{}".format(vdu_index)] = vdur
742 break
743 else:
744 raise LcmException(
745 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
746 "VIM info".format(
747 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
748 )
749 )
750
751 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
752 for net_RO in get_iterable(nsr_desc_RO, "nets"):
753 if vld["id"] != net_RO.get("vnf_net_osm_id"):
754 continue
755 vld["vim-id"] = net_RO.get("vim_net_id")
756 vld["name"] = net_RO.get("vim_name")
757 vld["status"] = net_RO.get("status")
758 vld["status-detailed"] = net_RO.get("error_msg")
759 vnfr_update["vld.{}".format(vld_index)] = vld
760 break
761 else:
762 raise LcmException(
763 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
764 vnf_index, vld["id"]
765 )
766 )
767
768 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
769 break
770
771 else:
772 raise LcmException(
773 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
774 vnf_index
775 )
776 )
777
778 def _get_ns_config_info(self, nsr_id):
779 """
780 Generates a mapping between vnf,vdu elements and the N2VC id
781 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
782 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
783 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
784 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
785 """
786 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
787 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
788 mapping = {}
789 ns_config_info = {"osm-config-mapping": mapping}
790 for vca in vca_deployed_list:
791 if not vca["member-vnf-index"]:
792 continue
793 if not vca["vdu_id"]:
794 mapping[vca["member-vnf-index"]] = vca["application"]
795 else:
796 mapping[
797 "{}.{}.{}".format(
798 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
799 )
800 ] = vca["application"]
801 return ns_config_info
802
803 async def _instantiate_ng_ro(
804 self,
805 logging_text,
806 nsr_id,
807 nsd,
808 db_nsr,
809 db_nslcmop,
810 db_vnfrs,
811 db_vnfds,
812 n2vc_key_list,
813 stage,
814 start_deploy,
815 timeout_ns_deploy,
816 ):
817
818 db_vims = {}
819
820 def get_vim_account(vim_account_id):
821 nonlocal db_vims
822 if vim_account_id in db_vims:
823 return db_vims[vim_account_id]
824 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
825 db_vims[vim_account_id] = db_vim
826 return db_vim
827
828 # modify target_vld info with instantiation parameters
829 def parse_vld_instantiation_params(
830 target_vim, target_vld, vld_params, target_sdn
831 ):
832 if vld_params.get("ip-profile"):
833 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
834 "ip-profile"
835 ]
836 if vld_params.get("provider-network"):
837 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
838 "provider-network"
839 ]
840 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
841 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
842 "provider-network"
843 ]["sdn-ports"]
844 if vld_params.get("wimAccountId"):
845 target_wim = "wim:{}".format(vld_params["wimAccountId"])
846 target_vld["vim_info"][target_wim] = {}
847 for param in ("vim-network-name", "vim-network-id"):
848 if vld_params.get(param):
849 if isinstance(vld_params[param], dict):
850 for vim, vim_net in vld_params[param].items():
851 other_target_vim = "vim:" + vim
852 populate_dict(
853 target_vld["vim_info"],
854 (other_target_vim, param.replace("-", "_")),
855 vim_net,
856 )
857 else: # isinstance str
858 target_vld["vim_info"][target_vim][
859 param.replace("-", "_")
860 ] = vld_params[param]
861 if vld_params.get("common_id"):
862 target_vld["common_id"] = vld_params.get("common_id")
863
864 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
865 def update_ns_vld_target(target, ns_params):
866 for vnf_params in ns_params.get("vnf", ()):
867 if vnf_params.get("vimAccountId"):
868 target_vnf = next(
869 (
870 vnfr
871 for vnfr in db_vnfrs.values()
872 if vnf_params["member-vnf-index"]
873 == vnfr["member-vnf-index-ref"]
874 ),
875 None,
876 )
877 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
878 for a_index, a_vld in enumerate(target["ns"]["vld"]):
879 target_vld = find_in_list(
880 get_iterable(vdur, "interfaces"),
881 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
882 )
883 if target_vld:
884 if vnf_params.get("vimAccountId") not in a_vld.get(
885 "vim_info", {}
886 ):
887 target["ns"]["vld"][a_index].get("vim_info").update(
888 {
889 "vim:{}".format(vnf_params["vimAccountId"]): {
890 "vim_network_name": ""
891 }
892 }
893 )
894
895 nslcmop_id = db_nslcmop["_id"]
896 target = {
897 "name": db_nsr["name"],
898 "ns": {"vld": []},
899 "vnf": [],
900 "image": deepcopy(db_nsr["image"]),
901 "flavor": deepcopy(db_nsr["flavor"]),
902 "action_id": nslcmop_id,
903 "cloud_init_content": {},
904 }
905 for image in target["image"]:
906 image["vim_info"] = {}
907 for flavor in target["flavor"]:
908 flavor["vim_info"] = {}
909 if db_nsr.get("affinity-or-anti-affinity-group"):
910 target["affinity-or-anti-affinity-group"] = deepcopy(
911 db_nsr["affinity-or-anti-affinity-group"]
912 )
913 for affinity_or_anti_affinity_group in target[
914 "affinity-or-anti-affinity-group"
915 ]:
916 affinity_or_anti_affinity_group["vim_info"] = {}
917
918 if db_nslcmop.get("lcmOperationType") != "instantiate":
919 # get parameters of instantiation:
920 db_nslcmop_instantiate = self.db.get_list(
921 "nslcmops",
922 {
923 "nsInstanceId": db_nslcmop["nsInstanceId"],
924 "lcmOperationType": "instantiate",
925 },
926 )[-1]
927 ns_params = db_nslcmop_instantiate.get("operationParams")
928 else:
929 ns_params = db_nslcmop.get("operationParams")
930 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
931 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
932
933 cp2target = {}
934 for vld_index, vld in enumerate(db_nsr.get("vld")):
935 target_vim = "vim:{}".format(ns_params["vimAccountId"])
936 target_vld = {
937 "id": vld["id"],
938 "name": vld["name"],
939 "mgmt-network": vld.get("mgmt-network", False),
940 "type": vld.get("type"),
941 "vim_info": {
942 target_vim: {
943 "vim_network_name": vld.get("vim-network-name"),
944 "vim_account_id": ns_params["vimAccountId"],
945 }
946 },
947 }
948 # check if this network needs SDN assist
949 if vld.get("pci-interfaces"):
950 db_vim = get_vim_account(ns_params["vimAccountId"])
951 sdnc_id = db_vim["config"].get("sdn-controller")
952 if sdnc_id:
953 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
954 target_sdn = "sdn:{}".format(sdnc_id)
955 target_vld["vim_info"][target_sdn] = {
956 "sdn": True,
957 "target_vim": target_vim,
958 "vlds": [sdn_vld],
959 "type": vld.get("type"),
960 }
961
962 nsd_vnf_profiles = get_vnf_profiles(nsd)
963 for nsd_vnf_profile in nsd_vnf_profiles:
964 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
965 if cp["virtual-link-profile-id"] == vld["id"]:
966 cp2target[
967 "member_vnf:{}.{}".format(
968 cp["constituent-cpd-id"][0][
969 "constituent-base-element-id"
970 ],
971 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
972 )
973 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
974
975 # check at nsd descriptor, if there is an ip-profile
976 vld_params = {}
977 nsd_vlp = find_in_list(
978 get_virtual_link_profiles(nsd),
979 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
980 == vld["id"],
981 )
982 if (
983 nsd_vlp
984 and nsd_vlp.get("virtual-link-protocol-data")
985 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
986 ):
987 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
988 "l3-protocol-data"
989 ]
990 ip_profile_dest_data = {}
991 if "ip-version" in ip_profile_source_data:
992 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
993 "ip-version"
994 ]
995 if "cidr" in ip_profile_source_data:
996 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
997 "cidr"
998 ]
999 if "gateway-ip" in ip_profile_source_data:
1000 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
1001 "gateway-ip"
1002 ]
1003 if "dhcp-enabled" in ip_profile_source_data:
1004 ip_profile_dest_data["dhcp-params"] = {
1005 "enabled": ip_profile_source_data["dhcp-enabled"]
1006 }
1007 vld_params["ip-profile"] = ip_profile_dest_data
1008
1009 # update vld_params with instantiation params
1010 vld_instantiation_params = find_in_list(
1011 get_iterable(ns_params, "vld"),
1012 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1013 )
1014 if vld_instantiation_params:
1015 vld_params.update(vld_instantiation_params)
1016 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1017 target["ns"]["vld"].append(target_vld)
1018 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1019 update_ns_vld_target(target, ns_params)
1020
1021 for vnfr in db_vnfrs.values():
1022 vnfd = find_in_list(
1023 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1024 )
1025 vnf_params = find_in_list(
1026 get_iterable(ns_params, "vnf"),
1027 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1028 )
1029 target_vnf = deepcopy(vnfr)
1030 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1031 for vld in target_vnf.get("vld", ()):
1032 # check if connected to a ns.vld, to fill target'
1033 vnf_cp = find_in_list(
1034 vnfd.get("int-virtual-link-desc", ()),
1035 lambda cpd: cpd.get("id") == vld["id"],
1036 )
1037 if vnf_cp:
1038 ns_cp = "member_vnf:{}.{}".format(
1039 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1040 )
1041 if cp2target.get(ns_cp):
1042 vld["target"] = cp2target[ns_cp]
1043
1044 vld["vim_info"] = {
1045 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1046 }
1047 # check if this network needs SDN assist
1048 target_sdn = None
1049 if vld.get("pci-interfaces"):
1050 db_vim = get_vim_account(vnfr["vim-account-id"])
1051 sdnc_id = db_vim["config"].get("sdn-controller")
1052 if sdnc_id:
1053 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1054 target_sdn = "sdn:{}".format(sdnc_id)
1055 vld["vim_info"][target_sdn] = {
1056 "sdn": True,
1057 "target_vim": target_vim,
1058 "vlds": [sdn_vld],
1059 "type": vld.get("type"),
1060 }
1061
1062 # check at vnfd descriptor, if there is an ip-profile
1063 vld_params = {}
1064 vnfd_vlp = find_in_list(
1065 get_virtual_link_profiles(vnfd),
1066 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1067 )
1068 if (
1069 vnfd_vlp
1070 and vnfd_vlp.get("virtual-link-protocol-data")
1071 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1072 ):
1073 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1074 "l3-protocol-data"
1075 ]
1076 ip_profile_dest_data = {}
1077 if "ip-version" in ip_profile_source_data:
1078 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1079 "ip-version"
1080 ]
1081 if "cidr" in ip_profile_source_data:
1082 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1083 "cidr"
1084 ]
1085 if "gateway-ip" in ip_profile_source_data:
1086 ip_profile_dest_data[
1087 "gateway-address"
1088 ] = ip_profile_source_data["gateway-ip"]
1089 if "dhcp-enabled" in ip_profile_source_data:
1090 ip_profile_dest_data["dhcp-params"] = {
1091 "enabled": ip_profile_source_data["dhcp-enabled"]
1092 }
1093
1094 vld_params["ip-profile"] = ip_profile_dest_data
1095 # update vld_params with instantiation params
1096 if vnf_params:
1097 vld_instantiation_params = find_in_list(
1098 get_iterable(vnf_params, "internal-vld"),
1099 lambda i_vld: i_vld["name"] == vld["id"],
1100 )
1101 if vld_instantiation_params:
1102 vld_params.update(vld_instantiation_params)
1103 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1104
1105 vdur_list = []
1106 for vdur in target_vnf.get("vdur", ()):
1107 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1108 continue # This vdu must not be created
1109 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1110
1111 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1112
1113 if ssh_keys_all:
1114 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1115 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1116 if (
1117 vdu_configuration
1118 and vdu_configuration.get("config-access")
1119 and vdu_configuration.get("config-access").get("ssh-access")
1120 ):
1121 vdur["ssh-keys"] = ssh_keys_all
1122 vdur["ssh-access-required"] = vdu_configuration[
1123 "config-access"
1124 ]["ssh-access"]["required"]
1125 elif (
1126 vnf_configuration
1127 and vnf_configuration.get("config-access")
1128 and vnf_configuration.get("config-access").get("ssh-access")
1129 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1130 ):
1131 vdur["ssh-keys"] = ssh_keys_all
1132 vdur["ssh-access-required"] = vnf_configuration[
1133 "config-access"
1134 ]["ssh-access"]["required"]
1135 elif ssh_keys_instantiation and find_in_list(
1136 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1137 ):
1138 vdur["ssh-keys"] = ssh_keys_instantiation
1139
1140 self.logger.debug("NS > vdur > {}".format(vdur))
1141
1142 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1143 # cloud-init
1144 if vdud.get("cloud-init-file"):
1145 vdur["cloud-init"] = "{}:file:{}".format(
1146 vnfd["_id"], vdud.get("cloud-init-file")
1147 )
1148 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1149 if vdur["cloud-init"] not in target["cloud_init_content"]:
1150 base_folder = vnfd["_admin"]["storage"]
1151 if base_folder["pkg-dir"]:
1152 cloud_init_file = "{}/{}/cloud_init/{}".format(
1153 base_folder["folder"],
1154 base_folder["pkg-dir"],
1155 vdud.get("cloud-init-file"),
1156 )
1157 else:
1158 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1159 base_folder["folder"],
1160 vdud.get("cloud-init-file"),
1161 )
1162 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1163 target["cloud_init_content"][
1164 vdur["cloud-init"]
1165 ] = ci_file.read()
1166 elif vdud.get("cloud-init"):
1167 vdur["cloud-init"] = "{}:vdu:{}".format(
1168 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1169 )
1170 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1171 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1172 "cloud-init"
1173 ]
1174 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1175 deploy_params_vdu = self._format_additional_params(
1176 vdur.get("additionalParams") or {}
1177 )
1178 deploy_params_vdu["OSM"] = get_osm_params(
1179 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1180 )
1181 vdur["additionalParams"] = deploy_params_vdu
1182
1183 # flavor
1184 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1185 if target_vim not in ns_flavor["vim_info"]:
1186 ns_flavor["vim_info"][target_vim] = {}
1187
1188 # deal with images
1189 # in case alternative images are provided we must check if they should be applied
1190 # for the vim_type, modify the vim_type taking into account
1191 ns_image_id = int(vdur["ns-image-id"])
1192 if vdur.get("alt-image-ids"):
1193 db_vim = get_vim_account(vnfr["vim-account-id"])
1194 vim_type = db_vim["vim_type"]
1195 for alt_image_id in vdur.get("alt-image-ids"):
1196 ns_alt_image = target["image"][int(alt_image_id)]
1197 if vim_type == ns_alt_image.get("vim-type"):
1198 # must use alternative image
1199 self.logger.debug(
1200 "use alternative image id: {}".format(alt_image_id)
1201 )
1202 ns_image_id = alt_image_id
1203 vdur["ns-image-id"] = ns_image_id
1204 break
1205 ns_image = target["image"][int(ns_image_id)]
1206 if target_vim not in ns_image["vim_info"]:
1207 ns_image["vim_info"][target_vim] = {}
1208
1209 # Affinity groups
1210 if vdur.get("affinity-or-anti-affinity-group-id"):
1211 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1212 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1213 if target_vim not in ns_ags["vim_info"]:
1214 ns_ags["vim_info"][target_vim] = {}
1215
1216 vdur["vim_info"] = {target_vim: {}}
1217 # instantiation parameters
1218 # if vnf_params:
1219 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1220 # vdud["id"]), None)
1221 vdur_list.append(vdur)
1222 target_vnf["vdur"] = vdur_list
1223 target["vnf"].append(target_vnf)
1224
1225 desc = await self.RO.deploy(nsr_id, target)
1226 self.logger.debug("RO return > {}".format(desc))
1227 action_id = desc["action_id"]
1228 await self._wait_ng_ro(
1229 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1230 )
1231
1232 # Updating NSR
1233 db_nsr_update = {
1234 "_admin.deployed.RO.operational-status": "running",
1235 "detailed-status": " ".join(stage),
1236 }
1237 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1238 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1239 self._write_op_status(nslcmop_id, stage)
1240 self.logger.debug(
1241 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1242 )
1243 return
1244
1245 async def _wait_ng_ro(
1246 self,
1247 nsr_id,
1248 action_id,
1249 nslcmop_id=None,
1250 start_time=None,
1251 timeout=600,
1252 stage=None,
1253 ):
1254 detailed_status_old = None
1255 db_nsr_update = {}
1256 start_time = start_time or time()
1257 while time() <= start_time + timeout:
1258 desc_status = await self.RO.status(nsr_id, action_id)
1259 self.logger.debug("Wait NG RO > {}".format(desc_status))
1260 if desc_status["status"] == "FAILED":
1261 raise NgRoException(desc_status["details"])
1262 elif desc_status["status"] == "BUILD":
1263 if stage:
1264 stage[2] = "VIM: ({})".format(desc_status["details"])
1265 elif desc_status["status"] == "DONE":
1266 if stage:
1267 stage[2] = "Deployed at VIM"
1268 break
1269 else:
1270 assert False, "ROclient.check_ns_status returns unknown {}".format(
1271 desc_status["status"]
1272 )
1273 if stage and nslcmop_id and stage[2] != detailed_status_old:
1274 detailed_status_old = stage[2]
1275 db_nsr_update["detailed-status"] = " ".join(stage)
1276 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1277 self._write_op_status(nslcmop_id, stage)
1278 await asyncio.sleep(15, loop=self.loop)
1279 else: # timeout_ns_deploy
1280 raise NgRoException("Timeout waiting ns to deploy")
1281
1282 async def _terminate_ng_ro(
1283 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1284 ):
1285 db_nsr_update = {}
1286 failed_detail = []
1287 action_id = None
1288 start_deploy = time()
1289 try:
1290 target = {
1291 "ns": {"vld": []},
1292 "vnf": [],
1293 "image": [],
1294 "flavor": [],
1295 "action_id": nslcmop_id,
1296 }
1297 desc = await self.RO.deploy(nsr_id, target)
1298 action_id = desc["action_id"]
1299 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1300 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1301 self.logger.debug(
1302 logging_text
1303 + "ns terminate action at RO. action_id={}".format(action_id)
1304 )
1305
1306 # wait until done
1307 delete_timeout = 20 * 60 # 20 minutes
1308 await self._wait_ng_ro(
1309 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1310 )
1311
1312 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1313 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1314 # delete all nsr
1315 await self.RO.delete(nsr_id)
1316 except Exception as e:
1317 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1318 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1319 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1320 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1321 self.logger.debug(
1322 logging_text + "RO_action_id={} already deleted".format(action_id)
1323 )
1324 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1325 failed_detail.append("delete conflict: {}".format(e))
1326 self.logger.debug(
1327 logging_text
1328 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1329 )
1330 else:
1331 failed_detail.append("delete error: {}".format(e))
1332 self.logger.error(
1333 logging_text
1334 + "RO_action_id={} delete error: {}".format(action_id, e)
1335 )
1336
1337 if failed_detail:
1338 stage[2] = "Error deleting from VIM"
1339 else:
1340 stage[2] = "Deleted from VIM"
1341 db_nsr_update["detailed-status"] = " ".join(stage)
1342 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1343 self._write_op_status(nslcmop_id, stage)
1344
1345 if failed_detail:
1346 raise LcmException("; ".join(failed_detail))
1347 return
1348
1349 async def instantiate_RO(
1350 self,
1351 logging_text,
1352 nsr_id,
1353 nsd,
1354 db_nsr,
1355 db_nslcmop,
1356 db_vnfrs,
1357 db_vnfds,
1358 n2vc_key_list,
1359 stage,
1360 ):
1361 """
1362 Instantiate at RO
1363 :param logging_text: preffix text to use at logging
1364 :param nsr_id: nsr identity
1365 :param nsd: database content of ns descriptor
1366 :param db_nsr: database content of ns record
1367 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1368 :param db_vnfrs:
1369 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1370 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1371 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1372 :return: None or exception
1373 """
1374 try:
1375 start_deploy = time()
1376 ns_params = db_nslcmop.get("operationParams")
1377 if ns_params and ns_params.get("timeout_ns_deploy"):
1378 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1379 else:
1380 timeout_ns_deploy = self.timeout.get(
1381 "ns_deploy", self.timeout_ns_deploy
1382 )
1383
1384 # Check for and optionally request placement optimization. Database will be updated if placement activated
1385 stage[2] = "Waiting for Placement."
1386 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1387 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1388 for vnfr in db_vnfrs.values():
1389 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1390 break
1391 else:
1392 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1393
1394 return await self._instantiate_ng_ro(
1395 logging_text,
1396 nsr_id,
1397 nsd,
1398 db_nsr,
1399 db_nslcmop,
1400 db_vnfrs,
1401 db_vnfds,
1402 n2vc_key_list,
1403 stage,
1404 start_deploy,
1405 timeout_ns_deploy,
1406 )
1407 except Exception as e:
1408 stage[2] = "ERROR deploying at VIM"
1409 self.set_vnfr_at_error(db_vnfrs, str(e))
1410 self.logger.error(
1411 "Error deploying at VIM {}".format(e),
1412 exc_info=not isinstance(
1413 e,
1414 (
1415 ROclient.ROClientException,
1416 LcmException,
1417 DbException,
1418 NgRoException,
1419 ),
1420 ),
1421 )
1422 raise
1423
1424 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1425 """
1426 Wait for kdu to be up, get ip address
1427 :param logging_text: prefix use for logging
1428 :param nsr_id:
1429 :param vnfr_id:
1430 :param kdu_name:
1431 :return: IP address, K8s services
1432 """
1433
1434 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1435 nb_tries = 0
1436
1437 while nb_tries < 360:
1438 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1439 kdur = next(
1440 (
1441 x
1442 for x in get_iterable(db_vnfr, "kdur")
1443 if x.get("kdu-name") == kdu_name
1444 ),
1445 None,
1446 )
1447 if not kdur:
1448 raise LcmException(
1449 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1450 )
1451 if kdur.get("status"):
1452 if kdur["status"] in ("READY", "ENABLED"):
1453 return kdur.get("ip-address"), kdur.get("services")
1454 else:
1455 raise LcmException(
1456 "target KDU={} is in error state".format(kdu_name)
1457 )
1458
1459 await asyncio.sleep(10, loop=self.loop)
1460 nb_tries += 1
1461 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1462
1463 async def wait_vm_up_insert_key_ro(
1464 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1465 ):
1466 """
1467 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1468 :param logging_text: prefix use for logging
1469 :param nsr_id:
1470 :param vnfr_id:
1471 :param vdu_id:
1472 :param vdu_index:
1473 :param pub_key: public ssh key to inject, None to skip
1474 :param user: user to apply the public ssh key
1475 :return: IP address
1476 """
1477
1478 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1479 ro_nsr_id = None
1480 ip_address = None
1481 nb_tries = 0
1482 target_vdu_id = None
1483 ro_retries = 0
1484
1485 while True:
1486
1487 ro_retries += 1
1488 if ro_retries >= 360: # 1 hour
1489 raise LcmException(
1490 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1491 )
1492
1493 await asyncio.sleep(10, loop=self.loop)
1494
1495 # get ip address
1496 if not target_vdu_id:
1497 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1498
1499 if not vdu_id: # for the VNF case
1500 if db_vnfr.get("status") == "ERROR":
1501 raise LcmException(
1502 "Cannot inject ssh-key because target VNF is in error state"
1503 )
1504 ip_address = db_vnfr.get("ip-address")
1505 if not ip_address:
1506 continue
1507 vdur = next(
1508 (
1509 x
1510 for x in get_iterable(db_vnfr, "vdur")
1511 if x.get("ip-address") == ip_address
1512 ),
1513 None,
1514 )
1515 else: # VDU case
1516 vdur = next(
1517 (
1518 x
1519 for x in get_iterable(db_vnfr, "vdur")
1520 if x.get("vdu-id-ref") == vdu_id
1521 and x.get("count-index") == vdu_index
1522 ),
1523 None,
1524 )
1525
1526 if (
1527 not vdur and len(db_vnfr.get("vdur", ())) == 1
1528 ): # If only one, this should be the target vdu
1529 vdur = db_vnfr["vdur"][0]
1530 if not vdur:
1531 raise LcmException(
1532 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1533 vnfr_id, vdu_id, vdu_index
1534 )
1535 )
1536 # New generation RO stores information at "vim_info"
1537 ng_ro_status = None
1538 target_vim = None
1539 if vdur.get("vim_info"):
1540 target_vim = next(
1541 t for t in vdur["vim_info"]
1542 ) # there should be only one key
1543 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1544 if (
1545 vdur.get("pdu-type")
1546 or vdur.get("status") == "ACTIVE"
1547 or ng_ro_status == "ACTIVE"
1548 ):
1549 ip_address = vdur.get("ip-address")
1550 if not ip_address:
1551 continue
1552 target_vdu_id = vdur["vdu-id-ref"]
1553 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1554 raise LcmException(
1555 "Cannot inject ssh-key because target VM is in error state"
1556 )
1557
1558 if not target_vdu_id:
1559 continue
1560
1561 # inject public key into machine
1562 if pub_key and user:
1563 self.logger.debug(logging_text + "Inserting RO key")
1564 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1565 if vdur.get("pdu-type"):
1566 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1567 return ip_address
1568 try:
1569 ro_vm_id = "{}-{}".format(
1570 db_vnfr["member-vnf-index-ref"], target_vdu_id
1571 ) # TODO add vdu_index
1572 if self.ng_ro:
1573 target = {
1574 "action": {
1575 "action": "inject_ssh_key",
1576 "key": pub_key,
1577 "user": user,
1578 },
1579 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1580 }
1581 desc = await self.RO.deploy(nsr_id, target)
1582 action_id = desc["action_id"]
1583 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1584 break
1585 else:
1586 # wait until NS is deployed at RO
1587 if not ro_nsr_id:
1588 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1589 ro_nsr_id = deep_get(
1590 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1591 )
1592 if not ro_nsr_id:
1593 continue
1594 result_dict = await self.RO.create_action(
1595 item="ns",
1596 item_id_name=ro_nsr_id,
1597 descriptor={
1598 "add_public_key": pub_key,
1599 "vms": [ro_vm_id],
1600 "user": user,
1601 },
1602 )
1603 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1604 if not result_dict or not isinstance(result_dict, dict):
1605 raise LcmException(
1606 "Unknown response from RO when injecting key"
1607 )
1608 for result in result_dict.values():
1609 if result.get("vim_result") == 200:
1610 break
1611 else:
1612 raise ROclient.ROClientException(
1613 "error injecting key: {}".format(
1614 result.get("description")
1615 )
1616 )
1617 break
1618 except NgRoException as e:
1619 raise LcmException(
1620 "Reaching max tries injecting key. Error: {}".format(e)
1621 )
1622 except ROclient.ROClientException as e:
1623 if not nb_tries:
1624 self.logger.debug(
1625 logging_text
1626 + "error injecting key: {}. Retrying until {} seconds".format(
1627 e, 20 * 10
1628 )
1629 )
1630 nb_tries += 1
1631 if nb_tries >= 20:
1632 raise LcmException(
1633 "Reaching max tries injecting key. Error: {}".format(e)
1634 )
1635 else:
1636 break
1637
1638 return ip_address
1639
1640 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1641 """
1642 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1643 """
1644 my_vca = vca_deployed_list[vca_index]
1645 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1646 # vdu or kdu: no dependencies
1647 return
1648 timeout = 300
1649 while timeout >= 0:
1650 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1651 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1652 configuration_status_list = db_nsr["configurationStatus"]
1653 for index, vca_deployed in enumerate(configuration_status_list):
1654 if index == vca_index:
1655 # myself
1656 continue
1657 if not my_vca.get("member-vnf-index") or (
1658 vca_deployed.get("member-vnf-index")
1659 == my_vca.get("member-vnf-index")
1660 ):
1661 internal_status = configuration_status_list[index].get("status")
1662 if internal_status == "READY":
1663 continue
1664 elif internal_status == "BROKEN":
1665 raise LcmException(
1666 "Configuration aborted because dependent charm/s has failed"
1667 )
1668 else:
1669 break
1670 else:
1671 # no dependencies, return
1672 return
1673 await asyncio.sleep(10)
1674 timeout -= 1
1675
1676 raise LcmException("Configuration aborted because dependent charm/s timeout")
1677
1678 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1679 vca_id = None
1680 if db_vnfr:
1681 vca_id = deep_get(db_vnfr, ("vca-id",))
1682 elif db_nsr:
1683 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1684 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1685 return vca_id
1686
1687 async def instantiate_N2VC(
1688 self,
1689 logging_text,
1690 vca_index,
1691 nsi_id,
1692 db_nsr,
1693 db_vnfr,
1694 vdu_id,
1695 kdu_name,
1696 vdu_index,
1697 config_descriptor,
1698 deploy_params,
1699 base_folder,
1700 nslcmop_id,
1701 stage,
1702 vca_type,
1703 vca_name,
1704 ee_config_descriptor,
1705 ):
1706 nsr_id = db_nsr["_id"]
1707 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1708 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1709 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1710 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1711 db_dict = {
1712 "collection": "nsrs",
1713 "filter": {"_id": nsr_id},
1714 "path": db_update_entry,
1715 }
1716 step = ""
1717 try:
1718
1719 element_type = "NS"
1720 element_under_configuration = nsr_id
1721
1722 vnfr_id = None
1723 if db_vnfr:
1724 vnfr_id = db_vnfr["_id"]
1725 osm_config["osm"]["vnf_id"] = vnfr_id
1726
1727 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1728
1729 if vca_type == "native_charm":
1730 index_number = 0
1731 else:
1732 index_number = vdu_index or 0
1733
1734 if vnfr_id:
1735 element_type = "VNF"
1736 element_under_configuration = vnfr_id
1737 namespace += ".{}-{}".format(vnfr_id, index_number)
1738 if vdu_id:
1739 namespace += ".{}-{}".format(vdu_id, index_number)
1740 element_type = "VDU"
1741 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1742 osm_config["osm"]["vdu_id"] = vdu_id
1743 elif kdu_name:
1744 namespace += ".{}".format(kdu_name)
1745 element_type = "KDU"
1746 element_under_configuration = kdu_name
1747 osm_config["osm"]["kdu_name"] = kdu_name
1748
1749 # Get artifact path
1750 if base_folder["pkg-dir"]:
1751 artifact_path = "{}/{}/{}/{}".format(
1752 base_folder["folder"],
1753 base_folder["pkg-dir"],
1754 "charms"
1755 if vca_type
1756 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1757 else "helm-charts",
1758 vca_name,
1759 )
1760 else:
1761 artifact_path = "{}/Scripts/{}/{}/".format(
1762 base_folder["folder"],
1763 "charms"
1764 if vca_type
1765 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1766 else "helm-charts",
1767 vca_name,
1768 )
1769
1770 self.logger.debug("Artifact path > {}".format(artifact_path))
1771
1772 # get initial_config_primitive_list that applies to this element
1773 initial_config_primitive_list = config_descriptor.get(
1774 "initial-config-primitive"
1775 )
1776
1777 self.logger.debug(
1778 "Initial config primitive list > {}".format(
1779 initial_config_primitive_list
1780 )
1781 )
1782
1783 # add config if not present for NS charm
1784 ee_descriptor_id = ee_config_descriptor.get("id")
1785 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1786 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1787 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1788 )
1789
1790 self.logger.debug(
1791 "Initial config primitive list #2 > {}".format(
1792 initial_config_primitive_list
1793 )
1794 )
1795 # n2vc_redesign STEP 3.1
1796 # find old ee_id if exists
1797 ee_id = vca_deployed.get("ee_id")
1798
1799 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1800 # create or register execution environment in VCA
1801 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1802
1803 self._write_configuration_status(
1804 nsr_id=nsr_id,
1805 vca_index=vca_index,
1806 status="CREATING",
1807 element_under_configuration=element_under_configuration,
1808 element_type=element_type,
1809 )
1810
1811 step = "create execution environment"
1812 self.logger.debug(logging_text + step)
1813
1814 ee_id = None
1815 credentials = None
1816 if vca_type == "k8s_proxy_charm":
1817 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1818 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1819 namespace=namespace,
1820 artifact_path=artifact_path,
1821 db_dict=db_dict,
1822 vca_id=vca_id,
1823 )
1824 elif vca_type == "helm" or vca_type == "helm-v3":
1825 ee_id, credentials = await self.vca_map[
1826 vca_type
1827 ].create_execution_environment(
1828 namespace=namespace,
1829 reuse_ee_id=ee_id,
1830 db_dict=db_dict,
1831 config=osm_config,
1832 artifact_path=artifact_path,
1833 vca_type=vca_type,
1834 )
1835 else:
1836 ee_id, credentials = await self.vca_map[
1837 vca_type
1838 ].create_execution_environment(
1839 namespace=namespace,
1840 reuse_ee_id=ee_id,
1841 db_dict=db_dict,
1842 vca_id=vca_id,
1843 )
1844
1845 elif vca_type == "native_charm":
1846 step = "Waiting to VM being up and getting IP address"
1847 self.logger.debug(logging_text + step)
1848 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1849 logging_text,
1850 nsr_id,
1851 vnfr_id,
1852 vdu_id,
1853 vdu_index,
1854 user=None,
1855 pub_key=None,
1856 )
1857 credentials = {"hostname": rw_mgmt_ip}
1858 # get username
1859 username = deep_get(
1860 config_descriptor, ("config-access", "ssh-access", "default-user")
1861 )
1862 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1863 # merged. Meanwhile let's get username from initial-config-primitive
1864 if not username and initial_config_primitive_list:
1865 for config_primitive in initial_config_primitive_list:
1866 for param in config_primitive.get("parameter", ()):
1867 if param["name"] == "ssh-username":
1868 username = param["value"]
1869 break
1870 if not username:
1871 raise LcmException(
1872 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1873 "'config-access.ssh-access.default-user'"
1874 )
1875 credentials["username"] = username
1876 # n2vc_redesign STEP 3.2
1877
1878 self._write_configuration_status(
1879 nsr_id=nsr_id,
1880 vca_index=vca_index,
1881 status="REGISTERING",
1882 element_under_configuration=element_under_configuration,
1883 element_type=element_type,
1884 )
1885
1886 step = "register execution environment {}".format(credentials)
1887 self.logger.debug(logging_text + step)
1888 ee_id = await self.vca_map[vca_type].register_execution_environment(
1889 credentials=credentials,
1890 namespace=namespace,
1891 db_dict=db_dict,
1892 vca_id=vca_id,
1893 )
1894
1895 # for compatibility with MON/POL modules, the need model and application name at database
1896 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1897 ee_id_parts = ee_id.split(".")
1898 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1899 if len(ee_id_parts) >= 2:
1900 model_name = ee_id_parts[0]
1901 application_name = ee_id_parts[1]
1902 db_nsr_update[db_update_entry + "model"] = model_name
1903 db_nsr_update[db_update_entry + "application"] = application_name
1904
1905 # n2vc_redesign STEP 3.3
1906 step = "Install configuration Software"
1907
1908 self._write_configuration_status(
1909 nsr_id=nsr_id,
1910 vca_index=vca_index,
1911 status="INSTALLING SW",
1912 element_under_configuration=element_under_configuration,
1913 element_type=element_type,
1914 other_update=db_nsr_update,
1915 )
1916
1917 # TODO check if already done
1918 self.logger.debug(logging_text + step)
1919 config = None
1920 if vca_type == "native_charm":
1921 config_primitive = next(
1922 (p for p in initial_config_primitive_list if p["name"] == "config"),
1923 None,
1924 )
1925 if config_primitive:
1926 config = self._map_primitive_params(
1927 config_primitive, {}, deploy_params
1928 )
1929 num_units = 1
1930 if vca_type == "lxc_proxy_charm":
1931 if element_type == "NS":
1932 num_units = db_nsr.get("config-units") or 1
1933 elif element_type == "VNF":
1934 num_units = db_vnfr.get("config-units") or 1
1935 elif element_type == "VDU":
1936 for v in db_vnfr["vdur"]:
1937 if vdu_id == v["vdu-id-ref"]:
1938 num_units = v.get("config-units") or 1
1939 break
1940 if vca_type != "k8s_proxy_charm":
1941 await self.vca_map[vca_type].install_configuration_sw(
1942 ee_id=ee_id,
1943 artifact_path=artifact_path,
1944 db_dict=db_dict,
1945 config=config,
1946 num_units=num_units,
1947 vca_id=vca_id,
1948 vca_type=vca_type,
1949 )
1950
1951 # write in db flag of configuration_sw already installed
1952 self.update_db_2(
1953 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1954 )
1955
1956 # add relations for this VCA (wait for other peers related with this VCA)
1957 await self._add_vca_relations(
1958 logging_text=logging_text,
1959 nsr_id=nsr_id,
1960 vca_type=vca_type,
1961 vca_index=vca_index,
1962 )
1963
1964 # if SSH access is required, then get execution environment SSH public
1965 # if native charm we have waited already to VM be UP
1966 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1967 pub_key = None
1968 user = None
1969 # self.logger.debug("get ssh key block")
1970 if deep_get(
1971 config_descriptor, ("config-access", "ssh-access", "required")
1972 ):
1973 # self.logger.debug("ssh key needed")
1974 # Needed to inject a ssh key
1975 user = deep_get(
1976 config_descriptor,
1977 ("config-access", "ssh-access", "default-user"),
1978 )
1979 step = "Install configuration Software, getting public ssh key"
1980 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1981 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1982 )
1983
1984 step = "Insert public key into VM user={} ssh_key={}".format(
1985 user, pub_key
1986 )
1987 else:
1988 # self.logger.debug("no need to get ssh key")
1989 step = "Waiting to VM being up and getting IP address"
1990 self.logger.debug(logging_text + step)
1991
1992 # n2vc_redesign STEP 5.1
1993 # wait for RO (ip-address) Insert pub_key into VM
1994 if vnfr_id:
1995 if kdu_name:
1996 rw_mgmt_ip, services = await self.wait_kdu_up(
1997 logging_text, nsr_id, vnfr_id, kdu_name
1998 )
1999 vnfd = self.db.get_one(
2000 "vnfds_revisions",
2001 {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2002 )
2003 kdu = get_kdu(vnfd, kdu_name)
2004 kdu_services = [
2005 service["name"] for service in get_kdu_services(kdu)
2006 ]
2007 exposed_services = []
2008 for service in services:
2009 if any(s in service["name"] for s in kdu_services):
2010 exposed_services.append(service)
2011 await self.vca_map[vca_type].exec_primitive(
2012 ee_id=ee_id,
2013 primitive_name="config",
2014 params_dict={
2015 "osm-config": json.dumps(
2016 OsmConfigBuilder(
2017 k8s={"services": exposed_services}
2018 ).build()
2019 )
2020 },
2021 vca_id=vca_id,
2022 )
2023 else:
2024 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
2025 logging_text,
2026 nsr_id,
2027 vnfr_id,
2028 vdu_id,
2029 vdu_index,
2030 user=user,
2031 pub_key=pub_key,
2032 )
2033
2034 else:
2035 rw_mgmt_ip = None # This is for a NS configuration
2036
2037 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
2038
2039 # store rw_mgmt_ip in deploy params for later replacement
2040 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2041
2042 # n2vc_redesign STEP 6 Execute initial config primitive
2043 step = "execute initial config primitive"
2044
2045 # wait for dependent primitives execution (NS -> VNF -> VDU)
2046 if initial_config_primitive_list:
2047 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2048
2049 # stage, in function of element type: vdu, kdu, vnf or ns
2050 my_vca = vca_deployed_list[vca_index]
2051 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2052 # VDU or KDU
2053 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2054 elif my_vca.get("member-vnf-index"):
2055 # VNF
2056 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2057 else:
2058 # NS
2059 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2060
2061 self._write_configuration_status(
2062 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2063 )
2064
2065 self._write_op_status(op_id=nslcmop_id, stage=stage)
2066
2067 check_if_terminated_needed = True
2068 for initial_config_primitive in initial_config_primitive_list:
2069 # adding information on the vca_deployed if it is a NS execution environment
2070 if not vca_deployed["member-vnf-index"]:
2071 deploy_params["ns_config_info"] = json.dumps(
2072 self._get_ns_config_info(nsr_id)
2073 )
2074 # TODO check if already done
2075 primitive_params_ = self._map_primitive_params(
2076 initial_config_primitive, {}, deploy_params
2077 )
2078
2079 step = "execute primitive '{}' params '{}'".format(
2080 initial_config_primitive["name"], primitive_params_
2081 )
2082 self.logger.debug(logging_text + step)
2083 await self.vca_map[vca_type].exec_primitive(
2084 ee_id=ee_id,
2085 primitive_name=initial_config_primitive["name"],
2086 params_dict=primitive_params_,
2087 db_dict=db_dict,
2088 vca_id=vca_id,
2089 vca_type=vca_type,
2090 )
2091 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2092 if check_if_terminated_needed:
2093 if config_descriptor.get("terminate-config-primitive"):
2094 self.update_db_2(
2095 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2096 )
2097 check_if_terminated_needed = False
2098
2099 # TODO register in database that primitive is done
2100
2101 # STEP 7 Configure metrics
2102 if vca_type == "helm" or vca_type == "helm-v3":
2103 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2104 ee_id=ee_id,
2105 artifact_path=artifact_path,
2106 ee_config_descriptor=ee_config_descriptor,
2107 vnfr_id=vnfr_id,
2108 nsr_id=nsr_id,
2109 target_ip=rw_mgmt_ip,
2110 )
2111 if prometheus_jobs:
2112 self.update_db_2(
2113 "nsrs",
2114 nsr_id,
2115 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2116 )
2117
2118 for job in prometheus_jobs:
2119 self.db.set_one(
2120 "prometheus_jobs",
2121 {"job_name": job["job_name"]},
2122 job,
2123 upsert=True,
2124 fail_on_empty=False,
2125 )
2126
2127 step = "instantiated at VCA"
2128 self.logger.debug(logging_text + step)
2129
2130 self._write_configuration_status(
2131 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2132 )
2133
2134 except Exception as e: # TODO not use Exception but N2VC exception
2135 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2136 if not isinstance(
2137 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2138 ):
2139 self.logger.error(
2140 "Exception while {} : {}".format(step, e), exc_info=True
2141 )
2142 self._write_configuration_status(
2143 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2144 )
2145 raise LcmException("{} {}".format(step, e)) from e
2146
2147 def _write_ns_status(
2148 self,
2149 nsr_id: str,
2150 ns_state: str,
2151 current_operation: str,
2152 current_operation_id: str,
2153 error_description: str = None,
2154 error_detail: str = None,
2155 other_update: dict = None,
2156 ):
2157 """
2158 Update db_nsr fields.
2159 :param nsr_id:
2160 :param ns_state:
2161 :param current_operation:
2162 :param current_operation_id:
2163 :param error_description:
2164 :param error_detail:
2165 :param other_update: Other required changes at database if provided, will be cleared
2166 :return:
2167 """
2168 try:
2169 db_dict = other_update or {}
2170 db_dict[
2171 "_admin.nslcmop"
2172 ] = current_operation_id # for backward compatibility
2173 db_dict["_admin.current-operation"] = current_operation_id
2174 db_dict["_admin.operation-type"] = (
2175 current_operation if current_operation != "IDLE" else None
2176 )
2177 db_dict["currentOperation"] = current_operation
2178 db_dict["currentOperationID"] = current_operation_id
2179 db_dict["errorDescription"] = error_description
2180 db_dict["errorDetail"] = error_detail
2181
2182 if ns_state:
2183 db_dict["nsState"] = ns_state
2184 self.update_db_2("nsrs", nsr_id, db_dict)
2185 except DbException as e:
2186 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2187
2188 def _write_op_status(
2189 self,
2190 op_id: str,
2191 stage: list = None,
2192 error_message: str = None,
2193 queuePosition: int = 0,
2194 operation_state: str = None,
2195 other_update: dict = None,
2196 ):
2197 try:
2198 db_dict = other_update or {}
2199 db_dict["queuePosition"] = queuePosition
2200 if isinstance(stage, list):
2201 db_dict["stage"] = stage[0]
2202 db_dict["detailed-status"] = " ".join(stage)
2203 elif stage is not None:
2204 db_dict["stage"] = str(stage)
2205
2206 if error_message is not None:
2207 db_dict["errorMessage"] = error_message
2208 if operation_state is not None:
2209 db_dict["operationState"] = operation_state
2210 db_dict["statusEnteredTime"] = time()
2211 self.update_db_2("nslcmops", op_id, db_dict)
2212 except DbException as e:
2213 self.logger.warn(
2214 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2215 )
2216
2217 def _write_all_config_status(self, db_nsr: dict, status: str):
2218 try:
2219 nsr_id = db_nsr["_id"]
2220 # configurationStatus
2221 config_status = db_nsr.get("configurationStatus")
2222 if config_status:
2223 db_nsr_update = {
2224 "configurationStatus.{}.status".format(index): status
2225 for index, v in enumerate(config_status)
2226 if v
2227 }
2228 # update status
2229 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2230
2231 except DbException as e:
2232 self.logger.warn(
2233 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2234 )
2235
2236 def _write_configuration_status(
2237 self,
2238 nsr_id: str,
2239 vca_index: int,
2240 status: str = None,
2241 element_under_configuration: str = None,
2242 element_type: str = None,
2243 other_update: dict = None,
2244 ):
2245
2246 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2247 # .format(vca_index, status))
2248
2249 try:
2250 db_path = "configurationStatus.{}.".format(vca_index)
2251 db_dict = other_update or {}
2252 if status:
2253 db_dict[db_path + "status"] = status
2254 if element_under_configuration:
2255 db_dict[
2256 db_path + "elementUnderConfiguration"
2257 ] = element_under_configuration
2258 if element_type:
2259 db_dict[db_path + "elementType"] = element_type
2260 self.update_db_2("nsrs", nsr_id, db_dict)
2261 except DbException as e:
2262 self.logger.warn(
2263 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2264 status, nsr_id, vca_index, e
2265 )
2266 )
2267
2268 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2269 """
2270 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2271 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2272 Database is used because the result can be obtained from a different LCM worker in case of HA.
2273 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2274 :param db_nslcmop: database content of nslcmop
2275 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2276 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2277 computed 'vim-account-id'
2278 """
2279 modified = False
2280 nslcmop_id = db_nslcmop["_id"]
2281 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2282 if placement_engine == "PLA":
2283 self.logger.debug(
2284 logging_text + "Invoke and wait for placement optimization"
2285 )
2286 await self.msg.aiowrite(
2287 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2288 )
2289 db_poll_interval = 5
2290 wait = db_poll_interval * 10
2291 pla_result = None
2292 while not pla_result and wait >= 0:
2293 await asyncio.sleep(db_poll_interval)
2294 wait -= db_poll_interval
2295 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2296 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2297
2298 if not pla_result:
2299 raise LcmException(
2300 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2301 )
2302
2303 for pla_vnf in pla_result["vnf"]:
2304 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2305 if not pla_vnf.get("vimAccountId") or not vnfr:
2306 continue
2307 modified = True
2308 self.db.set_one(
2309 "vnfrs",
2310 {"_id": vnfr["_id"]},
2311 {"vim-account-id": pla_vnf["vimAccountId"]},
2312 )
2313 # Modifies db_vnfrs
2314 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2315 return modified
2316
2317 def update_nsrs_with_pla_result(self, params):
2318 try:
2319 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2320 self.update_db_2(
2321 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2322 )
2323 except Exception as e:
2324 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2325
2326 async def instantiate(self, nsr_id, nslcmop_id):
2327 """
2328
2329 :param nsr_id: ns instance to deploy
2330 :param nslcmop_id: operation to run
2331 :return:
2332 """
2333
2334 # Try to lock HA task here
2335 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2336 if not task_is_locked_by_me:
2337 self.logger.debug(
2338 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2339 )
2340 return
2341
2342 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2343 self.logger.debug(logging_text + "Enter")
2344
2345 # get all needed from database
2346
2347 # database nsrs record
2348 db_nsr = None
2349
2350 # database nslcmops record
2351 db_nslcmop = None
2352
2353 # update operation on nsrs
2354 db_nsr_update = {}
2355 # update operation on nslcmops
2356 db_nslcmop_update = {}
2357
2358 nslcmop_operation_state = None
2359 db_vnfrs = {} # vnf's info indexed by member-index
2360 # n2vc_info = {}
2361 tasks_dict_info = {} # from task to info text
2362 exc = None
2363 error_list = []
2364 stage = [
2365 "Stage 1/5: preparation of the environment.",
2366 "Waiting for previous operations to terminate.",
2367 "",
2368 ]
2369 # ^ stage, step, VIM progress
2370 try:
2371 # wait for any previous tasks in process
2372 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2373
2374 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2375 stage[1] = "Reading from database."
2376 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2377 db_nsr_update["detailed-status"] = "creating"
2378 db_nsr_update["operational-status"] = "init"
2379 self._write_ns_status(
2380 nsr_id=nsr_id,
2381 ns_state="BUILDING",
2382 current_operation="INSTANTIATING",
2383 current_operation_id=nslcmop_id,
2384 other_update=db_nsr_update,
2385 )
2386 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2387
2388 # read from db: operation
2389 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2390 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2391 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2392 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2393 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2394 )
2395 ns_params = db_nslcmop.get("operationParams")
2396 if ns_params and ns_params.get("timeout_ns_deploy"):
2397 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2398 else:
2399 timeout_ns_deploy = self.timeout.get(
2400 "ns_deploy", self.timeout_ns_deploy
2401 )
2402
2403 # read from db: ns
2404 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2405 self.logger.debug(logging_text + stage[1])
2406 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2407 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2408 self.logger.debug(logging_text + stage[1])
2409 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2410 self.fs.sync(db_nsr["nsd-id"])
2411 db_nsr["nsd"] = nsd
2412 # nsr_name = db_nsr["name"] # TODO short-name??
2413
2414 # read from db: vnf's of this ns
2415 stage[1] = "Getting vnfrs from db."
2416 self.logger.debug(logging_text + stage[1])
2417 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2418
2419 # read from db: vnfd's for every vnf
2420 db_vnfds = [] # every vnfd data
2421
2422 # for each vnf in ns, read vnfd
2423 for vnfr in db_vnfrs_list:
2424 if vnfr.get("kdur"):
2425 kdur_list = []
2426 for kdur in vnfr["kdur"]:
2427 if kdur.get("additionalParams"):
2428 kdur["additionalParams"] = json.loads(
2429 kdur["additionalParams"]
2430 )
2431 kdur_list.append(kdur)
2432 vnfr["kdur"] = kdur_list
2433
2434 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2435 vnfd_id = vnfr["vnfd-id"]
2436 vnfd_ref = vnfr["vnfd-ref"]
2437 self.fs.sync(vnfd_id)
2438
2439 # if we haven't this vnfd, read it from db
2440 if vnfd_id not in db_vnfds:
2441 # read from db
2442 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2443 vnfd_id, vnfd_ref
2444 )
2445 self.logger.debug(logging_text + stage[1])
2446 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2447
2448 # store vnfd
2449 db_vnfds.append(vnfd)
2450
2451 # Get or generates the _admin.deployed.VCA list
2452 vca_deployed_list = None
2453 if db_nsr["_admin"].get("deployed"):
2454 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2455 if vca_deployed_list is None:
2456 vca_deployed_list = []
2457 configuration_status_list = []
2458 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2459 db_nsr_update["configurationStatus"] = configuration_status_list
2460 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2461 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2462 elif isinstance(vca_deployed_list, dict):
2463 # maintain backward compatibility. Change a dict to list at database
2464 vca_deployed_list = list(vca_deployed_list.values())
2465 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2466 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2467
2468 if not isinstance(
2469 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2470 ):
2471 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2472 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2473
2474 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2475 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2476 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2477 self.db.set_list(
2478 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2479 )
2480
2481 # n2vc_redesign STEP 2 Deploy Network Scenario
2482 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2483 self._write_op_status(op_id=nslcmop_id, stage=stage)
2484
2485 stage[1] = "Deploying KDUs."
2486 # self.logger.debug(logging_text + "Before deploy_kdus")
2487 # Call to deploy_kdus in case exists the "vdu:kdu" param
2488 await self.deploy_kdus(
2489 logging_text=logging_text,
2490 nsr_id=nsr_id,
2491 nslcmop_id=nslcmop_id,
2492 db_vnfrs=db_vnfrs,
2493 db_vnfds=db_vnfds,
2494 task_instantiation_info=tasks_dict_info,
2495 )
2496
2497 stage[1] = "Getting VCA public key."
2498 # n2vc_redesign STEP 1 Get VCA public ssh-key
2499 # feature 1429. Add n2vc public key to needed VMs
2500 n2vc_key = self.n2vc.get_public_key()
2501 n2vc_key_list = [n2vc_key]
2502 if self.vca_config.get("public_key"):
2503 n2vc_key_list.append(self.vca_config["public_key"])
2504
2505 stage[1] = "Deploying NS at VIM."
2506 task_ro = asyncio.ensure_future(
2507 self.instantiate_RO(
2508 logging_text=logging_text,
2509 nsr_id=nsr_id,
2510 nsd=nsd,
2511 db_nsr=db_nsr,
2512 db_nslcmop=db_nslcmop,
2513 db_vnfrs=db_vnfrs,
2514 db_vnfds=db_vnfds,
2515 n2vc_key_list=n2vc_key_list,
2516 stage=stage,
2517 )
2518 )
2519 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2520 tasks_dict_info[task_ro] = "Deploying at VIM"
2521
2522 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2523 stage[1] = "Deploying Execution Environments."
2524 self.logger.debug(logging_text + stage[1])
2525
2526 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2527 for vnf_profile in get_vnf_profiles(nsd):
2528 vnfd_id = vnf_profile["vnfd-id"]
2529 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2530 member_vnf_index = str(vnf_profile["id"])
2531 db_vnfr = db_vnfrs[member_vnf_index]
2532 base_folder = vnfd["_admin"]["storage"]
2533 vdu_id = None
2534 vdu_index = 0
2535 vdu_name = None
2536 kdu_name = None
2537
2538 # Get additional parameters
2539 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2540 if db_vnfr.get("additionalParamsForVnf"):
2541 deploy_params.update(
2542 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2543 )
2544
2545 descriptor_config = get_configuration(vnfd, vnfd["id"])
2546 if descriptor_config:
2547 self._deploy_n2vc(
2548 logging_text=logging_text
2549 + "member_vnf_index={} ".format(member_vnf_index),
2550 db_nsr=db_nsr,
2551 db_vnfr=db_vnfr,
2552 nslcmop_id=nslcmop_id,
2553 nsr_id=nsr_id,
2554 nsi_id=nsi_id,
2555 vnfd_id=vnfd_id,
2556 vdu_id=vdu_id,
2557 kdu_name=kdu_name,
2558 member_vnf_index=member_vnf_index,
2559 vdu_index=vdu_index,
2560 vdu_name=vdu_name,
2561 deploy_params=deploy_params,
2562 descriptor_config=descriptor_config,
2563 base_folder=base_folder,
2564 task_instantiation_info=tasks_dict_info,
2565 stage=stage,
2566 )
2567
2568 # Deploy charms for each VDU that supports one.
2569 for vdud in get_vdu_list(vnfd):
2570 vdu_id = vdud["id"]
2571 descriptor_config = get_configuration(vnfd, vdu_id)
2572 vdur = find_in_list(
2573 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2574 )
2575
2576 if vdur.get("additionalParams"):
2577 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2578 else:
2579 deploy_params_vdu = deploy_params
2580 deploy_params_vdu["OSM"] = get_osm_params(
2581 db_vnfr, vdu_id, vdu_count_index=0
2582 )
2583 vdud_count = get_number_of_instances(vnfd, vdu_id)
2584
2585 self.logger.debug("VDUD > {}".format(vdud))
2586 self.logger.debug(
2587 "Descriptor config > {}".format(descriptor_config)
2588 )
2589 if descriptor_config:
2590 vdu_name = None
2591 kdu_name = None
2592 for vdu_index in range(vdud_count):
2593 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2594 self._deploy_n2vc(
2595 logging_text=logging_text
2596 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2597 member_vnf_index, vdu_id, vdu_index
2598 ),
2599 db_nsr=db_nsr,
2600 db_vnfr=db_vnfr,
2601 nslcmop_id=nslcmop_id,
2602 nsr_id=nsr_id,
2603 nsi_id=nsi_id,
2604 vnfd_id=vnfd_id,
2605 vdu_id=vdu_id,
2606 kdu_name=kdu_name,
2607 member_vnf_index=member_vnf_index,
2608 vdu_index=vdu_index,
2609 vdu_name=vdu_name,
2610 deploy_params=deploy_params_vdu,
2611 descriptor_config=descriptor_config,
2612 base_folder=base_folder,
2613 task_instantiation_info=tasks_dict_info,
2614 stage=stage,
2615 )
2616 for kdud in get_kdu_list(vnfd):
2617 kdu_name = kdud["name"]
2618 descriptor_config = get_configuration(vnfd, kdu_name)
2619 if descriptor_config:
2620 vdu_id = None
2621 vdu_index = 0
2622 vdu_name = None
2623 kdur = next(
2624 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2625 )
2626 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2627 if kdur.get("additionalParams"):
2628 deploy_params_kdu.update(
2629 parse_yaml_strings(kdur["additionalParams"].copy())
2630 )
2631
2632 self._deploy_n2vc(
2633 logging_text=logging_text,
2634 db_nsr=db_nsr,
2635 db_vnfr=db_vnfr,
2636 nslcmop_id=nslcmop_id,
2637 nsr_id=nsr_id,
2638 nsi_id=nsi_id,
2639 vnfd_id=vnfd_id,
2640 vdu_id=vdu_id,
2641 kdu_name=kdu_name,
2642 member_vnf_index=member_vnf_index,
2643 vdu_index=vdu_index,
2644 vdu_name=vdu_name,
2645 deploy_params=deploy_params_kdu,
2646 descriptor_config=descriptor_config,
2647 base_folder=base_folder,
2648 task_instantiation_info=tasks_dict_info,
2649 stage=stage,
2650 )
2651
2652 # Check if this NS has a charm configuration
2653 descriptor_config = nsd.get("ns-configuration")
2654 if descriptor_config and descriptor_config.get("juju"):
2655 vnfd_id = None
2656 db_vnfr = None
2657 member_vnf_index = None
2658 vdu_id = None
2659 kdu_name = None
2660 vdu_index = 0
2661 vdu_name = None
2662
2663 # Get additional parameters
2664 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2665 if db_nsr.get("additionalParamsForNs"):
2666 deploy_params.update(
2667 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2668 )
2669 base_folder = nsd["_admin"]["storage"]
2670 self._deploy_n2vc(
2671 logging_text=logging_text,
2672 db_nsr=db_nsr,
2673 db_vnfr=db_vnfr,
2674 nslcmop_id=nslcmop_id,
2675 nsr_id=nsr_id,
2676 nsi_id=nsi_id,
2677 vnfd_id=vnfd_id,
2678 vdu_id=vdu_id,
2679 kdu_name=kdu_name,
2680 member_vnf_index=member_vnf_index,
2681 vdu_index=vdu_index,
2682 vdu_name=vdu_name,
2683 deploy_params=deploy_params,
2684 descriptor_config=descriptor_config,
2685 base_folder=base_folder,
2686 task_instantiation_info=tasks_dict_info,
2687 stage=stage,
2688 )
2689
2690 # rest of staff will be done at finally
2691
2692 except (
2693 ROclient.ROClientException,
2694 DbException,
2695 LcmException,
2696 N2VCException,
2697 ) as e:
2698 self.logger.error(
2699 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2700 )
2701 exc = e
2702 except asyncio.CancelledError:
2703 self.logger.error(
2704 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2705 )
2706 exc = "Operation was cancelled"
2707 except Exception as e:
2708 exc = traceback.format_exc()
2709 self.logger.critical(
2710 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2711 exc_info=True,
2712 )
2713 finally:
2714 if exc:
2715 error_list.append(str(exc))
2716 try:
2717 # wait for pending tasks
2718 if tasks_dict_info:
2719 stage[1] = "Waiting for instantiate pending tasks."
2720 self.logger.debug(logging_text + stage[1])
2721 error_list += await self._wait_for_tasks(
2722 logging_text,
2723 tasks_dict_info,
2724 timeout_ns_deploy,
2725 stage,
2726 nslcmop_id,
2727 nsr_id=nsr_id,
2728 )
2729 stage[1] = stage[2] = ""
2730 except asyncio.CancelledError:
2731 error_list.append("Cancelled")
2732 # TODO cancel all tasks
2733 except Exception as exc:
2734 error_list.append(str(exc))
2735
2736 # update operation-status
2737 db_nsr_update["operational-status"] = "running"
2738 # let's begin with VCA 'configured' status (later we can change it)
2739 db_nsr_update["config-status"] = "configured"
2740 for task, task_name in tasks_dict_info.items():
2741 if not task.done() or task.cancelled() or task.exception():
2742 if task_name.startswith(self.task_name_deploy_vca):
2743 # A N2VC task is pending
2744 db_nsr_update["config-status"] = "failed"
2745 else:
2746 # RO or KDU task is pending
2747 db_nsr_update["operational-status"] = "failed"
2748
2749 # update status at database
2750 if error_list:
2751 error_detail = ". ".join(error_list)
2752 self.logger.error(logging_text + error_detail)
2753 error_description_nslcmop = "{} Detail: {}".format(
2754 stage[0], error_detail
2755 )
2756 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2757 nslcmop_id, stage[0]
2758 )
2759
2760 db_nsr_update["detailed-status"] = (
2761 error_description_nsr + " Detail: " + error_detail
2762 )
2763 db_nslcmop_update["detailed-status"] = error_detail
2764 nslcmop_operation_state = "FAILED"
2765 ns_state = "BROKEN"
2766 else:
2767 error_detail = None
2768 error_description_nsr = error_description_nslcmop = None
2769 ns_state = "READY"
2770 db_nsr_update["detailed-status"] = "Done"
2771 db_nslcmop_update["detailed-status"] = "Done"
2772 nslcmop_operation_state = "COMPLETED"
2773
2774 if db_nsr:
2775 self._write_ns_status(
2776 nsr_id=nsr_id,
2777 ns_state=ns_state,
2778 current_operation="IDLE",
2779 current_operation_id=None,
2780 error_description=error_description_nsr,
2781 error_detail=error_detail,
2782 other_update=db_nsr_update,
2783 )
2784 self._write_op_status(
2785 op_id=nslcmop_id,
2786 stage="",
2787 error_message=error_description_nslcmop,
2788 operation_state=nslcmop_operation_state,
2789 other_update=db_nslcmop_update,
2790 )
2791
2792 if nslcmop_operation_state:
2793 try:
2794 await self.msg.aiowrite(
2795 "ns",
2796 "instantiated",
2797 {
2798 "nsr_id": nsr_id,
2799 "nslcmop_id": nslcmop_id,
2800 "operationState": nslcmop_operation_state,
2801 },
2802 loop=self.loop,
2803 )
2804 except Exception as e:
2805 self.logger.error(
2806 logging_text + "kafka_write notification Exception {}".format(e)
2807 )
2808
2809 self.logger.debug(logging_text + "Exit")
2810 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2811
2812 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2813 if vnfd_id not in cached_vnfds:
2814 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2815 return cached_vnfds[vnfd_id]
2816
2817 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2818 if vnf_profile_id not in cached_vnfrs:
2819 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2820 "vnfrs",
2821 {
2822 "member-vnf-index-ref": vnf_profile_id,
2823 "nsr-id-ref": nsr_id,
2824 },
2825 )
2826 return cached_vnfrs[vnf_profile_id]
2827
2828 def _is_deployed_vca_in_relation(
2829 self, vca: DeployedVCA, relation: Relation
2830 ) -> bool:
2831 found = False
2832 for endpoint in (relation.provider, relation.requirer):
2833 if endpoint["kdu-resource-profile-id"]:
2834 continue
2835 found = (
2836 vca.vnf_profile_id == endpoint.vnf_profile_id
2837 and vca.vdu_profile_id == endpoint.vdu_profile_id
2838 and vca.execution_environment_ref == endpoint.execution_environment_ref
2839 )
2840 if found:
2841 break
2842 return found
2843
2844 def _update_ee_relation_data_with_implicit_data(
2845 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2846 ):
2847 ee_relation_data = safe_get_ee_relation(
2848 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2849 )
2850 ee_relation_level = EELevel.get_level(ee_relation_data)
2851 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2852 "execution-environment-ref"
2853 ]:
2854 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2855 vnfd_id = vnf_profile["vnfd-id"]
2856 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2857 entity_id = (
2858 vnfd_id
2859 if ee_relation_level == EELevel.VNF
2860 else ee_relation_data["vdu-profile-id"]
2861 )
2862 ee = get_juju_ee_ref(db_vnfd, entity_id)
2863 if not ee:
2864 raise Exception(
2865 f"not execution environments found for ee_relation {ee_relation_data}"
2866 )
2867 ee_relation_data["execution-environment-ref"] = ee["id"]
2868 return ee_relation_data
2869
2870 def _get_ns_relations(
2871 self,
2872 nsr_id: str,
2873 nsd: Dict[str, Any],
2874 vca: DeployedVCA,
2875 cached_vnfds: Dict[str, Any],
2876 ) -> List[Relation]:
2877 relations = []
2878 db_ns_relations = get_ns_configuration_relation_list(nsd)
2879 for r in db_ns_relations:
2880 provider_dict = None
2881 requirer_dict = None
2882 if all(key in r for key in ("provider", "requirer")):
2883 provider_dict = r["provider"]
2884 requirer_dict = r["requirer"]
2885 elif "entities" in r:
2886 provider_id = r["entities"][0]["id"]
2887 provider_dict = {
2888 "nsr-id": nsr_id,
2889 "endpoint": r["entities"][0]["endpoint"],
2890 }
2891 if provider_id != nsd["id"]:
2892 provider_dict["vnf-profile-id"] = provider_id
2893 requirer_id = r["entities"][1]["id"]
2894 requirer_dict = {
2895 "nsr-id": nsr_id,
2896 "endpoint": r["entities"][1]["endpoint"],
2897 }
2898 if requirer_id != nsd["id"]:
2899 requirer_dict["vnf-profile-id"] = requirer_id
2900 else:
2901 raise Exception(
2902 "provider/requirer or entities must be included in the relation."
2903 )
2904 relation_provider = self._update_ee_relation_data_with_implicit_data(
2905 nsr_id, nsd, provider_dict, cached_vnfds
2906 )
2907 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2908 nsr_id, nsd, requirer_dict, cached_vnfds
2909 )
2910 provider = EERelation(relation_provider)
2911 requirer = EERelation(relation_requirer)
2912 relation = Relation(r["name"], provider, requirer)
2913 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2914 if vca_in_relation:
2915 relations.append(relation)
2916 return relations
2917
2918 def _get_vnf_relations(
2919 self,
2920 nsr_id: str,
2921 nsd: Dict[str, Any],
2922 vca: DeployedVCA,
2923 cached_vnfds: Dict[str, Any],
2924 ) -> List[Relation]:
2925 relations = []
2926 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2927 vnf_profile_id = vnf_profile["id"]
2928 vnfd_id = vnf_profile["vnfd-id"]
2929 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2930 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2931 for r in db_vnf_relations:
2932 provider_dict = None
2933 requirer_dict = None
2934 if all(key in r for key in ("provider", "requirer")):
2935 provider_dict = r["provider"]
2936 requirer_dict = r["requirer"]
2937 elif "entities" in r:
2938 provider_id = r["entities"][0]["id"]
2939 provider_dict = {
2940 "nsr-id": nsr_id,
2941 "vnf-profile-id": vnf_profile_id,
2942 "endpoint": r["entities"][0]["endpoint"],
2943 }
2944 if provider_id != vnfd_id:
2945 provider_dict["vdu-profile-id"] = provider_id
2946 requirer_id = r["entities"][1]["id"]
2947 requirer_dict = {
2948 "nsr-id": nsr_id,
2949 "vnf-profile-id": vnf_profile_id,
2950 "endpoint": r["entities"][1]["endpoint"],
2951 }
2952 if requirer_id != vnfd_id:
2953 requirer_dict["vdu-profile-id"] = requirer_id
2954 else:
2955 raise Exception(
2956 "provider/requirer or entities must be included in the relation."
2957 )
2958 relation_provider = self._update_ee_relation_data_with_implicit_data(
2959 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2960 )
2961 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2962 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2963 )
2964 provider = EERelation(relation_provider)
2965 requirer = EERelation(relation_requirer)
2966 relation = Relation(r["name"], provider, requirer)
2967 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2968 if vca_in_relation:
2969 relations.append(relation)
2970 return relations
2971
2972 def _get_kdu_resource_data(
2973 self,
2974 ee_relation: EERelation,
2975 db_nsr: Dict[str, Any],
2976 cached_vnfds: Dict[str, Any],
2977 ) -> DeployedK8sResource:
2978 nsd = get_nsd(db_nsr)
2979 vnf_profiles = get_vnf_profiles(nsd)
2980 vnfd_id = find_in_list(
2981 vnf_profiles,
2982 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2983 )["vnfd-id"]
2984 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2985 kdu_resource_profile = get_kdu_resource_profile(
2986 db_vnfd, ee_relation.kdu_resource_profile_id
2987 )
2988 kdu_name = kdu_resource_profile["kdu-name"]
2989 deployed_kdu, _ = get_deployed_kdu(
2990 db_nsr.get("_admin", ()).get("deployed", ()),
2991 kdu_name,
2992 ee_relation.vnf_profile_id,
2993 )
2994 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2995 return deployed_kdu
2996
2997 def _get_deployed_component(
2998 self,
2999 ee_relation: EERelation,
3000 db_nsr: Dict[str, Any],
3001 cached_vnfds: Dict[str, Any],
3002 ) -> DeployedComponent:
3003 nsr_id = db_nsr["_id"]
3004 deployed_component = None
3005 ee_level = EELevel.get_level(ee_relation)
3006 if ee_level == EELevel.NS:
3007 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
3008 if vca:
3009 deployed_component = DeployedVCA(nsr_id, vca)
3010 elif ee_level == EELevel.VNF:
3011 vca = get_deployed_vca(
3012 db_nsr,
3013 {
3014 "vdu_id": None,
3015 "member-vnf-index": ee_relation.vnf_profile_id,
3016 "ee_descriptor_id": ee_relation.execution_environment_ref,
3017 },
3018 )
3019 if vca:
3020 deployed_component = DeployedVCA(nsr_id, vca)
3021 elif ee_level == EELevel.VDU:
3022 vca = get_deployed_vca(
3023 db_nsr,
3024 {
3025 "vdu_id": ee_relation.vdu_profile_id,
3026 "member-vnf-index": ee_relation.vnf_profile_id,
3027 "ee_descriptor_id": ee_relation.execution_environment_ref,
3028 },
3029 )
3030 if vca:
3031 deployed_component = DeployedVCA(nsr_id, vca)
3032 elif ee_level == EELevel.KDU:
3033 kdu_resource_data = self._get_kdu_resource_data(
3034 ee_relation, db_nsr, cached_vnfds
3035 )
3036 if kdu_resource_data:
3037 deployed_component = DeployedK8sResource(kdu_resource_data)
3038 return deployed_component
3039
3040 async def _add_relation(
3041 self,
3042 relation: Relation,
3043 vca_type: str,
3044 db_nsr: Dict[str, Any],
3045 cached_vnfds: Dict[str, Any],
3046 cached_vnfrs: Dict[str, Any],
3047 ) -> bool:
3048 deployed_provider = self._get_deployed_component(
3049 relation.provider, db_nsr, cached_vnfds
3050 )
3051 deployed_requirer = self._get_deployed_component(
3052 relation.requirer, db_nsr, cached_vnfds
3053 )
3054 if (
3055 deployed_provider
3056 and deployed_requirer
3057 and deployed_provider.config_sw_installed
3058 and deployed_requirer.config_sw_installed
3059 ):
3060 provider_db_vnfr = (
3061 self._get_vnfr(
3062 relation.provider.nsr_id,
3063 relation.provider.vnf_profile_id,
3064 cached_vnfrs,
3065 )
3066 if relation.provider.vnf_profile_id
3067 else None
3068 )
3069 requirer_db_vnfr = (
3070 self._get_vnfr(
3071 relation.requirer.nsr_id,
3072 relation.requirer.vnf_profile_id,
3073 cached_vnfrs,
3074 )
3075 if relation.requirer.vnf_profile_id
3076 else None
3077 )
3078 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3079 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3080 provider_relation_endpoint = RelationEndpoint(
3081 deployed_provider.ee_id,
3082 provider_vca_id,
3083 relation.provider.endpoint,
3084 )
3085 requirer_relation_endpoint = RelationEndpoint(
3086 deployed_requirer.ee_id,
3087 requirer_vca_id,
3088 relation.requirer.endpoint,
3089 )
3090 await self.vca_map[vca_type].add_relation(
3091 provider=provider_relation_endpoint,
3092 requirer=requirer_relation_endpoint,
3093 )
3094 # remove entry from relations list
3095 return True
3096 return False
3097
3098 async def _add_vca_relations(
3099 self,
3100 logging_text,
3101 nsr_id,
3102 vca_type: str,
3103 vca_index: int,
3104 timeout: int = 3600,
3105 ) -> bool:
3106
3107 # steps:
3108 # 1. find all relations for this VCA
3109 # 2. wait for other peers related
3110 # 3. add relations
3111
3112 try:
3113 # STEP 1: find all relations for this VCA
3114
3115 # read nsr record
3116 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3117 nsd = get_nsd(db_nsr)
3118
3119 # this VCA data
3120 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3121 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3122
3123 cached_vnfds = {}
3124 cached_vnfrs = {}
3125 relations = []
3126 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3127 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3128
3129 # if no relations, terminate
3130 if not relations:
3131 self.logger.debug(logging_text + " No relations")
3132 return True
3133
3134 self.logger.debug(logging_text + " adding relations {}".format(relations))
3135
3136 # add all relations
3137 start = time()
3138 while True:
3139 # check timeout
3140 now = time()
3141 if now - start >= timeout:
3142 self.logger.error(logging_text + " : timeout adding relations")
3143 return False
3144
3145 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3146 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3147
3148 # for each relation, find the VCA's related
3149 for relation in relations.copy():
3150 added = await self._add_relation(
3151 relation,
3152 vca_type,
3153 db_nsr,
3154 cached_vnfds,
3155 cached_vnfrs,
3156 )
3157 if added:
3158 relations.remove(relation)
3159
3160 if not relations:
3161 self.logger.debug("Relations added")
3162 break
3163 await asyncio.sleep(5.0)
3164
3165 return True
3166
3167 except Exception as e:
3168 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3169 return False
3170
3171 async def _install_kdu(
3172 self,
3173 nsr_id: str,
3174 nsr_db_path: str,
3175 vnfr_data: dict,
3176 kdu_index: int,
3177 kdud: dict,
3178 vnfd: dict,
3179 k8s_instance_info: dict,
3180 k8params: dict = None,
3181 timeout: int = 600,
3182 vca_id: str = None,
3183 ):
3184
3185 try:
3186 k8sclustertype = k8s_instance_info["k8scluster-type"]
3187 # Instantiate kdu
3188 db_dict_install = {
3189 "collection": "nsrs",
3190 "filter": {"_id": nsr_id},
3191 "path": nsr_db_path,
3192 }
3193
3194 if k8s_instance_info.get("kdu-deployment-name"):
3195 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3196 else:
3197 kdu_instance = self.k8scluster_map[
3198 k8sclustertype
3199 ].generate_kdu_instance_name(
3200 db_dict=db_dict_install,
3201 kdu_model=k8s_instance_info["kdu-model"],
3202 kdu_name=k8s_instance_info["kdu-name"],
3203 )
3204 self.update_db_2(
3205 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3206 )
3207 await self.k8scluster_map[k8sclustertype].install(
3208 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3209 kdu_model=k8s_instance_info["kdu-model"],
3210 atomic=True,
3211 params=k8params,
3212 db_dict=db_dict_install,
3213 timeout=timeout,
3214 kdu_name=k8s_instance_info["kdu-name"],
3215 namespace=k8s_instance_info["namespace"],
3216 kdu_instance=kdu_instance,
3217 vca_id=vca_id,
3218 )
3219 self.update_db_2(
3220 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3221 )
3222
3223 # Obtain services to obtain management service ip
3224 services = await self.k8scluster_map[k8sclustertype].get_services(
3225 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3226 kdu_instance=kdu_instance,
3227 namespace=k8s_instance_info["namespace"],
3228 )
3229
3230 # Obtain management service info (if exists)
3231 vnfr_update_dict = {}
3232 kdu_config = get_configuration(vnfd, kdud["name"])
3233 if kdu_config:
3234 target_ee_list = kdu_config.get("execution-environment-list", [])
3235 else:
3236 target_ee_list = []
3237
3238 if services:
3239 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3240 mgmt_services = [
3241 service
3242 for service in kdud.get("service", [])
3243 if service.get("mgmt-service")
3244 ]
3245 for mgmt_service in mgmt_services:
3246 for service in services:
3247 if service["name"].startswith(mgmt_service["name"]):
3248 # Mgmt service found, Obtain service ip
3249 ip = service.get("external_ip", service.get("cluster_ip"))
3250 if isinstance(ip, list) and len(ip) == 1:
3251 ip = ip[0]
3252
3253 vnfr_update_dict[
3254 "kdur.{}.ip-address".format(kdu_index)
3255 ] = ip
3256
3257 # Check if must update also mgmt ip at the vnf
3258 service_external_cp = mgmt_service.get(
3259 "external-connection-point-ref"
3260 )
3261 if service_external_cp:
3262 if (
3263 deep_get(vnfd, ("mgmt-interface", "cp"))
3264 == service_external_cp
3265 ):
3266 vnfr_update_dict["ip-address"] = ip
3267
3268 if find_in_list(
3269 target_ee_list,
3270 lambda ee: ee.get(
3271 "external-connection-point-ref", ""
3272 )
3273 == service_external_cp,
3274 ):
3275 vnfr_update_dict[
3276 "kdur.{}.ip-address".format(kdu_index)
3277 ] = ip
3278 break
3279 else:
3280 self.logger.warn(
3281 "Mgmt service name: {} not found".format(
3282 mgmt_service["name"]
3283 )
3284 )
3285
3286 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3287 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3288
3289 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3290 if (
3291 kdu_config
3292 and kdu_config.get("initial-config-primitive")
3293 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3294 ):
3295 initial_config_primitive_list = kdu_config.get(
3296 "initial-config-primitive"
3297 )
3298 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3299
3300 for initial_config_primitive in initial_config_primitive_list:
3301 primitive_params_ = self._map_primitive_params(
3302 initial_config_primitive, {}, {}
3303 )
3304
3305 await asyncio.wait_for(
3306 self.k8scluster_map[k8sclustertype].exec_primitive(
3307 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3308 kdu_instance=kdu_instance,
3309 primitive_name=initial_config_primitive["name"],
3310 params=primitive_params_,
3311 db_dict=db_dict_install,
3312 vca_id=vca_id,
3313 ),
3314 timeout=timeout,
3315 )
3316
3317 except Exception as e:
3318 # Prepare update db with error and raise exception
3319 try:
3320 self.update_db_2(
3321 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3322 )
3323 self.update_db_2(
3324 "vnfrs",
3325 vnfr_data.get("_id"),
3326 {"kdur.{}.status".format(kdu_index): "ERROR"},
3327 )
3328 except Exception:
3329 # ignore to keep original exception
3330 pass
3331 # reraise original error
3332 raise
3333
3334 return kdu_instance
3335
3336 async def deploy_kdus(
3337 self,
3338 logging_text,
3339 nsr_id,
3340 nslcmop_id,
3341 db_vnfrs,
3342 db_vnfds,
3343 task_instantiation_info,
3344 ):
3345 # Launch kdus if present in the descriptor
3346
3347 k8scluster_id_2_uuic = {
3348 "helm-chart-v3": {},
3349 "helm-chart": {},
3350 "juju-bundle": {},
3351 }
3352
3353 async def _get_cluster_id(cluster_id, cluster_type):
3354 nonlocal k8scluster_id_2_uuic
3355 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3356 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3357
3358 # check if K8scluster is creating and wait look if previous tasks in process
3359 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3360 "k8scluster", cluster_id
3361 )
3362 if task_dependency:
3363 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3364 task_name, cluster_id
3365 )
3366 self.logger.debug(logging_text + text)
3367 await asyncio.wait(task_dependency, timeout=3600)
3368
3369 db_k8scluster = self.db.get_one(
3370 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3371 )
3372 if not db_k8scluster:
3373 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3374
3375 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3376 if not k8s_id:
3377 if cluster_type == "helm-chart-v3":
3378 try:
3379 # backward compatibility for existing clusters that have not been initialized for helm v3
3380 k8s_credentials = yaml.safe_dump(
3381 db_k8scluster.get("credentials")
3382 )
3383 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3384 k8s_credentials, reuse_cluster_uuid=cluster_id
3385 )
3386 db_k8scluster_update = {}
3387 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3388 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3389 db_k8scluster_update[
3390 "_admin.helm-chart-v3.created"
3391 ] = uninstall_sw
3392 db_k8scluster_update[
3393 "_admin.helm-chart-v3.operationalState"
3394 ] = "ENABLED"
3395 self.update_db_2(
3396 "k8sclusters", cluster_id, db_k8scluster_update
3397 )
3398 except Exception as e:
3399 self.logger.error(
3400 logging_text
3401 + "error initializing helm-v3 cluster: {}".format(str(e))
3402 )
3403 raise LcmException(
3404 "K8s cluster '{}' has not been initialized for '{}'".format(
3405 cluster_id, cluster_type
3406 )
3407 )
3408 else:
3409 raise LcmException(
3410 "K8s cluster '{}' has not been initialized for '{}'".format(
3411 cluster_id, cluster_type
3412 )
3413 )
3414 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3415 return k8s_id
3416
3417 logging_text += "Deploy kdus: "
3418 step = ""
3419 try:
3420 db_nsr_update = {"_admin.deployed.K8s": []}
3421 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3422
3423 index = 0
3424 updated_cluster_list = []
3425 updated_v3_cluster_list = []
3426
3427 for vnfr_data in db_vnfrs.values():
3428 vca_id = self.get_vca_id(vnfr_data, {})
3429 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3430 # Step 0: Prepare and set parameters
3431 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3432 vnfd_id = vnfr_data.get("vnfd-id")
3433 vnfd_with_id = find_in_list(
3434 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3435 )
3436 kdud = next(
3437 kdud
3438 for kdud in vnfd_with_id["kdu"]
3439 if kdud["name"] == kdur["kdu-name"]
3440 )
3441 namespace = kdur.get("k8s-namespace")
3442 kdu_deployment_name = kdur.get("kdu-deployment-name")
3443 if kdur.get("helm-chart"):
3444 kdumodel = kdur["helm-chart"]
3445 # Default version: helm3, if helm-version is v2 assign v2
3446 k8sclustertype = "helm-chart-v3"
3447 self.logger.debug("kdur: {}".format(kdur))
3448 if (
3449 kdur.get("helm-version")
3450 and kdur.get("helm-version") == "v2"
3451 ):
3452 k8sclustertype = "helm-chart"
3453 elif kdur.get("juju-bundle"):
3454 kdumodel = kdur["juju-bundle"]
3455 k8sclustertype = "juju-bundle"
3456 else:
3457 raise LcmException(
3458 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3459 "juju-bundle. Maybe an old NBI version is running".format(
3460 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3461 )
3462 )
3463 # check if kdumodel is a file and exists
3464 try:
3465 vnfd_with_id = find_in_list(
3466 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3467 )
3468 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3469 if storage: # may be not present if vnfd has not artifacts
3470 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3471 if storage["pkg-dir"]:
3472 filename = "{}/{}/{}s/{}".format(
3473 storage["folder"],
3474 storage["pkg-dir"],
3475 k8sclustertype,
3476 kdumodel,
3477 )
3478 else:
3479 filename = "{}/Scripts/{}s/{}".format(
3480 storage["folder"],
3481 k8sclustertype,
3482 kdumodel,
3483 )
3484 if self.fs.file_exists(
3485 filename, mode="file"
3486 ) or self.fs.file_exists(filename, mode="dir"):
3487 kdumodel = self.fs.path + filename
3488 except (asyncio.TimeoutError, asyncio.CancelledError):
3489 raise
3490 except Exception: # it is not a file
3491 pass
3492
3493 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3494 step = "Synchronize repos for k8s cluster '{}'".format(
3495 k8s_cluster_id
3496 )
3497 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3498
3499 # Synchronize repos
3500 if (
3501 k8sclustertype == "helm-chart"
3502 and cluster_uuid not in updated_cluster_list
3503 ) or (
3504 k8sclustertype == "helm-chart-v3"
3505 and cluster_uuid not in updated_v3_cluster_list
3506 ):
3507 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3508 self.k8scluster_map[k8sclustertype].synchronize_repos(
3509 cluster_uuid=cluster_uuid
3510 )
3511 )
3512 if del_repo_list or added_repo_dict:
3513 if k8sclustertype == "helm-chart":
3514 unset = {
3515 "_admin.helm_charts_added." + item: None
3516 for item in del_repo_list
3517 }
3518 updated = {
3519 "_admin.helm_charts_added." + item: name
3520 for item, name in added_repo_dict.items()
3521 }
3522 updated_cluster_list.append(cluster_uuid)
3523 elif k8sclustertype == "helm-chart-v3":
3524 unset = {
3525 "_admin.helm_charts_v3_added." + item: None
3526 for item in del_repo_list
3527 }
3528 updated = {
3529 "_admin.helm_charts_v3_added." + item: name
3530 for item, name in added_repo_dict.items()
3531 }
3532 updated_v3_cluster_list.append(cluster_uuid)
3533 self.logger.debug(
3534 logging_text + "repos synchronized on k8s cluster "
3535 "'{}' to_delete: {}, to_add: {}".format(
3536 k8s_cluster_id, del_repo_list, added_repo_dict
3537 )
3538 )
3539 self.db.set_one(
3540 "k8sclusters",
3541 {"_id": k8s_cluster_id},
3542 updated,
3543 unset=unset,
3544 )
3545
3546 # Instantiate kdu
3547 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3548 vnfr_data["member-vnf-index-ref"],
3549 kdur["kdu-name"],
3550 k8s_cluster_id,
3551 )
3552 k8s_instance_info = {
3553 "kdu-instance": None,
3554 "k8scluster-uuid": cluster_uuid,
3555 "k8scluster-type": k8sclustertype,
3556 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3557 "kdu-name": kdur["kdu-name"],
3558 "kdu-model": kdumodel,
3559 "namespace": namespace,
3560 "kdu-deployment-name": kdu_deployment_name,
3561 }
3562 db_path = "_admin.deployed.K8s.{}".format(index)
3563 db_nsr_update[db_path] = k8s_instance_info
3564 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3565 vnfd_with_id = find_in_list(
3566 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3567 )
3568 task = asyncio.ensure_future(
3569 self._install_kdu(
3570 nsr_id,
3571 db_path,
3572 vnfr_data,
3573 kdu_index,
3574 kdud,
3575 vnfd_with_id,
3576 k8s_instance_info,
3577 k8params=desc_params,
3578 timeout=600,
3579 vca_id=vca_id,
3580 )
3581 )
3582 self.lcm_tasks.register(
3583 "ns",
3584 nsr_id,
3585 nslcmop_id,
3586 "instantiate_KDU-{}".format(index),
3587 task,
3588 )
3589 task_instantiation_info[task] = "Deploying KDU {}".format(
3590 kdur["kdu-name"]
3591 )
3592
3593 index += 1
3594
3595 except (LcmException, asyncio.CancelledError):
3596 raise
3597 except Exception as e:
3598 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3599 if isinstance(e, (N2VCException, DbException)):
3600 self.logger.error(logging_text + msg)
3601 else:
3602 self.logger.critical(logging_text + msg, exc_info=True)
3603 raise LcmException(msg)
3604 finally:
3605 if db_nsr_update:
3606 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3607
3608 def _deploy_n2vc(
3609 self,
3610 logging_text,
3611 db_nsr,
3612 db_vnfr,
3613 nslcmop_id,
3614 nsr_id,
3615 nsi_id,
3616 vnfd_id,
3617 vdu_id,
3618 kdu_name,
3619 member_vnf_index,
3620 vdu_index,
3621 vdu_name,
3622 deploy_params,
3623 descriptor_config,
3624 base_folder,
3625 task_instantiation_info,
3626 stage,
3627 ):
3628 # launch instantiate_N2VC in a asyncio task and register task object
3629 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3630 # if not found, create one entry and update database
3631 # fill db_nsr._admin.deployed.VCA.<index>
3632
3633 self.logger.debug(
3634 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3635 )
3636 if "execution-environment-list" in descriptor_config:
3637 ee_list = descriptor_config.get("execution-environment-list", [])
3638 elif "juju" in descriptor_config:
3639 ee_list = [descriptor_config] # ns charms
3640 else: # other types as script are not supported
3641 ee_list = []
3642
3643 for ee_item in ee_list:
3644 self.logger.debug(
3645 logging_text
3646 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3647 ee_item.get("juju"), ee_item.get("helm-chart")
3648 )
3649 )
3650 ee_descriptor_id = ee_item.get("id")
3651 if ee_item.get("juju"):
3652 vca_name = ee_item["juju"].get("charm")
3653 vca_type = (
3654 "lxc_proxy_charm"
3655 if ee_item["juju"].get("charm") is not None
3656 else "native_charm"
3657 )
3658 if ee_item["juju"].get("cloud") == "k8s":
3659 vca_type = "k8s_proxy_charm"
3660 elif ee_item["juju"].get("proxy") is False:
3661 vca_type = "native_charm"
3662 elif ee_item.get("helm-chart"):
3663 vca_name = ee_item["helm-chart"]
3664 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3665 vca_type = "helm"
3666 else:
3667 vca_type = "helm-v3"
3668 else:
3669 self.logger.debug(
3670 logging_text + "skipping non juju neither charm configuration"
3671 )
3672 continue
3673
3674 vca_index = -1
3675 for vca_index, vca_deployed in enumerate(
3676 db_nsr["_admin"]["deployed"]["VCA"]
3677 ):
3678 if not vca_deployed:
3679 continue
3680 if (
3681 vca_deployed.get("member-vnf-index") == member_vnf_index
3682 and vca_deployed.get("vdu_id") == vdu_id
3683 and vca_deployed.get("kdu_name") == kdu_name
3684 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3685 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3686 ):
3687 break
3688 else:
3689 # not found, create one.
3690 target = (
3691 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3692 )
3693 if vdu_id:
3694 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3695 elif kdu_name:
3696 target += "/kdu/{}".format(kdu_name)
3697 vca_deployed = {
3698 "target_element": target,
3699 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3700 "member-vnf-index": member_vnf_index,
3701 "vdu_id": vdu_id,
3702 "kdu_name": kdu_name,
3703 "vdu_count_index": vdu_index,
3704 "operational-status": "init", # TODO revise
3705 "detailed-status": "", # TODO revise
3706 "step": "initial-deploy", # TODO revise
3707 "vnfd_id": vnfd_id,
3708 "vdu_name": vdu_name,
3709 "type": vca_type,
3710 "ee_descriptor_id": ee_descriptor_id,
3711 }
3712 vca_index += 1
3713
3714 # create VCA and configurationStatus in db
3715 db_dict = {
3716 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3717 "configurationStatus.{}".format(vca_index): dict(),
3718 }
3719 self.update_db_2("nsrs", nsr_id, db_dict)
3720
3721 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3722
3723 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3724 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3725 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3726
3727 # Launch task
3728 task_n2vc = asyncio.ensure_future(
3729 self.instantiate_N2VC(
3730 logging_text=logging_text,
3731 vca_index=vca_index,
3732 nsi_id=nsi_id,
3733 db_nsr=db_nsr,
3734 db_vnfr=db_vnfr,
3735 vdu_id=vdu_id,
3736 kdu_name=kdu_name,
3737 vdu_index=vdu_index,
3738 deploy_params=deploy_params,
3739 config_descriptor=descriptor_config,
3740 base_folder=base_folder,
3741 nslcmop_id=nslcmop_id,
3742 stage=stage,
3743 vca_type=vca_type,
3744 vca_name=vca_name,
3745 ee_config_descriptor=ee_item,
3746 )
3747 )
3748 self.lcm_tasks.register(
3749 "ns",
3750 nsr_id,
3751 nslcmop_id,
3752 "instantiate_N2VC-{}".format(vca_index),
3753 task_n2vc,
3754 )
3755 task_instantiation_info[
3756 task_n2vc
3757 ] = self.task_name_deploy_vca + " {}.{}".format(
3758 member_vnf_index or "", vdu_id or ""
3759 )
3760
3761 @staticmethod
3762 def _create_nslcmop(nsr_id, operation, params):
3763 """
3764 Creates a ns-lcm-opp content to be stored at database.
3765 :param nsr_id: internal id of the instance
3766 :param operation: instantiate, terminate, scale, action, ...
3767 :param params: user parameters for the operation
3768 :return: dictionary following SOL005 format
3769 """
3770 # Raise exception if invalid arguments
3771 if not (nsr_id and operation and params):
3772 raise LcmException(
3773 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3774 )
3775 now = time()
3776 _id = str(uuid4())
3777 nslcmop = {
3778 "id": _id,
3779 "_id": _id,
3780 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3781 "operationState": "PROCESSING",
3782 "statusEnteredTime": now,
3783 "nsInstanceId": nsr_id,
3784 "lcmOperationType": operation,
3785 "startTime": now,
3786 "isAutomaticInvocation": False,
3787 "operationParams": params,
3788 "isCancelPending": False,
3789 "links": {
3790 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3791 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3792 },
3793 }
3794 return nslcmop
3795
3796 def _format_additional_params(self, params):
3797 params = params or {}
3798 for key, value in params.items():
3799 if str(value).startswith("!!yaml "):
3800 params[key] = yaml.safe_load(value[7:])
3801 return params
3802
3803 def _get_terminate_primitive_params(self, seq, vnf_index):
3804 primitive = seq.get("name")
3805 primitive_params = {}
3806 params = {
3807 "member_vnf_index": vnf_index,
3808 "primitive": primitive,
3809 "primitive_params": primitive_params,
3810 }
3811 desc_params = {}
3812 return self._map_primitive_params(seq, params, desc_params)
3813
3814 # sub-operations
3815
3816 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3817 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3818 if op.get("operationState") == "COMPLETED":
3819 # b. Skip sub-operation
3820 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3821 return self.SUBOPERATION_STATUS_SKIP
3822 else:
3823 # c. retry executing sub-operation
3824 # The sub-operation exists, and operationState != 'COMPLETED'
3825 # Update operationState = 'PROCESSING' to indicate a retry.
3826 operationState = "PROCESSING"
3827 detailed_status = "In progress"
3828 self._update_suboperation_status(
3829 db_nslcmop, op_index, operationState, detailed_status
3830 )
3831 # Return the sub-operation index
3832 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3833 # with arguments extracted from the sub-operation
3834 return op_index
3835
3836 # Find a sub-operation where all keys in a matching dictionary must match
3837 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3838 def _find_suboperation(self, db_nslcmop, match):
3839 if db_nslcmop and match:
3840 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3841 for i, op in enumerate(op_list):
3842 if all(op.get(k) == match[k] for k in match):
3843 return i
3844 return self.SUBOPERATION_STATUS_NOT_FOUND
3845
3846 # Update status for a sub-operation given its index
3847 def _update_suboperation_status(
3848 self, db_nslcmop, op_index, operationState, detailed_status
3849 ):
3850 # Update DB for HA tasks
3851 q_filter = {"_id": db_nslcmop["_id"]}
3852 update_dict = {
3853 "_admin.operations.{}.operationState".format(op_index): operationState,
3854 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3855 }
3856 self.db.set_one(
3857 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3858 )
3859
3860 # Add sub-operation, return the index of the added sub-operation
3861 # Optionally, set operationState, detailed-status, and operationType
3862 # Status and type are currently set for 'scale' sub-operations:
3863 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3864 # 'detailed-status' : status message
3865 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3866 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3867 def _add_suboperation(
3868 self,
3869 db_nslcmop,
3870 vnf_index,
3871 vdu_id,
3872 vdu_count_index,
3873 vdu_name,
3874 primitive,
3875 mapped_primitive_params,
3876 operationState=None,
3877 detailed_status=None,
3878 operationType=None,
3879 RO_nsr_id=None,
3880 RO_scaling_info=None,
3881 ):
3882 if not db_nslcmop:
3883 return self.SUBOPERATION_STATUS_NOT_FOUND
3884 # Get the "_admin.operations" list, if it exists
3885 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3886 op_list = db_nslcmop_admin.get("operations")
3887 # Create or append to the "_admin.operations" list
3888 new_op = {
3889 "member_vnf_index": vnf_index,
3890 "vdu_id": vdu_id,
3891 "vdu_count_index": vdu_count_index,
3892 "primitive": primitive,
3893 "primitive_params": mapped_primitive_params,
3894 }
3895 if operationState:
3896 new_op["operationState"] = operationState
3897 if detailed_status:
3898 new_op["detailed-status"] = detailed_status
3899 if operationType:
3900 new_op["lcmOperationType"] = operationType
3901 if RO_nsr_id:
3902 new_op["RO_nsr_id"] = RO_nsr_id
3903 if RO_scaling_info:
3904 new_op["RO_scaling_info"] = RO_scaling_info
3905 if not op_list:
3906 # No existing operations, create key 'operations' with current operation as first list element
3907 db_nslcmop_admin.update({"operations": [new_op]})
3908 op_list = db_nslcmop_admin.get("operations")
3909 else:
3910 # Existing operations, append operation to list
3911 op_list.append(new_op)
3912
3913 db_nslcmop_update = {"_admin.operations": op_list}
3914 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3915 op_index = len(op_list) - 1
3916 return op_index
3917
3918 # Helper methods for scale() sub-operations
3919
3920 # pre-scale/post-scale:
3921 # Check for 3 different cases:
3922 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3923 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3924 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3925 def _check_or_add_scale_suboperation(
3926 self,
3927 db_nslcmop,
3928 vnf_index,
3929 vnf_config_primitive,
3930 primitive_params,
3931 operationType,
3932 RO_nsr_id=None,
3933 RO_scaling_info=None,
3934 ):
3935 # Find this sub-operation
3936 if RO_nsr_id and RO_scaling_info:
3937 operationType = "SCALE-RO"
3938 match = {
3939 "member_vnf_index": vnf_index,
3940 "RO_nsr_id": RO_nsr_id,
3941 "RO_scaling_info": RO_scaling_info,
3942 }
3943 else:
3944 match = {
3945 "member_vnf_index": vnf_index,
3946 "primitive": vnf_config_primitive,
3947 "primitive_params": primitive_params,
3948 "lcmOperationType": operationType,
3949 }
3950 op_index = self._find_suboperation(db_nslcmop, match)
3951 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3952 # a. New sub-operation
3953 # The sub-operation does not exist, add it.
3954 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3955 # The following parameters are set to None for all kind of scaling:
3956 vdu_id = None
3957 vdu_count_index = None
3958 vdu_name = None
3959 if RO_nsr_id and RO_scaling_info:
3960 vnf_config_primitive = None
3961 primitive_params = None
3962 else:
3963 RO_nsr_id = None
3964 RO_scaling_info = None
3965 # Initial status for sub-operation
3966 operationState = "PROCESSING"
3967 detailed_status = "In progress"
3968 # Add sub-operation for pre/post-scaling (zero or more operations)
3969 self._add_suboperation(
3970 db_nslcmop,
3971 vnf_index,
3972 vdu_id,
3973 vdu_count_index,
3974 vdu_name,
3975 vnf_config_primitive,
3976 primitive_params,
3977 operationState,
3978 detailed_status,
3979 operationType,
3980 RO_nsr_id,
3981 RO_scaling_info,
3982 )
3983 return self.SUBOPERATION_STATUS_NEW
3984 else:
3985 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3986 # or op_index (operationState != 'COMPLETED')
3987 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3988
3989 # Function to return execution_environment id
3990
3991 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3992 # TODO vdu_index_count
3993 for vca in vca_deployed_list:
3994 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3995 return vca["ee_id"]
3996
3997 async def destroy_N2VC(
3998 self,
3999 logging_text,
4000 db_nslcmop,
4001 vca_deployed,
4002 config_descriptor,
4003 vca_index,
4004 destroy_ee=True,
4005 exec_primitives=True,
4006 scaling_in=False,
4007 vca_id: str = None,
4008 ):
4009 """
4010 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4011 :param logging_text:
4012 :param db_nslcmop:
4013 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4014 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4015 :param vca_index: index in the database _admin.deployed.VCA
4016 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4017 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4018 not executed properly
4019 :param scaling_in: True destroys the application, False destroys the model
4020 :return: None or exception
4021 """
4022
4023 self.logger.debug(
4024 logging_text
4025 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4026 vca_index, vca_deployed, config_descriptor, destroy_ee
4027 )
4028 )
4029
4030 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
4031
4032 # execute terminate_primitives
4033 if exec_primitives:
4034 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
4035 config_descriptor.get("terminate-config-primitive"),
4036 vca_deployed.get("ee_descriptor_id"),
4037 )
4038 vdu_id = vca_deployed.get("vdu_id")
4039 vdu_count_index = vca_deployed.get("vdu_count_index")
4040 vdu_name = vca_deployed.get("vdu_name")
4041 vnf_index = vca_deployed.get("member-vnf-index")
4042 if terminate_primitives and vca_deployed.get("needed_terminate"):
4043 for seq in terminate_primitives:
4044 # For each sequence in list, get primitive and call _ns_execute_primitive()
4045 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
4046 vnf_index, seq.get("name")
4047 )
4048 self.logger.debug(logging_text + step)
4049 # Create the primitive for each sequence, i.e. "primitive": "touch"
4050 primitive = seq.get("name")
4051 mapped_primitive_params = self._get_terminate_primitive_params(
4052 seq, vnf_index
4053 )
4054
4055 # Add sub-operation
4056 self._add_suboperation(
4057 db_nslcmop,
4058 vnf_index,
4059 vdu_id,
4060 vdu_count_index,
4061 vdu_name,
4062 primitive,
4063 mapped_primitive_params,
4064 )
4065 # Sub-operations: Call _ns_execute_primitive() instead of action()
4066 try:
4067 result, result_detail = await self._ns_execute_primitive(
4068 vca_deployed["ee_id"],
4069 primitive,
4070 mapped_primitive_params,
4071 vca_type=vca_type,
4072 vca_id=vca_id,
4073 )
4074 except LcmException:
4075 # this happens when VCA is not deployed. In this case it is not needed to terminate
4076 continue
4077 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
4078 if result not in result_ok:
4079 raise LcmException(
4080 "terminate_primitive {} for vnf_member_index={} fails with "
4081 "error {}".format(seq.get("name"), vnf_index, result_detail)
4082 )
4083 # set that this VCA do not need terminated
4084 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
4085 vca_index
4086 )
4087 self.update_db_2(
4088 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
4089 )
4090
4091 # Delete Prometheus Jobs if any
4092 # This uses NSR_ID, so it will destroy any jobs under this index
4093 self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
4094
4095 if destroy_ee:
4096 await self.vca_map[vca_type].delete_execution_environment(
4097 vca_deployed["ee_id"],
4098 scaling_in=scaling_in,
4099 vca_type=vca_type,
4100 vca_id=vca_id,
4101 )
4102
4103 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4104 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4105 namespace = "." + db_nsr["_id"]
4106 try:
4107 await self.n2vc.delete_namespace(
4108 namespace=namespace,
4109 total_timeout=self.timeout_charm_delete,
4110 vca_id=vca_id,
4111 )
4112 except N2VCNotFound: # already deleted. Skip
4113 pass
4114 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4115
4116 async def _terminate_RO(
4117 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4118 ):
4119 """
4120 Terminates a deployment from RO
4121 :param logging_text:
4122 :param nsr_deployed: db_nsr._admin.deployed
4123 :param nsr_id:
4124 :param nslcmop_id:
4125 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4126 this method will update only the index 2, but it will write on database the concatenated content of the list
4127 :return:
4128 """
4129 db_nsr_update = {}
4130 failed_detail = []
4131 ro_nsr_id = ro_delete_action = None
4132 if nsr_deployed and nsr_deployed.get("RO"):
4133 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
4134 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
4135 try:
4136 if ro_nsr_id:
4137 stage[2] = "Deleting ns from VIM."
4138 db_nsr_update["detailed-status"] = " ".join(stage)
4139 self._write_op_status(nslcmop_id, stage)
4140 self.logger.debug(logging_text + stage[2])
4141 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4142 self._write_op_status(nslcmop_id, stage)
4143 desc = await self.RO.delete("ns", ro_nsr_id)
4144 ro_delete_action = desc["action_id"]
4145 db_nsr_update[
4146 "_admin.deployed.RO.nsr_delete_action_id"
4147 ] = ro_delete_action
4148 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4149 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4150 if ro_delete_action:
4151 # wait until NS is deleted from VIM
4152 stage[2] = "Waiting ns deleted from VIM."
4153 detailed_status_old = None
4154 self.logger.debug(
4155 logging_text
4156 + stage[2]
4157 + " RO_id={} ro_delete_action={}".format(
4158 ro_nsr_id, ro_delete_action
4159 )
4160 )
4161 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4162 self._write_op_status(nslcmop_id, stage)
4163
4164 delete_timeout = 20 * 60 # 20 minutes
4165 while delete_timeout > 0:
4166 desc = await self.RO.show(
4167 "ns",
4168 item_id_name=ro_nsr_id,
4169 extra_item="action",
4170 extra_item_id=ro_delete_action,
4171 )
4172
4173 # deploymentStatus
4174 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4175
4176 ns_status, ns_status_info = self.RO.check_action_status(desc)
4177 if ns_status == "ERROR":
4178 raise ROclient.ROClientException(ns_status_info)
4179 elif ns_status == "BUILD":
4180 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4181 elif ns_status == "ACTIVE":
4182 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4183 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4184 break
4185 else:
4186 assert (
4187 False
4188 ), "ROclient.check_action_status returns unknown {}".format(
4189 ns_status
4190 )
4191 if stage[2] != detailed_status_old:
4192 detailed_status_old = stage[2]
4193 db_nsr_update["detailed-status"] = " ".join(stage)
4194 self._write_op_status(nslcmop_id, stage)
4195 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4196 await asyncio.sleep(5, loop=self.loop)
4197 delete_timeout -= 5
4198 else: # delete_timeout <= 0:
4199 raise ROclient.ROClientException(
4200 "Timeout waiting ns deleted from VIM"
4201 )
4202
4203 except Exception as e:
4204 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4205 if (
4206 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4207 ): # not found
4208 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4209 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4210 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4211 self.logger.debug(
4212 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4213 )
4214 elif (
4215 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4216 ): # conflict
4217 failed_detail.append("delete conflict: {}".format(e))
4218 self.logger.debug(
4219 logging_text
4220 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4221 )
4222 else:
4223 failed_detail.append("delete error: {}".format(e))
4224 self.logger.error(
4225 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4226 )
4227
4228 # Delete nsd
4229 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4230 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4231 try:
4232 stage[2] = "Deleting nsd from RO."
4233 db_nsr_update["detailed-status"] = " ".join(stage)
4234 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4235 self._write_op_status(nslcmop_id, stage)
4236 await self.RO.delete("nsd", ro_nsd_id)
4237 self.logger.debug(
4238 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4239 )
4240 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4241 except Exception as e:
4242 if (
4243 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4244 ): # not found
4245 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4246 self.logger.debug(
4247 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4248 )
4249 elif (
4250 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4251 ): # conflict
4252 failed_detail.append(
4253 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4254 )
4255 self.logger.debug(logging_text + failed_detail[-1])
4256 else:
4257 failed_detail.append(
4258 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4259 )
4260 self.logger.error(logging_text + failed_detail[-1])
4261
4262 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4263 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4264 if not vnf_deployed or not vnf_deployed["id"]:
4265 continue
4266 try:
4267 ro_vnfd_id = vnf_deployed["id"]
4268 stage[
4269 2
4270 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4271 vnf_deployed["member-vnf-index"], ro_vnfd_id
4272 )
4273 db_nsr_update["detailed-status"] = " ".join(stage)
4274 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4275 self._write_op_status(nslcmop_id, stage)
4276 await self.RO.delete("vnfd", ro_vnfd_id)
4277 self.logger.debug(
4278 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4279 )
4280 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4281 except Exception as e:
4282 if (
4283 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4284 ): # not found
4285 db_nsr_update[
4286 "_admin.deployed.RO.vnfd.{}.id".format(index)
4287 ] = None
4288 self.logger.debug(
4289 logging_text
4290 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4291 )
4292 elif (
4293 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4294 ): # conflict
4295 failed_detail.append(
4296 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4297 )
4298 self.logger.debug(logging_text + failed_detail[-1])
4299 else:
4300 failed_detail.append(
4301 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4302 )
4303 self.logger.error(logging_text + failed_detail[-1])
4304
4305 if failed_detail:
4306 stage[2] = "Error deleting from VIM"
4307 else:
4308 stage[2] = "Deleted from VIM"
4309 db_nsr_update["detailed-status"] = " ".join(stage)
4310 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4311 self._write_op_status(nslcmop_id, stage)
4312
4313 if failed_detail:
4314 raise LcmException("; ".join(failed_detail))
4315
4316 async def terminate(self, nsr_id, nslcmop_id):
4317 # Try to lock HA task here
4318 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4319 if not task_is_locked_by_me:
4320 return
4321
4322 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4323 self.logger.debug(logging_text + "Enter")
4324 timeout_ns_terminate = self.timeout_ns_terminate
4325 db_nsr = None
4326 db_nslcmop = None
4327 operation_params = None
4328 exc = None
4329 error_list = [] # annotates all failed error messages
4330 db_nslcmop_update = {}
4331 autoremove = False # autoremove after terminated
4332 tasks_dict_info = {}
4333 db_nsr_update = {}
4334 stage = [
4335 "Stage 1/3: Preparing task.",
4336 "Waiting for previous operations to terminate.",
4337 "",
4338 ]
4339 # ^ contains [stage, step, VIM-status]
4340 try:
4341 # wait for any previous tasks in process
4342 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4343
4344 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4345 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4346 operation_params = db_nslcmop.get("operationParams") or {}
4347 if operation_params.get("timeout_ns_terminate"):
4348 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4349 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4350 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4351
4352 db_nsr_update["operational-status"] = "terminating"
4353 db_nsr_update["config-status"] = "terminating"
4354 self._write_ns_status(
4355 nsr_id=nsr_id,
4356 ns_state="TERMINATING",
4357 current_operation="TERMINATING",
4358 current_operation_id=nslcmop_id,
4359 other_update=db_nsr_update,
4360 )
4361 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4362 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4363 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4364 return
4365
4366 stage[1] = "Getting vnf descriptors from db."
4367 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4368 db_vnfrs_dict = {
4369 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4370 }
4371 db_vnfds_from_id = {}
4372 db_vnfds_from_member_index = {}
4373 # Loop over VNFRs
4374 for vnfr in db_vnfrs_list:
4375 vnfd_id = vnfr["vnfd-id"]
4376 if vnfd_id not in db_vnfds_from_id:
4377 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4378 db_vnfds_from_id[vnfd_id] = vnfd
4379 db_vnfds_from_member_index[
4380 vnfr["member-vnf-index-ref"]
4381 ] = db_vnfds_from_id[vnfd_id]
4382
4383 # Destroy individual execution environments when there are terminating primitives.
4384 # Rest of EE will be deleted at once
4385 # TODO - check before calling _destroy_N2VC
4386 # if not operation_params.get("skip_terminate_primitives"):#
4387 # or not vca.get("needed_terminate"):
4388 stage[0] = "Stage 2/3 execute terminating primitives."
4389 self.logger.debug(logging_text + stage[0])
4390 stage[1] = "Looking execution environment that needs terminate."
4391 self.logger.debug(logging_text + stage[1])
4392
4393 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4394 config_descriptor = None
4395 vca_member_vnf_index = vca.get("member-vnf-index")
4396 vca_id = self.get_vca_id(
4397 db_vnfrs_dict.get(vca_member_vnf_index)
4398 if vca_member_vnf_index
4399 else None,
4400 db_nsr,
4401 )
4402 if not vca or not vca.get("ee_id"):
4403 continue
4404 if not vca.get("member-vnf-index"):
4405 # ns
4406 config_descriptor = db_nsr.get("ns-configuration")
4407 elif vca.get("vdu_id"):
4408 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4409 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4410 elif vca.get("kdu_name"):
4411 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4412 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4413 else:
4414 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4415 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4416 vca_type = vca.get("type")
4417 exec_terminate_primitives = not operation_params.get(
4418 "skip_terminate_primitives"
4419 ) and vca.get("needed_terminate")
4420 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4421 # pending native charms
4422 destroy_ee = (
4423 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4424 )
4425 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4426 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4427 task = asyncio.ensure_future(
4428 self.destroy_N2VC(
4429 logging_text,
4430 db_nslcmop,
4431 vca,
4432 config_descriptor,
4433 vca_index,
4434 destroy_ee,
4435 exec_terminate_primitives,
4436 vca_id=vca_id,
4437 )
4438 )
4439 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4440
4441 # wait for pending tasks of terminate primitives
4442 if tasks_dict_info:
4443 self.logger.debug(
4444 logging_text
4445 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4446 )
4447 error_list = await self._wait_for_tasks(
4448 logging_text,
4449 tasks_dict_info,
4450 min(self.timeout_charm_delete, timeout_ns_terminate),
4451 stage,
4452 nslcmop_id,
4453 )
4454 tasks_dict_info.clear()
4455 if error_list:
4456 return # raise LcmException("; ".join(error_list))
4457
4458 # remove All execution environments at once
4459 stage[0] = "Stage 3/3 delete all."
4460
4461 if nsr_deployed.get("VCA"):
4462 stage[1] = "Deleting all execution environments."
4463 self.logger.debug(logging_text + stage[1])
4464 vca_id = self.get_vca_id({}, db_nsr)
4465 task_delete_ee = asyncio.ensure_future(
4466 asyncio.wait_for(
4467 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4468 timeout=self.timeout_charm_delete,
4469 )
4470 )
4471 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4472 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4473
4474 # Delete from k8scluster
4475 stage[1] = "Deleting KDUs."
4476 self.logger.debug(logging_text + stage[1])
4477 # print(nsr_deployed)
4478 for kdu in get_iterable(nsr_deployed, "K8s"):
4479 if not kdu or not kdu.get("kdu-instance"):
4480 continue
4481 kdu_instance = kdu.get("kdu-instance")
4482 if kdu.get("k8scluster-type") in self.k8scluster_map:
4483 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4484 vca_id = self.get_vca_id({}, db_nsr)
4485 task_delete_kdu_instance = asyncio.ensure_future(
4486 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4487 cluster_uuid=kdu.get("k8scluster-uuid"),
4488 kdu_instance=kdu_instance,
4489 vca_id=vca_id,
4490 namespace=kdu.get("namespace"),
4491 )
4492 )
4493 else:
4494 self.logger.error(
4495 logging_text
4496 + "Unknown k8s deployment type {}".format(
4497 kdu.get("k8scluster-type")
4498 )
4499 )
4500 continue
4501 tasks_dict_info[
4502 task_delete_kdu_instance
4503 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4504
4505 # remove from RO
4506 stage[1] = "Deleting ns from VIM."
4507 if self.ng_ro:
4508 task_delete_ro = asyncio.ensure_future(
4509 self._terminate_ng_ro(
4510 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4511 )
4512 )
4513 else:
4514 task_delete_ro = asyncio.ensure_future(
4515 self._terminate_RO(
4516 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4517 )
4518 )
4519 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4520
4521 # rest of staff will be done at finally
4522
4523 except (
4524 ROclient.ROClientException,
4525 DbException,
4526 LcmException,
4527 N2VCException,
4528 ) as e:
4529 self.logger.error(logging_text + "Exit Exception {}".format(e))
4530 exc = e
4531 except asyncio.CancelledError:
4532 self.logger.error(
4533 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4534 )
4535 exc = "Operation was cancelled"
4536 except Exception as e:
4537 exc = traceback.format_exc()
4538 self.logger.critical(
4539 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4540 exc_info=True,
4541 )
4542 finally:
4543 if exc:
4544 error_list.append(str(exc))
4545 try:
4546 # wait for pending tasks
4547 if tasks_dict_info:
4548 stage[1] = "Waiting for terminate pending tasks."
4549 self.logger.debug(logging_text + stage[1])
4550 error_list += await self._wait_for_tasks(
4551 logging_text,
4552 tasks_dict_info,
4553 timeout_ns_terminate,
4554 stage,
4555 nslcmop_id,
4556 )
4557 stage[1] = stage[2] = ""
4558 except asyncio.CancelledError:
4559 error_list.append("Cancelled")
4560 # TODO cancell all tasks
4561 except Exception as exc:
4562 error_list.append(str(exc))
4563 # update status at database
4564 if error_list:
4565 error_detail = "; ".join(error_list)
4566 # self.logger.error(logging_text + error_detail)
4567 error_description_nslcmop = "{} Detail: {}".format(
4568 stage[0], error_detail
4569 )
4570 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4571 nslcmop_id, stage[0]
4572 )
4573
4574 db_nsr_update["operational-status"] = "failed"
4575 db_nsr_update["detailed-status"] = (
4576 error_description_nsr + " Detail: " + error_detail
4577 )
4578 db_nslcmop_update["detailed-status"] = error_detail
4579 nslcmop_operation_state = "FAILED"
4580 ns_state = "BROKEN"
4581 else:
4582 error_detail = None
4583 error_description_nsr = error_description_nslcmop = None
4584 ns_state = "NOT_INSTANTIATED"
4585 db_nsr_update["operational-status"] = "terminated"
4586 db_nsr_update["detailed-status"] = "Done"
4587 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4588 db_nslcmop_update["detailed-status"] = "Done"
4589 nslcmop_operation_state = "COMPLETED"
4590
4591 if db_nsr:
4592 self._write_ns_status(
4593 nsr_id=nsr_id,
4594 ns_state=ns_state,
4595 current_operation="IDLE",
4596 current_operation_id=None,
4597 error_description=error_description_nsr,
4598 error_detail=error_detail,
4599 other_update=db_nsr_update,
4600 )
4601 self._write_op_status(
4602 op_id=nslcmop_id,
4603 stage="",
4604 error_message=error_description_nslcmop,
4605 operation_state=nslcmop_operation_state,
4606 other_update=db_nslcmop_update,
4607 )
4608 if ns_state == "NOT_INSTANTIATED":
4609 try:
4610 self.db.set_list(
4611 "vnfrs",
4612 {"nsr-id-ref": nsr_id},
4613 {"_admin.nsState": "NOT_INSTANTIATED"},
4614 )
4615 except DbException as e:
4616 self.logger.warn(
4617 logging_text
4618 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4619 nsr_id, e
4620 )
4621 )
4622 if operation_params:
4623 autoremove = operation_params.get("autoremove", False)
4624 if nslcmop_operation_state:
4625 try:
4626 await self.msg.aiowrite(
4627 "ns",
4628 "terminated",
4629 {
4630 "nsr_id": nsr_id,
4631 "nslcmop_id": nslcmop_id,
4632 "operationState": nslcmop_operation_state,
4633 "autoremove": autoremove,
4634 },
4635 loop=self.loop,
4636 )
4637 except Exception as e:
4638 self.logger.error(
4639 logging_text + "kafka_write notification Exception {}".format(e)
4640 )
4641
4642 self.logger.debug(logging_text + "Exit")
4643 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4644
4645 async def _wait_for_tasks(
4646 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4647 ):
4648 time_start = time()
4649 error_detail_list = []
4650 error_list = []
4651 pending_tasks = list(created_tasks_info.keys())
4652 num_tasks = len(pending_tasks)
4653 num_done = 0
4654 stage[1] = "{}/{}.".format(num_done, num_tasks)
4655 self._write_op_status(nslcmop_id, stage)
4656 while pending_tasks:
4657 new_error = None
4658 _timeout = timeout + time_start - time()
4659 done, pending_tasks = await asyncio.wait(
4660 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4661 )
4662 num_done += len(done)
4663 if not done: # Timeout
4664 for task in pending_tasks:
4665 new_error = created_tasks_info[task] + ": Timeout"
4666 error_detail_list.append(new_error)
4667 error_list.append(new_error)
4668 break
4669 for task in done:
4670 if task.cancelled():
4671 exc = "Cancelled"
4672 else:
4673 exc = task.exception()
4674 if exc:
4675 if isinstance(exc, asyncio.TimeoutError):
4676 exc = "Timeout"
4677 new_error = created_tasks_info[task] + ": {}".format(exc)
4678 error_list.append(created_tasks_info[task])
4679 error_detail_list.append(new_error)
4680 if isinstance(
4681 exc,
4682 (
4683 str,
4684 DbException,
4685 N2VCException,
4686 ROclient.ROClientException,
4687 LcmException,
4688 K8sException,
4689 NgRoException,
4690 ),
4691 ):
4692 self.logger.error(logging_text + new_error)
4693 else:
4694 exc_traceback = "".join(
4695 traceback.format_exception(None, exc, exc.__traceback__)
4696 )
4697 self.logger.error(
4698 logging_text
4699 + created_tasks_info[task]
4700 + " "
4701 + exc_traceback
4702 )
4703 else:
4704 self.logger.debug(
4705 logging_text + created_tasks_info[task] + ": Done"
4706 )
4707 stage[1] = "{}/{}.".format(num_done, num_tasks)
4708 if new_error:
4709 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4710 if nsr_id: # update also nsr
4711 self.update_db_2(
4712 "nsrs",
4713 nsr_id,
4714 {
4715 "errorDescription": "Error at: " + ", ".join(error_list),
4716 "errorDetail": ". ".join(error_detail_list),
4717 },
4718 )
4719 self._write_op_status(nslcmop_id, stage)
4720 return error_detail_list
4721
4722 @staticmethod
4723 def _map_primitive_params(primitive_desc, params, instantiation_params):
4724 """
4725 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4726 The default-value is used. If it is between < > it look for a value at instantiation_params
4727 :param primitive_desc: portion of VNFD/NSD that describes primitive
4728 :param params: Params provided by user
4729 :param instantiation_params: Instantiation params provided by user
4730 :return: a dictionary with the calculated params
4731 """
4732 calculated_params = {}
4733 for parameter in primitive_desc.get("parameter", ()):
4734 param_name = parameter["name"]
4735 if param_name in params:
4736 calculated_params[param_name] = params[param_name]
4737 elif "default-value" in parameter or "value" in parameter:
4738 if "value" in parameter:
4739 calculated_params[param_name] = parameter["value"]
4740 else:
4741 calculated_params[param_name] = parameter["default-value"]
4742 if (
4743 isinstance(calculated_params[param_name], str)
4744 and calculated_params[param_name].startswith("<")
4745 and calculated_params[param_name].endswith(">")
4746 ):
4747 if calculated_params[param_name][1:-1] in instantiation_params:
4748 calculated_params[param_name] = instantiation_params[
4749 calculated_params[param_name][1:-1]
4750 ]
4751 else:
4752 raise LcmException(
4753 "Parameter {} needed to execute primitive {} not provided".format(
4754 calculated_params[param_name], primitive_desc["name"]
4755 )
4756 )
4757 else:
4758 raise LcmException(
4759 "Parameter {} needed to execute primitive {} not provided".format(
4760 param_name, primitive_desc["name"]
4761 )
4762 )
4763
4764 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4765 calculated_params[param_name] = yaml.safe_dump(
4766 calculated_params[param_name], default_flow_style=True, width=256
4767 )
4768 elif isinstance(calculated_params[param_name], str) and calculated_params[
4769 param_name
4770 ].startswith("!!yaml "):
4771 calculated_params[param_name] = calculated_params[param_name][7:]
4772 if parameter.get("data-type") == "INTEGER":
4773 try:
4774 calculated_params[param_name] = int(calculated_params[param_name])
4775 except ValueError: # error converting string to int
4776 raise LcmException(
4777 "Parameter {} of primitive {} must be integer".format(
4778 param_name, primitive_desc["name"]
4779 )
4780 )
4781 elif parameter.get("data-type") == "BOOLEAN":
4782 calculated_params[param_name] = not (
4783 (str(calculated_params[param_name])).lower() == "false"
4784 )
4785
4786 # add always ns_config_info if primitive name is config
4787 if primitive_desc["name"] == "config":
4788 if "ns_config_info" in instantiation_params:
4789 calculated_params["ns_config_info"] = instantiation_params[
4790 "ns_config_info"
4791 ]
4792 return calculated_params
4793
4794 def _look_for_deployed_vca(
4795 self,
4796 deployed_vca,
4797 member_vnf_index,
4798 vdu_id,
4799 vdu_count_index,
4800 kdu_name=None,
4801 ee_descriptor_id=None,
4802 ):
4803 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4804 for vca in deployed_vca:
4805 if not vca:
4806 continue
4807 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4808 continue
4809 if (
4810 vdu_count_index is not None
4811 and vdu_count_index != vca["vdu_count_index"]
4812 ):
4813 continue
4814 if kdu_name and kdu_name != vca["kdu_name"]:
4815 continue
4816 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4817 continue
4818 break
4819 else:
4820 # vca_deployed not found
4821 raise LcmException(
4822 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4823 " is not deployed".format(
4824 member_vnf_index,
4825 vdu_id,
4826 vdu_count_index,
4827 kdu_name,
4828 ee_descriptor_id,
4829 )
4830 )
4831 # get ee_id
4832 ee_id = vca.get("ee_id")
4833 vca_type = vca.get(
4834 "type", "lxc_proxy_charm"
4835 ) # default value for backward compatibility - proxy charm
4836 if not ee_id:
4837 raise LcmException(
4838 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4839 "execution environment".format(
4840 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4841 )
4842 )
4843 return ee_id, vca_type
4844
4845 async def _ns_execute_primitive(
4846 self,
4847 ee_id,
4848 primitive,
4849 primitive_params,
4850 retries=0,
4851 retries_interval=30,
4852 timeout=None,
4853 vca_type=None,
4854 db_dict=None,
4855 vca_id: str = None,
4856 ) -> (str, str):
4857 try:
4858 if primitive == "config":
4859 primitive_params = {"params": primitive_params}
4860
4861 vca_type = vca_type or "lxc_proxy_charm"
4862
4863 while retries >= 0:
4864 try:
4865 output = await asyncio.wait_for(
4866 self.vca_map[vca_type].exec_primitive(
4867 ee_id=ee_id,
4868 primitive_name=primitive,
4869 params_dict=primitive_params,
4870 progress_timeout=self.timeout_progress_primitive,
4871 total_timeout=self.timeout_primitive,
4872 db_dict=db_dict,
4873 vca_id=vca_id,
4874 vca_type=vca_type,
4875 ),
4876 timeout=timeout or self.timeout_primitive,
4877 )
4878 # execution was OK
4879 break
4880 except asyncio.CancelledError:
4881 raise
4882 except Exception as e: # asyncio.TimeoutError
4883 if isinstance(e, asyncio.TimeoutError):
4884 e = "Timeout"
4885 retries -= 1
4886 if retries >= 0:
4887 self.logger.debug(
4888 "Error executing action {} on {} -> {}".format(
4889 primitive, ee_id, e
4890 )
4891 )
4892 # wait and retry
4893 await asyncio.sleep(retries_interval, loop=self.loop)
4894 else:
4895 return "FAILED", str(e)
4896
4897 return "COMPLETED", output
4898
4899 except (LcmException, asyncio.CancelledError):
4900 raise
4901 except Exception as e:
4902 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4903
4904 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4905 """
4906 Updating the vca_status with latest juju information in nsrs record
4907 :param: nsr_id: Id of the nsr
4908 :param: nslcmop_id: Id of the nslcmop
4909 :return: None
4910 """
4911
4912 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4913 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4914 vca_id = self.get_vca_id({}, db_nsr)
4915 if db_nsr["_admin"]["deployed"]["K8s"]:
4916 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4917 cluster_uuid, kdu_instance, cluster_type = (
4918 k8s["k8scluster-uuid"],
4919 k8s["kdu-instance"],
4920 k8s["k8scluster-type"],
4921 )
4922 await self._on_update_k8s_db(
4923 cluster_uuid=cluster_uuid,
4924 kdu_instance=kdu_instance,
4925 filter={"_id": nsr_id},
4926 vca_id=vca_id,
4927 cluster_type=cluster_type,
4928 )
4929 else:
4930 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4931 table, filter = "nsrs", {"_id": nsr_id}
4932 path = "_admin.deployed.VCA.{}.".format(vca_index)
4933 await self._on_update_n2vc_db(table, filter, path, {})
4934
4935 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4936 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4937
4938 async def action(self, nsr_id, nslcmop_id):
4939 # Try to lock HA task here
4940 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4941 if not task_is_locked_by_me:
4942 return
4943
4944 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4945 self.logger.debug(logging_text + "Enter")
4946 # get all needed from database
4947 db_nsr = None
4948 db_nslcmop = None
4949 db_nsr_update = {}
4950 db_nslcmop_update = {}
4951 nslcmop_operation_state = None
4952 error_description_nslcmop = None
4953 exc = None
4954 try:
4955 # wait for any previous tasks in process
4956 step = "Waiting for previous operations to terminate"
4957 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4958
4959 self._write_ns_status(
4960 nsr_id=nsr_id,
4961 ns_state=None,
4962 current_operation="RUNNING ACTION",
4963 current_operation_id=nslcmop_id,
4964 )
4965
4966 step = "Getting information from database"
4967 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4968 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4969 if db_nslcmop["operationParams"].get("primitive_params"):
4970 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4971 db_nslcmop["operationParams"]["primitive_params"]
4972 )
4973
4974 nsr_deployed = db_nsr["_admin"].get("deployed")
4975 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4976 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4977 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4978 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4979 primitive = db_nslcmop["operationParams"]["primitive"]
4980 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4981 timeout_ns_action = db_nslcmop["operationParams"].get(
4982 "timeout_ns_action", self.timeout_primitive
4983 )
4984
4985 if vnf_index:
4986 step = "Getting vnfr from database"
4987 db_vnfr = self.db.get_one(
4988 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4989 )
4990 if db_vnfr.get("kdur"):
4991 kdur_list = []
4992 for kdur in db_vnfr["kdur"]:
4993 if kdur.get("additionalParams"):
4994 kdur["additionalParams"] = json.loads(
4995 kdur["additionalParams"]
4996 )
4997 kdur_list.append(kdur)
4998 db_vnfr["kdur"] = kdur_list
4999 step = "Getting vnfd from database"
5000 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5001
5002 # Sync filesystem before running a primitive
5003 self.fs.sync(db_vnfr["vnfd-id"])
5004 else:
5005 step = "Getting nsd from database"
5006 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
5007
5008 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5009 # for backward compatibility
5010 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5011 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5012 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5013 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5014
5015 # look for primitive
5016 config_primitive_desc = descriptor_configuration = None
5017 if vdu_id:
5018 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
5019 elif kdu_name:
5020 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
5021 elif vnf_index:
5022 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
5023 else:
5024 descriptor_configuration = db_nsd.get("ns-configuration")
5025
5026 if descriptor_configuration and descriptor_configuration.get(
5027 "config-primitive"
5028 ):
5029 for config_primitive in descriptor_configuration["config-primitive"]:
5030 if config_primitive["name"] == primitive:
5031 config_primitive_desc = config_primitive
5032 break
5033
5034 if not config_primitive_desc:
5035 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
5036 raise LcmException(
5037 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5038 primitive
5039 )
5040 )
5041 primitive_name = primitive
5042 ee_descriptor_id = None
5043 else:
5044 primitive_name = config_primitive_desc.get(
5045 "execution-environment-primitive", primitive
5046 )
5047 ee_descriptor_id = config_primitive_desc.get(
5048 "execution-environment-ref"
5049 )
5050
5051 if vnf_index:
5052 if vdu_id:
5053 vdur = next(
5054 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
5055 )
5056 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
5057 elif kdu_name:
5058 kdur = next(
5059 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
5060 )
5061 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
5062 else:
5063 desc_params = parse_yaml_strings(
5064 db_vnfr.get("additionalParamsForVnf")
5065 )
5066 else:
5067 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
5068 if kdu_name and get_configuration(db_vnfd, kdu_name):
5069 kdu_configuration = get_configuration(db_vnfd, kdu_name)
5070 actions = set()
5071 for primitive in kdu_configuration.get("initial-config-primitive", []):
5072 actions.add(primitive["name"])
5073 for primitive in kdu_configuration.get("config-primitive", []):
5074 actions.add(primitive["name"])
5075 kdu = find_in_list(
5076 nsr_deployed["K8s"],
5077 lambda kdu: kdu_name == kdu["kdu-name"]
5078 and kdu["member-vnf-index"] == vnf_index,
5079 )
5080 kdu_action = (
5081 True
5082 if primitive_name in actions
5083 and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5084 else False
5085 )
5086
5087 # TODO check if ns is in a proper status
5088 if kdu_name and (
5089 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5090 ):
5091 # kdur and desc_params already set from before
5092 if primitive_params:
5093 desc_params.update(primitive_params)
5094 # TODO Check if we will need something at vnf level
5095 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5096 if (
5097 kdu_name == kdu["kdu-name"]
5098 and kdu["member-vnf-index"] == vnf_index
5099 ):
5100 break
5101 else:
5102 raise LcmException(
5103 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5104 )
5105
5106 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5107 msg = "unknown k8scluster-type '{}'".format(
5108 kdu.get("k8scluster-type")
5109 )
5110 raise LcmException(msg)
5111
5112 db_dict = {
5113 "collection": "nsrs",
5114 "filter": {"_id": nsr_id},
5115 "path": "_admin.deployed.K8s.{}".format(index),
5116 }
5117 self.logger.debug(
5118 logging_text
5119 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5120 )
5121 step = "Executing kdu {}".format(primitive_name)
5122 if primitive_name == "upgrade":
5123 if desc_params.get("kdu_model"):
5124 kdu_model = desc_params.get("kdu_model")
5125 del desc_params["kdu_model"]
5126 else:
5127 kdu_model = kdu.get("kdu-model")
5128 parts = kdu_model.split(sep=":")
5129 if len(parts) == 2:
5130 kdu_model = parts[0]
5131
5132 detailed_status = await asyncio.wait_for(
5133 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5134 cluster_uuid=kdu.get("k8scluster-uuid"),
5135 kdu_instance=kdu.get("kdu-instance"),
5136 atomic=True,
5137 kdu_model=kdu_model,
5138 params=desc_params,
5139 db_dict=db_dict,
5140 timeout=timeout_ns_action,
5141 ),
5142 timeout=timeout_ns_action + 10,
5143 )
5144 self.logger.debug(
5145 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5146 )
5147 elif primitive_name == "rollback":
5148 detailed_status = await asyncio.wait_for(
5149 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5150 cluster_uuid=kdu.get("k8scluster-uuid"),
5151 kdu_instance=kdu.get("kdu-instance"),
5152 db_dict=db_dict,
5153 ),
5154 timeout=timeout_ns_action,
5155 )
5156 elif primitive_name == "status":
5157 detailed_status = await asyncio.wait_for(
5158 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5159 cluster_uuid=kdu.get("k8scluster-uuid"),
5160 kdu_instance=kdu.get("kdu-instance"),
5161 vca_id=vca_id,
5162 ),
5163 timeout=timeout_ns_action,
5164 )
5165 else:
5166 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5167 kdu["kdu-name"], nsr_id
5168 )
5169 params = self._map_primitive_params(
5170 config_primitive_desc, primitive_params, desc_params
5171 )
5172
5173 detailed_status = await asyncio.wait_for(
5174 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5175 cluster_uuid=kdu.get("k8scluster-uuid"),
5176 kdu_instance=kdu_instance,
5177 primitive_name=primitive_name,
5178 params=params,
5179 db_dict=db_dict,
5180 timeout=timeout_ns_action,
5181 vca_id=vca_id,
5182 ),
5183 timeout=timeout_ns_action,
5184 )
5185
5186 if detailed_status:
5187 nslcmop_operation_state = "COMPLETED"
5188 else:
5189 detailed_status = ""
5190 nslcmop_operation_state = "FAILED"
5191 else:
5192 ee_id, vca_type = self._look_for_deployed_vca(
5193 nsr_deployed["VCA"],
5194 member_vnf_index=vnf_index,
5195 vdu_id=vdu_id,
5196 vdu_count_index=vdu_count_index,
5197 ee_descriptor_id=ee_descriptor_id,
5198 )
5199 for vca_index, vca_deployed in enumerate(
5200 db_nsr["_admin"]["deployed"]["VCA"]
5201 ):
5202 if vca_deployed.get("member-vnf-index") == vnf_index:
5203 db_dict = {
5204 "collection": "nsrs",
5205 "filter": {"_id": nsr_id},
5206 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5207 }
5208 break
5209 (
5210 nslcmop_operation_state,
5211 detailed_status,
5212 ) = await self._ns_execute_primitive(
5213 ee_id,
5214 primitive=primitive_name,
5215 primitive_params=self._map_primitive_params(
5216 config_primitive_desc, primitive_params, desc_params
5217 ),
5218 timeout=timeout_ns_action,
5219 vca_type=vca_type,
5220 db_dict=db_dict,
5221 vca_id=vca_id,
5222 )
5223
5224 db_nslcmop_update["detailed-status"] = detailed_status
5225 error_description_nslcmop = (
5226 detailed_status if nslcmop_operation_state == "FAILED" else ""
5227 )
5228 self.logger.debug(
5229 logging_text
5230 + " task Done with result {} {}".format(
5231 nslcmop_operation_state, detailed_status
5232 )
5233 )
5234 return # database update is called inside finally
5235
5236 except (DbException, LcmException, N2VCException, K8sException) as e:
5237 self.logger.error(logging_text + "Exit Exception {}".format(e))
5238 exc = e
5239 except asyncio.CancelledError:
5240 self.logger.error(
5241 logging_text + "Cancelled Exception while '{}'".format(step)
5242 )
5243 exc = "Operation was cancelled"
5244 except asyncio.TimeoutError:
5245 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5246 exc = "Timeout"
5247 except Exception as e:
5248 exc = traceback.format_exc()
5249 self.logger.critical(
5250 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5251 exc_info=True,
5252 )
5253 finally:
5254 if exc:
5255 db_nslcmop_update[
5256 "detailed-status"
5257 ] = (
5258 detailed_status
5259 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5260 nslcmop_operation_state = "FAILED"
5261 if db_nsr:
5262 self._write_ns_status(
5263 nsr_id=nsr_id,
5264 ns_state=db_nsr[
5265 "nsState"
5266 ], # TODO check if degraded. For the moment use previous status
5267 current_operation="IDLE",
5268 current_operation_id=None,
5269 # error_description=error_description_nsr,
5270 # error_detail=error_detail,
5271 other_update=db_nsr_update,
5272 )
5273
5274 self._write_op_status(
5275 op_id=nslcmop_id,
5276 stage="",
5277 error_message=error_description_nslcmop,
5278 operation_state=nslcmop_operation_state,
5279 other_update=db_nslcmop_update,
5280 )
5281
5282 if nslcmop_operation_state:
5283 try:
5284 await self.msg.aiowrite(
5285 "ns",
5286 "actioned",
5287 {
5288 "nsr_id": nsr_id,
5289 "nslcmop_id": nslcmop_id,
5290 "operationState": nslcmop_operation_state,
5291 },
5292 loop=self.loop,
5293 )
5294 except Exception as e:
5295 self.logger.error(
5296 logging_text + "kafka_write notification Exception {}".format(e)
5297 )
5298 self.logger.debug(logging_text + "Exit")
5299 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5300 return nslcmop_operation_state, detailed_status
5301
5302 async def terminate_vdus(
5303 self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
5304 ):
5305 """This method terminates VDUs
5306
5307 Args:
5308 db_vnfr: VNF instance record
5309 member_vnf_index: VNF index to identify the VDUs to be removed
5310 db_nsr: NS instance record
5311 update_db_nslcmops: Nslcmop update record
5312 """
5313 vca_scaling_info = []
5314 scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5315 scaling_info["scaling_direction"] = "IN"
5316 scaling_info["vdu-delete"] = {}
5317 scaling_info["kdu-delete"] = {}
5318 db_vdur = db_vnfr.get("vdur")
5319 vdur_list = copy(db_vdur)
5320 count_index = 0
5321 for index, vdu in enumerate(vdur_list):
5322 vca_scaling_info.append(
5323 {
5324 "osm_vdu_id": vdu["vdu-id-ref"],
5325 "member-vnf-index": member_vnf_index,
5326 "type": "delete",
5327 "vdu_index": count_index,
5328 })
5329 scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
5330 scaling_info["vdu"].append(
5331 {
5332 "name": vdu.get("name") or vdu.get("vdu-name"),
5333 "vdu_id": vdu["vdu-id-ref"],
5334 "interface": [],
5335 })
5336 for interface in vdu["interfaces"]:
5337 scaling_info["vdu"][index]["interface"].append(
5338 {
5339 "name": interface["name"],
5340 "ip_address": interface["ip-address"],
5341 "mac_address": interface.get("mac-address"),
5342 })
5343 self.logger.info("NS update scaling info{}".format(scaling_info))
5344 stage[2] = "Terminating VDUs"
5345 if scaling_info.get("vdu-delete"):
5346 # scale_process = "RO"
5347 if self.ro_config.get("ng"):
5348 await self._scale_ng_ro(
5349 logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
5350 )
5351
5352 async def remove_vnf(
5353 self, nsr_id, nslcmop_id, vnf_instance_id
5354 ):
5355 """This method is to Remove VNF instances from NS.
5356
5357 Args:
5358 nsr_id: NS instance id
5359 nslcmop_id: nslcmop id of update
5360 vnf_instance_id: id of the VNF instance to be removed
5361
5362 Returns:
5363 result: (str, str) COMPLETED/FAILED, details
5364 """
5365 try:
5366 db_nsr_update = {}
5367 logging_text = "Task ns={} update ".format(nsr_id)
5368 check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
5369 self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
5370 if check_vnfr_count > 1:
5371 stage = ["", "", ""]
5372 step = "Getting nslcmop from database"
5373 self.logger.debug(step + " after having waited for previous tasks to be completed")
5374 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5375 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5376 db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
5377 member_vnf_index = db_vnfr["member-vnf-index-ref"]
5378 """ db_vnfr = self.db.get_one(
5379 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5380
5381 update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5382 await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
5383
5384 constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
5385 constituent_vnfr.remove(db_vnfr.get("_id"))
5386 db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref")
5387 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5388 self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
5389 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5390 return "COMPLETED", "Done"
5391 else:
5392 step = "Terminate VNF Failed with"
5393 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5394 vnf_instance_id))
5395 except (LcmException, asyncio.CancelledError):
5396 raise
5397 except Exception as e:
5398 self.logger.debug("Error removing VNF {}".format(e))
5399 return "FAILED", "Error removing VNF {}".format(e)
5400
5401 async def _ns_charm_upgrade(
5402 self,
5403 ee_id,
5404 charm_id,
5405 charm_type,
5406 path,
5407 timeout: float = None,
5408 ) -> (str, str):
5409 """This method upgrade charms in VNF instances
5410
5411 Args:
5412 ee_id: Execution environment id
5413 path: Local path to the charm
5414 charm_id: charm-id
5415 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5416 timeout: (Float) Timeout for the ns update operation
5417
5418 Returns:
5419 result: (str, str) COMPLETED/FAILED, details
5420 """
5421 try:
5422 charm_type = charm_type or "lxc_proxy_charm"
5423 output = await self.vca_map[charm_type].upgrade_charm(
5424 ee_id=ee_id,
5425 path=path,
5426 charm_id=charm_id,
5427 charm_type=charm_type,
5428 timeout=timeout or self.timeout_ns_update,
5429 )
5430
5431 if output:
5432 return "COMPLETED", output
5433
5434 except (LcmException, asyncio.CancelledError):
5435 raise
5436
5437 except Exception as e:
5438
5439 self.logger.debug("Error upgrading charm {}".format(path))
5440
5441 return "FAILED", "Error upgrading charm {}: {}".format(path, e)
5442
5443 async def update(self, nsr_id, nslcmop_id):
5444 """Update NS according to different update types
5445
5446 This method performs upgrade of VNF instances then updates the revision
5447 number in VNF record
5448
5449 Args:
5450 nsr_id: Network service will be updated
5451 nslcmop_id: ns lcm operation id
5452
5453 Returns:
5454 It may raise DbException, LcmException, N2VCException, K8sException
5455
5456 """
5457 # Try to lock HA task here
5458 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5459 if not task_is_locked_by_me:
5460 return
5461
5462 logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
5463 self.logger.debug(logging_text + "Enter")
5464
5465 # Set the required variables to be filled up later
5466 db_nsr = None
5467 db_nslcmop_update = {}
5468 vnfr_update = {}
5469 nslcmop_operation_state = None
5470 db_nsr_update = {}
5471 error_description_nslcmop = ""
5472 exc = None
5473 change_type = "updated"
5474 detailed_status = ""
5475
5476 try:
5477 # wait for any previous tasks in process
5478 step = "Waiting for previous operations to terminate"
5479 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5480 self._write_ns_status(
5481 nsr_id=nsr_id,
5482 ns_state=None,
5483 current_operation="UPDATING",
5484 current_operation_id=nslcmop_id,
5485 )
5486
5487 step = "Getting nslcmop from database"
5488 db_nslcmop = self.db.get_one(
5489 "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
5490 )
5491 update_type = db_nslcmop["operationParams"]["updateType"]
5492
5493 step = "Getting nsr from database"
5494 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5495 old_operational_status = db_nsr["operational-status"]
5496 db_nsr_update["operational-status"] = "updating"
5497 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5498 nsr_deployed = db_nsr["_admin"].get("deployed")
5499
5500 if update_type == "CHANGE_VNFPKG":
5501
5502 # Get the input parameters given through update request
5503 vnf_instance_id = db_nslcmop["operationParams"][
5504 "changeVnfPackageData"
5505 ].get("vnfInstanceId")
5506
5507 vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
5508 "vnfdId"
5509 )
5510 timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")
5511
5512 step = "Getting vnfr from database"
5513 db_vnfr = self.db.get_one(
5514 "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
5515 )
5516
5517 step = "Getting vnfds from database"
5518 # Latest VNFD
5519 latest_vnfd = self.db.get_one(
5520 "vnfds", {"_id": vnfd_id}, fail_on_empty=False
5521 )
5522 latest_vnfd_revision = latest_vnfd["_admin"].get("revision")
5523
5524 # Current VNFD
5525 current_vnf_revision = db_vnfr.get("revision", 1)
5526 current_vnfd = self.db.get_one(
5527 "vnfds_revisions",
5528 {"_id": vnfd_id + ":" + str(current_vnf_revision)},
5529 fail_on_empty=False,
5530 )
5531 # Charm artifact paths will be filled up later
5532 (
5533 current_charm_artifact_path,
5534 target_charm_artifact_path,
5535 charm_artifact_paths,
5536 ) = ([], [], [])
5537
5538 step = "Checking if revision has changed in VNFD"
5539 if current_vnf_revision != latest_vnfd_revision:
5540
5541 # There is new revision of VNFD, update operation is required
5542 current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
5543 latest_vnfd_path = vnfd_id
5544
5545 step = "Removing the VNFD packages if they exist in the local path"
5546 shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
5547 shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True)
5548
5549 step = "Get the VNFD packages from FSMongo"
5550 self.fs.sync(from_path=latest_vnfd_path)
5551 self.fs.sync(from_path=current_vnfd_path)
5552
5553 step = (
5554 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5555 )
5556 base_folder = latest_vnfd["_admin"]["storage"]
5557
5558 for charm_index, charm_deployed in enumerate(
5559 get_iterable(nsr_deployed, "VCA")
5560 ):
5561 vnf_index = db_vnfr.get("member-vnf-index-ref")
5562
5563 # Getting charm-id and charm-type
5564 if charm_deployed.get("member-vnf-index") == vnf_index:
5565 charm_id = self.get_vca_id(db_vnfr, db_nsr)
5566 charm_type = charm_deployed.get("type")
5567
5568 # Getting ee-id
5569 ee_id = charm_deployed.get("ee_id")
5570
5571 step = "Getting descriptor config"
5572 descriptor_config = get_configuration(
5573 current_vnfd, current_vnfd["id"]
5574 )
5575
5576 if "execution-environment-list" in descriptor_config:
5577 ee_list = descriptor_config.get(
5578 "execution-environment-list", []
5579 )
5580 else:
5581 ee_list = []
5582
5583 # There could be several charm used in the same VNF
5584 for ee_item in ee_list:
5585 if ee_item.get("juju"):
5586
5587 step = "Getting charm name"
5588 charm_name = ee_item["juju"].get("charm")
5589
5590 step = "Setting Charm artifact paths"
5591 current_charm_artifact_path.append(
5592 get_charm_artifact_path(
5593 base_folder,
5594 charm_name,
5595 charm_type,
5596 current_vnf_revision,
5597 )
5598 )
5599 target_charm_artifact_path.append(
5600 get_charm_artifact_path(
5601 base_folder,
5602 charm_name,
5603 charm_type,
5604 )
5605 )
5606
5607 charm_artifact_paths = zip(
5608 current_charm_artifact_path, target_charm_artifact_path
5609 )
5610
5611 step = "Checking if software version has changed in VNFD"
5612 if find_software_version(current_vnfd) != find_software_version(
5613 latest_vnfd
5614 ):
5615
5616 step = "Checking if existing VNF has charm"
5617 for current_charm_path, target_charm_path in list(
5618 charm_artifact_paths
5619 ):
5620 if current_charm_path:
5621 raise LcmException(
5622 "Software version change is not supported as VNF instance {} has charm.".format(
5623 vnf_instance_id
5624 )
5625 )
5626
5627 # There is no change in the charm package, then redeploy the VNF
5628 # based on new descriptor
5629 step = "Redeploying VNF"
5630 # This part is in https://osm.etsi.org/gerrit/11943
5631
5632 else:
5633 step = "Checking if any charm package has changed or not"
5634 for current_charm_path, target_charm_path in list(
5635 charm_artifact_paths
5636 ):
5637 if (
5638 current_charm_path
5639 and target_charm_path
5640 and self.check_charm_hash_changed(
5641 current_charm_path, target_charm_path
5642 )
5643 ):
5644
5645 step = "Checking whether VNF uses juju bundle"
5646 if check_juju_bundle_existence(current_vnfd):
5647
5648 raise LcmException(
5649 "Charm upgrade is not supported for the instance which"
5650 " uses juju-bundle: {}".format(
5651 check_juju_bundle_existence(current_vnfd)
5652 )
5653 )
5654
5655 step = "Upgrading Charm"
5656 (
5657 result,
5658 detailed_status,
5659 ) = await self._ns_charm_upgrade(
5660 ee_id=ee_id,
5661 charm_id=charm_id,
5662 charm_type=charm_type,
5663 path=self.fs.path + target_charm_path,
5664 timeout=timeout_seconds,
5665 )
5666
5667 if result == "FAILED":
5668 nslcmop_operation_state = result
5669 error_description_nslcmop = detailed_status
5670
5671 db_nslcmop_update["detailed-status"] = detailed_status
5672 self.logger.debug(
5673 logging_text
5674 + " step {} Done with result {} {}".format(
5675 step, nslcmop_operation_state, detailed_status
5676 )
5677 )
5678
5679 step = "Updating policies"
5680 # This part is in https://osm.etsi.org/gerrit/11943
5681
5682 # If nslcmop_operation_state is None, so any operation is not failed.
5683 if not nslcmop_operation_state:
5684 nslcmop_operation_state = "COMPLETED"
5685
5686 # If update CHANGE_VNFPKG nslcmop_operation is successful
5687 # vnf revision need to be updated
5688 vnfr_update["revision"] = latest_vnfd_revision
5689 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
5690
5691 self.logger.debug(
5692 logging_text
5693 + " task Done with result {} {}".format(
5694 nslcmop_operation_state, detailed_status
5695 )
5696 )
5697 elif update_type == "REMOVE_VNF":
5698 # This part is included in https://osm.etsi.org/gerrit/11876
5699 vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"]
5700 db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
5701 member_vnf_index = db_vnfr["member-vnf-index-ref"]
5702 step = "Removing VNF"
5703 (result, detailed_status) = await self.remove_vnf(nsr_id, nslcmop_id, vnf_instance_id)
5704 if result == "FAILED":
5705 nslcmop_operation_state = result
5706 error_description_nslcmop = detailed_status
5707 db_nslcmop_update["detailed-status"] = detailed_status
5708 change_type = "vnf_terminated"
5709 if not nslcmop_operation_state:
5710 nslcmop_operation_state = "COMPLETED"
5711 self.logger.debug(
5712 logging_text
5713 + " task Done with result {} {}".format(
5714 nslcmop_operation_state, detailed_status
5715 )
5716 )
5717
5718 # If nslcmop_operation_state is None, so any operation is not failed.
5719 # All operations are executed in overall.
5720 if not nslcmop_operation_state:
5721 nslcmop_operation_state = "COMPLETED"
5722 db_nsr_update["operational-status"] = old_operational_status
5723
5724 except (DbException, LcmException, N2VCException, K8sException) as e:
5725 self.logger.error(logging_text + "Exit Exception {}".format(e))
5726 exc = e
5727 except asyncio.CancelledError:
5728 self.logger.error(
5729 logging_text + "Cancelled Exception while '{}'".format(step)
5730 )
5731 exc = "Operation was cancelled"
5732 except asyncio.TimeoutError:
5733 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5734 exc = "Timeout"
5735 except Exception as e:
5736 exc = traceback.format_exc()
5737 self.logger.critical(
5738 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5739 exc_info=True,
5740 )
5741 finally:
5742 if exc:
5743 db_nslcmop_update[
5744 "detailed-status"
5745 ] = (
5746 detailed_status
5747 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5748 nslcmop_operation_state = "FAILED"
5749 db_nsr_update["operational-status"] = old_operational_status
5750 if db_nsr:
5751 self._write_ns_status(
5752 nsr_id=nsr_id,
5753 ns_state=db_nsr["nsState"],
5754 current_operation="IDLE",
5755 current_operation_id=None,
5756 other_update=db_nsr_update,
5757 )
5758
5759 self._write_op_status(
5760 op_id=nslcmop_id,
5761 stage="",
5762 error_message=error_description_nslcmop,
5763 operation_state=nslcmop_operation_state,
5764 other_update=db_nslcmop_update,
5765 )
5766
5767 if nslcmop_operation_state:
5768 try:
5769 msg = {
5770 "nsr_id": nsr_id,
5771 "nslcmop_id": nslcmop_id,
5772 "operationState": nslcmop_operation_state,
5773 }
5774 if change_type in ("vnf_terminated"):
5775 msg.update({"vnf_member_index": member_vnf_index})
5776 await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
5777 except Exception as e:
5778 self.logger.error(
5779 logging_text + "kafka_write notification Exception {}".format(e)
5780 )
5781 self.logger.debug(logging_text + "Exit")
5782 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update")
5783 return nslcmop_operation_state, detailed_status
5784
5785 async def scale(self, nsr_id, nslcmop_id):
5786 # Try to lock HA task here
5787 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5788 if not task_is_locked_by_me:
5789 return
5790
5791 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5792 stage = ["", "", ""]
5793 tasks_dict_info = {}
5794 # ^ stage, step, VIM progress
5795 self.logger.debug(logging_text + "Enter")
5796 # get all needed from database
5797 db_nsr = None
5798 db_nslcmop_update = {}
5799 db_nsr_update = {}
5800 exc = None
5801 # in case of error, indicates what part of scale was failed to put nsr at error status
5802 scale_process = None
5803 old_operational_status = ""
5804 old_config_status = ""
5805 nsi_id = None
5806 try:
5807 # wait for any previous tasks in process
5808 step = "Waiting for previous operations to terminate"
5809 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5810 self._write_ns_status(
5811 nsr_id=nsr_id,
5812 ns_state=None,
5813 current_operation="SCALING",
5814 current_operation_id=nslcmop_id,
5815 )
5816
5817 step = "Getting nslcmop from database"
5818 self.logger.debug(
5819 step + " after having waited for previous tasks to be completed"
5820 )
5821 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5822
5823 step = "Getting nsr from database"
5824 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5825 old_operational_status = db_nsr["operational-status"]
5826 old_config_status = db_nsr["config-status"]
5827
5828 step = "Parsing scaling parameters"
5829 db_nsr_update["operational-status"] = "scaling"
5830 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5831 nsr_deployed = db_nsr["_admin"].get("deployed")
5832
5833 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5834 "scaleByStepData"
5835 ]["member-vnf-index"]
5836 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5837 "scaleByStepData"
5838 ]["scaling-group-descriptor"]
5839 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5840 # for backward compatibility
5841 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5842 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5843 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5844 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5845
5846 step = "Getting vnfr from database"
5847 db_vnfr = self.db.get_one(
5848 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5849 )
5850
5851 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5852
5853 step = "Getting vnfd from database"
5854 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5855
5856 base_folder = db_vnfd["_admin"]["storage"]
5857
5858 step = "Getting scaling-group-descriptor"
5859 scaling_descriptor = find_in_list(
5860 get_scaling_aspect(db_vnfd),
5861 lambda scale_desc: scale_desc["name"] == scaling_group,
5862 )
5863 if not scaling_descriptor:
5864 raise LcmException(
5865 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5866 "at vnfd:scaling-group-descriptor".format(scaling_group)
5867 )
5868
5869 step = "Sending scale order to VIM"
5870 # TODO check if ns is in a proper status
5871 nb_scale_op = 0
5872 if not db_nsr["_admin"].get("scaling-group"):
5873 self.update_db_2(
5874 "nsrs",
5875 nsr_id,
5876 {
5877 "_admin.scaling-group": [
5878 {"name": scaling_group, "nb-scale-op": 0}
5879 ]
5880 },
5881 )
5882 admin_scale_index = 0
5883 else:
5884 for admin_scale_index, admin_scale_info in enumerate(
5885 db_nsr["_admin"]["scaling-group"]
5886 ):
5887 if admin_scale_info["name"] == scaling_group:
5888 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5889 break
5890 else: # not found, set index one plus last element and add new entry with the name
5891 admin_scale_index += 1
5892 db_nsr_update[
5893 "_admin.scaling-group.{}.name".format(admin_scale_index)
5894 ] = scaling_group
5895
5896 vca_scaling_info = []
5897 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5898 if scaling_type == "SCALE_OUT":
5899 if "aspect-delta-details" not in scaling_descriptor:
5900 raise LcmException(
5901 "Aspect delta details not fount in scaling descriptor {}".format(
5902 scaling_descriptor["name"]
5903 )
5904 )
5905 # count if max-instance-count is reached
5906 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5907
5908 scaling_info["scaling_direction"] = "OUT"
5909 scaling_info["vdu-create"] = {}
5910 scaling_info["kdu-create"] = {}
5911 for delta in deltas:
5912 for vdu_delta in delta.get("vdu-delta", {}):
5913 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5914 # vdu_index also provides the number of instance of the targeted vdu
5915 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5916 cloud_init_text = self._get_vdu_cloud_init_content(
5917 vdud, db_vnfd
5918 )
5919 if cloud_init_text:
5920 additional_params = (
5921 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5922 or {}
5923 )
5924 cloud_init_list = []
5925
5926 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5927 max_instance_count = 10
5928 if vdu_profile and "max-number-of-instances" in vdu_profile:
5929 max_instance_count = vdu_profile.get(
5930 "max-number-of-instances", 10
5931 )
5932
5933 default_instance_num = get_number_of_instances(
5934 db_vnfd, vdud["id"]
5935 )
5936 instances_number = vdu_delta.get("number-of-instances", 1)
5937 nb_scale_op += instances_number
5938
5939 new_instance_count = nb_scale_op + default_instance_num
5940 # Control if new count is over max and vdu count is less than max.
5941 # Then assign new instance count
5942 if new_instance_count > max_instance_count > vdu_count:
5943 instances_number = new_instance_count - max_instance_count
5944 else:
5945 instances_number = instances_number
5946
5947 if new_instance_count > max_instance_count:
5948 raise LcmException(
5949 "reached the limit of {} (max-instance-count) "
5950 "scaling-out operations for the "
5951 "scaling-group-descriptor '{}'".format(
5952 nb_scale_op, scaling_group
5953 )
5954 )
5955 for x in range(vdu_delta.get("number-of-instances", 1)):
5956 if cloud_init_text:
5957 # TODO Information of its own ip is not available because db_vnfr is not updated.
5958 additional_params["OSM"] = get_osm_params(
5959 db_vnfr, vdu_delta["id"], vdu_index + x
5960 )
5961 cloud_init_list.append(
5962 self._parse_cloud_init(
5963 cloud_init_text,
5964 additional_params,
5965 db_vnfd["id"],
5966 vdud["id"],
5967 )
5968 )
5969 vca_scaling_info.append(
5970 {
5971 "osm_vdu_id": vdu_delta["id"],
5972 "member-vnf-index": vnf_index,
5973 "type": "create",
5974 "vdu_index": vdu_index + x,
5975 }
5976 )
5977 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5978 for kdu_delta in delta.get("kdu-resource-delta", {}):
5979 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5980 kdu_name = kdu_profile["kdu-name"]
5981 resource_name = kdu_profile.get("resource-name", "")
5982
5983 # Might have different kdus in the same delta
5984 # Should have list for each kdu
5985 if not scaling_info["kdu-create"].get(kdu_name, None):
5986 scaling_info["kdu-create"][kdu_name] = []
5987
5988 kdur = get_kdur(db_vnfr, kdu_name)
5989 if kdur.get("helm-chart"):
5990 k8s_cluster_type = "helm-chart-v3"
5991 self.logger.debug("kdur: {}".format(kdur))
5992 if (
5993 kdur.get("helm-version")
5994 and kdur.get("helm-version") == "v2"
5995 ):
5996 k8s_cluster_type = "helm-chart"
5997 elif kdur.get("juju-bundle"):
5998 k8s_cluster_type = "juju-bundle"
5999 else:
6000 raise LcmException(
6001 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6002 "juju-bundle. Maybe an old NBI version is running".format(
6003 db_vnfr["member-vnf-index-ref"], kdu_name
6004 )
6005 )
6006
6007 max_instance_count = 10
6008 if kdu_profile and "max-number-of-instances" in kdu_profile:
6009 max_instance_count = kdu_profile.get(
6010 "max-number-of-instances", 10
6011 )
6012
6013 nb_scale_op += kdu_delta.get("number-of-instances", 1)
6014 deployed_kdu, _ = get_deployed_kdu(
6015 nsr_deployed, kdu_name, vnf_index
6016 )
6017 if deployed_kdu is None:
6018 raise LcmException(
6019 "KDU '{}' for vnf '{}' not deployed".format(
6020 kdu_name, vnf_index
6021 )
6022 )
6023 kdu_instance = deployed_kdu.get("kdu-instance")
6024 instance_num = await self.k8scluster_map[
6025 k8s_cluster_type
6026 ].get_scale_count(
6027 resource_name,
6028 kdu_instance,
6029 vca_id=vca_id,
6030 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
6031 kdu_model=deployed_kdu.get("kdu-model"),
6032 )
6033 kdu_replica_count = instance_num + kdu_delta.get(
6034 "number-of-instances", 1
6035 )
6036
6037 # Control if new count is over max and instance_num is less than max.
6038 # Then assign max instance number to kdu replica count
6039 if kdu_replica_count > max_instance_count > instance_num:
6040 kdu_replica_count = max_instance_count
6041 if kdu_replica_count > max_instance_count:
6042 raise LcmException(
6043 "reached the limit of {} (max-instance-count) "
6044 "scaling-out operations for the "
6045 "scaling-group-descriptor '{}'".format(
6046 instance_num, scaling_group
6047 )
6048 )
6049
6050 for x in range(kdu_delta.get("number-of-instances", 1)):
6051 vca_scaling_info.append(
6052 {
6053 "osm_kdu_id": kdu_name,
6054 "member-vnf-index": vnf_index,
6055 "type": "create",
6056 "kdu_index": instance_num + x - 1,
6057 }
6058 )
6059 scaling_info["kdu-create"][kdu_name].append(
6060 {
6061 "member-vnf-index": vnf_index,
6062 "type": "create",
6063 "k8s-cluster-type": k8s_cluster_type,
6064 "resource-name": resource_name,
6065 "scale": kdu_replica_count,
6066 }
6067 )
6068 elif scaling_type == "SCALE_IN":
6069 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
6070
6071 scaling_info["scaling_direction"] = "IN"
6072 scaling_info["vdu-delete"] = {}
6073 scaling_info["kdu-delete"] = {}
6074
6075 for delta in deltas:
6076 for vdu_delta in delta.get("vdu-delta", {}):
6077 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
6078 min_instance_count = 0
6079 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
6080 if vdu_profile and "min-number-of-instances" in vdu_profile:
6081 min_instance_count = vdu_profile["min-number-of-instances"]
6082
6083 default_instance_num = get_number_of_instances(
6084 db_vnfd, vdu_delta["id"]
6085 )
6086 instance_num = vdu_delta.get("number-of-instances", 1)
6087 nb_scale_op -= instance_num
6088
6089 new_instance_count = nb_scale_op + default_instance_num
6090
6091 if new_instance_count < min_instance_count < vdu_count:
6092 instances_number = min_instance_count - new_instance_count
6093 else:
6094 instances_number = instance_num
6095
6096 if new_instance_count < min_instance_count:
6097 raise LcmException(
6098 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6099 "scaling-group-descriptor '{}'".format(
6100 nb_scale_op, scaling_group
6101 )
6102 )
6103 for x in range(vdu_delta.get("number-of-instances", 1)):
6104 vca_scaling_info.append(
6105 {
6106 "osm_vdu_id": vdu_delta["id"],
6107 "member-vnf-index": vnf_index,
6108 "type": "delete",
6109 "vdu_index": vdu_index - 1 - x,
6110 }
6111 )
6112 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
6113 for kdu_delta in delta.get("kdu-resource-delta", {}):
6114 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
6115 kdu_name = kdu_profile["kdu-name"]
6116 resource_name = kdu_profile.get("resource-name", "")
6117
6118 if not scaling_info["kdu-delete"].get(kdu_name, None):
6119 scaling_info["kdu-delete"][kdu_name] = []
6120
6121 kdur = get_kdur(db_vnfr, kdu_name)
6122 if kdur.get("helm-chart"):
6123 k8s_cluster_type = "helm-chart-v3"
6124 self.logger.debug("kdur: {}".format(kdur))
6125 if (
6126 kdur.get("helm-version")
6127 and kdur.get("helm-version") == "v2"
6128 ):
6129 k8s_cluster_type = "helm-chart"
6130 elif kdur.get("juju-bundle"):
6131 k8s_cluster_type = "juju-bundle"
6132 else:
6133 raise LcmException(
6134 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6135 "juju-bundle. Maybe an old NBI version is running".format(
6136 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
6137 )
6138 )
6139
6140 min_instance_count = 0
6141 if kdu_profile and "min-number-of-instances" in kdu_profile:
6142 min_instance_count = kdu_profile["min-number-of-instances"]
6143
6144 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
6145 deployed_kdu, _ = get_deployed_kdu(
6146 nsr_deployed, kdu_name, vnf_index
6147 )
6148 if deployed_kdu is None:
6149 raise LcmException(
6150 "KDU '{}' for vnf '{}' not deployed".format(
6151 kdu_name, vnf_index
6152 )
6153 )
6154 kdu_instance = deployed_kdu.get("kdu-instance")
6155 instance_num = await self.k8scluster_map[
6156 k8s_cluster_type
6157 ].get_scale_count(
6158 resource_name,
6159 kdu_instance,
6160 vca_id=vca_id,
6161 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
6162 kdu_model=deployed_kdu.get("kdu-model"),
6163 )
6164 kdu_replica_count = instance_num - kdu_delta.get(
6165 "number-of-instances", 1
6166 )
6167
6168 if kdu_replica_count < min_instance_count < instance_num:
6169 kdu_replica_count = min_instance_count
6170 if kdu_replica_count < min_instance_count:
6171 raise LcmException(
6172 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6173 "scaling-group-descriptor '{}'".format(
6174 instance_num, scaling_group
6175 )
6176 )
6177
6178 for x in range(kdu_delta.get("number-of-instances", 1)):
6179 vca_scaling_info.append(
6180 {
6181 "osm_kdu_id": kdu_name,
6182 "member-vnf-index": vnf_index,
6183 "type": "delete",
6184 "kdu_index": instance_num - x - 1,
6185 }
6186 )
6187 scaling_info["kdu-delete"][kdu_name].append(
6188 {
6189 "member-vnf-index": vnf_index,
6190 "type": "delete",
6191 "k8s-cluster-type": k8s_cluster_type,
6192 "resource-name": resource_name,
6193 "scale": kdu_replica_count,
6194 }
6195 )
6196
6197 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6198 vdu_delete = copy(scaling_info.get("vdu-delete"))
6199 if scaling_info["scaling_direction"] == "IN":
6200 for vdur in reversed(db_vnfr["vdur"]):
6201 if vdu_delete.get(vdur["vdu-id-ref"]):
6202 vdu_delete[vdur["vdu-id-ref"]] -= 1
6203 scaling_info["vdu"].append(
6204 {
6205 "name": vdur.get("name") or vdur.get("vdu-name"),
6206 "vdu_id": vdur["vdu-id-ref"],
6207 "interface": [],
6208 }
6209 )
6210 for interface in vdur["interfaces"]:
6211 scaling_info["vdu"][-1]["interface"].append(
6212 {
6213 "name": interface["name"],
6214 "ip_address": interface["ip-address"],
6215 "mac_address": interface.get("mac-address"),
6216 }
6217 )
6218 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6219
6220 # PRE-SCALE BEGIN
6221 step = "Executing pre-scale vnf-config-primitive"
6222 if scaling_descriptor.get("scaling-config-action"):
6223 for scaling_config_action in scaling_descriptor[
6224 "scaling-config-action"
6225 ]:
6226 if (
6227 scaling_config_action.get("trigger") == "pre-scale-in"
6228 and scaling_type == "SCALE_IN"
6229 ) or (
6230 scaling_config_action.get("trigger") == "pre-scale-out"
6231 and scaling_type == "SCALE_OUT"
6232 ):
6233 vnf_config_primitive = scaling_config_action[
6234 "vnf-config-primitive-name-ref"
6235 ]
6236 step = db_nslcmop_update[
6237 "detailed-status"
6238 ] = "executing pre-scale scaling-config-action '{}'".format(
6239 vnf_config_primitive
6240 )
6241
6242 # look for primitive
6243 for config_primitive in (
6244 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6245 ).get("config-primitive", ()):
6246 if config_primitive["name"] == vnf_config_primitive:
6247 break
6248 else:
6249 raise LcmException(
6250 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6251 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6252 "primitive".format(scaling_group, vnf_config_primitive)
6253 )
6254
6255 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6256 if db_vnfr.get("additionalParamsForVnf"):
6257 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6258
6259 scale_process = "VCA"
6260 db_nsr_update["config-status"] = "configuring pre-scaling"
6261 primitive_params = self._map_primitive_params(
6262 config_primitive, {}, vnfr_params
6263 )
6264
6265 # Pre-scale retry check: Check if this sub-operation has been executed before
6266 op_index = self._check_or_add_scale_suboperation(
6267 db_nslcmop,
6268 vnf_index,
6269 vnf_config_primitive,
6270 primitive_params,
6271 "PRE-SCALE",
6272 )
6273 if op_index == self.SUBOPERATION_STATUS_SKIP:
6274 # Skip sub-operation
6275 result = "COMPLETED"
6276 result_detail = "Done"
6277 self.logger.debug(
6278 logging_text
6279 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6280 vnf_config_primitive, result, result_detail
6281 )
6282 )
6283 else:
6284 if op_index == self.SUBOPERATION_STATUS_NEW:
6285 # New sub-operation: Get index of this sub-operation
6286 op_index = (
6287 len(db_nslcmop.get("_admin", {}).get("operations"))
6288 - 1
6289 )
6290 self.logger.debug(
6291 logging_text
6292 + "vnf_config_primitive={} New sub-operation".format(
6293 vnf_config_primitive
6294 )
6295 )
6296 else:
6297 # retry: Get registered params for this existing sub-operation
6298 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6299 op_index
6300 ]
6301 vnf_index = op.get("member_vnf_index")
6302 vnf_config_primitive = op.get("primitive")
6303 primitive_params = op.get("primitive_params")
6304 self.logger.debug(
6305 logging_text
6306 + "vnf_config_primitive={} Sub-operation retry".format(
6307 vnf_config_primitive
6308 )
6309 )
6310 # Execute the primitive, either with new (first-time) or registered (reintent) args
6311 ee_descriptor_id = config_primitive.get(
6312 "execution-environment-ref"
6313 )
6314 primitive_name = config_primitive.get(
6315 "execution-environment-primitive", vnf_config_primitive
6316 )
6317 ee_id, vca_type = self._look_for_deployed_vca(
6318 nsr_deployed["VCA"],
6319 member_vnf_index=vnf_index,
6320 vdu_id=None,
6321 vdu_count_index=None,
6322 ee_descriptor_id=ee_descriptor_id,
6323 )
6324 result, result_detail = await self._ns_execute_primitive(
6325 ee_id,
6326 primitive_name,
6327 primitive_params,
6328 vca_type=vca_type,
6329 vca_id=vca_id,
6330 )
6331 self.logger.debug(
6332 logging_text
6333 + "vnf_config_primitive={} Done with result {} {}".format(
6334 vnf_config_primitive, result, result_detail
6335 )
6336 )
6337 # Update operationState = COMPLETED | FAILED
6338 self._update_suboperation_status(
6339 db_nslcmop, op_index, result, result_detail
6340 )
6341
6342 if result == "FAILED":
6343 raise LcmException(result_detail)
6344 db_nsr_update["config-status"] = old_config_status
6345 scale_process = None
6346 # PRE-SCALE END
6347
6348 db_nsr_update[
6349 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
6350 ] = nb_scale_op
6351 db_nsr_update[
6352 "_admin.scaling-group.{}.time".format(admin_scale_index)
6353 ] = time()
6354
6355 # SCALE-IN VCA - BEGIN
6356 if vca_scaling_info:
6357 step = db_nslcmop_update[
6358 "detailed-status"
6359 ] = "Deleting the execution environments"
6360 scale_process = "VCA"
6361 for vca_info in vca_scaling_info:
6362 if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
6363 member_vnf_index = str(vca_info["member-vnf-index"])
6364 self.logger.debug(
6365 logging_text + "vdu info: {}".format(vca_info)
6366 )
6367 if vca_info.get("osm_vdu_id"):
6368 vdu_id = vca_info["osm_vdu_id"]
6369 vdu_index = int(vca_info["vdu_index"])
6370 stage[
6371 1
6372 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6373 member_vnf_index, vdu_id, vdu_index
6374 )
6375 stage[2] = step = "Scaling in VCA"
6376 self._write_op_status(op_id=nslcmop_id, stage=stage)
6377 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
6378 config_update = db_nsr["configurationStatus"]
6379 for vca_index, vca in enumerate(vca_update):
6380 if (
6381 (vca or vca.get("ee_id"))
6382 and vca["member-vnf-index"] == member_vnf_index
6383 and vca["vdu_count_index"] == vdu_index
6384 ):
6385 if vca.get("vdu_id"):
6386 config_descriptor = get_configuration(
6387 db_vnfd, vca.get("vdu_id")
6388 )
6389 elif vca.get("kdu_name"):
6390 config_descriptor = get_configuration(
6391 db_vnfd, vca.get("kdu_name")
6392 )
6393 else:
6394 config_descriptor = get_configuration(
6395 db_vnfd, db_vnfd["id"]
6396 )
6397 operation_params = (
6398 db_nslcmop.get("operationParams") or {}
6399 )
6400 exec_terminate_primitives = not operation_params.get(
6401 "skip_terminate_primitives"
6402 ) and vca.get("needed_terminate")
6403 task = asyncio.ensure_future(
6404 asyncio.wait_for(
6405 self.destroy_N2VC(
6406 logging_text,
6407 db_nslcmop,
6408 vca,
6409 config_descriptor,
6410 vca_index,
6411 destroy_ee=True,
6412 exec_primitives=exec_terminate_primitives,
6413 scaling_in=True,
6414 vca_id=vca_id,
6415 ),
6416 timeout=self.timeout_charm_delete,
6417 )
6418 )
6419 tasks_dict_info[task] = "Terminating VCA {}".format(
6420 vca.get("ee_id")
6421 )
6422 del vca_update[vca_index]
6423 del config_update[vca_index]
6424 # wait for pending tasks of terminate primitives
6425 if tasks_dict_info:
6426 self.logger.debug(
6427 logging_text
6428 + "Waiting for tasks {}".format(
6429 list(tasks_dict_info.keys())
6430 )
6431 )
6432 error_list = await self._wait_for_tasks(
6433 logging_text,
6434 tasks_dict_info,
6435 min(
6436 self.timeout_charm_delete, self.timeout_ns_terminate
6437 ),
6438 stage,
6439 nslcmop_id,
6440 )
6441 tasks_dict_info.clear()
6442 if error_list:
6443 raise LcmException("; ".join(error_list))
6444
6445 db_vca_and_config_update = {
6446 "_admin.deployed.VCA": vca_update,
6447 "configurationStatus": config_update,
6448 }
6449 self.update_db_2(
6450 "nsrs", db_nsr["_id"], db_vca_and_config_update
6451 )
6452 scale_process = None
6453 # SCALE-IN VCA - END
6454
6455 # SCALE RO - BEGIN
6456 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
6457 scale_process = "RO"
6458 if self.ro_config.get("ng"):
6459 await self._scale_ng_ro(
6460 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
6461 )
6462 scaling_info.pop("vdu-create", None)
6463 scaling_info.pop("vdu-delete", None)
6464
6465 scale_process = None
6466 # SCALE RO - END
6467
6468 # SCALE KDU - BEGIN
6469 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
6470 scale_process = "KDU"
6471 await self._scale_kdu(
6472 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6473 )
6474 scaling_info.pop("kdu-create", None)
6475 scaling_info.pop("kdu-delete", None)
6476
6477 scale_process = None
6478 # SCALE KDU - END
6479
6480 if db_nsr_update:
6481 self.update_db_2("nsrs", nsr_id, db_nsr_update)
6482
6483 # SCALE-UP VCA - BEGIN
6484 if vca_scaling_info:
6485 step = db_nslcmop_update[
6486 "detailed-status"
6487 ] = "Creating new execution environments"
6488 scale_process = "VCA"
6489 for vca_info in vca_scaling_info:
6490 if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
6491 member_vnf_index = str(vca_info["member-vnf-index"])
6492 self.logger.debug(
6493 logging_text + "vdu info: {}".format(vca_info)
6494 )
6495 vnfd_id = db_vnfr["vnfd-ref"]
6496 if vca_info.get("osm_vdu_id"):
6497 vdu_index = int(vca_info["vdu_index"])
6498 deploy_params = {"OSM": get_osm_params(db_vnfr)}
6499 if db_vnfr.get("additionalParamsForVnf"):
6500 deploy_params.update(
6501 parse_yaml_strings(
6502 db_vnfr["additionalParamsForVnf"].copy()
6503 )
6504 )
6505 descriptor_config = get_configuration(
6506 db_vnfd, db_vnfd["id"]
6507 )
6508 if descriptor_config:
6509 vdu_id = None
6510 vdu_name = None
6511 kdu_name = None
6512 self._deploy_n2vc(
6513 logging_text=logging_text
6514 + "member_vnf_index={} ".format(member_vnf_index),
6515 db_nsr=db_nsr,
6516 db_vnfr=db_vnfr,
6517 nslcmop_id=nslcmop_id,
6518 nsr_id=nsr_id,
6519 nsi_id=nsi_id,
6520 vnfd_id=vnfd_id,
6521 vdu_id=vdu_id,
6522 kdu_name=kdu_name,
6523 member_vnf_index=member_vnf_index,
6524 vdu_index=vdu_index,
6525 vdu_name=vdu_name,
6526 deploy_params=deploy_params,
6527 descriptor_config=descriptor_config,
6528 base_folder=base_folder,
6529 task_instantiation_info=tasks_dict_info,
6530 stage=stage,
6531 )
6532 vdu_id = vca_info["osm_vdu_id"]
6533 vdur = find_in_list(
6534 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6535 )
6536 descriptor_config = get_configuration(db_vnfd, vdu_id)
6537 if vdur.get("additionalParams"):
6538 deploy_params_vdu = parse_yaml_strings(
6539 vdur["additionalParams"]
6540 )
6541 else:
6542 deploy_params_vdu = deploy_params
6543 deploy_params_vdu["OSM"] = get_osm_params(
6544 db_vnfr, vdu_id, vdu_count_index=vdu_index
6545 )
6546 if descriptor_config:
6547 vdu_name = None
6548 kdu_name = None
6549 stage[
6550 1
6551 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6552 member_vnf_index, vdu_id, vdu_index
6553 )
6554 stage[2] = step = "Scaling out VCA"
6555 self._write_op_status(op_id=nslcmop_id, stage=stage)
6556 self._deploy_n2vc(
6557 logging_text=logging_text
6558 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6559 member_vnf_index, vdu_id, vdu_index
6560 ),
6561 db_nsr=db_nsr,
6562 db_vnfr=db_vnfr,
6563 nslcmop_id=nslcmop_id,
6564 nsr_id=nsr_id,
6565 nsi_id=nsi_id,
6566 vnfd_id=vnfd_id,
6567 vdu_id=vdu_id,
6568 kdu_name=kdu_name,
6569 member_vnf_index=member_vnf_index,
6570 vdu_index=vdu_index,
6571 vdu_name=vdu_name,
6572 deploy_params=deploy_params_vdu,
6573 descriptor_config=descriptor_config,
6574 base_folder=base_folder,
6575 task_instantiation_info=tasks_dict_info,
6576 stage=stage,
6577 )
6578 # SCALE-UP VCA - END
6579 scale_process = None
6580
6581 # POST-SCALE BEGIN
6582 # execute primitive service POST-SCALING
6583 step = "Executing post-scale vnf-config-primitive"
6584 if scaling_descriptor.get("scaling-config-action"):
6585 for scaling_config_action in scaling_descriptor[
6586 "scaling-config-action"
6587 ]:
6588 if (
6589 scaling_config_action.get("trigger") == "post-scale-in"
6590 and scaling_type == "SCALE_IN"
6591 ) or (
6592 scaling_config_action.get("trigger") == "post-scale-out"
6593 and scaling_type == "SCALE_OUT"
6594 ):
6595 vnf_config_primitive = scaling_config_action[
6596 "vnf-config-primitive-name-ref"
6597 ]
6598 step = db_nslcmop_update[
6599 "detailed-status"
6600 ] = "executing post-scale scaling-config-action '{}'".format(
6601 vnf_config_primitive
6602 )
6603
6604 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6605 if db_vnfr.get("additionalParamsForVnf"):
6606 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6607
6608 # look for primitive
6609 for config_primitive in (
6610 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6611 ).get("config-primitive", ()):
6612 if config_primitive["name"] == vnf_config_primitive:
6613 break
6614 else:
6615 raise LcmException(
6616 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6617 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6618 "config-primitive".format(
6619 scaling_group, vnf_config_primitive
6620 )
6621 )
6622 scale_process = "VCA"
6623 db_nsr_update["config-status"] = "configuring post-scaling"
6624 primitive_params = self._map_primitive_params(
6625 config_primitive, {}, vnfr_params
6626 )
6627
6628 # Post-scale retry check: Check if this sub-operation has been executed before
6629 op_index = self._check_or_add_scale_suboperation(
6630 db_nslcmop,
6631 vnf_index,
6632 vnf_config_primitive,
6633 primitive_params,
6634 "POST-SCALE",
6635 )
6636 if op_index == self.SUBOPERATION_STATUS_SKIP:
6637 # Skip sub-operation
6638 result = "COMPLETED"
6639 result_detail = "Done"
6640 self.logger.debug(
6641 logging_text
6642 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6643 vnf_config_primitive, result, result_detail
6644 )
6645 )
6646 else:
6647 if op_index == self.SUBOPERATION_STATUS_NEW:
6648 # New sub-operation: Get index of this sub-operation
6649 op_index = (
6650 len(db_nslcmop.get("_admin", {}).get("operations"))
6651 - 1
6652 )
6653 self.logger.debug(
6654 logging_text
6655 + "vnf_config_primitive={} New sub-operation".format(
6656 vnf_config_primitive
6657 )
6658 )
6659 else:
6660 # retry: Get registered params for this existing sub-operation
6661 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6662 op_index
6663 ]
6664 vnf_index = op.get("member_vnf_index")
6665 vnf_config_primitive = op.get("primitive")
6666 primitive_params = op.get("primitive_params")
6667 self.logger.debug(
6668 logging_text
6669 + "vnf_config_primitive={} Sub-operation retry".format(
6670 vnf_config_primitive
6671 )
6672 )
6673 # Execute the primitive, either with new (first-time) or registered (reintent) args
6674 ee_descriptor_id = config_primitive.get(
6675 "execution-environment-ref"
6676 )
6677 primitive_name = config_primitive.get(
6678 "execution-environment-primitive", vnf_config_primitive
6679 )
6680 ee_id, vca_type = self._look_for_deployed_vca(
6681 nsr_deployed["VCA"],
6682 member_vnf_index=vnf_index,
6683 vdu_id=None,
6684 vdu_count_index=None,
6685 ee_descriptor_id=ee_descriptor_id,
6686 )
6687 result, result_detail = await self._ns_execute_primitive(
6688 ee_id,
6689 primitive_name,
6690 primitive_params,
6691 vca_type=vca_type,
6692 vca_id=vca_id,
6693 )
6694 self.logger.debug(
6695 logging_text
6696 + "vnf_config_primitive={} Done with result {} {}".format(
6697 vnf_config_primitive, result, result_detail
6698 )
6699 )
6700 # Update operationState = COMPLETED | FAILED
6701 self._update_suboperation_status(
6702 db_nslcmop, op_index, result, result_detail
6703 )
6704
6705 if result == "FAILED":
6706 raise LcmException(result_detail)
6707 db_nsr_update["config-status"] = old_config_status
6708 scale_process = None
6709 # POST-SCALE END
6710
6711 db_nsr_update[
6712 "detailed-status"
6713 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6714 db_nsr_update["operational-status"] = (
6715 "running"
6716 if old_operational_status == "failed"
6717 else old_operational_status
6718 )
6719 db_nsr_update["config-status"] = old_config_status
6720 return
6721 except (
6722 ROclient.ROClientException,
6723 DbException,
6724 LcmException,
6725 NgRoException,
6726 ) as e:
6727 self.logger.error(logging_text + "Exit Exception {}".format(e))
6728 exc = e
6729 except asyncio.CancelledError:
6730 self.logger.error(
6731 logging_text + "Cancelled Exception while '{}'".format(step)
6732 )
6733 exc = "Operation was cancelled"
6734 except Exception as e:
6735 exc = traceback.format_exc()
6736 self.logger.critical(
6737 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6738 exc_info=True,
6739 )
6740 finally:
6741 self._write_ns_status(
6742 nsr_id=nsr_id,
6743 ns_state=None,
6744 current_operation="IDLE",
6745 current_operation_id=None,
6746 )
6747 if tasks_dict_info:
6748 stage[1] = "Waiting for instantiate pending tasks."
6749 self.logger.debug(logging_text + stage[1])
6750 exc = await self._wait_for_tasks(
6751 logging_text,
6752 tasks_dict_info,
6753 self.timeout_ns_deploy,
6754 stage,
6755 nslcmop_id,
6756 nsr_id=nsr_id,
6757 )
6758 if exc:
6759 db_nslcmop_update[
6760 "detailed-status"
6761 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6762 nslcmop_operation_state = "FAILED"
6763 if db_nsr:
6764 db_nsr_update["operational-status"] = old_operational_status
6765 db_nsr_update["config-status"] = old_config_status
6766 db_nsr_update["detailed-status"] = ""
6767 if scale_process:
6768 if "VCA" in scale_process:
6769 db_nsr_update["config-status"] = "failed"
6770 if "RO" in scale_process:
6771 db_nsr_update["operational-status"] = "failed"
6772 db_nsr_update[
6773 "detailed-status"
6774 ] = "FAILED scaling nslcmop={} {}: {}".format(
6775 nslcmop_id, step, exc
6776 )
6777 else:
6778 error_description_nslcmop = None
6779 nslcmop_operation_state = "COMPLETED"
6780 db_nslcmop_update["detailed-status"] = "Done"
6781
6782 self._write_op_status(
6783 op_id=nslcmop_id,
6784 stage="",
6785 error_message=error_description_nslcmop,
6786 operation_state=nslcmop_operation_state,
6787 other_update=db_nslcmop_update,
6788 )
6789 if db_nsr:
6790 self._write_ns_status(
6791 nsr_id=nsr_id,
6792 ns_state=None,
6793 current_operation="IDLE",
6794 current_operation_id=None,
6795 other_update=db_nsr_update,
6796 )
6797
6798 if nslcmop_operation_state:
6799 try:
6800 msg = {
6801 "nsr_id": nsr_id,
6802 "nslcmop_id": nslcmop_id,
6803 "operationState": nslcmop_operation_state,
6804 }
6805 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6806 except Exception as e:
6807 self.logger.error(
6808 logging_text + "kafka_write notification Exception {}".format(e)
6809 )
6810 self.logger.debug(logging_text + "Exit")
6811 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6812
6813 async def _scale_kdu(
6814 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6815 ):
6816 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6817 for kdu_name in _scaling_info:
6818 for kdu_scaling_info in _scaling_info[kdu_name]:
6819 deployed_kdu, index = get_deployed_kdu(
6820 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6821 )
6822 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6823 kdu_instance = deployed_kdu["kdu-instance"]
6824 kdu_model = deployed_kdu.get("kdu-model")
6825 scale = int(kdu_scaling_info["scale"])
6826 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6827
6828 db_dict = {
6829 "collection": "nsrs",
6830 "filter": {"_id": nsr_id},
6831 "path": "_admin.deployed.K8s.{}".format(index),
6832 }
6833
6834 step = "scaling application {}".format(
6835 kdu_scaling_info["resource-name"]
6836 )
6837 self.logger.debug(logging_text + step)
6838
6839 if kdu_scaling_info["type"] == "delete":
6840 kdu_config = get_configuration(db_vnfd, kdu_name)
6841 if (
6842 kdu_config
6843 and kdu_config.get("terminate-config-primitive")
6844 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6845 ):
6846 terminate_config_primitive_list = kdu_config.get(
6847 "terminate-config-primitive"
6848 )
6849 terminate_config_primitive_list.sort(
6850 key=lambda val: int(val["seq"])
6851 )
6852
6853 for (
6854 terminate_config_primitive
6855 ) in terminate_config_primitive_list:
6856 primitive_params_ = self._map_primitive_params(
6857 terminate_config_primitive, {}, {}
6858 )
6859 step = "execute terminate config primitive"
6860 self.logger.debug(logging_text + step)
6861 await asyncio.wait_for(
6862 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6863 cluster_uuid=cluster_uuid,
6864 kdu_instance=kdu_instance,
6865 primitive_name=terminate_config_primitive["name"],
6866 params=primitive_params_,
6867 db_dict=db_dict,
6868 vca_id=vca_id,
6869 ),
6870 timeout=600,
6871 )
6872
6873 await asyncio.wait_for(
6874 self.k8scluster_map[k8s_cluster_type].scale(
6875 kdu_instance,
6876 scale,
6877 kdu_scaling_info["resource-name"],
6878 vca_id=vca_id,
6879 cluster_uuid=cluster_uuid,
6880 kdu_model=kdu_model,
6881 atomic=True,
6882 db_dict=db_dict,
6883 ),
6884 timeout=self.timeout_vca_on_error,
6885 )
6886
6887 if kdu_scaling_info["type"] == "create":
6888 kdu_config = get_configuration(db_vnfd, kdu_name)
6889 if (
6890 kdu_config
6891 and kdu_config.get("initial-config-primitive")
6892 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6893 ):
6894 initial_config_primitive_list = kdu_config.get(
6895 "initial-config-primitive"
6896 )
6897 initial_config_primitive_list.sort(
6898 key=lambda val: int(val["seq"])
6899 )
6900
6901 for initial_config_primitive in initial_config_primitive_list:
6902 primitive_params_ = self._map_primitive_params(
6903 initial_config_primitive, {}, {}
6904 )
6905 step = "execute initial config primitive"
6906 self.logger.debug(logging_text + step)
6907 await asyncio.wait_for(
6908 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6909 cluster_uuid=cluster_uuid,
6910 kdu_instance=kdu_instance,
6911 primitive_name=initial_config_primitive["name"],
6912 params=primitive_params_,
6913 db_dict=db_dict,
6914 vca_id=vca_id,
6915 ),
6916 timeout=600,
6917 )
6918
6919 async def _scale_ng_ro(
6920 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6921 ):
6922 nsr_id = db_nslcmop["nsInstanceId"]
6923 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6924 db_vnfrs = {}
6925
6926 # read from db: vnfd's for every vnf
6927 db_vnfds = []
6928
6929 # for each vnf in ns, read vnfd
6930 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6931 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6932 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6933 # if we haven't this vnfd, read it from db
6934 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6935 # read from db
6936 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6937 db_vnfds.append(vnfd)
6938 n2vc_key = self.n2vc.get_public_key()
6939 n2vc_key_list = [n2vc_key]
6940 self.scale_vnfr(
6941 db_vnfr,
6942 vdu_scaling_info.get("vdu-create"),
6943 vdu_scaling_info.get("vdu-delete"),
6944 mark_delete=True,
6945 )
6946 # db_vnfr has been updated, update db_vnfrs to use it
6947 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6948 await self._instantiate_ng_ro(
6949 logging_text,
6950 nsr_id,
6951 db_nsd,
6952 db_nsr,
6953 db_nslcmop,
6954 db_vnfrs,
6955 db_vnfds,
6956 n2vc_key_list,
6957 stage=stage,
6958 start_deploy=time(),
6959 timeout_ns_deploy=self.timeout_ns_deploy,
6960 )
6961 if vdu_scaling_info.get("vdu-delete"):
6962 self.scale_vnfr(
6963 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6964 )
6965
6966 async def extract_prometheus_scrape_jobs(
6967 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6968 ):
6969 # look if exist a file called 'prometheus*.j2' and
6970 artifact_content = self.fs.dir_ls(artifact_path)
6971 job_file = next(
6972 (
6973 f
6974 for f in artifact_content
6975 if f.startswith("prometheus") and f.endswith(".j2")
6976 ),
6977 None,
6978 )
6979 if not job_file:
6980 return
6981 with self.fs.file_open((artifact_path, job_file), "r") as f:
6982 job_data = f.read()
6983
6984 # TODO get_service
6985 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6986 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6987 host_port = "80"
6988 vnfr_id = vnfr_id.replace("-", "")
6989 variables = {
6990 "JOB_NAME": vnfr_id,
6991 "TARGET_IP": target_ip,
6992 "EXPORTER_POD_IP": host_name,
6993 "EXPORTER_POD_PORT": host_port,
6994 }
6995 job_list = parse_job(job_data, variables)
6996 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6997 for job in job_list:
6998 if (
6999 not isinstance(job.get("job_name"), str)
7000 or vnfr_id not in job["job_name"]
7001 ):
7002 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
7003 job["nsr_id"] = nsr_id
7004 job["vnfr_id"] = vnfr_id
7005 return job_list
7006
7007 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
7008 """
7009 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7010
7011 :param: vim_account_id: VIM Account ID
7012
7013 :return: (cloud_name, cloud_credential)
7014 """
7015 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
7016 return config.get("vca_cloud"), config.get("vca_cloud_credential")
7017
7018 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
7019 """
7020 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7021
7022 :param: vim_account_id: VIM Account ID
7023
7024 :return: (cloud_name, cloud_credential)
7025 """
7026 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
7027 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
7028
7029 async def migrate(self, nsr_id, nslcmop_id):
7030 """
7031 Migrate VNFs and VDUs instances in a NS
7032
7033 :param: nsr_id: NS Instance ID
7034 :param: nslcmop_id: nslcmop ID of migrate
7035
7036 """
7037 # Try to lock HA task here
7038 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
7039 if not task_is_locked_by_me:
7040 return
7041 logging_text = "Task ns={} migrate ".format(nsr_id)
7042 self.logger.debug(logging_text + "Enter")
7043 # get all needed from database
7044 db_nslcmop = None
7045 db_nslcmop_update = {}
7046 nslcmop_operation_state = None
7047 db_nsr_update = {}
7048 target = {}
7049 exc = None
7050 # in case of error, indicates what part of scale was failed to put nsr at error status
7051 start_deploy = time()
7052
7053 try:
7054 # wait for any previous tasks in process
7055 step = "Waiting for previous operations to terminate"
7056 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
7057
7058 self._write_ns_status(
7059 nsr_id=nsr_id,
7060 ns_state=None,
7061 current_operation="MIGRATING",
7062 current_operation_id=nslcmop_id
7063 )
7064 step = "Getting nslcmop from database"
7065 self.logger.debug(step + " after having waited for previous tasks to be completed")
7066 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
7067 migrate_params = db_nslcmop.get("operationParams")
7068
7069 target = {}
7070 target.update(migrate_params)
7071 desc = await self.RO.migrate(nsr_id, target)
7072 self.logger.debug("RO return > {}".format(desc))
7073 action_id = desc["action_id"]
7074 await self._wait_ng_ro(
7075 nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate
7076 )
7077 except (ROclient.ROClientException, DbException, LcmException) as e:
7078 self.logger.error("Exit Exception {}".format(e))
7079 exc = e
7080 except asyncio.CancelledError:
7081 self.logger.error("Cancelled Exception while '{}'".format(step))
7082 exc = "Operation was cancelled"
7083 except Exception as e:
7084 exc = traceback.format_exc()
7085 self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
7086 finally:
7087 self._write_ns_status(
7088 nsr_id=nsr_id,
7089 ns_state=None,
7090 current_operation="IDLE",
7091 current_operation_id=None,
7092 )
7093 if exc:
7094 db_nslcmop_update[
7095 "detailed-status"
7096 ] = "FAILED {}: {}".format(step, exc)
7097 nslcmop_operation_state = "FAILED"
7098 else:
7099 nslcmop_operation_state = "COMPLETED"
7100 db_nslcmop_update["detailed-status"] = "Done"
7101 db_nsr_update["detailed-status"] = "Done"
7102
7103 self._write_op_status(
7104 op_id=nslcmop_id,
7105 stage="",
7106 error_message="",
7107 operation_state=nslcmop_operation_state,
7108 other_update=db_nslcmop_update,
7109 )
7110 if nslcmop_operation_state:
7111 try:
7112 msg = {
7113 "nsr_id": nsr_id,
7114 "nslcmop_id": nslcmop_id,
7115 "operationState": nslcmop_operation_state,
7116 }
7117 await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
7118 except Exception as e:
7119 self.logger.error(
7120 logging_text + "kafka_write notification Exception {}".format(e)
7121 )
7122 self.logger.debug(logging_text + "Exit")
7123 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")