Fix 1716: get nsr vca id from vim account
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.data_utils.nsr import get_deployed_kdu
35 from osm_lcm.ng_ro import NgRoClient, NgRoException
36 from osm_lcm.lcm_utils import (
37 LcmException,
38 LcmExceptionNoMgmtIP,
39 LcmBase,
40 deep_get,
41 get_iterable,
42 populate_dict,
43 )
44 from osm_lcm.data_utils.nsd import get_vnf_profiles
45 from osm_lcm.data_utils.vnfd import (
46 get_vdu_list,
47 get_vdu_profile,
48 get_ee_sorted_initial_config_primitive_list,
49 get_ee_sorted_terminate_config_primitive_list,
50 get_kdu_list,
51 get_virtual_link_profiles,
52 get_vdu,
53 get_configuration,
54 get_vdu_index,
55 get_scaling_aspect,
56 get_number_of_instances,
57 get_juju_ee_ref,
58 get_kdu_profile,
59 )
60 from osm_lcm.data_utils.list_utils import find_in_list
61 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
62 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
63 from osm_lcm.data_utils.database.vim_account import VimAccountDB
64 from n2vc.k8s_helm_conn import K8sHelmConnector
65 from n2vc.k8s_helm3_conn import K8sHelm3Connector
66 from n2vc.k8s_juju_conn import K8sJujuConnector
67
68 from osm_common.dbbase import DbException
69 from osm_common.fsbase import FsException
70
71 from osm_lcm.data_utils.database.database import Database
72 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
73
74 from n2vc.n2vc_juju_conn import N2VCJujuConnector
75 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
76
77 from osm_lcm.lcm_helm_conn import LCMHelmConn
78
79 from copy import copy, deepcopy
80 from time import time
81 from uuid import uuid4
82
83 from random import randint
84
85 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
86
87
88 class NsLcm(LcmBase):
89 timeout_vca_on_error = (
90 5 * 60
91 ) # Time from when a charm first reaches blocked/error status until it is marked as failed
92 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying a NS
93 timeout_ns_terminate = 1800 # default global timeout for undeploying a NS
94 timeout_charm_delete = 10 * 60
95 timeout_primitive = 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive = (
97 10 * 60
98 ) # timeout for some progress in a primitive execution
99
100 SUBOPERATION_STATUS_NOT_FOUND = -1
101 SUBOPERATION_STATUS_NEW = -2
102 SUBOPERATION_STATUS_SKIP = -3
103 task_name_deploy_vca = "Deploying VCA"
104
105 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
106 """
107 Init: connect to database, filesystem storage, and messaging
108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
109 :return: None
110 """
111 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
112
113 self.db = Database().instance.db
114 self.fs = Filesystem().instance.fs
115 self.loop = loop
116 self.lcm_tasks = lcm_tasks
117 self.timeout = config["timeout"]
118 self.ro_config = config["ro_config"]
119 self.ng_ro = config["ro_config"].get("ng")
120 self.vca_config = config["VCA"].copy()
121
122 # create N2VC connector
123 self.n2vc = N2VCJujuConnector(
124 log=self.logger,
125 loop=self.loop,
126 on_update_db=self._on_update_n2vc_db,
127 fs=self.fs,
128 db=self.db,
129 )
130
131 self.conn_helm_ee = LCMHelmConn(
132 log=self.logger,
133 loop=self.loop,
134 vca_config=self.vca_config,
135 on_update_db=self._on_update_n2vc_db,
136 )
137
138 self.k8sclusterhelm2 = K8sHelmConnector(
139 kubectl_command=self.vca_config.get("kubectlpath"),
140 helm_command=self.vca_config.get("helmpath"),
141 log=self.logger,
142 on_update_db=None,
143 fs=self.fs,
144 db=self.db,
145 )
146
147 self.k8sclusterhelm3 = K8sHelm3Connector(
148 kubectl_command=self.vca_config.get("kubectlpath"),
149 helm_command=self.vca_config.get("helm3path"),
150 fs=self.fs,
151 log=self.logger,
152 db=self.db,
153 on_update_db=None,
154 )
155
156 self.k8sclusterjuju = K8sJujuConnector(
157 kubectl_command=self.vca_config.get("kubectlpath"),
158 juju_command=self.vca_config.get("jujupath"),
159 log=self.logger,
160 loop=self.loop,
161 on_update_db=self._on_update_k8s_db,
162 fs=self.fs,
163 db=self.db,
164 )
165
166 self.k8scluster_map = {
167 "helm-chart": self.k8sclusterhelm2,
168 "helm-chart-v3": self.k8sclusterhelm3,
169 "chart": self.k8sclusterhelm3,
170 "juju-bundle": self.k8sclusterjuju,
171 "juju": self.k8sclusterjuju,
172 }
173
174 self.vca_map = {
175 "lxc_proxy_charm": self.n2vc,
176 "native_charm": self.n2vc,
177 "k8s_proxy_charm": self.n2vc,
178 "helm": self.conn_helm_ee,
179 "helm-v3": self.conn_helm_ee,
180 }
181
182 self.prometheus = prometheus
183
184 # create RO client
185 self.RO = NgRoClient(self.loop, **self.ro_config)
186
187 @staticmethod
188 def increment_ip_mac(ip_mac, vm_index=1):
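"""Return ip_mac with its last numeric group increased by vm_index: the last octet for an IPv4 address (decimal) or the group after the last colon for IPv6/MAC (hex, keeping the original width). Non-string values are returned unchanged; unparseable values return None."""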
189 if not isinstance(ip_mac, str):
190 return ip_mac
191 try:
192 # try with ipv4: look for the last dot
193 i = ip_mac.rfind(".")
194 if i > 0:
195 i += 1
196 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
197 # try with ipv6 or mac: look for the last colon. Operate in hex
198 i = ip_mac.rfind(":")
199 if i > 0:
200 i += 1
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
203 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
204 )
205 except Exception:
206 pass
207 return None
208
209 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
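"""RO client callback: store the received RO descriptor as 'deploymentStatus' of the nsr record."""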
210
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
212
213 try:
214 # TODO filter RO descriptor fields...
215
216 # write to database
217 db_dict = dict()
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict["deploymentStatus"] = ro_descriptor
220 self.update_db_2("nsrs", nsrs_id, db_dict)
221
222 except Exception as e:
223 self.logger.warn(
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
225 )
226
227 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
228
229 # remove last dot from path (if exists)
230 if path.endswith("."):
231 path = path[:-1]
232
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
235 try:
236
237 nsr_id = filter.get("_id")
238
239 # read ns record from database
240 nsr = self.db.get_one(table="nsrs", q_filter=filter)
241 current_ns_status = nsr.get("nsState")
242
243 # get vca status for NS
244 status_dict = await self.n2vc.get_status(
245 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
246 )
247
248 # vcaStatus
249 db_dict = dict()
250 db_dict["vcaStatus"] = status_dict
251 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
252
253 # update configurationStatus for this VCA
254 try:
255 vca_index = int(path[path.rfind(".") + 1 :])
256
257 vca_list = deep_get(
258 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
259 )
260 vca_status = vca_list[vca_index].get("status")
261
262 configuration_status_list = nsr.get("configurationStatus")
263 config_status = configuration_status_list[vca_index].get("status")
264
265 if config_status == "BROKEN" and vca_status != "failed":
266 db_dict["configurationStatus"][vca_index] = "READY"
267 elif config_status != "BROKEN" and vca_status == "failed":
268 db_dict["configurationStatus"][vca_index] = "BROKEN"
269 except Exception as e:
270 # do not update configurationStatus
271 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
272
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
275 is_degraded = False
276 if current_ns_status in ("READY", "DEGRADED"):
277 error_description = ""
278 # check machines
279 if status_dict.get("machines"):
280 for machine_id in status_dict.get("machines"):
281 machine = status_dict.get("machines").get(machine_id)
282 # check machine agent-status
283 if machine.get("agent-status"):
284 s = machine.get("agent-status").get("status")
285 if s != "started":
286 is_degraded = True
287 error_description += (
288 "machine {} agent-status={} ; ".format(
289 machine_id, s
290 )
291 )
292 # check machine instance status
293 if machine.get("instance-status"):
294 s = machine.get("instance-status").get("status")
295 if s != "running":
296 is_degraded = True
297 error_description += (
298 "machine {} instance-status={} ; ".format(
299 machine_id, s
300 )
301 )
302 # check applications
303 if status_dict.get("applications"):
304 for app_id in status_dict.get("applications"):
305 app = status_dict.get("applications").get(app_id)
306 # check application status
307 if app.get("status"):
308 s = app.get("status").get("status")
309 if s != "active":
310 is_degraded = True
311 error_description += (
312 "application {} status={} ; ".format(app_id, s)
313 )
314
315 if error_description:
316 db_dict["errorDescription"] = error_description
317 if current_ns_status == "READY" and is_degraded:
318 db_dict["nsState"] = "DEGRADED"
319 if current_ns_status == "DEGRADED" and not is_degraded:
320 db_dict["nsState"] = "READY"
321
322 # write to database
323 self.update_db_2("nsrs", nsr_id, db_dict)
324
325 except (asyncio.CancelledError, asyncio.TimeoutError):
326 raise
327 except Exception as e:
328 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
329
330 async def _on_update_k8s_db(
331 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
332 ):
333 """
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
338 :return: none
339 """
340
341 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
342 # .format(cluster_uuid, kdu_instance, filter))
343
344 try:
345 nsr_id = filter.get("_id")
346
347 # get vca status for NS
348 vca_status = await self.k8sclusterjuju.status_kdu(
349 cluster_uuid,
350 kdu_instance,
351 complete_status=True,
352 yaml_format=False,
353 vca_id=vca_id,
354 )
355 # vcaStatus
356 db_dict = dict()
357 db_dict["vcaStatus"] = {nsr_id: vca_status}
358
359 await self.k8sclusterjuju.update_vca_status(
360 db_dict["vcaStatus"],
361 kdu_instance,
362 vca_id=vca_id,
363 )
364
365 # write to database
366 self.update_db_2("nsrs", nsr_id, db_dict)
367
368 except (asyncio.CancelledError, asyncio.TimeoutError):
369 raise
370 except Exception as e:
371 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
372
373 @staticmethod
374 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
375 try:
376 env = Environment(undefined=StrictUndefined)
377 template = env.from_string(cloud_init_text)
378 return template.render(additional_params or {})
379 except UndefinedError as e:
380 raise LcmException(
381 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
382 "file, must be provided in the instantiation parameters inside the "
383 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
384 )
385 except (TemplateError, TemplateNotFound) as e:
386 raise LcmException(
387 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
388 vnfd_id, vdu_id, e
389 )
390 )
391
392 def _get_vdu_cloud_init_content(self, vdu, vnfd):
393 cloud_init_content = cloud_init_file = None
394 try:
395 if vdu.get("cloud-init-file"):
396 base_folder = vnfd["_admin"]["storage"]
397 cloud_init_file = "{}/{}/cloud_init/{}".format(
398 base_folder["folder"],
399 base_folder["pkg-dir"],
400 vdu["cloud-init-file"],
401 )
402 with self.fs.file_open(cloud_init_file, "r") as ci_file:
403 cloud_init_content = ci_file.read()
404 elif vdu.get("cloud-init"):
405 cloud_init_content = vdu["cloud-init"]
406
407 return cloud_init_content
408 except FsException as e:
409 raise LcmException(
410 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
411 vnfd["id"], vdu["id"], cloud_init_file, e
412 )
413 )
414
415 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
416 vdur = next(
417 vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
418 )
419 additional_params = vdur.get("additionalParams")
420 return parse_yaml_strings(additional_params)
421
422 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
423 """
424 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
425 :param vnfd: input vnfd
426 :param new_id: overrides vnf id if provided
427 :param additionalParams: Instantiation params for VNFs provided
428 :param nsrId: Id of the NSR
429 :return: copy of vnfd
430 """
431 vnfd_RO = deepcopy(vnfd)
432 # remove unused by RO configuration, monitoring, scaling and internal keys
433 vnfd_RO.pop("_id", None)
434 vnfd_RO.pop("_admin", None)
435 vnfd_RO.pop("monitoring-param", None)
436 vnfd_RO.pop("scaling-group-descriptor", None)
437 vnfd_RO.pop("kdu", None)
438 vnfd_RO.pop("k8s-cluster", None)
439 if new_id:
440 vnfd_RO["id"] = new_id
441
442 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
443 for vdu in get_iterable(vnfd_RO, "vdu"):
444 vdu.pop("cloud-init-file", None)
445 vdu.pop("cloud-init", None)
446 return vnfd_RO
447
448 @staticmethod
449 def ip_profile_2_RO(ip_profile):
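"""Translate an OSM IM ip-profile into the RO format: dns-server becomes dns-address, ip-version values are capitalized (IPv4/IPv6) and dhcp-params becomes dhcp."""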
450 RO_ip_profile = deepcopy(ip_profile)
451 if "dns-server" in RO_ip_profile:
452 if isinstance(RO_ip_profile["dns-server"], list):
453 RO_ip_profile["dns-address"] = []
454 for ds in RO_ip_profile.pop("dns-server"):
455 RO_ip_profile["dns-address"].append(ds["address"])
456 else:
457 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
458 if RO_ip_profile.get("ip-version") == "ipv4":
459 RO_ip_profile["ip-version"] = "IPv4"
460 if RO_ip_profile.get("ip-version") == "ipv6":
461 RO_ip_profile["ip-version"] = "IPv6"
462 if "dhcp-params" in RO_ip_profile:
463 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
464 return RO_ip_profile
465
466 def _get_ro_vim_id_for_vim_account(self, vim_account):
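"""Return the RO identifier of a vim account ('_admin.deployed.RO'), raising LcmException if the account is not ENABLED."""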
467 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
468 if db_vim["_admin"]["operationalState"] != "ENABLED":
469 raise LcmException(
470 "VIM={} is not available. operationalState={}".format(
471 vim_account, db_vim["_admin"]["operationalState"]
472 )
473 )
474 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
475 return RO_vim_id
476
477 def get_ro_wim_id_for_wim_account(self, wim_account):
478 if isinstance(wim_account, str):
479 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
480 if db_wim["_admin"]["operationalState"] != "ENABLED":
481 raise LcmException(
482 "WIM={} is not available. operationalState={}".format(
483 wim_account, db_wim["_admin"]["operationalState"]
484 )
485 )
486 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
487 return RO_wim_id
488 else:
489 return wim_account
490
491 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
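"""Add (vdu_create) or remove (vdu_delete) vdur entries of a vnfr in the database. With mark_delete the affected vdur are only flagged as DELETING instead of being removed. The passed db_vnfr dictionary is refreshed with the resulting vdur list."""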
492
493 db_vdu_push_list = []
494 db_update = {"_admin.modified": time()}
495 if vdu_create:
496 for vdu_id, vdu_count in vdu_create.items():
497 vdur = next(
498 (
499 vdur
500 for vdur in reversed(db_vnfr["vdur"])
501 if vdur["vdu-id-ref"] == vdu_id
502 ),
503 None,
504 )
505 if not vdur:
506 raise LcmException(
507 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
508 vdu_id
509 )
510 )
511
512 for count in range(vdu_count):
513 vdur_copy = deepcopy(vdur)
514 vdur_copy["status"] = "BUILD"
515 vdur_copy["status-detailed"] = None
516 vdur_copy["ip-address"] = None
517 vdur_copy["_id"] = str(uuid4())
518 vdur_copy["count-index"] += count + 1
519 vdur_copy["id"] = "{}-{}".format(
520 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
521 )
522 vdur_copy.pop("vim_info", None)
523 for iface in vdur_copy["interfaces"]:
524 if iface.get("fixed-ip"):
525 iface["ip-address"] = self.increment_ip_mac(
526 iface["ip-address"], count + 1
527 )
528 else:
529 iface.pop("ip-address", None)
530 if iface.get("fixed-mac"):
531 iface["mac-address"] = self.increment_ip_mac(
532 iface["mac-address"], count + 1
533 )
534 else:
535 iface.pop("mac-address", None)
536 iface.pop(
537 "mgmt_vnf", None
538 ) # only the first vdu can be the management of the vnf
539 db_vdu_push_list.append(vdur_copy)
540 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
541 if vdu_delete:
542 for vdu_id, vdu_count in vdu_delete.items():
543 if mark_delete:
544 indexes_to_delete = [
545 iv[0]
546 for iv in enumerate(db_vnfr["vdur"])
547 if iv[1]["vdu-id-ref"] == vdu_id
548 ]
549 db_update.update(
550 {
551 "vdur.{}.status".format(i): "DELETING"
552 for i in indexes_to_delete[-vdu_count:]
553 }
554 )
555 else:
556 # it must be deleted one by one because common.db does not allow otherwise
557 vdus_to_delete = [
558 v
559 for v in reversed(db_vnfr["vdur"])
560 if v["vdu-id-ref"] == vdu_id
561 ]
562 for vdu in vdus_to_delete[:vdu_count]:
563 self.db.set_one(
564 "vnfrs",
565 {"_id": db_vnfr["_id"]},
566 None,
567 pull={"vdur": {"_id": vdu["_id"]}},
568 )
569 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
570 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
571 # modify passed dictionary db_vnfr
572 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
573 db_vnfr["vdur"] = db_vnfr_["vdur"]
574
575 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
576 """
577 Updates database nsr with the RO info for the created vld
578 :param ns_update_nsr: dictionary to be filled with the updated info
579 :param db_nsr: content of db_nsr. This is also modified
580 :param nsr_desc_RO: nsr descriptor from RO
581 :return: Nothing, LcmException is raised on errors
582 """
583
584 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
585 for net_RO in get_iterable(nsr_desc_RO, "nets"):
586 if vld["id"] != net_RO.get("ns_net_osm_id"):
587 continue
588 vld["vim-id"] = net_RO.get("vim_net_id")
589 vld["name"] = net_RO.get("vim_name")
590 vld["status"] = net_RO.get("status")
591 vld["status-detailed"] = net_RO.get("error_msg")
592 ns_update_nsr["vld.{}".format(vld_index)] = vld
593 break
594 else:
595 raise LcmException(
596 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
597 )
598
599 def set_vnfr_at_error(self, db_vnfrs, error_text):
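"""Mark every given vnfr and each of its vdur entries with status ERROR in the database; error_text, when provided, is recorded as the vdur status detail."""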
600 try:
601 for db_vnfr in db_vnfrs.values():
602 vnfr_update = {"status": "ERROR"}
603 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
604 if "status" not in vdur:
605 vdur["status"] = "ERROR"
606 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
607 if error_text:
608 vdur["status-detailed"] = str(error_text)
609 vnfr_update[
610 "vdur.{}.status-detailed".format(vdu_index)
611 ] = "ERROR"
612 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
613 except DbException as e:
614 self.logger.error("Cannot update vnf. {}".format(e))
615
616 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
617 """
618 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
619 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
620 :param nsr_desc_RO: nsr descriptor from RO
621 :return: Nothing, LcmException is raised on errors
622 """
623 for vnf_index, db_vnfr in db_vnfrs.items():
624 for vnf_RO in nsr_desc_RO["vnfs"]:
625 if vnf_RO["member_vnf_index"] != vnf_index:
626 continue
627 vnfr_update = {}
628 if vnf_RO.get("ip_address"):
629 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
630 "ip_address"
631 ].split(";")[0]
632 elif not db_vnfr.get("ip-address"):
633 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
634 raise LcmExceptionNoMgmtIP(
635 "ns member_vnf_index '{}' has no IP address".format(
636 vnf_index
637 )
638 )
639
640 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
641 vdur_RO_count_index = 0
642 if vdur.get("pdu-type"):
643 continue
644 for vdur_RO in get_iterable(vnf_RO, "vms"):
645 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
646 continue
647 if vdur["count-index"] != vdur_RO_count_index:
648 vdur_RO_count_index += 1
649 continue
650 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
651 if vdur_RO.get("ip_address"):
652 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
653 else:
654 vdur["ip-address"] = None
655 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
656 vdur["name"] = vdur_RO.get("vim_name")
657 vdur["status"] = vdur_RO.get("status")
658 vdur["status-detailed"] = vdur_RO.get("error_msg")
659 for ifacer in get_iterable(vdur, "interfaces"):
660 for interface_RO in get_iterable(vdur_RO, "interfaces"):
661 if ifacer["name"] == interface_RO.get("internal_name"):
662 ifacer["ip-address"] = interface_RO.get(
663 "ip_address"
664 )
665 ifacer["mac-address"] = interface_RO.get(
666 "mac_address"
667 )
668 break
669 else:
670 raise LcmException(
671 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
672 "from VIM info".format(
673 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
674 )
675 )
676 vnfr_update["vdur.{}".format(vdu_index)] = vdur
677 break
678 else:
679 raise LcmException(
680 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
681 "VIM info".format(
682 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
683 )
684 )
685
686 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
687 for net_RO in get_iterable(nsr_desc_RO, "nets"):
688 if vld["id"] != net_RO.get("vnf_net_osm_id"):
689 continue
690 vld["vim-id"] = net_RO.get("vim_net_id")
691 vld["name"] = net_RO.get("vim_name")
692 vld["status"] = net_RO.get("status")
693 vld["status-detailed"] = net_RO.get("error_msg")
694 vnfr_update["vld.{}".format(vld_index)] = vld
695 break
696 else:
697 raise LcmException(
698 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
699 vnf_index, vld["id"]
700 )
701 )
702
703 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
704 break
705
706 else:
707 raise LcmException(
708 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
709 vnf_index
710 )
711 )
712
713 def _get_ns_config_info(self, nsr_id):
714 """
715 Generates a mapping between vnf,vdu elements and the N2VC id
716 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
717 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
718 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
719 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
720 """
721 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
722 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
723 mapping = {}
724 ns_config_info = {"osm-config-mapping": mapping}
725 for vca in vca_deployed_list:
726 if not vca["member-vnf-index"]:
727 continue
728 if not vca["vdu_id"]:
729 mapping[vca["member-vnf-index"]] = vca["application"]
730 else:
731 mapping[
732 "{}.{}.{}".format(
733 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
734 )
735 ] = vca["application"]
736 return ns_config_info
737
738 async def _instantiate_ng_ro(
739 self,
740 logging_text,
741 nsr_id,
742 nsd,
743 db_nsr,
744 db_nslcmop,
745 db_vnfrs,
746 db_vnfds,
747 n2vc_key_list,
748 stage,
749 start_deploy,
750 timeout_ns_deploy,
751 ):
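"""Build the NG-RO 'target' dictionary (ns vlds, vnfs/vdus, images, flavors and cloud-init content) from the database records and the instantiation parameters, request the deployment to NG-RO and wait for it to complete, updating the nsr and operation records."""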
752
753 db_vims = {}
754
755 def get_vim_account(vim_account_id):
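"""Return the vim_accounts record for vim_account_id, cached in db_vims to avoid repeated database reads."""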
756 nonlocal db_vims
757 if vim_account_id in db_vims:
758 return db_vims[vim_account_id]
759 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
760 db_vims[vim_account_id] = db_vim
761 return db_vim
762
763 # modify target_vld info with instantiation parameters
764 def parse_vld_instantiation_params(
765 target_vim, target_vld, vld_params, target_sdn
766 ):
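"""Merge ip-profile, provider-network, vim-network-name/id, wimAccountId and common_id from the vld instantiation parameters into target_vld (vim_info entries for the target vim/wim and, when SDN assist applies, the target sdn)."""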
767 if vld_params.get("ip-profile"):
768 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
769 "ip-profile"
770 ]
771 if vld_params.get("provider-network"):
772 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
773 "provider-network"
774 ]
775 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
776 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
777 "provider-network"
778 ]["sdn-ports"]
779 if vld_params.get("wimAccountId"):
780 target_wim = "wim:{}".format(vld_params["wimAccountId"])
781 target_vld["vim_info"][target_wim] = {}
782 for param in ("vim-network-name", "vim-network-id"):
783 if vld_params.get(param):
784 if isinstance(vld_params[param], dict):
785 for vim, vim_net in vld_params[param].items():
786 other_target_vim = "vim:" + vim
787 populate_dict(
788 target_vld["vim_info"],
789 (other_target_vim, param.replace("-", "_")),
790 vim_net,
791 )
792 else: # isinstance str
793 target_vld["vim_info"][target_vim][
794 param.replace("-", "_")
795 ] = vld_params[param]
796 if vld_params.get("common_id"):
797 target_vld["common_id"] = vld_params.get("common_id")
798
799 nslcmop_id = db_nslcmop["_id"]
800 target = {
801 "name": db_nsr["name"],
802 "ns": {"vld": []},
803 "vnf": [],
804 "image": deepcopy(db_nsr["image"]),
805 "flavor": deepcopy(db_nsr["flavor"]),
806 "action_id": nslcmop_id,
807 "cloud_init_content": {},
808 }
809 for image in target["image"]:
810 image["vim_info"] = {}
811 for flavor in target["flavor"]:
812 flavor["vim_info"] = {}
813
814 if db_nslcmop.get("lcmOperationType") != "instantiate":
815 # get parameters of instantiation:
816 db_nslcmop_instantiate = self.db.get_list(
817 "nslcmops",
818 {
819 "nsInstanceId": db_nslcmop["nsInstanceId"],
820 "lcmOperationType": "instantiate",
821 },
822 )[-1]
823 ns_params = db_nslcmop_instantiate.get("operationParams")
824 else:
825 ns_params = db_nslcmop.get("operationParams")
826 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
827 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
828
829 cp2target = {}
830 for vld_index, vld in enumerate(db_nsr.get("vld")):
831 target_vim = "vim:{}".format(ns_params["vimAccountId"])
832 target_vld = {
833 "id": vld["id"],
834 "name": vld["name"],
835 "mgmt-network": vld.get("mgmt-network", False),
836 "type": vld.get("type"),
837 "vim_info": {
838 target_vim: {
839 "vim_network_name": vld.get("vim-network-name"),
840 "vim_account_id": ns_params["vimAccountId"],
841 }
842 },
843 }
844 # check if this network needs SDN assist
845 if vld.get("pci-interfaces"):
846 db_vim = get_vim_account(ns_params["vimAccountId"])
847 sdnc_id = db_vim["config"].get("sdn-controller")
848 if sdnc_id:
849 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
850 target_sdn = "sdn:{}".format(sdnc_id)
851 target_vld["vim_info"][target_sdn] = {
852 "sdn": True,
853 "target_vim": target_vim,
854 "vlds": [sdn_vld],
855 "type": vld.get("type"),
856 }
857
858 nsd_vnf_profiles = get_vnf_profiles(nsd)
859 for nsd_vnf_profile in nsd_vnf_profiles:
860 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
861 if cp["virtual-link-profile-id"] == vld["id"]:
862 cp2target[
863 "member_vnf:{}.{}".format(
864 cp["constituent-cpd-id"][0][
865 "constituent-base-element-id"
866 ],
867 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
868 )
869 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
870
871 # check at nsd descriptor, if there is an ip-profile
872 vld_params = {}
873 nsd_vlp = find_in_list(
874 get_virtual_link_profiles(nsd),
875 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
876 == vld["id"],
877 )
878 if (
879 nsd_vlp
880 and nsd_vlp.get("virtual-link-protocol-data")
881 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
882 ):
883 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
884 "l3-protocol-data"
885 ]
886 ip_profile_dest_data = {}
887 if "ip-version" in ip_profile_source_data:
888 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
889 "ip-version"
890 ]
891 if "cidr" in ip_profile_source_data:
892 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
893 "cidr"
894 ]
895 if "gateway-ip" in ip_profile_source_data:
896 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
897 "gateway-ip"
898 ]
899 if "dhcp-enabled" in ip_profile_source_data:
900 ip_profile_dest_data["dhcp-params"] = {
901 "enabled": ip_profile_source_data["dhcp-enabled"]
902 }
903 vld_params["ip-profile"] = ip_profile_dest_data
904
905 # update vld_params with instantiation params
906 vld_instantiation_params = find_in_list(
907 get_iterable(ns_params, "vld"),
908 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
909 )
910 if vld_instantiation_params:
911 vld_params.update(vld_instantiation_params)
912 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
913 target["ns"]["vld"].append(target_vld)
914
915 for vnfr in db_vnfrs.values():
916 vnfd = find_in_list(
917 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
918 )
919 vnf_params = find_in_list(
920 get_iterable(ns_params, "vnf"),
921 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
922 )
923 target_vnf = deepcopy(vnfr)
924 target_vim = "vim:{}".format(vnfr["vim-account-id"])
925 for vld in target_vnf.get("vld", ()):
926 # check if connected to a ns.vld, to fill target
927 vnf_cp = find_in_list(
928 vnfd.get("int-virtual-link-desc", ()),
929 lambda cpd: cpd.get("id") == vld["id"],
930 )
931 if vnf_cp:
932 ns_cp = "member_vnf:{}.{}".format(
933 vnfr["member-vnf-index-ref"], vnf_cp["id"]
934 )
935 if cp2target.get(ns_cp):
936 vld["target"] = cp2target[ns_cp]
937
938 vld["vim_info"] = {
939 target_vim: {"vim_network_name": vld.get("vim-network-name")}
940 }
941 # check if this network needs SDN assist
942 target_sdn = None
943 if vld.get("pci-interfaces"):
944 db_vim = get_vim_account(vnfr["vim-account-id"])
945 sdnc_id = db_vim["config"].get("sdn-controller")
946 if sdnc_id:
947 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
948 target_sdn = "sdn:{}".format(sdnc_id)
949 vld["vim_info"][target_sdn] = {
950 "sdn": True,
951 "target_vim": target_vim,
952 "vlds": [sdn_vld],
953 "type": vld.get("type"),
954 }
955
956 # check at vnfd descriptor, if there is an ip-profile
957 vld_params = {}
958 vnfd_vlp = find_in_list(
959 get_virtual_link_profiles(vnfd),
960 lambda a_link_profile: a_link_profile["id"] == vld["id"],
961 )
962 if (
963 vnfd_vlp
964 and vnfd_vlp.get("virtual-link-protocol-data")
965 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
966 ):
967 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
968 "l3-protocol-data"
969 ]
970 ip_profile_dest_data = {}
971 if "ip-version" in ip_profile_source_data:
972 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
973 "ip-version"
974 ]
975 if "cidr" in ip_profile_source_data:
976 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
977 "cidr"
978 ]
979 if "gateway-ip" in ip_profile_source_data:
980 ip_profile_dest_data[
981 "gateway-address"
982 ] = ip_profile_source_data["gateway-ip"]
983 if "dhcp-enabled" in ip_profile_source_data:
984 ip_profile_dest_data["dhcp-params"] = {
985 "enabled": ip_profile_source_data["dhcp-enabled"]
986 }
987
988 vld_params["ip-profile"] = ip_profile_dest_data
989 # update vld_params with instantiation params
990 if vnf_params:
991 vld_instantiation_params = find_in_list(
992 get_iterable(vnf_params, "internal-vld"),
993 lambda i_vld: i_vld["name"] == vld["id"],
994 )
995 if vld_instantiation_params:
996 vld_params.update(vld_instantiation_params)
997 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
998
999 vdur_list = []
1000 for vdur in target_vnf.get("vdur", ()):
1001 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1002 continue # This vdu must not be created
1003 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1004
1005 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1006
1007 if ssh_keys_all:
1008 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1009 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1010 if (
1011 vdu_configuration
1012 and vdu_configuration.get("config-access")
1013 and vdu_configuration.get("config-access").get("ssh-access")
1014 ):
1015 vdur["ssh-keys"] = ssh_keys_all
1016 vdur["ssh-access-required"] = vdu_configuration[
1017 "config-access"
1018 ]["ssh-access"]["required"]
1019 elif (
1020 vnf_configuration
1021 and vnf_configuration.get("config-access")
1022 and vnf_configuration.get("config-access").get("ssh-access")
1023 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1024 ):
1025 vdur["ssh-keys"] = ssh_keys_all
1026 vdur["ssh-access-required"] = vnf_configuration[
1027 "config-access"
1028 ]["ssh-access"]["required"]
1029 elif ssh_keys_instantiation and find_in_list(
1030 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1031 ):
1032 vdur["ssh-keys"] = ssh_keys_instantiation
1033
1034 self.logger.debug("NS > vdur > {}".format(vdur))
1035
1036 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1037 # cloud-init
1038 if vdud.get("cloud-init-file"):
1039 vdur["cloud-init"] = "{}:file:{}".format(
1040 vnfd["_id"], vdud.get("cloud-init-file")
1041 )
1042 # read file and put content at target.cloud_init_content. Avoids ng_ro using the shared package system
1043 if vdur["cloud-init"] not in target["cloud_init_content"]:
1044 base_folder = vnfd["_admin"]["storage"]
1045 cloud_init_file = "{}/{}/cloud_init/{}".format(
1046 base_folder["folder"],
1047 base_folder["pkg-dir"],
1048 vdud.get("cloud-init-file"),
1049 )
1050 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1051 target["cloud_init_content"][
1052 vdur["cloud-init"]
1053 ] = ci_file.read()
1054 elif vdud.get("cloud-init"):
1055 vdur["cloud-init"] = "{}:vdu:{}".format(
1056 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1057 )
1058 # put content at target.cloud_init_content. Avoids ng_ro reading the vnfd descriptor
1059 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1060 "cloud-init"
1061 ]
1062 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1063 deploy_params_vdu = self._format_additional_params(
1064 vdur.get("additionalParams") or {}
1065 )
1066 deploy_params_vdu["OSM"] = get_osm_params(
1067 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1068 )
1069 vdur["additionalParams"] = deploy_params_vdu
1070
1071 # flavor
1072 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1073 if target_vim not in ns_flavor["vim_info"]:
1074 ns_flavor["vim_info"][target_vim] = {}
1075
1076 # deal with images
1077 # in case alternative images are provided we must check if they should be applied
1078 # for the vim_type, modify the vim_type taking into account
1079 ns_image_id = int(vdur["ns-image-id"])
1080 if vdur.get("alt-image-ids"):
1081 db_vim = get_vim_account(vnfr["vim-account-id"])
1082 vim_type = db_vim["vim_type"]
1083 for alt_image_id in vdur.get("alt-image-ids"):
1084 ns_alt_image = target["image"][int(alt_image_id)]
1085 if vim_type == ns_alt_image.get("vim-type"):
1086 # must use alternative image
1087 self.logger.debug(
1088 "use alternative image id: {}".format(alt_image_id)
1089 )
1090 ns_image_id = alt_image_id
1091 vdur["ns-image-id"] = ns_image_id
1092 break
1093 ns_image = target["image"][int(ns_image_id)]
1094 if target_vim not in ns_image["vim_info"]:
1095 ns_image["vim_info"][target_vim] = {}
1096
1097 vdur["vim_info"] = {target_vim: {}}
1098 # instantiation parameters
1099 # if vnf_params:
1100 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1101 # vdud["id"]), None)
1102 vdur_list.append(vdur)
1103 target_vnf["vdur"] = vdur_list
1104 target["vnf"].append(target_vnf)
1105
1106 desc = await self.RO.deploy(nsr_id, target)
1107 self.logger.debug("RO return > {}".format(desc))
1108 action_id = desc["action_id"]
1109 await self._wait_ng_ro(
1110 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1111 )
1112
1113 # Updating NSR
1114 db_nsr_update = {
1115 "_admin.deployed.RO.operational-status": "running",
1116 "detailed-status": " ".join(stage),
1117 }
1118 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1119 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1120 self._write_op_status(nslcmop_id, stage)
1121 self.logger.debug(
1122 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1123 )
1124 return
1125
1126 async def _wait_ng_ro(
1127 self,
1128 nsr_id,
1129 action_id,
1130 nslcmop_id=None,
1131 start_time=None,
1132 timeout=600,
1133 stage=None,
1134 ):
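"""Poll NG-RO for the status of action_id until it reaches DONE, raising NgRoException on FAILED or when the timeout expires; stage and the nsr/operation detailed-status are updated while waiting."""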
1135 detailed_status_old = None
1136 db_nsr_update = {}
1137 start_time = start_time or time()
1138 while time() <= start_time + timeout:
1139 desc_status = await self.RO.status(nsr_id, action_id)
1140 self.logger.debug("Wait NG RO > {}".format(desc_status))
1141 if desc_status["status"] == "FAILED":
1142 raise NgRoException(desc_status["details"])
1143 elif desc_status["status"] == "BUILD":
1144 if stage:
1145 stage[2] = "VIM: ({})".format(desc_status["details"])
1146 elif desc_status["status"] == "DONE":
1147 if stage:
1148 stage[2] = "Deployed at VIM"
1149 break
1150 else:
1151 assert False, "ROclient.check_ns_status returns unknown {}".format(
1152 desc_status["status"]
1153 )
1154 if stage and nslcmop_id and stage[2] != detailed_status_old:
1155 detailed_status_old = stage[2]
1156 db_nsr_update["detailed-status"] = " ".join(stage)
1157 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1158 self._write_op_status(nslcmop_id, stage)
1159 await asyncio.sleep(15, loop=self.loop)
1160 else: # timeout_ns_deploy
1161 raise NgRoException("Timeout waiting for ns to deploy")
1162
1163 async def _terminate_ng_ro(
1164 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1165 ):
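"""Ask NG-RO to remove the NS resources by deploying an empty target, wait for completion and then delete the RO record; a 404 answer is treated as already deleted, other errors are collected and re-raised as LcmException."""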
1166 db_nsr_update = {}
1167 failed_detail = []
1168 action_id = None
1169 start_deploy = time()
1170 try:
1171 target = {
1172 "ns": {"vld": []},
1173 "vnf": [],
1174 "image": [],
1175 "flavor": [],
1176 "action_id": nslcmop_id,
1177 }
1178 desc = await self.RO.deploy(nsr_id, target)
1179 action_id = desc["action_id"]
1180 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1181 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1182 self.logger.debug(
1183 logging_text
1184 + "ns terminate action at RO. action_id={}".format(action_id)
1185 )
1186
1187 # wait until done
1188 delete_timeout = 20 * 60 # 20 minutes
1189 await self._wait_ng_ro(
1190 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1191 )
1192
1193 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1194 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1195 # delete all nsr
1196 await self.RO.delete(nsr_id)
1197 except Exception as e:
1198 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1199 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1200 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1201 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1202 self.logger.debug(
1203 logging_text + "RO_action_id={} already deleted".format(action_id)
1204 )
1205 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1206 failed_detail.append("delete conflict: {}".format(e))
1207 self.logger.debug(
1208 logging_text
1209 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1210 )
1211 else:
1212 failed_detail.append("delete error: {}".format(e))
1213 self.logger.error(
1214 logging_text
1215 + "RO_action_id={} delete error: {}".format(action_id, e)
1216 )
1217
1218 if failed_detail:
1219 stage[2] = "Error deleting from VIM"
1220 else:
1221 stage[2] = "Deleted from VIM"
1222 db_nsr_update["detailed-status"] = " ".join(stage)
1223 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1224 self._write_op_status(nslcmop_id, stage)
1225
1226 if failed_detail:
1227 raise LcmException("; ".join(failed_detail))
1228 return
1229
1230 async def instantiate_RO(
1231 self,
1232 logging_text,
1233 nsr_id,
1234 nsd,
1235 db_nsr,
1236 db_nslcmop,
1237 db_vnfrs,
1238 db_vnfds,
1239 n2vc_key_list,
1240 stage,
1241 ):
1242 """
1243 Instantiate at RO
1244 :param logging_text: prefix text to use for logging
1245 :param nsr_id: nsr identity
1246 :param nsd: database content of ns descriptor
1247 :param db_nsr: database content of ns record
1248 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1249 :param db_vnfrs:
1250 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1251 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1252 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1253 :return: None or exception
1254 """
1255 try:
1256 start_deploy = time()
1257 ns_params = db_nslcmop.get("operationParams")
1258 if ns_params and ns_params.get("timeout_ns_deploy"):
1259 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1260 else:
1261 timeout_ns_deploy = self.timeout.get(
1262 "ns_deploy", self.timeout_ns_deploy
1263 )
1264
1265 # Check for and optionally request placement optimization. Database will be updated if placement activated
1266 stage[2] = "Waiting for Placement."
1267 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1268 # in case of placement, change ns_params["vimAccountId"] if it is not present in any vnfr
1269 for vnfr in db_vnfrs.values():
1270 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1271 break
1272 else:
1273 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1274
1275 return await self._instantiate_ng_ro(
1276 logging_text,
1277 nsr_id,
1278 nsd,
1279 db_nsr,
1280 db_nslcmop,
1281 db_vnfrs,
1282 db_vnfds,
1283 n2vc_key_list,
1284 stage,
1285 start_deploy,
1286 timeout_ns_deploy,
1287 )
1288 except Exception as e:
1289 stage[2] = "ERROR deploying at VIM"
1290 self.set_vnfr_at_error(db_vnfrs, str(e))
1291 self.logger.error(
1292 "Error deploying at VIM {}".format(e),
1293 exc_info=not isinstance(
1294 e,
1295 (
1296 ROclient.ROClientException,
1297 LcmException,
1298 DbException,
1299 NgRoException,
1300 ),
1301 ),
1302 )
1303 raise
1304
1305 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1306 """
1307 Wait for kdu to be up, get ip address
1308 :param logging_text: prefix use for logging
1309 :param nsr_id:
1310 :param vnfr_id:
1311 :param kdu_name:
1312 :return: IP address
1313 """
1314
1315 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1316 nb_tries = 0
1317
1318 while nb_tries < 360:
1319 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1320 kdur = next(
1321 (
1322 x
1323 for x in get_iterable(db_vnfr, "kdur")
1324 if x.get("kdu-name") == kdu_name
1325 ),
1326 None,
1327 )
1328 if not kdur:
1329 raise LcmException(
1330 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1331 )
1332 if kdur.get("status"):
1333 if kdur["status"] in ("READY", "ENABLED"):
1334 return kdur.get("ip-address")
1335 else:
1336 raise LcmException(
1337 "target KDU={} is in error state".format(kdu_name)
1338 )
1339
1340 await asyncio.sleep(10, loop=self.loop)
1341 nb_tries += 1
1342 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1343
1344 async def wait_vm_up_insert_key_ro(
1345 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1346 ):
1347 """
1348 Wait for the IP address at RO and, optionally, insert a public key in the virtual machine
1349 :param logging_text: prefix use for logging
1350 :param nsr_id:
1351 :param vnfr_id:
1352 :param vdu_id:
1353 :param vdu_index:
1354 :param pub_key: public ssh key to inject, None to skip
1355 :param user: user to apply the public ssh key
1356 :return: IP address
1357 """
1358
1359 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1360 ro_nsr_id = None
1361 ip_address = None
1362 nb_tries = 0
1363 target_vdu_id = None
1364 ro_retries = 0
1365
1366 while True:
1367
1368 ro_retries += 1
1369 if ro_retries >= 360: # 1 hour
1370 raise LcmException(
1371 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1372 )
1373
1374 await asyncio.sleep(10, loop=self.loop)
1375
1376 # get ip address
1377 if not target_vdu_id:
1378 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1379
1380 if not vdu_id: # for the VNF case
1381 if db_vnfr.get("status") == "ERROR":
1382 raise LcmException(
1383 "Cannot inject ssh-key because target VNF is in error state"
1384 )
1385 ip_address = db_vnfr.get("ip-address")
1386 if not ip_address:
1387 continue
1388 vdur = next(
1389 (
1390 x
1391 for x in get_iterable(db_vnfr, "vdur")
1392 if x.get("ip-address") == ip_address
1393 ),
1394 None,
1395 )
1396 else: # VDU case
1397 vdur = next(
1398 (
1399 x
1400 for x in get_iterable(db_vnfr, "vdur")
1401 if x.get("vdu-id-ref") == vdu_id
1402 and x.get("count-index") == vdu_index
1403 ),
1404 None,
1405 )
1406
1407 if (
1408 not vdur and len(db_vnfr.get("vdur", ())) == 1
1409 ): # If only one, this should be the target vdu
1410 vdur = db_vnfr["vdur"][0]
1411 if not vdur:
1412 raise LcmException(
1413 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1414 vnfr_id, vdu_id, vdu_index
1415 )
1416 )
1417 # New generation RO stores information at "vim_info"
1418 ng_ro_status = None
1419 target_vim = None
1420 if vdur.get("vim_info"):
1421 target_vim = next(
1422 t for t in vdur["vim_info"]
1423 ) # there should be only one key
1424 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1425 if (
1426 vdur.get("pdu-type")
1427 or vdur.get("status") == "ACTIVE"
1428 or ng_ro_status == "ACTIVE"
1429 ):
1430 ip_address = vdur.get("ip-address")
1431 if not ip_address:
1432 continue
1433 target_vdu_id = vdur["vdu-id-ref"]
1434 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1435 raise LcmException(
1436 "Cannot inject ssh-key because target VM is in error state"
1437 )
1438
1439 if not target_vdu_id:
1440 continue
1441
1442 # inject public key into machine
1443 if pub_key and user:
1444 self.logger.debug(logging_text + "Inserting RO key")
1445 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1446 if vdur.get("pdu-type"):
1447 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1448 return ip_address
1449 try:
1450 ro_vm_id = "{}-{}".format(
1451 db_vnfr["member-vnf-index-ref"], target_vdu_id
1452 ) # TODO add vdu_index
1453 if self.ng_ro:
1454 target = {
1455 "action": {
1456 "action": "inject_ssh_key",
1457 "key": pub_key,
1458 "user": user,
1459 },
1460 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1461 }
1462 desc = await self.RO.deploy(nsr_id, target)
1463 action_id = desc["action_id"]
1464 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1465 break
1466 else:
1467 # wait until NS is deployed at RO
1468 if not ro_nsr_id:
1469 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1470 ro_nsr_id = deep_get(
1471 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1472 )
1473 if not ro_nsr_id:
1474 continue
1475 result_dict = await self.RO.create_action(
1476 item="ns",
1477 item_id_name=ro_nsr_id,
1478 descriptor={
1479 "add_public_key": pub_key,
1480 "vms": [ro_vm_id],
1481 "user": user,
1482 },
1483 )
1484 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1485 if not result_dict or not isinstance(result_dict, dict):
1486 raise LcmException(
1487 "Unknown response from RO when injecting key"
1488 )
1489 for result in result_dict.values():
1490 if result.get("vim_result") == 200:
1491 break
1492 else:
1493 raise ROclient.ROClientException(
1494 "error injecting key: {}".format(
1495 result.get("description")
1496 )
1497 )
1498 break
1499 except NgRoException as e:
1500 raise LcmException(
1501 "Reaching max tries injecting key. Error: {}".format(e)
1502 )
1503 except ROclient.ROClientException as e:
1504 if not nb_tries:
1505 self.logger.debug(
1506 logging_text
1507 + "error injecting key: {}. Retrying until {} seconds".format(
1508 e, 20 * 10
1509 )
1510 )
1511 nb_tries += 1
1512 if nb_tries >= 20:
1513 raise LcmException(
1514 "Reaching max tries injecting key. Error: {}".format(e)
1515 )
1516 else:
1517 break
1518
1519 return ip_address
1520
1521 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1522 """
1523 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs
1524 """
1525 my_vca = vca_deployed_list[vca_index]
1526 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1527 # vdu or kdu: no dependencies
1528 return
1529 timeout = 300
1530 while timeout >= 0:
1531 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1532 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1533 configuration_status_list = db_nsr["configurationStatus"]
1534 for index, vca_deployed in enumerate(configuration_status_list):
1535 if index == vca_index:
1536 # myself
1537 continue
1538 if not my_vca.get("member-vnf-index") or (
1539 vca_deployed.get("member-vnf-index")
1540 == my_vca.get("member-vnf-index")
1541 ):
1542 internal_status = configuration_status_list[index].get("status")
1543 if internal_status == "READY":
1544 continue
1545 elif internal_status == "BROKEN":
1546 raise LcmException(
1547 "Configuration aborted because dependent charm/s has failed"
1548 )
1549 else:
1550 break
1551 else:
1552 # no dependencies, return
1553 return
1554 await asyncio.sleep(10)
1555 timeout -= 1
1556
1557 raise LcmException("Configuration aborted because dependent charm/s timeout")
1558
1559 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
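"""Return the VCA id to be used: taken from the vnfr ('vca-id') when present, otherwise looked up through the vim account referenced by the NS instantiation parameters (bug 1716); None if neither is set."""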
1560 vca_id = None
1561 if db_vnfr:
1562 vca_id = deep_get(db_vnfr, ("vca-id",))
1563 elif db_nsr:
1564 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1565 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1566 return vca_id
1567
1568 async def instantiate_N2VC(
1569 self,
1570 logging_text,
1571 vca_index,
1572 nsi_id,
1573 db_nsr,
1574 db_vnfr,
1575 vdu_id,
1576 kdu_name,
1577 vdu_index,
1578 config_descriptor,
1579 deploy_params,
1580 base_folder,
1581 nslcmop_id,
1582 stage,
1583 vca_type,
1584 vca_name,
1585 ee_config_descriptor,
1586 ):
1587 nsr_id = db_nsr["_id"]
1588 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1589 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1590 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1591 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1592 db_dict = {
1593 "collection": "nsrs",
1594 "filter": {"_id": nsr_id},
1595 "path": db_update_entry,
1596 }
1597 step = ""
1598 try:
1599
1600 element_type = "NS"
1601 element_under_configuration = nsr_id
1602
1603 vnfr_id = None
1604 if db_vnfr:
1605 vnfr_id = db_vnfr["_id"]
1606 osm_config["osm"]["vnf_id"] = vnfr_id
1607
1608 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1609
1610 if vca_type == "native_charm":
1611 index_number = 0
1612 else:
1613 index_number = vdu_index or 0
1614
1615 if vnfr_id:
1616 element_type = "VNF"
1617 element_under_configuration = vnfr_id
1618 namespace += ".{}-{}".format(vnfr_id, index_number)
1619 if vdu_id:
1620 namespace += ".{}-{}".format(vdu_id, index_number)
1621 element_type = "VDU"
1622 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1623 osm_config["osm"]["vdu_id"] = vdu_id
1624 elif kdu_name:
1625 namespace += ".{}".format(kdu_name)
1626 element_type = "KDU"
1627 element_under_configuration = kdu_name
1628 osm_config["osm"]["kdu_name"] = kdu_name
1629
1630 # Get artifact path
1631 artifact_path = "{}/{}/{}/{}".format(
1632 base_folder["folder"],
1633 base_folder["pkg-dir"],
1634 "charms"
1635 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1636 else "helm-charts",
1637 vca_name,
1638 )
1639
1640 self.logger.debug("Artifact path > {}".format(artifact_path))
1641
1642 # get initial_config_primitive_list that applies to this element
1643 initial_config_primitive_list = config_descriptor.get(
1644 "initial-config-primitive"
1645 )
1646
1647 self.logger.debug(
1648 "Initial config primitive list > {}".format(
1649 initial_config_primitive_list
1650 )
1651 )
1652
1653 # add config if not present for NS charm
1654 ee_descriptor_id = ee_config_descriptor.get("id")
1655 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1656 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1657 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1658 )
1659
1660 self.logger.debug(
1661 "Initial config primitive list #2 > {}".format(
1662 initial_config_primitive_list
1663 )
1664 )
1665 # n2vc_redesign STEP 3.1
1666 # find old ee_id if exists
1667 ee_id = vca_deployed.get("ee_id")
1668
1669 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1670 # create or register execution environment in VCA
1671 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1672
1673 self._write_configuration_status(
1674 nsr_id=nsr_id,
1675 vca_index=vca_index,
1676 status="CREATING",
1677 element_under_configuration=element_under_configuration,
1678 element_type=element_type,
1679 )
1680
1681 step = "create execution environment"
1682 self.logger.debug(logging_text + step)
1683
1684 ee_id = None
1685 credentials = None
1686 if vca_type == "k8s_proxy_charm":
1687 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1688 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1689 namespace=namespace,
1690 artifact_path=artifact_path,
1691 db_dict=db_dict,
1692 vca_id=vca_id,
1693 )
1694 elif vca_type == "helm" or vca_type == "helm-v3":
1695 ee_id, credentials = await self.vca_map[
1696 vca_type
1697 ].create_execution_environment(
1698 namespace=namespace,
1699 reuse_ee_id=ee_id,
1700 db_dict=db_dict,
1701 config=osm_config,
1702 artifact_path=artifact_path,
1703 vca_type=vca_type,
1704 )
1705 else:
1706 ee_id, credentials = await self.vca_map[
1707 vca_type
1708 ].create_execution_environment(
1709 namespace=namespace,
1710 reuse_ee_id=ee_id,
1711 db_dict=db_dict,
1712 vca_id=vca_id,
1713 )
1714
1715 elif vca_type == "native_charm":
1716 step = "Waiting to VM being up and getting IP address"
1717 self.logger.debug(logging_text + step)
1718 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1719 logging_text,
1720 nsr_id,
1721 vnfr_id,
1722 vdu_id,
1723 vdu_index,
1724 user=None,
1725 pub_key=None,
1726 )
1727 credentials = {"hostname": rw_mgmt_ip}
1728 # get username
1729 username = deep_get(
1730 config_descriptor, ("config-access", "ssh-access", "default-user")
1731 )
1732 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1733 # merged. Meanwhile, get the username from initial-config-primitive
1734 if not username and initial_config_primitive_list:
1735 for config_primitive in initial_config_primitive_list:
1736 for param in config_primitive.get("parameter", ()):
1737 if param["name"] == "ssh-username":
1738 username = param["value"]
1739 break
1740 if not username:
1741 raise LcmException(
1742 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1743 "'config-access.ssh-access.default-user'"
1744 )
1745 credentials["username"] = username
1746 # n2vc_redesign STEP 3.2
1747
1748 self._write_configuration_status(
1749 nsr_id=nsr_id,
1750 vca_index=vca_index,
1751 status="REGISTERING",
1752 element_under_configuration=element_under_configuration,
1753 element_type=element_type,
1754 )
1755
1756 step = "register execution environment {}".format(credentials)
1757 self.logger.debug(logging_text + step)
1758 ee_id = await self.vca_map[vca_type].register_execution_environment(
1759 credentials=credentials,
1760 namespace=namespace,
1761 db_dict=db_dict,
1762 vca_id=vca_id,
1763 )
1764
1765 # for compatibility with MON/POL modules, they need the model and application name in the database
1766 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1767 ee_id_parts = ee_id.split(".")
1768 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1769 if len(ee_id_parts) >= 2:
1770 model_name = ee_id_parts[0]
1771 application_name = ee_id_parts[1]
1772 db_nsr_update[db_update_entry + "model"] = model_name
1773 db_nsr_update[db_update_entry + "application"] = application_name
1774
1775 # n2vc_redesign STEP 3.3
1776 step = "Install configuration Software"
1777
1778 self._write_configuration_status(
1779 nsr_id=nsr_id,
1780 vca_index=vca_index,
1781 status="INSTALLING SW",
1782 element_under_configuration=element_under_configuration,
1783 element_type=element_type,
1784 other_update=db_nsr_update,
1785 )
1786
1787 # TODO check if already done
1788 self.logger.debug(logging_text + step)
1789 config = None
1790 if vca_type == "native_charm":
1791 config_primitive = next(
1792 (p for p in initial_config_primitive_list if p["name"] == "config"),
1793 None,
1794 )
1795 if config_primitive:
1796 config = self._map_primitive_params(
1797 config_primitive, {}, deploy_params
1798 )
1799 num_units = 1
1800 if vca_type == "lxc_proxy_charm":
1801 if element_type == "NS":
1802 num_units = db_nsr.get("config-units") or 1
1803 elif element_type == "VNF":
1804 num_units = db_vnfr.get("config-units") or 1
1805 elif element_type == "VDU":
1806 for v in db_vnfr["vdur"]:
1807 if vdu_id == v["vdu-id-ref"]:
1808 num_units = v.get("config-units") or 1
1809 break
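# Illustrative example: for a VDU-level proxy charm whose vdur entry carries
# "config-units": 3, three charm units would be requested below; when the field
# is absent, a single unit is deployed.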
1810 if vca_type != "k8s_proxy_charm":
1811 await self.vca_map[vca_type].install_configuration_sw(
1812 ee_id=ee_id,
1813 artifact_path=artifact_path,
1814 db_dict=db_dict,
1815 config=config,
1816 num_units=num_units,
1817 vca_id=vca_id,
1818 vca_type=vca_type,
1819 )
1820
1821 # write in db flag of configuration_sw already installed
1822 self.update_db_2(
1823 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1824 )
1825
1826 # add relations for this VCA (wait for other peers related with this VCA)
1827 await self._add_vca_relations(
1828 logging_text=logging_text,
1829 nsr_id=nsr_id,
1830 vca_index=vca_index,
1831 vca_id=vca_id,
1832 vca_type=vca_type,
1833 )
1834
1835 # if SSH access is required, then get the execution environment SSH public key
1836 # for native charms we have already waited for the VM to be up
1837 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1838 pub_key = None
1839 user = None
1840 # self.logger.debug("get ssh key block")
1841 if deep_get(
1842 config_descriptor, ("config-access", "ssh-access", "required")
1843 ):
1844 # self.logger.debug("ssh key needed")
1845 # Needed to inject a ssh key
1846 user = deep_get(
1847 config_descriptor,
1848 ("config-access", "ssh-access", "default-user"),
1849 )
1850 step = "Install configuration Software, getting public ssh key"
1851 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1852 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1853 )
1854
1855 step = "Insert public key into VM user={} ssh_key={}".format(
1856 user, pub_key
1857 )
1858 else:
1859 # self.logger.debug("no need to get ssh key")
1860 step = "Waiting to VM being up and getting IP address"
1861 self.logger.debug(logging_text + step)
1862
1863 # n2vc_redesign STEP 5.1
1864 # wait for RO (ip-address) Insert pub_key into VM
1865 if vnfr_id:
1866 if kdu_name:
1867 rw_mgmt_ip = await self.wait_kdu_up(
1868 logging_text, nsr_id, vnfr_id, kdu_name
1869 )
1870 else:
1871 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1872 logging_text,
1873 nsr_id,
1874 vnfr_id,
1875 vdu_id,
1876 vdu_index,
1877 user=user,
1878 pub_key=pub_key,
1879 )
1880 else:
1881 rw_mgmt_ip = None # This is for a NS configuration
1882
1883 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1884
1885 # store rw_mgmt_ip in deploy params for later replacement
1886 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1887
1888 # n2vc_redesign STEP 6 Execute initial config primitive
1889 step = "execute initial config primitive"
1890
1891 # wait for dependent primitives execution (NS -> VNF -> VDU)
1892 if initial_config_primitive_list:
1893 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1894
1895 # stage, depending on the element type: vdu, kdu, vnf or ns
1896 my_vca = vca_deployed_list[vca_index]
1897 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1898 # VDU or KDU
1899 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1900 elif my_vca.get("member-vnf-index"):
1901 # VNF
1902 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1903 else:
1904 # NS
1905 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1906
1907 self._write_configuration_status(
1908 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1909 )
1910
1911 self._write_op_status(op_id=nslcmop_id, stage=stage)
1912
1913 check_if_terminated_needed = True
1914 for initial_config_primitive in initial_config_primitive_list:
1915 # adding information on the vca_deployed if it is an NS execution environment
1916 if not vca_deployed["member-vnf-index"]:
1917 deploy_params["ns_config_info"] = json.dumps(
1918 self._get_ns_config_info(nsr_id)
1919 )
1920 # TODO check if already done
1921 primitive_params_ = self._map_primitive_params(
1922 initial_config_primitive, {}, deploy_params
1923 )
1924
1925 step = "execute primitive '{}' params '{}'".format(
1926 initial_config_primitive["name"], primitive_params_
1927 )
1928 self.logger.debug(logging_text + step)
1929 await self.vca_map[vca_type].exec_primitive(
1930 ee_id=ee_id,
1931 primitive_name=initial_config_primitive["name"],
1932 params_dict=primitive_params_,
1933 db_dict=db_dict,
1934 vca_id=vca_id,
1935 vca_type=vca_type,
1936 )
1937 # Once some primitive has been executed, check and write at db whether terminate primitives will need to be executed
1938 if check_if_terminated_needed:
1939 if config_descriptor.get("terminate-config-primitive"):
1940 self.update_db_2(
1941 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
1942 )
1943 check_if_terminated_needed = False
1944
1945 # TODO register in database that primitive is done
1946
1947 # STEP 7 Configure metrics
1948 if vca_type == "helm" or vca_type == "helm-v3":
1949 prometheus_jobs = await self.add_prometheus_metrics(
1950 ee_id=ee_id,
1951 artifact_path=artifact_path,
1952 ee_config_descriptor=ee_config_descriptor,
1953 vnfr_id=vnfr_id,
1954 nsr_id=nsr_id,
1955 target_ip=rw_mgmt_ip,
1956 )
1957 if prometheus_jobs:
1958 self.update_db_2(
1959 "nsrs",
1960 nsr_id,
1961 {db_update_entry + "prometheus_jobs": prometheus_jobs},
1962 )
1963
1964 step = "instantiated at VCA"
1965 self.logger.debug(logging_text + step)
1966
1967 self._write_configuration_status(
1968 nsr_id=nsr_id, vca_index=vca_index, status="READY"
1969 )
1970
1971 except Exception as e: # TODO not use Exception but N2VC exception
1972 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1973 if not isinstance(
1974 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
1975 ):
1976 self.logger.error(
1977 "Exception while {} : {}".format(step, e), exc_info=True
1978 )
1979 self._write_configuration_status(
1980 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
1981 )
1982 raise LcmException("{} {}".format(step, e)) from e
1983
1984 def _write_ns_status(
1985 self,
1986 nsr_id: str,
1987 ns_state: str,
1988 current_operation: str,
1989 current_operation_id: str,
1990 error_description: str = None,
1991 error_detail: str = None,
1992 other_update: dict = None,
1993 ):
1994 """
1995 Update db_nsr fields.
1996 :param nsr_id:
1997 :param ns_state:
1998 :param current_operation:
1999 :param current_operation_id:
2000 :param error_description:
2001 :param error_detail:
2002 :param other_update: Other required changes at database if provided, will be cleared
2003 :return:
2004 """
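# Minimal sketch (illustrative values) of the document update produced below:
#   {
#       "_admin.nslcmop": "<nslcmop-id>",
#       "_admin.current-operation": "<nslcmop-id>",
#       "_admin.operation-type": "INSTANTIATING",
#       "currentOperation": "INSTANTIATING",
#       "currentOperationID": "<nslcmop-id>",
#       "errorDescription": None,
#       "errorDetail": None,
#       "nsState": "BUILDING",
#   }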
2005 try:
2006 db_dict = other_update or {}
2007 db_dict[
2008 "_admin.nslcmop"
2009 ] = current_operation_id # for backward compatibility
2010 db_dict["_admin.current-operation"] = current_operation_id
2011 db_dict["_admin.operation-type"] = (
2012 current_operation if current_operation != "IDLE" else None
2013 )
2014 db_dict["currentOperation"] = current_operation
2015 db_dict["currentOperationID"] = current_operation_id
2016 db_dict["errorDescription"] = error_description
2017 db_dict["errorDetail"] = error_detail
2018
2019 if ns_state:
2020 db_dict["nsState"] = ns_state
2021 self.update_db_2("nsrs", nsr_id, db_dict)
2022 except DbException as e:
2023 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2024
2025 def _write_op_status(
2026 self,
2027 op_id: str,
2028 stage: list = None,
2029 error_message: str = None,
2030 queuePosition: int = 0,
2031 operation_state: str = None,
2032 other_update: dict = None,
2033 ):
2034 try:
2035 db_dict = other_update or {}
2036 db_dict["queuePosition"] = queuePosition
2037 if isinstance(stage, list):
2038 db_dict["stage"] = stage[0]
2039 db_dict["detailed-status"] = " ".join(stage)
2040 elif stage is not None:
2041 db_dict["stage"] = str(stage)
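# Illustrative example: with stage = ["Stage 2/5: deployment of KDUs, VMs and
# execution environments.", "Deploying KDUs.", ""], the record gets
# stage="Stage 2/5: ..." and detailed-status set to the three items joined by spaces.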
2042
2043 if error_message is not None:
2044 db_dict["errorMessage"] = error_message
2045 if operation_state is not None:
2046 db_dict["operationState"] = operation_state
2047 db_dict["statusEnteredTime"] = time()
2048 self.update_db_2("nslcmops", op_id, db_dict)
2049 except DbException as e:
2050 self.logger.warn(
2051 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2052 )
2053
2054 def _write_all_config_status(self, db_nsr: dict, status: str):
2055 try:
2056 nsr_id = db_nsr["_id"]
2057 # configurationStatus
2058 config_status = db_nsr.get("configurationStatus")
2059 if config_status:
2060 db_nsr_update = {
2061 "configurationStatus.{}.status".format(index): status
2062 for index, v in enumerate(config_status)
2063 if v
2064 }
2065 # update status
2066 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2067
2068 except DbException as e:
2069 self.logger.warn(
2070 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2071 )
2072
2073 def _write_configuration_status(
2074 self,
2075 nsr_id: str,
2076 vca_index: int,
2077 status: str = None,
2078 element_under_configuration: str = None,
2079 element_type: str = None,
2080 other_update: dict = None,
2081 ):
2082
2083 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2084 # .format(vca_index, status))
2085
2086 try:
2087 db_path = "configurationStatus.{}.".format(vca_index)
2088 db_dict = other_update or {}
2089 if status:
2090 db_dict[db_path + "status"] = status
2091 if element_under_configuration:
2092 db_dict[
2093 db_path + "elementUnderConfiguration"
2094 ] = element_under_configuration
2095 if element_type:
2096 db_dict[db_path + "elementType"] = element_type
2097 self.update_db_2("nsrs", nsr_id, db_dict)
2098 except DbException as e:
2099 self.logger.warn(
2100 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2101 status, nsr_id, vca_index, e
2102 )
2103 )
2104
2105 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2106 """
2107 Checks and computes the placement (the VIM account where to deploy). If it is decided by an external tool, it
2108 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2109 Database is used because the result can be obtained from a different LCM worker in case of HA.
2110 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2111 :param db_nslcmop: database content of nslcmop
2112 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2113 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2114 computed 'vim-account-id'
2115 """
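# Illustrative sketch of the placement result expected at nslcmops _admin.pla
# (keys taken from the accesses below, values invented):
#   {"vnf": [{"member-vnf-index": "1", "vimAccountId": "vim-uuid-1"},
#            {"member-vnf-index": "2", "vimAccountId": "vim-uuid-2"}]}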
2116 modified = False
2117 nslcmop_id = db_nslcmop["_id"]
2118 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2119 if placement_engine == "PLA":
2120 self.logger.debug(
2121 logging_text + "Invoke and wait for placement optimization"
2122 )
2123 await self.msg.aiowrite(
2124 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2125 )
2126 db_poll_interval = 5
2127 wait = db_poll_interval * 10
2128 pla_result = None
2129 while not pla_result and wait >= 0:
2130 await asyncio.sleep(db_poll_interval)
2131 wait -= db_poll_interval
2132 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2133 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2134
2135 if not pla_result:
2136 raise LcmException(
2137 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2138 )
2139
2140 for pla_vnf in pla_result["vnf"]:
2141 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2142 if not pla_vnf.get("vimAccountId") or not vnfr:
2143 continue
2144 modified = True
2145 self.db.set_one(
2146 "vnfrs",
2147 {"_id": vnfr["_id"]},
2148 {"vim-account-id": pla_vnf["vimAccountId"]},
2149 )
2150 # Modifies db_vnfrs
2151 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2152 return modified
2153
2154 def update_nsrs_with_pla_result(self, params):
2155 try:
2156 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2157 self.update_db_2(
2158 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2159 )
2160 except Exception as e:
2161 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2162
2163 async def instantiate(self, nsr_id, nslcmop_id):
2164 """
2165
2166 :param nsr_id: ns instance to deploy
2167 :param nslcmop_id: operation to run
2168 :return:
2169 """
2170
2171 # Try to lock HA task here
2172 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2173 if not task_is_locked_by_me:
2174 self.logger.debug(
2175 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2176 )
2177 return
2178
2179 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2180 self.logger.debug(logging_text + "Enter")
2181
2182 # get all needed from database
2183
2184 # database nsrs record
2185 db_nsr = None
2186
2187 # database nslcmops record
2188 db_nslcmop = None
2189
2190 # update operation on nsrs
2191 db_nsr_update = {}
2192 # update operation on nslcmops
2193 db_nslcmop_update = {}
2194
2195 nslcmop_operation_state = None
2196 db_vnfrs = {} # vnf's info indexed by member-index
2197 # n2vc_info = {}
2198 tasks_dict_info = {} # from task to info text
2199 exc = None
2200 error_list = []
2201 stage = [
2202 "Stage 1/5: preparation of the environment.",
2203 "Waiting for previous operations to terminate.",
2204 "",
2205 ]
2206 # ^ stage, step, VIM progress
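# Illustrative example of how this list evolves: stage[0] holds the coarse stage
# ("Stage 2/5: deployment of KDUs, VMs and execution environments."), stage[1] the
# current step ("Deploying NS at VIM.") and stage[2] is reserved for the VIM progress.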
2207 try:
2208 # wait for any previous tasks in process
2209 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2210
2211 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2212 stage[1] = "Reading from database."
2213 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2214 db_nsr_update["detailed-status"] = "creating"
2215 db_nsr_update["operational-status"] = "init"
2216 self._write_ns_status(
2217 nsr_id=nsr_id,
2218 ns_state="BUILDING",
2219 current_operation="INSTANTIATING",
2220 current_operation_id=nslcmop_id,
2221 other_update=db_nsr_update,
2222 )
2223 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2224
2225 # read from db: operation
2226 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2227 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2228 ns_params = db_nslcmop.get("operationParams")
2229 if ns_params and ns_params.get("timeout_ns_deploy"):
2230 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2231 else:
2232 timeout_ns_deploy = self.timeout.get(
2233 "ns_deploy", self.timeout_ns_deploy
2234 )
2235
2236 # read from db: ns
2237 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2238 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2239 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2240 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2241 self.fs.sync(db_nsr["nsd-id"])
2242 db_nsr["nsd"] = nsd
2243 # nsr_name = db_nsr["name"] # TODO short-name??
2244
2245 # read from db: vnf's of this ns
2246 stage[1] = "Getting vnfrs from db."
2247 self.logger.debug(logging_text + stage[1])
2248 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2249
2250 # read from db: vnfd's for every vnf
2251 db_vnfds = [] # every vnfd data
2252
2253 # for each vnf in ns, read vnfd
2254 for vnfr in db_vnfrs_list:
2255 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2256 vnfd_id = vnfr["vnfd-id"]
2257 vnfd_ref = vnfr["vnfd-ref"]
2258 self.fs.sync(vnfd_id)
2259
2260 # if we do not have this vnfd yet, read it from db
2261 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2262 # read from db
2263 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2264 vnfd_id, vnfd_ref
2265 )
2266 self.logger.debug(logging_text + stage[1])
2267 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2268
2269 # store vnfd
2270 db_vnfds.append(vnfd)
2271
2272 # Get or generate the _admin.deployed.VCA list
2273 vca_deployed_list = None
2274 if db_nsr["_admin"].get("deployed"):
2275 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2276 if vca_deployed_list is None:
2277 vca_deployed_list = []
2278 configuration_status_list = []
2279 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2280 db_nsr_update["configurationStatus"] = configuration_status_list
2281 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2282 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2283 elif isinstance(vca_deployed_list, dict):
2284 # maintain backward compatibility. Change a dict to list at database
2285 vca_deployed_list = list(vca_deployed_list.values())
2286 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2287 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
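# Illustrative example of the legacy format converted here: an old record such as
# {"0": {...vca0...}, "1": {...vca1...}} stored under _admin.deployed.VCA becomes a
# plain list [ {...vca0...}, {...vca1...} ] (insertion order of the values is assumed).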
2288
2289 if not isinstance(
2290 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2291 ):
2292 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2293 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2294
2295 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2296 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2297 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2298 self.db.set_list(
2299 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2300 )
2301
2302 # n2vc_redesign STEP 2 Deploy Network Scenario
2303 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2304 self._write_op_status(op_id=nslcmop_id, stage=stage)
2305
2306 stage[1] = "Deploying KDUs."
2307 # self.logger.debug(logging_text + "Before deploy_kdus")
2308 # Call deploy_kdus in case the "vdu:kdu" param exists
2309 await self.deploy_kdus(
2310 logging_text=logging_text,
2311 nsr_id=nsr_id,
2312 nslcmop_id=nslcmop_id,
2313 db_vnfrs=db_vnfrs,
2314 db_vnfds=db_vnfds,
2315 task_instantiation_info=tasks_dict_info,
2316 )
2317
2318 stage[1] = "Getting VCA public key."
2319 # n2vc_redesign STEP 1 Get VCA public ssh-key
2320 # feature 1429. Add n2vc public key to needed VMs
2321 n2vc_key = self.n2vc.get_public_key()
2322 n2vc_key_list = [n2vc_key]
2323 if self.vca_config.get("public_key"):
2324 n2vc_key_list.append(self.vca_config["public_key"])
2325
2326 stage[1] = "Deploying NS at VIM."
2327 task_ro = asyncio.ensure_future(
2328 self.instantiate_RO(
2329 logging_text=logging_text,
2330 nsr_id=nsr_id,
2331 nsd=nsd,
2332 db_nsr=db_nsr,
2333 db_nslcmop=db_nslcmop,
2334 db_vnfrs=db_vnfrs,
2335 db_vnfds=db_vnfds,
2336 n2vc_key_list=n2vc_key_list,
2337 stage=stage,
2338 )
2339 )
2340 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2341 tasks_dict_info[task_ro] = "Deploying at VIM"
2342
2343 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2344 stage[1] = "Deploying Execution Environments."
2345 self.logger.debug(logging_text + stage[1])
2346
2347 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2348 for vnf_profile in get_vnf_profiles(nsd):
2349 vnfd_id = vnf_profile["vnfd-id"]
2350 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2351 member_vnf_index = str(vnf_profile["id"])
2352 db_vnfr = db_vnfrs[member_vnf_index]
2353 base_folder = vnfd["_admin"]["storage"]
2354 vdu_id = None
2355 vdu_index = 0
2356 vdu_name = None
2357 kdu_name = None
2358
2359 # Get additional parameters
2360 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2361 if db_vnfr.get("additionalParamsForVnf"):
2362 deploy_params.update(
2363 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2364 )
2365
2366 descriptor_config = get_configuration(vnfd, vnfd["id"])
2367 if descriptor_config:
2368 self._deploy_n2vc(
2369 logging_text=logging_text
2370 + "member_vnf_index={} ".format(member_vnf_index),
2371 db_nsr=db_nsr,
2372 db_vnfr=db_vnfr,
2373 nslcmop_id=nslcmop_id,
2374 nsr_id=nsr_id,
2375 nsi_id=nsi_id,
2376 vnfd_id=vnfd_id,
2377 vdu_id=vdu_id,
2378 kdu_name=kdu_name,
2379 member_vnf_index=member_vnf_index,
2380 vdu_index=vdu_index,
2381 vdu_name=vdu_name,
2382 deploy_params=deploy_params,
2383 descriptor_config=descriptor_config,
2384 base_folder=base_folder,
2385 task_instantiation_info=tasks_dict_info,
2386 stage=stage,
2387 )
2388
2389 # Deploy charms for each VDU that supports one.
2390 for vdud in get_vdu_list(vnfd):
2391 vdu_id = vdud["id"]
2392 descriptor_config = get_configuration(vnfd, vdu_id)
2393 vdur = find_in_list(
2394 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2395 )
2396
2397 if vdur.get("additionalParams"):
2398 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2399 else:
2400 deploy_params_vdu = deploy_params
2401 deploy_params_vdu["OSM"] = get_osm_params(
2402 db_vnfr, vdu_id, vdu_count_index=0
2403 )
2404 vdud_count = get_number_of_instances(vnfd, vdu_id)
2405
2406 self.logger.debug("VDUD > {}".format(vdud))
2407 self.logger.debug(
2408 "Descriptor config > {}".format(descriptor_config)
2409 )
2410 if descriptor_config:
2411 vdu_name = None
2412 kdu_name = None
2413 for vdu_index in range(vdud_count):
2414 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2415 self._deploy_n2vc(
2416 logging_text=logging_text
2417 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2418 member_vnf_index, vdu_id, vdu_index
2419 ),
2420 db_nsr=db_nsr,
2421 db_vnfr=db_vnfr,
2422 nslcmop_id=nslcmop_id,
2423 nsr_id=nsr_id,
2424 nsi_id=nsi_id,
2425 vnfd_id=vnfd_id,
2426 vdu_id=vdu_id,
2427 kdu_name=kdu_name,
2428 member_vnf_index=member_vnf_index,
2429 vdu_index=vdu_index,
2430 vdu_name=vdu_name,
2431 deploy_params=deploy_params_vdu,
2432 descriptor_config=descriptor_config,
2433 base_folder=base_folder,
2434 task_instantiation_info=tasks_dict_info,
2435 stage=stage,
2436 )
2437 for kdud in get_kdu_list(vnfd):
2438 kdu_name = kdud["name"]
2439 descriptor_config = get_configuration(vnfd, kdu_name)
2440 if descriptor_config:
2441 vdu_id = None
2442 vdu_index = 0
2443 vdu_name = None
2444 kdur = next(
2445 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2446 )
2447 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2448 if kdur.get("additionalParams"):
2449 deploy_params_kdu = parse_yaml_strings(
2450 kdur["additionalParams"]
2451 )
2452
2453 self._deploy_n2vc(
2454 logging_text=logging_text,
2455 db_nsr=db_nsr,
2456 db_vnfr=db_vnfr,
2457 nslcmop_id=nslcmop_id,
2458 nsr_id=nsr_id,
2459 nsi_id=nsi_id,
2460 vnfd_id=vnfd_id,
2461 vdu_id=vdu_id,
2462 kdu_name=kdu_name,
2463 member_vnf_index=member_vnf_index,
2464 vdu_index=vdu_index,
2465 vdu_name=vdu_name,
2466 deploy_params=deploy_params_kdu,
2467 descriptor_config=descriptor_config,
2468 base_folder=base_folder,
2469 task_instantiation_info=tasks_dict_info,
2470 stage=stage,
2471 )
2472
2473 # Check if this NS has a charm configuration
2474 descriptor_config = nsd.get("ns-configuration")
2475 if descriptor_config and descriptor_config.get("juju"):
2476 vnfd_id = None
2477 db_vnfr = None
2478 member_vnf_index = None
2479 vdu_id = None
2480 kdu_name = None
2481 vdu_index = 0
2482 vdu_name = None
2483
2484 # Get additional parameters
2485 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2486 if db_nsr.get("additionalParamsForNs"):
2487 deploy_params.update(
2488 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2489 )
2490 base_folder = nsd["_admin"]["storage"]
2491 self._deploy_n2vc(
2492 logging_text=logging_text,
2493 db_nsr=db_nsr,
2494 db_vnfr=db_vnfr,
2495 nslcmop_id=nslcmop_id,
2496 nsr_id=nsr_id,
2497 nsi_id=nsi_id,
2498 vnfd_id=vnfd_id,
2499 vdu_id=vdu_id,
2500 kdu_name=kdu_name,
2501 member_vnf_index=member_vnf_index,
2502 vdu_index=vdu_index,
2503 vdu_name=vdu_name,
2504 deploy_params=deploy_params,
2505 descriptor_config=descriptor_config,
2506 base_folder=base_folder,
2507 task_instantiation_info=tasks_dict_info,
2508 stage=stage,
2509 )
2510
2511 # the rest of the work is done in the finally block
2512
2513 except (
2514 ROclient.ROClientException,
2515 DbException,
2516 LcmException,
2517 N2VCException,
2518 ) as e:
2519 self.logger.error(
2520 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2521 )
2522 exc = e
2523 except asyncio.CancelledError:
2524 self.logger.error(
2525 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2526 )
2527 exc = "Operation was cancelled"
2528 except Exception as e:
2529 exc = traceback.format_exc()
2530 self.logger.critical(
2531 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2532 exc_info=True,
2533 )
2534 finally:
2535 if exc:
2536 error_list.append(str(exc))
2537 try:
2538 # wait for pending tasks
2539 if tasks_dict_info:
2540 stage[1] = "Waiting for instantiate pending tasks."
2541 self.logger.debug(logging_text + stage[1])
2542 error_list += await self._wait_for_tasks(
2543 logging_text,
2544 tasks_dict_info,
2545 timeout_ns_deploy,
2546 stage,
2547 nslcmop_id,
2548 nsr_id=nsr_id,
2549 )
2550 stage[1] = stage[2] = ""
2551 except asyncio.CancelledError:
2552 error_list.append("Cancelled")
2553 # TODO cancel all tasks
2554 except Exception as exc:
2555 error_list.append(str(exc))
2556
2557 # update operation-status
2558 db_nsr_update["operational-status"] = "running"
2559 # let's begin with VCA 'configured' status (later we can change it)
2560 db_nsr_update["config-status"] = "configured"
2561 for task, task_name in tasks_dict_info.items():
2562 if not task.done() or task.cancelled() or task.exception():
2563 if task_name.startswith(self.task_name_deploy_vca):
2564 # A N2VC task is pending
2565 db_nsr_update["config-status"] = "failed"
2566 else:
2567 # RO or KDU task is pending
2568 db_nsr_update["operational-status"] = "failed"
2569
2570 # update status at database
2571 if error_list:
2572 error_detail = ". ".join(error_list)
2573 self.logger.error(logging_text + error_detail)
2574 error_description_nslcmop = "{} Detail: {}".format(
2575 stage[0], error_detail
2576 )
2577 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2578 nslcmop_id, stage[0]
2579 )
2580
2581 db_nsr_update["detailed-status"] = (
2582 error_description_nsr + " Detail: " + error_detail
2583 )
2584 db_nslcmop_update["detailed-status"] = error_detail
2585 nslcmop_operation_state = "FAILED"
2586 ns_state = "BROKEN"
2587 else:
2588 error_detail = None
2589 error_description_nsr = error_description_nslcmop = None
2590 ns_state = "READY"
2591 db_nsr_update["detailed-status"] = "Done"
2592 db_nslcmop_update["detailed-status"] = "Done"
2593 nslcmop_operation_state = "COMPLETED"
2594
2595 if db_nsr:
2596 self._write_ns_status(
2597 nsr_id=nsr_id,
2598 ns_state=ns_state,
2599 current_operation="IDLE",
2600 current_operation_id=None,
2601 error_description=error_description_nsr,
2602 error_detail=error_detail,
2603 other_update=db_nsr_update,
2604 )
2605 self._write_op_status(
2606 op_id=nslcmop_id,
2607 stage="",
2608 error_message=error_description_nslcmop,
2609 operation_state=nslcmop_operation_state,
2610 other_update=db_nslcmop_update,
2611 )
2612
2613 if nslcmop_operation_state:
2614 try:
2615 await self.msg.aiowrite(
2616 "ns",
2617 "instantiated",
2618 {
2619 "nsr_id": nsr_id,
2620 "nslcmop_id": nslcmop_id,
2621 "operationState": nslcmop_operation_state,
2622 },
2623 loop=self.loop,
2624 )
2625 except Exception as e:
2626 self.logger.error(
2627 logging_text + "kafka_write notification Exception {}".format(e)
2628 )
2629
2630 self.logger.debug(logging_text + "Exit")
2631 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2632
2633 async def _add_vca_relations(
2634 self,
2635 logging_text,
2636 nsr_id,
2637 vca_index: int,
2638 timeout: int = 3600,
2639 vca_type: str = None,
2640 vca_id: str = None,
2641 ) -> bool:
2642
2643 # steps:
2644 # 1. find all relations for this VCA
2645 # 2. wait for other peers related
2646 # 3. add relations
2647
2648 try:
2649 vca_type = vca_type or "lxc_proxy_charm"
2650
2651 # STEP 1: find all relations for this VCA
2652
2653 # read nsr record
2654 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2655 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2656
2657 # this VCA data
2658 my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
2659
2660 # read all ns-configuration relations
2661 ns_relations = list()
2662 db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
2663 if db_ns_relations:
2664 for r in db_ns_relations:
2665 # check if this VCA is in the relation
2666 if my_vca.get("member-vnf-index") in (
2667 r.get("entities")[0].get("id"),
2668 r.get("entities")[1].get("id"),
2669 ):
2670 ns_relations.append(r)
2671
2672 # read all vnf-configuration relations
2673 vnf_relations = list()
2674 db_vnfd_list = db_nsr.get("vnfd-id")
2675 if db_vnfd_list:
2676 for vnfd in db_vnfd_list:
2677 db_vnf_relations = None
2678 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2679 db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
2680 if db_vnf_configuration:
2681 db_vnf_relations = db_vnf_configuration.get("relation", [])
2682 if db_vnf_relations:
2683 for r in db_vnf_relations:
2684 # check if this VCA is in the relation
2685 if my_vca.get("vdu_id") in (
2686 r.get("entities")[0].get("id"),
2687 r.get("entities")[1].get("id"),
2688 ):
2689 vnf_relations.append(r)
2690
2691 # if there are no relations, there is nothing to do
2692 if not ns_relations and not vnf_relations:
2693 self.logger.debug(logging_text + " No relations")
2694 return True
2695
2696 self.logger.debug(
2697 logging_text
2698 + " adding relations\n {}\n {}".format(
2699 ns_relations, vnf_relations
2700 )
2701 )
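# Illustrative sketch of a relation entry as read from the descriptor (keys taken
# from the accesses below, values invented):
#   {"entities": [{"id": "vnf1", "endpoint": "interface-a"},
#                 {"id": "vnf2", "endpoint": "interface-b"}]}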
2702
2703 # add all relations
2704 start = time()
2705 while True:
2706 # check timeout
2707 now = time()
2708 if now - start >= timeout:
2709 self.logger.error(logging_text + " : timeout adding relations")
2710 return False
2711
2712 # reload nsr from database (we need the updated record: _admin.deployed.VCA)
2713 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2714
2715 # for each defined NS relation, find the related VCAs
2716 for r in ns_relations.copy():
2717 from_vca_ee_id = None
2718 to_vca_ee_id = None
2719 from_vca_endpoint = None
2720 to_vca_endpoint = None
2721 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2722 for vca in vca_list:
2723 if vca.get("member-vnf-index") == r.get("entities")[0].get(
2724 "id"
2725 ) and vca.get("config_sw_installed"):
2726 from_vca_ee_id = vca.get("ee_id")
2727 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2728 if vca.get("member-vnf-index") == r.get("entities")[1].get(
2729 "id"
2730 ) and vca.get("config_sw_installed"):
2731 to_vca_ee_id = vca.get("ee_id")
2732 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2733 if from_vca_ee_id and to_vca_ee_id:
2734 # add relation
2735 await self.vca_map[vca_type].add_relation(
2736 ee_id_1=from_vca_ee_id,
2737 ee_id_2=to_vca_ee_id,
2738 endpoint_1=from_vca_endpoint,
2739 endpoint_2=to_vca_endpoint,
2740 vca_id=vca_id,
2741 )
2742 # remove entry from relations list
2743 ns_relations.remove(r)
2744 else:
2745 # check failed peers
2746 try:
2747 vca_status_list = db_nsr.get("configurationStatus")
2748 if vca_status_list:
2749 for i in range(len(vca_list)):
2750 vca = vca_list[i]
2751 vca_status = vca_status_list[i]
2752 if vca.get("member-vnf-index") == r.get("entities")[
2753 0
2754 ].get("id"):
2755 if vca_status.get("status") == "BROKEN":
2756 # peer broken: remove relation from list
2757 ns_relations.remove(r)
2758 if vca.get("member-vnf-index") == r.get("entities")[
2759 1
2760 ].get("id"):
2761 if vca_status.get("status") == "BROKEN":
2762 # peer broken: remove relation from list
2763 ns_relations.remove(r)
2764 except Exception:
2765 # ignore
2766 pass
2767
2768 # for each defined VNF relation, find the related VCAs
2769 for r in vnf_relations.copy():
2770 from_vca_ee_id = None
2771 to_vca_ee_id = None
2772 from_vca_endpoint = None
2773 to_vca_endpoint = None
2774 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2775 for vca in vca_list:
2776 key_to_check = "vdu_id"
2777 if vca.get("vdu_id") is None:
2778 key_to_check = "vnfd_id"
2779 if vca.get(key_to_check) == r.get("entities")[0].get(
2780 "id"
2781 ) and vca.get("config_sw_installed"):
2782 from_vca_ee_id = vca.get("ee_id")
2783 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2784 if vca.get(key_to_check) == r.get("entities")[1].get(
2785 "id"
2786 ) and vca.get("config_sw_installed"):
2787 to_vca_ee_id = vca.get("ee_id")
2788 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2789 if from_vca_ee_id and to_vca_ee_id:
2790 # add relation
2791 await self.vca_map[vca_type].add_relation(
2792 ee_id_1=from_vca_ee_id,
2793 ee_id_2=to_vca_ee_id,
2794 endpoint_1=from_vca_endpoint,
2795 endpoint_2=to_vca_endpoint,
2796 vca_id=vca_id,
2797 )
2798 # remove entry from relations list
2799 vnf_relations.remove(r)
2800 else:
2801 # check failed peers
2802 try:
2803 vca_status_list = db_nsr.get("configurationStatus")
2804 if vca_status_list:
2805 for i in range(len(vca_list)):
2806 vca = vca_list[i]
2807 vca_status = vca_status_list[i]
2808 if vca.get("vdu_id") == r.get("entities")[0].get(
2809 "id"
2810 ):
2811 if vca_status.get("status") == "BROKEN":
2812 # peer broken: remove relation from list
2813 vnf_relations.remove(r)
2814 if vca.get("vdu_id") == r.get("entities")[1].get(
2815 "id"
2816 ):
2817 if vca_status.get("status") == "BROKEN":
2818 # peer broken: remove relation from list
2819 vnf_relations.remove(r)
2820 except Exception:
2821 # ignore
2822 pass
2823
2824 # wait for next try
2825 await asyncio.sleep(5.0)
2826
2827 if not ns_relations and not vnf_relations:
2828 self.logger.debug("Relations added")
2829 break
2830
2831 return True
2832
2833 except Exception as e:
2834 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
2835 return False
2836
2837 async def _install_kdu(
2838 self,
2839 nsr_id: str,
2840 nsr_db_path: str,
2841 vnfr_data: dict,
2842 kdu_index: int,
2843 kdud: dict,
2844 vnfd: dict,
2845 k8s_instance_info: dict,
2846 k8params: dict = None,
2847 timeout: int = 600,
2848 vca_id: str = None,
2849 ):
2850
2851 try:
2852 k8sclustertype = k8s_instance_info["k8scluster-type"]
2853 # Instantiate kdu
2854 db_dict_install = {
2855 "collection": "nsrs",
2856 "filter": {"_id": nsr_id},
2857 "path": nsr_db_path,
2858 }
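# Illustrative note: db_dict_install tells the K8s connector where to report
# progress, i.e. status updates would land in the nsrs record under the
# nsr_db_path prefix (e.g. "_admin.deployed.K8s.<index>...").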
2859
2860 if k8s_instance_info.get("kdu-deployment-name"):
2861 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
2862 else:
2863 kdu_instance = self.k8scluster_map[
2864 k8sclustertype
2865 ].generate_kdu_instance_name(
2866 db_dict=db_dict_install,
2867 kdu_model=k8s_instance_info["kdu-model"],
2868 kdu_name=k8s_instance_info["kdu-name"],
2869 )
2870 self.update_db_2(
2871 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2872 )
2873 await self.k8scluster_map[k8sclustertype].install(
2874 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2875 kdu_model=k8s_instance_info["kdu-model"],
2876 atomic=True,
2877 params=k8params,
2878 db_dict=db_dict_install,
2879 timeout=timeout,
2880 kdu_name=k8s_instance_info["kdu-name"],
2881 namespace=k8s_instance_info["namespace"],
2882 kdu_instance=kdu_instance,
2883 vca_id=vca_id,
2884 )
2885 self.update_db_2(
2886 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2887 )
2888
2889 # Obtain services in order to get the management service ip
2890 services = await self.k8scluster_map[k8sclustertype].get_services(
2891 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2892 kdu_instance=kdu_instance,
2893 namespace=k8s_instance_info["namespace"],
2894 )
2895
2896 # Obtain management service info (if exists)
2897 vnfr_update_dict = {}
2898 kdu_config = get_configuration(vnfd, kdud["name"])
2899 if kdu_config:
2900 target_ee_list = kdu_config.get("execution-environment-list", [])
2901 else:
2902 target_ee_list = []
2903
2904 if services:
2905 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2906 mgmt_services = [
2907 service
2908 for service in kdud.get("service", [])
2909 if service.get("mgmt-service")
2910 ]
2911 for mgmt_service in mgmt_services:
2912 for service in services:
2913 if service["name"].startswith(mgmt_service["name"]):
2914 # Mgmt service found, Obtain service ip
2915 ip = service.get("external_ip", service.get("cluster_ip"))
2916 if isinstance(ip, list) and len(ip) == 1:
2917 ip = ip[0]
2918
2919 vnfr_update_dict[
2920 "kdur.{}.ip-address".format(kdu_index)
2921 ] = ip
2922
2923 # Check whether the mgmt ip at the vnf must also be updated
2924 service_external_cp = mgmt_service.get(
2925 "external-connection-point-ref"
2926 )
2927 if service_external_cp:
2928 if (
2929 deep_get(vnfd, ("mgmt-interface", "cp"))
2930 == service_external_cp
2931 ):
2932 vnfr_update_dict["ip-address"] = ip
2933
2934 if find_in_list(
2935 target_ee_list,
2936 lambda ee: ee.get(
2937 "external-connection-point-ref", ""
2938 )
2939 == service_external_cp,
2940 ):
2941 vnfr_update_dict[
2942 "kdur.{}.ip-address".format(kdu_index)
2943 ] = ip
2944 break
2945 else:
2946 self.logger.warn(
2947 "Mgmt service name: {} not found".format(
2948 mgmt_service["name"]
2949 )
2950 )
2951
2952 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2953 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2954
2955 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
2956 if (
2957 kdu_config
2958 and kdu_config.get("initial-config-primitive")
2959 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
2960 ):
2961 initial_config_primitive_list = kdu_config.get(
2962 "initial-config-primitive"
2963 )
2964 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2965
2966 for initial_config_primitive in initial_config_primitive_list:
2967 primitive_params_ = self._map_primitive_params(
2968 initial_config_primitive, {}, {}
2969 )
2970
2971 await asyncio.wait_for(
2972 self.k8scluster_map[k8sclustertype].exec_primitive(
2973 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2974 kdu_instance=kdu_instance,
2975 primitive_name=initial_config_primitive["name"],
2976 params=primitive_params_,
2977 db_dict=db_dict_install,
2978 vca_id=vca_id,
2979 ),
2980 timeout=timeout,
2981 )
2982
2983 except Exception as e:
2984 # Prepare update db with error and raise exception
2985 try:
2986 self.update_db_2(
2987 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
2988 )
2989 self.update_db_2(
2990 "vnfrs",
2991 vnfr_data.get("_id"),
2992 {"kdur.{}.status".format(kdu_index): "ERROR"},
2993 )
2994 except Exception:
2995 # ignore to keep original exception
2996 pass
2997 # reraise original error
2998 raise
2999
3000 return kdu_instance
3001
3002 async def deploy_kdus(
3003 self,
3004 logging_text,
3005 nsr_id,
3006 nslcmop_id,
3007 db_vnfrs,
3008 db_vnfds,
3009 task_instantiation_info,
3010 ):
3011 # Launch kdus if present in the descriptor
3012
3013 k8scluster_id_2_uuic = {
3014 "helm-chart-v3": {},
3015 "helm-chart": {},
3016 "juju-bundle": {},
3017 }
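# Illustrative example of this cache once a couple of clusters have been resolved
# (ids and uuids invented):
#   {"helm-chart-v3": {"<k8scluster-id>": "<internal-helm3-id>"},
#    "helm-chart": {},
#    "juju-bundle": {"<k8scluster-id>": "<internal-juju-id>"}}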
3018
3019 async def _get_cluster_id(cluster_id, cluster_type):
3020 nonlocal k8scluster_id_2_uuic
3021 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3022 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3023
3024 # check if the K8s cluster is being created and wait for related previous tasks in process
3025 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3026 "k8scluster", cluster_id
3027 )
3028 if task_dependency:
3029 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3030 task_name, cluster_id
3031 )
3032 self.logger.debug(logging_text + text)
3033 await asyncio.wait(task_dependency, timeout=3600)
3034
3035 db_k8scluster = self.db.get_one(
3036 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3037 )
3038 if not db_k8scluster:
3039 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3040
3041 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3042 if not k8s_id:
3043 if cluster_type == "helm-chart-v3":
3044 try:
3045 # backward compatibility for existing clusters that have not been initialized for helm v3
3046 k8s_credentials = yaml.safe_dump(
3047 db_k8scluster.get("credentials")
3048 )
3049 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3050 k8s_credentials, reuse_cluster_uuid=cluster_id
3051 )
3052 db_k8scluster_update = {}
3053 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3054 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3055 db_k8scluster_update[
3056 "_admin.helm-chart-v3.created"
3057 ] = uninstall_sw
3058 db_k8scluster_update[
3059 "_admin.helm-chart-v3.operationalState"
3060 ] = "ENABLED"
3061 self.update_db_2(
3062 "k8sclusters", cluster_id, db_k8scluster_update
3063 )
3064 except Exception as e:
3065 self.logger.error(
3066 logging_text
3067 + "error initializing helm-v3 cluster: {}".format(str(e))
3068 )
3069 raise LcmException(
3070 "K8s cluster '{}' has not been initialized for '{}'".format(
3071 cluster_id, cluster_type
3072 )
3073 )
3074 else:
3075 raise LcmException(
3076 "K8s cluster '{}' has not been initialized for '{}'".format(
3077 cluster_id, cluster_type
3078 )
3079 )
3080 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3081 return k8s_id
3082
3083 logging_text += "Deploy kdus: "
3084 step = ""
3085 try:
3086 db_nsr_update = {"_admin.deployed.K8s": []}
3087 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3088
3089 index = 0
3090 updated_cluster_list = []
3091 updated_v3_cluster_list = []
3092
3093 for vnfr_data in db_vnfrs.values():
3094 vca_id = self.get_vca_id(vnfr_data, {})
3095 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3096 # Step 0: Prepare and set parameters
3097 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3098 vnfd_id = vnfr_data.get("vnfd-id")
3099 vnfd_with_id = find_in_list(
3100 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3101 )
3102 kdud = next(
3103 kdud
3104 for kdud in vnfd_with_id["kdu"]
3105 if kdud["name"] == kdur["kdu-name"]
3106 )
3107 namespace = kdur.get("k8s-namespace")
3108 kdu_deployment_name = kdur.get("kdu-deployment-name")
3109 if kdur.get("helm-chart"):
3110 kdumodel = kdur["helm-chart"]
3111 # Default to helm-chart-v3; if helm-version is v2, use helm-chart (v2)
3112 k8sclustertype = "helm-chart-v3"
3113 self.logger.debug("kdur: {}".format(kdur))
3114 if (
3115 kdur.get("helm-version")
3116 and kdur.get("helm-version") == "v2"
3117 ):
3118 k8sclustertype = "helm-chart"
3119 elif kdur.get("juju-bundle"):
3120 kdumodel = kdur["juju-bundle"]
3121 k8sclustertype = "juju-bundle"
3122 else:
3123 raise LcmException(
3124 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3125 "juju-bundle. Maybe an old NBI version is running".format(
3126 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3127 )
3128 )
3129 # check if kdumodel is a file and exists
3130 try:
3131 vnfd_with_id = find_in_list(
3132 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3133 )
3134 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3135 if storage and storage.get(
3136 "pkg-dir"
3137 ): # may not be present if the vnfd has no artifacts
3138 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3139 filename = "{}/{}/{}s/{}".format(
3140 storage["folder"],
3141 storage["pkg-dir"],
3142 k8sclustertype,
3143 kdumodel,
3144 )
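# Illustrative example, assuming a storage folder "<vnfd-id>", a pkg-dir
# "vnfd_pkg" and a juju-bundle kdu "mybundle":
#   filename -> "<vnfd-id>/vnfd_pkg/juju-bundles/mybundle"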
3145 if self.fs.file_exists(
3146 filename, mode="file"
3147 ) or self.fs.file_exists(filename, mode="dir"):
3148 kdumodel = self.fs.path + filename
3149 except (asyncio.TimeoutError, asyncio.CancelledError):
3150 raise
3151 except Exception: # it is not a file
3152 pass
3153
3154 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3155 step = "Synchronize repos for k8s cluster '{}'".format(
3156 k8s_cluster_id
3157 )
3158 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3159
3160 # Synchronize repos
3161 if (
3162 k8sclustertype == "helm-chart"
3163 and cluster_uuid not in updated_cluster_list
3164 ) or (
3165 k8sclustertype == "helm-chart-v3"
3166 and cluster_uuid not in updated_v3_cluster_list
3167 ):
3168 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3169 self.k8scluster_map[k8sclustertype].synchronize_repos(
3170 cluster_uuid=cluster_uuid
3171 )
3172 )
3173 if del_repo_list or added_repo_dict:
3174 if k8sclustertype == "helm-chart":
3175 unset = {
3176 "_admin.helm_charts_added." + item: None
3177 for item in del_repo_list
3178 }
3179 updated = {
3180 "_admin.helm_charts_added." + item: name
3181 for item, name in added_repo_dict.items()
3182 }
3183 updated_cluster_list.append(cluster_uuid)
3184 elif k8sclustertype == "helm-chart-v3":
3185 unset = {
3186 "_admin.helm_charts_v3_added." + item: None
3187 for item in del_repo_list
3188 }
3189 updated = {
3190 "_admin.helm_charts_v3_added." + item: name
3191 for item, name in added_repo_dict.items()
3192 }
3193 updated_v3_cluster_list.append(cluster_uuid)
3194 self.logger.debug(
3195 logging_text + "repos synchronized on k8s cluster "
3196 "'{}' to_delete: {}, to_add: {}".format(
3197 k8s_cluster_id, del_repo_list, added_repo_dict
3198 )
3199 )
3200 self.db.set_one(
3201 "k8sclusters",
3202 {"_id": k8s_cluster_id},
3203 updated,
3204 unset=unset,
3205 )
3206
3207 # Instantiate kdu
3208 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3209 vnfr_data["member-vnf-index-ref"],
3210 kdur["kdu-name"],
3211 k8s_cluster_id,
3212 )
3213 k8s_instance_info = {
3214 "kdu-instance": None,
3215 "k8scluster-uuid": cluster_uuid,
3216 "k8scluster-type": k8sclustertype,
3217 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3218 "kdu-name": kdur["kdu-name"],
3219 "kdu-model": kdumodel,
3220 "namespace": namespace,
3221 "kdu-deployment-name": kdu_deployment_name,
3222 }
3223 db_path = "_admin.deployed.K8s.{}".format(index)
3224 db_nsr_update[db_path] = k8s_instance_info
3225 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3226 vnfd_with_id = find_in_list(
3227 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3228 )
3229 task = asyncio.ensure_future(
3230 self._install_kdu(
3231 nsr_id,
3232 db_path,
3233 vnfr_data,
3234 kdu_index,
3235 kdud,
3236 vnfd_with_id,
3237 k8s_instance_info,
3238 k8params=desc_params,
3239 timeout=600,
3240 vca_id=vca_id,
3241 )
3242 )
3243 self.lcm_tasks.register(
3244 "ns",
3245 nsr_id,
3246 nslcmop_id,
3247 "instantiate_KDU-{}".format(index),
3248 task,
3249 )
3250 task_instantiation_info[task] = "Deploying KDU {}".format(
3251 kdur["kdu-name"]
3252 )
3253
3254 index += 1
3255
3256 except (LcmException, asyncio.CancelledError):
3257 raise
3258 except Exception as e:
3259 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3260 if isinstance(e, (N2VCException, DbException)):
3261 self.logger.error(logging_text + msg)
3262 else:
3263 self.logger.critical(logging_text + msg, exc_info=True)
3264 raise LcmException(msg)
3265 finally:
3266 if db_nsr_update:
3267 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3268
3269 def _deploy_n2vc(
3270 self,
3271 logging_text,
3272 db_nsr,
3273 db_vnfr,
3274 nslcmop_id,
3275 nsr_id,
3276 nsi_id,
3277 vnfd_id,
3278 vdu_id,
3279 kdu_name,
3280 member_vnf_index,
3281 vdu_index,
3282 vdu_name,
3283 deploy_params,
3284 descriptor_config,
3285 base_folder,
3286 task_instantiation_info,
3287 stage,
3288 ):
3289 # launch instantiate_N2VC in an asyncio task and register the task object
3290 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3291 # if not found, create one entry and update database
3292 # fill db_nsr._admin.deployed.VCA.<index>
3293
3294 self.logger.debug(
3295 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3296 )
3297 if "execution-environment-list" in descriptor_config:
3298 ee_list = descriptor_config.get("execution-environment-list", [])
3299 elif "juju" in descriptor_config:
3300 ee_list = [descriptor_config] # ns charms
3301 else: # other types such as script are not supported
3302 ee_list = []
3303
3304 for ee_item in ee_list:
3305 self.logger.debug(
3306 logging_text
3307 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3308 ee_item.get("juju"), ee_item.get("helm-chart")
3309 )
3310 )
3311 ee_descriptor_id = ee_item.get("id")
3312 if ee_item.get("juju"):
3313 vca_name = ee_item["juju"].get("charm")
3314 vca_type = (
3315 "lxc_proxy_charm"
3316 if ee_item["juju"].get("charm") is not None
3317 else "native_charm"
3318 )
3319 if ee_item["juju"].get("cloud") == "k8s":
3320 vca_type = "k8s_proxy_charm"
3321 elif ee_item["juju"].get("proxy") is False:
3322 vca_type = "native_charm"
3323 elif ee_item.get("helm-chart"):
3324 vca_name = ee_item["helm-chart"]
3325 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3326 vca_type = "helm"
3327 else:
3328 vca_type = "helm-v3"
3329 else:
3330 self.logger.debug(
3331 logging_text + "skipping non juju neither charm configuration"
3332 )
3333 continue
3334
3335 vca_index = -1
3336 for vca_index, vca_deployed in enumerate(
3337 db_nsr["_admin"]["deployed"]["VCA"]
3338 ):
3339 if not vca_deployed:
3340 continue
3341 if (
3342 vca_deployed.get("member-vnf-index") == member_vnf_index
3343 and vca_deployed.get("vdu_id") == vdu_id
3344 and vca_deployed.get("kdu_name") == kdu_name
3345 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3346 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3347 ):
3348 break
3349 else:
3350 # not found, create one.
3351 target = (
3352 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3353 )
3354 if vdu_id:
3355 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3356 elif kdu_name:
3357 target += "/kdu/{}".format(kdu_name)
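# Illustrative examples of the target string built above (names invented):
#   "ns" for an NS charm, "vnf/1" for a VNF charm,
#   "vnf/1/vdu/mgmtVM/0" for a VDU charm, "vnf/1/kdu/ldap" for a KDU.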
3358 vca_deployed = {
3359 "target_element": target,
3360 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3361 "member-vnf-index": member_vnf_index,
3362 "vdu_id": vdu_id,
3363 "kdu_name": kdu_name,
3364 "vdu_count_index": vdu_index,
3365 "operational-status": "init", # TODO revise
3366 "detailed-status": "", # TODO revise
3367 "step": "initial-deploy", # TODO revise
3368 "vnfd_id": vnfd_id,
3369 "vdu_name": vdu_name,
3370 "type": vca_type,
3371 "ee_descriptor_id": ee_descriptor_id,
3372 }
3373 vca_index += 1
3374
3375 # create VCA and configurationStatus in db
3376 db_dict = {
3377 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3378 "configurationStatus.{}".format(vca_index): dict(),
3379 }
3380 self.update_db_2("nsrs", nsr_id, db_dict)
3381
3382 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3383
3384 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3385 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3386 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3387
3388 # Launch task
3389 task_n2vc = asyncio.ensure_future(
3390 self.instantiate_N2VC(
3391 logging_text=logging_text,
3392 vca_index=vca_index,
3393 nsi_id=nsi_id,
3394 db_nsr=db_nsr,
3395 db_vnfr=db_vnfr,
3396 vdu_id=vdu_id,
3397 kdu_name=kdu_name,
3398 vdu_index=vdu_index,
3399 deploy_params=deploy_params,
3400 config_descriptor=descriptor_config,
3401 base_folder=base_folder,
3402 nslcmop_id=nslcmop_id,
3403 stage=stage,
3404 vca_type=vca_type,
3405 vca_name=vca_name,
3406 ee_config_descriptor=ee_item,
3407 )
3408 )
3409 self.lcm_tasks.register(
3410 "ns",
3411 nsr_id,
3412 nslcmop_id,
3413 "instantiate_N2VC-{}".format(vca_index),
3414 task_n2vc,
3415 )
3416 task_instantiation_info[
3417 task_n2vc
3418 ] = self.task_name_deploy_vca + " {}.{}".format(
3419 member_vnf_index or "", vdu_id or ""
3420 )
3421
3422 @staticmethod
3423 def _create_nslcmop(nsr_id, operation, params):
3424 """
3425 Creates an nslcmop content to be stored at database.
3426 :param nsr_id: internal id of the instance
3427 :param operation: instantiate, terminate, scale, action, ...
3428 :param params: user parameters for the operation
3429 :return: dictionary following SOL005 format
3430 """
3431 # Raise exception if invalid arguments
3432 if not (nsr_id and operation and params):
3433 raise LcmException(
3434 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3435 )
3436 now = time()
3437 _id = str(uuid4())
3438 nslcmop = {
3439 "id": _id,
3440 "_id": _id,
3441 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3442 "operationState": "PROCESSING",
3443 "statusEnteredTime": now,
3444 "nsInstanceId": nsr_id,
3445 "lcmOperationType": operation,
3446 "startTime": now,
3447 "isAutomaticInvocation": False,
3448 "operationParams": params,
3449 "isCancelPending": False,
3450 "links": {
3451 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3452 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3453 },
3454 }
3455 return nslcmop
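# Illustrative usage (argument values invented): the returned dict is meant to be
# stored in the "nslcmops" collection, e.g.
#   nslcmop = self._create_nslcmop(nsr_id, "action", {"member_vnf_index": "1", "primitive": "touch"})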
3456
3457 def _format_additional_params(self, params):
3458 params = params or {}
3459 for key, value in params.items():
3460 if str(value).startswith("!!yaml "):
3461 params[key] = yaml.safe_load(value[7:])
3462 return params
3463
3464 def _get_terminate_primitive_params(self, seq, vnf_index):
3465 primitive = seq.get("name")
3466 primitive_params = {}
3467 params = {
3468 "member_vnf_index": vnf_index,
3469 "primitive": primitive,
3470 "primitive_params": primitive_params,
3471 }
3472 desc_params = {}
3473 return self._map_primitive_params(seq, params, desc_params)
3474
3475 # sub-operations
3476
3477 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3478 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3479 if op.get("operationState") == "COMPLETED":
3480 # b. Skip sub-operation
3481 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3482 return self.SUBOPERATION_STATUS_SKIP
3483 else:
3484 # c. retry executing sub-operation
3485 # The sub-operation exists, and operationState != 'COMPLETED'
3486 # Update operationState = 'PROCESSING' to indicate a retry.
3487 operationState = "PROCESSING"
3488 detailed_status = "In progress"
3489 self._update_suboperation_status(
3490 db_nslcmop, op_index, operationState, detailed_status
3491 )
3492 # Return the sub-operation index
3493 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3494 # with arguments extracted from the sub-operation
3495 return op_index
3496
3497 # Find a sub-operation where all keys in a matching dictionary must match
3498 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3499 def _find_suboperation(self, db_nslcmop, match):
3500 if db_nslcmop and match:
3501 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3502 for i, op in enumerate(op_list):
3503 if all(op.get(k) == match[k] for k in match):
3504 return i
3505 return self.SUBOPERATION_STATUS_NOT_FOUND
3506
3507 # Update status for a sub-operation given its index
3508 def _update_suboperation_status(
3509 self, db_nslcmop, op_index, operationState, detailed_status
3510 ):
3511 # Update DB for HA tasks
3512 q_filter = {"_id": db_nslcmop["_id"]}
3513 update_dict = {
3514 "_admin.operations.{}.operationState".format(op_index): operationState,
3515 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3516 }
3517 self.db.set_one(
3518 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3519 )
3520
3521 # Add sub-operation, return the index of the added sub-operation
3522 # Optionally, set operationState, detailed-status, and operationType
3523 # Status and type are currently set for 'scale' sub-operations:
3524 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3525 # 'detailed-status' : status message
3526 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3527 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3528 def _add_suboperation(
3529 self,
3530 db_nslcmop,
3531 vnf_index,
3532 vdu_id,
3533 vdu_count_index,
3534 vdu_name,
3535 primitive,
3536 mapped_primitive_params,
3537 operationState=None,
3538 detailed_status=None,
3539 operationType=None,
3540 RO_nsr_id=None,
3541 RO_scaling_info=None,
3542 ):
3543 if not db_nslcmop:
3544 return self.SUBOPERATION_STATUS_NOT_FOUND
3545 # Get the "_admin.operations" list, if it exists
3546 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3547 op_list = db_nslcmop_admin.get("operations")
3548 # Create or append to the "_admin.operations" list
3549 new_op = {
3550 "member_vnf_index": vnf_index,
3551 "vdu_id": vdu_id,
3552 "vdu_count_index": vdu_count_index,
3553 "primitive": primitive,
3554 "primitive_params": mapped_primitive_params,
3555 }
3556 if operationState:
3557 new_op["operationState"] = operationState
3558 if detailed_status:
3559 new_op["detailed-status"] = detailed_status
3560 if operationType:
3561 new_op["lcmOperationType"] = operationType
3562 if RO_nsr_id:
3563 new_op["RO_nsr_id"] = RO_nsr_id
3564 if RO_scaling_info:
3565 new_op["RO_scaling_info"] = RO_scaling_info
3566 if not op_list:
3567 # No existing operations, create key 'operations' with current operation as first list element
3568 db_nslcmop_admin.update({"operations": [new_op]})
3569 op_list = db_nslcmop_admin.get("operations")
3570 else:
3571 # Existing operations, append operation to list
3572 op_list.append(new_op)
3573
3574 db_nslcmop_update = {"_admin.operations": op_list}
3575 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3576 op_index = len(op_list) - 1
3577 return op_index
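# Illustrative sketch (hypothetical values): a terminate-primitive
# sub-operation added by destroy_N2VC() only carries the primitive data,
# while a scale sub-operation would additionally set operationState,
# detailed-status and lcmOperationType.
#
#   op_index = self._add_suboperation(
#       db_nslcmop,
#       vnf_index="1",
#       vdu_id=None,
#       vdu_count_index=None,
#       vdu_name=None,
#       primitive="touch",
#       mapped_primitive_params={"filename": "/home/ubuntu/last-touch"},
#   )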
3578
3579 # Helper methods for scale() sub-operations
3580
3581 # pre-scale/post-scale:
3582 # Check for 3 different cases:
3583 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3584 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3585 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3586 def _check_or_add_scale_suboperation(
3587 self,
3588 db_nslcmop,
3589 vnf_index,
3590 vnf_config_primitive,
3591 primitive_params,
3592 operationType,
3593 RO_nsr_id=None,
3594 RO_scaling_info=None,
3595 ):
3596 # Find this sub-operation
3597 if RO_nsr_id and RO_scaling_info:
3598 operationType = "SCALE-RO"
3599 match = {
3600 "member_vnf_index": vnf_index,
3601 "RO_nsr_id": RO_nsr_id,
3602 "RO_scaling_info": RO_scaling_info,
3603 }
3604 else:
3605 match = {
3606 "member_vnf_index": vnf_index,
3607 "primitive": vnf_config_primitive,
3608 "primitive_params": primitive_params,
3609 "lcmOperationType": operationType,
3610 }
3611 op_index = self._find_suboperation(db_nslcmop, match)
3612 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3613 # a. New sub-operation
3614 # The sub-operation does not exist, add it.
3615 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3616 # The following parameters are set to None for all kinds of scaling:
3617 vdu_id = None
3618 vdu_count_index = None
3619 vdu_name = None
3620 if RO_nsr_id and RO_scaling_info:
3621 vnf_config_primitive = None
3622 primitive_params = None
3623 else:
3624 RO_nsr_id = None
3625 RO_scaling_info = None
3626 # Initial status for sub-operation
3627 operationState = "PROCESSING"
3628 detailed_status = "In progress"
3629 # Add sub-operation for pre/post-scaling (zero or more operations)
3630 self._add_suboperation(
3631 db_nslcmop,
3632 vnf_index,
3633 vdu_id,
3634 vdu_count_index,
3635 vdu_name,
3636 vnf_config_primitive,
3637 primitive_params,
3638 operationState,
3639 detailed_status,
3640 operationType,
3641 RO_nsr_id,
3642 RO_scaling_info,
3643 )
3644 return self.SUBOPERATION_STATUS_NEW
3645 else:
3646 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3647 # or op_index (operationState != 'COMPLETED')
3648 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
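# Illustrative sketch (hypothetical values) of how scale() is expected to
# branch on the return value of _check_or_add_scale_suboperation():
#
#   op_index = self._check_or_add_scale_suboperation(
#       db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, "PRE-SCALE"
#   )
#   if op_index == self.SUBOPERATION_STATUS_SKIP:
#       pass  # already COMPLETED in a previous run, do not re-execute
#   elif op_index == self.SUBOPERATION_STATUS_NEW:
#       ...   # first execution, use the unmodified arguments
#   else:
#       ...   # retry: re-read primitive and params from the stored sub-operation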
3649
3650 # Function to return execution_environment id
3651
3652 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3653 # TODO vdu_index_count
3654 for vca in vca_deployed_list:
3655 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3656 return vca["ee_id"]
3657
3658 async def destroy_N2VC(
3659 self,
3660 logging_text,
3661 db_nslcmop,
3662 vca_deployed,
3663 config_descriptor,
3664 vca_index,
3665 destroy_ee=True,
3666 exec_primitives=True,
3667 scaling_in=False,
3668 vca_id: str = None,
3669 ):
3670 """
3671 Execute the terminate primitives and, if destroy_ee is True, destroy the execution environment
3672 :param logging_text:
3673 :param db_nslcmop:
3674 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3675 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3676 :param vca_index: index in the database _admin.deployed.VCA
3677 :param destroy_ee: False to skip destroying it here, because all of them will be destroyed at once later
3678 :param exec_primitives: False to skip executing terminate primitives, because the configuration was not
3679 completed or did not execute properly
3680 :param scaling_in: True destroys the application, False destroys the model
3681 :return: None or exception
3682 """
3683
3684 self.logger.debug(
3685 logging_text
3686 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3687 vca_index, vca_deployed, config_descriptor, destroy_ee
3688 )
3689 )
3690
3691 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3692
3693 # execute terminate_primitives
3694 if exec_primitives:
3695 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3696 config_descriptor.get("terminate-config-primitive"),
3697 vca_deployed.get("ee_descriptor_id"),
3698 )
3699 vdu_id = vca_deployed.get("vdu_id")
3700 vdu_count_index = vca_deployed.get("vdu_count_index")
3701 vdu_name = vca_deployed.get("vdu_name")
3702 vnf_index = vca_deployed.get("member-vnf-index")
3703 if terminate_primitives and vca_deployed.get("needed_terminate"):
3704 for seq in terminate_primitives:
3705 # For each sequence in list, get primitive and call _ns_execute_primitive()
3706 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3707 vnf_index, seq.get("name")
3708 )
3709 self.logger.debug(logging_text + step)
3710 # Create the primitive for each sequence, i.e. "primitive": "touch"
3711 primitive = seq.get("name")
3712 mapped_primitive_params = self._get_terminate_primitive_params(
3713 seq, vnf_index
3714 )
3715
3716 # Add sub-operation
3717 self._add_suboperation(
3718 db_nslcmop,
3719 vnf_index,
3720 vdu_id,
3721 vdu_count_index,
3722 vdu_name,
3723 primitive,
3724 mapped_primitive_params,
3725 )
3726 # Sub-operations: Call _ns_execute_primitive() instead of action()
3727 try:
3728 result, result_detail = await self._ns_execute_primitive(
3729 vca_deployed["ee_id"],
3730 primitive,
3731 mapped_primitive_params,
3732 vca_type=vca_type,
3733 vca_id=vca_id,
3734 )
3735 except LcmException:
3736 # this happens when the VCA is not deployed. In this case there is nothing to terminate
3737 continue
3738 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3739 if result not in result_ok:
3740 raise LcmException(
3741 "terminate_primitive {} for vnf_member_index={} fails with "
3742 "error {}".format(seq.get("name"), vnf_index, result_detail)
3743 )
3744 # record that this VCA no longer needs to be terminated
3745 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3746 vca_index
3747 )
3748 self.update_db_2(
3749 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3750 )
3751
3752 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3753 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3754
3755 if destroy_ee:
3756 await self.vca_map[vca_type].delete_execution_environment(
3757 vca_deployed["ee_id"],
3758 scaling_in=scaling_in,
3759 vca_type=vca_type,
3760 vca_id=vca_id,
3761 )
3762
3763 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3764 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3765 namespace = "." + db_nsr["_id"]
3766 try:
3767 await self.n2vc.delete_namespace(
3768 namespace=namespace,
3769 total_timeout=self.timeout_charm_delete,
3770 vca_id=vca_id,
3771 )
3772 except N2VCNotFound: # already deleted. Skip
3773 pass
3774 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3775
3776 async def _terminate_RO(
3777 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3778 ):
3779 """
3780 Terminates a deployment from RO
3781 :param logging_text:
3782 :param nsr_deployed: db_nsr._admin.deployed
3783 :param nsr_id:
3784 :param nslcmop_id:
3785 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
3786 This method only updates index 2, but writes the concatenated content of the whole list to the database
3787 :return:
3788 """
3789 db_nsr_update = {}
3790 failed_detail = []
3791 ro_nsr_id = ro_delete_action = None
3792 if nsr_deployed and nsr_deployed.get("RO"):
3793 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3794 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3795 try:
3796 if ro_nsr_id:
3797 stage[2] = "Deleting ns from VIM."
3798 db_nsr_update["detailed-status"] = " ".join(stage)
3799 self._write_op_status(nslcmop_id, stage)
3800 self.logger.debug(logging_text + stage[2])
3801 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3802 self._write_op_status(nslcmop_id, stage)
3803 desc = await self.RO.delete("ns", ro_nsr_id)
3804 ro_delete_action = desc["action_id"]
3805 db_nsr_update[
3806 "_admin.deployed.RO.nsr_delete_action_id"
3807 ] = ro_delete_action
3808 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3809 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3810 if ro_delete_action:
3811 # wait until NS is deleted from VIM
3812 stage[2] = "Waiting ns deleted from VIM."
3813 detailed_status_old = None
3814 self.logger.debug(
3815 logging_text
3816 + stage[2]
3817 + " RO_id={} ro_delete_action={}".format(
3818 ro_nsr_id, ro_delete_action
3819 )
3820 )
3821 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3822 self._write_op_status(nslcmop_id, stage)
3823
3824 delete_timeout = 20 * 60 # 20 minutes
3825 while delete_timeout > 0:
3826 desc = await self.RO.show(
3827 "ns",
3828 item_id_name=ro_nsr_id,
3829 extra_item="action",
3830 extra_item_id=ro_delete_action,
3831 )
3832
3833 # deploymentStatus
3834 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3835
3836 ns_status, ns_status_info = self.RO.check_action_status(desc)
3837 if ns_status == "ERROR":
3838 raise ROclient.ROClientException(ns_status_info)
3839 elif ns_status == "BUILD":
3840 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3841 elif ns_status == "ACTIVE":
3842 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3843 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3844 break
3845 else:
3846 assert (
3847 False
3848 ), "ROclient.check_action_status returns unknown {}".format(
3849 ns_status
3850 )
3851 if stage[2] != detailed_status_old:
3852 detailed_status_old = stage[2]
3853 db_nsr_update["detailed-status"] = " ".join(stage)
3854 self._write_op_status(nslcmop_id, stage)
3855 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3856 await asyncio.sleep(5, loop=self.loop)
3857 delete_timeout -= 5
3858 else: # delete_timeout <= 0:
3859 raise ROclient.ROClientException(
3860 "Timeout waiting ns deleted from VIM"
3861 )
3862
3863 except Exception as e:
3864 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3865 if (
3866 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3867 ): # not found
3868 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3869 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3870 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3871 self.logger.debug(
3872 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
3873 )
3874 elif (
3875 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3876 ): # conflict
3877 failed_detail.append("delete conflict: {}".format(e))
3878 self.logger.debug(
3879 logging_text
3880 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
3881 )
3882 else:
3883 failed_detail.append("delete error: {}".format(e))
3884 self.logger.error(
3885 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
3886 )
3887
3888 # Delete nsd
3889 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3890 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3891 try:
3892 stage[2] = "Deleting nsd from RO."
3893 db_nsr_update["detailed-status"] = " ".join(stage)
3894 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3895 self._write_op_status(nslcmop_id, stage)
3896 await self.RO.delete("nsd", ro_nsd_id)
3897 self.logger.debug(
3898 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
3899 )
3900 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3901 except Exception as e:
3902 if (
3903 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3904 ): # not found
3905 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3906 self.logger.debug(
3907 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
3908 )
3909 elif (
3910 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3911 ): # conflict
3912 failed_detail.append(
3913 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
3914 )
3915 self.logger.debug(logging_text + failed_detail[-1])
3916 else:
3917 failed_detail.append(
3918 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
3919 )
3920 self.logger.error(logging_text + failed_detail[-1])
3921
3922 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3923 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3924 if not vnf_deployed or not vnf_deployed["id"]:
3925 continue
3926 try:
3927 ro_vnfd_id = vnf_deployed["id"]
3928 stage[
3929 2
3930 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3931 vnf_deployed["member-vnf-index"], ro_vnfd_id
3932 )
3933 db_nsr_update["detailed-status"] = " ".join(stage)
3934 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3935 self._write_op_status(nslcmop_id, stage)
3936 await self.RO.delete("vnfd", ro_vnfd_id)
3937 self.logger.debug(
3938 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
3939 )
3940 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3941 except Exception as e:
3942 if (
3943 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3944 ): # not found
3945 db_nsr_update[
3946 "_admin.deployed.RO.vnfd.{}.id".format(index)
3947 ] = None
3948 self.logger.debug(
3949 logging_text
3950 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
3951 )
3952 elif (
3953 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3954 ): # conflict
3955 failed_detail.append(
3956 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
3957 )
3958 self.logger.debug(logging_text + failed_detail[-1])
3959 else:
3960 failed_detail.append(
3961 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
3962 )
3963 self.logger.error(logging_text + failed_detail[-1])
3964
3965 if failed_detail:
3966 stage[2] = "Error deleting from VIM"
3967 else:
3968 stage[2] = "Deleted from VIM"
3969 db_nsr_update["detailed-status"] = " ".join(stage)
3970 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3971 self._write_op_status(nslcmop_id, stage)
3972
3973 if failed_detail:
3974 raise LcmException("; ".join(failed_detail))
3975
3976 async def terminate(self, nsr_id, nslcmop_id):
3977 # Try to lock HA task here
3978 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
3979 if not task_is_locked_by_me:
3980 return
3981
3982 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3983 self.logger.debug(logging_text + "Enter")
3984 timeout_ns_terminate = self.timeout_ns_terminate
3985 db_nsr = None
3986 db_nslcmop = None
3987 operation_params = None
3988 exc = None
3989 error_list = []  # collects all error messages from failed steps
3990 db_nslcmop_update = {}
3991 autoremove = False # autoremove after terminated
3992 tasks_dict_info = {}
3993 db_nsr_update = {}
3994 stage = [
3995 "Stage 1/3: Preparing task.",
3996 "Waiting for previous operations to terminate.",
3997 "",
3998 ]
3999 # ^ contains [stage, step, VIM-status]
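# The three entries are concatenated with " ".join(stage) each time the
# detailed-status is written to the database, e.g. (hypothetical snapshot):
#   "Stage 2/3 execute terminating primitives. Deleting KDUs. Deleted from VIM"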
4000 try:
4001 # wait for any previous tasks in process
4002 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4003
4004 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4005 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4006 operation_params = db_nslcmop.get("operationParams") or {}
4007 if operation_params.get("timeout_ns_terminate"):
4008 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4009 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4010 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4011
4012 db_nsr_update["operational-status"] = "terminating"
4013 db_nsr_update["config-status"] = "terminating"
4014 self._write_ns_status(
4015 nsr_id=nsr_id,
4016 ns_state="TERMINATING",
4017 current_operation="TERMINATING",
4018 current_operation_id=nslcmop_id,
4019 other_update=db_nsr_update,
4020 )
4021 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4022 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4023 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4024 return
4025
4026 stage[1] = "Getting vnf descriptors from db."
4027 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4028 db_vnfrs_dict = {
4029 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4030 }
4031 db_vnfds_from_id = {}
4032 db_vnfds_from_member_index = {}
4033 # Loop over VNFRs
4034 for vnfr in db_vnfrs_list:
4035 vnfd_id = vnfr["vnfd-id"]
4036 if vnfd_id not in db_vnfds_from_id:
4037 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4038 db_vnfds_from_id[vnfd_id] = vnfd
4039 db_vnfds_from_member_index[
4040 vnfr["member-vnf-index-ref"]
4041 ] = db_vnfds_from_id[vnfd_id]
4042
4043 # Destroy individual execution environments when there are terminate primitives.
4044 # The rest of the EEs will be deleted at once
4045 # TODO - check before calling _destroy_N2VC
4046 # if not operation_params.get("skip_terminate_primitives"):#
4047 # or not vca.get("needed_terminate"):
4048 stage[0] = "Stage 2/3 execute terminating primitives."
4049 self.logger.debug(logging_text + stage[0])
4050 stage[1] = "Looking execution environment that needs terminate."
4051 self.logger.debug(logging_text + stage[1])
4052
4053 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4054 config_descriptor = None
4055 vca_member_vnf_index = vca.get("member-vnf-index")
4056 vca_id = self.get_vca_id(
4057 db_vnfrs_dict.get(vca_member_vnf_index)
4058 if vca_member_vnf_index
4059 else None,
4060 db_nsr,
4061 )
4062 if not vca or not vca.get("ee_id"):
4063 continue
4064 if not vca.get("member-vnf-index"):
4065 # ns
4066 config_descriptor = db_nsr.get("ns-configuration")
4067 elif vca.get("vdu_id"):
4068 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4069 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4070 elif vca.get("kdu_name"):
4071 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4072 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4073 else:
4074 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4075 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4076 vca_type = vca.get("type")
4077 exec_terminate_primitives = not operation_params.get(
4078 "skip_terminate_primitives"
4079 ) and vca.get("needed_terminate")
4080 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4081 # pending native charms
4082 destroy_ee = (
4083 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4084 )
4085 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4086 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4087 task = asyncio.ensure_future(
4088 self.destroy_N2VC(
4089 logging_text,
4090 db_nslcmop,
4091 vca,
4092 config_descriptor,
4093 vca_index,
4094 destroy_ee,
4095 exec_terminate_primitives,
4096 vca_id=vca_id,
4097 )
4098 )
4099 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4100
4101 # wait for pending tasks of terminate primitives
4102 if tasks_dict_info:
4103 self.logger.debug(
4104 logging_text
4105 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4106 )
4107 error_list = await self._wait_for_tasks(
4108 logging_text,
4109 tasks_dict_info,
4110 min(self.timeout_charm_delete, timeout_ns_terminate),
4111 stage,
4112 nslcmop_id,
4113 )
4114 tasks_dict_info.clear()
4115 if error_list:
4116 return # raise LcmException("; ".join(error_list))
4117
4118 # remove all execution environments at once
4119 stage[0] = "Stage 3/3 delete all."
4120
4121 if nsr_deployed.get("VCA"):
4122 stage[1] = "Deleting all execution environments."
4123 self.logger.debug(logging_text + stage[1])
4124 vca_id = self.get_vca_id({}, db_nsr)
4125 task_delete_ee = asyncio.ensure_future(
4126 asyncio.wait_for(
4127 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4128 timeout=self.timeout_charm_delete,
4129 )
4130 )
4131 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4132 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4133
4134 # Delete from k8scluster
4135 stage[1] = "Deleting KDUs."
4136 self.logger.debug(logging_text + stage[1])
4137 # print(nsr_deployed)
4138 for kdu in get_iterable(nsr_deployed, "K8s"):
4139 if not kdu or not kdu.get("kdu-instance"):
4140 continue
4141 kdu_instance = kdu.get("kdu-instance")
4142 if kdu.get("k8scluster-type") in self.k8scluster_map:
4143 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4144 vca_id = self.get_vca_id({}, db_nsr)
4145 task_delete_kdu_instance = asyncio.ensure_future(
4146 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4147 cluster_uuid=kdu.get("k8scluster-uuid"),
4148 kdu_instance=kdu_instance,
4149 vca_id=vca_id,
4150 )
4151 )
4152 else:
4153 self.logger.error(
4154 logging_text
4155 + "Unknown k8s deployment type {}".format(
4156 kdu.get("k8scluster-type")
4157 )
4158 )
4159 continue
4160 tasks_dict_info[
4161 task_delete_kdu_instance
4162 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4163
4164 # remove from RO
4165 stage[1] = "Deleting ns from VIM."
4166 if self.ng_ro:
4167 task_delete_ro = asyncio.ensure_future(
4168 self._terminate_ng_ro(
4169 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4170 )
4171 )
4172 else:
4173 task_delete_ro = asyncio.ensure_future(
4174 self._terminate_RO(
4175 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4176 )
4177 )
4178 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4179
4180 # the rest of the work is done in the finally block
4181
4182 except (
4183 ROclient.ROClientException,
4184 DbException,
4185 LcmException,
4186 N2VCException,
4187 ) as e:
4188 self.logger.error(logging_text + "Exit Exception {}".format(e))
4189 exc = e
4190 except asyncio.CancelledError:
4191 self.logger.error(
4192 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4193 )
4194 exc = "Operation was cancelled"
4195 except Exception as e:
4196 exc = traceback.format_exc()
4197 self.logger.critical(
4198 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4199 exc_info=True,
4200 )
4201 finally:
4202 if exc:
4203 error_list.append(str(exc))
4204 try:
4205 # wait for pending tasks
4206 if tasks_dict_info:
4207 stage[1] = "Waiting for terminate pending tasks."
4208 self.logger.debug(logging_text + stage[1])
4209 error_list += await self._wait_for_tasks(
4210 logging_text,
4211 tasks_dict_info,
4212 timeout_ns_terminate,
4213 stage,
4214 nslcmop_id,
4215 )
4216 stage[1] = stage[2] = ""
4217 except asyncio.CancelledError:
4218 error_list.append("Cancelled")
4219 # TODO cancel all tasks
4220 except Exception as exc:
4221 error_list.append(str(exc))
4222 # update status at database
4223 if error_list:
4224 error_detail = "; ".join(error_list)
4225 # self.logger.error(logging_text + error_detail)
4226 error_description_nslcmop = "{} Detail: {}".format(
4227 stage[0], error_detail
4228 )
4229 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4230 nslcmop_id, stage[0]
4231 )
4232
4233 db_nsr_update["operational-status"] = "failed"
4234 db_nsr_update["detailed-status"] = (
4235 error_description_nsr + " Detail: " + error_detail
4236 )
4237 db_nslcmop_update["detailed-status"] = error_detail
4238 nslcmop_operation_state = "FAILED"
4239 ns_state = "BROKEN"
4240 else:
4241 error_detail = None
4242 error_description_nsr = error_description_nslcmop = None
4243 ns_state = "NOT_INSTANTIATED"
4244 db_nsr_update["operational-status"] = "terminated"
4245 db_nsr_update["detailed-status"] = "Done"
4246 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4247 db_nslcmop_update["detailed-status"] = "Done"
4248 nslcmop_operation_state = "COMPLETED"
4249
4250 if db_nsr:
4251 self._write_ns_status(
4252 nsr_id=nsr_id,
4253 ns_state=ns_state,
4254 current_operation="IDLE",
4255 current_operation_id=None,
4256 error_description=error_description_nsr,
4257 error_detail=error_detail,
4258 other_update=db_nsr_update,
4259 )
4260 self._write_op_status(
4261 op_id=nslcmop_id,
4262 stage="",
4263 error_message=error_description_nslcmop,
4264 operation_state=nslcmop_operation_state,
4265 other_update=db_nslcmop_update,
4266 )
4267 if ns_state == "NOT_INSTANTIATED":
4268 try:
4269 self.db.set_list(
4270 "vnfrs",
4271 {"nsr-id-ref": nsr_id},
4272 {"_admin.nsState": "NOT_INSTANTIATED"},
4273 )
4274 except DbException as e:
4275 self.logger.warn(
4276 logging_text
4277 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4278 nsr_id, e
4279 )
4280 )
4281 if operation_params:
4282 autoremove = operation_params.get("autoremove", False)
4283 if nslcmop_operation_state:
4284 try:
4285 await self.msg.aiowrite(
4286 "ns",
4287 "terminated",
4288 {
4289 "nsr_id": nsr_id,
4290 "nslcmop_id": nslcmop_id,
4291 "operationState": nslcmop_operation_state,
4292 "autoremove": autoremove,
4293 },
4294 loop=self.loop,
4295 )
4296 except Exception as e:
4297 self.logger.error(
4298 logging_text + "kafka_write notification Exception {}".format(e)
4299 )
4300
4301 self.logger.debug(logging_text + "Exit")
4302 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4303
4304 async def _wait_for_tasks(
4305 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4306 ):
4307 time_start = time()
4308 error_detail_list = []
4309 error_list = []
4310 pending_tasks = list(created_tasks_info.keys())
4311 num_tasks = len(pending_tasks)
4312 num_done = 0
4313 stage[1] = "{}/{}.".format(num_done, num_tasks)
4314 self._write_op_status(nslcmop_id, stage)
4315 while pending_tasks:
4316 new_error = None
4317 _timeout = timeout + time_start - time()
4318 done, pending_tasks = await asyncio.wait(
4319 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4320 )
4321 num_done += len(done)
4322 if not done: # Timeout
4323 for task in pending_tasks:
4324 new_error = created_tasks_info[task] + ": Timeout"
4325 error_detail_list.append(new_error)
4326 error_list.append(new_error)
4327 break
4328 for task in done:
4329 if task.cancelled():
4330 exc = "Cancelled"
4331 else:
4332 exc = task.exception()
4333 if exc:
4334 if isinstance(exc, asyncio.TimeoutError):
4335 exc = "Timeout"
4336 new_error = created_tasks_info[task] + ": {}".format(exc)
4337 error_list.append(created_tasks_info[task])
4338 error_detail_list.append(new_error)
4339 if isinstance(
4340 exc,
4341 (
4342 str,
4343 DbException,
4344 N2VCException,
4345 ROclient.ROClientException,
4346 LcmException,
4347 K8sException,
4348 NgRoException,
4349 ),
4350 ):
4351 self.logger.error(logging_text + new_error)
4352 else:
4353 exc_traceback = "".join(
4354 traceback.format_exception(None, exc, exc.__traceback__)
4355 )
4356 self.logger.error(
4357 logging_text
4358 + created_tasks_info[task]
4359 + " "
4360 + exc_traceback
4361 )
4362 else:
4363 self.logger.debug(
4364 logging_text + created_tasks_info[task] + ": Done"
4365 )
4366 stage[1] = "{}/{}.".format(num_done, num_tasks)
4367 if new_error:
4368 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4369 if nsr_id: # update also nsr
4370 self.update_db_2(
4371 "nsrs",
4372 nsr_id,
4373 {
4374 "errorDescription": "Error at: " + ", ".join(error_list),
4375 "errorDetail": ". ".join(error_detail_list),
4376 },
4377 )
4378 self._write_op_status(nslcmop_id, stage)
4379 return error_detail_list
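# Illustrative sketch (hypothetical values): callers map each asyncio task to
# a human-readable description, and the returned list contains one detailed
# error string per failed or timed-out task.
#
#   tasks_dict_info = {task_delete_ee: "Terminating all VCA"}
#   error_detail_list = await self._wait_for_tasks(
#       logging_text, tasks_dict_info, timeout_ns_terminate, stage, nslcmop_id
#   )
#   # e.g. ["Terminating all VCA: Timeout"]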
4380
4381 @staticmethod
4382 def _map_primitive_params(primitive_desc, params, instantiation_params):
4383 """
4384 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
4385 parameter, the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
4386 :param primitive_desc: portion of VNFD/NSD that describes primitive
4387 :param params: Params provided by user
4388 :param instantiation_params: Instantiation params provided by user
4389 :return: a dictionary with the calculated params
4390 """
4391 calculated_params = {}
4392 for parameter in primitive_desc.get("parameter", ()):
4393 param_name = parameter["name"]
4394 if param_name in params:
4395 calculated_params[param_name] = params[param_name]
4396 elif "default-value" in parameter or "value" in parameter:
4397 if "value" in parameter:
4398 calculated_params[param_name] = parameter["value"]
4399 else:
4400 calculated_params[param_name] = parameter["default-value"]
4401 if (
4402 isinstance(calculated_params[param_name], str)
4403 and calculated_params[param_name].startswith("<")
4404 and calculated_params[param_name].endswith(">")
4405 ):
4406 if calculated_params[param_name][1:-1] in instantiation_params:
4407 calculated_params[param_name] = instantiation_params[
4408 calculated_params[param_name][1:-1]
4409 ]
4410 else:
4411 raise LcmException(
4412 "Parameter {} needed to execute primitive {} not provided".format(
4413 calculated_params[param_name], primitive_desc["name"]
4414 )
4415 )
4416 else:
4417 raise LcmException(
4418 "Parameter {} needed to execute primitive {} not provided".format(
4419 param_name, primitive_desc["name"]
4420 )
4421 )
4422
4423 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4424 calculated_params[param_name] = yaml.safe_dump(
4425 calculated_params[param_name], default_flow_style=True, width=256
4426 )
4427 elif isinstance(calculated_params[param_name], str) and calculated_params[
4428 param_name
4429 ].startswith("!!yaml "):
4430 calculated_params[param_name] = calculated_params[param_name][7:]
4431 if parameter.get("data-type") == "INTEGER":
4432 try:
4433 calculated_params[param_name] = int(calculated_params[param_name])
4434 except ValueError: # error converting string to int
4435 raise LcmException(
4436 "Parameter {} of primitive {} must be integer".format(
4437 param_name, primitive_desc["name"]
4438 )
4439 )
4440 elif parameter.get("data-type") == "BOOLEAN":
4441 calculated_params[param_name] = not (
4442 (str(calculated_params[param_name])).lower() == "false"
4443 )
4444
4445 # always add ns_config_info if the primitive name is config
4446 if primitive_desc["name"] == "config":
4447 if "ns_config_info" in instantiation_params:
4448 calculated_params["ns_config_info"] = instantiation_params[
4449 "ns_config_info"
4450 ]
4451 return calculated_params
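# Illustrative sketch (hypothetical descriptor and params) of the resolution
# order implemented above: explicit user params win, then default-value/value,
# and values written as "<name>" are looked up in the instantiation params.
#
#   primitive_desc = {
#       "name": "touch",
#       "parameter": [
#           {"name": "filename", "default-value": "<touch_filename>"},
#           {"name": "retries", "data-type": "INTEGER", "default-value": "3"},
#       ],
#   }
#   self._map_primitive_params(
#       primitive_desc, params={}, instantiation_params={"touch_filename": "/tmp/x"}
#   )
#   # -> {"filename": "/tmp/x", "retries": 3}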
4452
4453 def _look_for_deployed_vca(
4454 self,
4455 deployed_vca,
4456 member_vnf_index,
4457 vdu_id,
4458 vdu_count_index,
4459 kdu_name=None,
4460 ee_descriptor_id=None,
4461 ):
4462 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
4463 for vca in deployed_vca:
4464 if not vca:
4465 continue
4466 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4467 continue
4468 if (
4469 vdu_count_index is not None
4470 and vdu_count_index != vca["vdu_count_index"]
4471 ):
4472 continue
4473 if kdu_name and kdu_name != vca["kdu_name"]:
4474 continue
4475 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4476 continue
4477 break
4478 else:
4479 # vca_deployed not found
4480 raise LcmException(
4481 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4482 " is not deployed".format(
4483 member_vnf_index,
4484 vdu_id,
4485 vdu_count_index,
4486 kdu_name,
4487 ee_descriptor_id,
4488 )
4489 )
4490 # get ee_id
4491 ee_id = vca.get("ee_id")
4492 vca_type = vca.get(
4493 "type", "lxc_proxy_charm"
4494 ) # default value for backward compatibility - proxy charm
4495 if not ee_id:
4496 raise LcmException(
4497 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4498 "execution environment".format(
4499 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4500 )
4501 )
4502 return ee_id, vca_type
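# Illustrative sketch (hypothetical deployment record): the helper returns the
# execution environment id and VCA type of the matching record in
# db_nsr._admin.deployed.VCA.
#
#   deployed_vca = [{
#       "member-vnf-index": "1", "vdu_id": None, "vdu_count_index": 0,
#       "kdu_name": None, "ee_descriptor_id": None,
#       "ee_id": "osm.1234", "type": "lxc_proxy_charm",
#   }]
#   ee_id, vca_type = self._look_for_deployed_vca(
#       deployed_vca, member_vnf_index="1", vdu_id=None, vdu_count_index=0
#   )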
4503
4504 async def _ns_execute_primitive(
4505 self,
4506 ee_id,
4507 primitive,
4508 primitive_params,
4509 retries=0,
4510 retries_interval=30,
4511 timeout=None,
4512 vca_type=None,
4513 db_dict=None,
4514 vca_id: str = None,
4515 ) -> (str, str):
4516 try:
4517 if primitive == "config":
4518 primitive_params = {"params": primitive_params}
4519
4520 vca_type = vca_type or "lxc_proxy_charm"
4521
4522 while retries >= 0:
4523 try:
4524 output = await asyncio.wait_for(
4525 self.vca_map[vca_type].exec_primitive(
4526 ee_id=ee_id,
4527 primitive_name=primitive,
4528 params_dict=primitive_params,
4529 progress_timeout=self.timeout_progress_primitive,
4530 total_timeout=self.timeout_primitive,
4531 db_dict=db_dict,
4532 vca_id=vca_id,
4533 vca_type=vca_type,
4534 ),
4535 timeout=timeout or self.timeout_primitive,
4536 )
4537 # execution was OK
4538 break
4539 except asyncio.CancelledError:
4540 raise
4541 except Exception as e: # asyncio.TimeoutError
4542 if isinstance(e, asyncio.TimeoutError):
4543 e = "Timeout"
4544 retries -= 1
4545 if retries >= 0:
4546 self.logger.debug(
4547 "Error executing action {} on {} -> {}".format(
4548 primitive, ee_id, e
4549 )
4550 )
4551 # wait and retry
4552 await asyncio.sleep(retries_interval, loop=self.loop)
4553 else:
4554 return "FAILED", str(e)
4555
4556 return "COMPLETED", output
4557
4558 except (LcmException, asyncio.CancelledError):
4559 raise
4560 except Exception as e:
4561 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4562
4563 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4564 """
4565 Updating the vca_status with latest juju information in nsrs record
4566 :param: nsr_id: Id of the nsr
4567 :param: nslcmop_id: Id of the nslcmop
4568 :return: None
4569 """
4570
4571 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4572 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4573 vca_id = self.get_vca_id({}, db_nsr)
4574 if db_nsr["_admin"]["deployed"]["K8s"]:
4575 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4576 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4577 await self._on_update_k8s_db(
4578 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4579 )
4580 else:
4581 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4582 table, filter = "nsrs", {"_id": nsr_id}
4583 path = "_admin.deployed.VCA.{}.".format(vca_index)
4584 await self._on_update_n2vc_db(table, filter, path, {})
4585
4586 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4587 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4588
4589 async def action(self, nsr_id, nslcmop_id):
4590 # Try to lock HA task here
4591 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4592 if not task_is_locked_by_me:
4593 return
4594
4595 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4596 self.logger.debug(logging_text + "Enter")
4597 # get all needed from database
4598 db_nsr = None
4599 db_nslcmop = None
4600 db_nsr_update = {}
4601 db_nslcmop_update = {}
4602 nslcmop_operation_state = None
4603 error_description_nslcmop = None
4604 exc = None
4605 try:
4606 # wait for any previous tasks in process
4607 step = "Waiting for previous operations to terminate"
4608 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4609
4610 self._write_ns_status(
4611 nsr_id=nsr_id,
4612 ns_state=None,
4613 current_operation="RUNNING ACTION",
4614 current_operation_id=nslcmop_id,
4615 )
4616
4617 step = "Getting information from database"
4618 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4619 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4620
4621 nsr_deployed = db_nsr["_admin"].get("deployed")
4622 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4623 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4624 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4625 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4626 primitive = db_nslcmop["operationParams"]["primitive"]
4627 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4628 timeout_ns_action = db_nslcmop["operationParams"].get(
4629 "timeout_ns_action", self.timeout_primitive
4630 )
4631
4632 if vnf_index:
4633 step = "Getting vnfr from database"
4634 db_vnfr = self.db.get_one(
4635 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4636 )
4637 step = "Getting vnfd from database"
4638 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4639 else:
4640 step = "Getting nsd from database"
4641 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4642
4643 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4644 # for backward compatibility
4645 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4646 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4647 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4648 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4649
4650 # look for primitive
4651 config_primitive_desc = descriptor_configuration = None
4652 if vdu_id:
4653 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4654 elif kdu_name:
4655 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4656 elif vnf_index:
4657 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4658 else:
4659 descriptor_configuration = db_nsd.get("ns-configuration")
4660
4661 if descriptor_configuration and descriptor_configuration.get(
4662 "config-primitive"
4663 ):
4664 for config_primitive in descriptor_configuration["config-primitive"]:
4665 if config_primitive["name"] == primitive:
4666 config_primitive_desc = config_primitive
4667 break
4668
4669 if not config_primitive_desc:
4670 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4671 raise LcmException(
4672 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4673 primitive
4674 )
4675 )
4676 primitive_name = primitive
4677 ee_descriptor_id = None
4678 else:
4679 primitive_name = config_primitive_desc.get(
4680 "execution-environment-primitive", primitive
4681 )
4682 ee_descriptor_id = config_primitive_desc.get(
4683 "execution-environment-ref"
4684 )
4685
4686 if vnf_index:
4687 if vdu_id:
4688 vdur = next(
4689 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4690 )
4691 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4692 elif kdu_name:
4693 kdur = next(
4694 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4695 )
4696 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4697 else:
4698 desc_params = parse_yaml_strings(
4699 db_vnfr.get("additionalParamsForVnf")
4700 )
4701 else:
4702 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4703 if kdu_name and get_configuration(db_vnfd, kdu_name):
4704 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4705 actions = set()
4706 for primitive in kdu_configuration.get("initial-config-primitive", []):
4707 actions.add(primitive["name"])
4708 for primitive in kdu_configuration.get("config-primitive", []):
4709 actions.add(primitive["name"])
4710 kdu_action = True if primitive_name in actions else False
4711
4712 # TODO check if ns is in a proper status
4713 if kdu_name and (
4714 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4715 ):
4716 # kdur and desc_params already set from before
4717 if primitive_params:
4718 desc_params.update(primitive_params)
4719 # TODO Check if we will need something at vnf level
4720 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4721 if (
4722 kdu_name == kdu["kdu-name"]
4723 and kdu["member-vnf-index"] == vnf_index
4724 ):
4725 break
4726 else:
4727 raise LcmException(
4728 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4729 )
4730
4731 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4732 msg = "unknown k8scluster-type '{}'".format(
4733 kdu.get("k8scluster-type")
4734 )
4735 raise LcmException(msg)
4736
4737 db_dict = {
4738 "collection": "nsrs",
4739 "filter": {"_id": nsr_id},
4740 "path": "_admin.deployed.K8s.{}".format(index),
4741 }
4742 self.logger.debug(
4743 logging_text
4744 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4745 )
4746 step = "Executing kdu {}".format(primitive_name)
4747 if primitive_name == "upgrade":
4748 if desc_params.get("kdu_model"):
4749 kdu_model = desc_params.get("kdu_model")
4750 del desc_params["kdu_model"]
4751 else:
4752 kdu_model = kdu.get("kdu-model")
4753 parts = kdu_model.split(sep=":")
4754 if len(parts) == 2:
4755 kdu_model = parts[0]
4756
4757 detailed_status = await asyncio.wait_for(
4758 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4759 cluster_uuid=kdu.get("k8scluster-uuid"),
4760 kdu_instance=kdu.get("kdu-instance"),
4761 atomic=True,
4762 kdu_model=kdu_model,
4763 params=desc_params,
4764 db_dict=db_dict,
4765 timeout=timeout_ns_action,
4766 ),
4767 timeout=timeout_ns_action + 10,
4768 )
4769 self.logger.debug(
4770 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4771 )
4772 elif primitive_name == "rollback":
4773 detailed_status = await asyncio.wait_for(
4774 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4775 cluster_uuid=kdu.get("k8scluster-uuid"),
4776 kdu_instance=kdu.get("kdu-instance"),
4777 db_dict=db_dict,
4778 ),
4779 timeout=timeout_ns_action,
4780 )
4781 elif primitive_name == "status":
4782 detailed_status = await asyncio.wait_for(
4783 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4784 cluster_uuid=kdu.get("k8scluster-uuid"),
4785 kdu_instance=kdu.get("kdu-instance"),
4786 vca_id=vca_id,
4787 ),
4788 timeout=timeout_ns_action,
4789 )
4790 else:
4791 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4792 kdu["kdu-name"], nsr_id
4793 )
4794 params = self._map_primitive_params(
4795 config_primitive_desc, primitive_params, desc_params
4796 )
4797
4798 detailed_status = await asyncio.wait_for(
4799 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4800 cluster_uuid=kdu.get("k8scluster-uuid"),
4801 kdu_instance=kdu_instance,
4802 primitive_name=primitive_name,
4803 params=params,
4804 db_dict=db_dict,
4805 timeout=timeout_ns_action,
4806 vca_id=vca_id,
4807 ),
4808 timeout=timeout_ns_action,
4809 )
4810
4811 if detailed_status:
4812 nslcmop_operation_state = "COMPLETED"
4813 else:
4814 detailed_status = ""
4815 nslcmop_operation_state = "FAILED"
4816 else:
4817 ee_id, vca_type = self._look_for_deployed_vca(
4818 nsr_deployed["VCA"],
4819 member_vnf_index=vnf_index,
4820 vdu_id=vdu_id,
4821 vdu_count_index=vdu_count_index,
4822 ee_descriptor_id=ee_descriptor_id,
4823 )
4824 for vca_index, vca_deployed in enumerate(
4825 db_nsr["_admin"]["deployed"]["VCA"]
4826 ):
4827 if vca_deployed.get("member-vnf-index") == vnf_index:
4828 db_dict = {
4829 "collection": "nsrs",
4830 "filter": {"_id": nsr_id},
4831 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4832 }
4833 break
4834 (
4835 nslcmop_operation_state,
4836 detailed_status,
4837 ) = await self._ns_execute_primitive(
4838 ee_id,
4839 primitive=primitive_name,
4840 primitive_params=self._map_primitive_params(
4841 config_primitive_desc, primitive_params, desc_params
4842 ),
4843 timeout=timeout_ns_action,
4844 vca_type=vca_type,
4845 db_dict=db_dict,
4846 vca_id=vca_id,
4847 )
4848
4849 db_nslcmop_update["detailed-status"] = detailed_status
4850 error_description_nslcmop = (
4851 detailed_status if nslcmop_operation_state == "FAILED" else ""
4852 )
4853 self.logger.debug(
4854 logging_text
4855 + " task Done with result {} {}".format(
4856 nslcmop_operation_state, detailed_status
4857 )
4858 )
4859 return # database update is called inside finally
4860
4861 except (DbException, LcmException, N2VCException, K8sException) as e:
4862 self.logger.error(logging_text + "Exit Exception {}".format(e))
4863 exc = e
4864 except asyncio.CancelledError:
4865 self.logger.error(
4866 logging_text + "Cancelled Exception while '{}'".format(step)
4867 )
4868 exc = "Operation was cancelled"
4869 except asyncio.TimeoutError:
4870 self.logger.error(logging_text + "Timeout while '{}'".format(step))
4871 exc = "Timeout"
4872 except Exception as e:
4873 exc = traceback.format_exc()
4874 self.logger.critical(
4875 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
4876 exc_info=True,
4877 )
4878 finally:
4879 if exc:
4880 db_nslcmop_update[
4881 "detailed-status"
4882 ] = (
4883 detailed_status
4884 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4885 nslcmop_operation_state = "FAILED"
4886 if db_nsr:
4887 self._write_ns_status(
4888 nsr_id=nsr_id,
4889 ns_state=db_nsr[
4890 "nsState"
4891 ], # TODO check if degraded. For the moment use previous status
4892 current_operation="IDLE",
4893 current_operation_id=None,
4894 # error_description=error_description_nsr,
4895 # error_detail=error_detail,
4896 other_update=db_nsr_update,
4897 )
4898
4899 self._write_op_status(
4900 op_id=nslcmop_id,
4901 stage="",
4902 error_message=error_description_nslcmop,
4903 operation_state=nslcmop_operation_state,
4904 other_update=db_nslcmop_update,
4905 )
4906
4907 if nslcmop_operation_state:
4908 try:
4909 await self.msg.aiowrite(
4910 "ns",
4911 "actioned",
4912 {
4913 "nsr_id": nsr_id,
4914 "nslcmop_id": nslcmop_id,
4915 "operationState": nslcmop_operation_state,
4916 },
4917 loop=self.loop,
4918 )
4919 except Exception as e:
4920 self.logger.error(
4921 logging_text + "kafka_write notification Exception {}".format(e)
4922 )
4923 self.logger.debug(logging_text + "Exit")
4924 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
4925 return nslcmop_operation_state, detailed_status
4926
4927 async def scale(self, nsr_id, nslcmop_id):
4928 # Try to lock HA task here
4929 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4930 if not task_is_locked_by_me:
4931 return
4932
4933 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
4934 stage = ["", "", ""]
4935 tasks_dict_info = {}
4936 # ^ stage, step, VIM progress
4937 self.logger.debug(logging_text + "Enter")
4938 # get all needed from database
4939 db_nsr = None
4940 db_nslcmop_update = {}
4941 db_nsr_update = {}
4942 exc = None
4943 # in case of error, indicates which part of the scale operation failed, to set the nsr error status
4944 scale_process = None
4945 old_operational_status = ""
4946 old_config_status = ""
4947 nsi_id = None
4948 try:
4949 # wait for any previous tasks in process
4950 step = "Waiting for previous operations to terminate"
4951 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4952 self._write_ns_status(
4953 nsr_id=nsr_id,
4954 ns_state=None,
4955 current_operation="SCALING",
4956 current_operation_id=nslcmop_id,
4957 )
4958
4959 step = "Getting nslcmop from database"
4960 self.logger.debug(
4961 step + " after having waited for previous tasks to be completed"
4962 )
4963 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4964
4965 step = "Getting nsr from database"
4966 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4967 old_operational_status = db_nsr["operational-status"]
4968 old_config_status = db_nsr["config-status"]
4969
4970 step = "Parsing scaling parameters"
4971 db_nsr_update["operational-status"] = "scaling"
4972 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4973 nsr_deployed = db_nsr["_admin"].get("deployed")
4974
4975 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
4976 "scaleByStepData"
4977 ]["member-vnf-index"]
4978 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
4979 "scaleByStepData"
4980 ]["scaling-group-descriptor"]
4981 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
4982 # for backward compatibility
4983 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4984 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4985 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4986 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4987
4988 step = "Getting vnfr from database"
4989 db_vnfr = self.db.get_one(
4990 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4991 )
4992
4993 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4994
4995 step = "Getting vnfd from database"
4996 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4997
4998 base_folder = db_vnfd["_admin"]["storage"]
4999
5000 step = "Getting scaling-group-descriptor"
5001 scaling_descriptor = find_in_list(
5002 get_scaling_aspect(db_vnfd),
5003 lambda scale_desc: scale_desc["name"] == scaling_group,
5004 )
5005 if not scaling_descriptor:
5006 raise LcmException(
5007 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5008 "at vnfd:scaling-group-descriptor".format(scaling_group)
5009 )
5010
5011 step = "Sending scale order to VIM"
5012 # TODO check if ns is in a proper status
5013 nb_scale_op = 0
5014 if not db_nsr["_admin"].get("scaling-group"):
5015 self.update_db_2(
5016 "nsrs",
5017 nsr_id,
5018 {
5019 "_admin.scaling-group": [
5020 {"name": scaling_group, "nb-scale-op": 0}
5021 ]
5022 },
5023 )
5024 admin_scale_index = 0
5025 else:
5026 for admin_scale_index, admin_scale_info in enumerate(
5027 db_nsr["_admin"]["scaling-group"]
5028 ):
5029 if admin_scale_info["name"] == scaling_group:
5030 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5031 break
5032 else: # not found, set index one plus last element and add new entry with the name
5033 admin_scale_index += 1
5034 db_nsr_update[
5035 "_admin.scaling-group.{}.name".format(admin_scale_index)
5036 ] = scaling_group
5037
5038 vca_scaling_info = []
5039 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5040 if scaling_type == "SCALE_OUT":
5041 if "aspect-delta-details" not in scaling_descriptor:
5042 raise LcmException(
5043 "Aspect delta details not fount in scaling descriptor {}".format(
5044 scaling_descriptor["name"]
5045 )
5046 )
5047 # count if max-instance-count is reached
5048 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5049
5050 scaling_info["scaling_direction"] = "OUT"
5051 scaling_info["vdu-create"] = {}
5052 scaling_info["kdu-create"] = {}
5053 for delta in deltas:
5054 for vdu_delta in delta.get("vdu-delta", {}):
5055 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5056 # vdu_index also provides the current number of instances of the targeted vdu
5057 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5058 cloud_init_text = self._get_vdu_cloud_init_content(
5059 vdud, db_vnfd
5060 )
5061 if cloud_init_text:
5062 additional_params = (
5063 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5064 or {}
5065 )
5066 cloud_init_list = []
5067
5068 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5069 max_instance_count = 10
5070 if vdu_profile and "max-number-of-instances" in vdu_profile:
5071 max_instance_count = vdu_profile.get(
5072 "max-number-of-instances", 10
5073 )
5074
5075 default_instance_num = get_number_of_instances(
5076 db_vnfd, vdud["id"]
5077 )
5078 instances_number = vdu_delta.get("number-of-instances", 1)
5079 nb_scale_op += instances_number
5080
5081 new_instance_count = nb_scale_op + default_instance_num
5082 # Check whether the new count exceeds the max while the current vdu count is still below it.
5083 # In that case, adjust the number of instances to create
5084 if new_instance_count > max_instance_count > vdu_count:
5085 instances_number = new_instance_count - max_instance_count
5086 else:
5087 instances_number = instances_number
5088
5089 if new_instance_count > max_instance_count:
5090 raise LcmException(
5091 "reached the limit of {} (max-instance-count) "
5092 "scaling-out operations for the "
5093 "scaling-group-descriptor '{}'".format(
5094 nb_scale_op, scaling_group
5095 )
5096 )
5097 for x in range(vdu_delta.get("number-of-instances", 1)):
5098 if cloud_init_text:
5099 # TODO The VDU's own IP information is not available because db_vnfr has not been updated yet.
5100 additional_params["OSM"] = get_osm_params(
5101 db_vnfr, vdu_delta["id"], vdu_index + x
5102 )
5103 cloud_init_list.append(
5104 self._parse_cloud_init(
5105 cloud_init_text,
5106 additional_params,
5107 db_vnfd["id"],
5108 vdud["id"],
5109 )
5110 )
5111 vca_scaling_info.append(
5112 {
5113 "osm_vdu_id": vdu_delta["id"],
5114 "member-vnf-index": vnf_index,
5115 "type": "create",
5116 "vdu_index": vdu_index + x,
5117 }
5118 )
5119 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5120 for kdu_delta in delta.get("kdu-resource-delta", {}):
5121 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5122 kdu_name = kdu_profile["kdu-name"]
5123 resource_name = kdu_profile["resource-name"]
5124
5125 # The same delta might contain different KDUs,
5126 # so keep a separate list for each KDU
5127 if not scaling_info["kdu-create"].get(kdu_name, None):
5128 scaling_info["kdu-create"][kdu_name] = []
5129
5130 kdur = get_kdur(db_vnfr, kdu_name)
5131 if kdur.get("helm-chart"):
5132 k8s_cluster_type = "helm-chart-v3"
5133 self.logger.debug("kdur: {}".format(kdur))
5134 if (
5135 kdur.get("helm-version")
5136 and kdur.get("helm-version") == "v2"
5137 ):
5138 k8s_cluster_type = "helm-chart"
5139 raise NotImplementedError
5140 elif kdur.get("juju-bundle"):
5141 k8s_cluster_type = "juju-bundle"
5142 else:
5143 raise LcmException(
5144 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5145 "juju-bundle. Maybe an old NBI version is running".format(
5146 db_vnfr["member-vnf-index-ref"], kdu_name
5147 )
5148 )
5149
5150 max_instance_count = 10
5151 if kdu_profile and "max-number-of-instances" in kdu_profile:
5152 max_instance_count = kdu_profile.get(
5153 "max-number-of-instances", 10
5154 )
5155
5156 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5157 deployed_kdu, _ = get_deployed_kdu(
5158 nsr_deployed, kdu_name, vnf_index
5159 )
5160 if deployed_kdu is None:
5161 raise LcmException(
5162 "KDU '{}' for vnf '{}' not deployed".format(
5163 kdu_name, vnf_index
5164 )
5165 )
5166 kdu_instance = deployed_kdu.get("kdu-instance")
5167 instance_num = await self.k8scluster_map[
5168 k8s_cluster_type
5169 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5170 kdu_replica_count = instance_num + kdu_delta.get(
5171 "number-of-instances", 1
5172 )
5173
5174 # Check whether the new count exceeds the max while the current instance_num is still below it.
5175 # In that case, cap the kdu replica count at the max instance count
5176 if kdu_replica_count > max_instance_count > instance_num:
5177 kdu_replica_count = max_instance_count
5178 if kdu_replica_count > max_instance_count:
5179 raise LcmException(
5180 "reached the limit of {} (max-instance-count) "
5181 "scaling-out operations for the "
5182 "scaling-group-descriptor '{}'".format(
5183 instance_num, scaling_group
5184 )
5185 )
5186
5187 for x in range(kdu_delta.get("number-of-instances", 1)):
5188 vca_scaling_info.append(
5189 {
5190 "osm_kdu_id": kdu_name,
5191 "member-vnf-index": vnf_index,
5192 "type": "create",
5193 "kdu_index": instance_num + x - 1,
5194 }
5195 )
5196 scaling_info["kdu-create"][kdu_name].append(
5197 {
5198 "member-vnf-index": vnf_index,
5199 "type": "create",
5200 "k8s-cluster-type": k8s_cluster_type,
5201 "resource-name": resource_name,
5202 "scale": kdu_replica_count,
5203 }
5204 )
5205 elif scaling_type == "SCALE_IN":
5206 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5207
5208 scaling_info["scaling_direction"] = "IN"
5209 scaling_info["vdu-delete"] = {}
5210 scaling_info["kdu-delete"] = {}
5211
5212 for delta in deltas:
5213 for vdu_delta in delta.get("vdu-delta", {}):
5214 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5215 min_instance_count = 0
5216 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5217 if vdu_profile and "min-number-of-instances" in vdu_profile:
5218 min_instance_count = vdu_profile["min-number-of-instances"]
5219
5220 default_instance_num = get_number_of_instances(
5221 db_vnfd, vdu_delta["id"]
5222 )
5223 instance_num = vdu_delta.get("number-of-instances", 1)
5224 nb_scale_op -= instance_num
5225
5226 new_instance_count = nb_scale_op + default_instance_num
5227
5228 if new_instance_count < min_instance_count < vdu_count:
5229 instances_number = min_instance_count - new_instance_count
5230 else:
5231 instances_number = instance_num
5232
5233 if new_instance_count < min_instance_count:
5234 raise LcmException(
5235 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5236 "scaling-group-descriptor '{}'".format(
5237 nb_scale_op, scaling_group
5238 )
5239 )
5240 for x in range(vdu_delta.get("number-of-instances", 1)):
5241 vca_scaling_info.append(
5242 {
5243 "osm_vdu_id": vdu_delta["id"],
5244 "member-vnf-index": vnf_index,
5245 "type": "delete",
5246 "vdu_index": vdu_index - 1 - x,
5247 }
5248 )
5249 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5250 for kdu_delta in delta.get("kdu-resource-delta", {}):
5251 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5252 kdu_name = kdu_profile["kdu-name"]
5253 resource_name = kdu_profile["resource-name"]
5254
5255 if not scaling_info["kdu-delete"].get(kdu_name, None):
5256 scaling_info["kdu-delete"][kdu_name] = []
5257
5258 kdur = get_kdur(db_vnfr, kdu_name)
5259 if kdur.get("helm-chart"):
5260 k8s_cluster_type = "helm-chart-v3"
5261 self.logger.debug("kdur: {}".format(kdur))
5262 if kdur.get("helm-version") == "v2":
5263 k8s_cluster_type = "helm-chart"
5264 raise NotImplementedError(
5265 "scaling of KDUs based on helm-chart v2 "
5266 "is not supported"
5267 )
5268 elif kdur.get("juju-bundle"):
5269 k8s_cluster_type = "juju-bundle"
5270 else:
5271 raise LcmException(
5272 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5273 "juju-bundle. Maybe an old NBI version is running".format(
5274 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5275 )
5276 )
5277
5278 min_instance_count = 0
5279 if kdu_profile and "min-number-of-instances" in kdu_profile:
5280 min_instance_count = kdu_profile["min-number-of-instances"]
5281
5282 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5283 deployed_kdu, _ = get_deployed_kdu(
5284 nsr_deployed, kdu_name, vnf_index
5285 )
5286 if deployed_kdu is None:
5287 raise LcmException(
5288 "KDU '{}' for vnf '{}' not deployed".format(
5289 kdu_name, vnf_index
5290 )
5291 )
5292 kdu_instance = deployed_kdu.get("kdu-instance")
5293 instance_num = await self.k8scluster_map[
5294 k8s_cluster_type
5295 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5296 kdu_replica_count = instance_num - kdu_delta.get(
5297 "number-of-instances", 1
5298 )
5299
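# never go below min-number-of-instances: clamp the replica count and fail if the request still falls short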
5300 if kdu_replica_count < min_instance_count < instance_num:
5301 kdu_replica_count = min_instance_count
5302 if kdu_replica_count < min_instance_count:
5303 raise LcmException(
5304 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5305 "scaling-group-descriptor '{}'".format(
5306 min_instance_count, scaling_group
5307 )
5308 )
5309
5310 for x in range(kdu_delta.get("number-of-instances", 1)):
5311 vca_scaling_info.append(
5312 {
5313 "osm_kdu_id": kdu_name,
5314 "member-vnf-index": vnf_index,
5315 "type": "delete",
5316 "kdu_index": instance_num - x - 1,
5317 }
5318 )
5319 scaling_info["kdu-delete"][kdu_name].append(
5320 {
5321 "member-vnf-index": vnf_index,
5322 "type": "delete",
5323 "k8s-cluster-type": k8s_cluster_type,
5324 "resource-name": resource_name,
5325 "scale": kdu_replica_count,
5326 }
5327 )
5328
5329 # update scaling_info with the ip/mac addresses of the VDU interfaces that are going to be deleted
5330 vdu_delete = copy(scaling_info.get("vdu-delete"))
5331 if scaling_info["scaling_direction"] == "IN":
5332 for vdur in reversed(db_vnfr["vdur"]):
5333 if vdu_delete.get(vdur["vdu-id-ref"]):
5334 vdu_delete[vdur["vdu-id-ref"]] -= 1
5335 scaling_info["vdu"].append(
5336 {
5337 "name": vdur.get("name") or vdur.get("vdu-name"),
5338 "vdu_id": vdur["vdu-id-ref"],
5339 "interface": [],
5340 }
5341 )
5342 for interface in vdur["interfaces"]:
5343 scaling_info["vdu"][-1]["interface"].append(
5344 {
5345 "name": interface["name"],
5346 "ip_address": interface["ip-address"],
5347 "mac_address": interface.get("mac-address"),
5348 }
5349 )
5350 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5351
5352 # PRE-SCALE BEGIN
5353 step = "Executing pre-scale vnf-config-primitive"
5354 if scaling_descriptor.get("scaling-config-action"):
5355 for scaling_config_action in scaling_descriptor[
5356 "scaling-config-action"
5357 ]:
5358 if (
5359 scaling_config_action.get("trigger") == "pre-scale-in"
5360 and scaling_type == "SCALE_IN"
5361 ) or (
5362 scaling_config_action.get("trigger") == "pre-scale-out"
5363 and scaling_type == "SCALE_OUT"
5364 ):
5365 vnf_config_primitive = scaling_config_action[
5366 "vnf-config-primitive-name-ref"
5367 ]
5368 step = db_nslcmop_update[
5369 "detailed-status"
5370 ] = "executing pre-scale scaling-config-action '{}'".format(
5371 vnf_config_primitive
5372 )
5373
5374 # look for primitive
5375 for config_primitive in (
5376 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5377 ).get("config-primitive", ()):
5378 if config_primitive["name"] == vnf_config_primitive:
5379 break
5380 else:
5381 raise LcmException(
5382 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5383 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5384 "primitive".format(scaling_group, vnf_config_primitive)
5385 )
5386
5387 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5388 if db_vnfr.get("additionalParamsForVnf"):
5389 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5390
5391 scale_process = "VCA"
5392 db_nsr_update["config-status"] = "configuring pre-scaling"
5393 primitive_params = self._map_primitive_params(
5394 config_primitive, {}, vnfr_params
5395 )
5396
5397 # Pre-scale retry check: Check if this sub-operation has been executed before
5398 op_index = self._check_or_add_scale_suboperation(
5399 db_nslcmop,
5400 vnf_index,
5401 vnf_config_primitive,
5402 primitive_params,
5403 "PRE-SCALE",
5404 )
5405 if op_index == self.SUBOPERATION_STATUS_SKIP:
5406 # Skip sub-operation
5407 result = "COMPLETED"
5408 result_detail = "Done"
5409 self.logger.debug(
5410 logging_text
5411 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5412 vnf_config_primitive, result, result_detail
5413 )
5414 )
5415 else:
5416 if op_index == self.SUBOPERATION_STATUS_NEW:
5417 # New sub-operation: Get index of this sub-operation
5418 op_index = (
5419 len(db_nslcmop.get("_admin", {}).get("operations"))
5420 - 1
5421 )
5422 self.logger.debug(
5423 logging_text
5424 + "vnf_config_primitive={} New sub-operation".format(
5425 vnf_config_primitive
5426 )
5427 )
5428 else:
5429 # retry: Get registered params for this existing sub-operation
5430 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5431 op_index
5432 ]
5433 vnf_index = op.get("member_vnf_index")
5434 vnf_config_primitive = op.get("primitive")
5435 primitive_params = op.get("primitive_params")
5436 self.logger.debug(
5437 logging_text
5438 + "vnf_config_primitive={} Sub-operation retry".format(
5439 vnf_config_primitive
5440 )
5441 )
5442 # Execute the primitive, either with new (first-time) or registered (retry) args
5443 ee_descriptor_id = config_primitive.get(
5444 "execution-environment-ref"
5445 )
5446 primitive_name = config_primitive.get(
5447 "execution-environment-primitive", vnf_config_primitive
5448 )
5449 ee_id, vca_type = self._look_for_deployed_vca(
5450 nsr_deployed["VCA"],
5451 member_vnf_index=vnf_index,
5452 vdu_id=None,
5453 vdu_count_index=None,
5454 ee_descriptor_id=ee_descriptor_id,
5455 )
5456 result, result_detail = await self._ns_execute_primitive(
5457 ee_id,
5458 primitive_name,
5459 primitive_params,
5460 vca_type=vca_type,
5461 vca_id=vca_id,
5462 )
5463 self.logger.debug(
5464 logging_text
5465 + "vnf_config_primitive={} Done with result {} {}".format(
5466 vnf_config_primitive, result, result_detail
5467 )
5468 )
5469 # Update operationState = COMPLETED | FAILED
5470 self._update_suboperation_status(
5471 db_nslcmop, op_index, result, result_detail
5472 )
5473
5474 if result == "FAILED":
5475 raise LcmException(result_detail)
5476 db_nsr_update["config-status"] = old_config_status
5477 scale_process = None
5478 # PRE-SCALE END
5479
5480 db_nsr_update[
5481 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5482 ] = nb_scale_op
5483 db_nsr_update[
5484 "_admin.scaling-group.{}.time".format(admin_scale_index)
5485 ] = time()
5486
5487 # SCALE-IN VCA - BEGIN
5488 if vca_scaling_info:
5489 step = db_nslcmop_update[
5490 "detailed-status"
5491 ] = "Deleting the execution environments"
5492 scale_process = "VCA"
5493 for vca_info in vca_scaling_info:
5494 if vca_info["type"] == "delete":
5495 member_vnf_index = str(vca_info["member-vnf-index"])
5496 self.logger.debug(
5497 logging_text + "vdu info: {}".format(vca_info)
5498 )
5499 if vca_info.get("osm_vdu_id"):
5500 vdu_id = vca_info["osm_vdu_id"]
5501 vdu_index = int(vca_info["vdu_index"])
5502 stage[
5503 1
5504 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5505 member_vnf_index, vdu_id, vdu_index
5506 )
5507 else:
5508 vdu_index = 0
5509 kdu_id = vca_info["osm_kdu_id"]
5510 stage[
5511 1
5512 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5513 member_vnf_index, kdu_id, vdu_index
5514 )
5515 stage[2] = step = "Scaling in VCA"
5516 self._write_op_status(op_id=nslcmop_id, stage=stage)
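# look for the deployed VCA matching the scaled-in member/vdu index, destroy its execution environment and drop its record and configuration status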
5517 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5518 config_update = db_nsr["configurationStatus"]
5519 for vca_index, vca in enumerate(vca_update):
5520 if (
5521 (vca and vca.get("ee_id"))
5522 and vca["member-vnf-index"] == member_vnf_index
5523 and vca["vdu_count_index"] == vdu_index
5524 ):
5525 if vca.get("vdu_id"):
5526 config_descriptor = get_configuration(
5527 db_vnfd, vca.get("vdu_id")
5528 )
5529 elif vca.get("kdu_name"):
5530 config_descriptor = get_configuration(
5531 db_vnfd, vca.get("kdu_name")
5532 )
5533 else:
5534 config_descriptor = get_configuration(
5535 db_vnfd, db_vnfd["id"]
5536 )
5537 operation_params = (
5538 db_nslcmop.get("operationParams") or {}
5539 )
5540 exec_terminate_primitives = not operation_params.get(
5541 "skip_terminate_primitives"
5542 ) and vca.get("needed_terminate")
5543 task = asyncio.ensure_future(
5544 asyncio.wait_for(
5545 self.destroy_N2VC(
5546 logging_text,
5547 db_nslcmop,
5548 vca,
5549 config_descriptor,
5550 vca_index,
5551 destroy_ee=True,
5552 exec_primitives=exec_terminate_primitives,
5553 scaling_in=True,
5554 vca_id=vca_id,
5555 ),
5556 timeout=self.timeout_charm_delete,
5557 )
5558 )
5559 tasks_dict_info[task] = "Terminating VCA {}".format(
5560 vca.get("ee_id")
5561 )
5562 del vca_update[vca_index]
5563 del config_update[vca_index]
5564 # wait for pending tasks of terminate primitives
5565 if tasks_dict_info:
5566 self.logger.debug(
5567 logging_text
5568 + "Waiting for tasks {}".format(
5569 list(tasks_dict_info.keys())
5570 )
5571 )
5572 error_list = await self._wait_for_tasks(
5573 logging_text,
5574 tasks_dict_info,
5575 min(
5576 self.timeout_charm_delete, self.timeout_ns_terminate
5577 ),
5578 stage,
5579 nslcmop_id,
5580 )
5581 tasks_dict_info.clear()
5582 if error_list:
5583 raise LcmException("; ".join(error_list))
5584
5585 db_vca_and_config_update = {
5586 "_admin.deployed.VCA": vca_update,
5587 "configurationStatus": config_update,
5588 }
5589 self.update_db_2(
5590 "nsrs", db_nsr["_id"], db_vca_and_config_update
5591 )
5592 scale_process = None
5593 # SCALE-IN VCA - END
5594
5595 # SCALE RO - BEGIN
5596 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5597 scale_process = "RO"
5598 if self.ro_config.get("ng"):
5599 await self._scale_ng_ro(
5600 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5601 )
5602 scaling_info.pop("vdu-create", None)
5603 scaling_info.pop("vdu-delete", None)
5604
5605 scale_process = None
5606 # SCALE RO - END
5607
5608 # SCALE KDU - BEGIN
5609 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5610 scale_process = "KDU"
5611 await self._scale_kdu(
5612 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5613 )
5614 scaling_info.pop("kdu-create", None)
5615 scaling_info.pop("kdu-delete", None)
5616
5617 scale_process = None
5618 # SCALE KDU - END
5619
5620 if db_nsr_update:
5621 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5622
5623 # SCALE-UP VCA - BEGIN
5624 if vca_scaling_info:
5625 step = db_nslcmop_update[
5626 "detailed-status"
5627 ] = "Creating new execution environments"
5628 scale_process = "VCA"
5629 for vca_info in vca_scaling_info:
5630 if vca_info["type"] == "create":
5631 member_vnf_index = str(vca_info["member-vnf-index"])
5632 self.logger.debug(
5633 logging_text + "vdu info: {}".format(vca_info)
5634 )
5635 vnfd_id = db_vnfr["vnfd-ref"]
5636 if vca_info.get("osm_vdu_id"):
5637 vdu_index = int(vca_info["vdu_index"])
5638 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5639 if db_vnfr.get("additionalParamsForVnf"):
5640 deploy_params.update(
5641 parse_yaml_strings(
5642 db_vnfr["additionalParamsForVnf"].copy()
5643 )
5644 )
5645 descriptor_config = get_configuration(
5646 db_vnfd, db_vnfd["id"]
5647 )
5648 if descriptor_config:
5649 vdu_id = None
5650 vdu_name = None
5651 kdu_name = None
5652 self._deploy_n2vc(
5653 logging_text=logging_text
5654 + "member_vnf_index={} ".format(member_vnf_index),
5655 db_nsr=db_nsr,
5656 db_vnfr=db_vnfr,
5657 nslcmop_id=nslcmop_id,
5658 nsr_id=nsr_id,
5659 nsi_id=nsi_id,
5660 vnfd_id=vnfd_id,
5661 vdu_id=vdu_id,
5662 kdu_name=kdu_name,
5663 member_vnf_index=member_vnf_index,
5664 vdu_index=vdu_index,
5665 vdu_name=vdu_name,
5666 deploy_params=deploy_params,
5667 descriptor_config=descriptor_config,
5668 base_folder=base_folder,
5669 task_instantiation_info=tasks_dict_info,
5670 stage=stage,
5671 )
5672 vdu_id = vca_info["osm_vdu_id"]
5673 vdur = find_in_list(
5674 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5675 )
5676 descriptor_config = get_configuration(db_vnfd, vdu_id)
5677 if vdur.get("additionalParams"):
5678 deploy_params_vdu = parse_yaml_strings(
5679 vdur["additionalParams"]
5680 )
5681 else:
5682 deploy_params_vdu = deploy_params
5683 deploy_params_vdu["OSM"] = get_osm_params(
5684 db_vnfr, vdu_id, vdu_count_index=vdu_index
5685 )
5686 if descriptor_config:
5687 vdu_name = None
5688 kdu_name = None
5689 stage[
5690 1
5691 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5692 member_vnf_index, vdu_id, vdu_index
5693 )
5694 stage[2] = step = "Scaling out VCA"
5695 self._write_op_status(op_id=nslcmop_id, stage=stage)
5696 self._deploy_n2vc(
5697 logging_text=logging_text
5698 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5699 member_vnf_index, vdu_id, vdu_index
5700 ),
5701 db_nsr=db_nsr,
5702 db_vnfr=db_vnfr,
5703 nslcmop_id=nslcmop_id,
5704 nsr_id=nsr_id,
5705 nsi_id=nsi_id,
5706 vnfd_id=vnfd_id,
5707 vdu_id=vdu_id,
5708 kdu_name=kdu_name,
5709 member_vnf_index=member_vnf_index,
5710 vdu_index=vdu_index,
5711 vdu_name=vdu_name,
5712 deploy_params=deploy_params_vdu,
5713 descriptor_config=descriptor_config,
5714 base_folder=base_folder,
5715 task_instantiation_info=tasks_dict_info,
5716 stage=stage,
5717 )
5718 else:
5719 kdu_name = vca_info["osm_kdu_id"]
5720 descriptor_config = get_configuration(db_vnfd, kdu_name)
5721 if descriptor_config:
5722 vdu_id = None
5723 kdu_index = int(vca_info["kdu_index"])
5724 vdu_name = None
5725 kdur = next(
5726 x
5727 for x in db_vnfr["kdur"]
5728 if x["kdu-name"] == kdu_name
5729 )
5730 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5731 if kdur.get("additionalParams"):
5732 deploy_params_kdu = parse_yaml_strings(
5733 kdur["additionalParams"]
5734 )
5735
5736 self._deploy_n2vc(
5737 logging_text=logging_text,
5738 db_nsr=db_nsr,
5739 db_vnfr=db_vnfr,
5740 nslcmop_id=nslcmop_id,
5741 nsr_id=nsr_id,
5742 nsi_id=nsi_id,
5743 vnfd_id=vnfd_id,
5744 vdu_id=vdu_id,
5745 kdu_name=kdu_name,
5746 member_vnf_index=member_vnf_index,
5747 vdu_index=kdu_index,
5748 vdu_name=vdu_name,
5749 deploy_params=deploy_params_kdu,
5750 descriptor_config=descriptor_config,
5751 base_folder=base_folder,
5752 task_instantiation_info=tasks_dict_info,
5753 stage=stage,
5754 )
5755 # SCALE-UP VCA - END
5756 scale_process = None
5757
5758 # POST-SCALE BEGIN
5759 # execute primitive service POST-SCALING
5760 step = "Executing post-scale vnf-config-primitive"
5761 if scaling_descriptor.get("scaling-config-action"):
5762 for scaling_config_action in scaling_descriptor[
5763 "scaling-config-action"
5764 ]:
5765 if (
5766 scaling_config_action.get("trigger") == "post-scale-in"
5767 and scaling_type == "SCALE_IN"
5768 ) or (
5769 scaling_config_action.get("trigger") == "post-scale-out"
5770 and scaling_type == "SCALE_OUT"
5771 ):
5772 vnf_config_primitive = scaling_config_action[
5773 "vnf-config-primitive-name-ref"
5774 ]
5775 step = db_nslcmop_update[
5776 "detailed-status"
5777 ] = "executing post-scale scaling-config-action '{}'".format(
5778 vnf_config_primitive
5779 )
5780
5781 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5782 if db_vnfr.get("additionalParamsForVnf"):
5783 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5784
5785 # look for primitive
5786 for config_primitive in (
5787 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5788 ).get("config-primitive", ()):
5789 if config_primitive["name"] == vnf_config_primitive:
5790 break
5791 else:
5792 raise LcmException(
5793 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5794 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5795 "config-primitive".format(
5796 scaling_group, vnf_config_primitive
5797 )
5798 )
5799 scale_process = "VCA"
5800 db_nsr_update["config-status"] = "configuring post-scaling"
5801 primitive_params = self._map_primitive_params(
5802 config_primitive, {}, vnfr_params
5803 )
5804
5805 # Post-scale retry check: Check if this sub-operation has been executed before
5806 op_index = self._check_or_add_scale_suboperation(
5807 db_nslcmop,
5808 vnf_index,
5809 vnf_config_primitive,
5810 primitive_params,
5811 "POST-SCALE",
5812 )
5813 if op_index == self.SUBOPERATION_STATUS_SKIP:
5814 # Skip sub-operation
5815 result = "COMPLETED"
5816 result_detail = "Done"
5817 self.logger.debug(
5818 logging_text
5819 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5820 vnf_config_primitive, result, result_detail
5821 )
5822 )
5823 else:
5824 if op_index == self.SUBOPERATION_STATUS_NEW:
5825 # New sub-operation: Get index of this sub-operation
5826 op_index = (
5827 len(db_nslcmop.get("_admin", {}).get("operations"))
5828 - 1
5829 )
5830 self.logger.debug(
5831 logging_text
5832 + "vnf_config_primitive={} New sub-operation".format(
5833 vnf_config_primitive
5834 )
5835 )
5836 else:
5837 # retry: Get registered params for this existing sub-operation
5838 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5839 op_index
5840 ]
5841 vnf_index = op.get("member_vnf_index")
5842 vnf_config_primitive = op.get("primitive")
5843 primitive_params = op.get("primitive_params")
5844 self.logger.debug(
5845 logging_text
5846 + "vnf_config_primitive={} Sub-operation retry".format(
5847 vnf_config_primitive
5848 )
5849 )
5850 # Execute the primitive, either with new (first-time) or registered (retry) args
5851 ee_descriptor_id = config_primitive.get(
5852 "execution-environment-ref"
5853 )
5854 primitive_name = config_primitive.get(
5855 "execution-environment-primitive", vnf_config_primitive
5856 )
5857 ee_id, vca_type = self._look_for_deployed_vca(
5858 nsr_deployed["VCA"],
5859 member_vnf_index=vnf_index,
5860 vdu_id=None,
5861 vdu_count_index=None,
5862 ee_descriptor_id=ee_descriptor_id,
5863 )
5864 result, result_detail = await self._ns_execute_primitive(
5865 ee_id,
5866 primitive_name,
5867 primitive_params,
5868 vca_type=vca_type,
5869 vca_id=vca_id,
5870 )
5871 self.logger.debug(
5872 logging_text
5873 + "vnf_config_primitive={} Done with result {} {}".format(
5874 vnf_config_primitive, result, result_detail
5875 )
5876 )
5877 # Update operationState = COMPLETED | FAILED
5878 self._update_suboperation_status(
5879 db_nslcmop, op_index, result, result_detail
5880 )
5881
5882 if result == "FAILED":
5883 raise LcmException(result_detail)
5884 db_nsr_update["config-status"] = old_config_status
5885 scale_process = None
5886 # POST-SCALE END
5887
5888 db_nsr_update[
5889 "detailed-status"
5890 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
5891 db_nsr_update["operational-status"] = (
5892 "running"
5893 if old_operational_status == "failed"
5894 else old_operational_status
5895 )
5896 db_nsr_update["config-status"] = old_config_status
5897 return
5898 except (
5899 ROclient.ROClientException,
5900 DbException,
5901 LcmException,
5902 NgRoException,
5903 ) as e:
5904 self.logger.error(logging_text + "Exit Exception {}".format(e))
5905 exc = e
5906 except asyncio.CancelledError:
5907 self.logger.error(
5908 logging_text + "Cancelled Exception while '{}'".format(step)
5909 )
5910 exc = "Operation was cancelled"
5911 except Exception as e:
5912 exc = traceback.format_exc()
5913 self.logger.critical(
5914 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5915 exc_info=True,
5916 )
5917 finally:
5918 self._write_ns_status(
5919 nsr_id=nsr_id,
5920 ns_state=None,
5921 current_operation="IDLE",
5922 current_operation_id=None,
5923 )
5924 if tasks_dict_info:
5925 stage[1] = "Waiting for pending tasks."
5926 self.logger.debug(logging_text + stage[1])
5927 exc = await self._wait_for_tasks(
5928 logging_text,
5929 tasks_dict_info,
5930 self.timeout_ns_deploy,
5931 stage,
5932 nslcmop_id,
5933 nsr_id=nsr_id,
5934 )
5935 if exc:
5936 db_nslcmop_update[
5937 "detailed-status"
5938 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5939 nslcmop_operation_state = "FAILED"
5940 if db_nsr:
5941 db_nsr_update["operational-status"] = old_operational_status
5942 db_nsr_update["config-status"] = old_config_status
5943 db_nsr_update["detailed-status"] = ""
5944 if scale_process:
5945 if "VCA" in scale_process:
5946 db_nsr_update["config-status"] = "failed"
5947 if "RO" in scale_process:
5948 db_nsr_update["operational-status"] = "failed"
5949 db_nsr_update[
5950 "detailed-status"
5951 ] = "FAILED scaling nslcmop={} {}: {}".format(
5952 nslcmop_id, step, exc
5953 )
5954 else:
5955 error_description_nslcmop = None
5956 nslcmop_operation_state = "COMPLETED"
5957 db_nslcmop_update["detailed-status"] = "Done"
5958
5959 self._write_op_status(
5960 op_id=nslcmop_id,
5961 stage="",
5962 error_message=error_description_nslcmop,
5963 operation_state=nslcmop_operation_state,
5964 other_update=db_nslcmop_update,
5965 )
5966 if db_nsr:
5967 self._write_ns_status(
5968 nsr_id=nsr_id,
5969 ns_state=None,
5970 current_operation="IDLE",
5971 current_operation_id=None,
5972 other_update=db_nsr_update,
5973 )
5974
5975 if nslcmop_operation_state:
5976 try:
5977 msg = {
5978 "nsr_id": nsr_id,
5979 "nslcmop_id": nslcmop_id,
5980 "operationState": nslcmop_operation_state,
5981 }
5982 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
5983 except Exception as e:
5984 self.logger.error(
5985 logging_text + "kafka_write notification Exception {}".format(e)
5986 )
5987 self.logger.debug(logging_text + "Exit")
5988 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
5989
5990 async def _scale_kdu(
5991 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5992 ):
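"""
Scale the KDUs listed in scaling_info ("kdu-create" or "kdu-delete"): for each entry,
run the terminate config primitives declared in the descriptor before a scale-in,
call scale() on the corresponding K8s connector (helm or juju) and run the initial
config primitives after a scale-out.
"""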
5993 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
5994 for kdu_name in _scaling_info:
5995 for kdu_scaling_info in _scaling_info[kdu_name]:
5996 deployed_kdu, index = get_deployed_kdu(
5997 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
5998 )
5999 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6000 kdu_instance = deployed_kdu["kdu-instance"]
6001 scale = int(kdu_scaling_info["scale"])
6002 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6003
6004 db_dict = {
6005 "collection": "nsrs",
6006 "filter": {"_id": nsr_id},
6007 "path": "_admin.deployed.K8s.{}".format(index),
6008 }
6009
6010 step = "scaling application {}".format(
6011 kdu_scaling_info["resource-name"]
6012 )
6013 self.logger.debug(logging_text + step)
6014
6015 if kdu_scaling_info["type"] == "delete":
6016 kdu_config = get_configuration(db_vnfd, kdu_name)
6017 if (
6018 kdu_config
6019 and kdu_config.get("terminate-config-primitive")
6020 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6021 ):
6022 terminate_config_primitive_list = kdu_config.get(
6023 "terminate-config-primitive"
6024 )
6025 terminate_config_primitive_list.sort(
6026 key=lambda val: int(val["seq"])
6027 )
6028
6029 for (
6030 terminate_config_primitive
6031 ) in terminate_config_primitive_list:
6032 primitive_params_ = self._map_primitive_params(
6033 terminate_config_primitive, {}, {}
6034 )
6035 step = "execute terminate config primitive"
6036 self.logger.debug(logging_text + step)
6037 await asyncio.wait_for(
6038 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6039 cluster_uuid=cluster_uuid,
6040 kdu_instance=kdu_instance,
6041 primitive_name=terminate_config_primitive["name"],
6042 params=primitive_params_,
6043 db_dict=db_dict,
6044 vca_id=vca_id,
6045 ),
6046 timeout=600,
6047 )
6048
6049 await asyncio.wait_for(
6050 self.k8scluster_map[k8s_cluster_type].scale(
6051 kdu_instance,
6052 scale,
6053 kdu_scaling_info["resource-name"],
6054 vca_id=vca_id,
6055 ),
6056 timeout=self.timeout_vca_on_error,
6057 )
6058
6059 if kdu_scaling_info["type"] == "create":
6060 kdu_config = get_configuration(db_vnfd, kdu_name)
6061 if (
6062 kdu_config
6063 and kdu_config.get("initial-config-primitive")
6064 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6065 ):
6066 initial_config_primitive_list = kdu_config.get(
6067 "initial-config-primitive"
6068 )
6069 initial_config_primitive_list.sort(
6070 key=lambda val: int(val["seq"])
6071 )
6072
6073 for initial_config_primitive in initial_config_primitive_list:
6074 primitive_params_ = self._map_primitive_params(
6075 initial_config_primitive, {}, {}
6076 )
6077 step = "execute initial config primitive"
6078 self.logger.debug(logging_text + step)
6079 await asyncio.wait_for(
6080 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6081 cluster_uuid=cluster_uuid,
6082 kdu_instance=kdu_instance,
6083 primitive_name=initial_config_primitive["name"],
6084 params=primitive_params_,
6085 db_dict=db_dict,
6086 vca_id=vca_id,
6087 ),
6088 timeout=600,
6089 )
6090
6091 async def _scale_ng_ro(
6092 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6093 ):
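"""
Scale the VDUs at infrastructure level through NG-RO: mark the VDUs to create/delete
in the VNFR, re-run the NG-RO instantiation so the deployment converges to the new
size and, on scale-in, remove the deleted VDUs from the VNFR afterwards.
"""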
6094 nsr_id = db_nslcmop["nsInstanceId"]
6095 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6096 db_vnfrs = {}
6097
6098 # read from db: the vnfd of every vnf
6099 db_vnfds = []
6100
6101 # for each vnf in ns, read vnfd
6102 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6103 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6104 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6105 # if we do not have this vnfd yet, read it from db
6106 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6107 # read from db
6108 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6109 db_vnfds.append(vnfd)
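# pass the N2VC public key to NG-RO so it can be injected in the new VDUs (needed for VCA SSH access)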
6110 n2vc_key = self.n2vc.get_public_key()
6111 n2vc_key_list = [n2vc_key]
6112 self.scale_vnfr(
6113 db_vnfr,
6114 vdu_scaling_info.get("vdu-create"),
6115 vdu_scaling_info.get("vdu-delete"),
6116 mark_delete=True,
6117 )
6118 # db_vnfr has been updated, update db_vnfrs to use it
6119 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6120 await self._instantiate_ng_ro(
6121 logging_text,
6122 nsr_id,
6123 db_nsd,
6124 db_nsr,
6125 db_nslcmop,
6126 db_vnfrs,
6127 db_vnfds,
6128 n2vc_key_list,
6129 stage=stage,
6130 start_deploy=time(),
6131 timeout_ns_deploy=self.timeout_ns_deploy,
6132 )
6133 if vdu_scaling_info.get("vdu-delete"):
6134 self.scale_vnfr(
6135 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6136 )
6137
6138 async def add_prometheus_metrics(
6139 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6140 ):
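"""
Render the 'prometheus*.j2' job template found in the artifact path and register the
resulting scrape jobs in Prometheus. Returns the list of job names, or None when
there is no Prometheus connection or no template to process.
"""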
6141 if not self.prometheus:
6142 return
6143 # look for a file called 'prometheus*.j2' and read its content
6144 artifact_content = self.fs.dir_ls(artifact_path)
6145 job_file = next(
6146 (
6147 f
6148 for f in artifact_content
6149 if f.startswith("prometheus") and f.endswith(".j2")
6150 ),
6151 None,
6152 )
6153 if not job_file:
6154 return
6155 with self.fs.file_open((artifact_path, job_file), "r") as f:
6156 job_data = f.read()
6157
6158 # TODO get_service
6159 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6160 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6161 host_port = "80"
6162 vnfr_id = vnfr_id.replace("-", "")
6163 variables = {
6164 "JOB_NAME": vnfr_id,
6165 "TARGET_IP": target_ip,
6166 "EXPORTER_POD_IP": host_name,
6167 "EXPORTER_POD_PORT": host_port,
6168 }
6169 job_list = self.prometheus.parse_job(job_data, variables)
6170 # ensure the job_name contains the vnfr_id and add the nsr_id as metadata
6171 for job in job_list:
6172 if (
6173 not isinstance(job.get("job_name"), str)
6174 or vnfr_id not in job["job_name"]
6175 ):
6176 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6177 job["nsr_id"] = nsr_id
6178 job_dict = {jl["job_name"]: jl for jl in job_list}
6179 if await self.prometheus.update(job_dict):
6180 return list(job_dict.keys())
6181
6182 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6183 """
6184 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6185
6186 :param vim_account_id: VIM Account ID
6187
6188 :return: (cloud_name, cloud_credential)
6189 """
6190 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6191 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6192
6193 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6194 """
6195 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6196
6197 :param vim_account_id: VIM Account ID
6198
6199 :return: (cloud_name, cloud_credential)
6200 """
6201 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6202 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")