1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
33 from osm_lcm
import ROclient
34 from osm_lcm
.data_utils
.nsr
import get_deployed_kdu
35 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
36 from osm_lcm
.lcm_utils
import (
44 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
45 from osm_lcm
.data_utils
.vnfd
import (
48 get_ee_sorted_initial_config_primitive_list
,
49 get_ee_sorted_terminate_config_primitive_list
,
51 get_virtual_link_profiles
,
56 get_number_of_instances
,
60 from osm_lcm
.data_utils
.list_utils
import find_in_list
61 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
62 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
63 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
64 from n2vc
.k8s_helm_conn
import K8sHelmConnector
65 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
66 from n2vc
.k8s_juju_conn
import K8sJujuConnector
68 from osm_common
.dbbase
import DbException
69 from osm_common
.fsbase
import FsException
71 from osm_lcm
.data_utils
.database
.database
import Database
72 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
74 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
75 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
77 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
79 from copy
import copy
, deepcopy
81 from uuid
import uuid4
83 from random
import randint
85 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
89 timeout_vca_on_error
= (
91 ) # Time for charm from first time at blocked,error status to mark as failed
92 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
93 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
94 timeout_charm_delete
= 10 * 60
95 timeout_primitive
= 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive
= (
98 ) # timeout for some progress in a primitive execution
100 SUBOPERATION_STATUS_NOT_FOUND
= -1
101 SUBOPERATION_STATUS_NEW
= -2
102 SUBOPERATION_STATUS_SKIP
= -3
103 task_name_deploy_vca
= "Deploying VCA"
105 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
107 Init, Connect to database, filesystem storage, and messaging
108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
111 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
113 self
.db
= Database().instance
.db
114 self
.fs
= Filesystem().instance
.fs
116 self
.lcm_tasks
= lcm_tasks
117 self
.timeout
= config
["timeout"]
118 self
.ro_config
= config
["ro_config"]
119 self
.ng_ro
= config
["ro_config"].get("ng")
120 self
.vca_config
= config
["VCA"].copy()
122 # create N2VC connector
123 self
.n2vc
= N2VCJujuConnector(
126 on_update_db
=self
._on
_update
_n
2vc
_db
,
131 self
.conn_helm_ee
= LCMHelmConn(
134 vca_config
=self
.vca_config
,
135 on_update_db
=self
._on
_update
_n
2vc
_db
,
138 self
.k8sclusterhelm2
= K8sHelmConnector(
139 kubectl_command
=self
.vca_config
.get("kubectlpath"),
140 helm_command
=self
.vca_config
.get("helmpath"),
147 self
.k8sclusterhelm3
= K8sHelm3Connector(
148 kubectl_command
=self
.vca_config
.get("kubectlpath"),
149 helm_command
=self
.vca_config
.get("helm3path"),
156 self
.k8sclusterjuju
= K8sJujuConnector(
157 kubectl_command
=self
.vca_config
.get("kubectlpath"),
158 juju_command
=self
.vca_config
.get("jujupath"),
161 on_update_db
=self
._on
_update
_k
8s
_db
,
166 self
.k8scluster_map
= {
167 "helm-chart": self
.k8sclusterhelm2
,
168 "helm-chart-v3": self
.k8sclusterhelm3
,
169 "chart": self
.k8sclusterhelm3
,
170 "juju-bundle": self
.k8sclusterjuju
,
171 "juju": self
.k8sclusterjuju
,
175 "lxc_proxy_charm": self
.n2vc
,
176 "native_charm": self
.n2vc
,
177 "k8s_proxy_charm": self
.n2vc
,
178 "helm": self
.conn_helm_ee
,
179 "helm-v3": self
.conn_helm_ee
,
182 self
.prometheus
= prometheus
185 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Return an IPv4, IPv6 or MAC address with its last field increased by vm_index.

    :param ip_mac: address as text. Any value that is not a str is returned untouched
    :param vm_index: amount added to the last field of the address
    :return: the new address as text; None if the text has no '.' or ':' separated
        last field, or if that field is not a number in the expected base
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4: the last field follows the last dot and is decimal
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1  # step over the separator so that only the field is converted
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac: the last field follows the last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, zero padded to the original field length
            # (len can be 2 for mac or 4 for ipv6)
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except (ValueError, TypeError):
        # the last field (or vm_index) is not a usable number: fall through to None
        pass
    return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """
    Write the descriptor received from RO at 'deploymentStatus' of the ns record.

    This is a best effort operation: any error is logged and not propagated.
    :param nsrs_id: _id of the "nsrs" database entry to update
    :param ro_descriptor: content stored, as it is, at 'deploymentStatus'
    :return: None
    """
    try:
        # TODO filter RO descriptor fields...

        # write to database
        db_dict = {"deploymentStatus": ro_descriptor}
        self.update_db_2("nsrs", nsrs_id, db_dict)
    except Exception as e:
        # deliberately broad: a failure here must not break the caller
        self.logger.warning(
            "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
        )
227 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
229 # remove last dot from path (if exists)
230 if path
.endswith("."):
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
237 nsr_id
= filter.get("_id")
239 # read ns record from database
240 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
241 current_ns_status
= nsr
.get("nsState")
243 # get vca status for NS
244 status_dict
= await self
.n2vc
.get_status(
245 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
250 db_dict
["vcaStatus"] = status_dict
251 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
253 # update configurationStatus for this VCA
255 vca_index
= int(path
[path
.rfind(".") + 1 :])
258 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
260 vca_status
= vca_list
[vca_index
].get("status")
262 configuration_status_list
= nsr
.get("configurationStatus")
263 config_status
= configuration_status_list
[vca_index
].get("status")
265 if config_status
== "BROKEN" and vca_status
!= "failed":
266 db_dict
["configurationStatus"][vca_index
] = "READY"
267 elif config_status
!= "BROKEN" and vca_status
== "failed":
268 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
269 except Exception as e
:
270 # not update configurationStatus
271 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
276 if current_ns_status
in ("READY", "DEGRADED"):
277 error_description
= ""
279 if status_dict
.get("machines"):
280 for machine_id
in status_dict
.get("machines"):
281 machine
= status_dict
.get("machines").get(machine_id
)
282 # check machine agent-status
283 if machine
.get("agent-status"):
284 s
= machine
.get("agent-status").get("status")
287 error_description
+= (
288 "machine {} agent-status={} ; ".format(
292 # check machine instance status
293 if machine
.get("instance-status"):
294 s
= machine
.get("instance-status").get("status")
297 error_description
+= (
298 "machine {} instance-status={} ; ".format(
303 if status_dict
.get("applications"):
304 for app_id
in status_dict
.get("applications"):
305 app
= status_dict
.get("applications").get(app_id
)
306 # check application status
307 if app
.get("status"):
308 s
= app
.get("status").get("status")
311 error_description
+= (
312 "application {} status={} ; ".format(app_id
, s
)
315 if error_description
:
316 db_dict
["errorDescription"] = error_description
317 if current_ns_status
== "READY" and is_degraded
:
318 db_dict
["nsState"] = "DEGRADED"
319 if current_ns_status
== "DEGRADED" and not is_degraded
:
320 db_dict
["nsState"] = "READY"
323 self
.update_db_2("nsrs", nsr_id
, db_dict
)
325 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
327 except Exception as e
:
328 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
330 async def _on_update_k8s_db(
331 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
341 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
342 # .format(cluster_uuid, kdu_instance, filter))
345 nsr_id
= filter.get("_id")
347 # get vca status for NS
348 vca_status
= await self
.k8sclusterjuju
.status_kdu(
351 complete_status
=True,
357 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
359 await self
.k8sclusterjuju
.update_vca_status(
360 db_dict
["vcaStatus"],
366 self
.update_db_2("nsrs", nsr_id
, db_dict
)
368 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
370 except Exception as e
:
371 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template using the instantiation parameters.

    :param cloud_init_text: cloud-init content, that can contain Jinja2 variables
    :param additional_params: dictionary with the values for the variables. Can be None
    :param vnfd_id: id of the vnfd, only used to compose the error text
    :param vdu_id: id of the vdu, only used to compose the error text
    :return: the rendered text
    :raises LcmException: if a variable of the template is not provided or the
        template cannot be processed
    """
    try:
        # StrictUndefined makes a missing variable an error instead of empty text
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
def _get_vdu_cloud_init_content(self, vdu, vnfd):
    """
    Obtain the cloud-init text of a vdu, from the package file or from the descriptor.

    :param vdu: vdu descriptor. 'cloud-init-file' takes precedence over 'cloud-init'
    :param vnfd: vnfd descriptor; '_admin.storage' locates the package at self.fs
    :return: the cloud-init content, or None if the vdu declares neither of them
    :raises LcmException: if the cloud-init file cannot be read from the storage
    """
    cloud_init_content = cloud_init_file = None
    try:
        if vdu.get("cloud-init-file"):
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                vdu["cloud-init-file"],
            )
            with self.fs.file_open(cloud_init_file, "r") as ci_file:
                cloud_init_content = ci_file.read()
        elif vdu.get("cloud-init"):
            cloud_init_content = vdu["cloud-init"]

        return cloud_init_content
    except FsException as e:
        raise LcmException(
            "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
                vnfd["id"], vdu["id"], cloud_init_file, e
            )
        ) from e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Get the additional parameters stored at the vnfr for a vdu.

    :param db_vnfr: vnfr database content; its 'vdur' list is searched
    :param vdu_id: id of the vdu, compared against 'vdu-id-ref' of each vdur
    :return: result of parse_yaml_strings over 'additionalParams' of the first
        matching vdur (over None when there is no match)
    """
    # a different name for the loop variable avoids shadowing the result
    vdur = next(
        (item for item in db_vnfr.get("vdur") if vdu_id == item["vdu-id-ref"]),
        {},
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd. It is not modified, a deep copy is used
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not used by this method
    :param nsrId: Id of the NSR. Not used by this method
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)

    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are not sent inside the descriptor:
    # remove them from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """
    Translate an OSM ip-profile into the naming used by RO.

    'dns-server' becomes 'dns-address' (a list of entries is reduced to the list
    of their 'address' values), 'ip-version' ipv4/ipv6 becomes IPv4/IPv6 and
    'dhcp-params' becomes 'dhcp'. Other keys are kept.
    :param ip_profile: dictionary with the OSM ip-profile. It is not modified
    :return: a new dictionary with the translated profile
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        # pop only once, whatever the type, so the old key never survives
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    if RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
def _get_ro_vim_id_for_vim_account(self, vim_account):
    """
    Get the RO identifier of a VIM account.

    :param vim_account: _id of the entry at the "vim_accounts" database table
    :return: content of '_admin.deployed.RO' of the VIM account
    :raises LcmException: if '_admin.operationalState' of the VIM is not ENABLED
    """
    db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
    operational_state = db_vim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "VIM={} is not available. operationalState={}".format(
                vim_account, operational_state
            )
        )
    RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
    return RO_vim_id
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Get the RO identifier of a WIM account.

    :param wim_account: _id (str) of the entry at the "wim_accounts" database
        table. A value of any other type is returned as it is, without database access
    :return: content of '_admin.deployed.RO-account' of the WIM account, or
        wim_account itself when it is not a str
    :raises LcmException: if '_admin.operationalState' of the WIM is not ENABLED
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
    return RO_wim_id
492 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
494 db_vdu_push_list
= []
496 db_update
= {"_admin.modified": time()}
498 for vdu_id
, vdu_count
in vdu_create
.items():
502 for vdur
in reversed(db_vnfr
["vdur"])
503 if vdur
["vdu-id-ref"] == vdu_id
508 # Read the template saved in the db:
509 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
510 vdur_template
= db_vnfr
.get("vdur-template")
511 if not vdur_template
:
513 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
517 vdur
= vdur_template
[0]
518 #Delete a template from the database after using it
519 self
.db
.set_one("vnfrs",
520 {"_id": db_vnfr
["_id"]},
522 pull
={"vdur-template": {"_id": vdur
['_id']}}
524 for count
in range(vdu_count
):
525 vdur_copy
= deepcopy(vdur
)
526 vdur_copy
["status"] = "BUILD"
527 vdur_copy
["status-detailed"] = None
528 vdur_copy
["ip-address"] = None
529 vdur_copy
["_id"] = str(uuid4())
530 vdur_copy
["count-index"] += count
+ 1
531 vdur_copy
["id"] = "{}-{}".format(
532 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
534 vdur_copy
.pop("vim_info", None)
535 for iface
in vdur_copy
["interfaces"]:
536 if iface
.get("fixed-ip"):
537 iface
["ip-address"] = self
.increment_ip_mac(
538 iface
["ip-address"], count
+ 1
541 iface
.pop("ip-address", None)
542 if iface
.get("fixed-mac"):
543 iface
["mac-address"] = self
.increment_ip_mac(
544 iface
["mac-address"], count
+ 1
547 iface
.pop("mac-address", None)
551 ) # only first vdu can be managment of vnf
552 db_vdu_push_list
.append(vdur_copy
)
553 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
555 if len(db_vnfr
["vdur"]) == 1:
556 # The scale will move to 0 instances
557 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
558 template_vdur
= [db_vnfr
["vdur"][0]]
559 for vdu_id
, vdu_count
in vdu_delete
.items():
561 indexes_to_delete
= [
563 for iv
in enumerate(db_vnfr
["vdur"])
564 if iv
[1]["vdu-id-ref"] == vdu_id
568 "vdur.{}.status".format(i
): "DELETING"
569 for i
in indexes_to_delete
[-vdu_count
:]
573 # it must be deleted one by one because common.db does not allow otherwise
576 for v
in reversed(db_vnfr
["vdur"])
577 if v
["vdu-id-ref"] == vdu_id
579 for vdu
in vdus_to_delete
[:vdu_count
]:
582 {"_id": db_vnfr
["_id"]},
584 pull
={"vdur": {"_id": vdu
["_id"]}},
588 db_push
["vdur"] = db_vdu_push_list
590 db_push
["vdur-template"] = template_vdur
593 db_vnfr
["vdur-template"] = template_vdur
594 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
595 # modify passed dictionary db_vnfr
596 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
597 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info, with one
        "vld.<index>" entry per vld
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # the loop ended without finding the RO net of this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
623 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
625 for db_vnfr
in db_vnfrs
.values():
626 vnfr_update
= {"status": "ERROR"}
627 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
628 if "status" not in vdur
:
629 vdur
["status"] = "ERROR"
630 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
632 vdur
["status-detailed"] = str(error_text
)
634 "vdur.{}.status-detailed".format(vdu_index
)
636 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
637 except DbException
as e
:
638 self
.logger
.error("Cannot update vnf. {}".format(e
))
640 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
642 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
643 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
647 for vnf_index
, db_vnfr
in db_vnfrs
.items():
648 for vnf_RO
in nsr_desc_RO
["vnfs"]:
649 if vnf_RO
["member_vnf_index"] != vnf_index
:
652 if vnf_RO
.get("ip_address"):
653 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
656 elif not db_vnfr
.get("ip-address"):
657 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
658 raise LcmExceptionNoMgmtIP(
659 "ns member_vnf_index '{}' has no IP address".format(
664 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
665 vdur_RO_count_index
= 0
666 if vdur
.get("pdu-type"):
668 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
669 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
671 if vdur
["count-index"] != vdur_RO_count_index
:
672 vdur_RO_count_index
+= 1
674 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
675 if vdur_RO
.get("ip_address"):
676 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
678 vdur
["ip-address"] = None
679 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
680 vdur
["name"] = vdur_RO
.get("vim_name")
681 vdur
["status"] = vdur_RO
.get("status")
682 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
683 for ifacer
in get_iterable(vdur
, "interfaces"):
684 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
685 if ifacer
["name"] == interface_RO
.get("internal_name"):
686 ifacer
["ip-address"] = interface_RO
.get(
689 ifacer
["mac-address"] = interface_RO
.get(
695 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
696 "from VIM info".format(
697 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
700 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
704 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
706 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
710 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
711 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
712 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
714 vld
["vim-id"] = net_RO
.get("vim_net_id")
715 vld
["name"] = net_RO
.get("vim_name")
716 vld
["status"] = net_RO
.get("status")
717 vld
["status-detailed"] = net_RO
.get("error_msg")
718 vnfr_update
["vld.{}".format(vld_index
)] = vld
722 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
727 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
732 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
def _get_ns_config_info(self, nsr_id):
    """
    Generates a mapping between vnf,vdu elements and the N2VC id
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id>  for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id>  for a vdu configuration
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    mapping = {}
    ns_config_info = {"osm-config-mapping": mapping}
    for vca in vca_deployed_list:
        if not vca["member-vnf-index"]:
            # entries without member-vnf-index are not part of the mapping
            continue
        if not vca["vdu_id"]:
            mapping[vca["member-vnf-index"]] = vca["application"]
        else:
            mapping[
                "{}.{}.{}".format(
                    vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
                )
            ] = vca["application"]
    return ns_config_info
762 async def _instantiate_ng_ro(
779 def get_vim_account(vim_account_id
):
781 if vim_account_id
in db_vims
:
782 return db_vims
[vim_account_id
]
783 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
784 db_vims
[vim_account_id
] = db_vim
787 # modify target_vld info with instantiation parameters
788 def parse_vld_instantiation_params(
789 target_vim
, target_vld
, vld_params
, target_sdn
791 if vld_params
.get("ip-profile"):
792 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
795 if vld_params
.get("provider-network"):
796 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
799 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
800 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
803 if vld_params
.get("wimAccountId"):
804 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
805 target_vld
["vim_info"][target_wim
] = {}
806 for param
in ("vim-network-name", "vim-network-id"):
807 if vld_params
.get(param
):
808 if isinstance(vld_params
[param
], dict):
809 for vim
, vim_net
in vld_params
[param
].items():
810 other_target_vim
= "vim:" + vim
812 target_vld
["vim_info"],
813 (other_target_vim
, param
.replace("-", "_")),
816 else: # isinstance str
817 target_vld
["vim_info"][target_vim
][
818 param
.replace("-", "_")
819 ] = vld_params
[param
]
820 if vld_params
.get("common_id"):
821 target_vld
["common_id"] = vld_params
.get("common_id")
823 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
824 def update_ns_vld_target(target
, ns_params
):
825 for vnf_params
in ns_params
.get("vnf", ()):
826 if vnf_params
.get("vimAccountId"):
830 for vnfr
in db_vnfrs
.values()
831 if vnf_params
["member-vnf-index"]
832 == vnfr
["member-vnf-index-ref"]
836 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
837 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
838 target_vld
= find_in_list(
839 get_iterable(vdur
, "interfaces"),
840 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
843 if vnf_params
.get("vimAccountId") not in a_vld
.get(
846 target
["ns"]["vld"][a_index
].get("vim_info").update(
848 "vim:{}".format(vnf_params
["vimAccountId"]): {
849 "vim_network_name": ""
854 nslcmop_id
= db_nslcmop
["_id"]
856 "name": db_nsr
["name"],
859 "image": deepcopy(db_nsr
["image"]),
860 "flavor": deepcopy(db_nsr
["flavor"]),
861 "action_id": nslcmop_id
,
862 "cloud_init_content": {},
864 for image
in target
["image"]:
865 image
["vim_info"] = {}
866 for flavor
in target
["flavor"]:
867 flavor
["vim_info"] = {}
869 if db_nslcmop
.get("lcmOperationType") != "instantiate":
870 # get parameters of instantiation:
871 db_nslcmop_instantiate
= self
.db
.get_list(
874 "nsInstanceId": db_nslcmop
["nsInstanceId"],
875 "lcmOperationType": "instantiate",
878 ns_params
= db_nslcmop_instantiate
.get("operationParams")
880 ns_params
= db_nslcmop
.get("operationParams")
881 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
882 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
885 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
886 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
890 "mgmt-network": vld
.get("mgmt-network", False),
891 "type": vld
.get("type"),
894 "vim_network_name": vld
.get("vim-network-name"),
895 "vim_account_id": ns_params
["vimAccountId"],
899 # check if this network needs SDN assist
900 if vld
.get("pci-interfaces"):
901 db_vim
= get_vim_account(ns_params
["vimAccountId"])
902 sdnc_id
= db_vim
["config"].get("sdn-controller")
904 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
905 target_sdn
= "sdn:{}".format(sdnc_id
)
906 target_vld
["vim_info"][target_sdn
] = {
908 "target_vim": target_vim
,
910 "type": vld
.get("type"),
913 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
914 for nsd_vnf_profile
in nsd_vnf_profiles
:
915 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
916 if cp
["virtual-link-profile-id"] == vld
["id"]:
918 "member_vnf:{}.{}".format(
919 cp
["constituent-cpd-id"][0][
920 "constituent-base-element-id"
922 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
924 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
926 # check at nsd descriptor, if there is an ip-profile
928 nsd_vlp
= find_in_list(
929 get_virtual_link_profiles(nsd
),
930 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
935 and nsd_vlp
.get("virtual-link-protocol-data")
936 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
938 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
941 ip_profile_dest_data
= {}
942 if "ip-version" in ip_profile_source_data
:
943 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
946 if "cidr" in ip_profile_source_data
:
947 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
950 if "gateway-ip" in ip_profile_source_data
:
951 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
954 if "dhcp-enabled" in ip_profile_source_data
:
955 ip_profile_dest_data
["dhcp-params"] = {
956 "enabled": ip_profile_source_data
["dhcp-enabled"]
958 vld_params
["ip-profile"] = ip_profile_dest_data
960 # update vld_params with instantiation params
961 vld_instantiation_params
= find_in_list(
962 get_iterable(ns_params
, "vld"),
963 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
965 if vld_instantiation_params
:
966 vld_params
.update(vld_instantiation_params
)
967 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
968 target
["ns"]["vld"].append(target_vld
)
969 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
970 update_ns_vld_target(target
, ns_params
)
972 for vnfr
in db_vnfrs
.values():
974 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
976 vnf_params
= find_in_list(
977 get_iterable(ns_params
, "vnf"),
978 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
980 target_vnf
= deepcopy(vnfr
)
981 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
982 for vld
in target_vnf
.get("vld", ()):
983 # check if connected to a ns.vld, to fill target'
984 vnf_cp
= find_in_list(
985 vnfd
.get("int-virtual-link-desc", ()),
986 lambda cpd
: cpd
.get("id") == vld
["id"],
989 ns_cp
= "member_vnf:{}.{}".format(
990 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
992 if cp2target
.get(ns_cp
):
993 vld
["target"] = cp2target
[ns_cp
]
996 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
998 # check if this network needs SDN assist
1000 if vld
.get("pci-interfaces"):
1001 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1002 sdnc_id
= db_vim
["config"].get("sdn-controller")
1004 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1005 target_sdn
= "sdn:{}".format(sdnc_id
)
1006 vld
["vim_info"][target_sdn
] = {
1008 "target_vim": target_vim
,
1010 "type": vld
.get("type"),
1013 # check at vnfd descriptor, if there is an ip-profile
1015 vnfd_vlp
= find_in_list(
1016 get_virtual_link_profiles(vnfd
),
1017 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1021 and vnfd_vlp
.get("virtual-link-protocol-data")
1022 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1024 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1027 ip_profile_dest_data
= {}
1028 if "ip-version" in ip_profile_source_data
:
1029 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1032 if "cidr" in ip_profile_source_data
:
1033 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1036 if "gateway-ip" in ip_profile_source_data
:
1037 ip_profile_dest_data
[
1039 ] = ip_profile_source_data
["gateway-ip"]
1040 if "dhcp-enabled" in ip_profile_source_data
:
1041 ip_profile_dest_data
["dhcp-params"] = {
1042 "enabled": ip_profile_source_data
["dhcp-enabled"]
1045 vld_params
["ip-profile"] = ip_profile_dest_data
1046 # update vld_params with instantiation params
1048 vld_instantiation_params
= find_in_list(
1049 get_iterable(vnf_params
, "internal-vld"),
1050 lambda i_vld
: i_vld
["name"] == vld
["id"],
1052 if vld_instantiation_params
:
1053 vld_params
.update(vld_instantiation_params
)
1054 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1057 for vdur
in target_vnf
.get("vdur", ()):
1058 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1059 continue # This vdu must not be created
1060 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1062 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1065 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1066 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1069 and vdu_configuration
.get("config-access")
1070 and vdu_configuration
.get("config-access").get("ssh-access")
1072 vdur
["ssh-keys"] = ssh_keys_all
1073 vdur
["ssh-access-required"] = vdu_configuration
[
1075 ]["ssh-access"]["required"]
1078 and vnf_configuration
.get("config-access")
1079 and vnf_configuration
.get("config-access").get("ssh-access")
1080 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1082 vdur
["ssh-keys"] = ssh_keys_all
1083 vdur
["ssh-access-required"] = vnf_configuration
[
1085 ]["ssh-access"]["required"]
1086 elif ssh_keys_instantiation
and find_in_list(
1087 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1089 vdur
["ssh-keys"] = ssh_keys_instantiation
1091 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1093 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1095 if vdud
.get("cloud-init-file"):
1096 vdur
["cloud-init"] = "{}:file:{}".format(
1097 vnfd
["_id"], vdud
.get("cloud-init-file")
1099 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1100 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1101 base_folder
= vnfd
["_admin"]["storage"]
1102 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1103 base_folder
["folder"],
1104 base_folder
["pkg-dir"],
1105 vdud
.get("cloud-init-file"),
1107 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1108 target
["cloud_init_content"][
1111 elif vdud
.get("cloud-init"):
1112 vdur
["cloud-init"] = "{}:vdu:{}".format(
1113 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1115 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1116 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1119 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1120 deploy_params_vdu
= self
._format
_additional
_params
(
1121 vdur
.get("additionalParams") or {}
1123 deploy_params_vdu
["OSM"] = get_osm_params(
1124 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1126 vdur
["additionalParams"] = deploy_params_vdu
1129 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1130 if target_vim
not in ns_flavor
["vim_info"]:
1131 ns_flavor
["vim_info"][target_vim
] = {}
1134 # in case alternative images are provided we must check if they should be applied
1135 # for the vim_type, modify the vim_type taking into account
1136 ns_image_id
= int(vdur
["ns-image-id"])
1137 if vdur
.get("alt-image-ids"):
1138 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1139 vim_type
= db_vim
["vim_type"]
1140 for alt_image_id
in vdur
.get("alt-image-ids"):
1141 ns_alt_image
= target
["image"][int(alt_image_id
)]
1142 if vim_type
== ns_alt_image
.get("vim-type"):
1143 # must use alternative image
1145 "use alternative image id: {}".format(alt_image_id
)
1147 ns_image_id
= alt_image_id
1148 vdur
["ns-image-id"] = ns_image_id
1150 ns_image
= target
["image"][int(ns_image_id
)]
1151 if target_vim
not in ns_image
["vim_info"]:
1152 ns_image
["vim_info"][target_vim
] = {}
1154 vdur
["vim_info"] = {target_vim
: {}}
1155 # instantiation parameters
1157 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1158 # vdud["id"]), None)
1159 vdur_list
.append(vdur
)
1160 target_vnf
["vdur"] = vdur_list
1161 target
["vnf"].append(target_vnf
)
1163 desc
= await self
.RO
.deploy(nsr_id
, target
)
1164 self
.logger
.debug("RO return > {}".format(desc
))
1165 action_id
= desc
["action_id"]
1166 await self
._wait
_ng
_ro
(
1167 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1172 "_admin.deployed.RO.operational-status": "running",
1173 "detailed-status": " ".join(stage
),
1175 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1176 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1177 self
._write
_op
_status
(nslcmop_id
, stage
)
1179 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1183 async def _wait_ng_ro(
1192 detailed_status_old
= None
1194 start_time
= start_time
or time()
1195 while time() <= start_time
+ timeout
:
1196 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1197 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1198 if desc_status
["status"] == "FAILED":
1199 raise NgRoException(desc_status
["details"])
1200 elif desc_status
["status"] == "BUILD":
1202 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1203 elif desc_status
["status"] == "DONE":
1205 stage
[2] = "Deployed at VIM"
1208 assert False, "ROclient.check_ns_status returns unknown {}".format(
1209 desc_status
["status"]
1211 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1212 detailed_status_old
= stage
[2]
1213 db_nsr_update
["detailed-status"] = " ".join(stage
)
1214 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1215 self
._write
_op
_status
(nslcmop_id
, stage
)
1216 await asyncio
.sleep(15, loop
=self
.loop
)
1217 else: # timeout_ns_deploy
1218 raise NgRoException("Timeout waiting ns to deploy")
1220 async def _terminate_ng_ro(
1221 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1226 start_deploy
= time()
1233 "action_id": nslcmop_id
,
1235 desc
= await self
.RO
.deploy(nsr_id
, target
)
1236 action_id
= desc
["action_id"]
1237 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1238 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1241 + "ns terminate action at RO. action_id={}".format(action_id
)
1245 delete_timeout
= 20 * 60 # 20 minutes
1246 await self
._wait
_ng
_ro
(
1247 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1250 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1251 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1253 await self
.RO
.delete(nsr_id
)
1254 except Exception as e
:
1255 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1256 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1257 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1258 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1260 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1262 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1263 failed_detail
.append("delete conflict: {}".format(e
))
1266 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1269 failed_detail
.append("delete error: {}".format(e
))
1272 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1276 stage
[2] = "Error deleting from VIM"
1278 stage
[2] = "Deleted from VIM"
1279 db_nsr_update
["detailed-status"] = " ".join(stage
)
1280 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1281 self
._write
_op
_status
(nslcmop_id
, stage
)
1284 raise LcmException("; ".join(failed_detail
))
1287 async def instantiate_RO(
1301 :param logging_text: preffix text to use at logging
1302 :param nsr_id: nsr identity
1303 :param nsd: database content of ns descriptor
1304 :param db_nsr: database content of ns record
1305 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1307 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1308 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1309 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1310 :return: None or exception
1313 start_deploy
= time()
1314 ns_params
= db_nslcmop
.get("operationParams")
1315 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1316 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1318 timeout_ns_deploy
= self
.timeout
.get(
1319 "ns_deploy", self
.timeout_ns_deploy
1322 # Check for and optionally request placement optimization. Database will be updated if placement activated
1323 stage
[2] = "Waiting for Placement."
1324 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1325 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1326 for vnfr
in db_vnfrs
.values():
1327 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1330 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1332 return await self
._instantiate
_ng
_ro
(
1345 except Exception as e
:
1346 stage
[2] = "ERROR deploying at VIM"
1347 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1349 "Error deploying at VIM {}".format(e
),
1350 exc_info
=not isinstance(
1353 ROclient
.ROClientException
,
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity
    :param vnfr_id: "_id" of the vnfrs record that holds the kdu
    :param kdu_name: "kdu-name" of the wanted entry of the vnfr "kdur" list
    :return: "ip-address" of the kdur once its status is READY or ENABLED
    :raises LcmException: kdur not found, kdur with any other non-empty status, or
        the polling budget is exhausted
    """
    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    # up to 360 polls of the database, sleeping 10 seconds between them
    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            # any other non-empty status is reported as an error
            raise LcmException(
                "target KDU={} is in error state".format(kdu_name)
            )

        # the "loop" argument is not passed: it was removed from asyncio.sleep in
        # Python 3.10 and the running loop is used by default
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1401 async def wait_vm_up_insert_key_ro(
1402 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1405 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1406 :param logging_text: prefix use for logging
1411 :param pub_key: public ssh key to inject, None to skip
1412 :param user: user to apply the public ssh key
1416 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1420 target_vdu_id
= None
1426 if ro_retries
>= 360: # 1 hour
1428 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1431 await asyncio
.sleep(10, loop
=self
.loop
)
1434 if not target_vdu_id
:
1435 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1437 if not vdu_id
: # for the VNF case
1438 if db_vnfr
.get("status") == "ERROR":
1440 "Cannot inject ssh-key because target VNF is in error state"
1442 ip_address
= db_vnfr
.get("ip-address")
1448 for x
in get_iterable(db_vnfr
, "vdur")
1449 if x
.get("ip-address") == ip_address
1457 for x
in get_iterable(db_vnfr
, "vdur")
1458 if x
.get("vdu-id-ref") == vdu_id
1459 and x
.get("count-index") == vdu_index
1465 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1466 ): # If only one, this should be the target vdu
1467 vdur
= db_vnfr
["vdur"][0]
1470 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1471 vnfr_id
, vdu_id
, vdu_index
1474 # New generation RO stores information at "vim_info"
1477 if vdur
.get("vim_info"):
1479 t
for t
in vdur
["vim_info"]
1480 ) # there should be only one key
1481 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1483 vdur
.get("pdu-type")
1484 or vdur
.get("status") == "ACTIVE"
1485 or ng_ro_status
== "ACTIVE"
1487 ip_address
= vdur
.get("ip-address")
1490 target_vdu_id
= vdur
["vdu-id-ref"]
1491 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1493 "Cannot inject ssh-key because target VM is in error state"
1496 if not target_vdu_id
:
1499 # inject public key into machine
1500 if pub_key
and user
:
1501 self
.logger
.debug(logging_text
+ "Inserting RO key")
1502 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1503 if vdur
.get("pdu-type"):
1504 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1507 ro_vm_id
= "{}-{}".format(
1508 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1509 ) # TODO add vdu_index
1513 "action": "inject_ssh_key",
1517 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1519 desc
= await self
.RO
.deploy(nsr_id
, target
)
1520 action_id
= desc
["action_id"]
1521 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1524 # wait until NS is deployed at RO
1526 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1527 ro_nsr_id
= deep_get(
1528 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1532 result_dict
= await self
.RO
.create_action(
1534 item_id_name
=ro_nsr_id
,
1536 "add_public_key": pub_key
,
1541 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1542 if not result_dict
or not isinstance(result_dict
, dict):
1544 "Unknown response from RO when injecting key"
1546 for result
in result_dict
.values():
1547 if result
.get("vim_result") == 200:
1550 raise ROclient
.ROClientException(
1551 "error injecting key: {}".format(
1552 result
.get("description")
1556 except NgRoException
as e
:
1558 "Reaching max tries injecting key. Error: {}".format(e
)
1560 except ROclient
.ROClientException
as e
:
1564 + "error injecting key: {}. Retrying until {} seconds".format(
1571 "Reaching max tries injecting key. Error: {}".format(e
)
1578 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1580 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1582 my_vca
= vca_deployed_list
[vca_index
]
1583 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1584 # vdu or kdu: no dependencies
1588 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1589 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1590 configuration_status_list
= db_nsr
["configurationStatus"]
1591 for index
, vca_deployed
in enumerate(configuration_status_list
):
1592 if index
== vca_index
:
1595 if not my_vca
.get("member-vnf-index") or (
1596 vca_deployed
.get("member-vnf-index")
1597 == my_vca
.get("member-vnf-index")
1599 internal_status
= configuration_status_list
[index
].get("status")
1600 if internal_status
== "READY":
1602 elif internal_status
== "BROKEN":
1604 "Configuration aborted because dependent charm/s has failed"
1609 # no dependencies, return
1611 await asyncio
.sleep(10)
1614 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA identifier to use for a vnf or, when there is no vnf record, for the ns
    :param db_vnfr: database content of the vnf record. When provided, its "vca-id" is returned
    :param db_nsr: database content of the ns record. Only used when db_vnfr is empty; the
        "vca" of the vim account at instantiate_params.vimAccountId is returned
    :return: the vca id, or None when neither record is provided
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if not db_nsr:
        return None
    # ns level: the VCA is the one associated with the vim account of the ns
    vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    return vim_account.get("vca")
1625 async def instantiate_N2VC(
1642 ee_config_descriptor
,
1644 nsr_id
= db_nsr
["_id"]
1645 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1646 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1647 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1648 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1650 "collection": "nsrs",
1651 "filter": {"_id": nsr_id
},
1652 "path": db_update_entry
,
1658 element_under_configuration
= nsr_id
1662 vnfr_id
= db_vnfr
["_id"]
1663 osm_config
["osm"]["vnf_id"] = vnfr_id
1665 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1667 if vca_type
== "native_charm":
1670 index_number
= vdu_index
or 0
1673 element_type
= "VNF"
1674 element_under_configuration
= vnfr_id
1675 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1677 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1678 element_type
= "VDU"
1679 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1680 osm_config
["osm"]["vdu_id"] = vdu_id
1682 namespace
+= ".{}".format(kdu_name
)
1683 element_type
= "KDU"
1684 element_under_configuration
= kdu_name
1685 osm_config
["osm"]["kdu_name"] = kdu_name
1688 artifact_path
= "{}/{}/{}/{}".format(
1689 base_folder
["folder"],
1690 base_folder
["pkg-dir"],
1692 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1697 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1699 # get initial_config_primitive_list that applies to this element
1700 initial_config_primitive_list
= config_descriptor
.get(
1701 "initial-config-primitive"
1705 "Initial config primitive list > {}".format(
1706 initial_config_primitive_list
1710 # add config if not present for NS charm
1711 ee_descriptor_id
= ee_config_descriptor
.get("id")
1712 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1713 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1714 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1718 "Initial config primitive list #2 > {}".format(
1719 initial_config_primitive_list
1722 # n2vc_redesign STEP 3.1
1723 # find old ee_id if exists
1724 ee_id
= vca_deployed
.get("ee_id")
1726 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1727 # create or register execution environment in VCA
1728 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1730 self
._write
_configuration
_status
(
1732 vca_index
=vca_index
,
1734 element_under_configuration
=element_under_configuration
,
1735 element_type
=element_type
,
1738 step
= "create execution environment"
1739 self
.logger
.debug(logging_text
+ step
)
1743 if vca_type
== "k8s_proxy_charm":
1744 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1745 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1746 namespace
=namespace
,
1747 artifact_path
=artifact_path
,
1751 elif vca_type
== "helm" or vca_type
== "helm-v3":
1752 ee_id
, credentials
= await self
.vca_map
[
1754 ].create_execution_environment(
1755 namespace
=namespace
,
1759 artifact_path
=artifact_path
,
1763 ee_id
, credentials
= await self
.vca_map
[
1765 ].create_execution_environment(
1766 namespace
=namespace
,
1772 elif vca_type
== "native_charm":
1773 step
= "Waiting to VM being up and getting IP address"
1774 self
.logger
.debug(logging_text
+ step
)
1775 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1784 credentials
= {"hostname": rw_mgmt_ip
}
1786 username
= deep_get(
1787 config_descriptor
, ("config-access", "ssh-access", "default-user")
1789 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1790 # merged. Meanwhile let's get username from initial-config-primitive
1791 if not username
and initial_config_primitive_list
:
1792 for config_primitive
in initial_config_primitive_list
:
1793 for param
in config_primitive
.get("parameter", ()):
1794 if param
["name"] == "ssh-username":
1795 username
= param
["value"]
1799 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1800 "'config-access.ssh-access.default-user'"
1802 credentials
["username"] = username
1803 # n2vc_redesign STEP 3.2
1805 self
._write
_configuration
_status
(
1807 vca_index
=vca_index
,
1808 status
="REGISTERING",
1809 element_under_configuration
=element_under_configuration
,
1810 element_type
=element_type
,
1813 step
= "register execution environment {}".format(credentials
)
1814 self
.logger
.debug(logging_text
+ step
)
1815 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1816 credentials
=credentials
,
1817 namespace
=namespace
,
1822 # for compatibility with MON/POL modules, the need model and application name at database
1823 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1824 ee_id_parts
= ee_id
.split(".")
1825 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1826 if len(ee_id_parts
) >= 2:
1827 model_name
= ee_id_parts
[0]
1828 application_name
= ee_id_parts
[1]
1829 db_nsr_update
[db_update_entry
+ "model"] = model_name
1830 db_nsr_update
[db_update_entry
+ "application"] = application_name
1832 # n2vc_redesign STEP 3.3
1833 step
= "Install configuration Software"
1835 self
._write
_configuration
_status
(
1837 vca_index
=vca_index
,
1838 status
="INSTALLING SW",
1839 element_under_configuration
=element_under_configuration
,
1840 element_type
=element_type
,
1841 other_update
=db_nsr_update
,
1844 # TODO check if already done
1845 self
.logger
.debug(logging_text
+ step
)
1847 if vca_type
== "native_charm":
1848 config_primitive
= next(
1849 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1852 if config_primitive
:
1853 config
= self
._map
_primitive
_params
(
1854 config_primitive
, {}, deploy_params
1857 if vca_type
== "lxc_proxy_charm":
1858 if element_type
== "NS":
1859 num_units
= db_nsr
.get("config-units") or 1
1860 elif element_type
== "VNF":
1861 num_units
= db_vnfr
.get("config-units") or 1
1862 elif element_type
== "VDU":
1863 for v
in db_vnfr
["vdur"]:
1864 if vdu_id
== v
["vdu-id-ref"]:
1865 num_units
= v
.get("config-units") or 1
1867 if vca_type
!= "k8s_proxy_charm":
1868 await self
.vca_map
[vca_type
].install_configuration_sw(
1870 artifact_path
=artifact_path
,
1873 num_units
=num_units
,
1878 # write in db flag of configuration_sw already installed
1880 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1883 # add relations for this VCA (wait for other peers related with this VCA)
1884 await self
._add
_vca
_relations
(
1885 logging_text
=logging_text
,
1887 vca_index
=vca_index
,
1892 # if SSH access is required, then get execution environment SSH public
1893 # if native charm we have waited already to VM be UP
1894 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1897 # self.logger.debug("get ssh key block")
1899 config_descriptor
, ("config-access", "ssh-access", "required")
1901 # self.logger.debug("ssh key needed")
1902 # Needed to inject a ssh key
1905 ("config-access", "ssh-access", "default-user"),
1907 step
= "Install configuration Software, getting public ssh key"
1908 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1909 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1912 step
= "Insert public key into VM user={} ssh_key={}".format(
1916 # self.logger.debug("no need to get ssh key")
1917 step
= "Waiting to VM being up and getting IP address"
1918 self
.logger
.debug(logging_text
+ step
)
1920 # n2vc_redesign STEP 5.1
1921 # wait for RO (ip-address) Insert pub_key into VM
1924 rw_mgmt_ip
= await self
.wait_kdu_up(
1925 logging_text
, nsr_id
, vnfr_id
, kdu_name
1928 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1938 rw_mgmt_ip
= None # This is for a NS configuration
1940 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1942 # store rw_mgmt_ip in deploy params for later replacement
1943 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1945 # n2vc_redesign STEP 6 Execute initial config primitive
1946 step
= "execute initial config primitive"
1948 # wait for dependent primitives execution (NS -> VNF -> VDU)
1949 if initial_config_primitive_list
:
1950 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1952 # stage, in function of element type: vdu, kdu, vnf or ns
1953 my_vca
= vca_deployed_list
[vca_index
]
1954 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1956 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
1957 elif my_vca
.get("member-vnf-index"):
1959 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
1962 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
1964 self
._write
_configuration
_status
(
1965 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
1968 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
1970 check_if_terminated_needed
= True
1971 for initial_config_primitive
in initial_config_primitive_list
:
1972 # adding information on the vca_deployed if it is a NS execution environment
1973 if not vca_deployed
["member-vnf-index"]:
1974 deploy_params
["ns_config_info"] = json
.dumps(
1975 self
._get
_ns
_config
_info
(nsr_id
)
1977 # TODO check if already done
1978 primitive_params_
= self
._map
_primitive
_params
(
1979 initial_config_primitive
, {}, deploy_params
1982 step
= "execute primitive '{}' params '{}'".format(
1983 initial_config_primitive
["name"], primitive_params_
1985 self
.logger
.debug(logging_text
+ step
)
1986 await self
.vca_map
[vca_type
].exec_primitive(
1988 primitive_name
=initial_config_primitive
["name"],
1989 params_dict
=primitive_params_
,
1994 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1995 if check_if_terminated_needed
:
1996 if config_descriptor
.get("terminate-config-primitive"):
1998 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2000 check_if_terminated_needed
= False
2002 # TODO register in database that primitive is done
2004 # STEP 7 Configure metrics
2005 if vca_type
== "helm" or vca_type
== "helm-v3":
2006 prometheus_jobs
= await self
.add_prometheus_metrics(
2008 artifact_path
=artifact_path
,
2009 ee_config_descriptor
=ee_config_descriptor
,
2012 target_ip
=rw_mgmt_ip
,
2018 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2021 step
= "instantiated at VCA"
2022 self
.logger
.debug(logging_text
+ step
)
2024 self
._write
_configuration
_status
(
2025 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2028 except Exception as e
: # TODO not use Exception but N2VC exception
2029 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2031 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2034 "Exception while {} : {}".format(step
, e
), exc_info
=True
2036 self
._write
_configuration
_status
(
2037 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2039 raise LcmException("{} {}".format(step
, e
)) from e
2041 def _write_ns_status(
2045 current_operation
: str,
2046 current_operation_id
: str,
2047 error_description
: str = None,
2048 error_detail
: str = None,
2049 other_update
: dict = None,
2052 Update db_nsr fields.
2055 :param current_operation:
2056 :param current_operation_id:
2057 :param error_description:
2058 :param error_detail:
2059 :param other_update: Other required changes at database if provided, will be cleared
2063 db_dict
= other_update
or {}
2066 ] = current_operation_id
# for backward compatibility
2067 db_dict
["_admin.current-operation"] = current_operation_id
2068 db_dict
["_admin.operation-type"] = (
2069 current_operation
if current_operation
!= "IDLE" else None
2071 db_dict
["currentOperation"] = current_operation
2072 db_dict
["currentOperationID"] = current_operation_id
2073 db_dict
["errorDescription"] = error_description
2074 db_dict
["errorDetail"] = error_detail
2077 db_dict
["nsState"] = ns_state
2078 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2079 except DbException
as e
:
2080 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2082 def _write_op_status(
2086 error_message
: str = None,
2087 queuePosition
: int = 0,
2088 operation_state
: str = None,
2089 other_update
: dict = None,
2092 db_dict
= other_update
or {}
2093 db_dict
["queuePosition"] = queuePosition
2094 if isinstance(stage
, list):
2095 db_dict
["stage"] = stage
[0]
2096 db_dict
["detailed-status"] = " ".join(stage
)
2097 elif stage
is not None:
2098 db_dict
["stage"] = str(stage
)
2100 if error_message
is not None:
2101 db_dict
["errorMessage"] = error_message
2102 if operation_state
is not None:
2103 db_dict
["operationState"] = operation_state
2104 db_dict
["statusEnteredTime"] = time()
2105 self
.update_db_2("nslcmops", op_id
, db_dict
)
2106 except DbException
as e
:
2108 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Write the same status at every non-empty entry of the ns "configurationStatus" list
    :param db_nsr: database content of ns record. "_id" and "configurationStatus" are read
    :param status: value to store at configurationStatus.<index>.status
    :return: None. A DbException is logged as a warning and not propagated
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get("configurationStatus")
        if config_status:
            # one dotted-path key per list position
            db_nsr_update = {
                "configurationStatus.{}.status".format(index): status
                for index, v in enumerate(config_status)
                if v
            }
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warning is used because Logger.warn is a deprecated alias
        self.logger.warning(
            "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
        )
2130 def _write_configuration_status(
2135 element_under_configuration
: str = None,
2136 element_type
: str = None,
2137 other_update
: dict = None,
2140 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2141 # .format(vca_index, status))
2144 db_path
= "configurationStatus.{}.".format(vca_index
)
2145 db_dict
= other_update
or {}
2147 db_dict
[db_path
+ "status"] = status
2148 if element_under_configuration
:
2150 db_path
+ "elementUnderConfiguration"
2151 ] = element_under_configuration
2153 db_dict
[db_path
+ "elementType"] = element_type
2154 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2155 except DbException
as e
:
2157 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2158 status
, nsr_id
, vca_index
, e
2162 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2164 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2165 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2166 Database is used because the result can be obtained from a different LCM worker in case of HA.
2167 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2168 :param db_nslcmop: database content of nslcmop
2169 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2170 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2171 computed 'vim-account-id'
2174 nslcmop_id
= db_nslcmop
["_id"]
2175 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2176 if placement_engine
== "PLA":
2178 logging_text
+ "Invoke and wait for placement optimization"
2180 await self
.msg
.aiowrite(
2181 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2183 db_poll_interval
= 5
2184 wait
= db_poll_interval
* 10
2186 while not pla_result
and wait
>= 0:
2187 await asyncio
.sleep(db_poll_interval
)
2188 wait
-= db_poll_interval
2189 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2190 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2194 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2197 for pla_vnf
in pla_result
["vnf"]:
2198 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2199 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2204 {"_id": vnfr
["_id"]},
2205 {"vim-account-id": pla_vnf
["vimAccountId"]},
2208 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at the "_admin.pla" field of its nslcmops record
    :param params: dictionary whose "placement" entry carries the result, including the
        "nslcmopId" of the operation to update
    :return: None. Any exception is logged as a warning and not propagated
    """
    # bound before the try so that the handler can always format its message,
    # even when reading the identifier is what failed
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warning is used because Logger.warn is a deprecated alias
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2220 async def instantiate(self
, nsr_id
, nslcmop_id
):
2223 :param nsr_id: ns instance to deploy
2224 :param nslcmop_id: operation to run
2228 # Try to lock HA task here
2229 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2230 if not task_is_locked_by_me
:
2232 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2236 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2237 self
.logger
.debug(logging_text
+ "Enter")
2239 # get all needed from database
2241 # database nsrs record
2244 # database nslcmops record
2247 # update operation on nsrs
2249 # update operation on nslcmops
2250 db_nslcmop_update
= {}
2252 nslcmop_operation_state
= None
2253 db_vnfrs
= {} # vnf's info indexed by member-index
2255 tasks_dict_info
= {} # from task to info text
2259 "Stage 1/5: preparation of the environment.",
2260 "Waiting for previous operations to terminate.",
2263 # ^ stage, step, VIM progress
2265 # wait for any previous tasks in process
2266 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2268 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2269 stage
[1] = "Reading from database."
2270 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2271 db_nsr_update
["detailed-status"] = "creating"
2272 db_nsr_update
["operational-status"] = "init"
2273 self
._write
_ns
_status
(
2275 ns_state
="BUILDING",
2276 current_operation
="INSTANTIATING",
2277 current_operation_id
=nslcmop_id
,
2278 other_update
=db_nsr_update
,
2280 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2282 # read from db: operation
2283 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2284 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2285 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2286 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2287 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2289 ns_params
= db_nslcmop
.get("operationParams")
2290 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2291 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2293 timeout_ns_deploy
= self
.timeout
.get(
2294 "ns_deploy", self
.timeout_ns_deploy
2298 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2299 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2300 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2301 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2302 self
.fs
.sync(db_nsr
["nsd-id"])
2304 # nsr_name = db_nsr["name"] # TODO short-name??
2306 # read from db: vnf's of this ns
2307 stage
[1] = "Getting vnfrs from db."
2308 self
.logger
.debug(logging_text
+ stage
[1])
2309 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2311 # read from db: vnfd's for every vnf
2312 db_vnfds
= [] # every vnfd data
2314 # for each vnf in ns, read vnfd
2315 for vnfr
in db_vnfrs_list
:
2316 if vnfr
.get("kdur"):
2318 for kdur
in vnfr
["kdur"]:
2319 if kdur
.get("additionalParams"):
2320 kdur
["additionalParams"] = json
.loads(
2321 kdur
["additionalParams"]
2323 kdur_list
.append(kdur
)
2324 vnfr
["kdur"] = kdur_list
2326 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2327 vnfd_id
= vnfr
["vnfd-id"]
2328 vnfd_ref
= vnfr
["vnfd-ref"]
2329 self
.fs
.sync(vnfd_id
)
2331 # if we haven't this vnfd, read it from db
2332 if vnfd_id
not in db_vnfds
:
2334 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2337 self
.logger
.debug(logging_text
+ stage
[1])
2338 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2341 db_vnfds
.append(vnfd
)
2343 # Get or generates the _admin.deployed.VCA list
2344 vca_deployed_list
= None
2345 if db_nsr
["_admin"].get("deployed"):
2346 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2347 if vca_deployed_list
is None:
2348 vca_deployed_list
= []
2349 configuration_status_list
= []
2350 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2351 db_nsr_update
["configurationStatus"] = configuration_status_list
2352 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2353 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2354 elif isinstance(vca_deployed_list
, dict):
2355 # maintain backward compatibility. Change a dict to list at database
2356 vca_deployed_list
= list(vca_deployed_list
.values())
2357 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2358 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2361 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2363 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2364 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2366 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2367 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2368 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2370 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2373 # n2vc_redesign STEP 2 Deploy Network Scenario
2374 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2375 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2377 stage
[1] = "Deploying KDUs."
2378 # self.logger.debug(logging_text + "Before deploy_kdus")
2379 # Call to deploy_kdus in case exists the "vdu:kdu" param
2380 await self
.deploy_kdus(
2381 logging_text
=logging_text
,
2383 nslcmop_id
=nslcmop_id
,
2386 task_instantiation_info
=tasks_dict_info
,
2389 stage
[1] = "Getting VCA public key."
2390 # n2vc_redesign STEP 1 Get VCA public ssh-key
2391 # feature 1429. Add n2vc public key to needed VMs
2392 n2vc_key
= self
.n2vc
.get_public_key()
2393 n2vc_key_list
= [n2vc_key
]
2394 if self
.vca_config
.get("public_key"):
2395 n2vc_key_list
.append(self
.vca_config
["public_key"])
2397 stage
[1] = "Deploying NS at VIM."
2398 task_ro
= asyncio
.ensure_future(
2399 self
.instantiate_RO(
2400 logging_text
=logging_text
,
2404 db_nslcmop
=db_nslcmop
,
2407 n2vc_key_list
=n2vc_key_list
,
2411 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2412 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2414 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2415 stage
[1] = "Deploying Execution Environments."
2416 self
.logger
.debug(logging_text
+ stage
[1])
2418 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2419 for vnf_profile
in get_vnf_profiles(nsd
):
2420 vnfd_id
= vnf_profile
["vnfd-id"]
2421 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2422 member_vnf_index
= str(vnf_profile
["id"])
2423 db_vnfr
= db_vnfrs
[member_vnf_index
]
2424 base_folder
= vnfd
["_admin"]["storage"]
2430 # Get additional parameters
2431 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2432 if db_vnfr
.get("additionalParamsForVnf"):
2433 deploy_params
.update(
2434 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2437 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2438 if descriptor_config
:
2440 logging_text
=logging_text
2441 + "member_vnf_index={} ".format(member_vnf_index
),
2444 nslcmop_id
=nslcmop_id
,
2450 member_vnf_index
=member_vnf_index
,
2451 vdu_index
=vdu_index
,
2453 deploy_params
=deploy_params
,
2454 descriptor_config
=descriptor_config
,
2455 base_folder
=base_folder
,
2456 task_instantiation_info
=tasks_dict_info
,
2460 # Deploy charms for each VDU that supports one.
2461 for vdud
in get_vdu_list(vnfd
):
2463 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2464 vdur
= find_in_list(
2465 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2468 if vdur
.get("additionalParams"):
2469 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2471 deploy_params_vdu
= deploy_params
2472 deploy_params_vdu
["OSM"] = get_osm_params(
2473 db_vnfr
, vdu_id
, vdu_count_index
=0
2475 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2477 self
.logger
.debug("VDUD > {}".format(vdud
))
2479 "Descriptor config > {}".format(descriptor_config
)
2481 if descriptor_config
:
2484 for vdu_index
in range(vdud_count
):
2485 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2487 logging_text
=logging_text
2488 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2489 member_vnf_index
, vdu_id
, vdu_index
2493 nslcmop_id
=nslcmop_id
,
2499 member_vnf_index
=member_vnf_index
,
2500 vdu_index
=vdu_index
,
2502 deploy_params
=deploy_params_vdu
,
2503 descriptor_config
=descriptor_config
,
2504 base_folder
=base_folder
,
2505 task_instantiation_info
=tasks_dict_info
,
2508 for kdud
in get_kdu_list(vnfd
):
2509 kdu_name
= kdud
["name"]
2510 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2511 if descriptor_config
:
2516 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2518 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2519 if kdur
.get("additionalParams"):
2520 deploy_params_kdu
.update(
2521 parse_yaml_strings(kdur
["additionalParams"].copy())
2525 logging_text
=logging_text
,
2528 nslcmop_id
=nslcmop_id
,
2534 member_vnf_index
=member_vnf_index
,
2535 vdu_index
=vdu_index
,
2537 deploy_params
=deploy_params_kdu
,
2538 descriptor_config
=descriptor_config
,
2539 base_folder
=base_folder
,
2540 task_instantiation_info
=tasks_dict_info
,
2544 # Check if this NS has a charm configuration
2545 descriptor_config
= nsd
.get("ns-configuration")
2546 if descriptor_config
and descriptor_config
.get("juju"):
2549 member_vnf_index
= None
2555 # Get additional parameters
2556 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2557 if db_nsr
.get("additionalParamsForNs"):
2558 deploy_params
.update(
2559 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2561 base_folder
= nsd
["_admin"]["storage"]
2563 logging_text
=logging_text
,
2566 nslcmop_id
=nslcmop_id
,
2572 member_vnf_index
=member_vnf_index
,
2573 vdu_index
=vdu_index
,
2575 deploy_params
=deploy_params
,
2576 descriptor_config
=descriptor_config
,
2577 base_folder
=base_folder
,
2578 task_instantiation_info
=tasks_dict_info
,
2582 # rest of staff will be done at finally
2585 ROclient
.ROClientException
,
2591 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2594 except asyncio
.CancelledError
:
2596 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2598 exc
= "Operation was cancelled"
2599 except Exception as e
:
2600 exc
= traceback
.format_exc()
2601 self
.logger
.critical(
2602 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2607 error_list
.append(str(exc
))
2609 # wait for pending tasks
2611 stage
[1] = "Waiting for instantiate pending tasks."
2612 self
.logger
.debug(logging_text
+ stage
[1])
2613 error_list
+= await self
._wait
_for
_tasks
(
2621 stage
[1] = stage
[2] = ""
2622 except asyncio
.CancelledError
:
2623 error_list
.append("Cancelled")
2624 # TODO cancel all tasks
2625 except Exception as exc
:
2626 error_list
.append(str(exc
))
2628 # update operation-status
2629 db_nsr_update
["operational-status"] = "running"
2630 # let's begin with VCA 'configured' status (later we can change it)
2631 db_nsr_update
["config-status"] = "configured"
2632 for task
, task_name
in tasks_dict_info
.items():
2633 if not task
.done() or task
.cancelled() or task
.exception():
2634 if task_name
.startswith(self
.task_name_deploy_vca
):
2635 # A N2VC task is pending
2636 db_nsr_update
["config-status"] = "failed"
2638 # RO or KDU task is pending
2639 db_nsr_update
["operational-status"] = "failed"
2641 # update status at database
2643 error_detail
= ". ".join(error_list
)
2644 self
.logger
.error(logging_text
+ error_detail
)
2645 error_description_nslcmop
= "{} Detail: {}".format(
2646 stage
[0], error_detail
2648 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2649 nslcmop_id
, stage
[0]
2652 db_nsr_update
["detailed-status"] = (
2653 error_description_nsr
+ " Detail: " + error_detail
2655 db_nslcmop_update
["detailed-status"] = error_detail
2656 nslcmop_operation_state
= "FAILED"
2660 error_description_nsr
= error_description_nslcmop
= None
2662 db_nsr_update
["detailed-status"] = "Done"
2663 db_nslcmop_update
["detailed-status"] = "Done"
2664 nslcmop_operation_state
= "COMPLETED"
2667 self
._write
_ns
_status
(
2670 current_operation
="IDLE",
2671 current_operation_id
=None,
2672 error_description
=error_description_nsr
,
2673 error_detail
=error_detail
,
2674 other_update
=db_nsr_update
,
2676 self
._write
_op
_status
(
2679 error_message
=error_description_nslcmop
,
2680 operation_state
=nslcmop_operation_state
,
2681 other_update
=db_nslcmop_update
,
2684 if nslcmop_operation_state
:
2686 await self
.msg
.aiowrite(
2691 "nslcmop_id": nslcmop_id
,
2692 "operationState": nslcmop_operation_state
,
2696 except Exception as e
:
2698 logging_text
+ "kafka_write notification Exception {}".format(e
)
2701 self
.logger
.debug(logging_text
+ "Exit")
2702 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
# Add the relations in which the VCA stored at _admin.deployed.VCA[vca_index]
# of the nsr takes part.
#   - NS-level relations are read from the nsd ("ns-configuration", "relation")
#     and selected when one of their two entities has the "member-vnf-index"
#     of this VCA as "id".
#   - VNF-level relations are read from the "relation" list of each vnfd
#     configuration and selected when one entity "id" equals this VCA "vdu_id".
# A selected relation is added with self.vca_map[vca_type].add_relation() once
# both peers are found among the deployed VCAs with "config_sw_installed" set
# and an "ee_id"; it is removed from the pending list when a peer reports
# configurationStatus "BROKEN".
# timeout: seconds after which "timeout adding relations" is logged.
# vca_type: key of self.vca_map; "lxc_proxy_charm" is used when it is empty.
# NOTE(review): the value returned on each exit path should be confirmed
# against the return statements of the method.
2704 async def _add_vca_relations(
2709 timeout
: int = 3600,
2710 vca_type
: str = None,
2715 # 1. find all relations for this VCA
2716 # 2. wait for other peers related
# Default vca_type is "lxc_proxy_charm".
2720 vca_type
= vca_type
or "lxc_proxy_charm"
2722 # STEP 1: find all relations for this VCA
2725 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2726 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
# Entry of the deployed VCA list that this call works for.
2729 my_vca
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))[vca_index
]
2731 # read all ns-configuration relations
2732 ns_relations
= list()
2733 db_ns_relations
= deep_get(nsd
, ("ns-configuration", "relation"))
2735 for r
in db_ns_relations
:
2736 # check if this VCA is in the relation
2737 if my_vca
.get("member-vnf-index") in (
2738 r
.get("entities")[0].get("id"),
2739 r
.get("entities")[1].get("id"),
2741 ns_relations
.append(r
)
2743 # read all vnf-configuration relations
2744 vnf_relations
= list()
2745 db_vnfd_list
= db_nsr
.get("vnfd-id")
2747 for vnfd
in db_vnfd_list
:
2748 db_vnf_relations
= None
2749 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2750 db_vnf_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
2751 if db_vnf_configuration
:
2752 db_vnf_relations
= db_vnf_configuration
.get("relation", [])
2753 if db_vnf_relations
:
2754 for r
in db_vnf_relations
:
2755 # check if this VCA is in the relation
2756 if my_vca
.get("vdu_id") in (
2757 r
.get("entities")[0].get("id"),
2758 r
.get("entities")[1].get("id"),
2760 vnf_relations
.append(r
)
2762 # if no relations, terminate
2763 if not ns_relations
and not vnf_relations
:
2764 self
.logger
.debug(logging_text
+ " No relations")
2769 + " adding relations\n {}\n {}".format(
2770 ns_relations
, vnf_relations
# Stop waiting once "timeout" seconds have passed since "start".
2779 if now
- start
>= timeout
:
2780 self
.logger
.error(logging_text
+ " : timeout adding relations")
2783 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2784 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2786 # for each defined NS relation, find the VCA's related
# Iterate over a copy because entries are removed from ns_relations inside the loop.
2787 for r
in ns_relations
.copy():
2788 from_vca_ee_id
= None
2790 from_vca_endpoint
= None
2791 to_vca_endpoint
= None
# Search the deployed VCAs for both ends of relation r; an end only
# counts when its "config_sw_installed" flag is set.
2792 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2793 for vca
in vca_list
:
2794 if vca
.get("member-vnf-index") == r
.get("entities")[0].get(
2796 ) and vca
.get("config_sw_installed"):
2797 from_vca_ee_id
= vca
.get("ee_id")
2798 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2799 if vca
.get("member-vnf-index") == r
.get("entities")[1].get(
2801 ) and vca
.get("config_sw_installed"):
2802 to_vca_ee_id
= vca
.get("ee_id")
2803 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
# Both execution environments are known: relate them.
2804 if from_vca_ee_id
and to_vca_ee_id
:
2806 await self
.vca_map
[vca_type
].add_relation(
2807 ee_id_1
=from_vca_ee_id
,
2808 ee_id_2
=to_vca_ee_id
,
2809 endpoint_1
=from_vca_endpoint
,
2810 endpoint_2
=to_vca_endpoint
,
2813 # remove entry from relations list
2814 ns_relations
.remove(r
)
2816 # check failed peers
# configurationStatus is read with the same index i that walks vca_list.
2818 vca_status_list
= db_nsr
.get("configurationStatus")
2820 for i
in range(len(vca_list
)):
2822 vca_status
= vca_status_list
[i
]
2823 if vca
.get("member-vnf-index") == r
.get("entities")[
2826 if vca_status
.get("status") == "BROKEN":
2827 # peer broken: remove relation from list
2828 ns_relations
.remove(r
)
2829 if vca
.get("member-vnf-index") == r
.get("entities")[
2832 if vca_status
.get("status") == "BROKEN":
2833 # peer broken: remove relation from list
2834 ns_relations
.remove(r
)
2839 # for each defined VNF relation, find the VCA's related
2840 for r
in vnf_relations
.copy():
2841 from_vca_ee_id
= None
2843 from_vca_endpoint
= None
2844 to_vca_endpoint
= None
2845 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2846 for vca
in vca_list
:
# A VCA without "vdu_id" is compared through its "vnfd_id" instead.
2847 key_to_check
= "vdu_id"
2848 if vca
.get("vdu_id") is None:
2849 key_to_check
= "vnfd_id"
2850 if vca
.get(key_to_check
) == r
.get("entities")[0].get(
2852 ) and vca
.get("config_sw_installed"):
2853 from_vca_ee_id
= vca
.get("ee_id")
2854 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2855 if vca
.get(key_to_check
) == r
.get("entities")[1].get(
2857 ) and vca
.get("config_sw_installed"):
2858 to_vca_ee_id
= vca
.get("ee_id")
2859 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2860 if from_vca_ee_id
and to_vca_ee_id
:
2862 await self
.vca_map
[vca_type
].add_relation(
2863 ee_id_1
=from_vca_ee_id
,
2864 ee_id_2
=to_vca_ee_id
,
2865 endpoint_1
=from_vca_endpoint
,
2866 endpoint_2
=to_vca_endpoint
,
2869 # remove entry from relations list
2870 vnf_relations
.remove(r
)
2872 # check failed peers
# NOTE(review): this check always compares "vdu_id", while the search
# above may have used "vnfd_id" (key_to_check) - confirm this is intended.
2874 vca_status_list
= db_nsr
.get("configurationStatus")
2876 for i
in range(len(vca_list
)):
2878 vca_status
= vca_status_list
[i
]
2879 if vca
.get("vdu_id") == r
.get("entities")[0].get(
2882 if vca_status
.get("status") == "BROKEN":
2883 # peer broken: remove relation from list
2884 vnf_relations
.remove(r
)
2885 if vca
.get("vdu_id") == r
.get("entities")[1].get(
2888 if vca_status
.get("status") == "BROKEN":
2889 # peer broken: remove relation from list
2890 vnf_relations
.remove(r
)
# Pause for 5 seconds without blocking the event loop.
2896 await asyncio
.sleep(5.0)
# Nothing left pending in either list: all relations were handled.
2898 if not ns_relations
and not vnf_relations
:
2899 self
.logger
.debug("Relations added")
2904 except Exception as e
:
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning.
2905 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
# Install one KDU on its K8s cluster and record the outcome in the database.
# Steps visible in this method:
#   1. take the kdu-instance name from k8s_instance_info["kdu-deployment-name"]
#      or let the connector generate one, and store it in the nsr under
#      "<nsr_db_path>.kdu-instance";
#   2. call install() on the connector selected by
#      k8s_instance_info["k8scluster-type"];
#   3. query the deployed services and look for the management service to get
#      the ip-address of the kdur (and of the vnfr when applicable);
#   4. set "kdur.<kdu_index>.status" to "READY" in the vnfr;
#   5. run the initial-config-primitive list of the KDU configuration when
#      get_juju_ee_ref() returns None for this KDU.
# On any exception the error text is written to "<nsr_db_path>.detailed-status"
# of the nsr and the kdur status is set to "ERROR"; according to the trailing
# comments the original error is then re-raised.
# k8s_instance_info: dict read here for the keys "k8scluster-type",
#   "k8scluster-uuid", "kdu-model", "kdu-name", "namespace" and
#   "kdu-deployment-name".
# k8params: optional dict (defaults to None).
2908 async def _install_kdu(
2916 k8s_instance_info
: dict,
2917 k8params
: dict = None,
2923 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2926 "collection": "nsrs",
2927 "filter": {"_id": nsr_id
},
2928 "path": nsr_db_path
,
# Prefer the deployment name given by the caller; otherwise ask the
# connector to generate an instance name.
2931 if k8s_instance_info
.get("kdu-deployment-name"):
2932 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
2934 kdu_instance
= self
.k8scluster_map
[
2936 ].generate_kdu_instance_name(
2937 db_dict
=db_dict_install
,
2938 kdu_model
=k8s_instance_info
["kdu-model"],
2939 kdu_name
=k8s_instance_info
["kdu-name"],
2942 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
2944 await self
.k8scluster_map
[k8sclustertype
].install(
2945 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2946 kdu_model
=k8s_instance_info
["kdu-model"],
2949 db_dict
=db_dict_install
,
2951 kdu_name
=k8s_instance_info
["kdu-name"],
2952 namespace
=k8s_instance_info
["namespace"],
2953 kdu_instance
=kdu_instance
,
2957 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
2960 # Obtain services to obtain management service ip
2961 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2962 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2963 kdu_instance
=kdu_instance
,
2964 namespace
=k8s_instance_info
["namespace"],
2967 # Obtain management service info (if exists)
2968 vnfr_update_dict
= {}
2969 kdu_config
= get_configuration(vnfd
, kdud
["name"])
2971 target_ee_list
= kdu_config
.get("execution-environment-list", [])
2976 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2979 for service
in kdud
.get("service", [])
2980 if service
.get("mgmt-service")
2982 for mgmt_service
in mgmt_services
:
2983 for service
in services
:
# A deployed service matches when its name starts with the descriptor name.
2984 if service
["name"].startswith(mgmt_service
["name"]):
2985 # Mgmt service found, Obtain service ip
# Use "external_ip" if the key is present, otherwise fall back to "cluster_ip".
2986 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2987 if isinstance(ip
, list) and len(ip
) == 1:
2991 "kdur.{}.ip-address".format(kdu_index
)
2994 # Check if must update also mgmt ip at the vnf
2995 service_external_cp
= mgmt_service
.get(
2996 "external-connection-point-ref"
2998 if service_external_cp
:
3000 deep_get(vnfd
, ("mgmt-interface", "cp"))
3001 == service_external_cp
3003 vnfr_update_dict
["ip-address"] = ip
3008 "external-connection-point-ref", ""
3010 == service_external_cp
,
3013 "kdur.{}.ip-address".format(kdu_index
)
3018 "Mgmt service name: {} not found".format(
3019 mgmt_service
["name"]
# Flag the kdur as READY and write all collected updates to the vnfr.
3023 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3024 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3026 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3029 and kdu_config
.get("initial-config-primitive")
3030 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3032 initial_config_primitive_list
= kdu_config
.get(
3033 "initial-config-primitive"
# Sort in place so that primitives run in ascending numeric "seq" order.
3035 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3037 for initial_config_primitive
in initial_config_primitive_list
:
3038 primitive_params_
= self
._map
_primitive
_params
(
3039 initial_config_primitive
, {}, {}
3042 await asyncio
.wait_for(
3043 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3044 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3045 kdu_instance
=kdu_instance
,
3046 primitive_name
=initial_config_primitive
["name"],
3047 params
=primitive_params_
,
3048 db_dict
=db_dict_install
,
3054 except Exception as e
:
3055 # Prepare update db with error and raise exception
# The failure is recorded both in the nsr (detailed-status) and in the vnfr (kdur status).
3058 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3062 vnfr_data
.get("_id"),
3063 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3066 # ignore to keep original exception
3068 # reraise original error
# Launch the deployment of every KDU found in the "kdur" list of each vnfr.
# For each kdur this method:
#   - derives the model and the cluster type: "helm-chart-v3" by default for a
#     helm chart, "helm-chart" when "helm-version" is "v2", or "juju-bundle";
#   - replaces the model by a path under self.fs.path when the vnfd package
#     storage holds a file or directory with that name;
#   - resolves the connector id of the target cluster with _get_cluster_id();
#   - synchronizes the helm repos of a cluster the first time it is seen;
#   - stores the K8s instance info in the nsr at "_admin.deployed.K8s.<index>";
#   - creates an asyncio task and records it in task_instantiation_info with
#     the text "Deploying KDU ...".
# LcmException and asyncio.CancelledError have their own handler; any other
# exception is logged and converted into LcmException.
3073 async def deploy_kdus(
3080 task_instantiation_info
,
3082 # Launch kdus if present in the descriptor
# Per cluster type, remembers the id already resolved for each cluster id
# so that _get_cluster_id() does the lookup only once.
3084 k8scluster_id_2_uuic
= {
3085 "helm-chart-v3": {},
# Return the connector id stored at _admin.<cluster_type>.id of the
# k8scluster record, using the cache above when possible.
3090 async def _get_cluster_id(cluster_id
, cluster_type
):
3091 nonlocal k8scluster_id_2_uuic
3092 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3093 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3095 # check if K8scluster is creating and wait look if previous tasks in process
3096 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3097 "k8scluster", cluster_id
3100 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3101 task_name
, cluster_id
3103 self
.logger
.debug(logging_text
+ text
)
# Wait at most 3600 seconds for the related tasks.
3104 await asyncio
.wait(task_dependency
, timeout
=3600)
3106 db_k8scluster
= self
.db
.get_one(
3107 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3109 if not db_k8scluster
:
3110 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3112 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3114 if cluster_type
== "helm-chart-v3":
3116 # backward compatibility for existing clusters that have not been initialized for helm v3
3117 k8s_credentials
= yaml
.safe_dump(
3118 db_k8scluster
.get("credentials")
3120 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3121 k8s_credentials
, reuse_cluster_uuid
=cluster_id
# Persist the result of the helm v3 initialization in the k8scluster record.
3123 db_k8scluster_update
= {}
3124 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3125 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3126 db_k8scluster_update
[
3127 "_admin.helm-chart-v3.created"
3129 db_k8scluster_update
[
3130 "_admin.helm-chart-v3.operationalState"
3133 "k8sclusters", cluster_id
, db_k8scluster_update
3135 except Exception as e
:
3138 + "error initializing helm-v3 cluster: {}".format(str(e
))
3141 "K8s cluster '{}' has not been initialized for '{}'".format(
3142 cluster_id
, cluster_type
3147 "K8s cluster '{}' has not been initialized for '{}'".format(
3148 cluster_id
, cluster_type
# Remember the resolved id for later calls.
3151 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3154 logging_text
+= "Deploy kdus: "
# Start from an empty _admin.deployed.K8s list in the nsr.
3157 db_nsr_update
= {"_admin.deployed.K8s": []}
3158 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
# Cluster ids whose helm repos were already synchronized (v2 and v3 kept apart).
3161 updated_cluster_list
= []
3162 updated_v3_cluster_list
= []
3164 for vnfr_data
in db_vnfrs
.values():
3165 vca_id
= self
.get_vca_id(vnfr_data
, {})
3166 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3167 # Step 0: Prepare and set parameters
3168 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3169 vnfd_id
= vnfr_data
.get("vnfd-id")
# NOTE(review): the same vnfd lookup is repeated twice more further down
# in this loop body with an identical predicate.
3170 vnfd_with_id
= find_in_list(
3171 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3175 for kdud
in vnfd_with_id
["kdu"]
3176 if kdud
["name"] == kdur
["kdu-name"]
3178 namespace
= kdur
.get("k8s-namespace")
3179 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3180 if kdur
.get("helm-chart"):
3181 kdumodel
= kdur
["helm-chart"]
3182 # Default version: helm3, if helm-version is v2 assign v2
3183 k8sclustertype
= "helm-chart-v3"
3184 self
.logger
.debug("kdur: {}".format(kdur
))
3186 kdur
.get("helm-version")
3187 and kdur
.get("helm-version") == "v2"
3189 k8sclustertype
= "helm-chart"
3190 elif kdur
.get("juju-bundle"):
3191 kdumodel
= kdur
["juju-bundle"]
3192 k8sclustertype
= "juju-bundle"
3195 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3196 "juju-bundle. Maybe an old NBI version is running".format(
3197 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3200 # check if kdumodel is a file and exists
3202 vnfd_with_id
= find_in_list(
3203 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3205 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3206 if storage
and storage
.get(
3208 ): # may be not present if vnfd has not artifacts
3209 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3210 filename
= "{}/{}/{}s/{}".format(
# The model becomes a path under self.fs.path when it exists as file or directory.
3216 if self
.fs
.file_exists(
3217 filename
, mode
="file"
3218 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3219 kdumodel
= self
.fs
.path
+ filename
3220 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3222 except Exception: # it is not a file
3225 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3226 step
= "Synchronize repos for k8s cluster '{}'".format(
3229 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3233 k8sclustertype
== "helm-chart"
3234 and cluster_uuid
not in updated_cluster_list
3236 k8sclustertype
== "helm-chart-v3"
3237 and cluster_uuid
not in updated_v3_cluster_list
# Helm clusters not yet present in the updated_* lists get their repos synchronized.
3239 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3240 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3241 cluster_uuid
=cluster_uuid
3244 if del_repo_list
or added_repo_dict
:
3245 if k8sclustertype
== "helm-chart":
3247 "_admin.helm_charts_added." + item
: None
3248 for item
in del_repo_list
3251 "_admin.helm_charts_added." + item
: name
3252 for item
, name
in added_repo_dict
.items()
3254 updated_cluster_list
.append(cluster_uuid
)
3255 elif k8sclustertype
== "helm-chart-v3":
3257 "_admin.helm_charts_v3_added." + item
: None
3258 for item
in del_repo_list
3261 "_admin.helm_charts_v3_added." + item
: name
3262 for item
, name
in added_repo_dict
.items()
3264 updated_v3_cluster_list
.append(cluster_uuid
)
3266 logging_text
+ "repos synchronized on k8s cluster "
3267 "'{}' to_delete: {}, to_add: {}".format(
3268 k8s_cluster_id
, del_repo_list
, added_repo_dict
3273 {"_id": k8s_cluster_id
},
3279 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3280 vnfr_data
["member-vnf-index-ref"],
# Record stored in the nsr for this KDU; "kdu-instance" starts as None.
3284 k8s_instance_info
= {
3285 "kdu-instance": None,
3286 "k8scluster-uuid": cluster_uuid
,
3287 "k8scluster-type": k8sclustertype
,
3288 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3289 "kdu-name": kdur
["kdu-name"],
3290 "kdu-model": kdumodel
,
3291 "namespace": namespace
,
3292 "kdu-deployment-name": kdu_deployment_name
,
3294 db_path
= "_admin.deployed.K8s.{}".format(index
)
3295 db_nsr_update
[db_path
] = k8s_instance_info
3296 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3297 vnfd_with_id
= find_in_list(
3298 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3300 task
= asyncio
.ensure_future(
3309 k8params
=desc_params
,
3314 self
.lcm_tasks
.register(
3318 "instantiate_KDU-{}".format(index
),
3321 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3327 except (LcmException
, asyncio
.CancelledError
):
3329 except Exception as e
:
3330 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
# N2VC and database errors are logged as errors; anything else is
# logged as critical together with the traceback.
3331 if isinstance(e
, (N2VCException
, DbException
)):
3332 self
.logger
.error(logging_text
+ msg
)
3334 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3335 raise LcmException(msg
)
3338 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3357 task_instantiation_info
,
3360 # launch instantiate_N2VC in a asyncio task and register task object
3361 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3362 # if not found, create one entry and update database
3363 # fill db_nsr._admin.deployed.VCA.<index>
3366 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3368 if "execution-environment-list" in descriptor_config
:
3369 ee_list
= descriptor_config
.get("execution-environment-list", [])
3370 elif "juju" in descriptor_config
:
3371 ee_list
= [descriptor_config
] # ns charms
3372 else: # other types as script are not supported
3375 for ee_item
in ee_list
:
3378 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3379 ee_item
.get("juju"), ee_item
.get("helm-chart")
3382 ee_descriptor_id
= ee_item
.get("id")
3383 if ee_item
.get("juju"):
3384 vca_name
= ee_item
["juju"].get("charm")
3387 if ee_item
["juju"].get("charm") is not None
3390 if ee_item
["juju"].get("cloud") == "k8s":
3391 vca_type
= "k8s_proxy_charm"
3392 elif ee_item
["juju"].get("proxy") is False:
3393 vca_type
= "native_charm"
3394 elif ee_item
.get("helm-chart"):
3395 vca_name
= ee_item
["helm-chart"]
3396 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3399 vca_type
= "helm-v3"
3402 logging_text
+ "skipping non juju neither charm configuration"
3407 for vca_index
, vca_deployed
in enumerate(
3408 db_nsr
["_admin"]["deployed"]["VCA"]
3410 if not vca_deployed
:
3413 vca_deployed
.get("member-vnf-index") == member_vnf_index
3414 and vca_deployed
.get("vdu_id") == vdu_id
3415 and vca_deployed
.get("kdu_name") == kdu_name
3416 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3417 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3421 # not found, create one.
3423 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3426 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3428 target
+= "/kdu/{}".format(kdu_name
)
3430 "target_element": target
,
3431 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3432 "member-vnf-index": member_vnf_index
,
3434 "kdu_name": kdu_name
,
3435 "vdu_count_index": vdu_index
,
3436 "operational-status": "init", # TODO revise
3437 "detailed-status": "", # TODO revise
3438 "step": "initial-deploy", # TODO revise
3440 "vdu_name": vdu_name
,
3442 "ee_descriptor_id": ee_descriptor_id
,
3446 # create VCA and configurationStatus in db
3448 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3449 "configurationStatus.{}".format(vca_index
): dict(),
3451 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3453 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3455 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3456 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3457 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3460 task_n2vc
= asyncio
.ensure_future(
3461 self
.instantiate_N2VC(
3462 logging_text
=logging_text
,
3463 vca_index
=vca_index
,
3469 vdu_index
=vdu_index
,
3470 deploy_params
=deploy_params
,
3471 config_descriptor
=descriptor_config
,
3472 base_folder
=base_folder
,
3473 nslcmop_id
=nslcmop_id
,
3477 ee_config_descriptor
=ee_item
,
3480 self
.lcm_tasks
.register(
3484 "instantiate_N2VC-{}".format(vca_index
),
3487 task_instantiation_info
[
3489 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3490 member_vnf_index
or "", vdu_id
or ""
3494 def _create_nslcmop(nsr_id
, operation
, params
):
3496 Creates a ns-lcm-opp content to be stored at database.
3497 :param nsr_id: internal id of the instance
3498 :param operation: instantiate, terminate, scale, action, ...
3499 :param params: user parameters for the operation
3500 :return: dictionary following SOL005 format
3502 # Raise exception if invalid arguments
3503 if not (nsr_id
and operation
and params
):
3505 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3512 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3513 "operationState": "PROCESSING",
3514 "statusEnteredTime": now
,
3515 "nsInstanceId": nsr_id
,
3516 "lcmOperationType": operation
,
3518 "isAutomaticInvocation": False,
3519 "operationParams": params
,
3520 "isCancelPending": False,
3522 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3523 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3528 def _format_additional_params(self
, params
):
3529 params
= params
or {}
3530 for key
, value
in params
.items():
3531 if str(value
).startswith("!!yaml "):
3532 params
[key
] = yaml
.safe_load(value
[7:])
3535 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3536 primitive
= seq
.get("name")
3537 primitive_params
= {}
3539 "member_vnf_index": vnf_index
,
3540 "primitive": primitive
,
3541 "primitive_params": primitive_params
,
3544 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide what to do with an already registered sub-operation.

    :param db_nslcmop: nslcmop record holding the "_admin.operations" list
    :param op_index: position of the sub-operation inside that list
    :return: self.SUBOPERATION_STATUS_SKIP when the sub-operation is COMPLETED,
        otherwise op_index after marking the sub-operation as PROCESSING
    """
    suboperation = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]

    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    if suboperation.get("operationState") == "COMPLETED":
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(
        db_nslcmop, op_index, "PROCESSING", "In progress"
    )
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3568 # Find a sub-operation where all keys in a matching dictionary must match
3569 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3570 def _find_suboperation(self
, db_nslcmop
, match
):
3571 if db_nslcmop
and match
:
3572 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3573 for i
, op
in enumerate(op_list
):
3574 if all(op
.get(k
) == match
[k
] for k
in match
):
3576 return self
.SUBOPERATION_STATUS_NOT_FOUND
3578 # Update status for a sub-operation given its index
3579 def _update_suboperation_status(
3580 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3582 # Update DB for HA tasks
3583 q_filter
= {"_id": db_nslcmop
["_id"]}
3585 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3586 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3589 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3592 # Add sub-operation, return the index of the added sub-operation
3593 # Optionally, set operationState, detailed-status, and operationType
3594 # Status and type are currently set for 'scale' sub-operations:
3595 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3596 # 'detailed-status' : status message
3597 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3598 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vdu_id,
    vdu_count_index,
    vdu_name,
    primitive,
    mapped_primitive_params,
    operationState=None,
    detailed_status=None,
    operationType=None,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Append a sub-operation to _admin.operations of an nslcmop record and persist the list
    :param db_nslcmop: nslcmop database record (dict). It is modified in place
    :param vnf_index: stored as "member_vnf_index"
    :param vdu_id: stored as "vdu_id"
    :param vdu_count_index: stored as "vdu_count_index"
    :param vdu_name: accepted for interface compatibility; it is not stored
    :param primitive: stored as "primitive"
    :param mapped_primitive_params: stored as "primitive_params"
    :param operationState: optional, stored as "operationState" when it has a value
    :param detailed_status: optional, stored as "detailed-status" when it has a value
    :param operationType: optional, stored as "lcmOperationType" when it has a value
    :param RO_nsr_id: optional, stored as "RO_nsr_id" when it has a value
    :param RO_scaling_info: optional, stored as "RO_scaling_info" when it has a value
    :return: index of the added sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if there is no record
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND
    # Get the "_admin" section, attaching a new one to the record when it is missing or null.
    # A detached default dict would leave the in-memory record without the new sub-operation.
    db_nslcmop_admin = db_nslcmop.get("_admin")
    if db_nslcmop_admin is None:
        db_nslcmop_admin = db_nslcmop["_admin"] = {}
    # Create or append to the "_admin.operations" list
    new_op = {
        "member_vnf_index": vnf_index,
        "vdu_id": vdu_id,
        "vdu_count_index": vdu_count_index,
        "primitive": primitive,
        "primitive_params": mapped_primitive_params,
    }
    # Optional fields are only stored when they carry a value
    optional_fields = (
        ("operationState", operationState),
        ("detailed-status", detailed_status),
        ("lcmOperationType", operationType),
        ("RO_nsr_id", RO_nsr_id),
        ("RO_scaling_info", RO_scaling_info),
    )
    for key, value in optional_fields:
        if value:
            new_op[key] = value
    op_list = db_nslcmop_admin.get("operations")
    if not op_list:
        # No existing operations, create key 'operations' with current operation as first list element
        op_list = db_nslcmop_admin["operations"] = [new_op]
    else:
        # Existing operations, append operation to list
        op_list.append(new_op)

    db_nslcmop_update = {"_admin.operations": op_list}
    self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
    return len(op_list) - 1
3650 # Helper methods for scale() sub-operations
3652 # pre-scale/post-scale:
3653 # Check for 3 different cases:
3654 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3655 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3656 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vnf_config_primitive,
    primitive_params,
    operationType,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """
    Look for a scale sub-operation and register it when it is not present yet
    :param db_nslcmop: nslcmop database record
    :param vnf_index: member vnf index the sub-operation belongs to
    :param vnf_config_primitive: primitive name (ignored for RO scaling)
    :param primitive_params: primitive parameters (ignored for RO scaling)
    :param operationType: type of sub-operation; forced to "SCALE-RO" when both RO_nsr_id and
        RO_scaling_info are provided
    :param RO_nsr_id: RO ns identifier (RO scaling only)
    :param RO_scaling_info: RO scaling information (RO scaling only)
    :return: SUBOPERATION_STATUS_NEW when the sub-operation has just been added; otherwise the
        result of _retry_or_skip_suboperation() for the sub-operation that was found
    """
    # Find this sub-operation
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)
    if is_ro_scaling:
        operationType = "SCALE-RO"
        match = {
            "member_vnf_index": vnf_index,
            "RO_nsr_id": RO_nsr_id,
            "RO_scaling_info": RO_scaling_info,
        }
    else:
        match = {
            "member_vnf_index": vnf_index,
            "primitive": vnf_config_primitive,
            "primitive_params": primitive_params,
            "lcmOperationType": operationType,
        }
    op_index = self._find_suboperation(db_nslcmop, match)
    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        vnf_config_primitive = None
        primitive_params = None
    else:
        RO_nsr_id = None
        RO_scaling_info = None
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        None,  # vdu_id: set to None for all kind of scaling
        None,  # vdu_count_index: set to None for all kind of scaling
        None,  # vdu_name: set to None for all kind of scaling
        vnf_config_primitive,
        primitive_params,
        "PROCESSING",  # initial operationState
        "In progress",  # initial detailed-status
        operationType,
        RO_nsr_id,
        RO_scaling_info,
    )
    return self.SUBOPERATION_STATUS_NEW
3721 # Function to return execution_environment id
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Return the execution environment id deployed for a given vnf index and vdu
    :param vnf_index: value compared against the "member-vnf-index" of each VCA record
    :param vdu_id: value compared against the "vdu_id" of each VCA record
    :param vca_deployed_list: list of deployed VCA records; null entries are skipped
    :return: "ee_id" of the first matching record, or None when nothing matches
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list or ():
        # the deployed VCA list may contain null placeholders and records without every key
        if not vca:
            continue
        if vca.get("member-vnf-index") == vnf_index and vca.get("vdu_id") == vdu_id:
            return vca.get("ee_id")
    return None
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and, if destroy_ee is True, destroy the execution environment
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop database record; sub-operations are added to it and its "nsInstanceId" is
        used to update the nsr
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: forwarded to the primitive execution and to the execution environment deletion
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # records without "type" are handled as proxy charms
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # remove the prometheus jobs registered for this VCA, when prometheus is configured
    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete the whole N2VC namespace of a ns, marking the configuration status before and after
    :param db_nsr: nsr database record; its "_id" prefixed with "." is the namespace to delete
    :param vca_id: forwarded to n2vc.delete_namespace()
    :return: None. A namespace that is already deleted (N2VCNotFound) is not an error
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    ns_namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=ns_namespace,
            total_timeout=self.timeout_charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr whose database record is updated
    :param nslcmop_id: id of the nslcmop whose status is updated
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException with all the collected failures if something cannot be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # explicit exception instead of "assert False": asserts are removed with
                    # "python -O", which would leave this loop polling an unknown status
                    raise ROclient.ROClientException(
                        "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    )
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # asyncio.sleep() no longer accepts the "loop" argument (removed in Python 3.10)
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[
                    2
                ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance: run the terminate primitives, delete the execution environments and the
    KDUs, remove the deployment from the VIM and write the final status at database
    :param nsr_id: id of the nsr to terminate
    :param nslcmop_id: id of the nslcmop that describes this operation
    :return: None. Errors are not raised; they are written at the nsr and nslcmop records
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = [
        "Stage 1/3: Preparing task.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {
            db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
        }
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[
                vnfr["member-vnf-index-ref"]
            ] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        #         or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            # check the entry before reading it: the list may contain null placeholders, and
            # calling vca.get() on them raised AttributeError
            if not vca or not vca.get("ee_id"):
                continue
            config_descriptor = None
            vca_member_vnf_index = vca.get("member-vnf-index")
            vca_id = self.get_vca_id(
                db_vnfrs_dict.get(vca_member_vnf_index)
                if vca_member_vnf_index
                else None,
                db_nsr,
            )
            if not vca_member_vnf_index:
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = not operation_params.get(
                "skip_terminate_primitives"
            ) and vca.get("needed_terminate")
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(
                logging_text
                + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
            )
            error_list = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                min(self.timeout_charm_delete, timeout_ns_terminate),
                stage,
                nslcmop_id,
            )
            tasks_dict_info.clear()
            if error_list:
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            vca_id = self.get_vca_id({}, db_nsr)
            task_delete_ee = asyncio.ensure_future(
                asyncio.wait_for(
                    self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                    timeout=self.timeout_charm_delete,
                )
            )
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                    )
                )
            else:
                self.logger.error(
                    logging_text
                    + "Unknown k8s deployment type {}".format(
                        kdu.get("k8scluster-type")
                    )
                )
                continue
            tasks_dict_info[
                task_delete_kdu_instance
            ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        # this block also runs for the early "return" statements above
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_terminate,
                    stage,
                    nslcmop_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as wait_exc:
            error_list.append(str(wait_exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )
            except DbException as e:
                # Logger.warn() is a deprecated alias of Logger.warning()
                self.logger.warning(
                    logging_text
                    + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                        nsr_id, e
                    )
                )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "terminated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "autoremove": autoremove,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """
    Wait for a set of tasks, reporting the progress at stage[1] and writing it at the nslcmop
    :param logging_text: prefix added to every log message
    :param created_tasks_info: dictionary with the tasks as keys and a text describing each one as value
    :param timeout: maximum number of seconds to wait, counted from the moment this method is called
    :param stage: list of strings; index 1 is overwritten with "<done>/<total>." plus the errors found
    :param nslcmop_id: id of the nslcmop whose status is updated
    :param nsr_id: optional; if provided the errors are also written at this nsr
    :return: list with one detailed error text per failed, cancelled or timed-out task; empty if all is ok
    """
    time_start = time()
    error_detail_list = []
    error_list = []
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)
    while pending_tasks:
        new_error = None
        remaining = timeout + time_start - time()
        done, pending_tasks = await asyncio.wait(
            pending_tasks, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
        )
        num_done += len(done)
        if not done:  # Timeout
            # every task still pending is reported as a timeout; the status is not rewritten
            timed_out = [
                created_tasks_info[task] + ": Timeout" for task in pending_tasks
            ]
            error_detail_list.extend(timed_out)
            error_list.extend(timed_out)
            break
        for task in done:
            task_text = created_tasks_info[task]
            exc = "Cancelled" if task.cancelled() else task.exception()
            if not exc:
                self.logger.debug(logging_text + task_text + ": Done")
                continue
            if isinstance(exc, asyncio.TimeoutError):
                exc = "Timeout"
            new_error = task_text + ": {}".format(exc)
            error_list.append(task_text)
            error_detail_list.append(new_error)
            if isinstance(
                exc,
                (
                    str,
                    DbException,
                    N2VCException,
                    ROclient.ROClientException,
                    LcmException,
                    K8sException,
                    NgRoException,
                ),
            ):
                # known errors: the message is enough
                self.logger.error(logging_text + new_error)
            else:
                # unexpected errors: log the traceback too
                exc_traceback = "".join(
                    traceback.format_exception(None, exc, exc.__traceback__)
                )
                self.logger.error(
                    logging_text + task_text + " " + exc_traceback
                )
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if new_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the "value" or "default-value" of the descriptor is used. If it is between < > it looks for a value at
    instantiation_params
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is handled as no params
    :param instantiation_params: Instantiation params provided by user. None is handled as no params
    :return: a dictionary with the calculated params
    :raises LcmException: if a needed parameter is not provided or an INTEGER parameter cannot be converted
    """
    # callers may pass None when there is nothing to provide
    params = params or {}
    instantiation_params = instantiation_params or {}
    calculated_params = {}
    for parameter in primitive_desc.get("parameter") or ():
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "default-value" in parameter or "value" in parameter:
            # "value" takes precedence over "default-value"
            if "value" in parameter:
                value = parameter["value"]
            else:
                value = parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                # "<name>" is a reference to an instantiation param
                if value[1:-1] not in instantiation_params:
                    raise LcmException(
                        "Parameter {} needed to execute primitive {} not provided".format(
                            value, primitive_desc["name"]
                        )
                    )
                value = instantiation_params[value[1:-1]]
        else:
            raise LcmException(
                "Parameter {} needed to execute primitive {} not provided".format(
                    param_name, primitive_desc["name"]
                )
            )

        if isinstance(value, (dict, list, tuple)):
            # complex values are serialized as one-line yaml text
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # remove the "!!yaml " mark (7 characters), keeping the yaml text
            value = value[7:]
        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except (ValueError, TypeError) as e:
                # TypeError covers values that are not text or numbers, e.g. None
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(
                        param_name, primitive_desc["name"]
                    )
                ) from e
        elif data_type == "BOOLEAN":
            # anything different from "false" (case insensitive) is True
            value = not (str(value).lower() == "false")
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config":
        if "ns_config_info" in instantiation_params:
            calculated_params["ns_config_info"] = instantiation_params[
                "ns_config_info"
            ]
    return calculated_params
def _look_for_deployed_vca(
    self,
    deployed_vca,
    member_vnf_index,
    vdu_id,
    vdu_count_index,
    kdu_name=None,
    ee_descriptor_id=None,
):
    """
    Find the deployed VCA record for an action and return its execution environment
    :param deployed_vca: list of deployed VCA records; null entries are skipped
    :param member_vnf_index: must be equal to the "member-vnf-index" of the record
    :param vdu_id: must be equal to the "vdu_id" of the record
    :param vdu_count_index: if not None, must be equal to the "vdu_count_index" of the record
    :param kdu_name: if provided, must be equal to the "kdu_name" of the record
    :param ee_descriptor_id: if provided, must be equal to the "ee_descriptor_id" of the record
    :return: tuple (ee_id, vca_type) of the first matching record
    :raises LcmException: if there is no matching record or it has no execution environment id
    """
    # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
    # Keys are read with get(): a record lacking one of them does not match, instead of raising KeyError
    for vca in deployed_vca or ():
        if not vca:
            continue
        if member_vnf_index != vca.get("member-vnf-index") or vdu_id != vca.get(
            "vdu_id"
        ):
            continue
        if (
            vdu_count_index is not None
            and vdu_count_index != vca.get("vdu_count_index")
        ):
            continue
        if kdu_name and kdu_name != vca.get("kdu_name"):
            continue
        if ee_descriptor_id and ee_descriptor_id != vca.get("ee_descriptor_id"):
            continue
        break
    else:
        # vca_deployed not found
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
            " is not deployed".format(
                member_vnf_index,
                vdu_id,
                vdu_count_index,
                kdu_name,
                ee_descriptor_id,
            )
        )
    # get ee_id
    ee_id = vca.get("ee_id")
    vca_type = vca.get(
        "type", "lxc_proxy_charm"
    )  # default value for backward compatibility - proxy charm
    if not ee_id:
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
            "execution environment".format(
                member_vnf_index, vdu_id, kdu_name, vdu_count_index
            )
        )
    return ee_id, vca_type
async def _ns_execute_primitive(
    self,
    ee_id,
    primitive,
    primitive_params,
    retries=0,
    retries_interval=30,
    timeout=None,
    vca_type=None,
    db_dict=None,
    vca_id: str = None,
) -> (str, str):
    """
    Execute a primitive at an execution environment, retrying if it fails
    :param ee_id: execution environment id
    :param primitive: name of the primitive. For "config" the params are sent inside a "params" key
    :param primitive_params: dictionary with the params of the primitive
    :param retries: number of additional attempts after the first failure
    :param retries_interval: seconds to wait between attempts
    :param timeout: maximum seconds for each attempt; self.timeout_primitive is used if not provided
    :param vca_type: key of self.vca_map to use; "lxc_proxy_charm" if not provided
    :param db_dict: forwarded to exec_primitive()
    :param vca_id: forwarded to exec_primitive()
    :return: ("COMPLETED", output) if ok; ("FAILED", error text) when all the attempts fail;
        ("FAIL", error text) for any other unexpected error. LcmException and cancellation are raised
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    ),
                    timeout=timeout or self.timeout_primitive,
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                error_text = "Timeout" if isinstance(e, asyncio.TimeoutError) else e
                retries -= 1
                if retries < 0:
                    return "FAILED", str(error_text)
                self.logger.debug(
                    "Error executing action {} on {} -> {}".format(
                        primitive, ee_id, error_text
                    )
                )
                # wait and retry
                # asyncio.sleep() no longer accepts the "loop" argument (removed in Python 3.10);
                # passing it raised TypeError, ending the retries with a "FAIL" result
                await asyncio.sleep(retries_interval)

        return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # NOTE(review): this returns "FAIL" while the retry path returns "FAILED"; kept as it is
        # because the callers may depend on the exact text - confirm
        return "FAIL", "Error executing action {}: {}".format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refresh the vca status stored at the nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    deployed = db_nsr["_admin"]["deployed"]
    if deployed["K8s"]:
        # there are KDUs: refresh every deployed kdu instance
        for k8s in deployed["K8s"]:
            cluster_uuid = k8s["k8scluster-uuid"]
            kdu_instance = k8s["kdu-instance"]
            await self._on_update_k8s_db(
                cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
            )
    else:
        # no KDUs: refresh every position of the deployed VCA list
        for vca_index in range(len(deployed["VCA"])):
            db_filter = {"_id": nsr_id}
            path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", db_filter, path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """Execute the primitive requested by an NS "action" operation.

    The operation parameters are read from the ``nslcmops`` record. Depending
    on which of ``member_vnf_index``, ``vdu_id`` and ``kdu_name`` are present,
    the primitive is looked up in the VDU, KDU, VNF or NS configuration and
    then executed either through the matching K8s connector in
    ``self.k8scluster_map`` (KDU ``upgrade``/``rollback``/``status`` or a KDU
    configuration primitive) or through ``self._ns_execute_primitive`` on the
    deployed VCA.

    :param nsr_id: ``_id`` of the NS record the action applies to.
    :param nslcmop_id: ``_id`` of the NS LCM operation that describes the action.
    :return: ``None`` when the HA lock for the operation cannot be obtained;
        otherwise the tuple ``(nslcmop_operation_state, detailed_status)``.
        Errors raised while processing are logged and reported as a
        ``"FAILED"`` state: the ``return`` inside ``finally`` means no
        exception propagates to the caller.
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    # Only one of the vnf-level (db_vnfr/db_vnfd) or ns-level (db_nsd) records
    # is loaded below; pre-binding all of them keeps the code that runs for
    # both cases (e.g. get_vca_id) from hitting an UnboundLocalError.
    db_vnfr = None
    db_vnfd = None
    db_nsd = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    # Bound up-front because the "finally" clause always returns it.
    detailed_status = ""
    exc = None
    step = ""
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operation_params = db_nslcmop["operationParams"]
        if operation_params.get("primitive_params"):
            # primitive_params is stored as a JSON string; decode it in place
            operation_params["primitive_params"] = json.loads(
                operation_params["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = operation_params.get("member_vnf_index")
        vdu_id = operation_params.get("vdu_id")
        kdu_name = operation_params.get("kdu_name")
        vdu_count_index = operation_params.get("vdu_count_index")
        primitive = operation_params["primitive"]
        primitive_params = operation_params["primitive_params"]
        timeout_ns_action = operation_params.get(
            "timeout_ns_action", self.timeout_primitive
        )

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        # stored as a JSON string as well
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # upgrade/rollback/status are built-in KDU operations and need no
            # descriptor entry; anything else must be declared
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                if vdur is None:
                    # fail with a readable message instead of an AttributeError
                    raise LcmException(
                        "VDU '{}' for vnf '{}' not found at vnfr".format(
                            vdu_id, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                if kdur is None:
                    raise LcmException(
                        "KDU '{}' for vnf '{}' not found at vnfr".format(
                            kdu_name, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        # A KDU primitive is also routed to the K8s connector when it is one
        # of the primitives declared in the KDU configuration.
        kdu_action = False
        kdu_configuration = (
            get_configuration(db_vnfd, kdu_name) if kdu_name else None
        )
        if kdu_configuration:
            # distinct loop name so the requested "primitive" is not clobbered
            actions = {
                kdu_primitive["name"]
                for section in ("initial-config-primitive", "config-primitive")
                for kdu_primitive in kdu_configuration.get(section, [])
            }
            kdu_action = primitive_name in actions

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            k8s_connector = self.k8scluster_map[kdu["k8scluster-type"]]
            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    # an explicit model in the parameters wins and must not be
                    # forwarded again as a chart parameter
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        # "name:version" -> keep only the name
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    k8s_connector.upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    # outer guard slightly longer than the connector's own timeout
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    k8s_connector.rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    k8s_connector.status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    k8s_connector.exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            # an empty/None answer from the connector is treated as a failure
            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            # stays None when no deployed VCA matches the member-vnf-index,
            # instead of leaving the name unbound
            db_dict = None
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + " task Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            # best effort: a failed notification must not change the result
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        return nslcmop_operation_state, detailed_status
5011 async def scale(self
, nsr_id
, nslcmop_id
):
5012 # Try to lock HA task here
5013 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5014 if not task_is_locked_by_me
:
5017 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5018 stage
= ["", "", ""]
5019 tasks_dict_info
= {}
5020 # ^ stage, step, VIM progress
5021 self
.logger
.debug(logging_text
+ "Enter")
5022 # get all needed from database
5024 db_nslcmop_update
= {}
5027 # in case of error, indicates what part of scale was failed to put nsr at error status
5028 scale_process
= None
5029 old_operational_status
= ""
5030 old_config_status
= ""
5033 # wait for any previous tasks in process
5034 step
= "Waiting for previous operations to terminate"
5035 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5036 self
._write
_ns
_status
(
5039 current_operation
="SCALING",
5040 current_operation_id
=nslcmop_id
,
5043 step
= "Getting nslcmop from database"
5045 step
+ " after having waited for previous tasks to be completed"
5047 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5049 step
= "Getting nsr from database"
5050 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5051 old_operational_status
= db_nsr
["operational-status"]
5052 old_config_status
= db_nsr
["config-status"]
5054 step
= "Parsing scaling parameters"
5055 db_nsr_update
["operational-status"] = "scaling"
5056 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5057 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5059 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5061 ]["member-vnf-index"]
5062 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5064 ]["scaling-group-descriptor"]
5065 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5066 # for backward compatibility
5067 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5068 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5069 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5070 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5072 step
= "Getting vnfr from database"
5073 db_vnfr
= self
.db
.get_one(
5074 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5077 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5079 step
= "Getting vnfd from database"
5080 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5082 base_folder
= db_vnfd
["_admin"]["storage"]
5084 step
= "Getting scaling-group-descriptor"
5085 scaling_descriptor
= find_in_list(
5086 get_scaling_aspect(db_vnfd
),
5087 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5089 if not scaling_descriptor
:
5091 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5092 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5095 step
= "Sending scale order to VIM"
5096 # TODO check if ns is in a proper status
5098 if not db_nsr
["_admin"].get("scaling-group"):
5103 "_admin.scaling-group": [
5104 {"name": scaling_group
, "nb-scale-op": 0}
5108 admin_scale_index
= 0
5110 for admin_scale_index
, admin_scale_info
in enumerate(
5111 db_nsr
["_admin"]["scaling-group"]
5113 if admin_scale_info
["name"] == scaling_group
:
5114 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5116 else: # not found, set index one plus last element and add new entry with the name
5117 admin_scale_index
+= 1
5119 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5122 vca_scaling_info
= []
5123 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5124 if scaling_type
== "SCALE_OUT":
5125 if "aspect-delta-details" not in scaling_descriptor
:
5127 "Aspect delta details not fount in scaling descriptor {}".format(
5128 scaling_descriptor
["name"]
5131 # count if max-instance-count is reached
5132 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5134 scaling_info
["scaling_direction"] = "OUT"
5135 scaling_info
["vdu-create"] = {}
5136 scaling_info
["kdu-create"] = {}
5137 for delta
in deltas
:
5138 for vdu_delta
in delta
.get("vdu-delta", {}):
5139 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5140 # vdu_index also provides the number of instance of the targeted vdu
5141 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5142 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5146 additional_params
= (
5147 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5150 cloud_init_list
= []
5152 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5153 max_instance_count
= 10
5154 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5155 max_instance_count
= vdu_profile
.get(
5156 "max-number-of-instances", 10
5159 default_instance_num
= get_number_of_instances(
5162 instances_number
= vdu_delta
.get("number-of-instances", 1)
5163 nb_scale_op
+= instances_number
5165 new_instance_count
= nb_scale_op
+ default_instance_num
5166 # Control if new count is over max and vdu count is less than max.
5167 # Then assign new instance count
5168 if new_instance_count
> max_instance_count
> vdu_count
:
5169 instances_number
= new_instance_count
- max_instance_count
5171 instances_number
= instances_number
5173 if new_instance_count
> max_instance_count
:
5175 "reached the limit of {} (max-instance-count) "
5176 "scaling-out operations for the "
5177 "scaling-group-descriptor '{}'".format(
5178 nb_scale_op
, scaling_group
5181 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5183 # TODO Information of its own ip is not available because db_vnfr is not updated.
5184 additional_params
["OSM"] = get_osm_params(
5185 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5187 cloud_init_list
.append(
5188 self
._parse
_cloud
_init
(
5195 vca_scaling_info
.append(
5197 "osm_vdu_id": vdu_delta
["id"],
5198 "member-vnf-index": vnf_index
,
5200 "vdu_index": vdu_index
+ x
,
5203 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5204 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5205 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5206 kdu_name
= kdu_profile
["kdu-name"]
5207 resource_name
= kdu_profile
["resource-name"]
5209 # Might have different kdus in the same delta
5210 # Should have list for each kdu
5211 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5212 scaling_info
["kdu-create"][kdu_name
] = []
5214 kdur
= get_kdur(db_vnfr
, kdu_name
)
5215 if kdur
.get("helm-chart"):
5216 k8s_cluster_type
= "helm-chart-v3"
5217 self
.logger
.debug("kdur: {}".format(kdur
))
5219 kdur
.get("helm-version")
5220 and kdur
.get("helm-version") == "v2"
5222 k8s_cluster_type
= "helm-chart"
5223 raise NotImplementedError
5224 elif kdur
.get("juju-bundle"):
5225 k8s_cluster_type
= "juju-bundle"
5228 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5229 "juju-bundle. Maybe an old NBI version is running".format(
5230 db_vnfr
["member-vnf-index-ref"], kdu_name
5234 max_instance_count
= 10
5235 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5236 max_instance_count
= kdu_profile
.get(
5237 "max-number-of-instances", 10
5240 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5241 deployed_kdu
, _
= get_deployed_kdu(
5242 nsr_deployed
, kdu_name
, vnf_index
5244 if deployed_kdu
is None:
5246 "KDU '{}' for vnf '{}' not deployed".format(
5250 kdu_instance
= deployed_kdu
.get("kdu-instance")
5251 instance_num
= await self
.k8scluster_map
[
5253 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5254 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5255 "number-of-instances", 1
5258 # Control if new count is over max and instance_num is less than max.
5259 # Then assign max instance number to kdu replica count
5260 if kdu_replica_count
> max_instance_count
> instance_num
:
5261 kdu_replica_count
= max_instance_count
5262 if kdu_replica_count
> max_instance_count
:
5264 "reached the limit of {} (max-instance-count) "
5265 "scaling-out operations for the "
5266 "scaling-group-descriptor '{}'".format(
5267 instance_num
, scaling_group
5271 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5272 vca_scaling_info
.append(
5274 "osm_kdu_id": kdu_name
,
5275 "member-vnf-index": vnf_index
,
5277 "kdu_index": instance_num
+ x
- 1,
5280 scaling_info
["kdu-create"][kdu_name
].append(
5282 "member-vnf-index": vnf_index
,
5284 "k8s-cluster-type": k8s_cluster_type
,
5285 "resource-name": resource_name
,
5286 "scale": kdu_replica_count
,
5289 elif scaling_type
== "SCALE_IN":
5290 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5292 scaling_info
["scaling_direction"] = "IN"
5293 scaling_info
["vdu-delete"] = {}
5294 scaling_info
["kdu-delete"] = {}
5296 for delta
in deltas
:
5297 for vdu_delta
in delta
.get("vdu-delta", {}):
5298 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5299 min_instance_count
= 0
5300 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5301 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5302 min_instance_count
= vdu_profile
["min-number-of-instances"]
5304 default_instance_num
= get_number_of_instances(
5305 db_vnfd
, vdu_delta
["id"]
5307 instance_num
= vdu_delta
.get("number-of-instances", 1)
5308 nb_scale_op
-= instance_num
5310 new_instance_count
= nb_scale_op
+ default_instance_num
5312 if new_instance_count
< min_instance_count
< vdu_count
:
5313 instances_number
= min_instance_count
- new_instance_count
5315 instances_number
= instance_num
5317 if new_instance_count
< min_instance_count
:
5319 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5320 "scaling-group-descriptor '{}'".format(
5321 nb_scale_op
, scaling_group
5324 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5325 vca_scaling_info
.append(
5327 "osm_vdu_id": vdu_delta
["id"],
5328 "member-vnf-index": vnf_index
,
5330 "vdu_index": vdu_index
- 1 - x
,
5333 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5334 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5335 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5336 kdu_name
= kdu_profile
["kdu-name"]
5337 resource_name
= kdu_profile
["resource-name"]
5339 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5340 scaling_info
["kdu-delete"][kdu_name
] = []
5342 kdur
= get_kdur(db_vnfr
, kdu_name
)
5343 if kdur
.get("helm-chart"):
5344 k8s_cluster_type
= "helm-chart-v3"
5345 self
.logger
.debug("kdur: {}".format(kdur
))
5347 kdur
.get("helm-version")
5348 and kdur
.get("helm-version") == "v2"
5350 k8s_cluster_type
= "helm-chart"
5351 raise NotImplementedError
5352 elif kdur
.get("juju-bundle"):
5353 k8s_cluster_type
= "juju-bundle"
5356 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5357 "juju-bundle. Maybe an old NBI version is running".format(
5358 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5362 min_instance_count
= 0
5363 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5364 min_instance_count
= kdu_profile
["min-number-of-instances"]
5366 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5367 deployed_kdu
, _
= get_deployed_kdu(
5368 nsr_deployed
, kdu_name
, vnf_index
5370 if deployed_kdu
is None:
5372 "KDU '{}' for vnf '{}' not deployed".format(
5376 kdu_instance
= deployed_kdu
.get("kdu-instance")
5377 instance_num
= await self
.k8scluster_map
[
5379 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5380 kdu_replica_count
= instance_num
- kdu_delta
.get(
5381 "number-of-instances", 1
5384 if kdu_replica_count
< min_instance_count
< instance_num
:
5385 kdu_replica_count
= min_instance_count
5386 if kdu_replica_count
< min_instance_count
:
5388 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5389 "scaling-group-descriptor '{}'".format(
5390 instance_num
, scaling_group
5394 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5395 vca_scaling_info
.append(
5397 "osm_kdu_id": kdu_name
,
5398 "member-vnf-index": vnf_index
,
5400 "kdu_index": instance_num
- x
- 1,
5403 scaling_info
["kdu-delete"][kdu_name
].append(
5405 "member-vnf-index": vnf_index
,
5407 "k8s-cluster-type": k8s_cluster_type
,
5408 "resource-name": resource_name
,
5409 "scale": kdu_replica_count
,
5413 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5414 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5415 if scaling_info
["scaling_direction"] == "IN":
5416 for vdur
in reversed(db_vnfr
["vdur"]):
5417 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5418 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5419 scaling_info
["vdu"].append(
5421 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5422 "vdu_id": vdur
["vdu-id-ref"],
5426 for interface
in vdur
["interfaces"]:
5427 scaling_info
["vdu"][-1]["interface"].append(
5429 "name": interface
["name"],
5430 "ip_address": interface
["ip-address"],
5431 "mac_address": interface
.get("mac-address"),
5434 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5437 step
= "Executing pre-scale vnf-config-primitive"
5438 if scaling_descriptor
.get("scaling-config-action"):
5439 for scaling_config_action
in scaling_descriptor
[
5440 "scaling-config-action"
5443 scaling_config_action
.get("trigger") == "pre-scale-in"
5444 and scaling_type
== "SCALE_IN"
5446 scaling_config_action
.get("trigger") == "pre-scale-out"
5447 and scaling_type
== "SCALE_OUT"
5449 vnf_config_primitive
= scaling_config_action
[
5450 "vnf-config-primitive-name-ref"
5452 step
= db_nslcmop_update
[
5454 ] = "executing pre-scale scaling-config-action '{}'".format(
5455 vnf_config_primitive
5458 # look for primitive
5459 for config_primitive
in (
5460 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5461 ).get("config-primitive", ()):
5462 if config_primitive
["name"] == vnf_config_primitive
:
5466 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5467 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5468 "primitive".format(scaling_group
, vnf_config_primitive
)
5471 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5472 if db_vnfr
.get("additionalParamsForVnf"):
5473 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5475 scale_process
= "VCA"
5476 db_nsr_update
["config-status"] = "configuring pre-scaling"
5477 primitive_params
= self
._map
_primitive
_params
(
5478 config_primitive
, {}, vnfr_params
5481 # Pre-scale retry check: Check if this sub-operation has been executed before
5482 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5485 vnf_config_primitive
,
5489 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5490 # Skip sub-operation
5491 result
= "COMPLETED"
5492 result_detail
= "Done"
5495 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5496 vnf_config_primitive
, result
, result_detail
5500 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5501 # New sub-operation: Get index of this sub-operation
5503 len(db_nslcmop
.get("_admin", {}).get("operations"))
5508 + "vnf_config_primitive={} New sub-operation".format(
5509 vnf_config_primitive
5513 # retry: Get registered params for this existing sub-operation
5514 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5517 vnf_index
= op
.get("member_vnf_index")
5518 vnf_config_primitive
= op
.get("primitive")
5519 primitive_params
= op
.get("primitive_params")
5522 + "vnf_config_primitive={} Sub-operation retry".format(
5523 vnf_config_primitive
5526 # Execute the primitive, either with new (first-time) or registered (reintent) args
5527 ee_descriptor_id
= config_primitive
.get(
5528 "execution-environment-ref"
5530 primitive_name
= config_primitive
.get(
5531 "execution-environment-primitive", vnf_config_primitive
5533 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5534 nsr_deployed
["VCA"],
5535 member_vnf_index
=vnf_index
,
5537 vdu_count_index
=None,
5538 ee_descriptor_id
=ee_descriptor_id
,
5540 result
, result_detail
= await self
._ns
_execute
_primitive
(
5549 + "vnf_config_primitive={} Done with result {} {}".format(
5550 vnf_config_primitive
, result
, result_detail
5553 # Update operationState = COMPLETED | FAILED
5554 self
._update
_suboperation
_status
(
5555 db_nslcmop
, op_index
, result
, result_detail
5558 if result
== "FAILED":
5559 raise LcmException(result_detail
)
5560 db_nsr_update
["config-status"] = old_config_status
5561 scale_process
= None
5565 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5568 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5571 # SCALE-IN VCA - BEGIN
5572 if vca_scaling_info
:
5573 step
= db_nslcmop_update
[
5575 ] = "Deleting the execution environments"
5576 scale_process
= "VCA"
5577 for vca_info
in vca_scaling_info
:
5578 if vca_info
["type"] == "delete":
5579 member_vnf_index
= str(vca_info
["member-vnf-index"])
5581 logging_text
+ "vdu info: {}".format(vca_info
)
5583 if vca_info
.get("osm_vdu_id"):
5584 vdu_id
= vca_info
["osm_vdu_id"]
5585 vdu_index
= int(vca_info
["vdu_index"])
5588 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5589 member_vnf_index
, vdu_id
, vdu_index
5593 kdu_id
= vca_info
["osm_kdu_id"]
5596 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5597 member_vnf_index
, kdu_id
, vdu_index
5599 stage
[2] = step
= "Scaling in VCA"
5600 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5601 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5602 config_update
= db_nsr
["configurationStatus"]
5603 for vca_index
, vca
in enumerate(vca_update
):
5605 (vca
or vca
.get("ee_id"))
5606 and vca
["member-vnf-index"] == member_vnf_index
5607 and vca
["vdu_count_index"] == vdu_index
5609 if vca
.get("vdu_id"):
5610 config_descriptor
= get_configuration(
5611 db_vnfd
, vca
.get("vdu_id")
5613 elif vca
.get("kdu_name"):
5614 config_descriptor
= get_configuration(
5615 db_vnfd
, vca
.get("kdu_name")
5618 config_descriptor
= get_configuration(
5619 db_vnfd
, db_vnfd
["id"]
5621 operation_params
= (
5622 db_nslcmop
.get("operationParams") or {}
5624 exec_terminate_primitives
= not operation_params
.get(
5625 "skip_terminate_primitives"
5626 ) and vca
.get("needed_terminate")
5627 task
= asyncio
.ensure_future(
5636 exec_primitives
=exec_terminate_primitives
,
5640 timeout
=self
.timeout_charm_delete
,
5643 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5646 del vca_update
[vca_index
]
5647 del config_update
[vca_index
]
5648 # wait for pending tasks of terminate primitives
5652 + "Waiting for tasks {}".format(
5653 list(tasks_dict_info
.keys())
5656 error_list
= await self
._wait
_for
_tasks
(
5660 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5665 tasks_dict_info
.clear()
5667 raise LcmException("; ".join(error_list
))
5669 db_vca_and_config_update
= {
5670 "_admin.deployed.VCA": vca_update
,
5671 "configurationStatus": config_update
,
5674 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5676 scale_process
= None
5677 # SCALE-IN VCA - END
5680 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5681 scale_process
= "RO"
5682 if self
.ro_config
.get("ng"):
5683 await self
._scale
_ng
_ro
(
5684 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5686 scaling_info
.pop("vdu-create", None)
5687 scaling_info
.pop("vdu-delete", None)
5689 scale_process
= None
5693 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5694 scale_process
= "KDU"
5695 await self
._scale
_kdu
(
5696 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5698 scaling_info
.pop("kdu-create", None)
5699 scaling_info
.pop("kdu-delete", None)
5701 scale_process
= None
5705 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5707 # SCALE-UP VCA - BEGIN
5708 if vca_scaling_info
:
5709 step
= db_nslcmop_update
[
5711 ] = "Creating new execution environments"
5712 scale_process
= "VCA"
5713 for vca_info
in vca_scaling_info
:
5714 if vca_info
["type"] == "create":
5715 member_vnf_index
= str(vca_info
["member-vnf-index"])
5717 logging_text
+ "vdu info: {}".format(vca_info
)
5719 vnfd_id
= db_vnfr
["vnfd-ref"]
5720 if vca_info
.get("osm_vdu_id"):
5721 vdu_index
= int(vca_info
["vdu_index"])
5722 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5723 if db_vnfr
.get("additionalParamsForVnf"):
5724 deploy_params
.update(
5726 db_vnfr
["additionalParamsForVnf"].copy()
5729 descriptor_config
= get_configuration(
5730 db_vnfd
, db_vnfd
["id"]
5732 if descriptor_config
:
5737 logging_text
=logging_text
5738 + "member_vnf_index={} ".format(member_vnf_index
),
5741 nslcmop_id
=nslcmop_id
,
5747 member_vnf_index
=member_vnf_index
,
5748 vdu_index
=vdu_index
,
5750 deploy_params
=deploy_params
,
5751 descriptor_config
=descriptor_config
,
5752 base_folder
=base_folder
,
5753 task_instantiation_info
=tasks_dict_info
,
5756 vdu_id
= vca_info
["osm_vdu_id"]
5757 vdur
= find_in_list(
5758 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5760 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5761 if vdur
.get("additionalParams"):
5762 deploy_params_vdu
= parse_yaml_strings(
5763 vdur
["additionalParams"]
5766 deploy_params_vdu
= deploy_params
5767 deploy_params_vdu
["OSM"] = get_osm_params(
5768 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5770 if descriptor_config
:
5775 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5776 member_vnf_index
, vdu_id
, vdu_index
5778 stage
[2] = step
= "Scaling out VCA"
5779 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5781 logging_text
=logging_text
5782 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5783 member_vnf_index
, vdu_id
, vdu_index
5787 nslcmop_id
=nslcmop_id
,
5793 member_vnf_index
=member_vnf_index
,
5794 vdu_index
=vdu_index
,
5796 deploy_params
=deploy_params_vdu
,
5797 descriptor_config
=descriptor_config
,
5798 base_folder
=base_folder
,
5799 task_instantiation_info
=tasks_dict_info
,
5803 kdu_name
= vca_info
["osm_kdu_id"]
5804 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5805 if descriptor_config
:
5807 kdu_index
= int(vca_info
["kdu_index"])
5811 for x
in db_vnfr
["kdur"]
5812 if x
["kdu-name"] == kdu_name
5814 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5815 if kdur
.get("additionalParams"):
5816 deploy_params_kdu
= parse_yaml_strings(
5817 kdur
["additionalParams"]
5821 logging_text
=logging_text
,
5824 nslcmop_id
=nslcmop_id
,
5830 member_vnf_index
=member_vnf_index
,
5831 vdu_index
=kdu_index
,
5833 deploy_params
=deploy_params_kdu
,
5834 descriptor_config
=descriptor_config
,
5835 base_folder
=base_folder
,
5836 task_instantiation_info
=tasks_dict_info
,
5839 # SCALE-UP VCA - END
5840 scale_process
= None
5843 # execute primitive service POST-SCALING
5844 step
= "Executing post-scale vnf-config-primitive"
5845 if scaling_descriptor
.get("scaling-config-action"):
5846 for scaling_config_action
in scaling_descriptor
[
5847 "scaling-config-action"
5850 scaling_config_action
.get("trigger") == "post-scale-in"
5851 and scaling_type
== "SCALE_IN"
5853 scaling_config_action
.get("trigger") == "post-scale-out"
5854 and scaling_type
== "SCALE_OUT"
5856 vnf_config_primitive
= scaling_config_action
[
5857 "vnf-config-primitive-name-ref"
5859 step
= db_nslcmop_update
[
5861 ] = "executing post-scale scaling-config-action '{}'".format(
5862 vnf_config_primitive
5865 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5866 if db_vnfr
.get("additionalParamsForVnf"):
5867 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5869 # look for primitive
5870 for config_primitive
in (
5871 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5872 ).get("config-primitive", ()):
5873 if config_primitive
["name"] == vnf_config_primitive
:
5877 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5878 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5879 "config-primitive".format(
5880 scaling_group
, vnf_config_primitive
5883 scale_process
= "VCA"
5884 db_nsr_update
["config-status"] = "configuring post-scaling"
5885 primitive_params
= self
._map
_primitive
_params
(
5886 config_primitive
, {}, vnfr_params
5889 # Post-scale retry check: Check if this sub-operation has been executed before
5890 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5893 vnf_config_primitive
,
5897 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5898 # Skip sub-operation
5899 result
= "COMPLETED"
5900 result_detail
= "Done"
5903 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5904 vnf_config_primitive
, result
, result_detail
5908 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5909 # New sub-operation: Get index of this sub-operation
5911 len(db_nslcmop
.get("_admin", {}).get("operations"))
5916 + "vnf_config_primitive={} New sub-operation".format(
5917 vnf_config_primitive
5921 # retry: Get registered params for this existing sub-operation
5922 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5925 vnf_index
= op
.get("member_vnf_index")
5926 vnf_config_primitive
= op
.get("primitive")
5927 primitive_params
= op
.get("primitive_params")
5930 + "vnf_config_primitive={} Sub-operation retry".format(
5931 vnf_config_primitive
5934 # Execute the primitive, either with new (first-time) or registered (reintent) args
5935 ee_descriptor_id
= config_primitive
.get(
5936 "execution-environment-ref"
5938 primitive_name
= config_primitive
.get(
5939 "execution-environment-primitive", vnf_config_primitive
5941 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5942 nsr_deployed
["VCA"],
5943 member_vnf_index
=vnf_index
,
5945 vdu_count_index
=None,
5946 ee_descriptor_id
=ee_descriptor_id
,
5948 result
, result_detail
= await self
._ns
_execute
_primitive
(
5957 + "vnf_config_primitive={} Done with result {} {}".format(
5958 vnf_config_primitive
, result
, result_detail
5961 # Update operationState = COMPLETED | FAILED
5962 self
._update
_suboperation
_status
(
5963 db_nslcmop
, op_index
, result
, result_detail
5966 if result
== "FAILED":
5967 raise LcmException(result_detail
)
5968 db_nsr_update
["config-status"] = old_config_status
5969 scale_process
= None
5974 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
5975 db_nsr_update
["operational-status"] = (
5977 if old_operational_status
== "failed"
5978 else old_operational_status
5980 db_nsr_update
["config-status"] = old_config_status
5983 ROclient
.ROClientException
,
5988 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5990 except asyncio
.CancelledError
:
5992 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5994 exc
= "Operation was cancelled"
5995 except Exception as e
:
5996 exc
= traceback
.format_exc()
5997 self
.logger
.critical(
5998 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6002 self
._write
_ns
_status
(
6005 current_operation
="IDLE",
6006 current_operation_id
=None,
6009 stage
[1] = "Waiting for instantiate pending tasks."
6010 self
.logger
.debug(logging_text
+ stage
[1])
6011 exc
= await self
._wait
_for
_tasks
(
6014 self
.timeout_ns_deploy
,
6022 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6023 nslcmop_operation_state
= "FAILED"
6025 db_nsr_update
["operational-status"] = old_operational_status
6026 db_nsr_update
["config-status"] = old_config_status
6027 db_nsr_update
["detailed-status"] = ""
6029 if "VCA" in scale_process
:
6030 db_nsr_update
["config-status"] = "failed"
6031 if "RO" in scale_process
:
6032 db_nsr_update
["operational-status"] = "failed"
6035 ] = "FAILED scaling nslcmop={} {}: {}".format(
6036 nslcmop_id
, step
, exc
6039 error_description_nslcmop
= None
6040 nslcmop_operation_state
= "COMPLETED"
6041 db_nslcmop_update
["detailed-status"] = "Done"
6043 self
._write
_op
_status
(
6046 error_message
=error_description_nslcmop
,
6047 operation_state
=nslcmop_operation_state
,
6048 other_update
=db_nslcmop_update
,
6051 self
._write
_ns
_status
(
6054 current_operation
="IDLE",
6055 current_operation_id
=None,
6056 other_update
=db_nsr_update
,
6059 if nslcmop_operation_state
:
6063 "nslcmop_id": nslcmop_id
,
6064 "operationState": nslcmop_operation_state
,
6066 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6067 except Exception as e
:
6069 logging_text
+ "kafka_write notification Exception {}".format(e
)
6071 self
.logger
.debug(logging_text
+ "Exit")
6072 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6074 async def _scale_kdu(
6075 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6077 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6078 for kdu_name
in _scaling_info
:
6079 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6080 deployed_kdu
, index
= get_deployed_kdu(
6081 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6083 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6084 kdu_instance
= deployed_kdu
["kdu-instance"]
6085 scale
= int(kdu_scaling_info
["scale"])
6086 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6089 "collection": "nsrs",
6090 "filter": {"_id": nsr_id
},
6091 "path": "_admin.deployed.K8s.{}".format(index
),
6094 step
= "scaling application {}".format(
6095 kdu_scaling_info
["resource-name"]
6097 self
.logger
.debug(logging_text
+ step
)
6099 if kdu_scaling_info
["type"] == "delete":
6100 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6103 and kdu_config
.get("terminate-config-primitive")
6104 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6106 terminate_config_primitive_list
= kdu_config
.get(
6107 "terminate-config-primitive"
6109 terminate_config_primitive_list
.sort(
6110 key
=lambda val
: int(val
["seq"])
6114 terminate_config_primitive
6115 ) in terminate_config_primitive_list
:
6116 primitive_params_
= self
._map
_primitive
_params
(
6117 terminate_config_primitive
, {}, {}
6119 step
= "execute terminate config primitive"
6120 self
.logger
.debug(logging_text
+ step
)
6121 await asyncio
.wait_for(
6122 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6123 cluster_uuid
=cluster_uuid
,
6124 kdu_instance
=kdu_instance
,
6125 primitive_name
=terminate_config_primitive
["name"],
6126 params
=primitive_params_
,
6133 await asyncio
.wait_for(
6134 self
.k8scluster_map
[k8s_cluster_type
].scale(
6137 kdu_scaling_info
["resource-name"],
6140 timeout
=self
.timeout_vca_on_error
,
6143 if kdu_scaling_info
["type"] == "create":
6144 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6147 and kdu_config
.get("initial-config-primitive")
6148 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6150 initial_config_primitive_list
= kdu_config
.get(
6151 "initial-config-primitive"
6153 initial_config_primitive_list
.sort(
6154 key
=lambda val
: int(val
["seq"])
6157 for initial_config_primitive
in initial_config_primitive_list
:
6158 primitive_params_
= self
._map
_primitive
_params
(
6159 initial_config_primitive
, {}, {}
6161 step
= "execute initial config primitive"
6162 self
.logger
.debug(logging_text
+ step
)
6163 await asyncio
.wait_for(
6164 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6165 cluster_uuid
=cluster_uuid
,
6166 kdu_instance
=kdu_instance
,
6167 primitive_name
=initial_config_primitive
["name"],
6168 params
=primitive_params_
,
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """
    Apply a VDU scaling request by updating the VNFR and redeploying through NG-RO.

    Steps, in order:
      1. read the NSD, every VNFR of the NS and the VNFD each VNFR refers to;
      2. call scale_vnfr with the "vdu-create"/"vdu-delete" lists and mark_delete=True;
      3. call _instantiate_ng_ro with the collected records;
      4. when there is a "vdu-delete" list, call scale_vnfr again with
         mark_delete=False (presumably to drop the VDUs flagged in step 2 - confirm).

    :param logging_text: prefix for log messages, forwarded to _instantiate_ng_ro
    :param db_nsr: NS record; its "nsd-id" selects the NSD to read
    :param db_nslcmop: operation record; its "nsInstanceId" is the NS id
    :param db_vnfr: record of the VNF being scaled, updated by scale_vnfr
    :param vdu_scaling_info: dict with optional "vdu-create" and "vdu-delete"
    :param stage: stage list forwarded to _instantiate_ng_ro
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

    db_vnfrs = {}  # member-vnf-index-ref -> vnfr
    db_vnfds = []  # one descriptor per distinct "vnfd-id" found in the vnfrs
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # "_id" of the vnfd used by this vnf
        # "vnfd-id" is the key used against "_id" in the query below, so the
        # already-loaded check has to compare "_id" too; comparing "id" never
        # matched and every descriptor was read and appended once per vnfr.
        if find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
            continue
        db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))

    n2vc_key_list = [self.n2vc.get_public_key()]

    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr

    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout_ns_deploy,
    )

    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def add_prometheus_metrics(
    self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
    """
    Register the Prometheus jobs defined by a 'prometheus*.j2' artifact file.

    :param ee_id: execution environment id; the text after its first "." is used
        as the service name
    :param artifact_path: folder where the job template is searched
    :param ee_config_descriptor: its "metric-service" value completes the host name
    :param vnfr_id: VNF record id; with the dashes removed it is the JOB_NAME
        variable and must be part of every job name
    :param nsr_id: NS record id stored in each job under "nsr_id"
    :param target_ip: value of the TARGET_IP template variable
    :return: list of job names when self.prometheus.update() returns a true
        value; None otherwise (no prometheus, no template file or update failed)
    """
    if not self.prometheus:
        return

    # first file named 'prometheus*.j2' inside the artifact folder, if any
    job_file = None
    for file_name in self.fs.dir_ls(artifact_path):
        if file_name.startswith("prometheus") and file_name.endswith(".j2"):
            job_file = file_name
            break
    if not job_file:
        return

    with self.fs.file_open((artifact_path, job_file), "r") as template_file:
        job_data = template_file.read()

    # drop the "namespace." prefix of the execution environment id
    service = ee_id.partition(".")[2]
    host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
    # NOTE(review): exporter port is fixed to "80" - confirm
    host_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": host_name,
        "EXPORTER_POD_PORT": host_port,
    }
    job_list = self.prometheus.parse_job(job_data, variables)

    job_dict = {}
    for job in job_list:
        current_name = job.get("job_name")
        # every job name must contain the vnfr_id; otherwise a new one is generated
        if not isinstance(current_name, str) or vnfr_id not in current_name:
            job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
    for job in job_list:
        job_dict[job["job_name"]] = job

    updated = await self.prometheus.update(job_dict)
    if updated:
        return list(job_dict)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud and its credential from the configuration of a VIM account

    :param vim_account_id: id of the VIM account to look up

    :return: (cloud_name, cloud_credential), the "vca_cloud" and
        "vca_cloud_credential" entries of the account "config"; an entry that is
        not present is returned as None
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud and its credential from the configuration of a VIM account

    :param vim_account_id: id of the VIM account to look up

    :return: (cloud_name, cloud_credential), the "vca_k8s_cloud" and
        "vca_k8s_cloud_credential" entries of the account "config"; an entry that
        is not present is returned as None
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential