Bug 1871 fix - Fix issue executing actions
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.data_utils.nsr import get_deployed_kdu
35 from osm_lcm.ng_ro import NgRoClient, NgRoException
36 from osm_lcm.lcm_utils import (
37 LcmException,
38 LcmExceptionNoMgmtIP,
39 LcmBase,
40 deep_get,
41 get_iterable,
42 populate_dict,
43 )
44 from osm_lcm.data_utils.nsd import get_vnf_profiles
45 from osm_lcm.data_utils.vnfd import (
46 get_vdu_list,
47 get_vdu_profile,
48 get_ee_sorted_initial_config_primitive_list,
49 get_ee_sorted_terminate_config_primitive_list,
50 get_kdu_list,
51 get_virtual_link_profiles,
52 get_vdu,
53 get_configuration,
54 get_vdu_index,
55 get_scaling_aspect,
56 get_number_of_instances,
57 get_juju_ee_ref,
58 get_kdu_profile,
59 )
60 from osm_lcm.data_utils.list_utils import find_in_list
61 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
62 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
63 from osm_lcm.data_utils.database.vim_account import VimAccountDB
64 from n2vc.k8s_helm_conn import K8sHelmConnector
65 from n2vc.k8s_helm3_conn import K8sHelm3Connector
66 from n2vc.k8s_juju_conn import K8sJujuConnector
67
68 from osm_common.dbbase import DbException
69 from osm_common.fsbase import FsException
70
71 from osm_lcm.data_utils.database.database import Database
72 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
73
74 from n2vc.n2vc_juju_conn import N2VCJujuConnector
75 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
76
77 from osm_lcm.lcm_helm_conn import LCMHelmConn
78
79 from copy import copy, deepcopy
80 from time import time
81 from uuid import uuid4
82
83 from random import randint
84
85 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
86
87
88 class NsLcm(LcmBase):
89 timeout_vca_on_error = (
90 5 * 60
91 ) # Time from when a charm first reaches blocked/error status until it is marked as failed
92 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
93 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
94 timeout_charm_delete = 10 * 60
95 timeout_primitive = 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive = (
97 10 * 60
98 ) # timeout for some progress in a primitive execution
99
100 SUBOPERATION_STATUS_NOT_FOUND = -1
101 SUBOPERATION_STATUS_NEW = -2
102 SUBOPERATION_STATUS_SKIP = -3
103 task_name_deploy_vca = "Deploying VCA"
104
105 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
106 """
107 Init: connect to database, filesystem storage and messaging
108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
109 :return: None
110 """
111 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
112
113 self.db = Database().instance.db
114 self.fs = Filesystem().instance.fs
115 self.loop = loop
116 self.lcm_tasks = lcm_tasks
117 self.timeout = config["timeout"]
118 self.ro_config = config["ro_config"]
119 self.ng_ro = config["ro_config"].get("ng")
120 self.vca_config = config["VCA"].copy()
121
122 # create N2VC connector
123 self.n2vc = N2VCJujuConnector(
124 log=self.logger,
125 loop=self.loop,
126 on_update_db=self._on_update_n2vc_db,
127 fs=self.fs,
128 db=self.db,
129 )
130
131 self.conn_helm_ee = LCMHelmConn(
132 log=self.logger,
133 loop=self.loop,
134 vca_config=self.vca_config,
135 on_update_db=self._on_update_n2vc_db,
136 )
137
138 self.k8sclusterhelm2 = K8sHelmConnector(
139 kubectl_command=self.vca_config.get("kubectlpath"),
140 helm_command=self.vca_config.get("helmpath"),
141 log=self.logger,
142 on_update_db=None,
143 fs=self.fs,
144 db=self.db,
145 )
146
147 self.k8sclusterhelm3 = K8sHelm3Connector(
148 kubectl_command=self.vca_config.get("kubectlpath"),
149 helm_command=self.vca_config.get("helm3path"),
150 fs=self.fs,
151 log=self.logger,
152 db=self.db,
153 on_update_db=None,
154 )
155
156 self.k8sclusterjuju = K8sJujuConnector(
157 kubectl_command=self.vca_config.get("kubectlpath"),
158 juju_command=self.vca_config.get("jujupath"),
159 log=self.logger,
160 loop=self.loop,
161 on_update_db=self._on_update_k8s_db,
162 fs=self.fs,
163 db=self.db,
164 )
165
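# The dispatch tables below map the deployment type found in the descriptors to the
# connector that handles it: K8s charts/bundles go to the helm/juju K8s connectors,
# while charm and helm execution environments go to N2VC or the LCM helm connector.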
166 self.k8scluster_map = {
167 "helm-chart": self.k8sclusterhelm2,
168 "helm-chart-v3": self.k8sclusterhelm3,
169 "chart": self.k8sclusterhelm3,
170 "juju-bundle": self.k8sclusterjuju,
171 "juju": self.k8sclusterjuju,
172 }
173
174 self.vca_map = {
175 "lxc_proxy_charm": self.n2vc,
176 "native_charm": self.n2vc,
177 "k8s_proxy_charm": self.n2vc,
178 "helm": self.conn_helm_ee,
179 "helm-v3": self.conn_helm_ee,
180 }
181
182 self.prometheus = prometheus
183
184 # create RO client
185 self.RO = NgRoClient(self.loop, **self.ro_config)
186
187 @staticmethod
188 def increment_ip_mac(ip_mac, vm_index=1):
189 if not isinstance(ip_mac, str):
190 return ip_mac
191 try:
192 # try with ipv4: look for the last dot
193 i = ip_mac.rfind(".")
194 if i > 0:
195 i += 1
196 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
197 # try with ipv6 or mac: look for the last colon and operate in hex
198 i = ip_mac.rfind(":")
199 if i > 0:
200 i += 1
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
203 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
204 )
205 except Exception:
206 pass
207 return None
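# Illustrative behaviour of increment_ip_mac (example values assumed, not part of the original code):
#   increment_ip_mac("10.0.0.5", vm_index=2)   -> "10.0.0.7"
#   increment_ip_mac("fa:16:3e:00:00:0a", 1)   -> "fa:16:3e:00:00:0b"
#   increment_ip_mac(None)                     -> None  (non-string input returned as-is)
#   increment_ip_mac("no-address-here")        -> None  (unparseable input)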
208
209 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
210
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
212
213 try:
214 # TODO filter RO descriptor fields...
215
216 # write to database
217 db_dict = dict()
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict["deploymentStatus"] = ro_descriptor
220 self.update_db_2("nsrs", nsrs_id, db_dict)
221
222 except Exception as e:
223 self.logger.warn(
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
225 )
226
227 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
228
229 # remove last dot from path (if exists)
230 if path.endswith("."):
231 path = path[:-1]
232
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
235 try:
236
237 nsr_id = filter.get("_id")
238
239 # read ns record from database
240 nsr = self.db.get_one(table="nsrs", q_filter=filter)
241 current_ns_status = nsr.get("nsState")
242
243 # get vca status for NS
244 status_dict = await self.n2vc.get_status(
245 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
246 )
247
248 # vcaStatus
249 db_dict = dict()
250 db_dict["vcaStatus"] = status_dict
251 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
252
253 # update configurationStatus for this VCA
254 try:
255 vca_index = int(path[path.rfind(".") + 1 :])
256
257 vca_list = deep_get(
258 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
259 )
260 vca_status = vca_list[vca_index].get("status")
261
262 configuration_status_list = nsr.get("configurationStatus")
263 config_status = configuration_status_list[vca_index].get("status")
264
265 if config_status == "BROKEN" and vca_status != "failed":
266 db_dict["configurationStatus"][vca_index] = "READY"
267 elif config_status != "BROKEN" and vca_status == "failed":
268 db_dict["configurationStatus"][vca_index] = "BROKEN"
269 except Exception as e:
270 # not update configurationStatus
271 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
272
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
275 is_degraded = False
276 if current_ns_status in ("READY", "DEGRADED"):
277 error_description = ""
278 # check machines
279 if status_dict.get("machines"):
280 for machine_id in status_dict.get("machines"):
281 machine = status_dict.get("machines").get(machine_id)
282 # check machine agent-status
283 if machine.get("agent-status"):
284 s = machine.get("agent-status").get("status")
285 if s != "started":
286 is_degraded = True
287 error_description += (
288 "machine {} agent-status={} ; ".format(
289 machine_id, s
290 )
291 )
292 # check machine instance status
293 if machine.get("instance-status"):
294 s = machine.get("instance-status").get("status")
295 if s != "running":
296 is_degraded = True
297 error_description += (
298 "machine {} instance-status={} ; ".format(
299 machine_id, s
300 )
301 )
302 # check applications
303 if status_dict.get("applications"):
304 for app_id in status_dict.get("applications"):
305 app = status_dict.get("applications").get(app_id)
306 # check application status
307 if app.get("status"):
308 s = app.get("status").get("status")
309 if s != "active":
310 is_degraded = True
311 error_description += (
312 "application {} status={} ; ".format(app_id, s)
313 )
314
315 if error_description:
316 db_dict["errorDescription"] = error_description
317 if current_ns_status == "READY" and is_degraded:
318 db_dict["nsState"] = "DEGRADED"
319 if current_ns_status == "DEGRADED" and not is_degraded:
320 db_dict["nsState"] = "READY"
321
322 # write to database
323 self.update_db_2("nsrs", nsr_id, db_dict)
324
325 except (asyncio.CancelledError, asyncio.TimeoutError):
326 raise
327 except Exception as e:
328 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
329
330 async def _on_update_k8s_db(
331 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
332 ):
333 """
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
338 :return: none
339 """
340
341 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
342 # .format(cluster_uuid, kdu_instance, filter))
343
344 try:
345 nsr_id = filter.get("_id")
346
347 # get vca status for NS
348 vca_status = await self.k8sclusterjuju.status_kdu(
349 cluster_uuid,
350 kdu_instance,
351 complete_status=True,
352 yaml_format=False,
353 vca_id=vca_id,
354 )
355 # vcaStatus
356 db_dict = dict()
357 db_dict["vcaStatus"] = {nsr_id: vca_status}
358
359 await self.k8sclusterjuju.update_vca_status(
360 db_dict["vcaStatus"],
361 kdu_instance,
362 vca_id=vca_id,
363 )
364
365 # write to database
366 self.update_db_2("nsrs", nsr_id, db_dict)
367
368 except (asyncio.CancelledError, asyncio.TimeoutError):
369 raise
370 except Exception as e:
371 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
372
373 @staticmethod
374 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
375 try:
376 env = Environment(undefined=StrictUndefined)
377 template = env.from_string(cloud_init_text)
378 return template.render(additional_params or {})
379 except UndefinedError as e:
380 raise LcmException(
381 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
382 "file, must be provided in the instantiation parameters inside the "
383 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
384 )
385 except (TemplateError, TemplateNotFound) as e:
386 raise LcmException(
387 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
388 vnfd_id, vdu_id, e
389 )
390 )
391
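# Minimal sketch of _parse_cloud_init (values assumed for illustration):
#   cloud_init_text:    "#cloud-config\nhostname: {{ hostname }}"
#   additional_params:  {"hostname": "vnf1-mgmt"}
#   rendered result:    "#cloud-config\nhostname: vnf1-mgmt"
# With StrictUndefined, a variable missing from additional_params raises
# UndefinedError, which is reported as the LcmException above.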
392 def _get_vdu_cloud_init_content(self, vdu, vnfd):
393 cloud_init_content = cloud_init_file = None
394 try:
395 if vdu.get("cloud-init-file"):
396 base_folder = vnfd["_admin"]["storage"]
397 cloud_init_file = "{}/{}/cloud_init/{}".format(
398 base_folder["folder"],
399 base_folder["pkg-dir"],
400 vdu["cloud-init-file"],
401 )
402 with self.fs.file_open(cloud_init_file, "r") as ci_file:
403 cloud_init_content = ci_file.read()
404 elif vdu.get("cloud-init"):
405 cloud_init_content = vdu["cloud-init"]
406
407 return cloud_init_content
408 except FsException as e:
409 raise LcmException(
410 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
411 vnfd["id"], vdu["id"], cloud_init_file, e
412 )
413 )
414
415 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
416 vdur = next(
417 vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
418 )
419 additional_params = vdur.get("additionalParams")
420 return parse_yaml_strings(additional_params)
421
422 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
423 """
424 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
425 :param vnfd: input vnfd
426 :param new_id: overrides vnf id if provided
427 :param additionalParams: Instantiation params for VNFs provided
428 :param nsrId: Id of the NSR
429 :return: copy of vnfd
430 """
431 vnfd_RO = deepcopy(vnfd)
432 # remove unused by RO configuration, monitoring, scaling and internal keys
433 vnfd_RO.pop("_id", None)
434 vnfd_RO.pop("_admin", None)
435 vnfd_RO.pop("monitoring-param", None)
436 vnfd_RO.pop("scaling-group-descriptor", None)
437 vnfd_RO.pop("kdu", None)
438 vnfd_RO.pop("k8s-cluster", None)
439 if new_id:
440 vnfd_RO["id"] = new_id
441
442 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
443 for vdu in get_iterable(vnfd_RO, "vdu"):
444 vdu.pop("cloud-init-file", None)
445 vdu.pop("cloud-init", None)
446 return vnfd_RO
447
448 @staticmethod
449 def ip_profile_2_RO(ip_profile):
450 RO_ip_profile = deepcopy(ip_profile)
451 if "dns-server" in RO_ip_profile:
452 if isinstance(RO_ip_profile["dns-server"], list):
453 RO_ip_profile["dns-address"] = []
454 for ds in RO_ip_profile.pop("dns-server"):
455 RO_ip_profile["dns-address"].append(ds["address"])
456 else:
457 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
458 if RO_ip_profile.get("ip-version") == "ipv4":
459 RO_ip_profile["ip-version"] = "IPv4"
460 if RO_ip_profile.get("ip-version") == "ipv6":
461 RO_ip_profile["ip-version"] = "IPv6"
462 if "dhcp-params" in RO_ip_profile:
463 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
464 return RO_ip_profile
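# Example of the OSM IM -> RO ip-profile translation done by ip_profile_2_RO (illustrative values):
#   {"ip-version": "ipv4", "subnet-address": "192.168.100.0/24",
#    "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
# becomes
#   {"ip-version": "IPv4", "subnet-address": "192.168.100.0/24",
#    "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}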
465
466 def _get_ro_vim_id_for_vim_account(self, vim_account):
467 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
468 if db_vim["_admin"]["operationalState"] != "ENABLED":
469 raise LcmException(
470 "VIM={} is not available. operationalState={}".format(
471 vim_account, db_vim["_admin"]["operationalState"]
472 )
473 )
474 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
475 return RO_vim_id
476
477 def get_ro_wim_id_for_wim_account(self, wim_account):
478 if isinstance(wim_account, str):
479 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
480 if db_wim["_admin"]["operationalState"] != "ENABLED":
481 raise LcmException(
482 "WIM={} is not available. operationalState={}".format(
483 wim_account, db_wim["_admin"]["operationalState"]
484 )
485 )
486 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
487 return RO_wim_id
488 else:
489 return wim_account
490
491 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
492
493 db_vdu_push_list = []
494 db_update = {"_admin.modified": time()}
495 if vdu_create:
496 for vdu_id, vdu_count in vdu_create.items():
497 vdur = next(
498 (
499 vdur
500 for vdur in reversed(db_vnfr["vdur"])
501 if vdur["vdu-id-ref"] == vdu_id
502 ),
503 None,
504 )
505 if not vdur:
506 raise LcmException(
507 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
508 vdu_id
509 )
510 )
511
512 for count in range(vdu_count):
513 vdur_copy = deepcopy(vdur)
514 vdur_copy["status"] = "BUILD"
515 vdur_copy["status-detailed"] = None
516 vdur_copy["ip-address"] = None
517 vdur_copy["_id"] = str(uuid4())
518 vdur_copy["count-index"] += count + 1
519 vdur_copy["id"] = "{}-{}".format(
520 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
521 )
522 vdur_copy.pop("vim_info", None)
523 for iface in vdur_copy["interfaces"]:
524 if iface.get("fixed-ip"):
525 iface["ip-address"] = self.increment_ip_mac(
526 iface["ip-address"], count + 1
527 )
528 else:
529 iface.pop("ip-address", None)
530 if iface.get("fixed-mac"):
531 iface["mac-address"] = self.increment_ip_mac(
532 iface["mac-address"], count + 1
533 )
534 else:
535 iface.pop("mac-address", None)
536 iface.pop(
537 "mgmt_vnf", None
538 ) # only the first vdu can be management of the vnf
539 db_vdu_push_list.append(vdur_copy)
540 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
541 if vdu_delete:
542 for vdu_id, vdu_count in vdu_delete.items():
543 if mark_delete:
544 indexes_to_delete = [
545 iv[0]
546 for iv in enumerate(db_vnfr["vdur"])
547 if iv[1]["vdu-id-ref"] == vdu_id
548 ]
549 db_update.update(
550 {
551 "vdur.{}.status".format(i): "DELETING"
552 for i in indexes_to_delete[-vdu_count:]
553 }
554 )
555 else:
556 # they must be deleted one by one because common.db does not allow otherwise
557 vdus_to_delete = [
558 v
559 for v in reversed(db_vnfr["vdur"])
560 if v["vdu-id-ref"] == vdu_id
561 ]
562 for vdu in vdus_to_delete[:vdu_count]:
563 self.db.set_one(
564 "vnfrs",
565 {"_id": db_vnfr["_id"]},
566 None,
567 pull={"vdur": {"_id": vdu["_id"]}},
568 )
569 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
570 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
571 # modify passed dictionary db_vnfr
572 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
573 db_vnfr["vdur"] = db_vnfr_["vdur"]
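# Usage sketch of scale_vnfr (values assumed): scale_vnfr(db_vnfr, vdu_create={"dataVM": 2})
# clones the last existing "dataVM" vdur twice, bumping "count-index", incrementing fixed
# ip/mac addresses and dropping dynamic ones; scale_vnfr(db_vnfr, vdu_delete={"dataVM": 1},
# mark_delete=True) only marks the newest matching vdur with status "DELETING".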
574
575 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
576 """
577 Updates database nsr with the RO info for the created vld
578 :param ns_update_nsr: dictionary to be filled with the updated info
579 :param db_nsr: content of db_nsr. This is also modified
580 :param nsr_desc_RO: nsr descriptor from RO
581 :return: Nothing, LcmException is raised on errors
582 """
583
584 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
585 for net_RO in get_iterable(nsr_desc_RO, "nets"):
586 if vld["id"] != net_RO.get("ns_net_osm_id"):
587 continue
588 vld["vim-id"] = net_RO.get("vim_net_id")
589 vld["name"] = net_RO.get("vim_name")
590 vld["status"] = net_RO.get("status")
591 vld["status-detailed"] = net_RO.get("error_msg")
592 ns_update_nsr["vld.{}".format(vld_index)] = vld
593 break
594 else:
595 raise LcmException(
596 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
597 )
598
599 def set_vnfr_at_error(self, db_vnfrs, error_text):
600 try:
601 for db_vnfr in db_vnfrs.values():
602 vnfr_update = {"status": "ERROR"}
603 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
604 if "status" not in vdur:
605 vdur["status"] = "ERROR"
606 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
607 if error_text:
608 vdur["status-detailed"] = str(error_text)
609 vnfr_update[
610 "vdur.{}.status-detailed".format(vdu_index)
611 ] = "ERROR"
612 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
613 except DbException as e:
614 self.logger.error("Cannot update vnf. {}".format(e))
615
616 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
617 """
618 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
619 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
620 :param nsr_desc_RO: nsr descriptor from RO
621 :return: Nothing, LcmException is raised on errors
622 """
623 for vnf_index, db_vnfr in db_vnfrs.items():
624 for vnf_RO in nsr_desc_RO["vnfs"]:
625 if vnf_RO["member_vnf_index"] != vnf_index:
626 continue
627 vnfr_update = {}
628 if vnf_RO.get("ip_address"):
629 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
630 "ip_address"
631 ].split(";")[0]
632 elif not db_vnfr.get("ip-address"):
633 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
634 raise LcmExceptionNoMgmtIP(
635 "ns member_vnf_index '{}' has no IP address".format(
636 vnf_index
637 )
638 )
639
640 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
641 vdur_RO_count_index = 0
642 if vdur.get("pdu-type"):
643 continue
644 for vdur_RO in get_iterable(vnf_RO, "vms"):
645 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
646 continue
647 if vdur["count-index"] != vdur_RO_count_index:
648 vdur_RO_count_index += 1
649 continue
650 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
651 if vdur_RO.get("ip_address"):
652 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
653 else:
654 vdur["ip-address"] = None
655 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
656 vdur["name"] = vdur_RO.get("vim_name")
657 vdur["status"] = vdur_RO.get("status")
658 vdur["status-detailed"] = vdur_RO.get("error_msg")
659 for ifacer in get_iterable(vdur, "interfaces"):
660 for interface_RO in get_iterable(vdur_RO, "interfaces"):
661 if ifacer["name"] == interface_RO.get("internal_name"):
662 ifacer["ip-address"] = interface_RO.get(
663 "ip_address"
664 )
665 ifacer["mac-address"] = interface_RO.get(
666 "mac_address"
667 )
668 break
669 else:
670 raise LcmException(
671 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
672 "from VIM info".format(
673 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
674 )
675 )
676 vnfr_update["vdur.{}".format(vdu_index)] = vdur
677 break
678 else:
679 raise LcmException(
680 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
681 "VIM info".format(
682 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
683 )
684 )
685
686 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
687 for net_RO in get_iterable(nsr_desc_RO, "nets"):
688 if vld["id"] != net_RO.get("vnf_net_osm_id"):
689 continue
690 vld["vim-id"] = net_RO.get("vim_net_id")
691 vld["name"] = net_RO.get("vim_name")
692 vld["status"] = net_RO.get("status")
693 vld["status-detailed"] = net_RO.get("error_msg")
694 vnfr_update["vld.{}".format(vld_index)] = vld
695 break
696 else:
697 raise LcmException(
698 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
699 vnf_index, vld["id"]
700 )
701 )
702
703 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
704 break
705
706 else:
707 raise LcmException(
708 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
709 vnf_index
710 )
711 )
712
713 def _get_ns_config_info(self, nsr_id):
714 """
715 Generates a mapping between vnf,vdu elements and the N2VC id
716 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
717 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
718 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
719 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
720 """
721 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
722 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
723 mapping = {}
724 ns_config_info = {"osm-config-mapping": mapping}
725 for vca in vca_deployed_list:
726 if not vca["member-vnf-index"]:
727 continue
728 if not vca["vdu_id"]:
729 mapping[vca["member-vnf-index"]] = vca["application"]
730 else:
731 mapping[
732 "{}.{}.{}".format(
733 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
734 )
735 ] = vca["application"]
736 return ns_config_info
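# Shape of the mapping returned by _get_ns_config_info (application names assumed):
#   {"osm-config-mapping": {
#       "1": "app-vnf-1",                # vnf-level charm: <member-vnf-index>
#       "1.mgmtVM.0": "app-vnf-1-vdu"}}  # vdu-level charm: <member-vnf-index>.<vdu_id>.<count>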
737
738 async def _instantiate_ng_ro(
739 self,
740 logging_text,
741 nsr_id,
742 nsd,
743 db_nsr,
744 db_nslcmop,
745 db_vnfrs,
746 db_vnfds,
747 n2vc_key_list,
748 stage,
749 start_deploy,
750 timeout_ns_deploy,
751 ):
752
753 db_vims = {}
754
755 def get_vim_account(vim_account_id):
756 nonlocal db_vims
757 if vim_account_id in db_vims:
758 return db_vims[vim_account_id]
759 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
760 db_vims[vim_account_id] = db_vim
761 return db_vim
762
763 # modify target_vld info with instantiation parameters
764 def parse_vld_instantiation_params(
765 target_vim, target_vld, vld_params, target_sdn
766 ):
767 if vld_params.get("ip-profile"):
768 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
769 "ip-profile"
770 ]
771 if vld_params.get("provider-network"):
772 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
773 "provider-network"
774 ]
775 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
776 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
777 "provider-network"
778 ]["sdn-ports"]
779 if vld_params.get("wimAccountId"):
780 target_wim = "wim:{}".format(vld_params["wimAccountId"])
781 target_vld["vim_info"][target_wim] = {}
782 for param in ("vim-network-name", "vim-network-id"):
783 if vld_params.get(param):
784 if isinstance(vld_params[param], dict):
785 for vim, vim_net in vld_params[param].items():
786 other_target_vim = "vim:" + vim
787 populate_dict(
788 target_vld["vim_info"],
789 (other_target_vim, param.replace("-", "_")),
790 vim_net,
791 )
792 else: # isinstance str
793 target_vld["vim_info"][target_vim][
794 param.replace("-", "_")
795 ] = vld_params[param]
796 if vld_params.get("common_id"):
797 target_vld["common_id"] = vld_params.get("common_id")
798
799 nslcmop_id = db_nslcmop["_id"]
800 target = {
801 "name": db_nsr["name"],
802 "ns": {"vld": []},
803 "vnf": [],
804 "image": deepcopy(db_nsr["image"]),
805 "flavor": deepcopy(db_nsr["flavor"]),
806 "action_id": nslcmop_id,
807 "cloud_init_content": {},
808 }
809 for image in target["image"]:
810 image["vim_info"] = {}
811 for flavor in target["flavor"]:
812 flavor["vim_info"] = {}
813
814 if db_nslcmop.get("lcmOperationType") != "instantiate":
815 # get parameters of instantiation:
816 db_nslcmop_instantiate = self.db.get_list(
817 "nslcmops",
818 {
819 "nsInstanceId": db_nslcmop["nsInstanceId"],
820 "lcmOperationType": "instantiate",
821 },
822 )[-1]
823 ns_params = db_nslcmop_instantiate.get("operationParams")
824 else:
825 ns_params = db_nslcmop.get("operationParams")
826 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
827 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
828
829 cp2target = {}
830 for vld_index, vld in enumerate(db_nsr.get("vld")):
831 target_vim = "vim:{}".format(ns_params["vimAccountId"])
832 target_vld = {
833 "id": vld["id"],
834 "name": vld["name"],
835 "mgmt-network": vld.get("mgmt-network", False),
836 "type": vld.get("type"),
837 "vim_info": {
838 target_vim: {
839 "vim_network_name": vld.get("vim-network-name"),
840 "vim_account_id": ns_params["vimAccountId"],
841 }
842 },
843 }
844 # check if this network needs SDN assist
845 if vld.get("pci-interfaces"):
846 db_vim = get_vim_account(ns_params["vimAccountId"])
847 sdnc_id = db_vim["config"].get("sdn-controller")
848 if sdnc_id:
849 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
850 target_sdn = "sdn:{}".format(sdnc_id)
851 target_vld["vim_info"][target_sdn] = {
852 "sdn": True,
853 "target_vim": target_vim,
854 "vlds": [sdn_vld],
855 "type": vld.get("type"),
856 }
857
858 nsd_vnf_profiles = get_vnf_profiles(nsd)
859 for nsd_vnf_profile in nsd_vnf_profiles:
860 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
861 if cp["virtual-link-profile-id"] == vld["id"]:
862 cp2target[
863 "member_vnf:{}.{}".format(
864 cp["constituent-cpd-id"][0][
865 "constituent-base-element-id"
866 ],
867 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
868 )
869 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
870
871 # check at nsd descriptor, if there is an ip-profile
872 vld_params = {}
873 nsd_vlp = find_in_list(
874 get_virtual_link_profiles(nsd),
875 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
876 == vld["id"],
877 )
878 if (
879 nsd_vlp
880 and nsd_vlp.get("virtual-link-protocol-data")
881 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
882 ):
883 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
884 "l3-protocol-data"
885 ]
886 ip_profile_dest_data = {}
887 if "ip-version" in ip_profile_source_data:
888 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
889 "ip-version"
890 ]
891 if "cidr" in ip_profile_source_data:
892 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
893 "cidr"
894 ]
895 if "gateway-ip" in ip_profile_source_data:
896 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
897 "gateway-ip"
898 ]
899 if "dhcp-enabled" in ip_profile_source_data:
900 ip_profile_dest_data["dhcp-params"] = {
901 "enabled": ip_profile_source_data["dhcp-enabled"]
902 }
903 vld_params["ip-profile"] = ip_profile_dest_data
904
905 # update vld_params with instantiation params
906 vld_instantiation_params = find_in_list(
907 get_iterable(ns_params, "vld"),
908 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
909 )
910 if vld_instantiation_params:
911 vld_params.update(vld_instantiation_params)
912 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
913 target["ns"]["vld"].append(target_vld)
914
915 for vnfr in db_vnfrs.values():
916 vnfd = find_in_list(
917 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
918 )
919 vnf_params = find_in_list(
920 get_iterable(ns_params, "vnf"),
921 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
922 )
923 target_vnf = deepcopy(vnfr)
924 target_vim = "vim:{}".format(vnfr["vim-account-id"])
925 for vld in target_vnf.get("vld", ()):
926 # check if connected to a ns.vld, to fill target
927 vnf_cp = find_in_list(
928 vnfd.get("int-virtual-link-desc", ()),
929 lambda cpd: cpd.get("id") == vld["id"],
930 )
931 if vnf_cp:
932 ns_cp = "member_vnf:{}.{}".format(
933 vnfr["member-vnf-index-ref"], vnf_cp["id"]
934 )
935 if cp2target.get(ns_cp):
936 vld["target"] = cp2target[ns_cp]
937
938 vld["vim_info"] = {
939 target_vim: {"vim_network_name": vld.get("vim-network-name")}
940 }
941 # check if this network needs SDN assist
942 target_sdn = None
943 if vld.get("pci-interfaces"):
944 db_vim = get_vim_account(vnfr["vim-account-id"])
945 sdnc_id = db_vim["config"].get("sdn-controller")
946 if sdnc_id:
947 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
948 target_sdn = "sdn:{}".format(sdnc_id)
949 vld["vim_info"][target_sdn] = {
950 "sdn": True,
951 "target_vim": target_vim,
952 "vlds": [sdn_vld],
953 "type": vld.get("type"),
954 }
955
956 # check at vnfd descriptor, if there is an ip-profile
957 vld_params = {}
958 vnfd_vlp = find_in_list(
959 get_virtual_link_profiles(vnfd),
960 lambda a_link_profile: a_link_profile["id"] == vld["id"],
961 )
962 if (
963 vnfd_vlp
964 and vnfd_vlp.get("virtual-link-protocol-data")
965 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
966 ):
967 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
968 "l3-protocol-data"
969 ]
970 ip_profile_dest_data = {}
971 if "ip-version" in ip_profile_source_data:
972 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
973 "ip-version"
974 ]
975 if "cidr" in ip_profile_source_data:
976 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
977 "cidr"
978 ]
979 if "gateway-ip" in ip_profile_source_data:
980 ip_profile_dest_data[
981 "gateway-address"
982 ] = ip_profile_source_data["gateway-ip"]
983 if "dhcp-enabled" in ip_profile_source_data:
984 ip_profile_dest_data["dhcp-params"] = {
985 "enabled": ip_profile_source_data["dhcp-enabled"]
986 }
987
988 vld_params["ip-profile"] = ip_profile_dest_data
989 # update vld_params with instantiation params
990 if vnf_params:
991 vld_instantiation_params = find_in_list(
992 get_iterable(vnf_params, "internal-vld"),
993 lambda i_vld: i_vld["name"] == vld["id"],
994 )
995 if vld_instantiation_params:
996 vld_params.update(vld_instantiation_params)
997 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
998
999 vdur_list = []
1000 for vdur in target_vnf.get("vdur", ()):
1001 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1002 continue # This vdu must not be created
1003 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1004
1005 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1006
1007 if ssh_keys_all:
1008 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1009 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1010 if (
1011 vdu_configuration
1012 and vdu_configuration.get("config-access")
1013 and vdu_configuration.get("config-access").get("ssh-access")
1014 ):
1015 vdur["ssh-keys"] = ssh_keys_all
1016 vdur["ssh-access-required"] = vdu_configuration[
1017 "config-access"
1018 ]["ssh-access"]["required"]
1019 elif (
1020 vnf_configuration
1021 and vnf_configuration.get("config-access")
1022 and vnf_configuration.get("config-access").get("ssh-access")
1023 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1024 ):
1025 vdur["ssh-keys"] = ssh_keys_all
1026 vdur["ssh-access-required"] = vnf_configuration[
1027 "config-access"
1028 ]["ssh-access"]["required"]
1029 elif ssh_keys_instantiation and find_in_list(
1030 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1031 ):
1032 vdur["ssh-keys"] = ssh_keys_instantiation
1033
1034 self.logger.debug("NS > vdur > {}".format(vdur))
1035
1036 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1037 # cloud-init
1038 if vdud.get("cloud-init-file"):
1039 vdur["cloud-init"] = "{}:file:{}".format(
1040 vnfd["_id"], vdud.get("cloud-init-file")
1041 )
1042 # read the file and put its content at target.cloud_init_content, so ng_ro does not need to use the shared package system
1043 if vdur["cloud-init"] not in target["cloud_init_content"]:
1044 base_folder = vnfd["_admin"]["storage"]
1045 cloud_init_file = "{}/{}/cloud_init/{}".format(
1046 base_folder["folder"],
1047 base_folder["pkg-dir"],
1048 vdud.get("cloud-init-file"),
1049 )
1050 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1051 target["cloud_init_content"][
1052 vdur["cloud-init"]
1053 ] = ci_file.read()
1054 elif vdud.get("cloud-init"):
1055 vdur["cloud-init"] = "{}:vdu:{}".format(
1056 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1057 )
1058 # put the content at target.cloud_init_content, so ng_ro does not need to read the vnfd descriptor
1059 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1060 "cloud-init"
1061 ]
1062 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1063 deploy_params_vdu = self._format_additional_params(
1064 vdur.get("additionalParams") or {}
1065 )
1066 deploy_params_vdu["OSM"] = get_osm_params(
1067 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1068 )
1069 vdur["additionalParams"] = deploy_params_vdu
1070
1071 # flavor
1072 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1073 if target_vim not in ns_flavor["vim_info"]:
1074 ns_flavor["vim_info"][target_vim] = {}
1075
1076 # deal with images
1077 # in case alternative images are provided we must check if they should be applied
1078 # for the vim_type, modify the vim_type taking into account
1079 ns_image_id = int(vdur["ns-image-id"])
1080 if vdur.get("alt-image-ids"):
1081 db_vim = get_vim_account(vnfr["vim-account-id"])
1082 vim_type = db_vim["vim_type"]
1083 for alt_image_id in vdur.get("alt-image-ids"):
1084 ns_alt_image = target["image"][int(alt_image_id)]
1085 if vim_type == ns_alt_image.get("vim-type"):
1086 # must use alternative image
1087 self.logger.debug(
1088 "use alternative image id: {}".format(alt_image_id)
1089 )
1090 ns_image_id = alt_image_id
1091 vdur["ns-image-id"] = ns_image_id
1092 break
1093 ns_image = target["image"][int(ns_image_id)]
1094 if target_vim not in ns_image["vim_info"]:
1095 ns_image["vim_info"][target_vim] = {}
1096
1097 vdur["vim_info"] = {target_vim: {}}
1098 # instantiation parameters
1099 # if vnf_params:
1100 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1101 # vdud["id"]), None)
1102 vdur_list.append(vdur)
1103 target_vnf["vdur"] = vdur_list
1104 target["vnf"].append(target_vnf)
1105
1106 desc = await self.RO.deploy(nsr_id, target)
1107 self.logger.debug("RO return > {}".format(desc))
1108 action_id = desc["action_id"]
1109 await self._wait_ng_ro(
1110 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1111 )
1112
1113 # Updating NSR
1114 db_nsr_update = {
1115 "_admin.deployed.RO.operational-status": "running",
1116 "detailed-status": " ".join(stage),
1117 }
1118 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1119 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1120 self._write_op_status(nslcmop_id, stage)
1121 self.logger.debug(
1122 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1123 )
1124 return
1125
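# _wait_ng_ro polls NG-RO every 15 seconds until the action is DONE, raises on
# FAILED or timeout, and refreshes stage[2] (the vim-specific item of the 3-element
# stage list described in instantiate_RO) with the reported progress.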
1126 async def _wait_ng_ro(
1127 self,
1128 nsr_id,
1129 action_id,
1130 nslcmop_id=None,
1131 start_time=None,
1132 timeout=600,
1133 stage=None,
1134 ):
1135 detailed_status_old = None
1136 db_nsr_update = {}
1137 start_time = start_time or time()
1138 while time() <= start_time + timeout:
1139 desc_status = await self.RO.status(nsr_id, action_id)
1140 self.logger.debug("Wait NG RO > {}".format(desc_status))
1141 if desc_status["status"] == "FAILED":
1142 raise NgRoException(desc_status["details"])
1143 elif desc_status["status"] == "BUILD":
1144 if stage:
1145 stage[2] = "VIM: ({})".format(desc_status["details"])
1146 elif desc_status["status"] == "DONE":
1147 if stage:
1148 stage[2] = "Deployed at VIM"
1149 break
1150 else:
1151 assert False, "RO.status returns unknown {}".format(
1152 desc_status["status"]
1153 )
1154 if stage and nslcmop_id and stage[2] != detailed_status_old:
1155 detailed_status_old = stage[2]
1156 db_nsr_update["detailed-status"] = " ".join(stage)
1157 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1158 self._write_op_status(nslcmop_id, stage)
1159 await asyncio.sleep(15, loop=self.loop)
1160 else: # timeout_ns_deploy
1161 raise NgRoException("Timeout waiting ns to deploy")
1162
1163 async def _terminate_ng_ro(
1164 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1165 ):
1166 db_nsr_update = {}
1167 failed_detail = []
1168 action_id = None
1169 start_deploy = time()
1170 try:
1171 target = {
1172 "ns": {"vld": []},
1173 "vnf": [],
1174 "image": [],
1175 "flavor": [],
1176 "action_id": nslcmop_id,
1177 }
1178 desc = await self.RO.deploy(nsr_id, target)
1179 action_id = desc["action_id"]
1180 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1181 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1182 self.logger.debug(
1183 logging_text
1184 + "ns terminate action at RO. action_id={}".format(action_id)
1185 )
1186
1187 # wait until done
1188 delete_timeout = 20 * 60 # 20 minutes
1189 await self._wait_ng_ro(
1190 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1191 )
1192
1193 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1194 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1195 # delete all nsr
1196 await self.RO.delete(nsr_id)
1197 except Exception as e:
1198 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1199 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1200 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1201 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1202 self.logger.debug(
1203 logging_text + "RO_action_id={} already deleted".format(action_id)
1204 )
1205 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1206 failed_detail.append("delete conflict: {}".format(e))
1207 self.logger.debug(
1208 logging_text
1209 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1210 )
1211 else:
1212 failed_detail.append("delete error: {}".format(e))
1213 self.logger.error(
1214 logging_text
1215 + "RO_action_id={} delete error: {}".format(action_id, e)
1216 )
1217
1218 if failed_detail:
1219 stage[2] = "Error deleting from VIM"
1220 else:
1221 stage[2] = "Deleted from VIM"
1222 db_nsr_update["detailed-status"] = " ".join(stage)
1223 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1224 self._write_op_status(nslcmop_id, stage)
1225
1226 if failed_detail:
1227 raise LcmException("; ".join(failed_detail))
1228 return
1229
1230 async def instantiate_RO(
1231 self,
1232 logging_text,
1233 nsr_id,
1234 nsd,
1235 db_nsr,
1236 db_nslcmop,
1237 db_vnfrs,
1238 db_vnfds,
1239 n2vc_key_list,
1240 stage,
1241 ):
1242 """
1243 Instantiate at RO
1244 :param logging_text: prefix text to use for logging
1245 :param nsr_id: nsr identity
1246 :param nsd: database content of ns descriptor
1247 :param db_nsr: database content of ns record
1248 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1249 :param db_vnfrs:
1250 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1251 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1252 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1253 :return: None or exception
1254 """
1255 try:
1256 start_deploy = time()
1257 ns_params = db_nslcmop.get("operationParams")
1258 if ns_params and ns_params.get("timeout_ns_deploy"):
1259 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1260 else:
1261 timeout_ns_deploy = self.timeout.get(
1262 "ns_deploy", self.timeout_ns_deploy
1263 )
1264
1265 # Check for and optionally request placement optimization. Database will be updated if placement activated
1266 stage[2] = "Waiting for Placement."
1267 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1268 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1269 for vnfr in db_vnfrs.values():
1270 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1271 break
1272 else:
1273 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1274
1275 return await self._instantiate_ng_ro(
1276 logging_text,
1277 nsr_id,
1278 nsd,
1279 db_nsr,
1280 db_nslcmop,
1281 db_vnfrs,
1282 db_vnfds,
1283 n2vc_key_list,
1284 stage,
1285 start_deploy,
1286 timeout_ns_deploy,
1287 )
1288 except Exception as e:
1289 stage[2] = "ERROR deploying at VIM"
1290 self.set_vnfr_at_error(db_vnfrs, str(e))
1291 self.logger.error(
1292 "Error deploying at VIM {}".format(e),
1293 exc_info=not isinstance(
1294 e,
1295 (
1296 ROclient.ROClientException,
1297 LcmException,
1298 DbException,
1299 NgRoException,
1300 ),
1301 ),
1302 )
1303 raise
1304
1305 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1306 """
1307 Wait for kdu to be up, get ip address
1308 :param logging_text: prefix use for logging
1309 :param nsr_id:
1310 :param vnfr_id:
1311 :param kdu_name:
1312 :return: IP address
1313 """
1314
1315 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1316 nb_tries = 0
1317
1318 while nb_tries < 360:
1319 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1320 kdur = next(
1321 (
1322 x
1323 for x in get_iterable(db_vnfr, "kdur")
1324 if x.get("kdu-name") == kdu_name
1325 ),
1326 None,
1327 )
1328 if not kdur:
1329 raise LcmException(
1330 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1331 )
1332 if kdur.get("status"):
1333 if kdur["status"] in ("READY", "ENABLED"):
1334 return kdur.get("ip-address")
1335 else:
1336 raise LcmException(
1337 "target KDU={} is in error state".format(kdu_name)
1338 )
1339
1340 await asyncio.sleep(10, loop=self.loop)
1341 nb_tries += 1
1342 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1343
1344 async def wait_vm_up_insert_key_ro(
1345 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1346 ):
1347 """
1348 Wait for the ip address from RO and, optionally, insert a public key in the virtual machine
1349 :param logging_text: prefix use for logging
1350 :param nsr_id:
1351 :param vnfr_id:
1352 :param vdu_id:
1353 :param vdu_index:
1354 :param pub_key: public ssh key to inject, None to skip
1355 :param user: user to apply the public ssh key
1356 :return: IP address
1357 """
1358
1359 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1360 ro_nsr_id = None
1361 ip_address = None
1362 nb_tries = 0
1363 target_vdu_id = None
1364 ro_retries = 0
1365
1366 while True:
1367
1368 ro_retries += 1
1369 if ro_retries >= 360: # 1 hour
1370 raise LcmException(
1371 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1372 )
1373
1374 await asyncio.sleep(10, loop=self.loop)
1375
1376 # get ip address
1377 if not target_vdu_id:
1378 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1379
1380 if not vdu_id: # for the VNF case
1381 if db_vnfr.get("status") == "ERROR":
1382 raise LcmException(
1383 "Cannot inject ssh-key because target VNF is in error state"
1384 )
1385 ip_address = db_vnfr.get("ip-address")
1386 if not ip_address:
1387 continue
1388 vdur = next(
1389 (
1390 x
1391 for x in get_iterable(db_vnfr, "vdur")
1392 if x.get("ip-address") == ip_address
1393 ),
1394 None,
1395 )
1396 else: # VDU case
1397 vdur = next(
1398 (
1399 x
1400 for x in get_iterable(db_vnfr, "vdur")
1401 if x.get("vdu-id-ref") == vdu_id
1402 and x.get("count-index") == vdu_index
1403 ),
1404 None,
1405 )
1406
1407 if (
1408 not vdur and len(db_vnfr.get("vdur", ())) == 1
1409 ): # If only one, this should be the target vdu
1410 vdur = db_vnfr["vdur"][0]
1411 if not vdur:
1412 raise LcmException(
1413 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1414 vnfr_id, vdu_id, vdu_index
1415 )
1416 )
1417 # New generation RO stores information at "vim_info"
1418 ng_ro_status = None
1419 target_vim = None
1420 if vdur.get("vim_info"):
1421 target_vim = next(
1422 t for t in vdur["vim_info"]
1423 ) # there should be only one key
1424 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1425 if (
1426 vdur.get("pdu-type")
1427 or vdur.get("status") == "ACTIVE"
1428 or ng_ro_status == "ACTIVE"
1429 ):
1430 ip_address = vdur.get("ip-address")
1431 if not ip_address:
1432 continue
1433 target_vdu_id = vdur["vdu-id-ref"]
1434 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1435 raise LcmException(
1436 "Cannot inject ssh-key because target VM is in error state"
1437 )
1438
1439 if not target_vdu_id:
1440 continue
1441
1442 # inject public key into machine
1443 if pub_key and user:
1444 self.logger.debug(logging_text + "Inserting RO key")
1445 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1446 if vdur.get("pdu-type"):
1447 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1448 return ip_address
1449 try:
1450 ro_vm_id = "{}-{}".format(
1451 db_vnfr["member-vnf-index-ref"], target_vdu_id
1452 ) # TODO add vdu_index
1453 if self.ng_ro:
1454 target = {
1455 "action": {
1456 "action": "inject_ssh_key",
1457 "key": pub_key,
1458 "user": user,
1459 },
1460 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1461 }
1462 desc = await self.RO.deploy(nsr_id, target)
1463 action_id = desc["action_id"]
1464 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1465 break
1466 else:
1467 # wait until NS is deployed at RO
1468 if not ro_nsr_id:
1469 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1470 ro_nsr_id = deep_get(
1471 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1472 )
1473 if not ro_nsr_id:
1474 continue
1475 result_dict = await self.RO.create_action(
1476 item="ns",
1477 item_id_name=ro_nsr_id,
1478 descriptor={
1479 "add_public_key": pub_key,
1480 "vms": [ro_vm_id],
1481 "user": user,
1482 },
1483 )
1484 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1485 if not result_dict or not isinstance(result_dict, dict):
1486 raise LcmException(
1487 "Unknown response from RO when injecting key"
1488 )
1489 for result in result_dict.values():
1490 if result.get("vim_result") == 200:
1491 break
1492 else:
1493 raise ROclient.ROClientException(
1494 "error injecting key: {}".format(
1495 result.get("description")
1496 )
1497 )
1498 break
1499 except NgRoException as e:
1500 raise LcmException(
1501 "Reaching max tries injecting key. Error: {}".format(e)
1502 )
1503 except ROclient.ROClientException as e:
1504 if not nb_tries:
1505 self.logger.debug(
1506 logging_text
1507 + "error injecting key: {}. Retrying until {} seconds".format(
1508 e, 20 * 10
1509 )
1510 )
1511 nb_tries += 1
1512 if nb_tries >= 20:
1513 raise LcmException(
1514 "Reaching max tries injecting key. Error: {}".format(e)
1515 )
1516 else:
1517 break
1518
1519 return ip_address
1520
1521 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1522 """
1523 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs.
1524 """
1525 my_vca = vca_deployed_list[vca_index]
1526 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1527 # vdu or kdu: no dependencies
1528 return
1529 timeout = 300
1530 while timeout >= 0:
1531 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1532 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1533 configuration_status_list = db_nsr["configurationStatus"]
1534 for index, vca_deployed in enumerate(configuration_status_list):
1535 if index == vca_index:
1536 # myself
1537 continue
1538 if not my_vca.get("member-vnf-index") or (
1539 vca_deployed.get("member-vnf-index")
1540 == my_vca.get("member-vnf-index")
1541 ):
1542 internal_status = configuration_status_list[index].get("status")
1543 if internal_status == "READY":
1544 continue
1545 elif internal_status == "BROKEN":
1546 raise LcmException(
1547 "Configuration aborted because dependent charm/s has failed"
1548 )
1549 else:
1550 break
1551 else:
1552 # no dependencies, return
1553 return
1554 await asyncio.sleep(10)
1555 timeout -= 1
1556
1557 raise LcmException("Configuration aborted because dependent charm/s timeout")
1558
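# get_vca_id resolves which VCA manages the element: it takes the "vca-id" stored in
# the VNF record when one is given, and otherwise (no VNF record) looks up the "vca"
# configured in the VIM account referenced by the NS instantiation parameters.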
1559 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1560 vca_id = None
1561 if db_vnfr:
1562 vca_id = deep_get(db_vnfr, ("vca-id",))
1563 elif db_nsr:
1564 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1565 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1566 return vca_id
1567
1568 async def instantiate_N2VC(
1569 self,
1570 logging_text,
1571 vca_index,
1572 nsi_id,
1573 db_nsr,
1574 db_vnfr,
1575 vdu_id,
1576 kdu_name,
1577 vdu_index,
1578 config_descriptor,
1579 deploy_params,
1580 base_folder,
1581 nslcmop_id,
1582 stage,
1583 vca_type,
1584 vca_name,
1585 ee_config_descriptor,
1586 ):
1587 nsr_id = db_nsr["_id"]
1588 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1589 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1590 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1591 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1592 db_dict = {
1593 "collection": "nsrs",
1594 "filter": {"_id": nsr_id},
1595 "path": db_update_entry,
1596 }
1597 step = ""
1598 try:
1599
1600 element_type = "NS"
1601 element_under_configuration = nsr_id
1602
1603 vnfr_id = None
1604 if db_vnfr:
1605 vnfr_id = db_vnfr["_id"]
1606 osm_config["osm"]["vnf_id"] = vnfr_id
1607
1608 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1609
1610 if vca_type == "native_charm":
1611 index_number = 0
1612 else:
1613 index_number = vdu_index or 0
1614
1615 if vnfr_id:
1616 element_type = "VNF"
1617 element_under_configuration = vnfr_id
1618 namespace += ".{}-{}".format(vnfr_id, index_number)
1619 if vdu_id:
1620 namespace += ".{}-{}".format(vdu_id, index_number)
1621 element_type = "VDU"
1622 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1623 osm_config["osm"]["vdu_id"] = vdu_id
1624 elif kdu_name:
1625 namespace += ".{}".format(kdu_name)
1626 element_type = "KDU"
1627 element_under_configuration = kdu_name
1628 osm_config["osm"]["kdu_name"] = kdu_name
1629
1630 # Get artifact path
1631 artifact_path = "{}/{}/{}/{}".format(
1632 base_folder["folder"],
1633 base_folder["pkg-dir"],
1634 "charms"
1635 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1636 else "helm-charts",
1637 vca_name,
1638 )
1639
1640 self.logger.debug("Artifact path > {}".format(artifact_path))
1641
1642 # get initial_config_primitive_list that applies to this element
1643 initial_config_primitive_list = config_descriptor.get(
1644 "initial-config-primitive"
1645 )
1646
1647 self.logger.debug(
1648 "Initial config primitive list > {}".format(
1649 initial_config_primitive_list
1650 )
1651 )
1652
1653 # add config if not present for NS charm
1654 ee_descriptor_id = ee_config_descriptor.get("id")
1655 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1656 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1657 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1658 )
1659
1660 self.logger.debug(
1661 "Initial config primitive list #2 > {}".format(
1662 initial_config_primitive_list
1663 )
1664 )
1665 # n2vc_redesign STEP 3.1
1666 # find old ee_id if exists
1667 ee_id = vca_deployed.get("ee_id")
1668
1669 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1670 # create or register execution environment in VCA
1671 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1672
1673 self._write_configuration_status(
1674 nsr_id=nsr_id,
1675 vca_index=vca_index,
1676 status="CREATING",
1677 element_under_configuration=element_under_configuration,
1678 element_type=element_type,
1679 )
1680
1681 step = "create execution environment"
1682 self.logger.debug(logging_text + step)
1683
1684 ee_id = None
1685 credentials = None
1686 if vca_type == "k8s_proxy_charm":
1687 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1688 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1689 namespace=namespace,
1690 artifact_path=artifact_path,
1691 db_dict=db_dict,
1692 vca_id=vca_id,
1693 )
1694 elif vca_type == "helm" or vca_type == "helm-v3":
1695 ee_id, credentials = await self.vca_map[
1696 vca_type
1697 ].create_execution_environment(
1698 namespace=namespace,
1699 reuse_ee_id=ee_id,
1700 db_dict=db_dict,
1701 config=osm_config,
1702 artifact_path=artifact_path,
1703 vca_type=vca_type,
1704 )
1705 else:
1706 ee_id, credentials = await self.vca_map[
1707 vca_type
1708 ].create_execution_environment(
1709 namespace=namespace,
1710 reuse_ee_id=ee_id,
1711 db_dict=db_dict,
1712 vca_id=vca_id,
1713 )
1714
1715 elif vca_type == "native_charm":
1716 step = "Waiting to VM being up and getting IP address"
1717 self.logger.debug(logging_text + step)
1718 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1719 logging_text,
1720 nsr_id,
1721 vnfr_id,
1722 vdu_id,
1723 vdu_index,
1724 user=None,
1725 pub_key=None,
1726 )
1727 credentials = {"hostname": rw_mgmt_ip}
1728 # get username
1729 username = deep_get(
1730 config_descriptor, ("config-access", "ssh-access", "default-user")
1731 )
1732 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1733 # merged. Meanwhile, get the username from initial-config-primitive
1734 if not username and initial_config_primitive_list:
1735 for config_primitive in initial_config_primitive_list:
1736 for param in config_primitive.get("parameter", ()):
1737 if param["name"] == "ssh-username":
1738 username = param["value"]
1739 break
1740 if not username:
1741 raise LcmException(
1742 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1743 "'config-access.ssh-access.default-user'"
1744 )
1745 credentials["username"] = username
1746 # n2vc_redesign STEP 3.2
1747
1748 self._write_configuration_status(
1749 nsr_id=nsr_id,
1750 vca_index=vca_index,
1751 status="REGISTERING",
1752 element_under_configuration=element_under_configuration,
1753 element_type=element_type,
1754 )
1755
1756 step = "register execution environment {}".format(credentials)
1757 self.logger.debug(logging_text + step)
1758 ee_id = await self.vca_map[vca_type].register_execution_environment(
1759 credentials=credentials,
1760 namespace=namespace,
1761 db_dict=db_dict,
1762 vca_id=vca_id,
1763 )
1764
1765 # for compatibility with MON/POL modules, they need the model and application name in the database
1766 # TODO ask MON/POL whether assuming the format "model_name.application_name" is still needed
1767 ee_id_parts = ee_id.split(".")
1768 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1769 if len(ee_id_parts) >= 2:
1770 model_name = ee_id_parts[0]
1771 application_name = ee_id_parts[1]
1772 db_nsr_update[db_update_entry + "model"] = model_name
1773 db_nsr_update[db_update_entry + "application"] = application_name
1774
1775 # n2vc_redesign STEP 3.3
1776 step = "Install configuration Software"
1777
1778 self._write_configuration_status(
1779 nsr_id=nsr_id,
1780 vca_index=vca_index,
1781 status="INSTALLING SW",
1782 element_under_configuration=element_under_configuration,
1783 element_type=element_type,
1784 other_update=db_nsr_update,
1785 )
1786
1787 # TODO check if already done
1788 self.logger.debug(logging_text + step)
1789 config = None
1790 if vca_type == "native_charm":
1791 config_primitive = next(
1792 (p for p in initial_config_primitive_list if p["name"] == "config"),
1793 None,
1794 )
1795 if config_primitive:
1796 config = self._map_primitive_params(
1797 config_primitive, {}, deploy_params
1798 )
1799 num_units = 1
1800 if vca_type == "lxc_proxy_charm":
1801 if element_type == "NS":
1802 num_units = db_nsr.get("config-units") or 1
1803 elif element_type == "VNF":
1804 num_units = db_vnfr.get("config-units") or 1
1805 elif element_type == "VDU":
1806 for v in db_vnfr["vdur"]:
1807 if vdu_id == v["vdu-id-ref"]:
1808 num_units = v.get("config-units") or 1
1809 break
1810 if vca_type != "k8s_proxy_charm":
1811 await self.vca_map[vca_type].install_configuration_sw(
1812 ee_id=ee_id,
1813 artifact_path=artifact_path,
1814 db_dict=db_dict,
1815 config=config,
1816 num_units=num_units,
1817 vca_id=vca_id,
1818 vca_type=vca_type,
1819 )
1820
1821 # write in db the flag that configuration_sw is already installed
1822 self.update_db_2(
1823 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1824 )
1825
1826 # add relations for this VCA (wait for other peers related with this VCA)
1827 await self._add_vca_relations(
1828 logging_text=logging_text,
1829 nsr_id=nsr_id,
1830 vca_index=vca_index,
1831 vca_id=vca_id,
1832 vca_type=vca_type,
1833 )
1834
1835 # if SSH access is required, then get the execution environment SSH public key
1836 # if native charm, we have already waited for the VM to be up
1837 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1838 pub_key = None
1839 user = None
1840 # self.logger.debug("get ssh key block")
1841 if deep_get(
1842 config_descriptor, ("config-access", "ssh-access", "required")
1843 ):
1844 # self.logger.debug("ssh key needed")
1845 # Need to inject an ssh key
1846 user = deep_get(
1847 config_descriptor,
1848 ("config-access", "ssh-access", "default-user"),
1849 )
1850 step = "Install configuration Software, getting public ssh key"
1851 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1852 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1853 )
1854
1855 step = "Insert public key into VM user={} ssh_key={}".format(
1856 user, pub_key
1857 )
1858 else:
1859 # self.logger.debug("no need to get ssh key")
1860 step = "Waiting to VM being up and getting IP address"
1861 self.logger.debug(logging_text + step)
1862
1863 # n2vc_redesign STEP 5.1
1864 # wait for RO (ip-address) and insert pub_key into the VM
1865 if vnfr_id:
1866 if kdu_name:
1867 rw_mgmt_ip = await self.wait_kdu_up(
1868 logging_text, nsr_id, vnfr_id, kdu_name
1869 )
1870 else:
1871 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1872 logging_text,
1873 nsr_id,
1874 vnfr_id,
1875 vdu_id,
1876 vdu_index,
1877 user=user,
1878 pub_key=pub_key,
1879 )
1880 else:
1881 rw_mgmt_ip = None # This is for a NS configuration
1882
1883 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1884
1885 # store rw_mgmt_ip in deploy params for later replacement
1886 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1887
1888 # n2vc_redesign STEP 6 Execute initial config primitive
1889 step = "execute initial config primitive"
1890
1891 # wait for dependent primitives execution (NS -> VNF -> VDU)
1892 if initial_config_primitive_list:
1893 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1894
1895 # stage, depending on the element type: vdu, kdu, vnf or ns
1896 my_vca = vca_deployed_list[vca_index]
1897 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1898 # VDU or KDU
1899 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1900 elif my_vca.get("member-vnf-index"):
1901 # VNF
1902 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1903 else:
1904 # NS
1905 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1906
1907 self._write_configuration_status(
1908 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1909 )
1910
1911 self._write_op_status(op_id=nslcmop_id, stage=stage)
1912
1913 check_if_terminated_needed = True
1914 for initial_config_primitive in initial_config_primitive_list:
1915 # if the execution environment belongs to the NS, add ns_config_info to the deploy params
1916 if not vca_deployed["member-vnf-index"]:
1917 deploy_params["ns_config_info"] = json.dumps(
1918 self._get_ns_config_info(nsr_id)
1919 )
1920 # TODO check if already done
1921 primitive_params_ = self._map_primitive_params(
1922 initial_config_primitive, {}, deploy_params
1923 )
1924
1925 step = "execute primitive '{}' params '{}'".format(
1926 initial_config_primitive["name"], primitive_params_
1927 )
1928 self.logger.debug(logging_text + step)
1929 await self.vca_map[vca_type].exec_primitive(
1930 ee_id=ee_id,
1931 primitive_name=initial_config_primitive["name"],
1932 params_dict=primitive_params_,
1933 db_dict=db_dict,
1934 vca_id=vca_id,
1935 vca_type=vca_type,
1936 )
1937 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
1938 if check_if_terminated_needed:
1939 if config_descriptor.get("terminate-config-primitive"):
1940 self.update_db_2(
1941 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
1942 )
1943 check_if_terminated_needed = False
1944
1945 # TODO register in database that primitive is done
1946
1947 # STEP 7 Configure metrics
1948 if vca_type == "helm" or vca_type == "helm-v3":
1949 prometheus_jobs = await self.add_prometheus_metrics(
1950 ee_id=ee_id,
1951 artifact_path=artifact_path,
1952 ee_config_descriptor=ee_config_descriptor,
1953 vnfr_id=vnfr_id,
1954 nsr_id=nsr_id,
1955 target_ip=rw_mgmt_ip,
1956 )
1957 if prometheus_jobs:
1958 self.update_db_2(
1959 "nsrs",
1960 nsr_id,
1961 {db_update_entry + "prometheus_jobs": prometheus_jobs},
1962 )
1963
1964 step = "instantiated at VCA"
1965 self.logger.debug(logging_text + step)
1966
1967 self._write_configuration_status(
1968 nsr_id=nsr_id, vca_index=vca_index, status="READY"
1969 )
1970
1971 except Exception as e: # TODO not use Exception but N2VC exception
1972 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1973 if not isinstance(
1974 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
1975 ):
1976 self.logger.error(
1977 "Exception while {} : {}".format(step, e), exc_info=True
1978 )
1979 self._write_configuration_status(
1980 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
1981 )
1982 raise LcmException("{} {}".format(step, e)) from e
1983
1984 def _write_ns_status(
1985 self,
1986 nsr_id: str,
1987 ns_state: str,
1988 current_operation: str,
1989 current_operation_id: str,
1990 error_description: str = None,
1991 error_detail: str = None,
1992 other_update: dict = None,
1993 ):
1994 """
1995 Update db_nsr fields.
1996 :param nsr_id:
1997 :param ns_state:
1998 :param current_operation:
1999 :param current_operation_id:
2000 :param error_description:
2001 :param error_detail:
2002 :param other_update: Other required changes at database if provided, will be cleared
2003 :return:
2004 """
2005 try:
2006 db_dict = other_update or {}
2007 db_dict[
2008 "_admin.nslcmop"
2009 ] = current_operation_id # for backward compatibility
2010 db_dict["_admin.current-operation"] = current_operation_id
2011 db_dict["_admin.operation-type"] = (
2012 current_operation if current_operation != "IDLE" else None
2013 )
2014 db_dict["currentOperation"] = current_operation
2015 db_dict["currentOperationID"] = current_operation_id
2016 db_dict["errorDescription"] = error_description
2017 db_dict["errorDetail"] = error_detail
2018
2019 if ns_state:
2020 db_dict["nsState"] = ns_state
2021 self.update_db_2("nsrs", nsr_id, db_dict)
2022 except DbException as e:
2023 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2024
2025 def _write_op_status(
2026 self,
2027 op_id: str,
2028 stage: list = None,
2029 error_message: str = None,
2030 queuePosition: int = 0,
2031 operation_state: str = None,
2032 other_update: dict = None,
2033 ):
2034 try:
2035 db_dict = other_update or {}
2036 db_dict["queuePosition"] = queuePosition
2037 if isinstance(stage, list):
2038 db_dict["stage"] = stage[0]
2039 db_dict["detailed-status"] = " ".join(stage)
2040 elif stage is not None:
2041 db_dict["stage"] = str(stage)
2042
2043 if error_message is not None:
2044 db_dict["errorMessage"] = error_message
2045 if operation_state is not None:
2046 db_dict["operationState"] = operation_state
2047 db_dict["statusEnteredTime"] = time()
2048 self.update_db_2("nslcmops", op_id, db_dict)
2049 except DbException as e:
2050 self.logger.warn(
2051 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2052 )
2053
2054 def _write_all_config_status(self, db_nsr: dict, status: str):
2055 try:
2056 nsr_id = db_nsr["_id"]
2057 # configurationStatus
2058 config_status = db_nsr.get("configurationStatus")
2059 if config_status:
2060 db_nsr_update = {
2061 "configurationStatus.{}.status".format(index): status
2062 for index, v in enumerate(config_status)
2063 if v
2064 }
2065 # update status
2066 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2067
2068 except DbException as e:
2069 self.logger.warn(
2070 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2071 )
2072
2073 def _write_configuration_status(
2074 self,
2075 nsr_id: str,
2076 vca_index: int,
2077 status: str = None,
2078 element_under_configuration: str = None,
2079 element_type: str = None,
2080 other_update: dict = None,
2081 ):
2082
2083 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2084 # .format(vca_index, status))
2085
2086 try:
2087 db_path = "configurationStatus.{}.".format(vca_index)
2088 db_dict = other_update or {}
2089 if status:
2090 db_dict[db_path + "status"] = status
2091 if element_under_configuration:
2092 db_dict[
2093 db_path + "elementUnderConfiguration"
2094 ] = element_under_configuration
2095 if element_type:
2096 db_dict[db_path + "elementType"] = element_type
2097 self.update_db_2("nsrs", nsr_id, db_dict)
2098 except DbException as e:
2099 self.logger.warn(
2100 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2101 status, nsr_id, vca_index, e
2102 )
2103 )
2104
2105 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2106 """
2107 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2108 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2109 Database is used because the result can be obtained from a different LCM worker in case of HA.
2110 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2111 :param db_nslcmop: database content of nslcmop
2112 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2113 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2114 computed 'vim-account-id'
2115 """
2116 modified = False
2117 nslcmop_id = db_nslcmop["_id"]
2118 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2119 if placement_engine == "PLA":
2120 self.logger.debug(
2121 logging_text + "Invoke and wait for placement optimization"
2122 )
2123 await self.msg.aiowrite(
2124 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2125 )
2126 db_poll_interval = 5
2127 wait = db_poll_interval * 10
2128 pla_result = None
2129 while not pla_result and wait >= 0:
2130 await asyncio.sleep(db_poll_interval)
2131 wait -= db_poll_interval
2132 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2133 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2134
2135 if not pla_result:
2136 raise LcmException(
2137 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2138 )
2139
2140 for pla_vnf in pla_result["vnf"]:
2141 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2142 if not pla_vnf.get("vimAccountId") or not vnfr:
2143 continue
2144 modified = True
2145 self.db.set_one(
2146 "vnfrs",
2147 {"_id": vnfr["_id"]},
2148 {"vim-account-id": pla_vnf["vimAccountId"]},
2149 )
2150 # Modifies db_vnfrs
2151 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2152 return modified
2153
2154 def update_nsrs_with_pla_result(self, params):
2155 try:
2156 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2157 self.update_db_2(
2158 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2159 )
2160 except Exception as e:
2161 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2162
2163 async def instantiate(self, nsr_id, nslcmop_id):
2164 """
2165
2166 :param nsr_id: ns instance to deploy
2167 :param nslcmop_id: operation to run
2168 :return:
2169 """
2170
2171 # Try to lock HA task here
2172 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2173 if not task_is_locked_by_me:
2174 self.logger.debug(
2175 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2176 )
2177 return
2178
2179 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2180 self.logger.debug(logging_text + "Enter")
2181
2182 # get all needed from database
2183
2184 # database nsrs record
2185 db_nsr = None
2186
2187 # database nslcmops record
2188 db_nslcmop = None
2189
2190 # update operation on nsrs
2191 db_nsr_update = {}
2192 # update operation on nslcmops
2193 db_nslcmop_update = {}
2194
2195 nslcmop_operation_state = None
2196 db_vnfrs = {} # vnf's info indexed by member-index
2197 # n2vc_info = {}
2198 tasks_dict_info = {} # from task to info text
2199 exc = None
2200 error_list = []
2201 stage = [
2202 "Stage 1/5: preparation of the environment.",
2203 "Waiting for previous operations to terminate.",
2204 "",
2205 ]
2206 # ^ stage, step, VIM progress
2207 try:
2208 # wait for any previous tasks in process
2209 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2210
2211 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2212 stage[1] = "Reading from database."
2213 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2214 db_nsr_update["detailed-status"] = "creating"
2215 db_nsr_update["operational-status"] = "init"
2216 self._write_ns_status(
2217 nsr_id=nsr_id,
2218 ns_state="BUILDING",
2219 current_operation="INSTANTIATING",
2220 current_operation_id=nslcmop_id,
2221 other_update=db_nsr_update,
2222 )
2223 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2224
2225 # read from db: operation
2226 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2227 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2228 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2229 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2230 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2231 )
2232 ns_params = db_nslcmop.get("operationParams")
2233 if ns_params and ns_params.get("timeout_ns_deploy"):
2234 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2235 else:
2236 timeout_ns_deploy = self.timeout.get(
2237 "ns_deploy", self.timeout_ns_deploy
2238 )
2239
2240 # read from db: ns
2241 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2242 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2243 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2244 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2245 self.fs.sync(db_nsr["nsd-id"])
2246 db_nsr["nsd"] = nsd
2247 # nsr_name = db_nsr["name"] # TODO short-name??
2248
2249 # read from db: vnf's of this ns
2250 stage[1] = "Getting vnfrs from db."
2251 self.logger.debug(logging_text + stage[1])
2252 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2253
2254 # read from db: vnfd's for every vnf
2255 db_vnfds = [] # every vnfd data
2256
2257 # for each vnf in ns, read vnfd
2258 for vnfr in db_vnfrs_list:
2259 if vnfr.get("kdur"):
2260 kdur_list = []
2261 for kdur in vnfr["kdur"]:
2262 if kdur.get("additionalParams"):
2263 kdur["additionalParams"] = json.loads(kdur["additionalParams"])
2264 kdur_list.append(kdur)
2265 vnfr["kdur"] = kdur_list
2266
2267 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2268 vnfd_id = vnfr["vnfd-id"]
2269 vnfd_ref = vnfr["vnfd-ref"]
2270 self.fs.sync(vnfd_id)
2271
2272 # if we do not have this vnfd yet, read it from db
2273 if vnfd_id not in [db_vnfd["_id"] for db_vnfd in db_vnfds]:
2274 # read from db
2275 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2276 vnfd_id, vnfd_ref
2277 )
2278 self.logger.debug(logging_text + stage[1])
2279 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2280
2281 # store vnfd
2282 db_vnfds.append(vnfd)
2283
2284 # Get or generate the _admin.deployed.VCA list
2285 vca_deployed_list = None
2286 if db_nsr["_admin"].get("deployed"):
2287 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2288 if vca_deployed_list is None:
2289 vca_deployed_list = []
2290 configuration_status_list = []
2291 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2292 db_nsr_update["configurationStatus"] = configuration_status_list
2293 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2294 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2295 elif isinstance(vca_deployed_list, dict):
2296 # maintain backward compatibility. Change a dict to list at database
2297 vca_deployed_list = list(vca_deployed_list.values())
2298 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2299 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2300
2301 if not isinstance(
2302 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2303 ):
2304 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2305 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2306
2307 # set state to INSTANTIATED. When instantiated, NBI will not delete it directly
2308 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2309 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2310 self.db.set_list(
2311 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2312 )
2313
2314 # n2vc_redesign STEP 2 Deploy Network Scenario
2315 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2316 self._write_op_status(op_id=nslcmop_id, stage=stage)
2317
2318 stage[1] = "Deploying KDUs."
2319 # self.logger.debug(logging_text + "Before deploy_kdus")
2320 # Call deploy_kdus in case the "vdu:kdu" param exists
2321 await self.deploy_kdus(
2322 logging_text=logging_text,
2323 nsr_id=nsr_id,
2324 nslcmop_id=nslcmop_id,
2325 db_vnfrs=db_vnfrs,
2326 db_vnfds=db_vnfds,
2327 task_instantiation_info=tasks_dict_info,
2328 )
2329
2330 stage[1] = "Getting VCA public key."
2331 # n2vc_redesign STEP 1 Get VCA public ssh-key
2332 # feature 1429. Add n2vc public key to needed VMs
2333 n2vc_key = self.n2vc.get_public_key()
2334 n2vc_key_list = [n2vc_key]
2335 if self.vca_config.get("public_key"):
2336 n2vc_key_list.append(self.vca_config["public_key"])
2337
2338 stage[1] = "Deploying NS at VIM."
2339 task_ro = asyncio.ensure_future(
2340 self.instantiate_RO(
2341 logging_text=logging_text,
2342 nsr_id=nsr_id,
2343 nsd=nsd,
2344 db_nsr=db_nsr,
2345 db_nslcmop=db_nslcmop,
2346 db_vnfrs=db_vnfrs,
2347 db_vnfds=db_vnfds,
2348 n2vc_key_list=n2vc_key_list,
2349 stage=stage,
2350 )
2351 )
2352 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2353 tasks_dict_info[task_ro] = "Deploying at VIM"
2354
2355 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2356 stage[1] = "Deploying Execution Environments."
2357 self.logger.debug(logging_text + stage[1])
2358
2359 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2360 for vnf_profile in get_vnf_profiles(nsd):
2361 vnfd_id = vnf_profile["vnfd-id"]
2362 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2363 member_vnf_index = str(vnf_profile["id"])
2364 db_vnfr = db_vnfrs[member_vnf_index]
2365 base_folder = vnfd["_admin"]["storage"]
2366 vdu_id = None
2367 vdu_index = 0
2368 vdu_name = None
2369 kdu_name = None
2370
2371 # Get additional parameters
2372 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2373 if db_vnfr.get("additionalParamsForVnf"):
2374 deploy_params.update(
2375 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2376 )
2377
2378 descriptor_config = get_configuration(vnfd, vnfd["id"])
2379 if descriptor_config:
2380 self._deploy_n2vc(
2381 logging_text=logging_text
2382 + "member_vnf_index={} ".format(member_vnf_index),
2383 db_nsr=db_nsr,
2384 db_vnfr=db_vnfr,
2385 nslcmop_id=nslcmop_id,
2386 nsr_id=nsr_id,
2387 nsi_id=nsi_id,
2388 vnfd_id=vnfd_id,
2389 vdu_id=vdu_id,
2390 kdu_name=kdu_name,
2391 member_vnf_index=member_vnf_index,
2392 vdu_index=vdu_index,
2393 vdu_name=vdu_name,
2394 deploy_params=deploy_params,
2395 descriptor_config=descriptor_config,
2396 base_folder=base_folder,
2397 task_instantiation_info=tasks_dict_info,
2398 stage=stage,
2399 )
2400
2401 # Deploy charms for each VDU that supports one.
2402 for vdud in get_vdu_list(vnfd):
2403 vdu_id = vdud["id"]
2404 descriptor_config = get_configuration(vnfd, vdu_id)
2405 vdur = find_in_list(
2406 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2407 )
2408
2409 if vdur.get("additionalParams"):
2410 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2411 else:
2412 deploy_params_vdu = deploy_params
2413 deploy_params_vdu["OSM"] = get_osm_params(
2414 db_vnfr, vdu_id, vdu_count_index=0
2415 )
2416 vdud_count = get_number_of_instances(vnfd, vdu_id)
2417
2418 self.logger.debug("VDUD > {}".format(vdud))
2419 self.logger.debug(
2420 "Descriptor config > {}".format(descriptor_config)
2421 )
2422 if descriptor_config:
2423 vdu_name = None
2424 kdu_name = None
2425 for vdu_index in range(vdud_count):
2426 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2427 self._deploy_n2vc(
2428 logging_text=logging_text
2429 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2430 member_vnf_index, vdu_id, vdu_index
2431 ),
2432 db_nsr=db_nsr,
2433 db_vnfr=db_vnfr,
2434 nslcmop_id=nslcmop_id,
2435 nsr_id=nsr_id,
2436 nsi_id=nsi_id,
2437 vnfd_id=vnfd_id,
2438 vdu_id=vdu_id,
2439 kdu_name=kdu_name,
2440 member_vnf_index=member_vnf_index,
2441 vdu_index=vdu_index,
2442 vdu_name=vdu_name,
2443 deploy_params=deploy_params_vdu,
2444 descriptor_config=descriptor_config,
2445 base_folder=base_folder,
2446 task_instantiation_info=tasks_dict_info,
2447 stage=stage,
2448 )
2449 for kdud in get_kdu_list(vnfd):
2450 kdu_name = kdud["name"]
2451 descriptor_config = get_configuration(vnfd, kdu_name)
2452 if descriptor_config:
2453 vdu_id = None
2454 vdu_index = 0
2455 vdu_name = None
2456 kdur = next(
2457 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2458 )
2459 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2460 if kdur.get("additionalParams"):
2461 deploy_params_kdu = parse_yaml_strings(
2462 kdur["additionalParams"]
2463 )
2464
2465 self._deploy_n2vc(
2466 logging_text=logging_text,
2467 db_nsr=db_nsr,
2468 db_vnfr=db_vnfr,
2469 nslcmop_id=nslcmop_id,
2470 nsr_id=nsr_id,
2471 nsi_id=nsi_id,
2472 vnfd_id=vnfd_id,
2473 vdu_id=vdu_id,
2474 kdu_name=kdu_name,
2475 member_vnf_index=member_vnf_index,
2476 vdu_index=vdu_index,
2477 vdu_name=vdu_name,
2478 deploy_params=deploy_params_kdu,
2479 descriptor_config=descriptor_config,
2480 base_folder=base_folder,
2481 task_instantiation_info=tasks_dict_info,
2482 stage=stage,
2483 )
2484
2485 # Check if this NS has a charm configuration
2486 descriptor_config = nsd.get("ns-configuration")
2487 if descriptor_config and descriptor_config.get("juju"):
2488 vnfd_id = None
2489 db_vnfr = None
2490 member_vnf_index = None
2491 vdu_id = None
2492 kdu_name = None
2493 vdu_index = 0
2494 vdu_name = None
2495
2496 # Get additional parameters
2497 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2498 if db_nsr.get("additionalParamsForNs"):
2499 deploy_params.update(
2500 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2501 )
2502 base_folder = nsd["_admin"]["storage"]
2503 self._deploy_n2vc(
2504 logging_text=logging_text,
2505 db_nsr=db_nsr,
2506 db_vnfr=db_vnfr,
2507 nslcmop_id=nslcmop_id,
2508 nsr_id=nsr_id,
2509 nsi_id=nsi_id,
2510 vnfd_id=vnfd_id,
2511 vdu_id=vdu_id,
2512 kdu_name=kdu_name,
2513 member_vnf_index=member_vnf_index,
2514 vdu_index=vdu_index,
2515 vdu_name=vdu_name,
2516 deploy_params=deploy_params,
2517 descriptor_config=descriptor_config,
2518 base_folder=base_folder,
2519 task_instantiation_info=tasks_dict_info,
2520 stage=stage,
2521 )
2522
2523 # the rest of the work will be done in the finally block
2524
2525 except (
2526 ROclient.ROClientException,
2527 DbException,
2528 LcmException,
2529 N2VCException,
2530 ) as e:
2531 self.logger.error(
2532 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2533 )
2534 exc = e
2535 except asyncio.CancelledError:
2536 self.logger.error(
2537 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2538 )
2539 exc = "Operation was cancelled"
2540 except Exception as e:
2541 exc = traceback.format_exc()
2542 self.logger.critical(
2543 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2544 exc_info=True,
2545 )
2546 finally:
2547 if exc:
2548 error_list.append(str(exc))
2549 try:
2550 # wait for pending tasks
2551 if tasks_dict_info:
2552 stage[1] = "Waiting for instantiate pending tasks."
2553 self.logger.debug(logging_text + stage[1])
2554 error_list += await self._wait_for_tasks(
2555 logging_text,
2556 tasks_dict_info,
2557 timeout_ns_deploy,
2558 stage,
2559 nslcmop_id,
2560 nsr_id=nsr_id,
2561 )
2562 stage[1] = stage[2] = ""
2563 except asyncio.CancelledError:
2564 error_list.append("Cancelled")
2565 # TODO cancel all tasks
2566 except Exception as exc:
2567 error_list.append(str(exc))
2568
2569 # update operation-status
2570 db_nsr_update["operational-status"] = "running"
2571 # let's begin with VCA 'configured' status (later we can change it)
2572 db_nsr_update["config-status"] = "configured"
2573 for task, task_name in tasks_dict_info.items():
2574 if not task.done() or task.cancelled() or task.exception():
2575 if task_name.startswith(self.task_name_deploy_vca):
2576 # A N2VC task is pending
2577 db_nsr_update["config-status"] = "failed"
2578 else:
2579 # RO or KDU task is pending
2580 db_nsr_update["operational-status"] = "failed"
2581
2582 # update status at database
2583 if error_list:
2584 error_detail = ". ".join(error_list)
2585 self.logger.error(logging_text + error_detail)
2586 error_description_nslcmop = "{} Detail: {}".format(
2587 stage[0], error_detail
2588 )
2589 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2590 nslcmop_id, stage[0]
2591 )
2592
2593 db_nsr_update["detailed-status"] = (
2594 error_description_nsr + " Detail: " + error_detail
2595 )
2596 db_nslcmop_update["detailed-status"] = error_detail
2597 nslcmop_operation_state = "FAILED"
2598 ns_state = "BROKEN"
2599 else:
2600 error_detail = None
2601 error_description_nsr = error_description_nslcmop = None
2602 ns_state = "READY"
2603 db_nsr_update["detailed-status"] = "Done"
2604 db_nslcmop_update["detailed-status"] = "Done"
2605 nslcmop_operation_state = "COMPLETED"
2606
2607 if db_nsr:
2608 self._write_ns_status(
2609 nsr_id=nsr_id,
2610 ns_state=ns_state,
2611 current_operation="IDLE",
2612 current_operation_id=None,
2613 error_description=error_description_nsr,
2614 error_detail=error_detail,
2615 other_update=db_nsr_update,
2616 )
2617 self._write_op_status(
2618 op_id=nslcmop_id,
2619 stage="",
2620 error_message=error_description_nslcmop,
2621 operation_state=nslcmop_operation_state,
2622 other_update=db_nslcmop_update,
2623 )
2624
2625 if nslcmop_operation_state:
2626 try:
2627 await self.msg.aiowrite(
2628 "ns",
2629 "instantiated",
2630 {
2631 "nsr_id": nsr_id,
2632 "nslcmop_id": nslcmop_id,
2633 "operationState": nslcmop_operation_state,
2634 },
2635 loop=self.loop,
2636 )
2637 except Exception as e:
2638 self.logger.error(
2639 logging_text + "kafka_write notification Exception {}".format(e)
2640 )
2641
2642 self.logger.debug(logging_text + "Exit")
2643 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2644
2645 async def _add_vca_relations(
2646 self,
2647 logging_text,
2648 nsr_id,
2649 vca_index: int,
2650 timeout: int = 3600,
2651 vca_type: str = None,
2652 vca_id: str = None,
2653 ) -> bool:
2654
2655 # steps:
2656 # 1. find all relations for this VCA
2657 # 2. wait for the other related peers
2658 # 3. add relations
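# Illustrative (assumed) relation entry, as accessed below, coming from the nsd
# "ns-configuration:relation" list or from a vnfd configuration "relation" list:
#   {"entities": [{"id": "<peer-a>", "endpoint": "<endpoint-a>"},
#                 {"id": "<peer-b>", "endpoint": "<endpoint-b>"}]}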
2659
2660 try:
2661 vca_type = vca_type or "lxc_proxy_charm"
2662
2663 # STEP 1: find all relations for this VCA
2664
2665 # read nsr record
2666 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2667 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2668
2669 # this VCA data
2670 my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
2671
2672 # read all ns-configuration relations
2673 ns_relations = list()
2674 db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
2675 if db_ns_relations:
2676 for r in db_ns_relations:
2677 # check if this VCA is in the relation
2678 if my_vca.get("member-vnf-index") in (
2679 r.get("entities")[0].get("id"),
2680 r.get("entities")[1].get("id"),
2681 ):
2682 ns_relations.append(r)
2683
2684 # read all vnf-configuration relations
2685 vnf_relations = list()
2686 db_vnfd_list = db_nsr.get("vnfd-id")
2687 if db_vnfd_list:
2688 for vnfd in db_vnfd_list:
2689 db_vnf_relations = None
2690 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2691 db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
2692 if db_vnf_configuration:
2693 db_vnf_relations = db_vnf_configuration.get("relation", [])
2694 if db_vnf_relations:
2695 for r in db_vnf_relations:
2696 # check if this VCA is in the relation
2697 if my_vca.get("vdu_id") in (
2698 r.get("entities")[0].get("id"),
2699 r.get("entities")[1].get("id"),
2700 ):
2701 vnf_relations.append(r)
2702
2703 # if no relations, terminate
2704 if not ns_relations and not vnf_relations:
2705 self.logger.debug(logging_text + " No relations")
2706 return True
2707
2708 self.logger.debug(
2709 logging_text
2710 + " adding relations\n {}\n {}".format(
2711 ns_relations, vnf_relations
2712 )
2713 )
2714
2715 # add all relations
2716 start = time()
2717 while True:
2718 # check timeout
2719 now = time()
2720 if now - start >= timeout:
2721 self.logger.error(logging_text + " : timeout adding relations")
2722 return False
2723
2724 # reload nsr from database (we need the updated record: _admin.deployed.VCA)
2725 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2726
2727 # for each defined NS relation, find the related VCAs
2728 for r in ns_relations.copy():
2729 from_vca_ee_id = None
2730 to_vca_ee_id = None
2731 from_vca_endpoint = None
2732 to_vca_endpoint = None
2733 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2734 for vca in vca_list:
2735 if vca.get("member-vnf-index") == r.get("entities")[0].get(
2736 "id"
2737 ) and vca.get("config_sw_installed"):
2738 from_vca_ee_id = vca.get("ee_id")
2739 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2740 if vca.get("member-vnf-index") == r.get("entities")[1].get(
2741 "id"
2742 ) and vca.get("config_sw_installed"):
2743 to_vca_ee_id = vca.get("ee_id")
2744 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2745 if from_vca_ee_id and to_vca_ee_id:
2746 # add relation
2747 await self.vca_map[vca_type].add_relation(
2748 ee_id_1=from_vca_ee_id,
2749 ee_id_2=to_vca_ee_id,
2750 endpoint_1=from_vca_endpoint,
2751 endpoint_2=to_vca_endpoint,
2752 vca_id=vca_id,
2753 )
2754 # remove entry from relations list
2755 ns_relations.remove(r)
2756 else:
2757 # check failed peers
2758 try:
2759 vca_status_list = db_nsr.get("configurationStatus")
2760 if vca_status_list:
2761 for i in range(len(vca_list)):
2762 vca = vca_list[i]
2763 vca_status = vca_status_list[i]
2764 if vca.get("member-vnf-index") == r.get("entities")[
2765 0
2766 ].get("id"):
2767 if vca_status.get("status") == "BROKEN":
2768 # peer broken: remove relation from list
2769 ns_relations.remove(r)
2770 if vca.get("member-vnf-index") == r.get("entities")[
2771 1
2772 ].get("id"):
2773 if vca_status.get("status") == "BROKEN":
2774 # peer broken: remove relation from list
2775 ns_relations.remove(r)
2776 except Exception:
2777 # ignore
2778 pass
2779
2780 # for each defined VNF relation, find the related VCAs
2781 for r in vnf_relations.copy():
2782 from_vca_ee_id = None
2783 to_vca_ee_id = None
2784 from_vca_endpoint = None
2785 to_vca_endpoint = None
2786 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2787 for vca in vca_list:
2788 key_to_check = "vdu_id"
2789 if vca.get("vdu_id") is None:
2790 key_to_check = "vnfd_id"
2791 if vca.get(key_to_check) == r.get("entities")[0].get(
2792 "id"
2793 ) and vca.get("config_sw_installed"):
2794 from_vca_ee_id = vca.get("ee_id")
2795 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2796 if vca.get(key_to_check) == r.get("entities")[1].get(
2797 "id"
2798 ) and vca.get("config_sw_installed"):
2799 to_vca_ee_id = vca.get("ee_id")
2800 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2801 if from_vca_ee_id and to_vca_ee_id:
2802 # add relation
2803 await self.vca_map[vca_type].add_relation(
2804 ee_id_1=from_vca_ee_id,
2805 ee_id_2=to_vca_ee_id,
2806 endpoint_1=from_vca_endpoint,
2807 endpoint_2=to_vca_endpoint,
2808 vca_id=vca_id,
2809 )
2810 # remove entry from relations list
2811 vnf_relations.remove(r)
2812 else:
2813 # check failed peers
2814 try:
2815 vca_status_list = db_nsr.get("configurationStatus")
2816 if vca_status_list:
2817 for i in range(len(vca_list)):
2818 vca = vca_list[i]
2819 vca_status = vca_status_list[i]
2820 if vca.get("vdu_id") == r.get("entities")[0].get(
2821 "id"
2822 ):
2823 if vca_status.get("status") == "BROKEN":
2824 # peer broken: remove relation from list
2825 vnf_relations.remove(r)
2826 if vca.get("vdu_id") == r.get("entities")[1].get(
2827 "id"
2828 ):
2829 if vca_status.get("status") == "BROKEN":
2830 # peer broken: remove relation from list
2831 vnf_relations.remove(r)
2832 except Exception:
2833 # ignore
2834 pass
2835
2836 # wait for next try
2837 await asyncio.sleep(5.0)
2838
2839 if not ns_relations and not vnf_relations:
2840 self.logger.debug("Relations added")
2841 break
2842
2843 return True
2844
2845 except Exception as e:
2846 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
2847 return False
2848
2849 async def _install_kdu(
2850 self,
2851 nsr_id: str,
2852 nsr_db_path: str,
2853 vnfr_data: dict,
2854 kdu_index: int,
2855 kdud: dict,
2856 vnfd: dict,
2857 k8s_instance_info: dict,
2858 k8params: dict = None,
2859 timeout: int = 600,
2860 vca_id: str = None,
2861 ):
2862
2863 try:
2864 k8sclustertype = k8s_instance_info["k8scluster-type"]
2865 # Instantiate kdu
2866 db_dict_install = {
2867 "collection": "nsrs",
2868 "filter": {"_id": nsr_id},
2869 "path": nsr_db_path,
2870 }
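# (Assumption) this db_dict points the K8s connector at the "_admin.deployed.K8s.<index>"
# entry of this nsr, so install progress / detailed-status can be written there.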
2871
2872 if k8s_instance_info.get("kdu-deployment-name"):
2873 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
2874 else:
2875 kdu_instance = self.k8scluster_map[
2876 k8sclustertype
2877 ].generate_kdu_instance_name(
2878 db_dict=db_dict_install,
2879 kdu_model=k8s_instance_info["kdu-model"],
2880 kdu_name=k8s_instance_info["kdu-name"],
2881 )
2882 self.update_db_2(
2883 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2884 )
2885 await self.k8scluster_map[k8sclustertype].install(
2886 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2887 kdu_model=k8s_instance_info["kdu-model"],
2888 atomic=True,
2889 params=k8params,
2890 db_dict=db_dict_install,
2891 timeout=timeout,
2892 kdu_name=k8s_instance_info["kdu-name"],
2893 namespace=k8s_instance_info["namespace"],
2894 kdu_instance=kdu_instance,
2895 vca_id=vca_id,
2896 )
2897 self.update_db_2(
2898 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2899 )
2900
2901 # Obtain the deployed services in order to get the management service ip
2902 services = await self.k8scluster_map[k8sclustertype].get_services(
2903 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2904 kdu_instance=kdu_instance,
2905 namespace=k8s_instance_info["namespace"],
2906 )
2907
2908 # Obtain management service info (if exists)
2909 vnfr_update_dict = {}
2910 kdu_config = get_configuration(vnfd, kdud["name"])
2911 if kdu_config:
2912 target_ee_list = kdu_config.get("execution-environment-list", [])
2913 else:
2914 target_ee_list = []
2915
2916 if services:
2917 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2918 mgmt_services = [
2919 service
2920 for service in kdud.get("service", [])
2921 if service.get("mgmt-service")
2922 ]
2923 for mgmt_service in mgmt_services:
2924 for service in services:
2925 if service["name"].startswith(mgmt_service["name"]):
2926 # Mgmt service found, obtain the service ip
2927 ip = service.get("external_ip", service.get("cluster_ip"))
2928 if isinstance(ip, list) and len(ip) == 1:
2929 ip = ip[0]
2930
2931 vnfr_update_dict[
2932 "kdur.{}.ip-address".format(kdu_index)
2933 ] = ip
2934
2935 # Check whether the mgmt ip must also be updated at the vnf
2936 service_external_cp = mgmt_service.get(
2937 "external-connection-point-ref"
2938 )
2939 if service_external_cp:
2940 if (
2941 deep_get(vnfd, ("mgmt-interface", "cp"))
2942 == service_external_cp
2943 ):
2944 vnfr_update_dict["ip-address"] = ip
2945
2946 if find_in_list(
2947 target_ee_list,
2948 lambda ee: ee.get(
2949 "external-connection-point-ref", ""
2950 )
2951 == service_external_cp,
2952 ):
2953 vnfr_update_dict[
2954 "kdur.{}.ip-address".format(kdu_index)
2955 ] = ip
2956 break
2957 else:
2958 self.logger.warn(
2959 "Mgmt service name: {} not found".format(
2960 mgmt_service["name"]
2961 )
2962 )
2963
2964 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2965 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2966
2967 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
2968 if (
2969 kdu_config
2970 and kdu_config.get("initial-config-primitive")
2971 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
2972 ):
2973 initial_config_primitive_list = kdu_config.get(
2974 "initial-config-primitive"
2975 )
2976 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2977
2978 for initial_config_primitive in initial_config_primitive_list:
2979 primitive_params_ = self._map_primitive_params(
2980 initial_config_primitive, {}, {}
2981 )
2982
2983 await asyncio.wait_for(
2984 self.k8scluster_map[k8sclustertype].exec_primitive(
2985 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2986 kdu_instance=kdu_instance,
2987 primitive_name=initial_config_primitive["name"],
2988 params=primitive_params_,
2989 db_dict=db_dict_install,
2990 vca_id=vca_id,
2991 ),
2992 timeout=timeout,
2993 )
2994
2995 except Exception as e:
2996 # Prepare db update with the error and re-raise the exception
2997 try:
2998 self.update_db_2(
2999 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3000 )
3001 self.update_db_2(
3002 "vnfrs",
3003 vnfr_data.get("_id"),
3004 {"kdur.{}.status".format(kdu_index): "ERROR"},
3005 )
3006 except Exception:
3007 # ignore to keep original exception
3008 pass
3009 # reraise original error
3010 raise
3011
3012 return kdu_instance
3013
3014 async def deploy_kdus(
3015 self,
3016 logging_text,
3017 nsr_id,
3018 nslcmop_id,
3019 db_vnfrs,
3020 db_vnfds,
3021 task_instantiation_info,
3022 ):
3023 # Launch kdus if present in the descriptor
3024
3025 k8scluster_id_2_uuic = {
3026 "helm-chart-v3": {},
3027 "helm-chart": {},
3028 "juju-bundle": {},
3029 }
3030
3031 async def _get_cluster_id(cluster_id, cluster_type):
3032 nonlocal k8scluster_id_2_uuic
3033 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3034 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3035
3036 # check if the K8s cluster is being created; wait for related previous tasks in process
3037 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3038 "k8scluster", cluster_id
3039 )
3040 if task_dependency:
3041 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3042 task_name, cluster_id
3043 )
3044 self.logger.debug(logging_text + text)
3045 await asyncio.wait(task_dependency, timeout=3600)
3046
3047 db_k8scluster = self.db.get_one(
3048 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3049 )
3050 if not db_k8scluster:
3051 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3052
3053 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3054 if not k8s_id:
3055 if cluster_type == "helm-chart-v3":
3056 try:
3057 # backward compatibility for existing clusters that have not been initialized for helm v3
3058 k8s_credentials = yaml.safe_dump(
3059 db_k8scluster.get("credentials")
3060 )
3061 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3062 k8s_credentials, reuse_cluster_uuid=cluster_id
3063 )
3064 db_k8scluster_update = {}
3065 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3066 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3067 db_k8scluster_update[
3068 "_admin.helm-chart-v3.created"
3069 ] = uninstall_sw
3070 db_k8scluster_update[
3071 "_admin.helm-chart-v3.operationalState"
3072 ] = "ENABLED"
3073 self.update_db_2(
3074 "k8sclusters", cluster_id, db_k8scluster_update
3075 )
3076 except Exception as e:
3077 self.logger.error(
3078 logging_text
3079 + "error initializing helm-v3 cluster: {}".format(str(e))
3080 )
3081 raise LcmException(
3082 "K8s cluster '{}' has not been initialized for '{}'".format(
3083 cluster_id, cluster_type
3084 )
3085 )
3086 else:
3087 raise LcmException(
3088 "K8s cluster '{}' has not been initialized for '{}'".format(
3089 cluster_id, cluster_type
3090 )
3091 )
3092 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3093 return k8s_id
3094
3095 logging_text += "Deploy kdus: "
3096 step = ""
3097 try:
3098 db_nsr_update = {"_admin.deployed.K8s": []}
3099 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3100
3101 index = 0
3102 updated_cluster_list = []
3103 updated_v3_cluster_list = []
3104
3105 for vnfr_data in db_vnfrs.values():
3106 vca_id = self.get_vca_id(vnfr_data, {})
3107 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3108 # Step 0: Prepare and set parameters
3109 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3110 vnfd_id = vnfr_data.get("vnfd-id")
3111 vnfd_with_id = find_in_list(
3112 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3113 )
3114 kdud = next(
3115 kdud
3116 for kdud in vnfd_with_id["kdu"]
3117 if kdud["name"] == kdur["kdu-name"]
3118 )
3119 namespace = kdur.get("k8s-namespace")
3120 kdu_deployment_name = kdur.get("kdu-deployment-name")
3121 if kdur.get("helm-chart"):
3122 kdumodel = kdur["helm-chart"]
3123 # Default version: helm3, if helm-version is v2 assign v2
3124 k8sclustertype = "helm-chart-v3"
3125 self.logger.debug("kdur: {}".format(kdur))
3126 if (
3127 kdur.get("helm-version")
3128 and kdur.get("helm-version") == "v2"
3129 ):
3130 k8sclustertype = "helm-chart"
3131 elif kdur.get("juju-bundle"):
3132 kdumodel = kdur["juju-bundle"]
3133 k8sclustertype = "juju-bundle"
3134 else:
3135 raise LcmException(
3136 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3137 "juju-bundle. Maybe an old NBI version is running".format(
3138 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3139 )
3140 )
3141 # check if kdumodel is an existing file
3142 try:
3143 vnfd_with_id = find_in_list(
3144 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3145 )
3146 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3147 if storage and storage.get(
3148 "pkg-dir"
3149 ):  # may not be present if the vnfd has no artifacts
3150 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
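# e.g. (hypothetical values) "<vnfd-id>/<pkg-dir>/helm-charts/mychart"
# or "<vnfd-id>/<pkg-dir>/juju-bundles/mybundle"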
3151 filename = "{}/{}/{}s/{}".format(
3152 storage["folder"],
3153 storage["pkg-dir"],
3154 k8sclustertype,
3155 kdumodel,
3156 )
3157 if self.fs.file_exists(
3158 filename, mode="file"
3159 ) or self.fs.file_exists(filename, mode="dir"):
3160 kdumodel = self.fs.path + filename
3161 except (asyncio.TimeoutError, asyncio.CancelledError):
3162 raise
3163 except Exception: # it is not a file
3164 pass
3165
3166 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3167 step = "Synchronize repos for k8s cluster '{}'".format(
3168 k8s_cluster_id
3169 )
3170 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3171
3172 # Synchronize repos
3173 if (
3174 k8sclustertype == "helm-chart"
3175 and cluster_uuid not in updated_cluster_list
3176 ) or (
3177 k8sclustertype == "helm-chart-v3"
3178 and cluster_uuid not in updated_v3_cluster_list
3179 ):
3180 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3181 self.k8scluster_map[k8sclustertype].synchronize_repos(
3182 cluster_uuid=cluster_uuid
3183 )
3184 )
3185 if del_repo_list or added_repo_dict:
3186 if k8sclustertype == "helm-chart":
3187 unset = {
3188 "_admin.helm_charts_added." + item: None
3189 for item in del_repo_list
3190 }
3191 updated = {
3192 "_admin.helm_charts_added." + item: name
3193 for item, name in added_repo_dict.items()
3194 }
3195 updated_cluster_list.append(cluster_uuid)
3196 elif k8sclustertype == "helm-chart-v3":
3197 unset = {
3198 "_admin.helm_charts_v3_added." + item: None
3199 for item in del_repo_list
3200 }
3201 updated = {
3202 "_admin.helm_charts_v3_added." + item: name
3203 for item, name in added_repo_dict.items()
3204 }
3205 updated_v3_cluster_list.append(cluster_uuid)
3206 self.logger.debug(
3207 logging_text + "repos synchronized on k8s cluster "
3208 "'{}' to_delete: {}, to_add: {}".format(
3209 k8s_cluster_id, del_repo_list, added_repo_dict
3210 )
3211 )
3212 self.db.set_one(
3213 "k8sclusters",
3214 {"_id": k8s_cluster_id},
3215 updated,
3216 unset=unset,
3217 )
3218
3219 # Instantiate kdu
3220 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3221 vnfr_data["member-vnf-index-ref"],
3222 kdur["kdu-name"],
3223 k8s_cluster_id,
3224 )
3225 k8s_instance_info = {
3226 "kdu-instance": None,
3227 "k8scluster-uuid": cluster_uuid,
3228 "k8scluster-type": k8sclustertype,
3229 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3230 "kdu-name": kdur["kdu-name"],
3231 "kdu-model": kdumodel,
3232 "namespace": namespace,
3233 "kdu-deployment-name": kdu_deployment_name,
3234 }
3235 db_path = "_admin.deployed.K8s.{}".format(index)
3236 db_nsr_update[db_path] = k8s_instance_info
3237 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3238 vnfd_with_id = find_in_list(
3239 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3240 )
3241 task = asyncio.ensure_future(
3242 self._install_kdu(
3243 nsr_id,
3244 db_path,
3245 vnfr_data,
3246 kdu_index,
3247 kdud,
3248 vnfd_with_id,
3249 k8s_instance_info,
3250 k8params=desc_params,
3251 timeout=600,
3252 vca_id=vca_id,
3253 )
3254 )
3255 self.lcm_tasks.register(
3256 "ns",
3257 nsr_id,
3258 nslcmop_id,
3259 "instantiate_KDU-{}".format(index),
3260 task,
3261 )
3262 task_instantiation_info[task] = "Deploying KDU {}".format(
3263 kdur["kdu-name"]
3264 )
3265
3266 index += 1
3267
3268 except (LcmException, asyncio.CancelledError):
3269 raise
3270 except Exception as e:
3271 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3272 if isinstance(e, (N2VCException, DbException)):
3273 self.logger.error(logging_text + msg)
3274 else:
3275 self.logger.critical(logging_text + msg, exc_info=True)
3276 raise LcmException(msg)
3277 finally:
3278 if db_nsr_update:
3279 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3280
3281 def _deploy_n2vc(
3282 self,
3283 logging_text,
3284 db_nsr,
3285 db_vnfr,
3286 nslcmop_id,
3287 nsr_id,
3288 nsi_id,
3289 vnfd_id,
3290 vdu_id,
3291 kdu_name,
3292 member_vnf_index,
3293 vdu_index,
3294 vdu_name,
3295 deploy_params,
3296 descriptor_config,
3297 base_folder,
3298 task_instantiation_info,
3299 stage,
3300 ):
3301 # launch instantiate_N2VC in an asyncio task and register the task object
3302 # Look up where the information of this charm is stored at database <nsrs>._admin.deployed.VCA
3303 # if not found, create one entry and update database
3304 # fill db_nsr._admin.deployed.VCA.<index>
3305
3306 self.logger.debug(
3307 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3308 )
3309 if "execution-environment-list" in descriptor_config:
3310 ee_list = descriptor_config.get("execution-environment-list", [])
3311 elif "juju" in descriptor_config:
3312 ee_list = [descriptor_config] # ns charms
3313 else:  # other types such as script are not supported
3314 ee_list = []
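# Illustrative (assumed) ee_item shapes, matching the accesses below:
#   {"id": "monitor-ee", "juju": {"charm": "mycharm", "proxy": True, "cloud": "k8s"}}
#   {"id": "helm-ee", "helm-chart": "mychart", "helm-version": "v3"}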
3315
3316 for ee_item in ee_list:
3317 self.logger.debug(
3318 logging_text
3319 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3320 ee_item.get("juju"), ee_item.get("helm-chart")
3321 )
3322 )
3323 ee_descriptor_id = ee_item.get("id")
3324 if ee_item.get("juju"):
3325 vca_name = ee_item["juju"].get("charm")
3326 vca_type = (
3327 "lxc_proxy_charm"
3328 if ee_item["juju"].get("charm") is not None
3329 else "native_charm"
3330 )
3331 if ee_item["juju"].get("cloud") == "k8s":
3332 vca_type = "k8s_proxy_charm"
3333 elif ee_item["juju"].get("proxy") is False:
3334 vca_type = "native_charm"
3335 elif ee_item.get("helm-chart"):
3336 vca_name = ee_item["helm-chart"]
3337 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3338 vca_type = "helm"
3339 else:
3340 vca_type = "helm-v3"
3341 else:
3342 self.logger.debug(
3343 logging_text + "skipping configuration that is neither a juju charm nor a helm chart"
3344 )
3345 continue
3346
3347 vca_index = -1
3348 for vca_index, vca_deployed in enumerate(
3349 db_nsr["_admin"]["deployed"]["VCA"]
3350 ):
3351 if not vca_deployed:
3352 continue
3353 if (
3354 vca_deployed.get("member-vnf-index") == member_vnf_index
3355 and vca_deployed.get("vdu_id") == vdu_id
3356 and vca_deployed.get("kdu_name") == kdu_name
3357 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3358 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3359 ):
3360 break
3361 else:
3362 # not found, create one.
3363 target = (
3364 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3365 )
3366 if vdu_id:
3367 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3368 elif kdu_name:
3369 target += "/kdu/{}".format(kdu_name)
3370 vca_deployed = {
3371 "target_element": target,
3372 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3373 "member-vnf-index": member_vnf_index,
3374 "vdu_id": vdu_id,
3375 "kdu_name": kdu_name,
3376 "vdu_count_index": vdu_index,
3377 "operational-status": "init", # TODO revise
3378 "detailed-status": "", # TODO revise
3379 "step": "initial-deploy", # TODO revise
3380 "vnfd_id": vnfd_id,
3381 "vdu_name": vdu_name,
3382 "type": vca_type,
3383 "ee_descriptor_id": ee_descriptor_id,
3384 }
3385 vca_index += 1
3386
3387 # create VCA and configurationStatus in db
3388 db_dict = {
3389 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3390 "configurationStatus.{}".format(vca_index): dict(),
3391 }
3392 self.update_db_2("nsrs", nsr_id, db_dict)
3393
3394 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3395
3396 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3397 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3398 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3399
3400 # Launch task
3401 task_n2vc = asyncio.ensure_future(
3402 self.instantiate_N2VC(
3403 logging_text=logging_text,
3404 vca_index=vca_index,
3405 nsi_id=nsi_id,
3406 db_nsr=db_nsr,
3407 db_vnfr=db_vnfr,
3408 vdu_id=vdu_id,
3409 kdu_name=kdu_name,
3410 vdu_index=vdu_index,
3411 deploy_params=deploy_params,
3412 config_descriptor=descriptor_config,
3413 base_folder=base_folder,
3414 nslcmop_id=nslcmop_id,
3415 stage=stage,
3416 vca_type=vca_type,
3417 vca_name=vca_name,
3418 ee_config_descriptor=ee_item,
3419 )
3420 )
3421 self.lcm_tasks.register(
3422 "ns",
3423 nsr_id,
3424 nslcmop_id,
3425 "instantiate_N2VC-{}".format(vca_index),
3426 task_n2vc,
3427 )
3428 task_instantiation_info[
3429 task_n2vc
3430 ] = self.task_name_deploy_vca + " {}.{}".format(
3431 member_vnf_index or "", vdu_id or ""
3432 )
3433
3434 @staticmethod
3435 def _create_nslcmop(nsr_id, operation, params):
3436 """
3437 Creates an ns-lcm-op content to be stored at database.
3438 :param nsr_id: internal id of the instance
3439 :param operation: instantiate, terminate, scale, action, ...
3440 :param params: user parameters for the operation
3441 :return: dictionary following SOL005 format
3442 """
3443 # Raise exception if invalid arguments
3444 if not (nsr_id and operation and params):
3445 raise LcmException(
3446 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3447 )
3448 now = time()
3449 _id = str(uuid4())
3450 nslcmop = {
3451 "id": _id,
3452 "_id": _id,
3453 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3454 "operationState": "PROCESSING",
3455 "statusEnteredTime": now,
3456 "nsInstanceId": nsr_id,
3457 "lcmOperationType": operation,
3458 "startTime": now,
3459 "isAutomaticInvocation": False,
3460 "operationParams": params,
3461 "isCancelPending": False,
3462 "links": {
3463 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3464 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3465 },
3466 }
3467 return nslcmop
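# Illustrative usage (hypothetical parameters):
#   nslcmop = NsLcm._create_nslcmop(nsr_id, "action",
#       {"member_vnf_index": "1", "primitive": "touch", "primitive_params": {}})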
3468
3469 def _format_additional_params(self, params):
3470 params = params or {}
3471 for key, value in params.items():
3472 if str(value).startswith("!!yaml "):
3473 params[key] = yaml.safe_load(value[7:])
3474 return params
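# Illustrative (hypothetical) behaviour: {"replicas": "!!yaml 3", "name": "a"}
# becomes {"replicas": 3, "name": "a"}; values without the "!!yaml " prefix are left untouched.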
3475
3476 def _get_terminate_primitive_params(self, seq, vnf_index):
3477 primitive = seq.get("name")
3478 primitive_params = {}
3479 params = {
3480 "member_vnf_index": vnf_index,
3481 "primitive": primitive,
3482 "primitive_params": primitive_params,
3483 }
3484 desc_params = {}
3485 return self._map_primitive_params(seq, params, desc_params)
3486
3487 # sub-operations
3488
3489 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3490 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3491 if op.get("operationState") == "COMPLETED":
3492 # b. Skip sub-operation
3493 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3494 return self.SUBOPERATION_STATUS_SKIP
3495 else:
3496 # c. retry executing sub-operation
3497 # The sub-operation exists, and operationState != 'COMPLETED'
3498 # Update operationState = 'PROCESSING' to indicate a retry.
3499 operationState = "PROCESSING"
3500 detailed_status = "In progress"
3501 self._update_suboperation_status(
3502 db_nslcmop, op_index, operationState, detailed_status
3503 )
3504 # Return the sub-operation index
3505 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3506 # with arguments extracted from the sub-operation
3507 return op_index
3508
3509 # Find a sub-operation where all keys in a matching dictionary must match
3510 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
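# Illustrative (hypothetical) match dictionary, as built by _check_or_add_scale_suboperation():
#   {"member_vnf_index": "1", "primitive": "touch",
#    "primitive_params": {...}, "lcmOperationType": "PRE-SCALE"}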
3511 def _find_suboperation(self, db_nslcmop, match):
3512 if db_nslcmop and match:
3513 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3514 for i, op in enumerate(op_list):
3515 if all(op.get(k) == match[k] for k in match):
3516 return i
3517 return self.SUBOPERATION_STATUS_NOT_FOUND
3518
3519 # Update status for a sub-operation given its index
3520 def _update_suboperation_status(
3521 self, db_nslcmop, op_index, operationState, detailed_status
3522 ):
3523 # Update DB for HA tasks
3524 q_filter = {"_id": db_nslcmop["_id"]}
3525 update_dict = {
3526 "_admin.operations.{}.operationState".format(op_index): operationState,
3527 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3528 }
3529 self.db.set_one(
3530 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3531 )
3532
3533 # Add sub-operation, return the index of the added sub-operation
3534 # Optionally, set operationState, detailed-status, and operationType
3535 # Status and type are currently set for 'scale' sub-operations:
3536 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3537 # 'detailed-status' : status message
3538 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3539 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3540 def _add_suboperation(
3541 self,
3542 db_nslcmop,
3543 vnf_index,
3544 vdu_id,
3545 vdu_count_index,
3546 vdu_name,
3547 primitive,
3548 mapped_primitive_params,
3549 operationState=None,
3550 detailed_status=None,
3551 operationType=None,
3552 RO_nsr_id=None,
3553 RO_scaling_info=None,
3554 ):
3555 if not db_nslcmop:
3556 return self.SUBOPERATION_STATUS_NOT_FOUND
3557 # Get the "_admin.operations" list, if it exists
3558 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3559 op_list = db_nslcmop_admin.get("operations")
3560 # Create or append to the "_admin.operations" list
3561 new_op = {
3562 "member_vnf_index": vnf_index,
3563 "vdu_id": vdu_id,
3564 "vdu_count_index": vdu_count_index,
3565 "primitive": primitive,
3566 "primitive_params": mapped_primitive_params,
3567 }
3568 if operationState:
3569 new_op["operationState"] = operationState
3570 if detailed_status:
3571 new_op["detailed-status"] = detailed_status
3572 if operationType:
3573 new_op["lcmOperationType"] = operationType
3574 if RO_nsr_id:
3575 new_op["RO_nsr_id"] = RO_nsr_id
3576 if RO_scaling_info:
3577 new_op["RO_scaling_info"] = RO_scaling_info
3578 if not op_list:
3579 # No existing operations, create key 'operations' with current operation as first list element
3580 db_nslcmop_admin.update({"operations": [new_op]})
3581 op_list = db_nslcmop_admin.get("operations")
3582 else:
3583 # Existing operations, append operation to list
3584 op_list.append(new_op)
3585
3586 db_nslcmop_update = {"_admin.operations": op_list}
3587 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3588 op_index = len(op_list) - 1
3589 return op_index
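        # Illustrative shape of a stored sub-operation (hypothetical values):
        #   db_nslcmop["_admin"]["operations"][op_index] == {
        #       "member_vnf_index": "1", "vdu_id": None, "vdu_count_index": None,
        #       "primitive": "touch", "primitive_params": {"filename": "/tmp/f"},
        #       "lcmOperationType": "PRE-SCALE", "operationState": "PROCESSING",
        #       "detailed-status": "In progress"}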
3590
3591 # Helper methods for scale() sub-operations
3592
3593 # pre-scale/post-scale:
3594 # Check for 3 different cases:
3595 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3596     # b. Skip: the sub-operation already exists and operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3597     # c. Retry: the sub-operation already exists and operationState != 'COMPLETED', return op_index to re-execute
3598 def _check_or_add_scale_suboperation(
3599 self,
3600 db_nslcmop,
3601 vnf_index,
3602 vnf_config_primitive,
3603 primitive_params,
3604 operationType,
3605 RO_nsr_id=None,
3606 RO_scaling_info=None,
3607 ):
3608 # Find this sub-operation
3609 if RO_nsr_id and RO_scaling_info:
3610 operationType = "SCALE-RO"
3611 match = {
3612 "member_vnf_index": vnf_index,
3613 "RO_nsr_id": RO_nsr_id,
3614 "RO_scaling_info": RO_scaling_info,
3615 }
3616 else:
3617 match = {
3618 "member_vnf_index": vnf_index,
3619 "primitive": vnf_config_primitive,
3620 "primitive_params": primitive_params,
3621 "lcmOperationType": operationType,
3622 }
3623 op_index = self._find_suboperation(db_nslcmop, match)
3624 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3625 # a. New sub-operation
3626 # The sub-operation does not exist, add it.
3627 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3628             # The following parameters are set to None for all kinds of scaling:
3629 vdu_id = None
3630 vdu_count_index = None
3631 vdu_name = None
3632 if RO_nsr_id and RO_scaling_info:
3633 vnf_config_primitive = None
3634 primitive_params = None
3635 else:
3636 RO_nsr_id = None
3637 RO_scaling_info = None
3638 # Initial status for sub-operation
3639 operationState = "PROCESSING"
3640 detailed_status = "In progress"
3641 # Add sub-operation for pre/post-scaling (zero or more operations)
3642 self._add_suboperation(
3643 db_nslcmop,
3644 vnf_index,
3645 vdu_id,
3646 vdu_count_index,
3647 vdu_name,
3648 vnf_config_primitive,
3649 primitive_params,
3650 operationState,
3651 detailed_status,
3652 operationType,
3653 RO_nsr_id,
3654 RO_scaling_info,
3655 )
3656 return self.SUBOPERATION_STATUS_NEW
3657 else:
3658 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3659 # or op_index (operationState != 'COMPLETED')
3660 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3661
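    # Note on _check_or_add_scale_suboperation (illustrative, hypothetical caller code):
    #   op_index = self._check_or_add_scale_suboperation(
    #       db_nslcmop, vnf_index, "touch", primitive_params, "PRE-SCALE")
    #   if op_index == self.SUBOPERATION_STATUS_SKIP:
    #       pass  # already COMPLETED in a previous run, nothing to do
    #   elif op_index == self.SUBOPERATION_STATUS_NEW:
    #       ...  # first execution, run the primitive with the given arguments
    #   else:
    #       ...  # retry, re-run with the arguments stored at _admin.operations[op_index]
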
3662     # Returns the execution environment id (ee_id) for the given vnf_index and vdu_id
3663
3664 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3665 # TODO vdu_index_count
3666 for vca in vca_deployed_list:
3667 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3668 return vca["ee_id"]
3669
3670 async def destroy_N2VC(
3671 self,
3672 logging_text,
3673 db_nslcmop,
3674 vca_deployed,
3675 config_descriptor,
3676 vca_index,
3677 destroy_ee=True,
3678 exec_primitives=True,
3679 scaling_in=False,
3680 vca_id: str = None,
3681 ):
3682 """
3683         Execute the terminate primitives and destroy the execution environment (only if destroy_ee is True)
3684 :param logging_text:
3685 :param db_nslcmop:
3686         :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3687 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3688 :param vca_index: index in the database _admin.deployed.VCA
3689         :param destroy_ee: False to skip destroying the execution environment here, because all of them will be destroyed at once later
3690         :param exec_primitives: False to skip executing the terminate primitives, because the configuration did not complete or
3691             did not execute properly
3692 :param scaling_in: True destroys the application, False destroys the model
3693 :return: None or exception
3694 """
3695
3696 self.logger.debug(
3697 logging_text
3698 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3699 vca_index, vca_deployed, config_descriptor, destroy_ee
3700 )
3701 )
3702
3703 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3704
3705 # execute terminate_primitives
3706 if exec_primitives:
3707 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3708 config_descriptor.get("terminate-config-primitive"),
3709 vca_deployed.get("ee_descriptor_id"),
3710 )
3711 vdu_id = vca_deployed.get("vdu_id")
3712 vdu_count_index = vca_deployed.get("vdu_count_index")
3713 vdu_name = vca_deployed.get("vdu_name")
3714 vnf_index = vca_deployed.get("member-vnf-index")
3715 if terminate_primitives and vca_deployed.get("needed_terminate"):
3716 for seq in terminate_primitives:
3717 # For each sequence in list, get primitive and call _ns_execute_primitive()
3718 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3719 vnf_index, seq.get("name")
3720 )
3721 self.logger.debug(logging_text + step)
3722 # Create the primitive for each sequence, i.e. "primitive": "touch"
3723 primitive = seq.get("name")
3724 mapped_primitive_params = self._get_terminate_primitive_params(
3725 seq, vnf_index
3726 )
3727
3728 # Add sub-operation
3729 self._add_suboperation(
3730 db_nslcmop,
3731 vnf_index,
3732 vdu_id,
3733 vdu_count_index,
3734 vdu_name,
3735 primitive,
3736 mapped_primitive_params,
3737 )
3738 # Sub-operations: Call _ns_execute_primitive() instead of action()
3739 try:
3740 result, result_detail = await self._ns_execute_primitive(
3741 vca_deployed["ee_id"],
3742 primitive,
3743 mapped_primitive_params,
3744 vca_type=vca_type,
3745 vca_id=vca_id,
3746 )
3747 except LcmException:
3748                         # this happens when the VCA is not deployed; in that case there is nothing to terminate
3749 continue
3750 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3751 if result not in result_ok:
3752 raise LcmException(
3753 "terminate_primitive {} for vnf_member_index={} fails with "
3754 "error {}".format(seq.get("name"), vnf_index, result_detail)
3755 )
3756                     # mark that this VCA no longer needs to be terminated
3757 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3758 vca_index
3759 )
3760 self.update_db_2(
3761 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3762 )
3763
3764 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3765 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3766
3767 if destroy_ee:
3768 await self.vca_map[vca_type].delete_execution_environment(
3769 vca_deployed["ee_id"],
3770 scaling_in=scaling_in,
3771 vca_type=vca_type,
3772 vca_id=vca_id,
3773 )
3774
3775 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3776 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3777 namespace = "." + db_nsr["_id"]
3778 try:
3779 await self.n2vc.delete_namespace(
3780 namespace=namespace,
3781 total_timeout=self.timeout_charm_delete,
3782 vca_id=vca_id,
3783 )
3784 except N2VCNotFound: # already deleted. Skip
3785 pass
3786 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3787
3788 async def _terminate_RO(
3789 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3790 ):
3791 """
3792 Terminates a deployment from RO
3793 :param logging_text:
3794 :param nsr_deployed: db_nsr._admin.deployed
3795 :param nsr_id:
3796 :param nslcmop_id:
3797         :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
3798             this method updates only index 2, but it writes the concatenated content of the whole list to the database
3799 :return:
3800 """
3801 db_nsr_update = {}
3802 failed_detail = []
3803 ro_nsr_id = ro_delete_action = None
3804 if nsr_deployed and nsr_deployed.get("RO"):
3805 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3806 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3807 try:
3808 if ro_nsr_id:
3809 stage[2] = "Deleting ns from VIM."
3810 db_nsr_update["detailed-status"] = " ".join(stage)
3811 self._write_op_status(nslcmop_id, stage)
3812 self.logger.debug(logging_text + stage[2])
3813 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3814 self._write_op_status(nslcmop_id, stage)
3815 desc = await self.RO.delete("ns", ro_nsr_id)
3816 ro_delete_action = desc["action_id"]
3817 db_nsr_update[
3818 "_admin.deployed.RO.nsr_delete_action_id"
3819 ] = ro_delete_action
3820 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3821 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3822 if ro_delete_action:
3823 # wait until NS is deleted from VIM
3824 stage[2] = "Waiting ns deleted from VIM."
3825 detailed_status_old = None
3826 self.logger.debug(
3827 logging_text
3828 + stage[2]
3829 + " RO_id={} ro_delete_action={}".format(
3830 ro_nsr_id, ro_delete_action
3831 )
3832 )
3833 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3834 self._write_op_status(nslcmop_id, stage)
3835
3836 delete_timeout = 20 * 60 # 20 minutes
3837 while delete_timeout > 0:
3838 desc = await self.RO.show(
3839 "ns",
3840 item_id_name=ro_nsr_id,
3841 extra_item="action",
3842 extra_item_id=ro_delete_action,
3843 )
3844
3845 # deploymentStatus
3846 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3847
3848 ns_status, ns_status_info = self.RO.check_action_status(desc)
3849 if ns_status == "ERROR":
3850 raise ROclient.ROClientException(ns_status_info)
3851 elif ns_status == "BUILD":
3852 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3853 elif ns_status == "ACTIVE":
3854 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3855 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3856 break
3857 else:
3858 assert (
3859 False
3860 ), "ROclient.check_action_status returns unknown {}".format(
3861 ns_status
3862 )
3863 if stage[2] != detailed_status_old:
3864 detailed_status_old = stage[2]
3865 db_nsr_update["detailed-status"] = " ".join(stage)
3866 self._write_op_status(nslcmop_id, stage)
3867 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3868 await asyncio.sleep(5, loop=self.loop)
3869 delete_timeout -= 5
3870 else: # delete_timeout <= 0:
3871 raise ROclient.ROClientException(
3872 "Timeout waiting ns deleted from VIM"
3873 )
3874
3875 except Exception as e:
3876 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3877 if (
3878 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3879 ): # not found
3880 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3881 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3882 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3883 self.logger.debug(
3884 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
3885 )
3886 elif (
3887 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3888 ): # conflict
3889 failed_detail.append("delete conflict: {}".format(e))
3890 self.logger.debug(
3891 logging_text
3892 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
3893 )
3894 else:
3895 failed_detail.append("delete error: {}".format(e))
3896 self.logger.error(
3897 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
3898 )
3899
3900 # Delete nsd
3901 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3902 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3903 try:
3904 stage[2] = "Deleting nsd from RO."
3905 db_nsr_update["detailed-status"] = " ".join(stage)
3906 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3907 self._write_op_status(nslcmop_id, stage)
3908 await self.RO.delete("nsd", ro_nsd_id)
3909 self.logger.debug(
3910 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
3911 )
3912 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3913 except Exception as e:
3914 if (
3915 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3916 ): # not found
3917 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3918 self.logger.debug(
3919 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
3920 )
3921 elif (
3922 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3923 ): # conflict
3924 failed_detail.append(
3925 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
3926 )
3927 self.logger.debug(logging_text + failed_detail[-1])
3928 else:
3929 failed_detail.append(
3930 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
3931 )
3932 self.logger.error(logging_text + failed_detail[-1])
3933
3934 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3935 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3936 if not vnf_deployed or not vnf_deployed["id"]:
3937 continue
3938 try:
3939 ro_vnfd_id = vnf_deployed["id"]
3940 stage[
3941 2
3942 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3943 vnf_deployed["member-vnf-index"], ro_vnfd_id
3944 )
3945 db_nsr_update["detailed-status"] = " ".join(stage)
3946 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3947 self._write_op_status(nslcmop_id, stage)
3948 await self.RO.delete("vnfd", ro_vnfd_id)
3949 self.logger.debug(
3950 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
3951 )
3952 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3953 except Exception as e:
3954 if (
3955 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3956 ): # not found
3957 db_nsr_update[
3958 "_admin.deployed.RO.vnfd.{}.id".format(index)
3959 ] = None
3960 self.logger.debug(
3961 logging_text
3962 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
3963 )
3964 elif (
3965 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3966 ): # conflict
3967 failed_detail.append(
3968 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
3969 )
3970 self.logger.debug(logging_text + failed_detail[-1])
3971 else:
3972 failed_detail.append(
3973 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
3974 )
3975 self.logger.error(logging_text + failed_detail[-1])
3976
3977 if failed_detail:
3978 stage[2] = "Error deleting from VIM"
3979 else:
3980 stage[2] = "Deleted from VIM"
3981 db_nsr_update["detailed-status"] = " ".join(stage)
3982 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3983 self._write_op_status(nslcmop_id, stage)
3984
3985 if failed_detail:
3986 raise LcmException("; ".join(failed_detail))
3987
3988 async def terminate(self, nsr_id, nslcmop_id):
3989 # Try to lock HA task here
3990 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
3991 if not task_is_locked_by_me:
3992 return
3993
3994 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3995 self.logger.debug(logging_text + "Enter")
3996 timeout_ns_terminate = self.timeout_ns_terminate
3997 db_nsr = None
3998 db_nslcmop = None
3999 operation_params = None
4000 exc = None
4001             error_list = []  # collects the error messages of all failures
4002 db_nslcmop_update = {}
4003             autoremove = False  # autoremove after termination
4004 tasks_dict_info = {}
4005 db_nsr_update = {}
4006 stage = [
4007 "Stage 1/3: Preparing task.",
4008 "Waiting for previous operations to terminate.",
4009 "",
4010 ]
4011 # ^ contains [stage, step, VIM-status]
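        # (illustrative: the stage list is written to detailed-status as " ".join(stage),
        # e.g. "Stage 2/3 execute terminating primitives. Deleting KDUs. " at a later step)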
4012 try:
4013 # wait for any previous tasks in process
4014 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4015
4016 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4017 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4018 operation_params = db_nslcmop.get("operationParams") or {}
4019 if operation_params.get("timeout_ns_terminate"):
4020 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4021 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4022 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4023
4024 db_nsr_update["operational-status"] = "terminating"
4025 db_nsr_update["config-status"] = "terminating"
4026 self._write_ns_status(
4027 nsr_id=nsr_id,
4028 ns_state="TERMINATING",
4029 current_operation="TERMINATING",
4030 current_operation_id=nslcmop_id,
4031 other_update=db_nsr_update,
4032 )
4033 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4034 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4035 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4036 return
4037
4038 stage[1] = "Getting vnf descriptors from db."
4039 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4040 db_vnfrs_dict = {
4041 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4042 }
4043 db_vnfds_from_id = {}
4044 db_vnfds_from_member_index = {}
4045 # Loop over VNFRs
4046 for vnfr in db_vnfrs_list:
4047 vnfd_id = vnfr["vnfd-id"]
4048 if vnfd_id not in db_vnfds_from_id:
4049 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4050 db_vnfds_from_id[vnfd_id] = vnfd
4051 db_vnfds_from_member_index[
4052 vnfr["member-vnf-index-ref"]
4053 ] = db_vnfds_from_id[vnfd_id]
4054
4055 # Destroy individual execution environments when there are terminating primitives.
4056             # The rest of the EEs will be deleted at once
4057 # TODO - check before calling _destroy_N2VC
4058 # if not operation_params.get("skip_terminate_primitives"):#
4059 # or not vca.get("needed_terminate"):
4060 stage[0] = "Stage 2/3 execute terminating primitives."
4061 self.logger.debug(logging_text + stage[0])
4062 stage[1] = "Looking execution environment that needs terminate."
4063 self.logger.debug(logging_text + stage[1])
4064
4065 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4066 config_descriptor = None
4067 vca_member_vnf_index = vca.get("member-vnf-index")
4068 vca_id = self.get_vca_id(
4069 db_vnfrs_dict.get(vca_member_vnf_index)
4070 if vca_member_vnf_index
4071 else None,
4072 db_nsr,
4073 )
4074 if not vca or not vca.get("ee_id"):
4075 continue
4076 if not vca.get("member-vnf-index"):
4077 # ns
4078 config_descriptor = db_nsr.get("ns-configuration")
4079 elif vca.get("vdu_id"):
4080 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4081 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4082 elif vca.get("kdu_name"):
4083 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4084 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4085 else:
4086 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4087 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4088 vca_type = vca.get("type")
4089 exec_terminate_primitives = not operation_params.get(
4090 "skip_terminate_primitives"
4091 ) and vca.get("needed_terminate")
4092 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4093 # pending native charms
4094 destroy_ee = (
4095 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4096 )
4097 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4098 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4099 task = asyncio.ensure_future(
4100 self.destroy_N2VC(
4101 logging_text,
4102 db_nslcmop,
4103 vca,
4104 config_descriptor,
4105 vca_index,
4106 destroy_ee,
4107 exec_terminate_primitives,
4108 vca_id=vca_id,
4109 )
4110 )
4111 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4112
4113 # wait for pending tasks of terminate primitives
4114 if tasks_dict_info:
4115 self.logger.debug(
4116 logging_text
4117 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4118 )
4119 error_list = await self._wait_for_tasks(
4120 logging_text,
4121 tasks_dict_info,
4122 min(self.timeout_charm_delete, timeout_ns_terminate),
4123 stage,
4124 nslcmop_id,
4125 )
4126 tasks_dict_info.clear()
4127 if error_list:
4128 return # raise LcmException("; ".join(error_list))
4129
4130             # remove all execution environments at once
4131 stage[0] = "Stage 3/3 delete all."
4132
4133 if nsr_deployed.get("VCA"):
4134 stage[1] = "Deleting all execution environments."
4135 self.logger.debug(logging_text + stage[1])
4136 vca_id = self.get_vca_id({}, db_nsr)
4137 task_delete_ee = asyncio.ensure_future(
4138 asyncio.wait_for(
4139 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4140 timeout=self.timeout_charm_delete,
4141 )
4142 )
4143 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4144 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4145
4146 # Delete from k8scluster
4147 stage[1] = "Deleting KDUs."
4148 self.logger.debug(logging_text + stage[1])
4149 # print(nsr_deployed)
4150 for kdu in get_iterable(nsr_deployed, "K8s"):
4151 if not kdu or not kdu.get("kdu-instance"):
4152 continue
4153 kdu_instance = kdu.get("kdu-instance")
4154 if kdu.get("k8scluster-type") in self.k8scluster_map:
4155 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4156 vca_id = self.get_vca_id({}, db_nsr)
4157 task_delete_kdu_instance = asyncio.ensure_future(
4158 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4159 cluster_uuid=kdu.get("k8scluster-uuid"),
4160 kdu_instance=kdu_instance,
4161 vca_id=vca_id,
4162 )
4163 )
4164 else:
4165 self.logger.error(
4166 logging_text
4167 + "Unknown k8s deployment type {}".format(
4168 kdu.get("k8scluster-type")
4169 )
4170 )
4171 continue
4172 tasks_dict_info[
4173 task_delete_kdu_instance
4174 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4175
4176 # remove from RO
4177 stage[1] = "Deleting ns from VIM."
4178 if self.ng_ro:
4179 task_delete_ro = asyncio.ensure_future(
4180 self._terminate_ng_ro(
4181 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4182 )
4183 )
4184 else:
4185 task_delete_ro = asyncio.ensure_future(
4186 self._terminate_RO(
4187 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4188 )
4189 )
4190 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4191
4192             # the rest of the cleanup is done in the finally block
4193
4194 except (
4195 ROclient.ROClientException,
4196 DbException,
4197 LcmException,
4198 N2VCException,
4199 ) as e:
4200 self.logger.error(logging_text + "Exit Exception {}".format(e))
4201 exc = e
4202 except asyncio.CancelledError:
4203 self.logger.error(
4204 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4205 )
4206 exc = "Operation was cancelled"
4207 except Exception as e:
4208 exc = traceback.format_exc()
4209 self.logger.critical(
4210 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4211 exc_info=True,
4212 )
4213 finally:
4214 if exc:
4215 error_list.append(str(exc))
4216 try:
4217 # wait for pending tasks
4218 if tasks_dict_info:
4219 stage[1] = "Waiting for terminate pending tasks."
4220 self.logger.debug(logging_text + stage[1])
4221 error_list += await self._wait_for_tasks(
4222 logging_text,
4223 tasks_dict_info,
4224 timeout_ns_terminate,
4225 stage,
4226 nslcmop_id,
4227 )
4228 stage[1] = stage[2] = ""
4229 except asyncio.CancelledError:
4230 error_list.append("Cancelled")
4231                 # TODO cancel all tasks
4232 except Exception as exc:
4233 error_list.append(str(exc))
4234 # update status at database
4235 if error_list:
4236 error_detail = "; ".join(error_list)
4237 # self.logger.error(logging_text + error_detail)
4238 error_description_nslcmop = "{} Detail: {}".format(
4239 stage[0], error_detail
4240 )
4241 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4242 nslcmop_id, stage[0]
4243 )
4244
4245 db_nsr_update["operational-status"] = "failed"
4246 db_nsr_update["detailed-status"] = (
4247 error_description_nsr + " Detail: " + error_detail
4248 )
4249 db_nslcmop_update["detailed-status"] = error_detail
4250 nslcmop_operation_state = "FAILED"
4251 ns_state = "BROKEN"
4252 else:
4253 error_detail = None
4254 error_description_nsr = error_description_nslcmop = None
4255 ns_state = "NOT_INSTANTIATED"
4256 db_nsr_update["operational-status"] = "terminated"
4257 db_nsr_update["detailed-status"] = "Done"
4258 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4259 db_nslcmop_update["detailed-status"] = "Done"
4260 nslcmop_operation_state = "COMPLETED"
4261
4262 if db_nsr:
4263 self._write_ns_status(
4264 nsr_id=nsr_id,
4265 ns_state=ns_state,
4266 current_operation="IDLE",
4267 current_operation_id=None,
4268 error_description=error_description_nsr,
4269 error_detail=error_detail,
4270 other_update=db_nsr_update,
4271 )
4272 self._write_op_status(
4273 op_id=nslcmop_id,
4274 stage="",
4275 error_message=error_description_nslcmop,
4276 operation_state=nslcmop_operation_state,
4277 other_update=db_nslcmop_update,
4278 )
4279 if ns_state == "NOT_INSTANTIATED":
4280 try:
4281 self.db.set_list(
4282 "vnfrs",
4283 {"nsr-id-ref": nsr_id},
4284 {"_admin.nsState": "NOT_INSTANTIATED"},
4285 )
4286 except DbException as e:
4287 self.logger.warn(
4288 logging_text
4289 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4290 nsr_id, e
4291 )
4292 )
4293 if operation_params:
4294 autoremove = operation_params.get("autoremove", False)
4295 if nslcmop_operation_state:
4296 try:
4297 await self.msg.aiowrite(
4298 "ns",
4299 "terminated",
4300 {
4301 "nsr_id": nsr_id,
4302 "nslcmop_id": nslcmop_id,
4303 "operationState": nslcmop_operation_state,
4304 "autoremove": autoremove,
4305 },
4306 loop=self.loop,
4307 )
4308 except Exception as e:
4309 self.logger.error(
4310 logging_text + "kafka_write notification Exception {}".format(e)
4311 )
4312
4313 self.logger.debug(logging_text + "Exit")
4314 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4315
4316 async def _wait_for_tasks(
4317 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4318 ):
4319 time_start = time()
4320 error_detail_list = []
4321 error_list = []
4322 pending_tasks = list(created_tasks_info.keys())
4323 num_tasks = len(pending_tasks)
4324 num_done = 0
4325 stage[1] = "{}/{}.".format(num_done, num_tasks)
4326 self._write_op_status(nslcmop_id, stage)
4327 while pending_tasks:
4328 new_error = None
4329 _timeout = timeout + time_start - time()
4330 done, pending_tasks = await asyncio.wait(
4331 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4332 )
4333 num_done += len(done)
4334 if not done: # Timeout
4335 for task in pending_tasks:
4336 new_error = created_tasks_info[task] + ": Timeout"
4337 error_detail_list.append(new_error)
4338 error_list.append(new_error)
4339 break
4340 for task in done:
4341 if task.cancelled():
4342 exc = "Cancelled"
4343 else:
4344 exc = task.exception()
4345 if exc:
4346 if isinstance(exc, asyncio.TimeoutError):
4347 exc = "Timeout"
4348 new_error = created_tasks_info[task] + ": {}".format(exc)
4349 error_list.append(created_tasks_info[task])
4350 error_detail_list.append(new_error)
4351 if isinstance(
4352 exc,
4353 (
4354 str,
4355 DbException,
4356 N2VCException,
4357 ROclient.ROClientException,
4358 LcmException,
4359 K8sException,
4360 NgRoException,
4361 ),
4362 ):
4363 self.logger.error(logging_text + new_error)
4364 else:
4365 exc_traceback = "".join(
4366 traceback.format_exception(None, exc, exc.__traceback__)
4367 )
4368 self.logger.error(
4369 logging_text
4370 + created_tasks_info[task]
4371 + " "
4372 + exc_traceback
4373 )
4374 else:
4375 self.logger.debug(
4376 logging_text + created_tasks_info[task] + ": Done"
4377 )
4378 stage[1] = "{}/{}.".format(num_done, num_tasks)
4379 if new_error:
4380 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4381 if nsr_id: # update also nsr
4382 self.update_db_2(
4383 "nsrs",
4384 nsr_id,
4385 {
4386 "errorDescription": "Error at: " + ", ".join(error_list),
4387 "errorDetail": ". ".join(error_detail_list),
4388 },
4389 )
4390 self._write_op_status(nslcmop_id, stage)
4391 return error_detail_list
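        # Illustrative outcome (hypothetical task label): if a task fails with "Timeout",
        # error_detail_list gets an entry like "Terminating VCA <ee_id>: Timeout" and,
        # when nsr_id is given, errorDescription/errorDetail are also updated on the nsr.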
4392
4393 @staticmethod
4394 def _map_primitive_params(primitive_desc, params, instantiation_params):
4395 """
4396         Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
4397         parameter, the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
4398 :param primitive_desc: portion of VNFD/NSD that describes primitive
4399 :param params: Params provided by user
4400 :param instantiation_params: Instantiation params provided by user
4401 :return: a dictionary with the calculated params
4402 """
4403 calculated_params = {}
4404 for parameter in primitive_desc.get("parameter", ()):
4405 param_name = parameter["name"]
4406 if param_name in params:
4407 calculated_params[param_name] = params[param_name]
4408 elif "default-value" in parameter or "value" in parameter:
4409 if "value" in parameter:
4410 calculated_params[param_name] = parameter["value"]
4411 else:
4412 calculated_params[param_name] = parameter["default-value"]
4413 if (
4414 isinstance(calculated_params[param_name], str)
4415 and calculated_params[param_name].startswith("<")
4416 and calculated_params[param_name].endswith(">")
4417 ):
4418 if calculated_params[param_name][1:-1] in instantiation_params:
4419 calculated_params[param_name] = instantiation_params[
4420 calculated_params[param_name][1:-1]
4421 ]
4422 else:
4423 raise LcmException(
4424 "Parameter {} needed to execute primitive {} not provided".format(
4425 calculated_params[param_name], primitive_desc["name"]
4426 )
4427 )
4428 else:
4429 raise LcmException(
4430 "Parameter {} needed to execute primitive {} not provided".format(
4431 param_name, primitive_desc["name"]
4432 )
4433 )
4434
4435 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4436 calculated_params[param_name] = yaml.safe_dump(
4437 calculated_params[param_name], default_flow_style=True, width=256
4438 )
4439 elif isinstance(calculated_params[param_name], str) and calculated_params[
4440 param_name
4441 ].startswith("!!yaml "):
4442 calculated_params[param_name] = calculated_params[param_name][7:]
4443 if parameter.get("data-type") == "INTEGER":
4444 try:
4445 calculated_params[param_name] = int(calculated_params[param_name])
4446 except ValueError: # error converting string to int
4447 raise LcmException(
4448 "Parameter {} of primitive {} must be integer".format(
4449 param_name, primitive_desc["name"]
4450 )
4451 )
4452 elif parameter.get("data-type") == "BOOLEAN":
4453 calculated_params[param_name] = not (
4454 (str(calculated_params[param_name])).lower() == "false"
4455 )
4456
4457         # always add ns_config_info if the primitive name is config
4458 if primitive_desc["name"] == "config":
4459 if "ns_config_info" in instantiation_params:
4460 calculated_params["ns_config_info"] = instantiation_params[
4461 "ns_config_info"
4462 ]
4463 return calculated_params
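        # Illustrative example (hypothetical descriptor and params): for a primitive parameter
        #   {"name": "filename", "default-value": "<touch_filename>"}
        # with no user-provided value and instantiation_params
        #   {"touch_filename": "/home/ubuntu/first-touch"},
        # the calculated value is "/home/ubuntu/first-touch"; if "touch_filename" were missing
        # from instantiation_params, an LcmException would be raised instead.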
4464
4465 def _look_for_deployed_vca(
4466 self,
4467 deployed_vca,
4468 member_vnf_index,
4469 vdu_id,
4470 vdu_count_index,
4471 kdu_name=None,
4472 ee_descriptor_id=None,
4473 ):
4474         # find the vca_deployed record for this action. Raise LcmException if not found or if it has no ee_id.
4475 for vca in deployed_vca:
4476 if not vca:
4477 continue
4478 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4479 continue
4480 if (
4481 vdu_count_index is not None
4482 and vdu_count_index != vca["vdu_count_index"]
4483 ):
4484 continue
4485 if kdu_name and kdu_name != vca["kdu_name"]:
4486 continue
4487 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4488 continue
4489 break
4490 else:
4491 # vca_deployed not found
4492 raise LcmException(
4493 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4494 " is not deployed".format(
4495 member_vnf_index,
4496 vdu_id,
4497 vdu_count_index,
4498 kdu_name,
4499 ee_descriptor_id,
4500 )
4501 )
4502 # get ee_id
4503 ee_id = vca.get("ee_id")
4504 vca_type = vca.get(
4505 "type", "lxc_proxy_charm"
4506 ) # default value for backward compatibility - proxy charm
4507 if not ee_id:
4508 raise LcmException(
4509 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4510 "execution environment".format(
4511 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4512 )
4513 )
4514 return ee_id, vca_type
4515
4516 async def _ns_execute_primitive(
4517 self,
4518 ee_id,
4519 primitive,
4520 primitive_params,
4521 retries=0,
4522 retries_interval=30,
4523 timeout=None,
4524 vca_type=None,
4525 db_dict=None,
4526 vca_id: str = None,
4527 ) -> (str, str):
4528 try:
4529 if primitive == "config":
4530 primitive_params = {"params": primitive_params}
4531
4532 vca_type = vca_type or "lxc_proxy_charm"
4533
4534 while retries >= 0:
4535 try:
4536 output = await asyncio.wait_for(
4537 self.vca_map[vca_type].exec_primitive(
4538 ee_id=ee_id,
4539 primitive_name=primitive,
4540 params_dict=primitive_params,
4541 progress_timeout=self.timeout_progress_primitive,
4542 total_timeout=self.timeout_primitive,
4543 db_dict=db_dict,
4544 vca_id=vca_id,
4545 vca_type=vca_type,
4546 ),
4547 timeout=timeout or self.timeout_primitive,
4548 )
4549 # execution was OK
4550 break
4551 except asyncio.CancelledError:
4552 raise
4553 except Exception as e: # asyncio.TimeoutError
4554 if isinstance(e, asyncio.TimeoutError):
4555 e = "Timeout"
4556 retries -= 1
4557 if retries >= 0:
4558 self.logger.debug(
4559 "Error executing action {} on {} -> {}".format(
4560 primitive, ee_id, e
4561 )
4562 )
4563 # wait and retry
4564 await asyncio.sleep(retries_interval, loop=self.loop)
4565 else:
4566 return "FAILED", str(e)
4567
4568 return "COMPLETED", output
4569
4570 except (LcmException, asyncio.CancelledError):
4571 raise
4572 except Exception as e:
4573 return "FAIL", "Error executing action {}: {}".format(primitive, e)
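        # Illustrative call (hypothetical values):
        #   result, detail = await self._ns_execute_primitive(
        #       ee_id, "touch", {"filename": "/tmp/f"}, retries=3, vca_type="lxc_proxy_charm")
        # yields ("COMPLETED", <output>) on success, or ("FAILED", <error text>) once the
        # retries are exhausted.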
4574
4575 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4576 """
4577 Updating the vca_status with latest juju information in nsrs record
4578 :param: nsr_id: Id of the nsr
4579 :param: nslcmop_id: Id of the nslcmop
4580 :return: None
4581 """
4582
4583 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4584 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4585 vca_id = self.get_vca_id({}, db_nsr)
4586 if db_nsr["_admin"]["deployed"]["K8s"]:
4587 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4588 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4589 await self._on_update_k8s_db(
4590 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4591 )
4592 else:
4593 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4594 table, filter = "nsrs", {"_id": nsr_id}
4595 path = "_admin.deployed.VCA.{}.".format(vca_index)
4596 await self._on_update_n2vc_db(table, filter, path, {})
4597
4598 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4599 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4600
4601 async def action(self, nsr_id, nslcmop_id):
4602 # Try to lock HA task here
4603 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4604 if not task_is_locked_by_me:
4605 return
4606
4607 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4608 self.logger.debug(logging_text + "Enter")
4609 # get all needed from database
4610 db_nsr = None
4611 db_nslcmop = None
4612 db_nsr_update = {}
4613 db_nslcmop_update = {}
4614 nslcmop_operation_state = None
4615 error_description_nslcmop = None
4616 exc = None
4617 try:
4618 # wait for any previous tasks in process
4619 step = "Waiting for previous operations to terminate"
4620 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4621
4622 self._write_ns_status(
4623 nsr_id=nsr_id,
4624 ns_state=None,
4625 current_operation="RUNNING ACTION",
4626 current_operation_id=nslcmop_id,
4627 )
4628
4629 step = "Getting information from database"
4630 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4631 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4632 if db_nslcmop["operationParams"].get("primitive_params"):
4633 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4634 db_nslcmop["operationParams"]["primitive_params"]
4635 )
4636
4637 nsr_deployed = db_nsr["_admin"].get("deployed")
4638 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4639 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4640 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4641 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4642 primitive = db_nslcmop["operationParams"]["primitive"]
4643 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4644 timeout_ns_action = db_nslcmop["operationParams"].get(
4645 "timeout_ns_action", self.timeout_primitive
4646 )
4647
4648 if vnf_index:
4649 step = "Getting vnfr from database"
4650 db_vnfr = self.db.get_one(
4651 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4652 )
4653 if db_vnfr.get("kdur"):
4654 kdur_list = []
4655 for kdur in db_vnfr["kdur"]:
4656 if kdur.get("additionalParams"):
4657 kdur["additionalParams"] = json.loads(kdur["additionalParams"])
4658 kdur_list.append(kdur)
4659 db_vnfr["kdur"] = kdur_list
4660 step = "Getting vnfd from database"
4661 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4662 else:
4663 step = "Getting nsd from database"
4664 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4665
4666 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4667 # for backward compatibility
4668 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4669 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4670 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4671 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4672
4673 # look for primitive
4674 config_primitive_desc = descriptor_configuration = None
4675 if vdu_id:
4676 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4677 elif kdu_name:
4678 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4679 elif vnf_index:
4680 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4681 else:
4682 descriptor_configuration = db_nsd.get("ns-configuration")
4683
4684 if descriptor_configuration and descriptor_configuration.get(
4685 "config-primitive"
4686 ):
4687 for config_primitive in descriptor_configuration["config-primitive"]:
4688 if config_primitive["name"] == primitive:
4689 config_primitive_desc = config_primitive
4690 break
4691
4692 if not config_primitive_desc:
4693 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4694 raise LcmException(
4695 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4696 primitive
4697 )
4698 )
4699 primitive_name = primitive
4700 ee_descriptor_id = None
4701 else:
4702 primitive_name = config_primitive_desc.get(
4703 "execution-environment-primitive", primitive
4704 )
4705 ee_descriptor_id = config_primitive_desc.get(
4706 "execution-environment-ref"
4707 )
4708
4709 if vnf_index:
4710 if vdu_id:
4711 vdur = next(
4712 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4713 )
4714 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4715 elif kdu_name:
4716 kdur = next(
4717 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4718 )
4719 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4720 else:
4721 desc_params = parse_yaml_strings(
4722 db_vnfr.get("additionalParamsForVnf")
4723 )
4724 else:
4725 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4726 if kdu_name and get_configuration(db_vnfd, kdu_name):
4727 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4728 actions = set()
4729 for primitive in kdu_configuration.get("initial-config-primitive", []):
4730 actions.add(primitive["name"])
4731 for primitive in kdu_configuration.get("config-primitive", []):
4732 actions.add(primitive["name"])
4733 kdu_action = True if primitive_name in actions else False
4734
4735 # TODO check if ns is in a proper status
4736 if kdu_name and (
4737 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4738 ):
4739 # kdur and desc_params already set from before
4740 if primitive_params:
4741 desc_params.update(primitive_params)
4742 # TODO Check if we will need something at vnf level
4743 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4744 if (
4745 kdu_name == kdu["kdu-name"]
4746 and kdu["member-vnf-index"] == vnf_index
4747 ):
4748 break
4749 else:
4750 raise LcmException(
4751 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4752 )
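            # (for/else: the LcmException above is raised only when the loop completes
            # without finding a kdu matching this kdu_name and vnf_index)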
4753
4754 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4755 msg = "unknown k8scluster-type '{}'".format(
4756 kdu.get("k8scluster-type")
4757 )
4758 raise LcmException(msg)
4759
4760 db_dict = {
4761 "collection": "nsrs",
4762 "filter": {"_id": nsr_id},
4763 "path": "_admin.deployed.K8s.{}".format(index),
4764 }
4765 self.logger.debug(
4766 logging_text
4767 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4768 )
4769 step = "Executing kdu {}".format(primitive_name)
4770 if primitive_name == "upgrade":
4771 if desc_params.get("kdu_model"):
4772 kdu_model = desc_params.get("kdu_model")
4773 del desc_params["kdu_model"]
4774 else:
4775 kdu_model = kdu.get("kdu-model")
4776 parts = kdu_model.split(sep=":")
4777 if len(parts) == 2:
4778 kdu_model = parts[0]
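                            # e.g. an illustrative kdu-model value "stable/mychart:1.2.3"
                            # is reduced to "stable/mychart", dropping the text after ":"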
4779
4780 detailed_status = await asyncio.wait_for(
4781 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4782 cluster_uuid=kdu.get("k8scluster-uuid"),
4783 kdu_instance=kdu.get("kdu-instance"),
4784 atomic=True,
4785 kdu_model=kdu_model,
4786 params=desc_params,
4787 db_dict=db_dict,
4788 timeout=timeout_ns_action,
4789 ),
4790 timeout=timeout_ns_action + 10,
4791 )
4792 self.logger.debug(
4793 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4794 )
4795 elif primitive_name == "rollback":
4796 detailed_status = await asyncio.wait_for(
4797 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4798 cluster_uuid=kdu.get("k8scluster-uuid"),
4799 kdu_instance=kdu.get("kdu-instance"),
4800 db_dict=db_dict,
4801 ),
4802 timeout=timeout_ns_action,
4803 )
4804 elif primitive_name == "status":
4805 detailed_status = await asyncio.wait_for(
4806 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4807 cluster_uuid=kdu.get("k8scluster-uuid"),
4808 kdu_instance=kdu.get("kdu-instance"),
4809 vca_id=vca_id,
4810 ),
4811 timeout=timeout_ns_action,
4812 )
4813 else:
4814 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4815 kdu["kdu-name"], nsr_id
4816 )
4817 params = self._map_primitive_params(
4818 config_primitive_desc, primitive_params, desc_params
4819 )
4820
4821 detailed_status = await asyncio.wait_for(
4822 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4823 cluster_uuid=kdu.get("k8scluster-uuid"),
4824 kdu_instance=kdu_instance,
4825 primitive_name=primitive_name,
4826 params=params,
4827 db_dict=db_dict,
4828 timeout=timeout_ns_action,
4829 vca_id=vca_id,
4830 ),
4831 timeout=timeout_ns_action,
4832 )
4833
4834 if detailed_status:
4835 nslcmop_operation_state = "COMPLETED"
4836 else:
4837 detailed_status = ""
4838 nslcmop_operation_state = "FAILED"
4839 else:
4840 ee_id, vca_type = self._look_for_deployed_vca(
4841 nsr_deployed["VCA"],
4842 member_vnf_index=vnf_index,
4843 vdu_id=vdu_id,
4844 vdu_count_index=vdu_count_index,
4845 ee_descriptor_id=ee_descriptor_id,
4846 )
4847 for vca_index, vca_deployed in enumerate(
4848 db_nsr["_admin"]["deployed"]["VCA"]
4849 ):
4850 if vca_deployed.get("member-vnf-index") == vnf_index:
4851 db_dict = {
4852 "collection": "nsrs",
4853 "filter": {"_id": nsr_id},
4854 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4855 }
4856 break
4857 (
4858 nslcmop_operation_state,
4859 detailed_status,
4860 ) = await self._ns_execute_primitive(
4861 ee_id,
4862 primitive=primitive_name,
4863 primitive_params=self._map_primitive_params(
4864 config_primitive_desc, primitive_params, desc_params
4865 ),
4866 timeout=timeout_ns_action,
4867 vca_type=vca_type,
4868 db_dict=db_dict,
4869 vca_id=vca_id,
4870 )
4871
4872 db_nslcmop_update["detailed-status"] = detailed_status
4873 error_description_nslcmop = (
4874 detailed_status if nslcmop_operation_state == "FAILED" else ""
4875 )
4876 self.logger.debug(
4877 logging_text
4878 + " task Done with result {} {}".format(
4879 nslcmop_operation_state, detailed_status
4880 )
4881 )
4882 return # database update is called inside finally
4883
4884 except (DbException, LcmException, N2VCException, K8sException) as e:
4885 self.logger.error(logging_text + "Exit Exception {}".format(e))
4886 exc = e
4887 except asyncio.CancelledError:
4888 self.logger.error(
4889 logging_text + "Cancelled Exception while '{}'".format(step)
4890 )
4891 exc = "Operation was cancelled"
4892 except asyncio.TimeoutError:
4893 self.logger.error(logging_text + "Timeout while '{}'".format(step))
4894 exc = "Timeout"
4895 except Exception as e:
4896 exc = traceback.format_exc()
4897 self.logger.critical(
4898 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
4899 exc_info=True,
4900 )
4901 finally:
4902 if exc:
4903 db_nslcmop_update[
4904 "detailed-status"
4905 ] = (
4906 detailed_status
4907 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4908 nslcmop_operation_state = "FAILED"
4909 if db_nsr:
4910 self._write_ns_status(
4911 nsr_id=nsr_id,
4912 ns_state=db_nsr[
4913 "nsState"
4914 ], # TODO check if degraded. For the moment use previous status
4915 current_operation="IDLE",
4916 current_operation_id=None,
4917 # error_description=error_description_nsr,
4918 # error_detail=error_detail,
4919 other_update=db_nsr_update,
4920 )
4921
4922 self._write_op_status(
4923 op_id=nslcmop_id,
4924 stage="",
4925 error_message=error_description_nslcmop,
4926 operation_state=nslcmop_operation_state,
4927 other_update=db_nslcmop_update,
4928 )
4929
4930 if nslcmop_operation_state:
4931 try:
4932 await self.msg.aiowrite(
4933 "ns",
4934 "actioned",
4935 {
4936 "nsr_id": nsr_id,
4937 "nslcmop_id": nslcmop_id,
4938 "operationState": nslcmop_operation_state,
4939 },
4940 loop=self.loop,
4941 )
4942 except Exception as e:
4943 self.logger.error(
4944 logging_text + "kafka_write notification Exception {}".format(e)
4945 )
4946 self.logger.debug(logging_text + "Exit")
4947 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
4948 return nslcmop_operation_state, detailed_status
4949
4950 async def scale(self, nsr_id, nslcmop_id):
4951 # Try to lock HA task here
4952 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4953 if not task_is_locked_by_me:
4954 return
4955
4956 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
4957 stage = ["", "", ""]
4958 tasks_dict_info = {}
4959 # ^ stage, step, VIM progress
4960 self.logger.debug(logging_text + "Enter")
4961 # get all needed from database
4962 db_nsr = None
4963 db_nslcmop_update = {}
4964 db_nsr_update = {}
4965 exc = None
4966         # in case of error, indicates which part of the scale operation failed, so that the nsr can be set to error status
4967 scale_process = None
4968 old_operational_status = ""
4969 old_config_status = ""
4970 nsi_id = None
4971 try:
4972 # wait for any previous tasks in process
4973 step = "Waiting for previous operations to terminate"
4974 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4975 self._write_ns_status(
4976 nsr_id=nsr_id,
4977 ns_state=None,
4978 current_operation="SCALING",
4979 current_operation_id=nslcmop_id,
4980 )
4981
4982 step = "Getting nslcmop from database"
4983 self.logger.debug(
4984 step + " after having waited for previous tasks to be completed"
4985 )
4986 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4987
4988 step = "Getting nsr from database"
4989 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4990 old_operational_status = db_nsr["operational-status"]
4991 old_config_status = db_nsr["config-status"]
4992
4993 step = "Parsing scaling parameters"
4994 db_nsr_update["operational-status"] = "scaling"
4995 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4996 nsr_deployed = db_nsr["_admin"].get("deployed")
4997
4998 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
4999 "scaleByStepData"
5000 ]["member-vnf-index"]
5001 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5002 "scaleByStepData"
5003 ]["scaling-group-descriptor"]
5004 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5005 # for backward compatibility
5006 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5007 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5008 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5009 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5010
5011 step = "Getting vnfr from database"
5012 db_vnfr = self.db.get_one(
5013 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5014 )
5015
5016 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5017
5018 step = "Getting vnfd from database"
5019 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5020
5021 base_folder = db_vnfd["_admin"]["storage"]
5022
5023 step = "Getting scaling-group-descriptor"
5024 scaling_descriptor = find_in_list(
5025 get_scaling_aspect(db_vnfd),
5026 lambda scale_desc: scale_desc["name"] == scaling_group,
5027 )
5028 if not scaling_descriptor:
5029 raise LcmException(
5030 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5031 "at vnfd:scaling-group-descriptor".format(scaling_group)
5032 )
5033
5034 step = "Sending scale order to VIM"
5035 # TODO check if ns is in a proper status
5036 nb_scale_op = 0
5037 if not db_nsr["_admin"].get("scaling-group"):
5038 self.update_db_2(
5039 "nsrs",
5040 nsr_id,
5041 {
5042 "_admin.scaling-group": [
5043 {"name": scaling_group, "nb-scale-op": 0}
5044 ]
5045 },
5046 )
5047 admin_scale_index = 0
5048 else:
5049 for admin_scale_index, admin_scale_info in enumerate(
5050 db_nsr["_admin"]["scaling-group"]
5051 ):
5052 if admin_scale_info["name"] == scaling_group:
5053 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5054 break
5055                 else:  # not found: set index to one past the last element and add a new entry with this name
5056 admin_scale_index += 1
5057 db_nsr_update[
5058 "_admin.scaling-group.{}.name".format(admin_scale_index)
5059 ] = scaling_group
5060
5061 vca_scaling_info = []
5062 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5063 if scaling_type == "SCALE_OUT":
5064 if "aspect-delta-details" not in scaling_descriptor:
5065 raise LcmException(
5066                         "Aspect delta details not found in scaling descriptor {}".format(
5067 scaling_descriptor["name"]
5068 )
5069 )
5070 # count if max-instance-count is reached
5071 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5072
5073 scaling_info["scaling_direction"] = "OUT"
5074 scaling_info["vdu-create"] = {}
5075 scaling_info["kdu-create"] = {}
5076 for delta in deltas:
5077 for vdu_delta in delta.get("vdu-delta", {}):
5078 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5079                         # vdu_index also provides the number of instances of the targeted vdu
5080 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5081 cloud_init_text = self._get_vdu_cloud_init_content(
5082 vdud, db_vnfd
5083 )
5084 if cloud_init_text:
5085 additional_params = (
5086 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5087 or {}
5088 )
5089 cloud_init_list = []
5090
5091 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5092 max_instance_count = 10
5093 if vdu_profile and "max-number-of-instances" in vdu_profile:
5094 max_instance_count = vdu_profile.get(
5095 "max-number-of-instances", 10
5096 )
5097
5098 default_instance_num = get_number_of_instances(
5099 db_vnfd, vdud["id"]
5100 )
5101 instances_number = vdu_delta.get("number-of-instances", 1)
5102 nb_scale_op += instances_number
5103
5104 new_instance_count = nb_scale_op + default_instance_num
5105                         # If the new count exceeds the max while the current vdu count is below it,
5106                         # recompute the number of instances to create
5107 if new_instance_count > max_instance_count > vdu_count:
5108 instances_number = new_instance_count - max_instance_count
5109 else:
5110 instances_number = instances_number
5111
5112 if new_instance_count > max_instance_count:
5113 raise LcmException(
5114 "reached the limit of {} (max-instance-count) "
5115 "scaling-out operations for the "
5116 "scaling-group-descriptor '{}'".format(
5117 nb_scale_op, scaling_group
5118 )
5119 )
5120 for x in range(vdu_delta.get("number-of-instances", 1)):
5121 if cloud_init_text:
5122                                 # TODO The VDU's own IP is not available here because db_vnfr is not updated yet.
5123 additional_params["OSM"] = get_osm_params(
5124 db_vnfr, vdu_delta["id"], vdu_index + x
5125 )
5126 cloud_init_list.append(
5127 self._parse_cloud_init(
5128 cloud_init_text,
5129 additional_params,
5130 db_vnfd["id"],
5131 vdud["id"],
5132 )
5133 )
5134 vca_scaling_info.append(
5135 {
5136 "osm_vdu_id": vdu_delta["id"],
5137 "member-vnf-index": vnf_index,
5138 "type": "create",
5139 "vdu_index": vdu_index + x,
5140 }
5141 )
5142 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5143 for kdu_delta in delta.get("kdu-resource-delta", {}):
5144 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5145 kdu_name = kdu_profile["kdu-name"]
5146 resource_name = kdu_profile["resource-name"]
5147
5148                         # The same delta might contain different kdus,
5149                         # so keep a separate list for each kdu
5150 if not scaling_info["kdu-create"].get(kdu_name, None):
5151 scaling_info["kdu-create"][kdu_name] = []
5152
5153 kdur = get_kdur(db_vnfr, kdu_name)
5154 if kdur.get("helm-chart"):
5155 k8s_cluster_type = "helm-chart-v3"
5156 self.logger.debug("kdur: {}".format(kdur))
5157 if (
5158 kdur.get("helm-version")
5159 and kdur.get("helm-version") == "v2"
5160 ):
5161 k8s_cluster_type = "helm-chart"
5162 raise NotImplementedError
5163 elif kdur.get("juju-bundle"):
5164 k8s_cluster_type = "juju-bundle"
5165 else:
5166 raise LcmException(
5167 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5168 "juju-bundle. Maybe an old NBI version is running".format(
5169 db_vnfr["member-vnf-index-ref"], kdu_name
5170 )
5171 )
5172
5173 max_instance_count = 10
5174 if kdu_profile and "max-number-of-instances" in kdu_profile:
5175 max_instance_count = kdu_profile.get(
5176 "max-number-of-instances", 10
5177 )
5178
5179 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5180 deployed_kdu, _ = get_deployed_kdu(
5181 nsr_deployed, kdu_name, vnf_index
5182 )
5183 if deployed_kdu is None:
5184 raise LcmException(
5185 "KDU '{}' for vnf '{}' not deployed".format(
5186 kdu_name, vnf_index
5187 )
5188 )
5189 kdu_instance = deployed_kdu.get("kdu-instance")
5190 instance_num = await self.k8scluster_map[
5191 k8s_cluster_type
5192 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5193 kdu_replica_count = instance_num + kdu_delta.get(
5194 "number-of-instances", 1
5195 )
5196
5197                             # If the new count exceeds the max while instance_num is below it,
5198                             # cap the kdu replica count at the max instance number
5199 if kdu_replica_count > max_instance_count > instance_num:
5200 kdu_replica_count = max_instance_count
5201 if kdu_replica_count > max_instance_count:
5202 raise LcmException(
5203 "reached the limit of {} (max-instance-count) "
5204 "scaling-out operations for the "
5205 "scaling-group-descriptor '{}'".format(
5206 instance_num, scaling_group
5207 )
5208 )
5209
5210 for x in range(kdu_delta.get("number-of-instances", 1)):
5211 vca_scaling_info.append(
5212 {
5213 "osm_kdu_id": kdu_name,
5214 "member-vnf-index": vnf_index,
5215 "type": "create",
5216 "kdu_index": instance_num + x - 1,
5217 }
5218 )
5219 scaling_info["kdu-create"][kdu_name].append(
5220 {
5221 "member-vnf-index": vnf_index,
5222 "type": "create",
5223 "k8s-cluster-type": k8s_cluster_type,
5224 "resource-name": resource_name,
5225 "scale": kdu_replica_count,
5226 }
5227 )
5228 elif scaling_type == "SCALE_IN":
5229 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5230
5231 scaling_info["scaling_direction"] = "IN"
5232 scaling_info["vdu-delete"] = {}
5233 scaling_info["kdu-delete"] = {}
5234
5235 for delta in deltas:
5236 for vdu_delta in delta.get("vdu-delta", {}):
5237 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5238 min_instance_count = 0
5239 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5240 if vdu_profile and "min-number-of-instances" in vdu_profile:
5241 min_instance_count = vdu_profile["min-number-of-instances"]
5242
5243 default_instance_num = get_number_of_instances(
5244 db_vnfd, vdu_delta["id"]
5245 )
5246 instance_num = vdu_delta.get("number-of-instances", 1)
5247 nb_scale_op -= instance_num
5248
5249 new_instance_count = nb_scale_op + default_instance_num
5250
5251 if new_instance_count < min_instance_count < vdu_count:
5252 instances_number = min_instance_count - new_instance_count
5253 else:
5254 instances_number = instance_num
5255
5256 if new_instance_count < min_instance_count:
5257 raise LcmException(
5258 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5259 "scaling-group-descriptor '{}'".format(
5260 min_instance_count, scaling_group
5261 )
5262 )
5263 for x in range(vdu_delta.get("number-of-instances", 1)):
5264 vca_scaling_info.append(
5265 {
5266 "osm_vdu_id": vdu_delta["id"],
5267 "member-vnf-index": vnf_index,
5268 "type": "delete",
5269 "vdu_index": vdu_index - 1 - x,
5270 }
5271 )
5272 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
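# For each KDU resource delta: compute the reduced replica count (never below
# min-number-of-instances) and record the corresponding delete requests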
5273 for kdu_delta in delta.get("kdu-resource-delta", {}):
5274 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5275 kdu_name = kdu_profile["kdu-name"]
5276 resource_name = kdu_profile["resource-name"]
5277
5278 if not scaling_info["kdu-delete"].get(kdu_name, None):
5279 scaling_info["kdu-delete"][kdu_name] = []
5280
5281 kdur = get_kdur(db_vnfr, kdu_name)
5282 if kdur.get("helm-chart"):
5283 k8s_cluster_type = "helm-chart-v3"
5284 self.logger.debug("kdur: {}".format(kdur))
5285 if kdur.get("helm-version") == "v2":
5286 k8s_cluster_type = "helm-chart"
5287 raise NotImplementedError(
5288 "Helm v2 charts are not supported "
5289 "for KDU scaling"
5290 )
5291 elif kdur.get("juju-bundle"):
5292 k8s_cluster_type = "juju-bundle"
5293 else:
5294 raise LcmException(
5295 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5296 "juju-bundle. Maybe an old NBI version is running".format(
5297 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5298 )
5299 )
5300
5301 min_instance_count = 0
5302 if kdu_profile and "min-number-of-instances" in kdu_profile:
5303 min_instance_count = kdu_profile["min-number-of-instances"]
5304
5305 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5306 deployed_kdu, _ = get_deployed_kdu(
5307 nsr_deployed, kdu_name, vnf_index
5308 )
5309 if deployed_kdu is None:
5310 raise LcmException(
5311 "KDU '{}' for vnf '{}' not deployed".format(
5312 kdu_name, vnf_index
5313 )
5314 )
5315 kdu_instance = deployed_kdu.get("kdu-instance")
5316 instance_num = await self.k8scluster_map[
5317 k8s_cluster_type
5318 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5319 kdu_replica_count = instance_num - kdu_delta.get(
5320 "number-of-instances", 1
5321 )
5322
5323 if kdu_replica_count < min_instance_count < instance_num:
5324 kdu_replica_count = min_instance_count
5325 if kdu_replica_count < min_instance_count:
5326 raise LcmException(
5327 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5328 "scaling-group-descriptor '{}'".format(
5329 min_instance_count, scaling_group
5330 )
5331 )
5332
5333 for x in range(kdu_delta.get("number-of-instances", 1)):
5334 vca_scaling_info.append(
5335 {
5336 "osm_kdu_id": kdu_name,
5337 "member-vnf-index": vnf_index,
5338 "type": "delete",
5339 "kdu_index": instance_num - x - 1,
5340 }
5341 )
5342 scaling_info["kdu-delete"][kdu_name].append(
5343 {
5344 "member-vnf-index": vnf_index,
5345 "type": "delete",
5346 "k8s-cluster-type": k8s_cluster_type,
5347 "resource-name": resource_name,
5348 "scale": kdu_replica_count,
5349 }
5350 )
5351
5352 # update VDU_SCALING_INFO with the ip/mac addresses of the VDUs to be deleted
5353 vdu_delete = copy(scaling_info.get("vdu-delete"))
5354 if scaling_info["scaling_direction"] == "IN":
5355 for vdur in reversed(db_vnfr["vdur"]):
5356 if vdu_delete.get(vdur["vdu-id-ref"]):
5357 vdu_delete[vdur["vdu-id-ref"]] -= 1
5358 scaling_info["vdu"].append(
5359 {
5360 "name": vdur.get("name") or vdur.get("vdu-name"),
5361 "vdu_id": vdur["vdu-id-ref"],
5362 "interface": [],
5363 }
5364 )
5365 for interface in vdur["interfaces"]:
5366 scaling_info["vdu"][-1]["interface"].append(
5367 {
5368 "name": interface["name"],
5369 "ip_address": interface["ip-address"],
5370 "mac_address": interface.get("mac-address"),
5371 }
5372 )
5374
5375 # PRE-SCALE BEGIN
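# Run the vnf-config-primitives declared with a 'pre-scale-in'/'pre-scale-out' trigger
# before any resource is touched; each one is tracked as a sub-operation so a retried
# scale does not re-run primitives that already completed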
5376 step = "Executing pre-scale vnf-config-primitive"
5377 if scaling_descriptor.get("scaling-config-action"):
5378 for scaling_config_action in scaling_descriptor[
5379 "scaling-config-action"
5380 ]:
5381 if (
5382 scaling_config_action.get("trigger") == "pre-scale-in"
5383 and scaling_type == "SCALE_IN"
5384 ) or (
5385 scaling_config_action.get("trigger") == "pre-scale-out"
5386 and scaling_type == "SCALE_OUT"
5387 ):
5388 vnf_config_primitive = scaling_config_action[
5389 "vnf-config-primitive-name-ref"
5390 ]
5391 step = db_nslcmop_update[
5392 "detailed-status"
5393 ] = "executing pre-scale scaling-config-action '{}'".format(
5394 vnf_config_primitive
5395 )
5396
5397 # look for primitive
5398 for config_primitive in (
5399 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5400 ).get("config-primitive", ()):
5401 if config_primitive["name"] == vnf_config_primitive:
5402 break
5403 else:
5404 raise LcmException(
5405 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5406 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5407 "primitive".format(scaling_group, vnf_config_primitive)
5408 )
5409
5410 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5411 if db_vnfr.get("additionalParamsForVnf"):
5412 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5413
5414 scale_process = "VCA"
5415 db_nsr_update["config-status"] = "configuring pre-scaling"
5416 primitive_params = self._map_primitive_params(
5417 config_primitive, {}, vnfr_params
5418 )
5419
5420 # Pre-scale retry check: Check if this sub-operation has been executed before
5421 op_index = self._check_or_add_scale_suboperation(
5422 db_nslcmop,
5423 vnf_index,
5424 vnf_config_primitive,
5425 primitive_params,
5426 "PRE-SCALE",
5427 )
5428 if op_index == self.SUBOPERATION_STATUS_SKIP:
5429 # Skip sub-operation
5430 result = "COMPLETED"
5431 result_detail = "Done"
5432 self.logger.debug(
5433 logging_text
5434 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5435 vnf_config_primitive, result, result_detail
5436 )
5437 )
5438 else:
5439 if op_index == self.SUBOPERATION_STATUS_NEW:
5440 # New sub-operation: Get index of this sub-operation
5441 op_index = (
5442 len(db_nslcmop.get("_admin", {}).get("operations"))
5443 - 1
5444 )
5445 self.logger.debug(
5446 logging_text
5447 + "vnf_config_primitive={} New sub-operation".format(
5448 vnf_config_primitive
5449 )
5450 )
5451 else:
5452 # retry: Get registered params for this existing sub-operation
5453 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5454 op_index
5455 ]
5456 vnf_index = op.get("member_vnf_index")
5457 vnf_config_primitive = op.get("primitive")
5458 primitive_params = op.get("primitive_params")
5459 self.logger.debug(
5460 logging_text
5461 + "vnf_config_primitive={} Sub-operation retry".format(
5462 vnf_config_primitive
5463 )
5464 )
5465 # Execute the primitive, either with new (first-time) or registered (retry) args
5466 ee_descriptor_id = config_primitive.get(
5467 "execution-environment-ref"
5468 )
5469 primitive_name = config_primitive.get(
5470 "execution-environment-primitive", vnf_config_primitive
5471 )
5472 ee_id, vca_type = self._look_for_deployed_vca(
5473 nsr_deployed["VCA"],
5474 member_vnf_index=vnf_index,
5475 vdu_id=None,
5476 vdu_count_index=None,
5477 ee_descriptor_id=ee_descriptor_id,
5478 )
5479 result, result_detail = await self._ns_execute_primitive(
5480 ee_id,
5481 primitive_name,
5482 primitive_params,
5483 vca_type=vca_type,
5484 vca_id=vca_id,
5485 )
5486 self.logger.debug(
5487 logging_text
5488 + "vnf_config_primitive={} Done with result {} {}".format(
5489 vnf_config_primitive, result, result_detail
5490 )
5491 )
5492 # Update operationState = COMPLETED | FAILED
5493 self._update_suboperation_status(
5494 db_nslcmop, op_index, result, result_detail
5495 )
5496
5497 if result == "FAILED":
5498 raise LcmException(result_detail)
5499 db_nsr_update["config-status"] = old_config_status
5500 scale_process = None
5501 # PRE-SCALE END
5502
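# Persist the updated scale-operation counter and timestamp for this scaling group in the NS record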
5503 db_nsr_update[
5504 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5505 ] = nb_scale_op
5506 db_nsr_update[
5507 "_admin.scaling-group.{}.time".format(admin_scale_index)
5508 ] = time()
5509
5510 # SCALE-IN VCA - BEGIN
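# Destroy the execution environments of the VDU/KDU instances being removed, running their
# terminate primitives unless 'skip_terminate_primitives' was requested in the operation params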
5511 if vca_scaling_info:
5512 step = db_nslcmop_update[
5513 "detailed-status"
5514 ] = "Deleting the execution environments"
5515 scale_process = "VCA"
5516 for vca_info in vca_scaling_info:
5517 if vca_info["type"] == "delete":
5518 member_vnf_index = str(vca_info["member-vnf-index"])
5519 self.logger.debug(
5520 logging_text + "vdu info: {}".format(vca_info)
5521 )
5522 if vca_info.get("osm_vdu_id"):
5523 vdu_id = vca_info["osm_vdu_id"]
5524 vdu_index = int(vca_info["vdu_index"])
5525 stage[
5526 1
5527 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5528 member_vnf_index, vdu_id, vdu_index
5529 )
5530 else:
5531 vdu_index = 0
5532 kdu_id = vca_info["osm_kdu_id"]
5533 stage[
5534 1
5535 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5536 member_vnf_index, kdu_id, vdu_index
5537 )
5538 stage[2] = step = "Scaling in VCA"
5539 self._write_op_status(op_id=nslcmop_id, stage=stage)
5540 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5541 config_update = db_nsr["configurationStatus"]
5542 for vca_index, vca in enumerate(vca_update):
5543 if (
5544 (vca and vca.get("ee_id"))
5545 and vca["member-vnf-index"] == member_vnf_index
5546 and vca["vdu_count_index"] == vdu_index
5547 ):
5548 if vca.get("vdu_id"):
5549 config_descriptor = get_configuration(
5550 db_vnfd, vca.get("vdu_id")
5551 )
5552 elif vca.get("kdu_name"):
5553 config_descriptor = get_configuration(
5554 db_vnfd, vca.get("kdu_name")
5555 )
5556 else:
5557 config_descriptor = get_configuration(
5558 db_vnfd, db_vnfd["id"]
5559 )
5560 operation_params = (
5561 db_nslcmop.get("operationParams") or {}
5562 )
5563 exec_terminate_primitives = not operation_params.get(
5564 "skip_terminate_primitives"
5565 ) and vca.get("needed_terminate")
5566 task = asyncio.ensure_future(
5567 asyncio.wait_for(
5568 self.destroy_N2VC(
5569 logging_text,
5570 db_nslcmop,
5571 vca,
5572 config_descriptor,
5573 vca_index,
5574 destroy_ee=True,
5575 exec_primitives=exec_terminate_primitives,
5576 scaling_in=True,
5577 vca_id=vca_id,
5578 ),
5579 timeout=self.timeout_charm_delete,
5580 )
5581 )
5582 tasks_dict_info[task] = "Terminating VCA {}".format(
5583 vca.get("ee_id")
5584 )
5585 del vca_update[vca_index]
5586 del config_update[vca_index]
5587 # wait for pending tasks of terminate primitives
5588 if tasks_dict_info:
5589 self.logger.debug(
5590 logging_text
5591 + "Waiting for tasks {}".format(
5592 list(tasks_dict_info.keys())
5593 )
5594 )
5595 error_list = await self._wait_for_tasks(
5596 logging_text,
5597 tasks_dict_info,
5598 min(
5599 self.timeout_charm_delete, self.timeout_ns_terminate
5600 ),
5601 stage,
5602 nslcmop_id,
5603 )
5604 tasks_dict_info.clear()
5605 if error_list:
5606 raise LcmException("; ".join(error_list))
5607
5608 db_vca_and_config_update = {
5609 "_admin.deployed.VCA": vca_update,
5610 "configurationStatus": config_update,
5611 }
5612 self.update_db_2(
5613 "nsrs", db_nsr["_id"], db_vca_and_config_update
5614 )
5615 scale_process = None
5616 # SCALE-IN VCA - END
5617
5618 # SCALE RO - BEGIN
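# Delegate the actual VM creation/deletion to RO; only the NG-RO flow is handled here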
5619 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5620 scale_process = "RO"
5621 if self.ro_config.get("ng"):
5622 await self._scale_ng_ro(
5623 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5624 )
5625 scaling_info.pop("vdu-create", None)
5626 scaling_info.pop("vdu-delete", None)
5627
5628 scale_process = None
5629 # SCALE RO - END
5630
5631 # SCALE KDU - BEGIN
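# Apply the computed KDU replica changes through the corresponding K8s connector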
5632 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5633 scale_process = "KDU"
5634 await self._scale_kdu(
5635 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5636 )
5637 scaling_info.pop("kdu-create", None)
5638 scaling_info.pop("kdu-delete", None)
5639
5640 scale_process = None
5641 # SCALE KDU - END
5642
5643 if db_nsr_update:
5644 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5645
5646 # SCALE-UP VCA - BEGIN
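# Deploy new execution environments (VNF, VDU or KDU level charms) for the instances just created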
5647 if vca_scaling_info:
5648 step = db_nslcmop_update[
5649 "detailed-status"
5650 ] = "Creating new execution environments"
5651 scale_process = "VCA"
5652 for vca_info in vca_scaling_info:
5653 if vca_info["type"] == "create":
5654 member_vnf_index = str(vca_info["member-vnf-index"])
5655 self.logger.debug(
5656 logging_text + "vdu info: {}".format(vca_info)
5657 )
5658 vnfd_id = db_vnfr["vnfd-ref"]
5659 if vca_info.get("osm_vdu_id"):
5660 vdu_index = int(vca_info["vdu_index"])
5661 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5662 if db_vnfr.get("additionalParamsForVnf"):
5663 deploy_params.update(
5664 parse_yaml_strings(
5665 db_vnfr["additionalParamsForVnf"].copy()
5666 )
5667 )
5668 descriptor_config = get_configuration(
5669 db_vnfd, db_vnfd["id"]
5670 )
5671 if descriptor_config:
5672 vdu_id = None
5673 vdu_name = None
5674 kdu_name = None
5675 self._deploy_n2vc(
5676 logging_text=logging_text
5677 + "member_vnf_index={} ".format(member_vnf_index),
5678 db_nsr=db_nsr,
5679 db_vnfr=db_vnfr,
5680 nslcmop_id=nslcmop_id,
5681 nsr_id=nsr_id,
5682 nsi_id=nsi_id,
5683 vnfd_id=vnfd_id,
5684 vdu_id=vdu_id,
5685 kdu_name=kdu_name,
5686 member_vnf_index=member_vnf_index,
5687 vdu_index=vdu_index,
5688 vdu_name=vdu_name,
5689 deploy_params=deploy_params,
5690 descriptor_config=descriptor_config,
5691 base_folder=base_folder,
5692 task_instantiation_info=tasks_dict_info,
5693 stage=stage,
5694 )
5695 vdu_id = vca_info["osm_vdu_id"]
5696 vdur = find_in_list(
5697 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5698 )
5699 descriptor_config = get_configuration(db_vnfd, vdu_id)
5700 if vdur.get("additionalParams"):
5701 deploy_params_vdu = parse_yaml_strings(
5702 vdur["additionalParams"]
5703 )
5704 else:
5705 deploy_params_vdu = deploy_params
5706 deploy_params_vdu["OSM"] = get_osm_params(
5707 db_vnfr, vdu_id, vdu_count_index=vdu_index
5708 )
5709 if descriptor_config:
5710 vdu_name = None
5711 kdu_name = None
5712 stage[
5713 1
5714 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5715 member_vnf_index, vdu_id, vdu_index
5716 )
5717 stage[2] = step = "Scaling out VCA"
5718 self._write_op_status(op_id=nslcmop_id, stage=stage)
5719 self._deploy_n2vc(
5720 logging_text=logging_text
5721 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5722 member_vnf_index, vdu_id, vdu_index
5723 ),
5724 db_nsr=db_nsr,
5725 db_vnfr=db_vnfr,
5726 nslcmop_id=nslcmop_id,
5727 nsr_id=nsr_id,
5728 nsi_id=nsi_id,
5729 vnfd_id=vnfd_id,
5730 vdu_id=vdu_id,
5731 kdu_name=kdu_name,
5732 member_vnf_index=member_vnf_index,
5733 vdu_index=vdu_index,
5734 vdu_name=vdu_name,
5735 deploy_params=deploy_params_vdu,
5736 descriptor_config=descriptor_config,
5737 base_folder=base_folder,
5738 task_instantiation_info=tasks_dict_info,
5739 stage=stage,
5740 )
5741 else:
5742 kdu_name = vca_info["osm_kdu_id"]
5743 descriptor_config = get_configuration(db_vnfd, kdu_name)
5744 if descriptor_config:
5745 vdu_id = None
5746 kdu_index = int(vca_info["kdu_index"])
5747 vdu_name = None
5748 kdur = next(
5749 x
5750 for x in db_vnfr["kdur"]
5751 if x["kdu-name"] == kdu_name
5752 )
5753 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5754 if kdur.get("additionalParams"):
5755 deploy_params_kdu = parse_yaml_strings(
5756 kdur["additionalParams"]
5757 )
5758
5759 self._deploy_n2vc(
5760 logging_text=logging_text,
5761 db_nsr=db_nsr,
5762 db_vnfr=db_vnfr,
5763 nslcmop_id=nslcmop_id,
5764 nsr_id=nsr_id,
5765 nsi_id=nsi_id,
5766 vnfd_id=vnfd_id,
5767 vdu_id=vdu_id,
5768 kdu_name=kdu_name,
5769 member_vnf_index=member_vnf_index,
5770 vdu_index=kdu_index,
5771 vdu_name=vdu_name,
5772 deploy_params=deploy_params_kdu,
5773 descriptor_config=descriptor_config,
5774 base_folder=base_folder,
5775 task_instantiation_info=tasks_dict_info,
5776 stage=stage,
5777 )
5778 # SCALE-UP VCA - END
5779 scale_process = None
5780
5781 # POST-SCALE BEGIN
5782 # execute the post-scale vnf-config-primitives declared in scaling-config-action
5783 step = "Executing post-scale vnf-config-primitive"
5784 if scaling_descriptor.get("scaling-config-action"):
5785 for scaling_config_action in scaling_descriptor[
5786 "scaling-config-action"
5787 ]:
5788 if (
5789 scaling_config_action.get("trigger") == "post-scale-in"
5790 and scaling_type == "SCALE_IN"
5791 ) or (
5792 scaling_config_action.get("trigger") == "post-scale-out"
5793 and scaling_type == "SCALE_OUT"
5794 ):
5795 vnf_config_primitive = scaling_config_action[
5796 "vnf-config-primitive-name-ref"
5797 ]
5798 step = db_nslcmop_update[
5799 "detailed-status"
5800 ] = "executing post-scale scaling-config-action '{}'".format(
5801 vnf_config_primitive
5802 )
5803
5804 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5805 if db_vnfr.get("additionalParamsForVnf"):
5806 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5807
5808 # look for primitive
5809 for config_primitive in (
5810 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5811 ).get("config-primitive", ()):
5812 if config_primitive["name"] == vnf_config_primitive:
5813 break
5814 else:
5815 raise LcmException(
5816 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5817 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5818 "config-primitive".format(
5819 scaling_group, vnf_config_primitive
5820 )
5821 )
5822 scale_process = "VCA"
5823 db_nsr_update["config-status"] = "configuring post-scaling"
5824 primitive_params = self._map_primitive_params(
5825 config_primitive, {}, vnfr_params
5826 )
5827
5828 # Post-scale retry check: Check if this sub-operation has been executed before
5829 op_index = self._check_or_add_scale_suboperation(
5830 db_nslcmop,
5831 vnf_index,
5832 vnf_config_primitive,
5833 primitive_params,
5834 "POST-SCALE",
5835 )
5836 if op_index == self.SUBOPERATION_STATUS_SKIP:
5837 # Skip sub-operation
5838 result = "COMPLETED"
5839 result_detail = "Done"
5840 self.logger.debug(
5841 logging_text
5842 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5843 vnf_config_primitive, result, result_detail
5844 )
5845 )
5846 else:
5847 if op_index == self.SUBOPERATION_STATUS_NEW:
5848 # New sub-operation: Get index of this sub-operation
5849 op_index = (
5850 len(db_nslcmop.get("_admin", {}).get("operations"))
5851 - 1
5852 )
5853 self.logger.debug(
5854 logging_text
5855 + "vnf_config_primitive={} New sub-operation".format(
5856 vnf_config_primitive
5857 )
5858 )
5859 else:
5860 # retry: Get registered params for this existing sub-operation
5861 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5862 op_index
5863 ]
5864 vnf_index = op.get("member_vnf_index")
5865 vnf_config_primitive = op.get("primitive")
5866 primitive_params = op.get("primitive_params")
5867 self.logger.debug(
5868 logging_text
5869 + "vnf_config_primitive={} Sub-operation retry".format(
5870 vnf_config_primitive
5871 )
5872 )
5873 # Execute the primitive, either with new (first-time) or registered (retry) args
5874 ee_descriptor_id = config_primitive.get(
5875 "execution-environment-ref"
5876 )
5877 primitive_name = config_primitive.get(
5878 "execution-environment-primitive", vnf_config_primitive
5879 )
5880 ee_id, vca_type = self._look_for_deployed_vca(
5881 nsr_deployed["VCA"],
5882 member_vnf_index=vnf_index,
5883 vdu_id=None,
5884 vdu_count_index=None,
5885 ee_descriptor_id=ee_descriptor_id,
5886 )
5887 result, result_detail = await self._ns_execute_primitive(
5888 ee_id,
5889 primitive_name,
5890 primitive_params,
5891 vca_type=vca_type,
5892 vca_id=vca_id,
5893 )
5894 self.logger.debug(
5895 logging_text
5896 + "vnf_config_primitive={} Done with result {} {}".format(
5897 vnf_config_primitive, result, result_detail
5898 )
5899 )
5900 # Update operationState = COMPLETED | FAILED
5901 self._update_suboperation_status(
5902 db_nslcmop, op_index, result, result_detail
5903 )
5904
5905 if result == "FAILED":
5906 raise LcmException(result_detail)
5907 db_nsr_update["config-status"] = old_config_status
5908 scale_process = None
5909 # POST-SCALE END
5910
5911 db_nsr_update[
5912 "detailed-status"
5913 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
5914 db_nsr_update["operational-status"] = (
5915 "running"
5916 if old_operational_status == "failed"
5917 else old_operational_status
5918 )
5919 db_nsr_update["config-status"] = old_config_status
5920 return
5921 except (
5922 ROclient.ROClientException,
5923 DbException,
5924 LcmException,
5925 NgRoException,
5926 ) as e:
5927 self.logger.error(logging_text + "Exit Exception {}".format(e))
5928 exc = e
5929 except asyncio.CancelledError:
5930 self.logger.error(
5931 logging_text + "Cancelled Exception while '{}'".format(step)
5932 )
5933 exc = "Operation was cancelled"
5934 except Exception as e:
5935 exc = traceback.format_exc()
5936 self.logger.critical(
5937 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5938 exc_info=True,
5939 )
5940 finally:
5941 self._write_ns_status(
5942 nsr_id=nsr_id,
5943 ns_state=None,
5944 current_operation="IDLE",
5945 current_operation_id=None,
5946 )
5947 if tasks_dict_info:
5948 stage[1] = "Waiting for instantiate pending tasks."
5949 self.logger.debug(logging_text + stage[1])
5950 exc = await self._wait_for_tasks(
5951 logging_text,
5952 tasks_dict_info,
5953 self.timeout_ns_deploy,
5954 stage,
5955 nslcmop_id,
5956 nsr_id=nsr_id,
5957 )
5958 if exc:
5959 db_nslcmop_update[
5960 "detailed-status"
5961 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5962 nslcmop_operation_state = "FAILED"
5963 if db_nsr:
5964 db_nsr_update["operational-status"] = old_operational_status
5965 db_nsr_update["config-status"] = old_config_status
5966 db_nsr_update["detailed-status"] = ""
5967 if scale_process:
5968 if "VCA" in scale_process:
5969 db_nsr_update["config-status"] = "failed"
5970 if "RO" in scale_process:
5971 db_nsr_update["operational-status"] = "failed"
5972 db_nsr_update[
5973 "detailed-status"
5974 ] = "FAILED scaling nslcmop={} {}: {}".format(
5975 nslcmop_id, step, exc
5976 )
5977 else:
5978 error_description_nslcmop = None
5979 nslcmop_operation_state = "COMPLETED"
5980 db_nslcmop_update["detailed-status"] = "Done"
5981
5982 self._write_op_status(
5983 op_id=nslcmop_id,
5984 stage="",
5985 error_message=error_description_nslcmop,
5986 operation_state=nslcmop_operation_state,
5987 other_update=db_nslcmop_update,
5988 )
5989 if db_nsr:
5990 self._write_ns_status(
5991 nsr_id=nsr_id,
5992 ns_state=None,
5993 current_operation="IDLE",
5994 current_operation_id=None,
5995 other_update=db_nsr_update,
5996 )
5997
5998 if nslcmop_operation_state:
5999 try:
6000 msg = {
6001 "nsr_id": nsr_id,
6002 "nslcmop_id": nslcmop_id,
6003 "operationState": nslcmop_operation_state,
6004 }
6005 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6006 except Exception as e:
6007 self.logger.error(
6008 logging_text + "kafka_write notification Exception {}".format(e)
6009 )
6010 self.logger.debug(logging_text + "Exit")
6011 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6012
6013 async def _scale_kdu(
6014 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6015 ):
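"""
Scale the KDUs of an NS according to scaling_info

For every entry in scaling_info["kdu-create"] / scaling_info["kdu-delete"] it runs the
terminate (scale-in) or initial (scale-out) config primitives, when defined and no juju
execution environment is used, and applies the new replica count through the matching
K8s connector.

:param: logging_text: prefix for log messages
:param: nsr_id: NS record ID
:param: nsr_deployed: '_admin.deployed' content of the NS record
:param: db_vnfd: VNF descriptor
:param: vca_id: VCA ID, if any
:param: scaling_info: dict with the computed "kdu-create"/"kdu-delete" requests
"""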
6016 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6017 for kdu_name in _scaling_info:
6018 for kdu_scaling_info in _scaling_info[kdu_name]:
6019 deployed_kdu, index = get_deployed_kdu(
6020 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6021 )
6022 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6023 kdu_instance = deployed_kdu["kdu-instance"]
6024 scale = int(kdu_scaling_info["scale"])
6025 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6026
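# Reference to the K8s deployment entry of this NS record, passed to the connector
# so it can report operation progress in the database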
6027 db_dict = {
6028 "collection": "nsrs",
6029 "filter": {"_id": nsr_id},
6030 "path": "_admin.deployed.K8s.{}".format(index),
6031 }
6032
6033 step = "scaling application {}".format(
6034 kdu_scaling_info["resource-name"]
6035 )
6036 self.logger.debug(logging_text + step)
6037
6038 if kdu_scaling_info["type"] == "delete":
6039 kdu_config = get_configuration(db_vnfd, kdu_name)
6040 if (
6041 kdu_config
6042 and kdu_config.get("terminate-config-primitive")
6043 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6044 ):
6045 terminate_config_primitive_list = kdu_config.get(
6046 "terminate-config-primitive"
6047 )
6048 terminate_config_primitive_list.sort(
6049 key=lambda val: int(val["seq"])
6050 )
6051
6052 for (
6053 terminate_config_primitive
6054 ) in terminate_config_primitive_list:
6055 primitive_params_ = self._map_primitive_params(
6056 terminate_config_primitive, {}, {}
6057 )
6058 step = "execute terminate config primitive"
6059 self.logger.debug(logging_text + step)
6060 await asyncio.wait_for(
6061 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6062 cluster_uuid=cluster_uuid,
6063 kdu_instance=kdu_instance,
6064 primitive_name=terminate_config_primitive["name"],
6065 params=primitive_params_,
6066 db_dict=db_dict,
6067 vca_id=vca_id,
6068 ),
6069 timeout=600,
6070 )
6071
6072 await asyncio.wait_for(
6073 self.k8scluster_map[k8s_cluster_type].scale(
6074 kdu_instance,
6075 scale,
6076 kdu_scaling_info["resource-name"],
6077 vca_id=vca_id,
6078 ),
6079 timeout=self.timeout_vca_on_error,
6080 )
6081
6082 if kdu_scaling_info["type"] == "create":
6083 kdu_config = get_configuration(db_vnfd, kdu_name)
6084 if (
6085 kdu_config
6086 and kdu_config.get("initial-config-primitive")
6087 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6088 ):
6089 initial_config_primitive_list = kdu_config.get(
6090 "initial-config-primitive"
6091 )
6092 initial_config_primitive_list.sort(
6093 key=lambda val: int(val["seq"])
6094 )
6095
6096 for initial_config_primitive in initial_config_primitive_list:
6097 primitive_params_ = self._map_primitive_params(
6098 initial_config_primitive, {}, {}
6099 )
6100 step = "execute initial config primitive"
6101 self.logger.debug(logging_text + step)
6102 await asyncio.wait_for(
6103 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6104 cluster_uuid=cluster_uuid,
6105 kdu_instance=kdu_instance,
6106 primitive_name=initial_config_primitive["name"],
6107 params=primitive_params_,
6108 db_dict=db_dict,
6109 vca_id=vca_id,
6110 ),
6111 timeout=600,
6112 )
6113
6114 async def _scale_ng_ro(
6115 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6116 ):
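"""
Create or delete VDU instances through NG-RO for a scaling operation

Marks the VDUs to add/remove in the VNF record, delegates the VM operations to NG-RO via
_instantiate_ng_ro and, for scale-in, removes the deleted VDUs from the VNF record afterwards.

:param: logging_text: prefix for log messages
:param: db_nsr: NS record
:param: db_nslcmop: NS LCM operation record
:param: db_vnfr: VNF record of the VNF being scaled
:param: vdu_scaling_info: dict with the "vdu-create"/"vdu-delete" requests
:param: stage: list used to report operation progress
"""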
6117 nsr_id = db_nslcmop["nsInstanceId"]
6118 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6119 db_vnfrs = {}
6120
6121 # read from db: vnfds for every vnf
6122 db_vnfds = []
6123
6124 # for each vnf in ns, read vnfd
6125 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6126 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6127 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6128 # if we do not have this vnfd yet, read it from the db
6129 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6130 # read from db
6131 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6132 db_vnfds.append(vnfd)
6133 n2vc_key = self.n2vc.get_public_key()
6134 n2vc_key_list = [n2vc_key]
6135 self.scale_vnfr(
6136 db_vnfr,
6137 vdu_scaling_info.get("vdu-create"),
6138 vdu_scaling_info.get("vdu-delete"),
6139 mark_delete=True,
6140 )
6141 # db_vnfr has been updated, update db_vnfrs to use it
6142 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6143 await self._instantiate_ng_ro(
6144 logging_text,
6145 nsr_id,
6146 db_nsd,
6147 db_nsr,
6148 db_nslcmop,
6149 db_vnfrs,
6150 db_vnfds,
6151 n2vc_key_list,
6152 stage=stage,
6153 start_deploy=time(),
6154 timeout_ns_deploy=self.timeout_ns_deploy,
6155 )
6156 if vdu_scaling_info.get("vdu-delete"):
6157 self.scale_vnfr(
6158 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6159 )
6160
6161 async def add_prometheus_metrics(
6162 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6163 ):
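"""
Register Prometheus scrape jobs for an execution environment

Looks for a 'prometheus*.j2' template in the charm artifacts, renders it with the exporter
and target data and pushes the resulting jobs through the Prometheus module.

:return: list of created job names, or None if no job was registered
"""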
6164 if not self.prometheus:
6165 return
6166 # look for a file called 'prometheus*.j2' and use it as the Prometheus job template
6167 artifact_content = self.fs.dir_ls(artifact_path)
6168 job_file = next(
6169 (
6170 f
6171 for f in artifact_content
6172 if f.startswith("prometheus") and f.endswith(".j2")
6173 ),
6174 None,
6175 )
6176 if not job_file:
6177 return
6178 with self.fs.file_open((artifact_path, job_file), "r") as f:
6179 job_data = f.read()
6180
6181 # TODO get_service
6182 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6183 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6184 host_port = "80"
6185 vnfr_id = vnfr_id.replace("-", "")
6186 variables = {
6187 "JOB_NAME": vnfr_id,
6188 "TARGET_IP": target_ip,
6189 "EXPORTER_POD_IP": host_name,
6190 "EXPORTER_POD_PORT": host_port,
6191 }
6192 job_list = self.prometheus.parse_job(job_data, variables)
6193 # ensure job_name contains the vnfr_id and add the nsr_id as metadata
6194 for job in job_list:
6195 if (
6196 not isinstance(job.get("job_name"), str)
6197 or vnfr_id not in job["job_name"]
6198 ):
6199 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6200 job["nsr_id"] = nsr_id
6201 job_dict = {jl["job_name"]: jl for jl in job_list}
6202 if await self.prometheus.update(job_dict):
6203 return list(job_dict.keys())
6204
6205 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6206 """
6207 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6208
6209 :param: vim_account_id: VIM Account ID
6210
6211 :return: (cloud_name, cloud_credential)
6212 """
6213 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6214 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6215
6216 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6217 """
6218 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6219
6220 :param: vim_account_id: VIM Account ID
6221
6222 :return: (cloud_name, cloud_credential)
6223 """
6224 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6225 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")