02b9f089c882e5b56894156047dc1228b3bdf185
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import shutil
21 from typing import Any, Dict, List
22 import yaml
23 import logging
24 import logging.handlers
25 import traceback
26 import json
27 from jinja2 import (
28 Environment,
29 TemplateError,
30 TemplateNotFound,
31 StrictUndefined,
32 UndefinedError,
33 )
34
35 from osm_lcm import ROclient
36 from osm_lcm.data_utils.nsr import (
37 get_deployed_kdu,
38 get_deployed_vca,
39 get_deployed_vca_list,
40 get_nsd,
41 )
42 from osm_lcm.data_utils.vca import (
43 DeployedComponent,
44 DeployedK8sResource,
45 DeployedVCA,
46 EELevel,
47 Relation,
48 EERelation,
49 safe_get_ee_relation,
50 )
51 from osm_lcm.ng_ro import NgRoClient, NgRoException
52 from osm_lcm.lcm_utils import (
53 LcmException,
54 LcmExceptionNoMgmtIP,
55 LcmBase,
56 deep_get,
57 get_iterable,
58 populate_dict,
59 check_juju_bundle_existence,
60 get_charm_artifact_path,
61 )
62 from osm_lcm.data_utils.nsd import (
63 get_ns_configuration_relation_list,
64 get_vnf_profile,
65 get_vnf_profiles,
66 )
67 from osm_lcm.data_utils.vnfd import (
68 get_kdu,
69 get_kdu_services,
70 get_relation_list,
71 get_vdu_list,
72 get_vdu_profile,
73 get_ee_sorted_initial_config_primitive_list,
74 get_ee_sorted_terminate_config_primitive_list,
75 get_kdu_list,
76 get_virtual_link_profiles,
77 get_vdu,
78 get_configuration,
79 get_vdu_index,
80 get_scaling_aspect,
81 get_number_of_instances,
82 get_juju_ee_ref,
83 get_kdu_resource_profile,
84 find_software_version,
85 )
86 from osm_lcm.data_utils.list_utils import find_in_list
87 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
88 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
89 from osm_lcm.data_utils.database.vim_account import VimAccountDB
90 from n2vc.definitions import RelationEndpoint
91 from n2vc.k8s_helm_conn import K8sHelmConnector
92 from n2vc.k8s_helm3_conn import K8sHelm3Connector
93 from n2vc.k8s_juju_conn import K8sJujuConnector
94
95 from osm_common.dbbase import DbException
96 from osm_common.fsbase import FsException
97
98 from osm_lcm.data_utils.database.database import Database
99 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
100
101 from n2vc.n2vc_juju_conn import N2VCJujuConnector
102 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
103
104 from osm_lcm.lcm_helm_conn import LCMHelmConn
105 from osm_lcm.osm_config import OsmConfigBuilder
106 from osm_lcm.prometheus import parse_job
107
108 from copy import copy, deepcopy
109 from time import time
110 from uuid import uuid4
111
112 from random import randint
113
114 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
115
116
class NsLcm(LcmBase):
    """NS (Network Service) lifecycle manager.

    Coordinates the RO client, the N2VC/juju connectors and the K8s (helm,
    juju-bundle) connectors to deploy, configure, scale and terminate network
    services, keeping the nsrs/vnfrs database records up to date.

    All ``timeout_*`` class attributes below are expressed in seconds.
    """

    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60  # timeout for deleting a deployed charm
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_ns_update = 30 * 60  # timeout for ns update
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution
    timeout_migrate = 1800  # default global timeout for migrating vnfs

    # Special (negative) result codes for sub-operation lookups
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    # Display name used when registering the VCA deployment task
    task_name_deploy_vca = "Deploying VCA"
136 def __init__(self, msg, lcm_tasks, config, loop):
137 """
138 Init, Connect to database, filesystem storage, and messaging
139 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
140 :return: None
141 """
142 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
143
144 self.db = Database().instance.db
145 self.fs = Filesystem().instance.fs
146 self.loop = loop
147 self.lcm_tasks = lcm_tasks
148 self.timeout = config["timeout"]
149 self.ro_config = config["ro_config"]
150 self.ng_ro = config["ro_config"].get("ng")
151 self.vca_config = config["VCA"].copy()
152
153 # create N2VC connector
154 self.n2vc = N2VCJujuConnector(
155 log=self.logger,
156 loop=self.loop,
157 on_update_db=self._on_update_n2vc_db,
158 fs=self.fs,
159 db=self.db,
160 )
161
162 self.conn_helm_ee = LCMHelmConn(
163 log=self.logger,
164 loop=self.loop,
165 vca_config=self.vca_config,
166 on_update_db=self._on_update_n2vc_db,
167 )
168
169 self.k8sclusterhelm2 = K8sHelmConnector(
170 kubectl_command=self.vca_config.get("kubectlpath"),
171 helm_command=self.vca_config.get("helmpath"),
172 log=self.logger,
173 on_update_db=None,
174 fs=self.fs,
175 db=self.db,
176 )
177
178 self.k8sclusterhelm3 = K8sHelm3Connector(
179 kubectl_command=self.vca_config.get("kubectlpath"),
180 helm_command=self.vca_config.get("helm3path"),
181 fs=self.fs,
182 log=self.logger,
183 db=self.db,
184 on_update_db=None,
185 )
186
187 self.k8sclusterjuju = K8sJujuConnector(
188 kubectl_command=self.vca_config.get("kubectlpath"),
189 juju_command=self.vca_config.get("jujupath"),
190 log=self.logger,
191 loop=self.loop,
192 on_update_db=self._on_update_k8s_db,
193 fs=self.fs,
194 db=self.db,
195 )
196
197 self.k8scluster_map = {
198 "helm-chart": self.k8sclusterhelm2,
199 "helm-chart-v3": self.k8sclusterhelm3,
200 "chart": self.k8sclusterhelm3,
201 "juju-bundle": self.k8sclusterjuju,
202 "juju": self.k8sclusterjuju,
203 }
204
205 self.vca_map = {
206 "lxc_proxy_charm": self.n2vc,
207 "native_charm": self.n2vc,
208 "k8s_proxy_charm": self.n2vc,
209 "helm": self.conn_helm_ee,
210 "helm-v3": self.conn_helm_ee,
211 }
212
213 # create RO client
214 self.RO = NgRoClient(self.loop, **self.ro_config)
215
216 @staticmethod
217 def increment_ip_mac(ip_mac, vm_index=1):
218 if not isinstance(ip_mac, str):
219 return ip_mac
220 try:
221 # try with ipv4 look for last dot
222 i = ip_mac.rfind(".")
223 if i > 0:
224 i += 1
225 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
226 # try with ipv6 or mac look for last colon. Operate in hex
227 i = ip_mac.rfind(":")
228 if i > 0:
229 i += 1
230 # format in hex, len can be 2 for mac or 4 for ipv6
231 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
232 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
233 )
234 except Exception:
235 pass
236 return None
237
238 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
239
240 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
241
242 try:
243 # TODO filter RO descriptor fields...
244
245 # write to database
246 db_dict = dict()
247 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
248 db_dict["deploymentStatus"] = ro_descriptor
249 self.update_db_2("nsrs", nsrs_id, db_dict)
250
251 except Exception as e:
252 self.logger.warn(
253 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
254 )
255
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """Callback invoked by N2VC when juju reports a status change.

        Refreshes the nsrs record of the affected NS: stores the full VCA
        status under 'vcaStatus', tries to toggle the configurationStatus of
        the VCA addressed by *path* between READY/BROKEN, and recomputes the
        READY/DEGRADED nsState from the juju machine/application statuses.

        :param table: origin table of the update (not used by the body; kept
            for the callback signature)
        :param filter: db filter; its '_id' is the nsr id
        :param path: dotted db path whose last component is the VCA index
        :param updated_data: changed data (not used by the body; kept for the
            callback signature)
        :param vca_id: optional VCA account used to query juju
        """

        # remove last dot from path (if exists)
        if path.endswith("."):
            path = path[:-1]

        try:

            nsr_id = filter.get("_id")

            # read ns record from database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            current_ns_status = nsr.get("nsState")

            # get vca status for NS
            status_dict = await self.n2vc.get_status(
                namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
            )

            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = status_dict
            await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # the VCA index is the last dotted component of the db path
                vca_index = int(path[path.rfind(".") + 1 :])

                vca_list = deep_get(
                    target_dict=nsr, key_list=("_admin", "deployed", "VCA")
                )
                vca_status = vca_list[vca_index].get("status")

                configuration_status_list = nsr.get("configurationStatus")
                config_status = configuration_status_list[vca_index].get("status")

                if config_status == "BROKEN" and vca_status != "failed":
                    # NOTE(review): db_dict has no "configurationStatus" key at
                    # this point, so these assignments raise KeyError and are
                    # swallowed by the except below — the READY/BROKEN toggle
                    # never reaches the database. Confirm intended behavior.
                    db_dict["configurationStatus"][vca_index] = "READY"
                elif config_status != "BROKEN" and vca_status == "failed":
                    db_dict["configurationStatus"][vca_index] = "BROKEN"
            except Exception as e:
                # not update configurationStatus
                self.logger.debug("Error updating vca_index (ignore): {}".format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ("READY", "DEGRADED"):
                error_description = ""
                # check machines
                if status_dict.get("machines"):
                    for machine_id in status_dict.get("machines"):
                        machine = status_dict.get("machines").get(machine_id)
                        # check machine agent-status
                        if machine.get("agent-status"):
                            s = machine.get("agent-status").get("status")
                            if s != "started":
                                is_degraded = True
                                error_description += (
                                    "machine {} agent-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                        # check machine instance status
                        if machine.get("instance-status"):
                            s = machine.get("instance-status").get("status")
                            if s != "running":
                                is_degraded = True
                                error_description += (
                                    "machine {} instance-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                # check applications
                if status_dict.get("applications"):
                    for app_id in status_dict.get("applications"):
                        app = status_dict.get("applications").get(app_id)
                        # check application status
                        if app.get("status"):
                            s = app.get("status").get("status")
                            if s != "active":
                                is_degraded = True
                                error_description += (
                                    "application {} status={} ; ".format(app_id, s)
                                )

                if error_description:
                    db_dict["errorDescription"] = error_description
                # flip nsState only on an actual transition
                if current_ns_status == "READY" and is_degraded:
                    db_dict["nsState"] = "DEGRADED"
                if current_ns_status == "DEGRADED" and not is_degraded:
                    db_dict["nsState"] = "READY"

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            # cancellation/timeout must propagate to the task machinery
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
358
359 async def _on_update_k8s_db(
360 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
361 ):
362 """
363 Updating vca status in NSR record
364 :param cluster_uuid: UUID of a k8s cluster
365 :param kdu_instance: The unique name of the KDU instance
366 :param filter: To get nsr_id
367 :cluster_type: The cluster type (juju, k8s)
368 :return: none
369 """
370
371 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
372 # .format(cluster_uuid, kdu_instance, filter))
373
374 nsr_id = filter.get("_id")
375 try:
376 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
377 cluster_uuid=cluster_uuid,
378 kdu_instance=kdu_instance,
379 yaml_format=False,
380 complete_status=True,
381 vca_id=vca_id,
382 )
383
384 # vcaStatus
385 db_dict = dict()
386 db_dict["vcaStatus"] = {nsr_id: vca_status}
387
388 if cluster_type in ("juju-bundle", "juju"):
389 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
390 # status in a similar way between Juju Bundles and Helm Charts on this side
391 await self.k8sclusterjuju.update_vca_status(
392 db_dict["vcaStatus"],
393 kdu_instance,
394 vca_id=vca_id,
395 )
396
397 self.logger.debug(
398 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
399 )
400
401 # write to database
402 self.update_db_2("nsrs", nsr_id, db_dict)
403 except (asyncio.CancelledError, asyncio.TimeoutError):
404 raise
405 except Exception as e:
406 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
407
408 @staticmethod
409 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
410 try:
411 env = Environment(undefined=StrictUndefined)
412 template = env.from_string(cloud_init_text)
413 return template.render(additional_params or {})
414 except UndefinedError as e:
415 raise LcmException(
416 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
417 "file, must be provided in the instantiation parameters inside the "
418 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
419 )
420 except (TemplateError, TemplateNotFound) as e:
421 raise LcmException(
422 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
423 vnfd_id, vdu_id, e
424 )
425 )
426
427 def _get_vdu_cloud_init_content(self, vdu, vnfd):
428 cloud_init_content = cloud_init_file = None
429 try:
430 if vdu.get("cloud-init-file"):
431 base_folder = vnfd["_admin"]["storage"]
432 if base_folder["pkg-dir"]:
433 cloud_init_file = "{}/{}/cloud_init/{}".format(
434 base_folder["folder"],
435 base_folder["pkg-dir"],
436 vdu["cloud-init-file"],
437 )
438 else:
439 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
440 base_folder["folder"],
441 vdu["cloud-init-file"],
442 )
443 with self.fs.file_open(cloud_init_file, "r") as ci_file:
444 cloud_init_content = ci_file.read()
445 elif vdu.get("cloud-init"):
446 cloud_init_content = vdu["cloud-init"]
447
448 return cloud_init_content
449 except FsException as e:
450 raise LcmException(
451 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
452 vnfd["id"], vdu["id"], cloud_init_file, e
453 )
454 )
455
456 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
457 vdur = next(
458 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
459 {}
460 )
461 additional_params = vdur.get("additionalParams")
462 return parse_yaml_strings(additional_params)
463
464 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
465 """
466 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
467 :param vnfd: input vnfd
468 :param new_id: overrides vnf id if provided
469 :param additionalParams: Instantiation params for VNFs provided
470 :param nsrId: Id of the NSR
471 :return: copy of vnfd
472 """
473 vnfd_RO = deepcopy(vnfd)
474 # remove unused by RO configuration, monitoring, scaling and internal keys
475 vnfd_RO.pop("_id", None)
476 vnfd_RO.pop("_admin", None)
477 vnfd_RO.pop("monitoring-param", None)
478 vnfd_RO.pop("scaling-group-descriptor", None)
479 vnfd_RO.pop("kdu", None)
480 vnfd_RO.pop("k8s-cluster", None)
481 if new_id:
482 vnfd_RO["id"] = new_id
483
484 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
485 for vdu in get_iterable(vnfd_RO, "vdu"):
486 vdu.pop("cloud-init-file", None)
487 vdu.pop("cloud-init", None)
488 return vnfd_RO
489
490 @staticmethod
491 def ip_profile_2_RO(ip_profile):
492 RO_ip_profile = deepcopy(ip_profile)
493 if "dns-server" in RO_ip_profile:
494 if isinstance(RO_ip_profile["dns-server"], list):
495 RO_ip_profile["dns-address"] = []
496 for ds in RO_ip_profile.pop("dns-server"):
497 RO_ip_profile["dns-address"].append(ds["address"])
498 else:
499 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
500 if RO_ip_profile.get("ip-version") == "ipv4":
501 RO_ip_profile["ip-version"] = "IPv4"
502 if RO_ip_profile.get("ip-version") == "ipv6":
503 RO_ip_profile["ip-version"] = "IPv6"
504 if "dhcp-params" in RO_ip_profile:
505 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
506 return RO_ip_profile
507
508 def _get_ro_vim_id_for_vim_account(self, vim_account):
509 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
510 if db_vim["_admin"]["operationalState"] != "ENABLED":
511 raise LcmException(
512 "VIM={} is not available. operationalState={}".format(
513 vim_account, db_vim["_admin"]["operationalState"]
514 )
515 )
516 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
517 return RO_vim_id
518
519 def get_ro_wim_id_for_wim_account(self, wim_account):
520 if isinstance(wim_account, str):
521 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
522 if db_wim["_admin"]["operationalState"] != "ENABLED":
523 raise LcmException(
524 "WIM={} is not available. operationalState={}".format(
525 wim_account, db_wim["_admin"]["operationalState"]
526 )
527 )
528 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
529 return RO_wim_id
530 else:
531 return wim_account
532
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """Add or remove vdur records of a vnfr to reflect a scaling operation.

        Scale out clones the newest existing vdur of each vdu (or, when none
        exists, the saved 'vdur-template'); scale in either marks vdurs as
        DELETING or pulls them from the database. When scaling to 0 instances
        the last vdur is saved as 'vdur-template' for a later scale out.

        :param db_vnfr: vnfr record; its 'vdur' list is refreshed from the
            database before returning
        :param vdu_create: dict vdu-id -> number of instances to add
        :param vdu_delete: dict vdu-id -> number of instances to remove
        :param mark_delete: when True, instances are only marked DELETING
            instead of being pulled from the database
        :raises LcmException: when scaling out a vdu that has neither an
            existing vdur nor a saved vdur-template
        """
        db_vdu_push_list = []
        template_vdur = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # newest existing vdur of this vdu is used as clone source
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    # Read the template saved in the db:
                    self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
                    vdur_template = db_vnfr.get("vdur-template")
                    if not vdur_template:
                        raise LcmException(
                            "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                                vdu_id
                            )
                        )
                    vdur = vdur_template[0]
                    # Delete the template from the database after using it
                    self.db.set_one("vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur-template": {"_id": vdur['_id']}}
                    )
                for count in range(vdu_count):
                    # each new replica starts from a fresh copy of the source
                    vdur_copy = deepcopy(vdur)
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    # vim_info belongs to the source instance, not the new one
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        if iface.get("fixed-ip"):
                            # derive consecutive addresses for the new replicas
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        if db_vnfr["vdur"]:
                            iface.pop(
                                "mgmt_vnf", None
                            )  # only first vdu can be managment of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            if len(db_vnfr["vdur"]) == 1:
                # The scale will move to 0 instances
                self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
                template_vdur = [db_vnfr["vdur"][0]]
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    # keep the records but flag the last vdu_count ones
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        # push new vdurs and the template (if any) in a single db operation
        db_push = {}
        if db_vdu_push_list:
            db_push["vdur"] = db_vdu_push_list
        if template_vdur:
            db_push["vdur-template"] = template_vdur
        if not db_push:
            db_push = None
        db_vnfr["vdur-template"] = template_vdur
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]
639
640 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
641 """
642 Updates database nsr with the RO info for the created vld
643 :param ns_update_nsr: dictionary to be filled with the updated info
644 :param db_nsr: content of db_nsr. This is also modified
645 :param nsr_desc_RO: nsr descriptor from RO
646 :return: Nothing, LcmException is raised on errors
647 """
648
649 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
650 for net_RO in get_iterable(nsr_desc_RO, "nets"):
651 if vld["id"] != net_RO.get("ns_net_osm_id"):
652 continue
653 vld["vim-id"] = net_RO.get("vim_net_id")
654 vld["name"] = net_RO.get("vim_name")
655 vld["status"] = net_RO.get("status")
656 vld["status-detailed"] = net_RO.get("error_msg")
657 ns_update_nsr["vld.{}".format(vld_index)] = vld
658 break
659 else:
660 raise LcmException(
661 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
662 )
663
664 def set_vnfr_at_error(self, db_vnfrs, error_text):
665 try:
666 for db_vnfr in db_vnfrs.values():
667 vnfr_update = {"status": "ERROR"}
668 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
669 if "status" not in vdur:
670 vdur["status"] = "ERROR"
671 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
672 if error_text:
673 vdur["status-detailed"] = str(error_text)
674 vnfr_update[
675 "vdur.{}.status-detailed".format(vdu_index)
676 ] = "ERROR"
677 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
678 except DbException as e:
679 self.logger.error("Cannot update vnf. {}".format(e))
680
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        :raises LcmExceptionNoMgmtIP: a vnf with vdus has no management IP in RO
        :raises LcmException: a vnf/vdu/interface/vld present in the vnfr is
            missing from the RO descriptor
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may report several addresses separated by ';' — keep the first
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                        "ip_address"
                    ].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP(
                            "ns member_vnf_index '{}' has no IP address".format(
                                vnf_index
                            )
                        )

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # PDUs are not deployed by RO: nothing to refresh
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        # walk the matching RO vms until reaching the replica
                        # with this vdur's count-index
                        if vdur["count-index"] != vdur_RO_count_index:
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        # refresh each interface from the RO interface with the
                        # same internal name
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get(
                                        "ip_address"
                                    )
                                    ifacer["mac-address"] = interface_RO.get(
                                        "mac_address"
                                    )
                                    break
                            else:
                                raise LcmException(
                                    "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                    "from VIM info".format(
                                        vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                    )
                                )
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                            "VIM info".format(
                                vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                            )
                        )

                # refresh internal vlds from the RO nets with the same osm id
                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                                vnf_index, vld["id"]
                            )
                        )

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException(
                    "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                        vnf_index
                    )
                )
777
778 def _get_ns_config_info(self, nsr_id):
779 """
780 Generates a mapping between vnf,vdu elements and the N2VC id
781 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
782 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
783 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
784 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
785 """
786 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
787 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
788 mapping = {}
789 ns_config_info = {"osm-config-mapping": mapping}
790 for vca in vca_deployed_list:
791 if not vca["member-vnf-index"]:
792 continue
793 if not vca["vdu_id"]:
794 mapping[vca["member-vnf-index"]] = vca["application"]
795 else:
796 mapping[
797 "{}.{}.{}".format(
798 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
799 )
800 ] = vca["application"]
801 return ns_config_info
802
803 async def _instantiate_ng_ro(
804 self,
805 logging_text,
806 nsr_id,
807 nsd,
808 db_nsr,
809 db_nslcmop,
810 db_vnfrs,
811 db_vnfds,
812 n2vc_key_list,
813 stage,
814 start_deploy,
815 timeout_ns_deploy,
816 ):
817
818 db_vims = {}
819
820 def get_vim_account(vim_account_id):
821 nonlocal db_vims
822 if vim_account_id in db_vims:
823 return db_vims[vim_account_id]
824 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
825 db_vims[vim_account_id] = db_vim
826 return db_vim
827
828 # modify target_vld info with instantiation parameters
829 def parse_vld_instantiation_params(
830 target_vim, target_vld, vld_params, target_sdn
831 ):
832 if vld_params.get("ip-profile"):
833 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
834 "ip-profile"
835 ]
836 if vld_params.get("provider-network"):
837 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
838 "provider-network"
839 ]
840 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
841 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
842 "provider-network"
843 ]["sdn-ports"]
844 if vld_params.get("wimAccountId"):
845 target_wim = "wim:{}".format(vld_params["wimAccountId"])
846 target_vld["vim_info"][target_wim] = {}
847 for param in ("vim-network-name", "vim-network-id"):
848 if vld_params.get(param):
849 if isinstance(vld_params[param], dict):
850 for vim, vim_net in vld_params[param].items():
851 other_target_vim = "vim:" + vim
852 populate_dict(
853 target_vld["vim_info"],
854 (other_target_vim, param.replace("-", "_")),
855 vim_net,
856 )
857 else: # isinstance str
858 target_vld["vim_info"][target_vim][
859 param.replace("-", "_")
860 ] = vld_params[param]
861 if vld_params.get("common_id"):
862 target_vld["common_id"] = vld_params.get("common_id")
863
864 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
865 def update_ns_vld_target(target, ns_params):
866 for vnf_params in ns_params.get("vnf", ()):
867 if vnf_params.get("vimAccountId"):
868 target_vnf = next(
869 (
870 vnfr
871 for vnfr in db_vnfrs.values()
872 if vnf_params["member-vnf-index"]
873 == vnfr["member-vnf-index-ref"]
874 ),
875 None,
876 )
877 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
878 for a_index, a_vld in enumerate(target["ns"]["vld"]):
879 target_vld = find_in_list(
880 get_iterable(vdur, "interfaces"),
881 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
882 )
883 if target_vld:
884 if vnf_params.get("vimAccountId") not in a_vld.get(
885 "vim_info", {}
886 ):
887 target["ns"]["vld"][a_index].get("vim_info").update(
888 {
889 "vim:{}".format(vnf_params["vimAccountId"]): {
890 "vim_network_name": ""
891 }
892 }
893 )
894
895 nslcmop_id = db_nslcmop["_id"]
896 target = {
897 "name": db_nsr["name"],
898 "ns": {"vld": []},
899 "vnf": [],
900 "image": deepcopy(db_nsr["image"]),
901 "flavor": deepcopy(db_nsr["flavor"]),
902 "action_id": nslcmop_id,
903 "cloud_init_content": {},
904 }
905 for image in target["image"]:
906 image["vim_info"] = {}
907 for flavor in target["flavor"]:
908 flavor["vim_info"] = {}
909 if db_nsr.get("affinity-or-anti-affinity-group"):
910 target["affinity-or-anti-affinity-group"] = deepcopy(
911 db_nsr["affinity-or-anti-affinity-group"]
912 )
913 for affinity_or_anti_affinity_group in target[
914 "affinity-or-anti-affinity-group"
915 ]:
916 affinity_or_anti_affinity_group["vim_info"] = {}
917
918 if db_nslcmop.get("lcmOperationType") != "instantiate":
919 # get parameters of instantiation:
920 db_nslcmop_instantiate = self.db.get_list(
921 "nslcmops",
922 {
923 "nsInstanceId": db_nslcmop["nsInstanceId"],
924 "lcmOperationType": "instantiate",
925 },
926 )[-1]
927 ns_params = db_nslcmop_instantiate.get("operationParams")
928 else:
929 ns_params = db_nslcmop.get("operationParams")
930 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
931 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
932
933 cp2target = {}
934 for vld_index, vld in enumerate(db_nsr.get("vld")):
935 target_vim = "vim:{}".format(ns_params["vimAccountId"])
936 target_vld = {
937 "id": vld["id"],
938 "name": vld["name"],
939 "mgmt-network": vld.get("mgmt-network", False),
940 "type": vld.get("type"),
941 "vim_info": {
942 target_vim: {
943 "vim_network_name": vld.get("vim-network-name"),
944 "vim_account_id": ns_params["vimAccountId"],
945 }
946 },
947 }
948 # check if this network needs SDN assist
949 if vld.get("pci-interfaces"):
950 db_vim = get_vim_account(ns_params["vimAccountId"])
951 sdnc_id = db_vim["config"].get("sdn-controller")
952 if sdnc_id:
953 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
954 target_sdn = "sdn:{}".format(sdnc_id)
955 target_vld["vim_info"][target_sdn] = {
956 "sdn": True,
957 "target_vim": target_vim,
958 "vlds": [sdn_vld],
959 "type": vld.get("type"),
960 }
961
962 nsd_vnf_profiles = get_vnf_profiles(nsd)
963 for nsd_vnf_profile in nsd_vnf_profiles:
964 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
965 if cp["virtual-link-profile-id"] == vld["id"]:
966 cp2target[
967 "member_vnf:{}.{}".format(
968 cp["constituent-cpd-id"][0][
969 "constituent-base-element-id"
970 ],
971 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
972 )
973 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
974
975 # check at nsd descriptor, if there is an ip-profile
976 vld_params = {}
977 nsd_vlp = find_in_list(
978 get_virtual_link_profiles(nsd),
979 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
980 == vld["id"],
981 )
982 if (
983 nsd_vlp
984 and nsd_vlp.get("virtual-link-protocol-data")
985 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
986 ):
987 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
988 "l3-protocol-data"
989 ]
990 ip_profile_dest_data = {}
991 if "ip-version" in ip_profile_source_data:
992 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
993 "ip-version"
994 ]
995 if "cidr" in ip_profile_source_data:
996 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
997 "cidr"
998 ]
999 if "gateway-ip" in ip_profile_source_data:
1000 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
1001 "gateway-ip"
1002 ]
1003 if "dhcp-enabled" in ip_profile_source_data:
1004 ip_profile_dest_data["dhcp-params"] = {
1005 "enabled": ip_profile_source_data["dhcp-enabled"]
1006 }
1007 vld_params["ip-profile"] = ip_profile_dest_data
1008
1009 # update vld_params with instantiation params
1010 vld_instantiation_params = find_in_list(
1011 get_iterable(ns_params, "vld"),
1012 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1013 )
1014 if vld_instantiation_params:
1015 vld_params.update(vld_instantiation_params)
1016 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1017 target["ns"]["vld"].append(target_vld)
1018 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1019 update_ns_vld_target(target, ns_params)
1020
1021 for vnfr in db_vnfrs.values():
1022 vnfd = find_in_list(
1023 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1024 )
1025 vnf_params = find_in_list(
1026 get_iterable(ns_params, "vnf"),
1027 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1028 )
1029 target_vnf = deepcopy(vnfr)
1030 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1031 for vld in target_vnf.get("vld", ()):
1032 # check if connected to a ns.vld, to fill target'
1033 vnf_cp = find_in_list(
1034 vnfd.get("int-virtual-link-desc", ()),
1035 lambda cpd: cpd.get("id") == vld["id"],
1036 )
1037 if vnf_cp:
1038 ns_cp = "member_vnf:{}.{}".format(
1039 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1040 )
1041 if cp2target.get(ns_cp):
1042 vld["target"] = cp2target[ns_cp]
1043
1044 vld["vim_info"] = {
1045 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1046 }
1047 # check if this network needs SDN assist
1048 target_sdn = None
1049 if vld.get("pci-interfaces"):
1050 db_vim = get_vim_account(vnfr["vim-account-id"])
1051 sdnc_id = db_vim["config"].get("sdn-controller")
1052 if sdnc_id:
1053 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1054 target_sdn = "sdn:{}".format(sdnc_id)
1055 vld["vim_info"][target_sdn] = {
1056 "sdn": True,
1057 "target_vim": target_vim,
1058 "vlds": [sdn_vld],
1059 "type": vld.get("type"),
1060 }
1061
1062 # check at vnfd descriptor, if there is an ip-profile
1063 vld_params = {}
1064 vnfd_vlp = find_in_list(
1065 get_virtual_link_profiles(vnfd),
1066 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1067 )
1068 if (
1069 vnfd_vlp
1070 and vnfd_vlp.get("virtual-link-protocol-data")
1071 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1072 ):
1073 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1074 "l3-protocol-data"
1075 ]
1076 ip_profile_dest_data = {}
1077 if "ip-version" in ip_profile_source_data:
1078 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1079 "ip-version"
1080 ]
1081 if "cidr" in ip_profile_source_data:
1082 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1083 "cidr"
1084 ]
1085 if "gateway-ip" in ip_profile_source_data:
1086 ip_profile_dest_data[
1087 "gateway-address"
1088 ] = ip_profile_source_data["gateway-ip"]
1089 if "dhcp-enabled" in ip_profile_source_data:
1090 ip_profile_dest_data["dhcp-params"] = {
1091 "enabled": ip_profile_source_data["dhcp-enabled"]
1092 }
1093
1094 vld_params["ip-profile"] = ip_profile_dest_data
1095 # update vld_params with instantiation params
1096 if vnf_params:
1097 vld_instantiation_params = find_in_list(
1098 get_iterable(vnf_params, "internal-vld"),
1099 lambda i_vld: i_vld["name"] == vld["id"],
1100 )
1101 if vld_instantiation_params:
1102 vld_params.update(vld_instantiation_params)
1103 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1104
1105 vdur_list = []
1106 for vdur in target_vnf.get("vdur", ()):
1107 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1108 continue # This vdu must not be created
1109 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1110
1111 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1112
1113 if ssh_keys_all:
1114 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1115 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1116 if (
1117 vdu_configuration
1118 and vdu_configuration.get("config-access")
1119 and vdu_configuration.get("config-access").get("ssh-access")
1120 ):
1121 vdur["ssh-keys"] = ssh_keys_all
1122 vdur["ssh-access-required"] = vdu_configuration[
1123 "config-access"
1124 ]["ssh-access"]["required"]
1125 elif (
1126 vnf_configuration
1127 and vnf_configuration.get("config-access")
1128 and vnf_configuration.get("config-access").get("ssh-access")
1129 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1130 ):
1131 vdur["ssh-keys"] = ssh_keys_all
1132 vdur["ssh-access-required"] = vnf_configuration[
1133 "config-access"
1134 ]["ssh-access"]["required"]
1135 elif ssh_keys_instantiation and find_in_list(
1136 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1137 ):
1138 vdur["ssh-keys"] = ssh_keys_instantiation
1139
1140 self.logger.debug("NS > vdur > {}".format(vdur))
1141
1142 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1143 # cloud-init
1144 if vdud.get("cloud-init-file"):
1145 vdur["cloud-init"] = "{}:file:{}".format(
1146 vnfd["_id"], vdud.get("cloud-init-file")
1147 )
1148 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1149 if vdur["cloud-init"] not in target["cloud_init_content"]:
1150 base_folder = vnfd["_admin"]["storage"]
1151 if base_folder["pkg-dir"]:
1152 cloud_init_file = "{}/{}/cloud_init/{}".format(
1153 base_folder["folder"],
1154 base_folder["pkg-dir"],
1155 vdud.get("cloud-init-file"),
1156 )
1157 else:
1158 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1159 base_folder["folder"],
1160 vdud.get("cloud-init-file"),
1161 )
1162 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1163 target["cloud_init_content"][
1164 vdur["cloud-init"]
1165 ] = ci_file.read()
1166 elif vdud.get("cloud-init"):
1167 vdur["cloud-init"] = "{}:vdu:{}".format(
1168 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1169 )
1170 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1171 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1172 "cloud-init"
1173 ]
1174 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1175 deploy_params_vdu = self._format_additional_params(
1176 vdur.get("additionalParams") or {}
1177 )
1178 deploy_params_vdu["OSM"] = get_osm_params(
1179 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1180 )
1181 vdur["additionalParams"] = deploy_params_vdu
1182
1183 # flavor
1184 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1185 if target_vim not in ns_flavor["vim_info"]:
1186 ns_flavor["vim_info"][target_vim] = {}
1187
1188 # deal with images
1189 # in case alternative images are provided we must check if they should be applied
1190 # for the vim_type, modify the vim_type taking into account
1191 ns_image_id = int(vdur["ns-image-id"])
1192 if vdur.get("alt-image-ids"):
1193 db_vim = get_vim_account(vnfr["vim-account-id"])
1194 vim_type = db_vim["vim_type"]
1195 for alt_image_id in vdur.get("alt-image-ids"):
1196 ns_alt_image = target["image"][int(alt_image_id)]
1197 if vim_type == ns_alt_image.get("vim-type"):
1198 # must use alternative image
1199 self.logger.debug(
1200 "use alternative image id: {}".format(alt_image_id)
1201 )
1202 ns_image_id = alt_image_id
1203 vdur["ns-image-id"] = ns_image_id
1204 break
1205 ns_image = target["image"][int(ns_image_id)]
1206 if target_vim not in ns_image["vim_info"]:
1207 ns_image["vim_info"][target_vim] = {}
1208
1209 # Affinity groups
1210 if vdur.get("affinity-or-anti-affinity-group-id"):
1211 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1212 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1213 if target_vim not in ns_ags["vim_info"]:
1214 ns_ags["vim_info"][target_vim] = {}
1215
1216 vdur["vim_info"] = {target_vim: {}}
1217 # instantiation parameters
1218 # if vnf_params:
1219 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1220 # vdud["id"]), None)
1221 vdur_list.append(vdur)
1222 target_vnf["vdur"] = vdur_list
1223 target["vnf"].append(target_vnf)
1224
1225 desc = await self.RO.deploy(nsr_id, target)
1226 self.logger.debug("RO return > {}".format(desc))
1227 action_id = desc["action_id"]
1228 await self._wait_ng_ro(
1229 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1230 )
1231
1232 # Updating NSR
1233 db_nsr_update = {
1234 "_admin.deployed.RO.operational-status": "running",
1235 "detailed-status": " ".join(stage),
1236 }
1237 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1238 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1239 self._write_op_status(nslcmop_id, stage)
1240 self.logger.debug(
1241 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1242 )
1243 return
1244
1245 async def _wait_ng_ro(
1246 self,
1247 nsr_id,
1248 action_id,
1249 nslcmop_id=None,
1250 start_time=None,
1251 timeout=600,
1252 stage=None,
1253 ):
1254 detailed_status_old = None
1255 db_nsr_update = {}
1256 start_time = start_time or time()
1257 while time() <= start_time + timeout:
1258 desc_status = await self.RO.status(nsr_id, action_id)
1259 self.logger.debug("Wait NG RO > {}".format(desc_status))
1260 if desc_status["status"] == "FAILED":
1261 raise NgRoException(desc_status["details"])
1262 elif desc_status["status"] == "BUILD":
1263 if stage:
1264 stage[2] = "VIM: ({})".format(desc_status["details"])
1265 elif desc_status["status"] == "DONE":
1266 if stage:
1267 stage[2] = "Deployed at VIM"
1268 break
1269 else:
1270 assert False, "ROclient.check_ns_status returns unknown {}".format(
1271 desc_status["status"]
1272 )
1273 if stage and nslcmop_id and stage[2] != detailed_status_old:
1274 detailed_status_old = stage[2]
1275 db_nsr_update["detailed-status"] = " ".join(stage)
1276 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1277 self._write_op_status(nslcmop_id, stage)
1278 await asyncio.sleep(15, loop=self.loop)
1279 else: # timeout_ns_deploy
1280 raise NgRoException("Timeout waiting ns to deploy")
1281
1282 async def _terminate_ng_ro(
1283 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1284 ):
1285 db_nsr_update = {}
1286 failed_detail = []
1287 action_id = None
1288 start_deploy = time()
1289 try:
1290 target = {
1291 "ns": {"vld": []},
1292 "vnf": [],
1293 "image": [],
1294 "flavor": [],
1295 "action_id": nslcmop_id,
1296 }
1297 desc = await self.RO.deploy(nsr_id, target)
1298 action_id = desc["action_id"]
1299 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1300 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1301 self.logger.debug(
1302 logging_text
1303 + "ns terminate action at RO. action_id={}".format(action_id)
1304 )
1305
1306 # wait until done
1307 delete_timeout = 20 * 60 # 20 minutes
1308 await self._wait_ng_ro(
1309 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1310 )
1311
1312 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1313 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1314 # delete all nsr
1315 await self.RO.delete(nsr_id)
1316 except Exception as e:
1317 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1318 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1319 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1320 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1321 self.logger.debug(
1322 logging_text + "RO_action_id={} already deleted".format(action_id)
1323 )
1324 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1325 failed_detail.append("delete conflict: {}".format(e))
1326 self.logger.debug(
1327 logging_text
1328 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1329 )
1330 else:
1331 failed_detail.append("delete error: {}".format(e))
1332 self.logger.error(
1333 logging_text
1334 + "RO_action_id={} delete error: {}".format(action_id, e)
1335 )
1336
1337 if failed_detail:
1338 stage[2] = "Error deleting from VIM"
1339 else:
1340 stage[2] = "Deleted from VIM"
1341 db_nsr_update["detailed-status"] = " ".join(stage)
1342 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1343 self._write_op_status(nslcmop_id, stage)
1344
1345 if failed_detail:
1346 raise LcmException("; ".join(failed_detail))
1347 return
1348
1349 async def instantiate_RO(
1350 self,
1351 logging_text,
1352 nsr_id,
1353 nsd,
1354 db_nsr,
1355 db_nslcmop,
1356 db_vnfrs,
1357 db_vnfds,
1358 n2vc_key_list,
1359 stage,
1360 ):
1361 """
1362 Instantiate at RO
1363 :param logging_text: preffix text to use at logging
1364 :param nsr_id: nsr identity
1365 :param nsd: database content of ns descriptor
1366 :param db_nsr: database content of ns record
1367 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1368 :param db_vnfrs:
1369 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1370 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1371 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1372 :return: None or exception
1373 """
1374 try:
1375 start_deploy = time()
1376 ns_params = db_nslcmop.get("operationParams")
1377 if ns_params and ns_params.get("timeout_ns_deploy"):
1378 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1379 else:
1380 timeout_ns_deploy = self.timeout.get(
1381 "ns_deploy", self.timeout_ns_deploy
1382 )
1383
1384 # Check for and optionally request placement optimization. Database will be updated if placement activated
1385 stage[2] = "Waiting for Placement."
1386 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1387 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1388 for vnfr in db_vnfrs.values():
1389 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1390 break
1391 else:
1392 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1393
1394 return await self._instantiate_ng_ro(
1395 logging_text,
1396 nsr_id,
1397 nsd,
1398 db_nsr,
1399 db_nslcmop,
1400 db_vnfrs,
1401 db_vnfds,
1402 n2vc_key_list,
1403 stage,
1404 start_deploy,
1405 timeout_ns_deploy,
1406 )
1407 except Exception as e:
1408 stage[2] = "ERROR deploying at VIM"
1409 self.set_vnfr_at_error(db_vnfrs, str(e))
1410 self.logger.error(
1411 "Error deploying at VIM {}".format(e),
1412 exc_info=not isinstance(
1413 e,
1414 (
1415 ROclient.ROClientException,
1416 LcmException,
1417 DbException,
1418 NgRoException,
1419 ),
1420 ),
1421 )
1422 raise
1423
1424 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1425 """
1426 Wait for kdu to be up, get ip address
1427 :param logging_text: prefix use for logging
1428 :param nsr_id:
1429 :param vnfr_id:
1430 :param kdu_name:
1431 :return: IP address, K8s services
1432 """
1433
1434 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1435 nb_tries = 0
1436
1437 while nb_tries < 360:
1438 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1439 kdur = next(
1440 (
1441 x
1442 for x in get_iterable(db_vnfr, "kdur")
1443 if x.get("kdu-name") == kdu_name
1444 ),
1445 None,
1446 )
1447 if not kdur:
1448 raise LcmException(
1449 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1450 )
1451 if kdur.get("status"):
1452 if kdur["status"] in ("READY", "ENABLED"):
1453 return kdur.get("ip-address"), kdur.get("services")
1454 else:
1455 raise LcmException(
1456 "target KDU={} is in error state".format(kdu_name)
1457 )
1458
1459 await asyncio.sleep(10, loop=self.loop)
1460 nb_tries += 1
1461 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1462
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip address at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id:
        :param vnfr_id:
        :param vdu_id: target vdu; None means the VNF management VDU is used
        :param vdu_index: count-index of the target vdu (VDU case only)
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0  # retry counter for classic-RO key injection only
        target_vdu_id = None
        ro_retries = 0  # overall poll counter; bounds the whole wait

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour (polls every 10 s)
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            # target_vdu_id stays None until the target VM is ACTIVE and has an
            # IP; every 'continue' below re-reads the vnfr and polls again
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # pick the vdur that owns the VNF management IP
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    # match both the vdu id and its count-index
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # accept either classic-RO status or NG-RO vim_status; a PDU is
                # considered always up
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                # keys cannot be injected into physical deployment units
                if vdur.get("pdu-type"):
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # NG-RO path: key injection is a deploy action
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # classic RO path
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                            if not ro_nsr_id:
                                continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                            else:
                                raise ROclient.ROClientException(
                                    "error injecting key: {}".format(
                                        result.get("description")
                                    )
                                )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # classic RO may transiently fail: retry up to 20 times
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                # nothing to inject: the IP address alone is the result
                break

        return ip_address
1639
1640 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1641 """
1642 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1643 """
1644 my_vca = vca_deployed_list[vca_index]
1645 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1646 # vdu or kdu: no dependencies
1647 return
1648 timeout = 300
1649 while timeout >= 0:
1650 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1651 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1652 configuration_status_list = db_nsr["configurationStatus"]
1653 for index, vca_deployed in enumerate(configuration_status_list):
1654 if index == vca_index:
1655 # myself
1656 continue
1657 if not my_vca.get("member-vnf-index") or (
1658 vca_deployed.get("member-vnf-index")
1659 == my_vca.get("member-vnf-index")
1660 ):
1661 internal_status = configuration_status_list[index].get("status")
1662 if internal_status == "READY":
1663 continue
1664 elif internal_status == "BROKEN":
1665 raise LcmException(
1666 "Configuration aborted because dependent charm/s has failed"
1667 )
1668 else:
1669 break
1670 else:
1671 # no dependencies, return
1672 return
1673 await asyncio.sleep(10)
1674 timeout -= 1
1675
1676 raise LcmException("Configuration aborted because dependent charm/s timeout")
1677
1678 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1679 vca_id = None
1680 if db_vnfr:
1681 vca_id = deep_get(db_vnfr, ("vca-id",))
1682 elif db_nsr:
1683 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1684 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1685 return vca_id
1686
1687 async def instantiate_N2VC(
1688 self,
1689 logging_text,
1690 vca_index,
1691 nsi_id,
1692 db_nsr,
1693 db_vnfr,
1694 vdu_id,
1695 kdu_name,
1696 vdu_index,
1697 config_descriptor,
1698 deploy_params,
1699 base_folder,
1700 nslcmop_id,
1701 stage,
1702 vca_type,
1703 vca_name,
1704 ee_config_descriptor,
1705 ):
1706 nsr_id = db_nsr["_id"]
1707 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1708 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1709 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1710 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1711 db_dict = {
1712 "collection": "nsrs",
1713 "filter": {"_id": nsr_id},
1714 "path": db_update_entry,
1715 }
1716 step = ""
1717 try:
1718
1719 element_type = "NS"
1720 element_under_configuration = nsr_id
1721
1722 vnfr_id = None
1723 if db_vnfr:
1724 vnfr_id = db_vnfr["_id"]
1725 osm_config["osm"]["vnf_id"] = vnfr_id
1726
1727 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1728
1729 if vca_type == "native_charm":
1730 index_number = 0
1731 else:
1732 index_number = vdu_index or 0
1733
1734 if vnfr_id:
1735 element_type = "VNF"
1736 element_under_configuration = vnfr_id
1737 namespace += ".{}-{}".format(vnfr_id, index_number)
1738 if vdu_id:
1739 namespace += ".{}-{}".format(vdu_id, index_number)
1740 element_type = "VDU"
1741 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1742 osm_config["osm"]["vdu_id"] = vdu_id
1743 elif kdu_name:
1744 namespace += ".{}".format(kdu_name)
1745 element_type = "KDU"
1746 element_under_configuration = kdu_name
1747 osm_config["osm"]["kdu_name"] = kdu_name
1748
1749 # Get artifact path
1750 if base_folder["pkg-dir"]:
1751 artifact_path = "{}/{}/{}/{}".format(
1752 base_folder["folder"],
1753 base_folder["pkg-dir"],
1754 "charms"
1755 if vca_type
1756 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1757 else "helm-charts",
1758 vca_name,
1759 )
1760 else:
1761 artifact_path = "{}/Scripts/{}/{}/".format(
1762 base_folder["folder"],
1763 "charms"
1764 if vca_type
1765 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1766 else "helm-charts",
1767 vca_name,
1768 )
1769
1770 self.logger.debug("Artifact path > {}".format(artifact_path))
1771
1772 # get initial_config_primitive_list that applies to this element
1773 initial_config_primitive_list = config_descriptor.get(
1774 "initial-config-primitive"
1775 )
1776
1777 self.logger.debug(
1778 "Initial config primitive list > {}".format(
1779 initial_config_primitive_list
1780 )
1781 )
1782
1783 # add config if not present for NS charm
1784 ee_descriptor_id = ee_config_descriptor.get("id")
1785 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1786 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1787 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1788 )
1789
1790 self.logger.debug(
1791 "Initial config primitive list #2 > {}".format(
1792 initial_config_primitive_list
1793 )
1794 )
1795 # n2vc_redesign STEP 3.1
1796 # find old ee_id if exists
1797 ee_id = vca_deployed.get("ee_id")
1798
1799 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1800 # create or register execution environment in VCA
1801 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1802
1803 self._write_configuration_status(
1804 nsr_id=nsr_id,
1805 vca_index=vca_index,
1806 status="CREATING",
1807 element_under_configuration=element_under_configuration,
1808 element_type=element_type,
1809 )
1810
1811 step = "create execution environment"
1812 self.logger.debug(logging_text + step)
1813
1814 ee_id = None
1815 credentials = None
1816 if vca_type == "k8s_proxy_charm":
1817 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1818 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1819 namespace=namespace,
1820 artifact_path=artifact_path,
1821 db_dict=db_dict,
1822 vca_id=vca_id,
1823 )
1824 elif vca_type == "helm" or vca_type == "helm-v3":
1825 ee_id, credentials = await self.vca_map[
1826 vca_type
1827 ].create_execution_environment(
1828 namespace=namespace,
1829 reuse_ee_id=ee_id,
1830 db_dict=db_dict,
1831 config=osm_config,
1832 artifact_path=artifact_path,
1833 vca_type=vca_type,
1834 )
1835 else:
1836 ee_id, credentials = await self.vca_map[
1837 vca_type
1838 ].create_execution_environment(
1839 namespace=namespace,
1840 reuse_ee_id=ee_id,
1841 db_dict=db_dict,
1842 vca_id=vca_id,
1843 )
1844
1845 elif vca_type == "native_charm":
1846 step = "Waiting to VM being up and getting IP address"
1847 self.logger.debug(logging_text + step)
1848 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1849 logging_text,
1850 nsr_id,
1851 vnfr_id,
1852 vdu_id,
1853 vdu_index,
1854 user=None,
1855 pub_key=None,
1856 )
1857 credentials = {"hostname": rw_mgmt_ip}
1858 # get username
1859 username = deep_get(
1860 config_descriptor, ("config-access", "ssh-access", "default-user")
1861 )
1862 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1863 # merged. Meanwhile let's get username from initial-config-primitive
1864 if not username and initial_config_primitive_list:
1865 for config_primitive in initial_config_primitive_list:
1866 for param in config_primitive.get("parameter", ()):
1867 if param["name"] == "ssh-username":
1868 username = param["value"]
1869 break
1870 if not username:
1871 raise LcmException(
1872 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1873 "'config-access.ssh-access.default-user'"
1874 )
1875 credentials["username"] = username
1876 # n2vc_redesign STEP 3.2
1877
1878 self._write_configuration_status(
1879 nsr_id=nsr_id,
1880 vca_index=vca_index,
1881 status="REGISTERING",
1882 element_under_configuration=element_under_configuration,
1883 element_type=element_type,
1884 )
1885
1886 step = "register execution environment {}".format(credentials)
1887 self.logger.debug(logging_text + step)
1888 ee_id = await self.vca_map[vca_type].register_execution_environment(
1889 credentials=credentials,
1890 namespace=namespace,
1891 db_dict=db_dict,
1892 vca_id=vca_id,
1893 )
1894
1895 # for compatibility with MON/POL modules, the need model and application name at database
1896 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1897 ee_id_parts = ee_id.split(".")
1898 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1899 if len(ee_id_parts) >= 2:
1900 model_name = ee_id_parts[0]
1901 application_name = ee_id_parts[1]
1902 db_nsr_update[db_update_entry + "model"] = model_name
1903 db_nsr_update[db_update_entry + "application"] = application_name
1904
1905 # n2vc_redesign STEP 3.3
1906 step = "Install configuration Software"
1907
1908 self._write_configuration_status(
1909 nsr_id=nsr_id,
1910 vca_index=vca_index,
1911 status="INSTALLING SW",
1912 element_under_configuration=element_under_configuration,
1913 element_type=element_type,
1914 other_update=db_nsr_update,
1915 )
1916
1917 # TODO check if already done
1918 self.logger.debug(logging_text + step)
1919 config = None
1920 if vca_type == "native_charm":
1921 config_primitive = next(
1922 (p for p in initial_config_primitive_list if p["name"] == "config"),
1923 None,
1924 )
1925 if config_primitive:
1926 config = self._map_primitive_params(
1927 config_primitive, {}, deploy_params
1928 )
1929 num_units = 1
1930 if vca_type == "lxc_proxy_charm":
1931 if element_type == "NS":
1932 num_units = db_nsr.get("config-units") or 1
1933 elif element_type == "VNF":
1934 num_units = db_vnfr.get("config-units") or 1
1935 elif element_type == "VDU":
1936 for v in db_vnfr["vdur"]:
1937 if vdu_id == v["vdu-id-ref"]:
1938 num_units = v.get("config-units") or 1
1939 break
1940 if vca_type != "k8s_proxy_charm":
1941 await self.vca_map[vca_type].install_configuration_sw(
1942 ee_id=ee_id,
1943 artifact_path=artifact_path,
1944 db_dict=db_dict,
1945 config=config,
1946 num_units=num_units,
1947 vca_id=vca_id,
1948 vca_type=vca_type,
1949 )
1950
1951 # write in db flag of configuration_sw already installed
1952 self.update_db_2(
1953 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1954 )
1955
1956 # add relations for this VCA (wait for other peers related with this VCA)
1957 await self._add_vca_relations(
1958 logging_text=logging_text,
1959 nsr_id=nsr_id,
1960 vca_type=vca_type,
1961 vca_index=vca_index,
1962 )
1963
1964 # if SSH access is required, then get execution environment SSH public
1965 # if native charm we have waited already to VM be UP
1966 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1967 pub_key = None
1968 user = None
1969 # self.logger.debug("get ssh key block")
1970 if deep_get(
1971 config_descriptor, ("config-access", "ssh-access", "required")
1972 ):
1973 # self.logger.debug("ssh key needed")
1974 # Needed to inject a ssh key
1975 user = deep_get(
1976 config_descriptor,
1977 ("config-access", "ssh-access", "default-user"),
1978 )
1979 step = "Install configuration Software, getting public ssh key"
1980 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1981 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1982 )
1983
1984 step = "Insert public key into VM user={} ssh_key={}".format(
1985 user, pub_key
1986 )
1987 else:
1988 # self.logger.debug("no need to get ssh key")
1989 step = "Waiting to VM being up and getting IP address"
1990 self.logger.debug(logging_text + step)
1991
1992 # default rw_mgmt_ip to None, avoiding the non definition of the variable
1993 rw_mgmt_ip = None
1994
1995 # n2vc_redesign STEP 5.1
1996 # wait for RO (ip-address) Insert pub_key into VM
1997 if vnfr_id:
1998 if kdu_name:
1999 rw_mgmt_ip, services = await self.wait_kdu_up(
2000 logging_text, nsr_id, vnfr_id, kdu_name
2001 )
2002 vnfd = self.db.get_one(
2003 "vnfds_revisions",
2004 {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2005 )
2006 kdu = get_kdu(vnfd, kdu_name)
2007 kdu_services = [
2008 service["name"] for service in get_kdu_services(kdu)
2009 ]
2010 exposed_services = []
2011 for service in services:
2012 if any(s in service["name"] for s in kdu_services):
2013 exposed_services.append(service)
2014 await self.vca_map[vca_type].exec_primitive(
2015 ee_id=ee_id,
2016 primitive_name="config",
2017 params_dict={
2018 "osm-config": json.dumps(
2019 OsmConfigBuilder(
2020 k8s={"services": exposed_services}
2021 ).build()
2022 )
2023 },
2024 vca_id=vca_id,
2025 )
2026
2027 # This verification is needed in order to avoid trying to add a public key
2028 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2029 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2030 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2031 # or it is a KNF)
2032 elif db_vnfr.get('vdur'):
2033 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
2034 logging_text,
2035 nsr_id,
2036 vnfr_id,
2037 vdu_id,
2038 vdu_index,
2039 user=user,
2040 pub_key=pub_key,
2041 )
2042
2043 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
2044
2045 # store rw_mgmt_ip in deploy params for later replacement
2046 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2047
2048 # n2vc_redesign STEP 6 Execute initial config primitive
2049 step = "execute initial config primitive"
2050
2051 # wait for dependent primitives execution (NS -> VNF -> VDU)
2052 if initial_config_primitive_list:
2053 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2054
2055 # stage, in function of element type: vdu, kdu, vnf or ns
2056 my_vca = vca_deployed_list[vca_index]
2057 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2058 # VDU or KDU
2059 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2060 elif my_vca.get("member-vnf-index"):
2061 # VNF
2062 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2063 else:
2064 # NS
2065 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2066
2067 self._write_configuration_status(
2068 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2069 )
2070
2071 self._write_op_status(op_id=nslcmop_id, stage=stage)
2072
2073 check_if_terminated_needed = True
2074 for initial_config_primitive in initial_config_primitive_list:
2075 # adding information on the vca_deployed if it is a NS execution environment
2076 if not vca_deployed["member-vnf-index"]:
2077 deploy_params["ns_config_info"] = json.dumps(
2078 self._get_ns_config_info(nsr_id)
2079 )
2080 # TODO check if already done
2081 primitive_params_ = self._map_primitive_params(
2082 initial_config_primitive, {}, deploy_params
2083 )
2084
2085 step = "execute primitive '{}' params '{}'".format(
2086 initial_config_primitive["name"], primitive_params_
2087 )
2088 self.logger.debug(logging_text + step)
2089 await self.vca_map[vca_type].exec_primitive(
2090 ee_id=ee_id,
2091 primitive_name=initial_config_primitive["name"],
2092 params_dict=primitive_params_,
2093 db_dict=db_dict,
2094 vca_id=vca_id,
2095 vca_type=vca_type,
2096 )
2097 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2098 if check_if_terminated_needed:
2099 if config_descriptor.get("terminate-config-primitive"):
2100 self.update_db_2(
2101 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2102 )
2103 check_if_terminated_needed = False
2104
2105 # TODO register in database that primitive is done
2106
2107 # STEP 7 Configure metrics
2108 if vca_type == "helm" or vca_type == "helm-v3":
2109 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2110 ee_id=ee_id,
2111 artifact_path=artifact_path,
2112 ee_config_descriptor=ee_config_descriptor,
2113 vnfr_id=vnfr_id,
2114 nsr_id=nsr_id,
2115 target_ip=rw_mgmt_ip,
2116 )
2117 if prometheus_jobs:
2118 self.update_db_2(
2119 "nsrs",
2120 nsr_id,
2121 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2122 )
2123
2124 for job in prometheus_jobs:
2125 self.db.set_one(
2126 "prometheus_jobs",
2127 {"job_name": job["job_name"]},
2128 job,
2129 upsert=True,
2130 fail_on_empty=False,
2131 )
2132
2133 step = "instantiated at VCA"
2134 self.logger.debug(logging_text + step)
2135
2136 self._write_configuration_status(
2137 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2138 )
2139
2140 except Exception as e: # TODO not use Exception but N2VC exception
2141 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2142 if not isinstance(
2143 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2144 ):
2145 self.logger.error(
2146 "Exception while {} : {}".format(step, e), exc_info=True
2147 )
2148 self._write_configuration_status(
2149 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2150 )
2151 raise LcmException("{} {}".format(step, e)) from e
2152
2153 def _write_ns_status(
2154 self,
2155 nsr_id: str,
2156 ns_state: str,
2157 current_operation: str,
2158 current_operation_id: str,
2159 error_description: str = None,
2160 error_detail: str = None,
2161 other_update: dict = None,
2162 ):
2163 """
2164 Update db_nsr fields.
2165 :param nsr_id:
2166 :param ns_state:
2167 :param current_operation:
2168 :param current_operation_id:
2169 :param error_description:
2170 :param error_detail:
2171 :param other_update: Other required changes at database if provided, will be cleared
2172 :return:
2173 """
2174 try:
2175 db_dict = other_update or {}
2176 db_dict[
2177 "_admin.nslcmop"
2178 ] = current_operation_id # for backward compatibility
2179 db_dict["_admin.current-operation"] = current_operation_id
2180 db_dict["_admin.operation-type"] = (
2181 current_operation if current_operation != "IDLE" else None
2182 )
2183 db_dict["currentOperation"] = current_operation
2184 db_dict["currentOperationID"] = current_operation_id
2185 db_dict["errorDescription"] = error_description
2186 db_dict["errorDetail"] = error_detail
2187
2188 if ns_state:
2189 db_dict["nsState"] = ns_state
2190 self.update_db_2("nsrs", nsr_id, db_dict)
2191 except DbException as e:
2192 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2193
2194 def _write_op_status(
2195 self,
2196 op_id: str,
2197 stage: list = None,
2198 error_message: str = None,
2199 queuePosition: int = 0,
2200 operation_state: str = None,
2201 other_update: dict = None,
2202 ):
2203 try:
2204 db_dict = other_update or {}
2205 db_dict["queuePosition"] = queuePosition
2206 if isinstance(stage, list):
2207 db_dict["stage"] = stage[0]
2208 db_dict["detailed-status"] = " ".join(stage)
2209 elif stage is not None:
2210 db_dict["stage"] = str(stage)
2211
2212 if error_message is not None:
2213 db_dict["errorMessage"] = error_message
2214 if operation_state is not None:
2215 db_dict["operationState"] = operation_state
2216 db_dict["statusEnteredTime"] = time()
2217 self.update_db_2("nslcmops", op_id, db_dict)
2218 except DbException as e:
2219 self.logger.warn(
2220 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2221 )
2222
2223 def _write_all_config_status(self, db_nsr: dict, status: str):
2224 try:
2225 nsr_id = db_nsr["_id"]
2226 # configurationStatus
2227 config_status = db_nsr.get("configurationStatus")
2228 if config_status:
2229 db_nsr_update = {
2230 "configurationStatus.{}.status".format(index): status
2231 for index, v in enumerate(config_status)
2232 if v
2233 }
2234 # update status
2235 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2236
2237 except DbException as e:
2238 self.logger.warn(
2239 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2240 )
2241
2242 def _write_configuration_status(
2243 self,
2244 nsr_id: str,
2245 vca_index: int,
2246 status: str = None,
2247 element_under_configuration: str = None,
2248 element_type: str = None,
2249 other_update: dict = None,
2250 ):
2251
2252 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2253 # .format(vca_index, status))
2254
2255 try:
2256 db_path = "configurationStatus.{}.".format(vca_index)
2257 db_dict = other_update or {}
2258 if status:
2259 db_dict[db_path + "status"] = status
2260 if element_under_configuration:
2261 db_dict[
2262 db_path + "elementUnderConfiguration"
2263 ] = element_under_configuration
2264 if element_type:
2265 db_dict[db_path + "elementType"] = element_type
2266 self.update_db_2("nsrs", nsr_id, db_dict)
2267 except DbException as e:
2268 self.logger.warn(
2269 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2270 status, nsr_id, vca_index, e
2271 )
2272 )
2273
2274 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2275 """
2276 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2277 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2278 Database is used because the result can be obtained from a different LCM worker in case of HA.
2279 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2280 :param db_nslcmop: database content of nslcmop
2281 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2282 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2283 computed 'vim-account-id'
2284 """
2285 modified = False
2286 nslcmop_id = db_nslcmop["_id"]
2287 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2288 if placement_engine == "PLA":
2289 self.logger.debug(
2290 logging_text + "Invoke and wait for placement optimization"
2291 )
2292 await self.msg.aiowrite(
2293 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2294 )
2295 db_poll_interval = 5
2296 wait = db_poll_interval * 10
2297 pla_result = None
2298 while not pla_result and wait >= 0:
2299 await asyncio.sleep(db_poll_interval)
2300 wait -= db_poll_interval
2301 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2302 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2303
2304 if not pla_result:
2305 raise LcmException(
2306 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2307 )
2308
2309 for pla_vnf in pla_result["vnf"]:
2310 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2311 if not pla_vnf.get("vimAccountId") or not vnfr:
2312 continue
2313 modified = True
2314 self.db.set_one(
2315 "vnfrs",
2316 {"_id": vnfr["_id"]},
2317 {"vim-account-id": pla_vnf["vimAccountId"]},
2318 )
2319 # Modifies db_vnfrs
2320 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2321 return modified
2322
2323 def update_nsrs_with_pla_result(self, params):
2324 try:
2325 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2326 self.update_db_2(
2327 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2328 )
2329 except Exception as e:
2330 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2331
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Deploy a NS instance: read all needed records from the database, deploy
        KDUs/VMs/execution environments (delegating to deploy_kdus,
        instantiate_RO and _deploy_n2vc), then consolidate the final status at
        the `finally` block and notify kafka.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None. Errors are reflected in the nsr/nslcmop records.
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM worker owns this operation; nothing to do here
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                # stored as a JSON string; decode it in place
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            self.logger.debug(logging_text + stage[1])
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            self.logger.debug(logging_text + stage[1])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"]   # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            # stored as a JSON string; decode it in place
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds vnfd dicts, so testing the id
                # string for membership always passes and the vnfd is fetched
                # for every vnfr — confirm whether an id-based check was meant
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            stage[1] = "Deploying NS at VIM."
            # run the VIM deployment concurrently; its completion is awaited
            # later via _wait_for_tasks in the finally block
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                # VNF-level charm, if any
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        # NOTE(review): falls back to the shared VNF dict, so the
                        # "OSM" key below overwrites it for later VDUs too —
                        # confirm this aliasing is intended
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one execution environment per VDU instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                # Deploy charms for each KDU that supports one.
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu.update(
                                parse_yaml_strings(kdur["additionalParams"].copy())
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of stuff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                # best-effort kafka notification; a failure here is only logged
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2817
2818 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2819 if vnfd_id not in cached_vnfds:
2820 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2821 return cached_vnfds[vnfd_id]
2822
2823 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2824 if vnf_profile_id not in cached_vnfrs:
2825 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2826 "vnfrs",
2827 {
2828 "member-vnf-index-ref": vnf_profile_id,
2829 "nsr-id-ref": nsr_id,
2830 },
2831 )
2832 return cached_vnfrs[vnf_profile_id]
2833
2834 def _is_deployed_vca_in_relation(
2835 self, vca: DeployedVCA, relation: Relation
2836 ) -> bool:
2837 found = False
2838 for endpoint in (relation.provider, relation.requirer):
2839 if endpoint["kdu-resource-profile-id"]:
2840 continue
2841 found = (
2842 vca.vnf_profile_id == endpoint.vnf_profile_id
2843 and vca.vdu_profile_id == endpoint.vdu_profile_id
2844 and vca.execution_environment_ref == endpoint.execution_environment_ref
2845 )
2846 if found:
2847 break
2848 return found
2849
2850 def _update_ee_relation_data_with_implicit_data(
2851 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2852 ):
2853 ee_relation_data = safe_get_ee_relation(
2854 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2855 )
2856 ee_relation_level = EELevel.get_level(ee_relation_data)
2857 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2858 "execution-environment-ref"
2859 ]:
2860 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2861 vnfd_id = vnf_profile["vnfd-id"]
2862 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2863 entity_id = (
2864 vnfd_id
2865 if ee_relation_level == EELevel.VNF
2866 else ee_relation_data["vdu-profile-id"]
2867 )
2868 ee = get_juju_ee_ref(db_vnfd, entity_id)
2869 if not ee:
2870 raise Exception(
2871 f"not execution environments found for ee_relation {ee_relation_data}"
2872 )
2873 ee_relation_data["execution-environment-ref"] = ee["id"]
2874 return ee_relation_data
2875
2876 def _get_ns_relations(
2877 self,
2878 nsr_id: str,
2879 nsd: Dict[str, Any],
2880 vca: DeployedVCA,
2881 cached_vnfds: Dict[str, Any],
2882 ) -> List[Relation]:
2883 relations = []
2884 db_ns_relations = get_ns_configuration_relation_list(nsd)
2885 for r in db_ns_relations:
2886 provider_dict = None
2887 requirer_dict = None
2888 if all(key in r for key in ("provider", "requirer")):
2889 provider_dict = r["provider"]
2890 requirer_dict = r["requirer"]
2891 elif "entities" in r:
2892 provider_id = r["entities"][0]["id"]
2893 provider_dict = {
2894 "nsr-id": nsr_id,
2895 "endpoint": r["entities"][0]["endpoint"],
2896 }
2897 if provider_id != nsd["id"]:
2898 provider_dict["vnf-profile-id"] = provider_id
2899 requirer_id = r["entities"][1]["id"]
2900 requirer_dict = {
2901 "nsr-id": nsr_id,
2902 "endpoint": r["entities"][1]["endpoint"],
2903 }
2904 if requirer_id != nsd["id"]:
2905 requirer_dict["vnf-profile-id"] = requirer_id
2906 else:
2907 raise Exception(
2908 "provider/requirer or entities must be included in the relation."
2909 )
2910 relation_provider = self._update_ee_relation_data_with_implicit_data(
2911 nsr_id, nsd, provider_dict, cached_vnfds
2912 )
2913 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2914 nsr_id, nsd, requirer_dict, cached_vnfds
2915 )
2916 provider = EERelation(relation_provider)
2917 requirer = EERelation(relation_requirer)
2918 relation = Relation(r["name"], provider, requirer)
2919 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2920 if vca_in_relation:
2921 relations.append(relation)
2922 return relations
2923
2924 def _get_vnf_relations(
2925 self,
2926 nsr_id: str,
2927 nsd: Dict[str, Any],
2928 vca: DeployedVCA,
2929 cached_vnfds: Dict[str, Any],
2930 ) -> List[Relation]:
2931 relations = []
2932 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2933 vnf_profile_id = vnf_profile["id"]
2934 vnfd_id = vnf_profile["vnfd-id"]
2935 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2936 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2937 for r in db_vnf_relations:
2938 provider_dict = None
2939 requirer_dict = None
2940 if all(key in r for key in ("provider", "requirer")):
2941 provider_dict = r["provider"]
2942 requirer_dict = r["requirer"]
2943 elif "entities" in r:
2944 provider_id = r["entities"][0]["id"]
2945 provider_dict = {
2946 "nsr-id": nsr_id,
2947 "vnf-profile-id": vnf_profile_id,
2948 "endpoint": r["entities"][0]["endpoint"],
2949 }
2950 if provider_id != vnfd_id:
2951 provider_dict["vdu-profile-id"] = provider_id
2952 requirer_id = r["entities"][1]["id"]
2953 requirer_dict = {
2954 "nsr-id": nsr_id,
2955 "vnf-profile-id": vnf_profile_id,
2956 "endpoint": r["entities"][1]["endpoint"],
2957 }
2958 if requirer_id != vnfd_id:
2959 requirer_dict["vdu-profile-id"] = requirer_id
2960 else:
2961 raise Exception(
2962 "provider/requirer or entities must be included in the relation."
2963 )
2964 relation_provider = self._update_ee_relation_data_with_implicit_data(
2965 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2966 )
2967 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2968 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2969 )
2970 provider = EERelation(relation_provider)
2971 requirer = EERelation(relation_requirer)
2972 relation = Relation(r["name"], provider, requirer)
2973 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2974 if vca_in_relation:
2975 relations.append(relation)
2976 return relations
2977
2978 def _get_kdu_resource_data(
2979 self,
2980 ee_relation: EERelation,
2981 db_nsr: Dict[str, Any],
2982 cached_vnfds: Dict[str, Any],
2983 ) -> DeployedK8sResource:
2984 nsd = get_nsd(db_nsr)
2985 vnf_profiles = get_vnf_profiles(nsd)
2986 vnfd_id = find_in_list(
2987 vnf_profiles,
2988 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2989 )["vnfd-id"]
2990 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2991 kdu_resource_profile = get_kdu_resource_profile(
2992 db_vnfd, ee_relation.kdu_resource_profile_id
2993 )
2994 kdu_name = kdu_resource_profile["kdu-name"]
2995 deployed_kdu, _ = get_deployed_kdu(
2996 db_nsr.get("_admin", ()).get("deployed", ()),
2997 kdu_name,
2998 ee_relation.vnf_profile_id,
2999 )
3000 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
3001 return deployed_kdu
3002
3003 def _get_deployed_component(
3004 self,
3005 ee_relation: EERelation,
3006 db_nsr: Dict[str, Any],
3007 cached_vnfds: Dict[str, Any],
3008 ) -> DeployedComponent:
3009 nsr_id = db_nsr["_id"]
3010 deployed_component = None
3011 ee_level = EELevel.get_level(ee_relation)
3012 if ee_level == EELevel.NS:
3013 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
3014 if vca:
3015 deployed_component = DeployedVCA(nsr_id, vca)
3016 elif ee_level == EELevel.VNF:
3017 vca = get_deployed_vca(
3018 db_nsr,
3019 {
3020 "vdu_id": None,
3021 "member-vnf-index": ee_relation.vnf_profile_id,
3022 "ee_descriptor_id": ee_relation.execution_environment_ref,
3023 },
3024 )
3025 if vca:
3026 deployed_component = DeployedVCA(nsr_id, vca)
3027 elif ee_level == EELevel.VDU:
3028 vca = get_deployed_vca(
3029 db_nsr,
3030 {
3031 "vdu_id": ee_relation.vdu_profile_id,
3032 "member-vnf-index": ee_relation.vnf_profile_id,
3033 "ee_descriptor_id": ee_relation.execution_environment_ref,
3034 },
3035 )
3036 if vca:
3037 deployed_component = DeployedVCA(nsr_id, vca)
3038 elif ee_level == EELevel.KDU:
3039 kdu_resource_data = self._get_kdu_resource_data(
3040 ee_relation, db_nsr, cached_vnfds
3041 )
3042 if kdu_resource_data:
3043 deployed_component = DeployedK8sResource(kdu_resource_data)
3044 return deployed_component
3045
    async def _add_relation(
        self,
        relation: Relation,
        vca_type: str,
        db_nsr: Dict[str, Any],
        cached_vnfds: Dict[str, Any],
        cached_vnfrs: Dict[str, Any],
    ) -> bool:
        """Try to add one juju relation between the two deployed endpoints.

        The relation is only added when BOTH endpoints are already deployed
        and have their configuration software installed; otherwise it returns
        False so the caller can retry later.

        :param relation: relation (provider/requirer endpoints) to add.
        :param vca_type: key into self.vca_map selecting the VCA connector.
        :param db_nsr: current NS record (re-read by the caller on each retry).
        :param cached_vnfds: VNFD cache keyed by id (mutated).
        :param cached_vnfrs: VNFR cache (mutated).
        :return: True if the relation was added, False if not (yet) possible.
        """
        deployed_provider = self._get_deployed_component(
            relation.provider, db_nsr, cached_vnfds
        )
        deployed_requirer = self._get_deployed_component(
            relation.requirer, db_nsr, cached_vnfds
        )
        if (
            deployed_provider
            and deployed_requirer
            and deployed_provider.config_sw_installed
            and deployed_requirer.config_sw_installed
        ):
            # vnf_profile_id is None for NS-level endpoints -> no vnfr to read
            provider_db_vnfr = (
                self._get_vnfr(
                    relation.provider.nsr_id,
                    relation.provider.vnf_profile_id,
                    cached_vnfrs,
                )
                if relation.provider.vnf_profile_id
                else None
            )
            requirer_db_vnfr = (
                self._get_vnfr(
                    relation.requirer.nsr_id,
                    relation.requirer.vnf_profile_id,
                    cached_vnfrs,
                )
                if relation.requirer.vnf_profile_id
                else None
            )
            provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
            requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
            provider_relation_endpoint = RelationEndpoint(
                deployed_provider.ee_id,
                provider_vca_id,
                relation.provider.endpoint,
            )
            requirer_relation_endpoint = RelationEndpoint(
                deployed_requirer.ee_id,
                requirer_vca_id,
                relation.requirer.endpoint,
            )
            await self.vca_map[vca_type].add_relation(
                provider=provider_relation_endpoint,
                requirer=requirer_relation_endpoint,
            )
            # remove entry from relations list
            return True
        return False
3103
    async def _add_vca_relations(
        self,
        logging_text,
        nsr_id,
        vca_type: str,
        vca_index: int,
        timeout: int = 3600,
    ) -> bool:
        """Collect and add all NS/VNF relations involving the VCA at *vca_index*.

        Polls every 5 seconds (re-reading the NS record each round, since
        peers may finish deploying meanwhile) until all relations are added or
        *timeout* seconds elapse.

        :return: True on success or when no relations apply; False on timeout
            or error (errors are logged, never raised).
        """

        # steps:
        # 1. find all relations for this VCA
        # 2. wait for other peers related
        # 3. add relations

        try:
            # STEP 1: find all relations for this VCA

            # read nsr record
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = get_nsd(db_nsr)

            # this VCA data
            deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
            my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

            cached_vnfds = {}
            cached_vnfrs = {}
            relations = []
            relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
            relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

            # if no relations, terminate
            if not relations:
                self.logger.debug(logging_text + " No relations")
                return True

            self.logger.debug(logging_text + " adding relations {}".format(relations))

            # add all relations
            start = time()
            while True:
                # check timeout
                now = time()
                if now - start >= timeout:
                    self.logger.error(logging_text + " : timeout adding relations")
                    return False

                # reload nsr from database (we need to update record: _admin.deployed.VCA)
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

                # for each relation, find the VCA's related
                # (iterate over a copy since successfully-added relations are removed)
                for relation in relations.copy():
                    added = await self._add_relation(
                        relation,
                        vca_type,
                        db_nsr,
                        cached_vnfds,
                        cached_vnfrs,
                    )
                    if added:
                        relations.remove(relation)

                if not relations:
                    self.logger.debug("Relations added")
                    break
                await asyncio.sleep(5.0)

            return True

        except Exception as e:
            self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
            return False
3176
    async def _install_kdu(
        self,
        nsr_id: str,
        nsr_db_path: str,
        vnfr_data: dict,
        kdu_index: int,
        kdud: dict,
        vnfd: dict,
        k8s_instance_info: dict,
        k8params: dict = None,
        timeout: int = 600,
        vca_id: str = None,
    ):
        """Install one KDU on its K8s cluster and update nsr/vnfr records.

        Installs the helm chart / juju bundle, records the kdu-instance (and,
        for juju, the namespace) in the NS record, resolves the management
        service IP into the VNF record, and runs any initial config
        primitives that are not handled by a juju execution environment.

        :param nsr_db_path: dot path ("_admin.deployed.K8s.<n>") of this KDU
            inside the NS record.
        :param kdu_index: index of this kdur inside the VNF record.
        :param k8s_instance_info: dict describing cluster/model/name/namespace.
        :return: the kdu_instance name used for the deployment.
        :raises: re-raises any installation error after writing the error
            status into the nsr/vnfr records.
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": nsr_db_path,
            }

            if k8s_instance_info.get("kdu-deployment-name"):
                kdu_instance = k8s_instance_info.get("kdu-deployment-name")
            else:
                kdu_instance = self.k8scluster_map[
                    k8sclustertype
                ].generate_kdu_instance_name(
                    db_dict=db_dict_install,
                    kdu_model=k8s_instance_info["kdu-model"],
                    kdu_name=k8s_instance_info["kdu-name"],
                )

            # Update the nsrs table with the kdu-instance value
            self.update_db_2(
                item="nsrs",
                _id=nsr_id,
                _desc={nsr_db_path + ".kdu-instance": kdu_instance},
            )

            # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
            # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
            # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
            # namespace, this first verification could be removed, and the next step would be done for any kind
            # of KNF.
            # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
            # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
            if k8sclustertype in ("juju", "juju-bundle"):
                # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
                # that the user passed a namespace which he wants its KDU to be deployed in)
                if (
                    self.db.count(
                        table="nsrs",
                        q_filter={
                            "_id": nsr_id,
                            "_admin.projects_write": k8s_instance_info["namespace"],
                            "_admin.projects_read": k8s_instance_info["namespace"],
                        },
                    )
                    > 0
                ):
                    self.logger.debug(
                        f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
                    )
                    self.update_db_2(
                        item="nsrs",
                        _id=nsr_id,
                        _desc={f"{nsr_db_path}.namespace": kdu_instance},
                    )
                    k8s_instance_info["namespace"] = kdu_instance

            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"],
            )

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                mgmt_services = [
                    service
                    for service in kdud.get("service", [])
                    if service.get("mgmt-service")
                ]
                for mgmt_service in mgmt_services:
                    for service in services:
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict[
                                "kdur.{}.ip-address".format(kdu_index)
                            ] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get(
                                "external-connection-point-ref"
                            )
                            if service_external_cp:
                                if (
                                    deep_get(vnfd, ("mgmt-interface", "cp"))
                                    == service_external_cp
                                ):
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get(
                                        "external-connection-point-ref", ""
                                    )
                                    == service_external_cp,
                                ):
                                    vnfr_update_dict[
                                        "kdur.{}.ip-address".format(kdu_index)
                                    ] = ip
                                break
                    # for-else: no deployed service matched this mgmt service name
                    else:
                        self.logger.warn(
                            "Mgmt service name: {} not found".format(
                                mgmt_service["name"]
                            )
                        )

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # Run initial config primitives only when there is no juju EE for this kdu
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if (
                kdu_config
                and kdu_config.get("initial-config-primitive")
                and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            ):
                initial_config_primitive_list = kdu_config.get(
                    "initial-config-primitive"
                )
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout,
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2(
                    "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
                )
                self.update_db_2(
                    "vnfrs",
                    vnfr_data.get("_id"),
                    {"kdur.{}.status".format(kdu_index): "ERROR"},
                )
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
3374
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Launch an asyncio installation task per KDU found in the VNF records.

        For every kdur in every VNF record: resolves the kdu model
        (helm-chart / juju-bundle, possibly a package artifact path), makes
        sure the target K8s cluster is initialized, synchronizes helm repos
        once per cluster, records the instance info at
        _admin.deployed.K8s.<index> of the NS record and spawns
        _install_kdu() as a registered LCM task.

        :param task_instantiation_info: dict task -> description, extended
            with each spawned installation task (mutated).
        :raises LcmException: on any configuration or deployment-setup error.
        """
        # Launch kdus if present in the descriptor

        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            # Resolve (and memoize) the internal cluster id for cluster_type,
            # initializing the cluster for helm v3 if needed (backward compat).
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage:  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            if storage["pkg-dir"]:
                                filename = "{}/{}/{}s/{}".format(
                                    storage["folder"],
                                    storage["pkg-dir"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            else:
                                filename = "{}/Scripts/{}s/{}".format(
                                    storage["folder"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    # (only once per cluster; v2 and v3 clusters tracked separately)
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=1800,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3646
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Launch an instantiate_N2VC task per execution environment of *descriptor_config*.

        For each EE item the VCA type/name is derived (juju proxy/native/k8s
        charm or helm v2/v3), the VCA entry is located or created at
        _admin.deployed.VCA of the NS record, and the instantiation coroutine
        is scheduled and registered as an LCM task.

        :param descriptor_config: configuration descriptor holding either an
            "execution-environment-list" or a legacy "juju" entry.
        :param task_instantiation_info: dict task -> description, extended
            with each spawned task (mutated).
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            # look for an existing VCA record for this target; the for-else
            # creates a new one (and a new index) when no entry matches
            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

                self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
                self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
                self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3799
3800 @staticmethod
3801 def _create_nslcmop(nsr_id, operation, params):
3802 """
3803 Creates a ns-lcm-opp content to be stored at database.
3804 :param nsr_id: internal id of the instance
3805 :param operation: instantiate, terminate, scale, action, ...
3806 :param params: user parameters for the operation
3807 :return: dictionary following SOL005 format
3808 """
3809 # Raise exception if invalid arguments
3810 if not (nsr_id and operation and params):
3811 raise LcmException(
3812 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3813 )
3814 now = time()
3815 _id = str(uuid4())
3816 nslcmop = {
3817 "id": _id,
3818 "_id": _id,
3819 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3820 "operationState": "PROCESSING",
3821 "statusEnteredTime": now,
3822 "nsInstanceId": nsr_id,
3823 "lcmOperationType": operation,
3824 "startTime": now,
3825 "isAutomaticInvocation": False,
3826 "operationParams": params,
3827 "isCancelPending": False,
3828 "links": {
3829 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3830 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3831 },
3832 }
3833 return nslcmop
3834
3835 def _format_additional_params(self, params):
3836 params = params or {}
3837 for key, value in params.items():
3838 if str(value).startswith("!!yaml "):
3839 params[key] = yaml.safe_load(value[7:])
3840 return params
3841
3842 def _get_terminate_primitive_params(self, seq, vnf_index):
3843 primitive = seq.get("name")
3844 primitive_params = {}
3845 params = {
3846 "member_vnf_index": vnf_index,
3847 "primitive": primitive,
3848 "primitive_params": primitive_params,
3849 }
3850 desc_params = {}
3851 return self._map_primitive_params(seq, params, desc_params)
3852
3853 # sub-operations
3854
3855 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3856 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3857 if op.get("operationState") == "COMPLETED":
3858 # b. Skip sub-operation
3859 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3860 return self.SUBOPERATION_STATUS_SKIP
3861 else:
3862 # c. retry executing sub-operation
3863 # The sub-operation exists, and operationState != 'COMPLETED'
3864 # Update operationState = 'PROCESSING' to indicate a retry.
3865 operationState = "PROCESSING"
3866 detailed_status = "In progress"
3867 self._update_suboperation_status(
3868 db_nslcmop, op_index, operationState, detailed_status
3869 )
3870 # Return the sub-operation index
3871 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3872 # with arguments extracted from the sub-operation
3873 return op_index
3874
3875 # Find a sub-operation where all keys in a matching dictionary must match
3876 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3877 def _find_suboperation(self, db_nslcmop, match):
3878 if db_nslcmop and match:
3879 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3880 for i, op in enumerate(op_list):
3881 if all(op.get(k) == match[k] for k in match):
3882 return i
3883 return self.SUBOPERATION_STATUS_NOT_FOUND
3884
3885 # Update status for a sub-operation given its index
3886 def _update_suboperation_status(
3887 self, db_nslcmop, op_index, operationState, detailed_status
3888 ):
3889 # Update DB for HA tasks
3890 q_filter = {"_id": db_nslcmop["_id"]}
3891 update_dict = {
3892 "_admin.operations.{}.operationState".format(op_index): operationState,
3893 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3894 }
3895 self.db.set_one(
3896 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3897 )
3898
3899 # Add sub-operation, return the index of the added sub-operation
3900 # Optionally, set operationState, detailed-status, and operationType
3901 # Status and type are currently set for 'scale' sub-operations:
3902 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3903 # 'detailed-status' : status message
3904 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3905 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3906 def _add_suboperation(
3907 self,
3908 db_nslcmop,
3909 vnf_index,
3910 vdu_id,
3911 vdu_count_index,
3912 vdu_name,
3913 primitive,
3914 mapped_primitive_params,
3915 operationState=None,
3916 detailed_status=None,
3917 operationType=None,
3918 RO_nsr_id=None,
3919 RO_scaling_info=None,
3920 ):
3921 if not db_nslcmop:
3922 return self.SUBOPERATION_STATUS_NOT_FOUND
3923 # Get the "_admin.operations" list, if it exists
3924 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3925 op_list = db_nslcmop_admin.get("operations")
3926 # Create or append to the "_admin.operations" list
3927 new_op = {
3928 "member_vnf_index": vnf_index,
3929 "vdu_id": vdu_id,
3930 "vdu_count_index": vdu_count_index,
3931 "primitive": primitive,
3932 "primitive_params": mapped_primitive_params,
3933 }
3934 if operationState:
3935 new_op["operationState"] = operationState
3936 if detailed_status:
3937 new_op["detailed-status"] = detailed_status
3938 if operationType:
3939 new_op["lcmOperationType"] = operationType
3940 if RO_nsr_id:
3941 new_op["RO_nsr_id"] = RO_nsr_id
3942 if RO_scaling_info:
3943 new_op["RO_scaling_info"] = RO_scaling_info
3944 if not op_list:
3945 # No existing operations, create key 'operations' with current operation as first list element
3946 db_nslcmop_admin.update({"operations": [new_op]})
3947 op_list = db_nslcmop_admin.get("operations")
3948 else:
3949 # Existing operations, append operation to list
3950 op_list.append(new_op)
3951
3952 db_nslcmop_update = {"_admin.operations": op_list}
3953 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3954 op_index = len(op_list) - 1
3955 return op_index
3956
3957 # Helper methods for scale() sub-operations
3958
3959 # pre-scale/post-scale:
3960 # Check for 3 different cases:
3961 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3962 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3963 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3964 def _check_or_add_scale_suboperation(
3965 self,
3966 db_nslcmop,
3967 vnf_index,
3968 vnf_config_primitive,
3969 primitive_params,
3970 operationType,
3971 RO_nsr_id=None,
3972 RO_scaling_info=None,
3973 ):
3974 # Find this sub-operation
3975 if RO_nsr_id and RO_scaling_info:
3976 operationType = "SCALE-RO"
3977 match = {
3978 "member_vnf_index": vnf_index,
3979 "RO_nsr_id": RO_nsr_id,
3980 "RO_scaling_info": RO_scaling_info,
3981 }
3982 else:
3983 match = {
3984 "member_vnf_index": vnf_index,
3985 "primitive": vnf_config_primitive,
3986 "primitive_params": primitive_params,
3987 "lcmOperationType": operationType,
3988 }
3989 op_index = self._find_suboperation(db_nslcmop, match)
3990 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3991 # a. New sub-operation
3992 # The sub-operation does not exist, add it.
3993 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3994 # The following parameters are set to None for all kind of scaling:
3995 vdu_id = None
3996 vdu_count_index = None
3997 vdu_name = None
3998 if RO_nsr_id and RO_scaling_info:
3999 vnf_config_primitive = None
4000 primitive_params = None
4001 else:
4002 RO_nsr_id = None
4003 RO_scaling_info = None
4004 # Initial status for sub-operation
4005 operationState = "PROCESSING"
4006 detailed_status = "In progress"
4007 # Add sub-operation for pre/post-scaling (zero or more operations)
4008 self._add_suboperation(
4009 db_nslcmop,
4010 vnf_index,
4011 vdu_id,
4012 vdu_count_index,
4013 vdu_name,
4014 vnf_config_primitive,
4015 primitive_params,
4016 operationState,
4017 detailed_status,
4018 operationType,
4019 RO_nsr_id,
4020 RO_scaling_info,
4021 )
4022 return self.SUBOPERATION_STATUS_NEW
4023 else:
4024 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4025 # or op_index (operationState != 'COMPLETED')
4026 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
4027
4028 # Function to return execution_environment id
4029
4030 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
4031 # TODO vdu_index_count
4032 for vca in vca_deployed_list:
4033 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
4034 return vca["ee_id"]
4035
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
        :param logging_text: prefix for all log messages of this task
        :param db_nslcmop: database content of the current nslcmop
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once later
        :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA (juju controller) identifier, or None for the default one
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default kept for backward compatibility with records created before "type" existed
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            # delete this VCA's execution environment through the matching connector
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4141
4142 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4143 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4144 namespace = "." + db_nsr["_id"]
4145 try:
4146 await self.n2vc.delete_namespace(
4147 namespace=namespace,
4148 total_timeout=self.timeout_charm_delete,
4149 vca_id=vca_id,
4150 )
4151 except N2VCNotFound: # already deleted. Skip
4152 pass
4153 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4154
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO: deletes the ns from the VIM, then the
        nsd and the vnfds from RO, accumulating failures instead of stopping at
        the first one.
        :param logging_text: prefix for all log messages of this task
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id:
        :param nslcmop_id:
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None; raises LcmException when any deletion failed
        """
        db_nsr_update = {}
        failed_detail = []  # human-readable errors accumulated across deletions
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            # a pending delete action id means a previous terminate already requested deletion
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                # Phase 1: request RO to delete the ns from the VIM
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                # persist the action id so a retried terminate resumes the wait phase
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # Phase 2: wait until NS is deleted from VIM, polling RO
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        # still deleting: surface RO progress in the status text
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # deletion completed at the VIM
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # write progress to DB only when the status text changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                # RO no longer knows this ns: treat as already deleted
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Phase 3: delete nsd from RO (only when the ns deletion succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Phase 4: delete every member vnfd from RO (only when previous phases succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        # Final status: summarize the overall result in the VIM-status slot
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4354
    async def terminate(self, nsr_id, nslcmop_id):
        """Terminate an NS instance: run terminate primitives, destroy all VCA
        execution environments and KDUs, and delete the deployment from RO/VIM.

        Final status (success or accumulated errors) is always written to the
        nsr and nslcmop records in the finally block, and a "terminated" kafka
        message is emitted.
        :param nsr_id: id of the nsr record to terminate
        :param nslcmop_id: id of the nslcmop record driving this operation
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # maps each spawned asyncio task -> description for error reports
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            # the operation may override the default terminate timeout
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            # work on a copy so the deployment info is stable during this task
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                # nothing was deployed; final bookkeeping is done in the finally block
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                # fetch each distinct vnfd only once
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # pick the config descriptor matching this VCA's scope (ns/vdu/kdu/vnf)
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    # stop here; the finally block writes the failed status
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                            namespace=kdu.get("namespace"),
                        )
                    )
                else:
                    # unknown cluster type: log and keep terminating the rest
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                # NOTE(review): this 'exc' rebinds the outer variable of the same name
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if ns_state == "NOT_INSTANTIATED":
                try:
                    # propagate the final state to all related vnfr records
                    self.db.set_list(
                        "vnfrs",
                        {"nsr-id-ref": nsr_id},
                        {"_admin.nsState": "NOT_INSTANTIATED"},
                    )
                except DbException as e:
                    self.logger.warn(
                        logging_text
                        + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                            nsr_id, e
                        )
                    )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    # notify subscribers (e.g. NBI) of the termination result
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4683
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """Wait for a set of asyncio tasks, collecting errors and writing progress.

        :param logging_text: prefix for all log messages
        :param created_tasks_info: dict mapping each asyncio task to a human
            description used in progress/error texts
        :param timeout: global timeout in seconds for all the tasks together
        :param stage: 3-item status list; only index 1 ("<done>/<total>" plus
            errors) is updated here, and the whole list is written to the db
        :param nslcmop_id: operation record to update with the progress
        :param nsr_id: when provided, errors are also written to this nsr record
        :return: list of error detail strings (empty when all tasks succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            _timeout = timeout + time_start - time()  # remaining global time
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                # every still-pending task is reported as timed out
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        # known/expected failure type: single-line error message
                        self.logger.error(logging_text + new_error)
                    else:
                        # unexpected failure: log the full traceback
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4760
4761 @staticmethod
4762 def _map_primitive_params(primitive_desc, params, instantiation_params):
4763 """
4764 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4765 The default-value is used. If it is between < > it look for a value at instantiation_params
4766 :param primitive_desc: portion of VNFD/NSD that describes primitive
4767 :param params: Params provided by user
4768 :param instantiation_params: Instantiation params provided by user
4769 :return: a dictionary with the calculated params
4770 """
4771 calculated_params = {}
4772 for parameter in primitive_desc.get("parameter", ()):
4773 param_name = parameter["name"]
4774 if param_name in params:
4775 calculated_params[param_name] = params[param_name]
4776 elif "default-value" in parameter or "value" in parameter:
4777 if "value" in parameter:
4778 calculated_params[param_name] = parameter["value"]
4779 else:
4780 calculated_params[param_name] = parameter["default-value"]
4781 if (
4782 isinstance(calculated_params[param_name], str)
4783 and calculated_params[param_name].startswith("<")
4784 and calculated_params[param_name].endswith(">")
4785 ):
4786 if calculated_params[param_name][1:-1] in instantiation_params:
4787 calculated_params[param_name] = instantiation_params[
4788 calculated_params[param_name][1:-1]
4789 ]
4790 else:
4791 raise LcmException(
4792 "Parameter {} needed to execute primitive {} not provided".format(
4793 calculated_params[param_name], primitive_desc["name"]
4794 )
4795 )
4796 else:
4797 raise LcmException(
4798 "Parameter {} needed to execute primitive {} not provided".format(
4799 param_name, primitive_desc["name"]
4800 )
4801 )
4802
4803 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4804 calculated_params[param_name] = yaml.safe_dump(
4805 calculated_params[param_name], default_flow_style=True, width=256
4806 )
4807 elif isinstance(calculated_params[param_name], str) and calculated_params[
4808 param_name
4809 ].startswith("!!yaml "):
4810 calculated_params[param_name] = calculated_params[param_name][7:]
4811 if parameter.get("data-type") == "INTEGER":
4812 try:
4813 calculated_params[param_name] = int(calculated_params[param_name])
4814 except ValueError: # error converting string to int
4815 raise LcmException(
4816 "Parameter {} of primitive {} must be integer".format(
4817 param_name, primitive_desc["name"]
4818 )
4819 )
4820 elif parameter.get("data-type") == "BOOLEAN":
4821 calculated_params[param_name] = not (
4822 (str(calculated_params[param_name])).lower() == "false"
4823 )
4824
4825 # add always ns_config_info if primitive name is config
4826 if primitive_desc["name"] == "config":
4827 if "ns_config_info" in instantiation_params:
4828 calculated_params["ns_config_info"] = instantiation_params[
4829 "ns_config_info"
4830 ]
4831 return calculated_params
4832
4833 def _look_for_deployed_vca(
4834 self,
4835 deployed_vca,
4836 member_vnf_index,
4837 vdu_id,
4838 vdu_count_index,
4839 kdu_name=None,
4840 ee_descriptor_id=None,
4841 ):
4842 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4843 for vca in deployed_vca:
4844 if not vca:
4845 continue
4846 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4847 continue
4848 if (
4849 vdu_count_index is not None
4850 and vdu_count_index != vca["vdu_count_index"]
4851 ):
4852 continue
4853 if kdu_name and kdu_name != vca["kdu_name"]:
4854 continue
4855 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4856 continue
4857 break
4858 else:
4859 # vca_deployed not found
4860 raise LcmException(
4861 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4862 " is not deployed".format(
4863 member_vnf_index,
4864 vdu_id,
4865 vdu_count_index,
4866 kdu_name,
4867 ee_descriptor_id,
4868 )
4869 )
4870 # get ee_id
4871 ee_id = vca.get("ee_id")
4872 vca_type = vca.get(
4873 "type", "lxc_proxy_charm"
4874 ) # default value for backward compatibility - proxy charm
4875 if not ee_id:
4876 raise LcmException(
4877 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4878 "execution environment".format(
4879 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4880 )
4881 )
4882 return ee_id, vca_type
4883
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """Execute a primitive in the given execution environment, with retries.

        :param ee_id: execution environment id where the primitive runs
        :param primitive: primitive (action) name; for "config" the params are
            wrapped under a "params" key as the VCA connector expects
        :param primitive_params: dict of parameters for the primitive
        :param retries: number of extra attempts after a failed/timed-out one
        :param retries_interval: seconds to wait between attempts
        :param timeout: per-attempt timeout; defaults to self.timeout_primitive
        :param vca_type: key of self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: where the connector writes intermediate status
        :param vca_id: VCA (juju controller) id, or None for the default one
        :return: tuple (status, detail): ("COMPLETED", output) on success,
            ("FAILED", error-text) when retries are exhausted, or
            ("FAIL", error-text) on unexpected errors
        """
        try:
            if primitive == "config":
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # retries exhausted: report the last error
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4942
4943 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4944 """
4945 Updating the vca_status with latest juju information in nsrs record
4946 :param: nsr_id: Id of the nsr
4947 :param: nslcmop_id: Id of the nslcmop
4948 :return: None
4949 """
4950
4951 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4952 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4953 vca_id = self.get_vca_id({}, db_nsr)
4954 if db_nsr["_admin"]["deployed"]["K8s"]:
4955 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4956 cluster_uuid, kdu_instance, cluster_type = (
4957 k8s["k8scluster-uuid"],
4958 k8s["kdu-instance"],
4959 k8s["k8scluster-type"],
4960 )
4961 await self._on_update_k8s_db(
4962 cluster_uuid=cluster_uuid,
4963 kdu_instance=kdu_instance,
4964 filter={"_id": nsr_id},
4965 vca_id=vca_id,
4966 cluster_type=cluster_type,
4967 )
4968 else:
4969 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4970 table, filter = "nsrs", {"_id": nsr_id}
4971 path = "_admin.deployed.VCA.{}.".format(vca_index)
4972 await self._on_update_n2vc_db(table, filter, path, {})
4973
4974 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4975 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4976
    async def action(self, nsr_id, nslcmop_id):
        """Run a day-2 primitive against a NS, VNF, VDU or KDU target.

        Reads the operation parameters (member_vnf_index, vdu_id, kdu_name,
        primitive, primitive_params, ...) from the nslcmop record, locates the
        target execution environment (charm) or KDU and runs the requested
        primitive, persisting the result in the nslcmop/nsr records and
        notifying the outcome through kafka.

        :param nsr_id: NS instance record id
        :param nslcmop_id: NS LCM operation record id
        :return: (nslcmop_operation_state, detailed_status)
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nsr = None
        db_nslcmop = None
        db_nsr_update = {}
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        error_description_nslcmop = None
        exc = None
        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="RUNNING ACTION",
                current_operation_id=nslcmop_id,
            )

            step = "Getting information from database"
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            if db_nslcmop["operationParams"].get("primitive_params"):
                # primitive_params are stored JSON-encoded; decode in place
                db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                    db_nslcmop["operationParams"]["primitive_params"]
                )

            nsr_deployed = db_nsr["_admin"].get("deployed")
            vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            kdu_name = db_nslcmop["operationParams"].get("kdu_name")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            primitive = db_nslcmop["operationParams"]["primitive"]
            primitive_params = db_nslcmop["operationParams"]["primitive_params"]
            timeout_ns_action = db_nslcmop["operationParams"].get(
                "timeout_ns_action", self.timeout_primitive
            )

            if vnf_index:
                step = "Getting vnfr from database"
                db_vnfr = self.db.get_one(
                    "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
                )
                if db_vnfr.get("kdur"):
                    # kdur additionalParams are stored JSON-encoded; decode each
                    kdur_list = []
                    for kdur in db_vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    db_vnfr["kdur"] = kdur_list
                step = "Getting vnfd from database"
                db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

                # Sync filesystem before running a primitive
                self.fs.sync(db_vnfr["vnfd-id"])
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # NOTE(review): db_vnfr is only assigned in the vnf_index branch
            # above; for a pure NS-level action this call would raise
            # NameError — presumably vnf_index is always set whenever a VCA
            # id is needed; confirm against callers.
            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # for backward compatibility
            if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            # look for primitive
            config_primitive_desc = descriptor_configuration = None
            if vdu_id:
                descriptor_configuration = get_configuration(db_vnfd, vdu_id)
            elif kdu_name:
                descriptor_configuration = get_configuration(db_vnfd, kdu_name)
            elif vnf_index:
                descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
            else:
                descriptor_configuration = db_nsd.get("ns-configuration")

            if descriptor_configuration and descriptor_configuration.get(
                "config-primitive"
            ):
                for config_primitive in descriptor_configuration["config-primitive"]:
                    if config_primitive["name"] == primitive:
                        config_primitive_desc = config_primitive
                        break

            if not config_primitive_desc:
                # built-in KDU operations do not need a descriptor entry
                if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                    raise LcmException(
                        "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                            primitive
                        )
                    )
                primitive_name = primitive
                ee_descriptor_id = None
            else:
                primitive_name = config_primitive_desc.get(
                    "execution-environment-primitive", primitive
                )
                ee_descriptor_id = config_primitive_desc.get(
                    "execution-environment-ref"
                )

            if vnf_index:
                if vdu_id:
                    vdur = next(
                        (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                    )
                    desc_params = parse_yaml_strings(vdur.get("additionalParams"))
                elif kdu_name:
                    kdur = next(
                        (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                    )
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                else:
                    desc_params = parse_yaml_strings(
                        db_vnfr.get("additionalParamsForVnf")
                    )
            else:
                desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
            if kdu_name and get_configuration(db_vnfd, kdu_name):
                kdu_configuration = get_configuration(db_vnfd, kdu_name)
                actions = set()
                # NOTE(review): these loop variables shadow the requested
                # "primitive"; harmless today because only primitive_name is
                # used afterwards, but fragile if "primitive" is reused below.
                for primitive in kdu_configuration.get("initial-config-primitive", []):
                    actions.add(primitive["name"])
                for primitive in kdu_configuration.get("config-primitive", []):
                    actions.add(primitive["name"])
                kdu = find_in_list(
                    nsr_deployed["K8s"],
                    lambda kdu: kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index,
                )
                # run through the KDU path only for non-helm clusters (juju)
                kdu_action = (
                    True
                    if primitive_name in actions
                    and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
                    else False
                )

            # TODO check if ns is in a proper status
            if kdu_name and (
                primitive_name in ("upgrade", "rollback", "status") or kdu_action
            ):
                # kdur and desc_params already set from before
                if primitive_params:
                    desc_params.update(primitive_params)
                # TODO Check if we will need something at vnf level
                for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                    if (
                        kdu_name == kdu["kdu-name"]
                        and kdu["member-vnf-index"] == vnf_index
                    ):
                        break
                else:
                    raise LcmException(
                        "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                    )

                if kdu.get("k8scluster-type") not in self.k8scluster_map:
                    msg = "unknown k8scluster-type '{}'".format(
                        kdu.get("k8scluster-type")
                    )
                    raise LcmException(msg)

                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }
                self.logger.debug(
                    logging_text
                    + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
                )
                step = "Executing kdu {}".format(primitive_name)
                if primitive_name == "upgrade":
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.get("kdu_model")
                        del desc_params["kdu_model"]
                    else:
                        kdu_model = kdu.get("kdu-model")
                        # strip a trailing ":version" from the stored model
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            atomic=True,
                            kdu_model=kdu_model,
                            params=desc_params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                        ),
                        timeout=timeout_ns_action + 10,
                    )
                    self.logger.debug(
                        logging_text + " Upgrade of kdu {} done".format(detailed_status)
                    )
                elif primitive_name == "rollback":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            db_dict=db_dict,
                        ),
                        timeout=timeout_ns_action,
                    )
                elif primitive_name == "status":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )
                else:
                    # custom KDU primitive executed through the K8s connector
                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                        kdu["kdu-name"], nsr_id
                    )
                    params = self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    )

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            primitive_name=primitive_name,
                            params=params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )

                if detailed_status:
                    nslcmop_operation_state = "COMPLETED"
                else:
                    detailed_status = ""
                    nslcmop_operation_state = "FAILED"
            else:
                # charm (VCA) primitive path
                ee_id, vca_type = self._look_for_deployed_vca(
                    nsr_deployed["VCA"],
                    member_vnf_index=vnf_index,
                    vdu_id=vdu_id,
                    vdu_count_index=vdu_count_index,
                    ee_descriptor_id=ee_descriptor_id,
                )
                for vca_index, vca_deployed in enumerate(
                    db_nsr["_admin"]["deployed"]["VCA"]
                ):
                    if vca_deployed.get("member-vnf-index") == vnf_index:
                        db_dict = {
                            "collection": "nsrs",
                            "filter": {"_id": nsr_id},
                            "path": "_admin.deployed.VCA.{}.".format(vca_index),
                        }
                        break
                (
                    nslcmop_operation_state,
                    detailed_status,
                ) = await self._ns_execute_primitive(
                    ee_id,
                    primitive=primitive_name,
                    primitive_params=self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    ),
                    timeout=timeout_ns_action,
                    vca_type=vca_type,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

            db_nslcmop_update["detailed-status"] = detailed_status
            error_description_nslcmop = (
                detailed_status if nslcmop_operation_state == "FAILED" else ""
            )
            self.logger.debug(
                logging_text
                + " task Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
            return  # database update is called inside finally

        except (DbException, LcmException, N2VCException, K8sException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(step)
            )
            exc = "Operation was cancelled"
        except asyncio.TimeoutError:
            self.logger.error(logging_text + "Timeout while '{}'".format(step))
            exc = "Timeout"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
                exc_info=True,
            )
        finally:
            if exc:
                db_nslcmop_update[
                    "detailed-status"
                ] = (
                    detailed_status
                ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr[
                        "nsState"
                    ],  # TODO check if degraded. For the moment use previous status
                    current_operation="IDLE",
                    current_operation_id=None,
                    # error_description=error_description_nsr,
                    # error_detail=error_detail,
                    other_update=db_nsr_update,
                )

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "actioned",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
            return nslcmop_operation_state, detailed_status
5340
5341 async def terminate_vdus(
5342 self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
5343 ):
5344 """This method terminates VDUs
5345
5346 Args:
5347 db_vnfr: VNF instance record
5348 member_vnf_index: VNF index to identify the VDUs to be removed
5349 db_nsr: NS instance record
5350 update_db_nslcmops: Nslcmop update record
5351 """
5352 vca_scaling_info = []
5353 scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5354 scaling_info["scaling_direction"] = "IN"
5355 scaling_info["vdu-delete"] = {}
5356 scaling_info["kdu-delete"] = {}
5357 db_vdur = db_vnfr.get("vdur")
5358 vdur_list = copy(db_vdur)
5359 count_index = 0
5360 for index, vdu in enumerate(vdur_list):
5361 vca_scaling_info.append(
5362 {
5363 "osm_vdu_id": vdu["vdu-id-ref"],
5364 "member-vnf-index": member_vnf_index,
5365 "type": "delete",
5366 "vdu_index": count_index,
5367 })
5368 scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
5369 scaling_info["vdu"].append(
5370 {
5371 "name": vdu.get("name") or vdu.get("vdu-name"),
5372 "vdu_id": vdu["vdu-id-ref"],
5373 "interface": [],
5374 })
5375 for interface in vdu["interfaces"]:
5376 scaling_info["vdu"][index]["interface"].append(
5377 {
5378 "name": interface["name"],
5379 "ip_address": interface["ip-address"],
5380 "mac_address": interface.get("mac-address"),
5381 })
5382 self.logger.info("NS update scaling info{}".format(scaling_info))
5383 stage[2] = "Terminating VDUs"
5384 if scaling_info.get("vdu-delete"):
5385 # scale_process = "RO"
5386 if self.ro_config.get("ng"):
5387 await self._scale_ng_ro(
5388 logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
5389 )
5390
5391 async def remove_vnf(
5392 self, nsr_id, nslcmop_id, vnf_instance_id
5393 ):
5394 """This method is to Remove VNF instances from NS.
5395
5396 Args:
5397 nsr_id: NS instance id
5398 nslcmop_id: nslcmop id of update
5399 vnf_instance_id: id of the VNF instance to be removed
5400
5401 Returns:
5402 result: (str, str) COMPLETED/FAILED, details
5403 """
5404 try:
5405 db_nsr_update = {}
5406 logging_text = "Task ns={} update ".format(nsr_id)
5407 check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
5408 self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
5409 if check_vnfr_count > 1:
5410 stage = ["", "", ""]
5411 step = "Getting nslcmop from database"
5412 self.logger.debug(step + " after having waited for previous tasks to be completed")
5413 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5414 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5415 db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
5416 member_vnf_index = db_vnfr["member-vnf-index-ref"]
5417 """ db_vnfr = self.db.get_one(
5418 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5419
5420 update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5421 await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
5422
5423 constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
5424 constituent_vnfr.remove(db_vnfr.get("_id"))
5425 db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref")
5426 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5427 self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
5428 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5429 return "COMPLETED", "Done"
5430 else:
5431 step = "Terminate VNF Failed with"
5432 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5433 vnf_instance_id))
5434 except (LcmException, asyncio.CancelledError):
5435 raise
5436 except Exception as e:
5437 self.logger.debug("Error removing VNF {}".format(e))
5438 return "FAILED", "Error removing VNF {}".format(e)
5439
    async def _ns_redeploy_vnf(
        self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr,
    ):
        """This method updates and redeploys VNF instances

        Terminates the current VDUs of the VNF, rebuilds the vnfr
        (connection points and vdur) from the latest descriptor revision and
        instantiates the new resources through NG-RO.

        Args:
            nsr_id: NS instance id
            nslcmop_id: nslcmop id
            db_vnfd: VNF descriptor (latest revision)
            db_vnfr: VNF instance record
            db_nsr: NS instance record

        Returns:
            result: (str, str) COMPLETED/FAILED, details
        """
        try:
            count_index = 0
            stage = ["", "", ""]
            logging_text = "Task ns={} update ".format(nsr_id)
            latest_vnfd_revision = db_vnfd["_admin"].get("revision")
            member_vnf_index = db_vnfr["member-vnf-index-ref"]

            # Terminate old VNF resources
            update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)

            # old_vnfd_id = db_vnfr["vnfd-id"]
            # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            new_db_vnfd = db_vnfd
            # new_vnfd_ref = new_db_vnfd["id"]
            # new_vnfd_id = vnfd_id

            # Create VDUR
            # Rebuild the vnfr connection points from the descriptor ext-cpd
            new_vnfr_cp = []
            for cp in new_db_vnfd.get("ext-cpd", ()):
                vnf_cp = {
                    "name": cp.get("id"),
                    "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                    "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                    "id": cp.get("id"),
                }
                new_vnfr_cp.append(vnf_cp)
            # the new vdur list is pre-computed by the caller and carried in
            # the operation parameters
            new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
            # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
            # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
            new_vnfr_update = {"revision": latest_vnfd_revision, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
            self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
            updated_db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}
            )

            # Instantiate new VNF resources
            # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            vca_scaling_info = []
            scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
            scaling_info["scaling_direction"] = "OUT"
            scaling_info["vdu-create"] = {}
            scaling_info["kdu-create"] = {}
            vdud_instantiate_list = db_vnfd["vdu"]
            for index, vdud in enumerate(vdud_instantiate_list):
                cloud_init_text = self._get_vdu_cloud_init_content(
                    vdud, db_vnfd
                )
                if cloud_init_text:
                    additional_params = (
                        self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
                        or {}
                    )
                cloud_init_list = []
                if cloud_init_text:
                    # TODO Information of its own ip is not available because db_vnfr is not updated.
                    additional_params["OSM"] = get_osm_params(
                        updated_db_vnfr, vdud["id"], 1
                    )
                    cloud_init_list.append(
                        self._parse_cloud_init(
                            cloud_init_text,
                            additional_params,
                            db_vnfd["id"],
                            vdud["id"],
                        )
                    )
                vca_scaling_info.append(
                    {
                        "osm_vdu_id": vdud["id"],
                        "member-vnf-index": member_vnf_index,
                        "type": "create",
                        "vdu_index": count_index,
                    }
                )
                scaling_info["vdu-create"][vdud["id"]] = count_index
            if self.ro_config.get("ng"):
                self.logger.debug(
                    "New Resources to be deployed: {}".format(scaling_info))
                await self._scale_ng_ro(
                    logging_text, db_nsr, update_db_nslcmops, updated_db_vnfr, scaling_info, stage
                )
            return "COMPLETED", "Done"
        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            self.logger.debug("Error updating VNF {}".format(e))
            return "FAILED", "Error updating VNF {}".format(e)
5543
5544 async def _ns_charm_upgrade(
5545 self,
5546 ee_id,
5547 charm_id,
5548 charm_type,
5549 path,
5550 timeout: float = None,
5551 ) -> (str, str):
5552 """This method upgrade charms in VNF instances
5553
5554 Args:
5555 ee_id: Execution environment id
5556 path: Local path to the charm
5557 charm_id: charm-id
5558 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5559 timeout: (Float) Timeout for the ns update operation
5560
5561 Returns:
5562 result: (str, str) COMPLETED/FAILED, details
5563 """
5564 try:
5565 charm_type = charm_type or "lxc_proxy_charm"
5566 output = await self.vca_map[charm_type].upgrade_charm(
5567 ee_id=ee_id,
5568 path=path,
5569 charm_id=charm_id,
5570 charm_type=charm_type,
5571 timeout=timeout or self.timeout_ns_update,
5572 )
5573
5574 if output:
5575 return "COMPLETED", output
5576
5577 except (LcmException, asyncio.CancelledError):
5578 raise
5579
5580 except Exception as e:
5581
5582 self.logger.debug("Error upgrading charm {}".format(path))
5583
5584 return "FAILED", "Error upgrading charm {}: {}".format(path, e)
5585
    async def update(self, nsr_id, nslcmop_id):
        """Update NS according to different update types

        This method performs upgrade of VNF instances then updates the revision
        number in VNF record

        Supported updateType values: CHANGE_VNFPKG (charm upgrade or full VNF
        redeploy on descriptor revision change) and REMOVE_VNF.

        Args:
            nsr_id: Network service will be updated
            nslcmop_id: ns lcm operation id

        Returns:
            It may raise DbException, LcmException, N2VCException, K8sException

        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # Set the required variables to be filled up later
        db_nsr = None
        db_nslcmop_update = {}
        vnfr_update = {}
        nslcmop_operation_state = None
        db_nsr_update = {}
        error_description_nslcmop = ""
        exc = None
        change_type = "updated"
        detailed_status = ""

        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="UPDATING",
                current_operation_id=nslcmop_id,
            )

            step = "Getting nslcmop from database"
            db_nslcmop = self.db.get_one(
                "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
            )
            update_type = db_nslcmop["operationParams"]["updateType"]

            step = "Getting nsr from database"
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            old_operational_status = db_nsr["operational-status"]
            db_nsr_update["operational-status"] = "updating"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            nsr_deployed = db_nsr["_admin"].get("deployed")

            if update_type == "CHANGE_VNFPKG":

                # Get the input parameters given through update request
                vnf_instance_id = db_nslcmop["operationParams"][
                    "changeVnfPackageData"
                ].get("vnfInstanceId")

                vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
                    "vnfdId"
                )
                timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")

                step = "Getting vnfr from database"
                db_vnfr = self.db.get_one(
                    "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
                )

                step = "Getting vnfds from database"
                # Latest VNFD
                latest_vnfd = self.db.get_one(
                    "vnfds", {"_id": vnfd_id}, fail_on_empty=False
                )
                latest_vnfd_revision = latest_vnfd["_admin"].get("revision")

                # Current VNFD
                current_vnf_revision = db_vnfr.get("revision", 1)
                current_vnfd = self.db.get_one(
                    "vnfds_revisions",
                    {"_id": vnfd_id + ":" + str(current_vnf_revision)},
                    fail_on_empty=False,
                )
                # Charm artifact paths will be filled up later
                (
                    current_charm_artifact_path,
                    target_charm_artifact_path,
                    charm_artifact_paths,
                ) = ([], [], [])

                step = "Checking if revision has changed in VNFD"
                if current_vnf_revision != latest_vnfd_revision:

                    change_type = "policy_updated"

                    # There is new revision of VNFD, update operation is required
                    current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
                    latest_vnfd_path = vnfd_id + ":" + str(latest_vnfd_revision)

                    step = "Removing the VNFD packages if they exist in the local path"
                    shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
                    shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True)

                    step = "Get the VNFD packages from FSMongo"
                    self.fs.sync(from_path=latest_vnfd_path)
                    self.fs.sync(from_path=current_vnfd_path)

                    step = (
                        "Get the charm-type, charm-id, ee-id if there is deployed VCA"
                    )
                    base_folder = latest_vnfd["_admin"]["storage"]

                    for charm_index, charm_deployed in enumerate(
                        get_iterable(nsr_deployed, "VCA")
                    ):
                        vnf_index = db_vnfr.get("member-vnf-index-ref")

                        # Getting charm-id and charm-type
                        if charm_deployed.get("member-vnf-index") == vnf_index:
                            charm_id = self.get_vca_id(db_vnfr, db_nsr)
                            charm_type = charm_deployed.get("type")

                            # Getting ee-id
                            ee_id = charm_deployed.get("ee_id")

                            step = "Getting descriptor config"
                            descriptor_config = get_configuration(
                                current_vnfd, current_vnfd["id"]
                            )

                            if "execution-environment-list" in descriptor_config:
                                ee_list = descriptor_config.get(
                                    "execution-environment-list", []
                                )
                            else:
                                ee_list = []

                            # There could be several charm used in the same VNF
                            for ee_item in ee_list:
                                if ee_item.get("juju"):

                                    step = "Getting charm name"
                                    charm_name = ee_item["juju"].get("charm")

                                    step = "Setting Charm artifact paths"
                                    current_charm_artifact_path.append(
                                        get_charm_artifact_path(
                                            base_folder,
                                            charm_name,
                                            charm_type,
                                            current_vnf_revision,
                                        )
                                    )
                                    target_charm_artifact_path.append(
                                        get_charm_artifact_path(
                                            base_folder,
                                            charm_name,
                                            charm_type,
                                            latest_vnfd_revision,
                                        )
                                    )

                            charm_artifact_paths = zip(
                                current_charm_artifact_path, target_charm_artifact_path
                            )

                    step = "Checking if software version has changed in VNFD"
                    if find_software_version(current_vnfd) != find_software_version(
                        latest_vnfd
                    ):

                        step = "Checking if existing VNF has charm"
                        for current_charm_path, target_charm_path in list(
                            charm_artifact_paths
                        ):
                            if current_charm_path:
                                raise LcmException(
                                    "Software version change is not supported as VNF instance {} has charm.".format(
                                        vnf_instance_id
                                    )
                                )

                        # There is no change in the charm package, then redeploy the VNF
                        # based on new descriptor
                        step = "Redeploying VNF"
                        member_vnf_index = db_vnfr["member-vnf-index-ref"]
                        (
                            result,
                            detailed_status
                        ) = await self._ns_redeploy_vnf(
                            nsr_id,
                            nslcmop_id,
                            latest_vnfd,
                            db_vnfr,
                            db_nsr
                        )
                        if result == "FAILED":
                            nslcmop_operation_state = result
                            error_description_nslcmop = detailed_status
                        db_nslcmop_update["detailed-status"] = detailed_status
                        self.logger.debug(
                            logging_text
                            + " step {} Done with result {} {}".format(
                                step, nslcmop_operation_state, detailed_status
                            )
                        )

                    else:
                        step = "Checking if any charm package has changed or not"
                        for current_charm_path, target_charm_path in list(
                            charm_artifact_paths
                        ):
                            if (
                                current_charm_path
                                and target_charm_path
                                and self.check_charm_hash_changed(
                                    current_charm_path, target_charm_path
                                )
                            ):

                                step = "Checking whether VNF uses juju bundle"
                                if check_juju_bundle_existence(current_vnfd):

                                    raise LcmException(
                                        "Charm upgrade is not supported for the instance which"
                                        " uses juju-bundle: {}".format(
                                            check_juju_bundle_existence(current_vnfd)
                                        )
                                    )

                                step = "Upgrading Charm"
                                (
                                    result,
                                    detailed_status,
                                ) = await self._ns_charm_upgrade(
                                    ee_id=ee_id,
                                    charm_id=charm_id,
                                    charm_type=charm_type,
                                    path=self.fs.path + target_charm_path,
                                    timeout=timeout_seconds,
                                )

                                if result == "FAILED":
                                    nslcmop_operation_state = result
                                    error_description_nslcmop = detailed_status

                                db_nslcmop_update["detailed-status"] = detailed_status
                                self.logger.debug(
                                    logging_text
                                    + " step {} Done with result {} {}".format(
                                        step, nslcmop_operation_state, detailed_status
                                    )
                                )

                        step = "Updating policies"
                        member_vnf_index = db_vnfr["member-vnf-index-ref"]
                        result = "COMPLETED"
                        detailed_status = "Done"
                        db_nslcmop_update["detailed-status"] = "Done"

                    # If nslcmop_operation_state is None, so any operation is not failed.
                    if not nslcmop_operation_state:
                        nslcmop_operation_state = "COMPLETED"

                        # If update CHANGE_VNFPKG nslcmop_operation is successful
                        # vnf revision need to be updated
                        vnfr_update["revision"] = latest_vnfd_revision
                        self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)

                    self.logger.debug(
                        logging_text
                        + " task Done with result {} {}".format(
                            nslcmop_operation_state, detailed_status
                        )
                    )
            elif update_type == "REMOVE_VNF":
                # This part is included in https://osm.etsi.org/gerrit/11876
                vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"]
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
                member_vnf_index = db_vnfr["member-vnf-index-ref"]
                step = "Removing VNF"
                (result, detailed_status) = await self.remove_vnf(nsr_id, nslcmop_id, vnf_instance_id)
                if result == "FAILED":
                    nslcmop_operation_state = result
                    error_description_nslcmop = detailed_status
                db_nslcmop_update["detailed-status"] = detailed_status
                change_type = "vnf_terminated"
                if not nslcmop_operation_state:
                    nslcmop_operation_state = "COMPLETED"
                self.logger.debug(
                    logging_text
                    + " task Done with result {} {}".format(
                        nslcmop_operation_state, detailed_status
                    )
                )

            # If nslcmop_operation_state is None, so any operation is not failed.
            # All operations are executed in overall.
            if not nslcmop_operation_state:
                nslcmop_operation_state = "COMPLETED"
                db_nsr_update["operational-status"] = old_operational_status

        except (DbException, LcmException, N2VCException, K8sException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(step)
            )
            exc = "Operation was cancelled"
        except asyncio.TimeoutError:
            self.logger.error(logging_text + "Timeout while '{}'".format(step))
            exc = "Timeout"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
                exc_info=True,
            )
        finally:
            if exc:
                db_nslcmop_update[
                    "detailed-status"
                ] = (
                    detailed_status
                ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
                # NOTE(review): old_operational_status is only assigned after
                # the nsr read; a failure before that point would raise
                # NameError here — presumably reads rarely fail; confirm.
                db_nsr_update["operational-status"] = old_operational_status
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr["nsState"],
                    current_operation="IDLE",
                    current_operation_id=None,
                    other_update=db_nsr_update,
                )

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                try:
                    msg = {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    }
                    if change_type in ("vnf_terminated", "policy_updated"):
                        msg.update({"vnf_member_index": member_vnf_index})
                    await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update")
        return nslcmop_operation_state, detailed_status
5953
5954 async def scale(self, nsr_id, nslcmop_id):
5955 # Try to lock HA task here
5956 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5957 if not task_is_locked_by_me:
5958 return
5959
5960 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5961 stage = ["", "", ""]
5962 tasks_dict_info = {}
5963 # ^ stage, step, VIM progress
5964 self.logger.debug(logging_text + "Enter")
5965 # get all needed from database
5966 db_nsr = None
5967 db_nslcmop_update = {}
5968 db_nsr_update = {}
5969 exc = None
5970 # in case of error, indicates what part of scale was failed to put nsr at error status
5971 scale_process = None
5972 old_operational_status = ""
5973 old_config_status = ""
5974 nsi_id = None
5975 try:
5976 # wait for any previous tasks in process
5977 step = "Waiting for previous operations to terminate"
5978 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5979 self._write_ns_status(
5980 nsr_id=nsr_id,
5981 ns_state=None,
5982 current_operation="SCALING",
5983 current_operation_id=nslcmop_id,
5984 )
5985
5986 step = "Getting nslcmop from database"
5987 self.logger.debug(
5988 step + " after having waited for previous tasks to be completed"
5989 )
5990 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5991
5992 step = "Getting nsr from database"
5993 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5994 old_operational_status = db_nsr["operational-status"]
5995 old_config_status = db_nsr["config-status"]
5996
5997 step = "Parsing scaling parameters"
5998 db_nsr_update["operational-status"] = "scaling"
5999 self.update_db_2("nsrs", nsr_id, db_nsr_update)
6000 nsr_deployed = db_nsr["_admin"].get("deployed")
6001
6002 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
6003 "scaleByStepData"
6004 ]["member-vnf-index"]
6005 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
6006 "scaleByStepData"
6007 ]["scaling-group-descriptor"]
6008 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
6009 # for backward compatibility
6010 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
6011 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
6012 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
6013 self.update_db_2("nsrs", nsr_id, db_nsr_update)
6014
6015 step = "Getting vnfr from database"
6016 db_vnfr = self.db.get_one(
6017 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
6018 )
6019
6020 vca_id = self.get_vca_id(db_vnfr, db_nsr)
6021
6022 step = "Getting vnfd from database"
6023 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
6024
6025 base_folder = db_vnfd["_admin"]["storage"]
6026
6027 step = "Getting scaling-group-descriptor"
6028 scaling_descriptor = find_in_list(
6029 get_scaling_aspect(db_vnfd),
6030 lambda scale_desc: scale_desc["name"] == scaling_group,
6031 )
6032 if not scaling_descriptor:
6033 raise LcmException(
6034 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6035 "at vnfd:scaling-group-descriptor".format(scaling_group)
6036 )
6037
6038 step = "Sending scale order to VIM"
6039 # TODO check if ns is in a proper status
6040 nb_scale_op = 0
6041 if not db_nsr["_admin"].get("scaling-group"):
6042 self.update_db_2(
6043 "nsrs",
6044 nsr_id,
6045 {
6046 "_admin.scaling-group": [
6047 {"name": scaling_group, "nb-scale-op": 0}
6048 ]
6049 },
6050 )
6051 admin_scale_index = 0
6052 else:
6053 for admin_scale_index, admin_scale_info in enumerate(
6054 db_nsr["_admin"]["scaling-group"]
6055 ):
6056 if admin_scale_info["name"] == scaling_group:
6057 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
6058 break
6059 else: # not found, set index one plus last element and add new entry with the name
6060 admin_scale_index += 1
6061 db_nsr_update[
6062 "_admin.scaling-group.{}.name".format(admin_scale_index)
6063 ] = scaling_group
6064
6065 vca_scaling_info = []
6066 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
6067 if scaling_type == "SCALE_OUT":
6068 if "aspect-delta-details" not in scaling_descriptor:
6069 raise LcmException(
6070 "Aspect delta details not fount in scaling descriptor {}".format(
6071 scaling_descriptor["name"]
6072 )
6073 )
6074 # count if max-instance-count is reached
6075 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
6076
6077 scaling_info["scaling_direction"] = "OUT"
6078 scaling_info["vdu-create"] = {}
6079 scaling_info["kdu-create"] = {}
6080 for delta in deltas:
6081 for vdu_delta in delta.get("vdu-delta", {}):
6082 vdud = get_vdu(db_vnfd, vdu_delta["id"])
6083 # vdu_index also provides the number of instance of the targeted vdu
6084 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
6085 cloud_init_text = self._get_vdu_cloud_init_content(
6086 vdud, db_vnfd
6087 )
6088 if cloud_init_text:
6089 additional_params = (
6090 self._get_vdu_additional_params(db_vnfr, vdud["id"])
6091 or {}
6092 )
6093 cloud_init_list = []
6094
6095 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
6096 max_instance_count = 10
6097 if vdu_profile and "max-number-of-instances" in vdu_profile:
6098 max_instance_count = vdu_profile.get(
6099 "max-number-of-instances", 10
6100 )
6101
6102 default_instance_num = get_number_of_instances(
6103 db_vnfd, vdud["id"]
6104 )
6105 instances_number = vdu_delta.get("number-of-instances", 1)
6106 nb_scale_op += instances_number
6107
6108 new_instance_count = nb_scale_op + default_instance_num
6109 # Control if new count is over max and vdu count is less than max.
6110 # Then assign new instance count
6111 if new_instance_count > max_instance_count > vdu_count:
6112 instances_number = new_instance_count - max_instance_count
6113 else:
6114 instances_number = instances_number
6115
6116 if new_instance_count > max_instance_count:
6117 raise LcmException(
6118 "reached the limit of {} (max-instance-count) "
6119 "scaling-out operations for the "
6120 "scaling-group-descriptor '{}'".format(
6121 nb_scale_op, scaling_group
6122 )
6123 )
6124 for x in range(vdu_delta.get("number-of-instances", 1)):
6125 if cloud_init_text:
6126 # TODO Information of its own ip is not available because db_vnfr is not updated.
6127 additional_params["OSM"] = get_osm_params(
6128 db_vnfr, vdu_delta["id"], vdu_index + x
6129 )
6130 cloud_init_list.append(
6131 self._parse_cloud_init(
6132 cloud_init_text,
6133 additional_params,
6134 db_vnfd["id"],
6135 vdud["id"],
6136 )
6137 )
6138 vca_scaling_info.append(
6139 {
6140 "osm_vdu_id": vdu_delta["id"],
6141 "member-vnf-index": vnf_index,
6142 "type": "create",
6143 "vdu_index": vdu_index + x,
6144 }
6145 )
6146 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
6147 for kdu_delta in delta.get("kdu-resource-delta", {}):
6148 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
6149 kdu_name = kdu_profile["kdu-name"]
6150 resource_name = kdu_profile.get("resource-name", "")
6151
6152 # Might have different kdus in the same delta
6153 # Should have list for each kdu
6154 if not scaling_info["kdu-create"].get(kdu_name, None):
6155 scaling_info["kdu-create"][kdu_name] = []
6156
6157 kdur = get_kdur(db_vnfr, kdu_name)
6158 if kdur.get("helm-chart"):
6159 k8s_cluster_type = "helm-chart-v3"
6160 self.logger.debug("kdur: {}".format(kdur))
6161 if (
6162 kdur.get("helm-version")
6163 and kdur.get("helm-version") == "v2"
6164 ):
6165 k8s_cluster_type = "helm-chart"
6166 elif kdur.get("juju-bundle"):
6167 k8s_cluster_type = "juju-bundle"
6168 else:
6169 raise LcmException(
6170 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6171 "juju-bundle. Maybe an old NBI version is running".format(
6172 db_vnfr["member-vnf-index-ref"], kdu_name
6173 )
6174 )
6175
6176 max_instance_count = 10
6177 if kdu_profile and "max-number-of-instances" in kdu_profile:
6178 max_instance_count = kdu_profile.get(
6179 "max-number-of-instances", 10
6180 )
6181
6182 nb_scale_op += kdu_delta.get("number-of-instances", 1)
6183 deployed_kdu, _ = get_deployed_kdu(
6184 nsr_deployed, kdu_name, vnf_index
6185 )
6186 if deployed_kdu is None:
6187 raise LcmException(
6188 "KDU '{}' for vnf '{}' not deployed".format(
6189 kdu_name, vnf_index
6190 )
6191 )
6192 kdu_instance = deployed_kdu.get("kdu-instance")
6193 instance_num = await self.k8scluster_map[
6194 k8s_cluster_type
6195 ].get_scale_count(
6196 resource_name,
6197 kdu_instance,
6198 vca_id=vca_id,
6199 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
6200 kdu_model=deployed_kdu.get("kdu-model"),
6201 )
6202 kdu_replica_count = instance_num + kdu_delta.get(
6203 "number-of-instances", 1
6204 )
6205
6206 # Control if new count is over max and instance_num is less than max.
6207 # Then assign max instance number to kdu replica count
6208 if kdu_replica_count > max_instance_count > instance_num:
6209 kdu_replica_count = max_instance_count
6210 if kdu_replica_count > max_instance_count:
6211 raise LcmException(
6212 "reached the limit of {} (max-instance-count) "
6213 "scaling-out operations for the "
6214 "scaling-group-descriptor '{}'".format(
6215 instance_num, scaling_group
6216 )
6217 )
6218
6219 for x in range(kdu_delta.get("number-of-instances", 1)):
6220 vca_scaling_info.append(
6221 {
6222 "osm_kdu_id": kdu_name,
6223 "member-vnf-index": vnf_index,
6224 "type": "create",
6225 "kdu_index": instance_num + x - 1,
6226 }
6227 )
6228 scaling_info["kdu-create"][kdu_name].append(
6229 {
6230 "member-vnf-index": vnf_index,
6231 "type": "create",
6232 "k8s-cluster-type": k8s_cluster_type,
6233 "resource-name": resource_name,
6234 "scale": kdu_replica_count,
6235 }
6236 )
6237 elif scaling_type == "SCALE_IN":
6238 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
6239
6240 scaling_info["scaling_direction"] = "IN"
6241 scaling_info["vdu-delete"] = {}
6242 scaling_info["kdu-delete"] = {}
6243
6244 for delta in deltas:
6245 for vdu_delta in delta.get("vdu-delta", {}):
6246 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
6247 min_instance_count = 0
6248 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
6249 if vdu_profile and "min-number-of-instances" in vdu_profile:
6250 min_instance_count = vdu_profile["min-number-of-instances"]
6251
6252 default_instance_num = get_number_of_instances(
6253 db_vnfd, vdu_delta["id"]
6254 )
6255 instance_num = vdu_delta.get("number-of-instances", 1)
6256 nb_scale_op -= instance_num
6257
6258 new_instance_count = nb_scale_op + default_instance_num
6259
6260 if new_instance_count < min_instance_count < vdu_count:
6261 instances_number = min_instance_count - new_instance_count
6262 else:
6263 instances_number = instance_num
6264
6265 if new_instance_count < min_instance_count:
6266 raise LcmException(
6267 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6268 "scaling-group-descriptor '{}'".format(
6269 nb_scale_op, scaling_group
6270 )
6271 )
6272 for x in range(vdu_delta.get("number-of-instances", 1)):
6273 vca_scaling_info.append(
6274 {
6275 "osm_vdu_id": vdu_delta["id"],
6276 "member-vnf-index": vnf_index,
6277 "type": "delete",
6278 "vdu_index": vdu_index - 1 - x,
6279 }
6280 )
6281 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
6282 for kdu_delta in delta.get("kdu-resource-delta", {}):
6283 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
6284 kdu_name = kdu_profile["kdu-name"]
6285 resource_name = kdu_profile.get("resource-name", "")
6286
6287 if not scaling_info["kdu-delete"].get(kdu_name, None):
6288 scaling_info["kdu-delete"][kdu_name] = []
6289
6290 kdur = get_kdur(db_vnfr, kdu_name)
6291 if kdur.get("helm-chart"):
6292 k8s_cluster_type = "helm-chart-v3"
6293 self.logger.debug("kdur: {}".format(kdur))
6294 if (
6295 kdur.get("helm-version")
6296 and kdur.get("helm-version") == "v2"
6297 ):
6298 k8s_cluster_type = "helm-chart"
6299 elif kdur.get("juju-bundle"):
6300 k8s_cluster_type = "juju-bundle"
6301 else:
6302 raise LcmException(
6303 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6304 "juju-bundle. Maybe an old NBI version is running".format(
6305 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
6306 )
6307 )
6308
6309 min_instance_count = 0
6310 if kdu_profile and "min-number-of-instances" in kdu_profile:
6311 min_instance_count = kdu_profile["min-number-of-instances"]
6312
6313 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
6314 deployed_kdu, _ = get_deployed_kdu(
6315 nsr_deployed, kdu_name, vnf_index
6316 )
6317 if deployed_kdu is None:
6318 raise LcmException(
6319 "KDU '{}' for vnf '{}' not deployed".format(
6320 kdu_name, vnf_index
6321 )
6322 )
6323 kdu_instance = deployed_kdu.get("kdu-instance")
6324 instance_num = await self.k8scluster_map[
6325 k8s_cluster_type
6326 ].get_scale_count(
6327 resource_name,
6328 kdu_instance,
6329 vca_id=vca_id,
6330 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
6331 kdu_model=deployed_kdu.get("kdu-model"),
6332 )
6333 kdu_replica_count = instance_num - kdu_delta.get(
6334 "number-of-instances", 1
6335 )
6336
6337 if kdu_replica_count < min_instance_count < instance_num:
6338 kdu_replica_count = min_instance_count
6339 if kdu_replica_count < min_instance_count:
6340 raise LcmException(
6341 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6342 "scaling-group-descriptor '{}'".format(
6343 instance_num, scaling_group
6344 )
6345 )
6346
6347 for x in range(kdu_delta.get("number-of-instances", 1)):
6348 vca_scaling_info.append(
6349 {
6350 "osm_kdu_id": kdu_name,
6351 "member-vnf-index": vnf_index,
6352 "type": "delete",
6353 "kdu_index": instance_num - x - 1,
6354 }
6355 )
6356 scaling_info["kdu-delete"][kdu_name].append(
6357 {
6358 "member-vnf-index": vnf_index,
6359 "type": "delete",
6360 "k8s-cluster-type": k8s_cluster_type,
6361 "resource-name": resource_name,
6362 "scale": kdu_replica_count,
6363 }
6364 )
6365
6366 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6367 vdu_delete = copy(scaling_info.get("vdu-delete"))
6368 if scaling_info["scaling_direction"] == "IN":
6369 for vdur in reversed(db_vnfr["vdur"]):
6370 if vdu_delete.get(vdur["vdu-id-ref"]):
6371 vdu_delete[vdur["vdu-id-ref"]] -= 1
6372 scaling_info["vdu"].append(
6373 {
6374 "name": vdur.get("name") or vdur.get("vdu-name"),
6375 "vdu_id": vdur["vdu-id-ref"],
6376 "interface": [],
6377 }
6378 )
6379 for interface in vdur["interfaces"]:
6380 scaling_info["vdu"][-1]["interface"].append(
6381 {
6382 "name": interface["name"],
6383 "ip_address": interface["ip-address"],
6384 "mac_address": interface.get("mac-address"),
6385 }
6386 )
6387 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6388
6389 # PRE-SCALE BEGIN
6390 step = "Executing pre-scale vnf-config-primitive"
6391 if scaling_descriptor.get("scaling-config-action"):
6392 for scaling_config_action in scaling_descriptor[
6393 "scaling-config-action"
6394 ]:
6395 if (
6396 scaling_config_action.get("trigger") == "pre-scale-in"
6397 and scaling_type == "SCALE_IN"
6398 ) or (
6399 scaling_config_action.get("trigger") == "pre-scale-out"
6400 and scaling_type == "SCALE_OUT"
6401 ):
6402 vnf_config_primitive = scaling_config_action[
6403 "vnf-config-primitive-name-ref"
6404 ]
6405 step = db_nslcmop_update[
6406 "detailed-status"
6407 ] = "executing pre-scale scaling-config-action '{}'".format(
6408 vnf_config_primitive
6409 )
6410
6411 # look for primitive
6412 for config_primitive in (
6413 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6414 ).get("config-primitive", ()):
6415 if config_primitive["name"] == vnf_config_primitive:
6416 break
6417 else:
6418 raise LcmException(
6419 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6420 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6421 "primitive".format(scaling_group, vnf_config_primitive)
6422 )
6423
6424 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6425 if db_vnfr.get("additionalParamsForVnf"):
6426 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6427
6428 scale_process = "VCA"
6429 db_nsr_update["config-status"] = "configuring pre-scaling"
6430 primitive_params = self._map_primitive_params(
6431 config_primitive, {}, vnfr_params
6432 )
6433
6434 # Pre-scale retry check: Check if this sub-operation has been executed before
6435 op_index = self._check_or_add_scale_suboperation(
6436 db_nslcmop,
6437 vnf_index,
6438 vnf_config_primitive,
6439 primitive_params,
6440 "PRE-SCALE",
6441 )
6442 if op_index == self.SUBOPERATION_STATUS_SKIP:
6443 # Skip sub-operation
6444 result = "COMPLETED"
6445 result_detail = "Done"
6446 self.logger.debug(
6447 logging_text
6448 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6449 vnf_config_primitive, result, result_detail
6450 )
6451 )
6452 else:
6453 if op_index == self.SUBOPERATION_STATUS_NEW:
6454 # New sub-operation: Get index of this sub-operation
6455 op_index = (
6456 len(db_nslcmop.get("_admin", {}).get("operations"))
6457 - 1
6458 )
6459 self.logger.debug(
6460 logging_text
6461 + "vnf_config_primitive={} New sub-operation".format(
6462 vnf_config_primitive
6463 )
6464 )
6465 else:
6466 # retry: Get registered params for this existing sub-operation
6467 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6468 op_index
6469 ]
6470 vnf_index = op.get("member_vnf_index")
6471 vnf_config_primitive = op.get("primitive")
6472 primitive_params = op.get("primitive_params")
6473 self.logger.debug(
6474 logging_text
6475 + "vnf_config_primitive={} Sub-operation retry".format(
6476 vnf_config_primitive
6477 )
6478 )
6479 # Execute the primitive, either with new (first-time) or registered (reintent) args
6480 ee_descriptor_id = config_primitive.get(
6481 "execution-environment-ref"
6482 )
6483 primitive_name = config_primitive.get(
6484 "execution-environment-primitive", vnf_config_primitive
6485 )
6486 ee_id, vca_type = self._look_for_deployed_vca(
6487 nsr_deployed["VCA"],
6488 member_vnf_index=vnf_index,
6489 vdu_id=None,
6490 vdu_count_index=None,
6491 ee_descriptor_id=ee_descriptor_id,
6492 )
6493 result, result_detail = await self._ns_execute_primitive(
6494 ee_id,
6495 primitive_name,
6496 primitive_params,
6497 vca_type=vca_type,
6498 vca_id=vca_id,
6499 )
6500 self.logger.debug(
6501 logging_text
6502 + "vnf_config_primitive={} Done with result {} {}".format(
6503 vnf_config_primitive, result, result_detail
6504 )
6505 )
6506 # Update operationState = COMPLETED | FAILED
6507 self._update_suboperation_status(
6508 db_nslcmop, op_index, result, result_detail
6509 )
6510
6511 if result == "FAILED":
6512 raise LcmException(result_detail)
6513 db_nsr_update["config-status"] = old_config_status
6514 scale_process = None
6515 # PRE-SCALE END
6516
6517 db_nsr_update[
6518 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
6519 ] = nb_scale_op
6520 db_nsr_update[
6521 "_admin.scaling-group.{}.time".format(admin_scale_index)
6522 ] = time()
6523
6524 # SCALE-IN VCA - BEGIN
6525 if vca_scaling_info:
6526 step = db_nslcmop_update[
6527 "detailed-status"
6528 ] = "Deleting the execution environments"
6529 scale_process = "VCA"
6530 for vca_info in vca_scaling_info:
6531 if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
6532 member_vnf_index = str(vca_info["member-vnf-index"])
6533 self.logger.debug(
6534 logging_text + "vdu info: {}".format(vca_info)
6535 )
6536 if vca_info.get("osm_vdu_id"):
6537 vdu_id = vca_info["osm_vdu_id"]
6538 vdu_index = int(vca_info["vdu_index"])
6539 stage[
6540 1
6541 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6542 member_vnf_index, vdu_id, vdu_index
6543 )
6544 stage[2] = step = "Scaling in VCA"
6545 self._write_op_status(op_id=nslcmop_id, stage=stage)
6546 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
6547 config_update = db_nsr["configurationStatus"]
6548 for vca_index, vca in enumerate(vca_update):
6549 if (
6550 (vca or vca.get("ee_id"))
6551 and vca["member-vnf-index"] == member_vnf_index
6552 and vca["vdu_count_index"] == vdu_index
6553 ):
6554 if vca.get("vdu_id"):
6555 config_descriptor = get_configuration(
6556 db_vnfd, vca.get("vdu_id")
6557 )
6558 elif vca.get("kdu_name"):
6559 config_descriptor = get_configuration(
6560 db_vnfd, vca.get("kdu_name")
6561 )
6562 else:
6563 config_descriptor = get_configuration(
6564 db_vnfd, db_vnfd["id"]
6565 )
6566 operation_params = (
6567 db_nslcmop.get("operationParams") or {}
6568 )
6569 exec_terminate_primitives = not operation_params.get(
6570 "skip_terminate_primitives"
6571 ) and vca.get("needed_terminate")
6572 task = asyncio.ensure_future(
6573 asyncio.wait_for(
6574 self.destroy_N2VC(
6575 logging_text,
6576 db_nslcmop,
6577 vca,
6578 config_descriptor,
6579 vca_index,
6580 destroy_ee=True,
6581 exec_primitives=exec_terminate_primitives,
6582 scaling_in=True,
6583 vca_id=vca_id,
6584 ),
6585 timeout=self.timeout_charm_delete,
6586 )
6587 )
6588 tasks_dict_info[task] = "Terminating VCA {}".format(
6589 vca.get("ee_id")
6590 )
6591 del vca_update[vca_index]
6592 del config_update[vca_index]
6593 # wait for pending tasks of terminate primitives
6594 if tasks_dict_info:
6595 self.logger.debug(
6596 logging_text
6597 + "Waiting for tasks {}".format(
6598 list(tasks_dict_info.keys())
6599 )
6600 )
6601 error_list = await self._wait_for_tasks(
6602 logging_text,
6603 tasks_dict_info,
6604 min(
6605 self.timeout_charm_delete, self.timeout_ns_terminate
6606 ),
6607 stage,
6608 nslcmop_id,
6609 )
6610 tasks_dict_info.clear()
6611 if error_list:
6612 raise LcmException("; ".join(error_list))
6613
6614 db_vca_and_config_update = {
6615 "_admin.deployed.VCA": vca_update,
6616 "configurationStatus": config_update,
6617 }
6618 self.update_db_2(
6619 "nsrs", db_nsr["_id"], db_vca_and_config_update
6620 )
6621 scale_process = None
6622 # SCALE-IN VCA - END
6623
6624 # SCALE RO - BEGIN
6625 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
6626 scale_process = "RO"
6627 if self.ro_config.get("ng"):
6628 await self._scale_ng_ro(
6629 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
6630 )
6631 scaling_info.pop("vdu-create", None)
6632 scaling_info.pop("vdu-delete", None)
6633
6634 scale_process = None
6635 # SCALE RO - END
6636
6637 # SCALE KDU - BEGIN
6638 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
6639 scale_process = "KDU"
6640 await self._scale_kdu(
6641 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6642 )
6643 scaling_info.pop("kdu-create", None)
6644 scaling_info.pop("kdu-delete", None)
6645
6646 scale_process = None
6647 # SCALE KDU - END
6648
6649 if db_nsr_update:
6650 self.update_db_2("nsrs", nsr_id, db_nsr_update)
6651
6652 # SCALE-UP VCA - BEGIN
6653 if vca_scaling_info:
6654 step = db_nslcmop_update[
6655 "detailed-status"
6656 ] = "Creating new execution environments"
6657 scale_process = "VCA"
6658 for vca_info in vca_scaling_info:
6659 if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
6660 member_vnf_index = str(vca_info["member-vnf-index"])
6661 self.logger.debug(
6662 logging_text + "vdu info: {}".format(vca_info)
6663 )
6664 vnfd_id = db_vnfr["vnfd-ref"]
6665 if vca_info.get("osm_vdu_id"):
6666 vdu_index = int(vca_info["vdu_index"])
6667 deploy_params = {"OSM": get_osm_params(db_vnfr)}
6668 if db_vnfr.get("additionalParamsForVnf"):
6669 deploy_params.update(
6670 parse_yaml_strings(
6671 db_vnfr["additionalParamsForVnf"].copy()
6672 )
6673 )
6674 descriptor_config = get_configuration(
6675 db_vnfd, db_vnfd["id"]
6676 )
6677 if descriptor_config:
6678 vdu_id = None
6679 vdu_name = None
6680 kdu_name = None
6681 self._deploy_n2vc(
6682 logging_text=logging_text
6683 + "member_vnf_index={} ".format(member_vnf_index),
6684 db_nsr=db_nsr,
6685 db_vnfr=db_vnfr,
6686 nslcmop_id=nslcmop_id,
6687 nsr_id=nsr_id,
6688 nsi_id=nsi_id,
6689 vnfd_id=vnfd_id,
6690 vdu_id=vdu_id,
6691 kdu_name=kdu_name,
6692 member_vnf_index=member_vnf_index,
6693 vdu_index=vdu_index,
6694 vdu_name=vdu_name,
6695 deploy_params=deploy_params,
6696 descriptor_config=descriptor_config,
6697 base_folder=base_folder,
6698 task_instantiation_info=tasks_dict_info,
6699 stage=stage,
6700 )
6701 vdu_id = vca_info["osm_vdu_id"]
6702 vdur = find_in_list(
6703 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6704 )
6705 descriptor_config = get_configuration(db_vnfd, vdu_id)
6706 if vdur.get("additionalParams"):
6707 deploy_params_vdu = parse_yaml_strings(
6708 vdur["additionalParams"]
6709 )
6710 else:
6711 deploy_params_vdu = deploy_params
6712 deploy_params_vdu["OSM"] = get_osm_params(
6713 db_vnfr, vdu_id, vdu_count_index=vdu_index
6714 )
6715 if descriptor_config:
6716 vdu_name = None
6717 kdu_name = None
6718 stage[
6719 1
6720 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6721 member_vnf_index, vdu_id, vdu_index
6722 )
6723 stage[2] = step = "Scaling out VCA"
6724 self._write_op_status(op_id=nslcmop_id, stage=stage)
6725 self._deploy_n2vc(
6726 logging_text=logging_text
6727 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6728 member_vnf_index, vdu_id, vdu_index
6729 ),
6730 db_nsr=db_nsr,
6731 db_vnfr=db_vnfr,
6732 nslcmop_id=nslcmop_id,
6733 nsr_id=nsr_id,
6734 nsi_id=nsi_id,
6735 vnfd_id=vnfd_id,
6736 vdu_id=vdu_id,
6737 kdu_name=kdu_name,
6738 member_vnf_index=member_vnf_index,
6739 vdu_index=vdu_index,
6740 vdu_name=vdu_name,
6741 deploy_params=deploy_params_vdu,
6742 descriptor_config=descriptor_config,
6743 base_folder=base_folder,
6744 task_instantiation_info=tasks_dict_info,
6745 stage=stage,
6746 )
6747 # SCALE-UP VCA - END
6748 scale_process = None
6749
6750 # POST-SCALE BEGIN
6751 # execute primitive service POST-SCALING
6752 step = "Executing post-scale vnf-config-primitive"
6753 if scaling_descriptor.get("scaling-config-action"):
6754 for scaling_config_action in scaling_descriptor[
6755 "scaling-config-action"
6756 ]:
6757 if (
6758 scaling_config_action.get("trigger") == "post-scale-in"
6759 and scaling_type == "SCALE_IN"
6760 ) or (
6761 scaling_config_action.get("trigger") == "post-scale-out"
6762 and scaling_type == "SCALE_OUT"
6763 ):
6764 vnf_config_primitive = scaling_config_action[
6765 "vnf-config-primitive-name-ref"
6766 ]
6767 step = db_nslcmop_update[
6768 "detailed-status"
6769 ] = "executing post-scale scaling-config-action '{}'".format(
6770 vnf_config_primitive
6771 )
6772
6773 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6774 if db_vnfr.get("additionalParamsForVnf"):
6775 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6776
6777 # look for primitive
6778 for config_primitive in (
6779 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6780 ).get("config-primitive", ()):
6781 if config_primitive["name"] == vnf_config_primitive:
6782 break
6783 else:
6784 raise LcmException(
6785 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6786 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6787 "config-primitive".format(
6788 scaling_group, vnf_config_primitive
6789 )
6790 )
6791 scale_process = "VCA"
6792 db_nsr_update["config-status"] = "configuring post-scaling"
6793 primitive_params = self._map_primitive_params(
6794 config_primitive, {}, vnfr_params
6795 )
6796
6797 # Post-scale retry check: Check if this sub-operation has been executed before
6798 op_index = self._check_or_add_scale_suboperation(
6799 db_nslcmop,
6800 vnf_index,
6801 vnf_config_primitive,
6802 primitive_params,
6803 "POST-SCALE",
6804 )
6805 if op_index == self.SUBOPERATION_STATUS_SKIP:
6806 # Skip sub-operation
6807 result = "COMPLETED"
6808 result_detail = "Done"
6809 self.logger.debug(
6810 logging_text
6811 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6812 vnf_config_primitive, result, result_detail
6813 )
6814 )
6815 else:
6816 if op_index == self.SUBOPERATION_STATUS_NEW:
6817 # New sub-operation: Get index of this sub-operation
6818 op_index = (
6819 len(db_nslcmop.get("_admin", {}).get("operations"))
6820 - 1
6821 )
6822 self.logger.debug(
6823 logging_text
6824 + "vnf_config_primitive={} New sub-operation".format(
6825 vnf_config_primitive
6826 )
6827 )
6828 else:
6829 # retry: Get registered params for this existing sub-operation
6830 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6831 op_index
6832 ]
6833 vnf_index = op.get("member_vnf_index")
6834 vnf_config_primitive = op.get("primitive")
6835 primitive_params = op.get("primitive_params")
6836 self.logger.debug(
6837 logging_text
6838 + "vnf_config_primitive={} Sub-operation retry".format(
6839 vnf_config_primitive
6840 )
6841 )
6842 # Execute the primitive, either with new (first-time) or registered (reintent) args
6843 ee_descriptor_id = config_primitive.get(
6844 "execution-environment-ref"
6845 )
6846 primitive_name = config_primitive.get(
6847 "execution-environment-primitive", vnf_config_primitive
6848 )
6849 ee_id, vca_type = self._look_for_deployed_vca(
6850 nsr_deployed["VCA"],
6851 member_vnf_index=vnf_index,
6852 vdu_id=None,
6853 vdu_count_index=None,
6854 ee_descriptor_id=ee_descriptor_id,
6855 )
6856 result, result_detail = await self._ns_execute_primitive(
6857 ee_id,
6858 primitive_name,
6859 primitive_params,
6860 vca_type=vca_type,
6861 vca_id=vca_id,
6862 )
6863 self.logger.debug(
6864 logging_text
6865 + "vnf_config_primitive={} Done with result {} {}".format(
6866 vnf_config_primitive, result, result_detail
6867 )
6868 )
6869 # Update operationState = COMPLETED | FAILED
6870 self._update_suboperation_status(
6871 db_nslcmop, op_index, result, result_detail
6872 )
6873
6874 if result == "FAILED":
6875 raise LcmException(result_detail)
6876 db_nsr_update["config-status"] = old_config_status
6877 scale_process = None
6878 # POST-SCALE END
6879
6880 db_nsr_update[
6881 "detailed-status"
6882 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6883 db_nsr_update["operational-status"] = (
6884 "running"
6885 if old_operational_status == "failed"
6886 else old_operational_status
6887 )
6888 db_nsr_update["config-status"] = old_config_status
6889 return
6890 except (
6891 ROclient.ROClientException,
6892 DbException,
6893 LcmException,
6894 NgRoException,
6895 ) as e:
6896 self.logger.error(logging_text + "Exit Exception {}".format(e))
6897 exc = e
6898 except asyncio.CancelledError:
6899 self.logger.error(
6900 logging_text + "Cancelled Exception while '{}'".format(step)
6901 )
6902 exc = "Operation was cancelled"
6903 except Exception as e:
6904 exc = traceback.format_exc()
6905 self.logger.critical(
6906 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6907 exc_info=True,
6908 )
6909 finally:
6910 self._write_ns_status(
6911 nsr_id=nsr_id,
6912 ns_state=None,
6913 current_operation="IDLE",
6914 current_operation_id=None,
6915 )
6916 if tasks_dict_info:
6917 stage[1] = "Waiting for instantiate pending tasks."
6918 self.logger.debug(logging_text + stage[1])
6919 exc = await self._wait_for_tasks(
6920 logging_text,
6921 tasks_dict_info,
6922 self.timeout_ns_deploy,
6923 stage,
6924 nslcmop_id,
6925 nsr_id=nsr_id,
6926 )
6927 if exc:
6928 db_nslcmop_update[
6929 "detailed-status"
6930 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6931 nslcmop_operation_state = "FAILED"
6932 if db_nsr:
6933 db_nsr_update["operational-status"] = old_operational_status
6934 db_nsr_update["config-status"] = old_config_status
6935 db_nsr_update["detailed-status"] = ""
6936 if scale_process:
6937 if "VCA" in scale_process:
6938 db_nsr_update["config-status"] = "failed"
6939 if "RO" in scale_process:
6940 db_nsr_update["operational-status"] = "failed"
6941 db_nsr_update[
6942 "detailed-status"
6943 ] = "FAILED scaling nslcmop={} {}: {}".format(
6944 nslcmop_id, step, exc
6945 )
6946 else:
6947 error_description_nslcmop = None
6948 nslcmop_operation_state = "COMPLETED"
6949 db_nslcmop_update["detailed-status"] = "Done"
6950
6951 self._write_op_status(
6952 op_id=nslcmop_id,
6953 stage="",
6954 error_message=error_description_nslcmop,
6955 operation_state=nslcmop_operation_state,
6956 other_update=db_nslcmop_update,
6957 )
6958 if db_nsr:
6959 self._write_ns_status(
6960 nsr_id=nsr_id,
6961 ns_state=None,
6962 current_operation="IDLE",
6963 current_operation_id=None,
6964 other_update=db_nsr_update,
6965 )
6966
6967 if nslcmop_operation_state:
6968 try:
6969 msg = {
6970 "nsr_id": nsr_id,
6971 "nslcmop_id": nslcmop_id,
6972 "operationState": nslcmop_operation_state,
6973 }
6974 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6975 except Exception as e:
6976 self.logger.error(
6977 logging_text + "kafka_write notification Exception {}".format(e)
6978 )
6979 self.logger.debug(logging_text + "Exit")
6980 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6981
6982 async def _scale_kdu(
6983 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6984 ):
6985 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6986 for kdu_name in _scaling_info:
6987 for kdu_scaling_info in _scaling_info[kdu_name]:
6988 deployed_kdu, index = get_deployed_kdu(
6989 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6990 )
6991 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6992 kdu_instance = deployed_kdu["kdu-instance"]
6993 kdu_model = deployed_kdu.get("kdu-model")
6994 scale = int(kdu_scaling_info["scale"])
6995 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6996
6997 db_dict = {
6998 "collection": "nsrs",
6999 "filter": {"_id": nsr_id},
7000 "path": "_admin.deployed.K8s.{}".format(index),
7001 }
7002
7003 step = "scaling application {}".format(
7004 kdu_scaling_info["resource-name"]
7005 )
7006 self.logger.debug(logging_text + step)
7007
7008 if kdu_scaling_info["type"] == "delete":
7009 kdu_config = get_configuration(db_vnfd, kdu_name)
7010 if (
7011 kdu_config
7012 and kdu_config.get("terminate-config-primitive")
7013 and get_juju_ee_ref(db_vnfd, kdu_name) is None
7014 ):
7015 terminate_config_primitive_list = kdu_config.get(
7016 "terminate-config-primitive"
7017 )
7018 terminate_config_primitive_list.sort(
7019 key=lambda val: int(val["seq"])
7020 )
7021
7022 for (
7023 terminate_config_primitive
7024 ) in terminate_config_primitive_list:
7025 primitive_params_ = self._map_primitive_params(
7026 terminate_config_primitive, {}, {}
7027 )
7028 step = "execute terminate config primitive"
7029 self.logger.debug(logging_text + step)
7030 await asyncio.wait_for(
7031 self.k8scluster_map[k8s_cluster_type].exec_primitive(
7032 cluster_uuid=cluster_uuid,
7033 kdu_instance=kdu_instance,
7034 primitive_name=terminate_config_primitive["name"],
7035 params=primitive_params_,
7036 db_dict=db_dict,
7037 vca_id=vca_id,
7038 ),
7039 timeout=600,
7040 )
7041
7042 await asyncio.wait_for(
7043 self.k8scluster_map[k8s_cluster_type].scale(
7044 kdu_instance,
7045 scale,
7046 kdu_scaling_info["resource-name"],
7047 vca_id=vca_id,
7048 cluster_uuid=cluster_uuid,
7049 kdu_model=kdu_model,
7050 atomic=True,
7051 db_dict=db_dict,
7052 ),
7053 timeout=self.timeout_vca_on_error,
7054 )
7055
7056 if kdu_scaling_info["type"] == "create":
7057 kdu_config = get_configuration(db_vnfd, kdu_name)
7058 if (
7059 kdu_config
7060 and kdu_config.get("initial-config-primitive")
7061 and get_juju_ee_ref(db_vnfd, kdu_name) is None
7062 ):
7063 initial_config_primitive_list = kdu_config.get(
7064 "initial-config-primitive"
7065 )
7066 initial_config_primitive_list.sort(
7067 key=lambda val: int(val["seq"])
7068 )
7069
7070 for initial_config_primitive in initial_config_primitive_list:
7071 primitive_params_ = self._map_primitive_params(
7072 initial_config_primitive, {}, {}
7073 )
7074 step = "execute initial config primitive"
7075 self.logger.debug(logging_text + step)
7076 await asyncio.wait_for(
7077 self.k8scluster_map[k8s_cluster_type].exec_primitive(
7078 cluster_uuid=cluster_uuid,
7079 kdu_instance=kdu_instance,
7080 primitive_name=initial_config_primitive["name"],
7081 params=primitive_params_,
7082 db_dict=db_dict,
7083 vca_id=vca_id,
7084 ),
7085 timeout=600,
7086 )
7087
7088 async def _scale_ng_ro(
7089 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
7090 ):
7091 nsr_id = db_nslcmop["nsInstanceId"]
7092 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
7093 db_vnfrs = {}
7094
7095 # read from db: vnfd's for every vnf
7096 db_vnfds = []
7097
7098 # for each vnf in ns, read vnfd
7099 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
7100 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
7101 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
7102 # if we haven't this vnfd, read it from db
7103 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
7104 # read from db
7105 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
7106 db_vnfds.append(vnfd)
7107 n2vc_key = self.n2vc.get_public_key()
7108 n2vc_key_list = [n2vc_key]
7109 self.scale_vnfr(
7110 db_vnfr,
7111 vdu_scaling_info.get("vdu-create"),
7112 vdu_scaling_info.get("vdu-delete"),
7113 mark_delete=True,
7114 )
7115 # db_vnfr has been updated, update db_vnfrs to use it
7116 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
7117 await self._instantiate_ng_ro(
7118 logging_text,
7119 nsr_id,
7120 db_nsd,
7121 db_nsr,
7122 db_nslcmop,
7123 db_vnfrs,
7124 db_vnfds,
7125 n2vc_key_list,
7126 stage=stage,
7127 start_deploy=time(),
7128 timeout_ns_deploy=self.timeout_ns_deploy,
7129 )
7130 if vdu_scaling_info.get("vdu-delete"):
7131 self.scale_vnfr(
7132 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
7133 )
7134
7135 async def extract_prometheus_scrape_jobs(
7136 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
7137 ):
7138 # look if exist a file called 'prometheus*.j2' and
7139 artifact_content = self.fs.dir_ls(artifact_path)
7140 job_file = next(
7141 (
7142 f
7143 for f in artifact_content
7144 if f.startswith("prometheus") and f.endswith(".j2")
7145 ),
7146 None,
7147 )
7148 if not job_file:
7149 return
7150 with self.fs.file_open((artifact_path, job_file), "r") as f:
7151 job_data = f.read()
7152
7153 # TODO get_service
7154 _, _, service = ee_id.partition(".") # remove prefix "namespace."
7155 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
7156 host_port = "80"
7157 vnfr_id = vnfr_id.replace("-", "")
7158 variables = {
7159 "JOB_NAME": vnfr_id,
7160 "TARGET_IP": target_ip,
7161 "EXPORTER_POD_IP": host_name,
7162 "EXPORTER_POD_PORT": host_port,
7163 }
7164 job_list = parse_job(job_data, variables)
7165 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7166 for job in job_list:
7167 if (
7168 not isinstance(job.get("job_name"), str)
7169 or vnfr_id not in job["job_name"]
7170 ):
7171 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
7172 job["nsr_id"] = nsr_id
7173 job["vnfr_id"] = vnfr_id
7174 return job_list
7175
7176 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
7177 """
7178 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7179
7180 :param: vim_account_id: VIM Account ID
7181
7182 :return: (cloud_name, cloud_credential)
7183 """
7184 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
7185 return config.get("vca_cloud"), config.get("vca_cloud_credential")
7186
7187 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
7188 """
7189 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7190
7191 :param: vim_account_id: VIM Account ID
7192
7193 :return: (cloud_name, cloud_credential)
7194 """
7195 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
7196 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
7197
7198 async def migrate(self, nsr_id, nslcmop_id):
7199 """
7200 Migrate VNFs and VDUs instances in a NS
7201
7202 :param: nsr_id: NS Instance ID
7203 :param: nslcmop_id: nslcmop ID of migrate
7204
7205 """
7206 # Try to lock HA task here
7207 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
7208 if not task_is_locked_by_me:
7209 return
7210 logging_text = "Task ns={} migrate ".format(nsr_id)
7211 self.logger.debug(logging_text + "Enter")
7212 # get all needed from database
7213 db_nslcmop = None
7214 db_nslcmop_update = {}
7215 nslcmop_operation_state = None
7216 db_nsr_update = {}
7217 target = {}
7218 exc = None
7219 # in case of error, indicates what part of scale was failed to put nsr at error status
7220 start_deploy = time()
7221
7222 try:
7223 # wait for any previous tasks in process
7224 step = "Waiting for previous operations to terminate"
7225 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
7226
7227 self._write_ns_status(
7228 nsr_id=nsr_id,
7229 ns_state=None,
7230 current_operation="MIGRATING",
7231 current_operation_id=nslcmop_id
7232 )
7233 step = "Getting nslcmop from database"
7234 self.logger.debug(step + " after having waited for previous tasks to be completed")
7235 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
7236 migrate_params = db_nslcmop.get("operationParams")
7237
7238 target = {}
7239 target.update(migrate_params)
7240 desc = await self.RO.migrate(nsr_id, target)
7241 self.logger.debug("RO return > {}".format(desc))
7242 action_id = desc["action_id"]
7243 await self._wait_ng_ro(
7244 nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate
7245 )
7246 except (ROclient.ROClientException, DbException, LcmException) as e:
7247 self.logger.error("Exit Exception {}".format(e))
7248 exc = e
7249 except asyncio.CancelledError:
7250 self.logger.error("Cancelled Exception while '{}'".format(step))
7251 exc = "Operation was cancelled"
7252 except Exception as e:
7253 exc = traceback.format_exc()
7254 self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
7255 finally:
7256 self._write_ns_status(
7257 nsr_id=nsr_id,
7258 ns_state=None,
7259 current_operation="IDLE",
7260 current_operation_id=None,
7261 )
7262 if exc:
7263 db_nslcmop_update[
7264 "detailed-status"
7265 ] = "FAILED {}: {}".format(step, exc)
7266 nslcmop_operation_state = "FAILED"
7267 else:
7268 nslcmop_operation_state = "COMPLETED"
7269 db_nslcmop_update["detailed-status"] = "Done"
7270 db_nsr_update["detailed-status"] = "Done"
7271
7272 self._write_op_status(
7273 op_id=nslcmop_id,
7274 stage="",
7275 error_message="",
7276 operation_state=nslcmop_operation_state,
7277 other_update=db_nslcmop_update,
7278 )
7279 if nslcmop_operation_state:
7280 try:
7281 msg = {
7282 "nsr_id": nsr_id,
7283 "nslcmop_id": nslcmop_id,
7284 "operationState": nslcmop_operation_state,
7285 }
7286 await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
7287 except Exception as e:
7288 self.logger.error(
7289 logging_text + "kafka_write notification Exception {}".format(e)
7290 )
7291 self.logger.debug(logging_text + "Exit")
7292 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")