1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
33 from osm_lcm
import ROclient
34 from osm_lcm
.data_utils
.nsr
import get_deployed_kdu
35 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
36 from osm_lcm
.lcm_utils
import (
44 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
45 from osm_lcm
.data_utils
.vnfd
import (
48 get_ee_sorted_initial_config_primitive_list
,
49 get_ee_sorted_terminate_config_primitive_list
,
51 get_virtual_link_profiles
,
56 get_number_of_instances
,
60 from osm_lcm
.data_utils
.list_utils
import find_in_list
61 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
62 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
63 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
64 from n2vc
.k8s_helm_conn
import K8sHelmConnector
65 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
66 from n2vc
.k8s_juju_conn
import K8sJujuConnector
68 from osm_common
.dbbase
import DbException
69 from osm_common
.fsbase
import FsException
71 from osm_lcm
.data_utils
.database
.database
import Database
72 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
73 from osm_lcm
.data_utils
.wim
import (
76 select_feasible_wim_account
,
79 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
80 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
82 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
84 from copy
import copy
, deepcopy
86 from uuid
import uuid4
88 from random
import randint
90 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
94 timeout_vca_on_error
= (
96 ) # Time for charm from first time at blocked,error status to mark as failed
97 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
98 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
99 timeout_charm_delete
= 10 * 60
100 timeout_primitive
= 30 * 60 # timeout for primitive execution
101 timeout_progress_primitive
= (
103 ) # timeout for some progress in a primitive execution
105 SUBOPERATION_STATUS_NOT_FOUND
= -1
106 SUBOPERATION_STATUS_NEW
= -2
107 SUBOPERATION_STATUS_SKIP
= -3
108 task_name_deploy_vca
= "Deploying VCA"
110 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
112 Init, Connect to database, filesystem storage, and messaging
113 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
116 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
118 self
.db
= Database().instance
.db
119 self
.fs
= Filesystem().instance
.fs
121 self
.lcm_tasks
= lcm_tasks
122 self
.timeout
= config
["timeout"]
123 self
.ro_config
= config
["ro_config"]
124 self
.ng_ro
= config
["ro_config"].get("ng")
125 self
.vca_config
= config
["VCA"].copy()
127 # create N2VC connector
128 self
.n2vc
= N2VCJujuConnector(
131 on_update_db
=self
._on
_update
_n
2vc
_db
,
136 self
.conn_helm_ee
= LCMHelmConn(
139 vca_config
=self
.vca_config
,
140 on_update_db
=self
._on
_update
_n
2vc
_db
,
143 self
.k8sclusterhelm2
= K8sHelmConnector(
144 kubectl_command
=self
.vca_config
.get("kubectlpath"),
145 helm_command
=self
.vca_config
.get("helmpath"),
152 self
.k8sclusterhelm3
= K8sHelm3Connector(
153 kubectl_command
=self
.vca_config
.get("kubectlpath"),
154 helm_command
=self
.vca_config
.get("helm3path"),
161 self
.k8sclusterjuju
= K8sJujuConnector(
162 kubectl_command
=self
.vca_config
.get("kubectlpath"),
163 juju_command
=self
.vca_config
.get("jujupath"),
166 on_update_db
=self
._on
_update
_k
8s
_db
,
171 self
.k8scluster_map
= {
172 "helm-chart": self
.k8sclusterhelm2
,
173 "helm-chart-v3": self
.k8sclusterhelm3
,
174 "chart": self
.k8sclusterhelm3
,
175 "juju-bundle": self
.k8sclusterjuju
,
176 "juju": self
.k8sclusterjuju
,
180 "lxc_proxy_charm": self
.n2vc
,
181 "native_charm": self
.n2vc
,
182 "k8s_proxy_charm": self
.n2vc
,
183 "helm": self
.conn_helm_ee
,
184 "helm-v3": self
.conn_helm_ee
,
187 self
.prometheus
= prometheus
190 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
193 def increment_ip_mac(ip_mac
, vm_index
=1):
194 if not isinstance(ip_mac
, str):
197 # try with ipv4 look for last dot
198 i
= ip_mac
.rfind(".")
201 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
202 # try with ipv6 or mac look for last colon. Operate in hex
203 i
= ip_mac
.rfind(":")
206 # format in hex, len can be 2 for mac or 4 for ipv6
207 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
208 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
214 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
216 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
219 # TODO filter RO descriptor fields...
223 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
224 db_dict
["deploymentStatus"] = ro_descriptor
225 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
227 except Exception as e
:
229 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
232 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
234 # remove last dot from path (if exists)
235 if path
.endswith("."):
238 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
239 # .format(table, filter, path, updated_data))
242 nsr_id
= filter.get("_id")
244 # read ns record from database
245 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
246 current_ns_status
= nsr
.get("nsState")
248 # get vca status for NS
249 status_dict
= await self
.n2vc
.get_status(
250 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
255 db_dict
["vcaStatus"] = status_dict
256 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
258 # update configurationStatus for this VCA
260 vca_index
= int(path
[path
.rfind(".") + 1 :])
263 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
265 vca_status
= vca_list
[vca_index
].get("status")
267 configuration_status_list
= nsr
.get("configurationStatus")
268 config_status
= configuration_status_list
[vca_index
].get("status")
270 if config_status
== "BROKEN" and vca_status
!= "failed":
271 db_dict
["configurationStatus"][vca_index
] = "READY"
272 elif config_status
!= "BROKEN" and vca_status
== "failed":
273 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
274 except Exception as e
:
275 # not update configurationStatus
276 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
278 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
279 # if nsState = 'DEGRADED' check if all is OK
281 if current_ns_status
in ("READY", "DEGRADED"):
282 error_description
= ""
284 if status_dict
.get("machines"):
285 for machine_id
in status_dict
.get("machines"):
286 machine
= status_dict
.get("machines").get(machine_id
)
287 # check machine agent-status
288 if machine
.get("agent-status"):
289 s
= machine
.get("agent-status").get("status")
292 error_description
+= (
293 "machine {} agent-status={} ; ".format(
297 # check machine instance status
298 if machine
.get("instance-status"):
299 s
= machine
.get("instance-status").get("status")
302 error_description
+= (
303 "machine {} instance-status={} ; ".format(
308 if status_dict
.get("applications"):
309 for app_id
in status_dict
.get("applications"):
310 app
= status_dict
.get("applications").get(app_id
)
311 # check application status
312 if app
.get("status"):
313 s
= app
.get("status").get("status")
316 error_description
+= (
317 "application {} status={} ; ".format(app_id
, s
)
320 if error_description
:
321 db_dict
["errorDescription"] = error_description
322 if current_ns_status
== "READY" and is_degraded
:
323 db_dict
["nsState"] = "DEGRADED"
324 if current_ns_status
== "DEGRADED" and not is_degraded
:
325 db_dict
["nsState"] = "READY"
328 self
.update_db_2("nsrs", nsr_id
, db_dict
)
330 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
332 except Exception as e
:
333 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
335 async def _on_update_k8s_db(
336 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
339 Updating vca status in NSR record
340 :param cluster_uuid: UUID of a k8s cluster
341 :param kdu_instance: The unique name of the KDU instance
342 :param filter: To get nsr_id
343 :cluster_type: The cluster type (juju, k8s)
347 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
348 # .format(cluster_uuid, kdu_instance, filter))
350 nsr_id
= filter.get("_id")
352 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
353 cluster_uuid
=cluster_uuid
,
354 kdu_instance
=kdu_instance
,
356 complete_status
=True,
362 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
364 if cluster_type
in ("juju-bundle", "juju"):
365 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
366 # status in a similar way between Juju Bundles and Helm Charts on this side
367 await self
.k8sclusterjuju
.update_vca_status(
368 db_dict
["vcaStatus"],
374 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
378 self
.update_db_2("nsrs", nsr_id
, db_dict
)
379 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
381 except Exception as e
:
382 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
385 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
387 env
= Environment(undefined
=StrictUndefined
)
388 template
= env
.from_string(cloud_init_text
)
389 return template
.render(additional_params
or {})
390 except UndefinedError
as e
:
392 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
393 "file, must be provided in the instantiation parameters inside the "
394 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
396 except (TemplateError
, TemplateNotFound
) as e
:
398 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
403 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
404 cloud_init_content
= cloud_init_file
= None
406 if vdu
.get("cloud-init-file"):
407 base_folder
= vnfd
["_admin"]["storage"]
408 cloud_init_file
= "{}/{}/cloud_init/{}".format(
409 base_folder
["folder"],
410 base_folder
["pkg-dir"],
411 vdu
["cloud-init-file"],
413 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
414 cloud_init_content
= ci_file
.read()
415 elif vdu
.get("cloud-init"):
416 cloud_init_content
= vdu
["cloud-init"]
418 return cloud_init_content
419 except FsException
as e
:
421 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
422 vnfd
["id"], vdu
["id"], cloud_init_file
, e
426 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
428 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
431 additional_params
= vdur
.get("additionalParams")
432 return parse_yaml_strings(additional_params
)
434 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
436 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
437 :param vnfd: input vnfd
438 :param new_id: overrides vnf id if provided
439 :param additionalParams: Instantiation params for VNFs provided
440 :param nsrId: Id of the NSR
441 :return: copy of vnfd
443 vnfd_RO
= deepcopy(vnfd
)
444 # remove unused by RO configuration, monitoring, scaling and internal keys
445 vnfd_RO
.pop("_id", None)
446 vnfd_RO
.pop("_admin", None)
447 vnfd_RO
.pop("monitoring-param", None)
448 vnfd_RO
.pop("scaling-group-descriptor", None)
449 vnfd_RO
.pop("kdu", None)
450 vnfd_RO
.pop("k8s-cluster", None)
452 vnfd_RO
["id"] = new_id
454 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
455 for vdu
in get_iterable(vnfd_RO
, "vdu"):
456 vdu
.pop("cloud-init-file", None)
457 vdu
.pop("cloud-init", None)
461 def ip_profile_2_RO(ip_profile
):
462 RO_ip_profile
= deepcopy(ip_profile
)
463 if "dns-server" in RO_ip_profile
:
464 if isinstance(RO_ip_profile
["dns-server"], list):
465 RO_ip_profile
["dns-address"] = []
466 for ds
in RO_ip_profile
.pop("dns-server"):
467 RO_ip_profile
["dns-address"].append(ds
["address"])
469 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
470 if RO_ip_profile
.get("ip-version") == "ipv4":
471 RO_ip_profile
["ip-version"] = "IPv4"
472 if RO_ip_profile
.get("ip-version") == "ipv6":
473 RO_ip_profile
["ip-version"] = "IPv6"
474 if "dhcp-params" in RO_ip_profile
:
475 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
478 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
479 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
480 if db_vim
["_admin"]["operationalState"] != "ENABLED":
482 "VIM={} is not available. operationalState={}".format(
483 vim_account
, db_vim
["_admin"]["operationalState"]
486 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
489 def get_ro_wim_id_for_wim_account(self
, wim_account
):
490 if isinstance(wim_account
, str):
491 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
492 if db_wim
["_admin"]["operationalState"] != "ENABLED":
494 "WIM={} is not available. operationalState={}".format(
495 wim_account
, db_wim
["_admin"]["operationalState"]
498 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
503 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
505 db_vdu_push_list
= []
507 db_update
= {"_admin.modified": time()}
509 for vdu_id
, vdu_count
in vdu_create
.items():
513 for vdur
in reversed(db_vnfr
["vdur"])
514 if vdur
["vdu-id-ref"] == vdu_id
519 # Read the template saved in the db:
520 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
521 vdur_template
= db_vnfr
.get("vdur-template")
522 if not vdur_template
:
524 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
528 vdur
= vdur_template
[0]
529 #Delete a template from the database after using it
530 self
.db
.set_one("vnfrs",
531 {"_id": db_vnfr
["_id"]},
533 pull
={"vdur-template": {"_id": vdur
['_id']}}
535 for count
in range(vdu_count
):
536 vdur_copy
= deepcopy(vdur
)
537 vdur_copy
["status"] = "BUILD"
538 vdur_copy
["status-detailed"] = None
539 vdur_copy
["ip-address"] = None
540 vdur_copy
["_id"] = str(uuid4())
541 vdur_copy
["count-index"] += count
+ 1
542 vdur_copy
["id"] = "{}-{}".format(
543 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
545 vdur_copy
.pop("vim_info", None)
546 for iface
in vdur_copy
["interfaces"]:
547 if iface
.get("fixed-ip"):
548 iface
["ip-address"] = self
.increment_ip_mac(
549 iface
["ip-address"], count
+ 1
552 iface
.pop("ip-address", None)
553 if iface
.get("fixed-mac"):
554 iface
["mac-address"] = self
.increment_ip_mac(
555 iface
["mac-address"], count
+ 1
558 iface
.pop("mac-address", None)
562 ) # only first vdu can be managment of vnf
563 db_vdu_push_list
.append(vdur_copy
)
564 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
566 if len(db_vnfr
["vdur"]) == 1:
567 # The scale will move to 0 instances
568 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
569 template_vdur
= [db_vnfr
["vdur"][0]]
570 for vdu_id
, vdu_count
in vdu_delete
.items():
572 indexes_to_delete
= [
574 for iv
in enumerate(db_vnfr
["vdur"])
575 if iv
[1]["vdu-id-ref"] == vdu_id
579 "vdur.{}.status".format(i
): "DELETING"
580 for i
in indexes_to_delete
[-vdu_count
:]
584 # it must be deleted one by one because common.db does not allow otherwise
587 for v
in reversed(db_vnfr
["vdur"])
588 if v
["vdu-id-ref"] == vdu_id
590 for vdu
in vdus_to_delete
[:vdu_count
]:
593 {"_id": db_vnfr
["_id"]},
595 pull
={"vdur": {"_id": vdu
["_id"]}},
599 db_push
["vdur"] = db_vdu_push_list
601 db_push
["vdur-template"] = template_vdur
604 db_vnfr
["vdur-template"] = template_vdur
605 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
606 # modify passed dictionary db_vnfr
607 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
608 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
610 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
612 Updates database nsr with the RO info for the created vld
613 :param ns_update_nsr: dictionary to be filled with the updated info
614 :param db_nsr: content of db_nsr. This is also modified
615 :param nsr_desc_RO: nsr descriptor from RO
616 :return: Nothing, LcmException is raised on errors
619 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
620 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
621 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
623 vld
["vim-id"] = net_RO
.get("vim_net_id")
624 vld
["name"] = net_RO
.get("vim_name")
625 vld
["status"] = net_RO
.get("status")
626 vld
["status-detailed"] = net_RO
.get("error_msg")
627 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
631 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
634 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
636 for db_vnfr
in db_vnfrs
.values():
637 vnfr_update
= {"status": "ERROR"}
638 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
639 if "status" not in vdur
:
640 vdur
["status"] = "ERROR"
641 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
643 vdur
["status-detailed"] = str(error_text
)
645 "vdur.{}.status-detailed".format(vdu_index
)
647 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
648 except DbException
as e
:
649 self
.logger
.error("Cannot update vnf. {}".format(e
))
651 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
653 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
654 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
655 :param nsr_desc_RO: nsr descriptor from RO
656 :return: Nothing, LcmException is raised on errors
658 for vnf_index
, db_vnfr
in db_vnfrs
.items():
659 for vnf_RO
in nsr_desc_RO
["vnfs"]:
660 if vnf_RO
["member_vnf_index"] != vnf_index
:
663 if vnf_RO
.get("ip_address"):
664 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
667 elif not db_vnfr
.get("ip-address"):
668 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
669 raise LcmExceptionNoMgmtIP(
670 "ns member_vnf_index '{}' has no IP address".format(
675 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
676 vdur_RO_count_index
= 0
677 if vdur
.get("pdu-type"):
679 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
680 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
682 if vdur
["count-index"] != vdur_RO_count_index
:
683 vdur_RO_count_index
+= 1
685 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
686 if vdur_RO
.get("ip_address"):
687 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
689 vdur
["ip-address"] = None
690 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
691 vdur
["name"] = vdur_RO
.get("vim_name")
692 vdur
["status"] = vdur_RO
.get("status")
693 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
694 for ifacer
in get_iterable(vdur
, "interfaces"):
695 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
696 if ifacer
["name"] == interface_RO
.get("internal_name"):
697 ifacer
["ip-address"] = interface_RO
.get(
700 ifacer
["mac-address"] = interface_RO
.get(
706 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
707 "from VIM info".format(
708 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
711 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
715 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
717 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
721 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
722 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
723 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
725 vld
["vim-id"] = net_RO
.get("vim_net_id")
726 vld
["name"] = net_RO
.get("vim_name")
727 vld
["status"] = net_RO
.get("status")
728 vld
["status-detailed"] = net_RO
.get("error_msg")
729 vnfr_update
["vld.{}".format(vld_index
)] = vld
733 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
738 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
743 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
748 def _get_ns_config_info(self
, nsr_id
):
750 Generates a mapping between vnf,vdu elements and the N2VC id
751 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
752 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
753 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
754 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
756 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
757 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
759 ns_config_info
= {"osm-config-mapping": mapping
}
760 for vca
in vca_deployed_list
:
761 if not vca
["member-vnf-index"]:
763 if not vca
["vdu_id"]:
764 mapping
[vca
["member-vnf-index"]] = vca
["application"]
768 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
770 ] = vca
["application"]
771 return ns_config_info
773 async def _instantiate_ng_ro(
790 def get_vim_account(vim_account_id
):
792 if vim_account_id
in db_vims
:
793 return db_vims
[vim_account_id
]
794 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
795 db_vims
[vim_account_id
] = db_vim
798 # modify target_vld info with instantiation parameters
799 def parse_vld_instantiation_params(
800 target_vim
, target_vld
, vld_params
, target_sdn
802 if vld_params
.get("ip-profile"):
803 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
806 if vld_params
.get("provider-network"):
807 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
810 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
811 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
815 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
816 # if wim_account_id is specified in vld_params, validate if it is feasible.
817 wim_account_id
, db_wim
= select_feasible_wim_account(
818 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
822 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
823 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
824 # update vld_params with correct WIM account Id
825 vld_params
["wimAccountId"] = wim_account_id
827 target_wim
= "wim:{}".format(wim_account_id
)
828 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
829 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
830 if len(sdn_ports
) > 0:
831 target_vld
["vim_info"][target_wim
] = target_wim_attrs
832 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
835 "Target VLD with WIM data: {:s}".format(str(target_vld
))
838 for param
in ("vim-network-name", "vim-network-id"):
839 if vld_params
.get(param
):
840 if isinstance(vld_params
[param
], dict):
841 for vim
, vim_net
in vld_params
[param
].items():
842 other_target_vim
= "vim:" + vim
844 target_vld
["vim_info"],
845 (other_target_vim
, param
.replace("-", "_")),
848 else: # isinstance str
849 target_vld
["vim_info"][target_vim
][
850 param
.replace("-", "_")
851 ] = vld_params
[param
]
852 if vld_params
.get("common_id"):
853 target_vld
["common_id"] = vld_params
.get("common_id")
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target
, ns_params
):
857 for vnf_params
in ns_params
.get("vnf", ()):
858 if vnf_params
.get("vimAccountId"):
862 for vnfr
in db_vnfrs
.values()
863 if vnf_params
["member-vnf-index"]
864 == vnfr
["member-vnf-index-ref"]
868 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
869 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
870 target_vld
= find_in_list(
871 get_iterable(vdur
, "interfaces"),
872 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
875 if vnf_params
.get("vimAccountId") not in a_vld
.get(
878 target
["ns"]["vld"][a_index
].get("vim_info").update(
880 "vim:{}".format(vnf_params
["vimAccountId"]): {
881 "vim_network_name": ""
886 nslcmop_id
= db_nslcmop
["_id"]
888 "name": db_nsr
["name"],
891 "image": deepcopy(db_nsr
["image"]),
892 "flavor": deepcopy(db_nsr
["flavor"]),
893 "action_id": nslcmop_id
,
894 "cloud_init_content": {},
896 for image
in target
["image"]:
897 image
["vim_info"] = {}
898 for flavor
in target
["flavor"]:
899 flavor
["vim_info"] = {}
900 if db_nsr
.get("affinity-or-anti-affinity-group"):
901 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
902 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
903 affinity_or_anti_affinity_group
["vim_info"] = {}
905 if db_nslcmop
.get("lcmOperationType") != "instantiate":
906 # get parameters of instantiation:
907 db_nslcmop_instantiate
= self
.db
.get_list(
910 "nsInstanceId": db_nslcmop
["nsInstanceId"],
911 "lcmOperationType": "instantiate",
914 ns_params
= db_nslcmop_instantiate
.get("operationParams")
916 ns_params
= db_nslcmop
.get("operationParams")
917 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
918 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
921 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
922 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
926 "mgmt-network": vld
.get("mgmt-network", False),
927 "type": vld
.get("type"),
930 "vim_network_name": vld
.get("vim-network-name"),
931 "vim_account_id": ns_params
["vimAccountId"],
935 # check if this network needs SDN assist
936 if vld
.get("pci-interfaces"):
937 db_vim
= get_vim_account(ns_params
["vimAccountId"])
938 sdnc_id
= db_vim
["config"].get("sdn-controller")
940 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
941 target_sdn
= "sdn:{}".format(sdnc_id
)
942 target_vld
["vim_info"][target_sdn
] = {
944 "target_vim": target_vim
,
946 "type": vld
.get("type"),
949 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
950 for nsd_vnf_profile
in nsd_vnf_profiles
:
951 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
952 if cp
["virtual-link-profile-id"] == vld
["id"]:
954 "member_vnf:{}.{}".format(
955 cp
["constituent-cpd-id"][0][
956 "constituent-base-element-id"
958 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
960 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
962 # check at nsd descriptor, if there is an ip-profile
964 nsd_vlp
= find_in_list(
965 get_virtual_link_profiles(nsd
),
966 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
971 and nsd_vlp
.get("virtual-link-protocol-data")
972 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
974 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
977 ip_profile_dest_data
= {}
978 if "ip-version" in ip_profile_source_data
:
979 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
982 if "cidr" in ip_profile_source_data
:
983 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
986 if "gateway-ip" in ip_profile_source_data
:
987 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
990 if "dhcp-enabled" in ip_profile_source_data
:
991 ip_profile_dest_data
["dhcp-params"] = {
992 "enabled": ip_profile_source_data
["dhcp-enabled"]
994 vld_params
["ip-profile"] = ip_profile_dest_data
996 # update vld_params with instantiation params
997 vld_instantiation_params
= find_in_list(
998 get_iterable(ns_params
, "vld"),
999 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1001 if vld_instantiation_params
:
1002 vld_params
.update(vld_instantiation_params
)
1003 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1004 target
["ns"]["vld"].append(target_vld
)
1005 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1006 update_ns_vld_target(target
, ns_params
)
1008 for vnfr
in db_vnfrs
.values():
1009 vnfd
= find_in_list(
1010 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1012 vnf_params
= find_in_list(
1013 get_iterable(ns_params
, "vnf"),
1014 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1016 target_vnf
= deepcopy(vnfr
)
1017 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1018 for vld
in target_vnf
.get("vld", ()):
1019 # check if connected to a ns.vld, to fill target'
1020 vnf_cp
= find_in_list(
1021 vnfd
.get("int-virtual-link-desc", ()),
1022 lambda cpd
: cpd
.get("id") == vld
["id"],
1025 ns_cp
= "member_vnf:{}.{}".format(
1026 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1028 if cp2target
.get(ns_cp
):
1029 vld
["target"] = cp2target
[ns_cp
]
1032 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1034 # check if this network needs SDN assist
1036 if vld
.get("pci-interfaces"):
1037 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1038 sdnc_id
= db_vim
["config"].get("sdn-controller")
1040 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1041 target_sdn
= "sdn:{}".format(sdnc_id
)
1042 vld
["vim_info"][target_sdn
] = {
1044 "target_vim": target_vim
,
1046 "type": vld
.get("type"),
1049 # check at vnfd descriptor, if there is an ip-profile
1051 vnfd_vlp
= find_in_list(
1052 get_virtual_link_profiles(vnfd
),
1053 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1057 and vnfd_vlp
.get("virtual-link-protocol-data")
1058 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1060 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1063 ip_profile_dest_data
= {}
1064 if "ip-version" in ip_profile_source_data
:
1065 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1068 if "cidr" in ip_profile_source_data
:
1069 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1072 if "gateway-ip" in ip_profile_source_data
:
1073 ip_profile_dest_data
[
1075 ] = ip_profile_source_data
["gateway-ip"]
1076 if "dhcp-enabled" in ip_profile_source_data
:
1077 ip_profile_dest_data
["dhcp-params"] = {
1078 "enabled": ip_profile_source_data
["dhcp-enabled"]
1081 vld_params
["ip-profile"] = ip_profile_dest_data
1082 # update vld_params with instantiation params
1084 vld_instantiation_params
= find_in_list(
1085 get_iterable(vnf_params
, "internal-vld"),
1086 lambda i_vld
: i_vld
["name"] == vld
["id"],
1088 if vld_instantiation_params
:
1089 vld_params
.update(vld_instantiation_params
)
1090 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1093 for vdur
in target_vnf
.get("vdur", ()):
1094 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1095 continue # This vdu must not be created
1096 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1098 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1101 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1102 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1105 and vdu_configuration
.get("config-access")
1106 and vdu_configuration
.get("config-access").get("ssh-access")
1108 vdur
["ssh-keys"] = ssh_keys_all
1109 vdur
["ssh-access-required"] = vdu_configuration
[
1111 ]["ssh-access"]["required"]
1114 and vnf_configuration
.get("config-access")
1115 and vnf_configuration
.get("config-access").get("ssh-access")
1116 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1118 vdur
["ssh-keys"] = ssh_keys_all
1119 vdur
["ssh-access-required"] = vnf_configuration
[
1121 ]["ssh-access"]["required"]
1122 elif ssh_keys_instantiation
and find_in_list(
1123 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1125 vdur
["ssh-keys"] = ssh_keys_instantiation
1127 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1129 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1131 if vdud
.get("cloud-init-file"):
1132 vdur
["cloud-init"] = "{}:file:{}".format(
1133 vnfd
["_id"], vdud
.get("cloud-init-file")
1135 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1136 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1137 base_folder
= vnfd
["_admin"]["storage"]
1138 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1139 base_folder
["folder"],
1140 base_folder
["pkg-dir"],
1141 vdud
.get("cloud-init-file"),
1143 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1144 target
["cloud_init_content"][
1147 elif vdud
.get("cloud-init"):
1148 vdur
["cloud-init"] = "{}:vdu:{}".format(
1149 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1151 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1152 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1155 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1156 deploy_params_vdu
= self
._format
_additional
_params
(
1157 vdur
.get("additionalParams") or {}
1159 deploy_params_vdu
["OSM"] = get_osm_params(
1160 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1162 vdur
["additionalParams"] = deploy_params_vdu
1165 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1166 if target_vim
not in ns_flavor
["vim_info"]:
1167 ns_flavor
["vim_info"][target_vim
] = {}
1170 # in case alternative images are provided we must check if they should be applied
1171 # for the vim_type, modify the vim_type taking into account
1172 ns_image_id
= int(vdur
["ns-image-id"])
1173 if vdur
.get("alt-image-ids"):
1174 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1175 vim_type
= db_vim
["vim_type"]
1176 for alt_image_id
in vdur
.get("alt-image-ids"):
1177 ns_alt_image
= target
["image"][int(alt_image_id
)]
1178 if vim_type
== ns_alt_image
.get("vim-type"):
1179 # must use alternative image
1181 "use alternative image id: {}".format(alt_image_id
)
1183 ns_image_id
= alt_image_id
1184 vdur
["ns-image-id"] = ns_image_id
1186 ns_image
= target
["image"][int(ns_image_id
)]
1187 if target_vim
not in ns_image
["vim_info"]:
1188 ns_image
["vim_info"][target_vim
] = {}
1191 if vdur
.get("affinity-or-anti-affinity-group-id"):
1192 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1193 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1194 if target_vim
not in ns_ags
["vim_info"]:
1195 ns_ags
["vim_info"][target_vim
] = {}
1197 vdur
["vim_info"] = {target_vim
: {}}
1198 # instantiation parameters
1200 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1201 # vdud["id"]), None)
1202 vdur_list
.append(vdur
)
1203 target_vnf
["vdur"] = vdur_list
1204 target
["vnf"].append(target_vnf
)
1206 desc
= await self
.RO
.deploy(nsr_id
, target
)
1207 self
.logger
.debug("RO return > {}".format(desc
))
1208 action_id
= desc
["action_id"]
1209 await self
._wait
_ng
_ro
(
1210 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1215 "_admin.deployed.RO.operational-status": "running",
1216 "detailed-status": " ".join(stage
),
1218 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1219 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1220 self
._write
_op
_status
(nslcmop_id
, stage
)
1222 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1226 async def _wait_ng_ro(
1235 detailed_status_old
= None
1237 start_time
= start_time
or time()
1238 while time() <= start_time
+ timeout
:
1239 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1240 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1241 if desc_status
["status"] == "FAILED":
1242 raise NgRoException(desc_status
["details"])
1243 elif desc_status
["status"] == "BUILD":
1245 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1246 elif desc_status
["status"] == "DONE":
1248 stage
[2] = "Deployed at VIM"
1251 assert False, "ROclient.check_ns_status returns unknown {}".format(
1252 desc_status
["status"]
1254 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1255 detailed_status_old
= stage
[2]
1256 db_nsr_update
["detailed-status"] = " ".join(stage
)
1257 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1258 self
._write
_op
_status
(nslcmop_id
, stage
)
1259 await asyncio
.sleep(15, loop
=self
.loop
)
1260 else: # timeout_ns_deploy
1261 raise NgRoException("Timeout waiting ns to deploy")
1263 async def _terminate_ng_ro(
1264 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1269 start_deploy
= time()
1276 "action_id": nslcmop_id
,
1278 desc
= await self
.RO
.deploy(nsr_id
, target
)
1279 action_id
= desc
["action_id"]
1280 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1281 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1284 + "ns terminate action at RO. action_id={}".format(action_id
)
1288 delete_timeout
= 20 * 60 # 20 minutes
1289 await self
._wait
_ng
_ro
(
1290 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1293 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1294 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1296 await self
.RO
.delete(nsr_id
)
1297 except Exception as e
:
1298 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1299 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1300 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1301 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1303 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1305 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1306 failed_detail
.append("delete conflict: {}".format(e
))
1309 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1312 failed_detail
.append("delete error: {}".format(e
))
1315 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1319 stage
[2] = "Error deleting from VIM"
1321 stage
[2] = "Deleted from VIM"
1322 db_nsr_update
["detailed-status"] = " ".join(stage
)
1323 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1324 self
._write
_op
_status
(nslcmop_id
, stage
)
1327 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: preffix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs; its values must contain "vim-account-id"
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            vnfr_vims = [vnfr["vim-account-id"] for vnfr in db_vnfrs.values()]
            if vnfr_vims and ns_params["vimAccountId"] not in vnfr_vims:
                # this was a "==" comparison without effect; the last vnfr is taken, as the
                # original for/else loop variable would have been
                ns_params["vimAccountId"] = vnfr_vims[-1]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is logged only for exceptions that are not one of the expected types
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity (not used by this method)
    :param vnfr_id: _id of the vnfr that is read from database at every try
    :param kdu_name: "kdu-name" of the kdur to wait for
    :return: content of "ip-address" of the kdur once its status is READY or ENABLED
    Raises LcmException if the kdur is not found, has any other non-empty status or after 360 tries
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException(
                    "target KDU={} is in error state".format(kdu_name)
                )

        # the "loop" argument of asyncio.sleep was removed in python 3.10; the running loop is used
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1444 async def wait_vm_up_insert_key_ro(
1445 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1448 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1449 :param logging_text: prefix use for logging
1454 :param pub_key: public ssh key to inject, None to skip
1455 :param user: user to apply the public ssh key
1459 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1463 target_vdu_id
= None
1469 if ro_retries
>= 360: # 1 hour
1471 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1474 await asyncio
.sleep(10, loop
=self
.loop
)
1477 if not target_vdu_id
:
1478 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1480 if not vdu_id
: # for the VNF case
1481 if db_vnfr
.get("status") == "ERROR":
1483 "Cannot inject ssh-key because target VNF is in error state"
1485 ip_address
= db_vnfr
.get("ip-address")
1491 for x
in get_iterable(db_vnfr
, "vdur")
1492 if x
.get("ip-address") == ip_address
1500 for x
in get_iterable(db_vnfr
, "vdur")
1501 if x
.get("vdu-id-ref") == vdu_id
1502 and x
.get("count-index") == vdu_index
1508 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1509 ): # If only one, this should be the target vdu
1510 vdur
= db_vnfr
["vdur"][0]
1513 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1514 vnfr_id
, vdu_id
, vdu_index
1517 # New generation RO stores information at "vim_info"
1520 if vdur
.get("vim_info"):
1522 t
for t
in vdur
["vim_info"]
1523 ) # there should be only one key
1524 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1526 vdur
.get("pdu-type")
1527 or vdur
.get("status") == "ACTIVE"
1528 or ng_ro_status
== "ACTIVE"
1530 ip_address
= vdur
.get("ip-address")
1533 target_vdu_id
= vdur
["vdu-id-ref"]
1534 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1536 "Cannot inject ssh-key because target VM is in error state"
1539 if not target_vdu_id
:
1542 # inject public key into machine
1543 if pub_key
and user
:
1544 self
.logger
.debug(logging_text
+ "Inserting RO key")
1545 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1546 if vdur
.get("pdu-type"):
1547 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1550 ro_vm_id
= "{}-{}".format(
1551 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1552 ) # TODO add vdu_index
1556 "action": "inject_ssh_key",
1560 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1562 desc
= await self
.RO
.deploy(nsr_id
, target
)
1563 action_id
= desc
["action_id"]
1564 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1567 # wait until NS is deployed at RO
1569 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1570 ro_nsr_id
= deep_get(
1571 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1575 result_dict
= await self
.RO
.create_action(
1577 item_id_name
=ro_nsr_id
,
1579 "add_public_key": pub_key
,
1584 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1585 if not result_dict
or not isinstance(result_dict
, dict):
1587 "Unknown response from RO when injecting key"
1589 for result
in result_dict
.values():
1590 if result
.get("vim_result") == 200:
1593 raise ROclient
.ROClientException(
1594 "error injecting key: {}".format(
1595 result
.get("description")
1599 except NgRoException
as e
:
1601 "Reaching max tries injecting key. Error: {}".format(e
)
1603 except ROclient
.ROClientException
as e
:
1607 + "error injecting key: {}. Retrying until {} seconds".format(
1614 "Reaching max tries injecting key. Error: {}".format(e
)
1621 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1623 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1625 my_vca
= vca_deployed_list
[vca_index
]
1626 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1627 # vdu or kdu: no dependencies
1631 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1632 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1633 configuration_status_list
= db_nsr
["configurationStatus"]
1634 for index
, vca_deployed
in enumerate(configuration_status_list
):
1635 if index
== vca_index
:
1638 if not my_vca
.get("member-vnf-index") or (
1639 vca_deployed
.get("member-vnf-index")
1640 == my_vca
.get("member-vnf-index")
1642 internal_status
= configuration_status_list
[index
].get("status")
1643 if internal_status
== "READY":
1645 elif internal_status
== "BROKEN":
1647 "Configuration aborted because dependent charm/s has failed"
1652 # no dependencies, return
1654 await asyncio
.sleep(10)
1657 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA identity to use for a vnf or a ns
    :param db_vnfr: database content of the vnfr. If not empty, its "vca-id" is returned
    :param db_nsr: database content of the nsr. Used only when db_vnfr is empty: the "vca" of the vim account
        referenced at instantiate_params.vimAccountId is returned
    :return: the vca id, or None if both parameters are empty
    """
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if not db_nsr:
        return None
    ns_vim_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
    vim_account = VimAccountDB.get_vim_account_with_id(ns_vim_id)
    return vim_account.get("vca")
1668 async def instantiate_N2VC(
1685 ee_config_descriptor
,
1687 nsr_id
= db_nsr
["_id"]
1688 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1689 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1690 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1691 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1693 "collection": "nsrs",
1694 "filter": {"_id": nsr_id
},
1695 "path": db_update_entry
,
1701 element_under_configuration
= nsr_id
1705 vnfr_id
= db_vnfr
["_id"]
1706 osm_config
["osm"]["vnf_id"] = vnfr_id
1708 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1710 if vca_type
== "native_charm":
1713 index_number
= vdu_index
or 0
1716 element_type
= "VNF"
1717 element_under_configuration
= vnfr_id
1718 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1720 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1721 element_type
= "VDU"
1722 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1723 osm_config
["osm"]["vdu_id"] = vdu_id
1725 namespace
+= ".{}".format(kdu_name
)
1726 element_type
= "KDU"
1727 element_under_configuration
= kdu_name
1728 osm_config
["osm"]["kdu_name"] = kdu_name
1731 artifact_path
= "{}/{}/{}/{}".format(
1732 base_folder
["folder"],
1733 base_folder
["pkg-dir"],
1735 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1740 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1742 # get initial_config_primitive_list that applies to this element
1743 initial_config_primitive_list
= config_descriptor
.get(
1744 "initial-config-primitive"
1748 "Initial config primitive list > {}".format(
1749 initial_config_primitive_list
1753 # add config if not present for NS charm
1754 ee_descriptor_id
= ee_config_descriptor
.get("id")
1755 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1756 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1757 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1761 "Initial config primitive list #2 > {}".format(
1762 initial_config_primitive_list
1765 # n2vc_redesign STEP 3.1
1766 # find old ee_id if exists
1767 ee_id
= vca_deployed
.get("ee_id")
1769 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1770 # create or register execution environment in VCA
1771 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1773 self
._write
_configuration
_status
(
1775 vca_index
=vca_index
,
1777 element_under_configuration
=element_under_configuration
,
1778 element_type
=element_type
,
1781 step
= "create execution environment"
1782 self
.logger
.debug(logging_text
+ step
)
1786 if vca_type
== "k8s_proxy_charm":
1787 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1788 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1789 namespace
=namespace
,
1790 artifact_path
=artifact_path
,
1794 elif vca_type
== "helm" or vca_type
== "helm-v3":
1795 ee_id
, credentials
= await self
.vca_map
[
1797 ].create_execution_environment(
1798 namespace
=namespace
,
1802 artifact_path
=artifact_path
,
1806 ee_id
, credentials
= await self
.vca_map
[
1808 ].create_execution_environment(
1809 namespace
=namespace
,
1815 elif vca_type
== "native_charm":
1816 step
= "Waiting to VM being up and getting IP address"
1817 self
.logger
.debug(logging_text
+ step
)
1818 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1827 credentials
= {"hostname": rw_mgmt_ip
}
1829 username
= deep_get(
1830 config_descriptor
, ("config-access", "ssh-access", "default-user")
1832 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1833 # merged. Meanwhile let's get username from initial-config-primitive
1834 if not username
and initial_config_primitive_list
:
1835 for config_primitive
in initial_config_primitive_list
:
1836 for param
in config_primitive
.get("parameter", ()):
1837 if param
["name"] == "ssh-username":
1838 username
= param
["value"]
1842 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1843 "'config-access.ssh-access.default-user'"
1845 credentials
["username"] = username
1846 # n2vc_redesign STEP 3.2
1848 self
._write
_configuration
_status
(
1850 vca_index
=vca_index
,
1851 status
="REGISTERING",
1852 element_under_configuration
=element_under_configuration
,
1853 element_type
=element_type
,
1856 step
= "register execution environment {}".format(credentials
)
1857 self
.logger
.debug(logging_text
+ step
)
1858 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1859 credentials
=credentials
,
1860 namespace
=namespace
,
1865 # for compatibility with MON/POL modules, the need model and application name at database
1866 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1867 ee_id_parts
= ee_id
.split(".")
1868 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1869 if len(ee_id_parts
) >= 2:
1870 model_name
= ee_id_parts
[0]
1871 application_name
= ee_id_parts
[1]
1872 db_nsr_update
[db_update_entry
+ "model"] = model_name
1873 db_nsr_update
[db_update_entry
+ "application"] = application_name
1875 # n2vc_redesign STEP 3.3
1876 step
= "Install configuration Software"
1878 self
._write
_configuration
_status
(
1880 vca_index
=vca_index
,
1881 status
="INSTALLING SW",
1882 element_under_configuration
=element_under_configuration
,
1883 element_type
=element_type
,
1884 other_update
=db_nsr_update
,
1887 # TODO check if already done
1888 self
.logger
.debug(logging_text
+ step
)
1890 if vca_type
== "native_charm":
1891 config_primitive
= next(
1892 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1895 if config_primitive
:
1896 config
= self
._map
_primitive
_params
(
1897 config_primitive
, {}, deploy_params
1900 if vca_type
== "lxc_proxy_charm":
1901 if element_type
== "NS":
1902 num_units
= db_nsr
.get("config-units") or 1
1903 elif element_type
== "VNF":
1904 num_units
= db_vnfr
.get("config-units") or 1
1905 elif element_type
== "VDU":
1906 for v
in db_vnfr
["vdur"]:
1907 if vdu_id
== v
["vdu-id-ref"]:
1908 num_units
= v
.get("config-units") or 1
1910 if vca_type
!= "k8s_proxy_charm":
1911 await self
.vca_map
[vca_type
].install_configuration_sw(
1913 artifact_path
=artifact_path
,
1916 num_units
=num_units
,
1921 # write in db flag of configuration_sw already installed
1923 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1926 # add relations for this VCA (wait for other peers related with this VCA)
1927 await self
._add
_vca
_relations
(
1928 logging_text
=logging_text
,
1930 vca_index
=vca_index
,
1935 # if SSH access is required, then get execution environment SSH public
1936 # if native charm we have waited already to VM be UP
1937 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1940 # self.logger.debug("get ssh key block")
1942 config_descriptor
, ("config-access", "ssh-access", "required")
1944 # self.logger.debug("ssh key needed")
1945 # Needed to inject a ssh key
1948 ("config-access", "ssh-access", "default-user"),
1950 step
= "Install configuration Software, getting public ssh key"
1951 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1952 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1955 step
= "Insert public key into VM user={} ssh_key={}".format(
1959 # self.logger.debug("no need to get ssh key")
1960 step
= "Waiting to VM being up and getting IP address"
1961 self
.logger
.debug(logging_text
+ step
)
1963 # n2vc_redesign STEP 5.1
1964 # wait for RO (ip-address) Insert pub_key into VM
1967 rw_mgmt_ip
= await self
.wait_kdu_up(
1968 logging_text
, nsr_id
, vnfr_id
, kdu_name
1971 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1981 rw_mgmt_ip
= None # This is for a NS configuration
1983 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1985 # store rw_mgmt_ip in deploy params for later replacement
1986 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1988 # n2vc_redesign STEP 6 Execute initial config primitive
1989 step
= "execute initial config primitive"
1991 # wait for dependent primitives execution (NS -> VNF -> VDU)
1992 if initial_config_primitive_list
:
1993 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1995 # stage, in function of element type: vdu, kdu, vnf or ns
1996 my_vca
= vca_deployed_list
[vca_index
]
1997 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1999 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2000 elif my_vca
.get("member-vnf-index"):
2002 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2005 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2007 self
._write
_configuration
_status
(
2008 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2011 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2013 check_if_terminated_needed
= True
2014 for initial_config_primitive
in initial_config_primitive_list
:
2015 # adding information on the vca_deployed if it is a NS execution environment
2016 if not vca_deployed
["member-vnf-index"]:
2017 deploy_params
["ns_config_info"] = json
.dumps(
2018 self
._get
_ns
_config
_info
(nsr_id
)
2020 # TODO check if already done
2021 primitive_params_
= self
._map
_primitive
_params
(
2022 initial_config_primitive
, {}, deploy_params
2025 step
= "execute primitive '{}' params '{}'".format(
2026 initial_config_primitive
["name"], primitive_params_
2028 self
.logger
.debug(logging_text
+ step
)
2029 await self
.vca_map
[vca_type
].exec_primitive(
2031 primitive_name
=initial_config_primitive
["name"],
2032 params_dict
=primitive_params_
,
2037 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2038 if check_if_terminated_needed
:
2039 if config_descriptor
.get("terminate-config-primitive"):
2041 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2043 check_if_terminated_needed
= False
2045 # TODO register in database that primitive is done
2047 # STEP 7 Configure metrics
2048 if vca_type
== "helm" or vca_type
== "helm-v3":
2049 prometheus_jobs
= await self
.add_prometheus_metrics(
2051 artifact_path
=artifact_path
,
2052 ee_config_descriptor
=ee_config_descriptor
,
2055 target_ip
=rw_mgmt_ip
,
2061 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2064 step
= "instantiated at VCA"
2065 self
.logger
.debug(logging_text
+ step
)
2067 self
._write
_configuration
_status
(
2068 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2071 except Exception as e
: # TODO not use Exception but N2VC exception
2072 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2074 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2077 "Exception while {} : {}".format(step
, e
), exc_info
=True
2079 self
._write
_configuration
_status
(
2080 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2082 raise LcmException("{} {}".format(step
, e
)) from e
2084 def _write_ns_status(
2088 current_operation
: str,
2089 current_operation_id
: str,
2090 error_description
: str = None,
2091 error_detail
: str = None,
2092 other_update
: dict = None,
2095 Update db_nsr fields.
2098 :param current_operation:
2099 :param current_operation_id:
2100 :param error_description:
2101 :param error_detail:
2102 :param other_update: Other required changes at database if provided, will be cleared
2106 db_dict
= other_update
or {}
2109 ] = current_operation_id
# for backward compatibility
2110 db_dict
["_admin.current-operation"] = current_operation_id
2111 db_dict
["_admin.operation-type"] = (
2112 current_operation
if current_operation
!= "IDLE" else None
2114 db_dict
["currentOperation"] = current_operation
2115 db_dict
["currentOperationID"] = current_operation_id
2116 db_dict
["errorDescription"] = error_description
2117 db_dict
["errorDetail"] = error_detail
2120 db_dict
["nsState"] = ns_state
2121 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2122 except DbException
as e
:
2123 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2125 def _write_op_status(
2129 error_message
: str = None,
2130 queuePosition
: int = 0,
2131 operation_state
: str = None,
2132 other_update
: dict = None,
2135 db_dict
= other_update
or {}
2136 db_dict
["queuePosition"] = queuePosition
2137 if isinstance(stage
, list):
2138 db_dict
["stage"] = stage
[0]
2139 db_dict
["detailed-status"] = " ".join(stage
)
2140 elif stage
is not None:
2141 db_dict
["stage"] = str(stage
)
2143 if error_message
is not None:
2144 db_dict
["errorMessage"] = error_message
2145 if operation_state
is not None:
2146 db_dict
["operationState"] = operation_state
2147 db_dict
["statusEnteredTime"] = time()
2148 self
.update_db_2("nslcmops", op_id
, db_dict
)
2149 except DbException
as e
:
2151 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2154 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2156 nsr_id
= db_nsr
["_id"]
2157 # configurationStatus
2158 config_status
= db_nsr
.get("configurationStatus")
2161 "configurationStatus.{}.status".format(index
): status
2162 for index
, v
in enumerate(config_status
)
2166 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2168 except DbException
as e
:
2170 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2173 def _write_configuration_status(
2178 element_under_configuration
: str = None,
2179 element_type
: str = None,
2180 other_update
: dict = None,
2183 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2184 # .format(vca_index, status))
2187 db_path
= "configurationStatus.{}.".format(vca_index
)
2188 db_dict
= other_update
or {}
2190 db_dict
[db_path
+ "status"] = status
2191 if element_under_configuration
:
2193 db_path
+ "elementUnderConfiguration"
2194 ] = element_under_configuration
2196 db_dict
[db_path
+ "elementType"] = element_type
2197 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2198 except DbException
as e
:
2200 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2201 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Compute the placement (vim account where every vnf is deployed) when the operation requests the
    external "PLA" placement engine. The request is sent with self.msg.aiowrite and the answer is polled
    from database (nslcmops _admin.pla), so that it can be written by a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
        computed 'vim-account-id'
    """
    nslcmop_id = db_nslcmop["_id"]
    if deep_get(db_nslcmop, ("operationParams", "placement-engine")) != "PLA":
        # no external placement requested, nothing is modified
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll database until the placement result is stored
    poll_seconds = 5
    remaining = poll_seconds * 10
    placement = None
    while not placement and remaining >= 0:
        await asyncio.sleep(poll_seconds)
        remaining -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ("_admin", "pla"))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for placed_vnf in placement["vnf"]:
        vnfr = db_vnfrs.get(placed_vnf["member-vnf-index"])
        if placed_vnf.get("vimAccountId") and vnfr:
            changed = True
            new_vim = {"vim-account-id": placed_vnf["vimAccountId"]}
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, new_vim)
            # Modifies db_vnfrs
            vnfr["vim-account-id"] = placed_vnf["vimAccountId"]
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at the nslcmops entry it belongs to, at "_admin.pla".
    :param params: dict whose "placement" item is the result to store; the operation identity is read
        from placement.nslcmopId
    :return: None. Any exception is logged as warning and not propagated
    """
    # bound before the try so that the except clause can always format it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2263 async def instantiate(self
, nsr_id
, nslcmop_id
):
2266 :param nsr_id: ns instance to deploy
2267 :param nslcmop_id: operation to run
2271 # Try to lock HA task here
2272 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2273 if not task_is_locked_by_me
:
2275 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2279 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2280 self
.logger
.debug(logging_text
+ "Enter")
2282 # get all needed from database
2284 # database nsrs record
2287 # database nslcmops record
2290 # update operation on nsrs
2292 # update operation on nslcmops
2293 db_nslcmop_update
= {}
2295 nslcmop_operation_state
= None
2296 db_vnfrs
= {} # vnf's info indexed by member-index
2298 tasks_dict_info
= {} # from task to info text
2302 "Stage 1/5: preparation of the environment.",
2303 "Waiting for previous operations to terminate.",
2306 # ^ stage, step, VIM progress
2308 # wait for any previous tasks in process
2309 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2311 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2312 stage
[1] = "Reading from database."
2313 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2314 db_nsr_update
["detailed-status"] = "creating"
2315 db_nsr_update
["operational-status"] = "init"
2316 self
._write
_ns
_status
(
2318 ns_state
="BUILDING",
2319 current_operation
="INSTANTIATING",
2320 current_operation_id
=nslcmop_id
,
2321 other_update
=db_nsr_update
,
2323 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2325 # read from db: operation
2326 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2327 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2328 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2329 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2330 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2332 ns_params
= db_nslcmop
.get("operationParams")
2333 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2334 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2336 timeout_ns_deploy
= self
.timeout
.get(
2337 "ns_deploy", self
.timeout_ns_deploy
2341 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2342 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2343 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2344 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2345 self
.fs
.sync(db_nsr
["nsd-id"])
2347 # nsr_name = db_nsr["name"] # TODO short-name??
2349 # read from db: vnf's of this ns
2350 stage
[1] = "Getting vnfrs from db."
2351 self
.logger
.debug(logging_text
+ stage
[1])
2352 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2354 # read from db: vnfd's for every vnf
2355 db_vnfds
= [] # every vnfd data
2357 # for each vnf in ns, read vnfd
2358 for vnfr
in db_vnfrs_list
:
2359 if vnfr
.get("kdur"):
2361 for kdur
in vnfr
["kdur"]:
2362 if kdur
.get("additionalParams"):
2363 kdur
["additionalParams"] = json
.loads(
2364 kdur
["additionalParams"]
2366 kdur_list
.append(kdur
)
2367 vnfr
["kdur"] = kdur_list
2369 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2370 vnfd_id
= vnfr
["vnfd-id"]
2371 vnfd_ref
= vnfr
["vnfd-ref"]
2372 self
.fs
.sync(vnfd_id
)
2374 # if we haven't this vnfd, read it from db
2375 if vnfd_id
not in db_vnfds
:
2377 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2380 self
.logger
.debug(logging_text
+ stage
[1])
2381 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2384 db_vnfds
.append(vnfd
)
2386 # Get or generates the _admin.deployed.VCA list
2387 vca_deployed_list
= None
2388 if db_nsr
["_admin"].get("deployed"):
2389 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2390 if vca_deployed_list
is None:
2391 vca_deployed_list
= []
2392 configuration_status_list
= []
2393 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2394 db_nsr_update
["configurationStatus"] = configuration_status_list
2395 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2396 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2397 elif isinstance(vca_deployed_list
, dict):
2398 # maintain backward compatibility. Change a dict to list at database
2399 vca_deployed_list
= list(vca_deployed_list
.values())
2400 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2401 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2404 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2406 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2407 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2409 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2410 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2411 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2413 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2416 # n2vc_redesign STEP 2 Deploy Network Scenario
2417 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2418 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2420 stage
[1] = "Deploying KDUs."
2421 # self.logger.debug(logging_text + "Before deploy_kdus")
2422 # Call to deploy_kdus in case exists the "vdu:kdu" param
2423 await self
.deploy_kdus(
2424 logging_text
=logging_text
,
2426 nslcmop_id
=nslcmop_id
,
2429 task_instantiation_info
=tasks_dict_info
,
2432 stage
[1] = "Getting VCA public key."
2433 # n2vc_redesign STEP 1 Get VCA public ssh-key
2434 # feature 1429. Add n2vc public key to needed VMs
2435 n2vc_key
= self
.n2vc
.get_public_key()
2436 n2vc_key_list
= [n2vc_key
]
2437 if self
.vca_config
.get("public_key"):
2438 n2vc_key_list
.append(self
.vca_config
["public_key"])
2440 stage
[1] = "Deploying NS at VIM."
2441 task_ro
= asyncio
.ensure_future(
2442 self
.instantiate_RO(
2443 logging_text
=logging_text
,
2447 db_nslcmop
=db_nslcmop
,
2450 n2vc_key_list
=n2vc_key_list
,
2454 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2455 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2457 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2458 stage
[1] = "Deploying Execution Environments."
2459 self
.logger
.debug(logging_text
+ stage
[1])
2461 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2462 for vnf_profile
in get_vnf_profiles(nsd
):
2463 vnfd_id
= vnf_profile
["vnfd-id"]
2464 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2465 member_vnf_index
= str(vnf_profile
["id"])
2466 db_vnfr
= db_vnfrs
[member_vnf_index
]
2467 base_folder
= vnfd
["_admin"]["storage"]
2473 # Get additional parameters
2474 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2475 if db_vnfr
.get("additionalParamsForVnf"):
2476 deploy_params
.update(
2477 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2480 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2481 if descriptor_config
:
2483 logging_text
=logging_text
2484 + "member_vnf_index={} ".format(member_vnf_index
),
2487 nslcmop_id
=nslcmop_id
,
2493 member_vnf_index
=member_vnf_index
,
2494 vdu_index
=vdu_index
,
2496 deploy_params
=deploy_params
,
2497 descriptor_config
=descriptor_config
,
2498 base_folder
=base_folder
,
2499 task_instantiation_info
=tasks_dict_info
,
2503 # Deploy charms for each VDU that supports one.
2504 for vdud
in get_vdu_list(vnfd
):
2506 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2507 vdur
= find_in_list(
2508 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2511 if vdur
.get("additionalParams"):
2512 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2514 deploy_params_vdu
= deploy_params
2515 deploy_params_vdu
["OSM"] = get_osm_params(
2516 db_vnfr
, vdu_id
, vdu_count_index
=0
2518 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2520 self
.logger
.debug("VDUD > {}".format(vdud
))
2522 "Descriptor config > {}".format(descriptor_config
)
2524 if descriptor_config
:
2527 for vdu_index
in range(vdud_count
):
2528 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2530 logging_text
=logging_text
2531 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2532 member_vnf_index
, vdu_id
, vdu_index
2536 nslcmop_id
=nslcmop_id
,
2542 member_vnf_index
=member_vnf_index
,
2543 vdu_index
=vdu_index
,
2545 deploy_params
=deploy_params_vdu
,
2546 descriptor_config
=descriptor_config
,
2547 base_folder
=base_folder
,
2548 task_instantiation_info
=tasks_dict_info
,
2551 for kdud
in get_kdu_list(vnfd
):
2552 kdu_name
= kdud
["name"]
2553 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2554 if descriptor_config
:
2559 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2561 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2562 if kdur
.get("additionalParams"):
2563 deploy_params_kdu
.update(
2564 parse_yaml_strings(kdur
["additionalParams"].copy())
2568 logging_text
=logging_text
,
2571 nslcmop_id
=nslcmop_id
,
2577 member_vnf_index
=member_vnf_index
,
2578 vdu_index
=vdu_index
,
2580 deploy_params
=deploy_params_kdu
,
2581 descriptor_config
=descriptor_config
,
2582 base_folder
=base_folder
,
2583 task_instantiation_info
=tasks_dict_info
,
2587 # Check if this NS has a charm configuration
2588 descriptor_config
= nsd
.get("ns-configuration")
2589 if descriptor_config
and descriptor_config
.get("juju"):
2592 member_vnf_index
= None
2598 # Get additional parameters
2599 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2600 if db_nsr
.get("additionalParamsForNs"):
2601 deploy_params
.update(
2602 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2604 base_folder
= nsd
["_admin"]["storage"]
2606 logging_text
=logging_text
,
2609 nslcmop_id
=nslcmop_id
,
2615 member_vnf_index
=member_vnf_index
,
2616 vdu_index
=vdu_index
,
2618 deploy_params
=deploy_params
,
2619 descriptor_config
=descriptor_config
,
2620 base_folder
=base_folder
,
2621 task_instantiation_info
=tasks_dict_info
,
2625 # rest of staff will be done at finally
2628 ROclient
.ROClientException
,
2634 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2637 except asyncio
.CancelledError
:
2639 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2641 exc
= "Operation was cancelled"
2642 except Exception as e
:
2643 exc
= traceback
.format_exc()
2644 self
.logger
.critical(
2645 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2650 error_list
.append(str(exc
))
2652 # wait for pending tasks
2654 stage
[1] = "Waiting for instantiate pending tasks."
2655 self
.logger
.debug(logging_text
+ stage
[1])
2656 error_list
+= await self
._wait
_for
_tasks
(
2664 stage
[1] = stage
[2] = ""
2665 except asyncio
.CancelledError
:
2666 error_list
.append("Cancelled")
2667 # TODO cancel all tasks
2668 except Exception as exc
:
2669 error_list
.append(str(exc
))
2671 # update operation-status
2672 db_nsr_update
["operational-status"] = "running"
2673 # let's begin with VCA 'configured' status (later we can change it)
2674 db_nsr_update
["config-status"] = "configured"
2675 for task
, task_name
in tasks_dict_info
.items():
2676 if not task
.done() or task
.cancelled() or task
.exception():
2677 if task_name
.startswith(self
.task_name_deploy_vca
):
2678 # A N2VC task is pending
2679 db_nsr_update
["config-status"] = "failed"
2681 # RO or KDU task is pending
2682 db_nsr_update
["operational-status"] = "failed"
2684 # update status at database
2686 error_detail
= ". ".join(error_list
)
2687 self
.logger
.error(logging_text
+ error_detail
)
2688 error_description_nslcmop
= "{} Detail: {}".format(
2689 stage
[0], error_detail
2691 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2692 nslcmop_id
, stage
[0]
2695 db_nsr_update
["detailed-status"] = (
2696 error_description_nsr
+ " Detail: " + error_detail
2698 db_nslcmop_update
["detailed-status"] = error_detail
2699 nslcmop_operation_state
= "FAILED"
2703 error_description_nsr
= error_description_nslcmop
= None
2705 db_nsr_update
["detailed-status"] = "Done"
2706 db_nslcmop_update
["detailed-status"] = "Done"
2707 nslcmop_operation_state
= "COMPLETED"
2710 self
._write
_ns
_status
(
2713 current_operation
="IDLE",
2714 current_operation_id
=None,
2715 error_description
=error_description_nsr
,
2716 error_detail
=error_detail
,
2717 other_update
=db_nsr_update
,
2719 self
._write
_op
_status
(
2722 error_message
=error_description_nslcmop
,
2723 operation_state
=nslcmop_operation_state
,
2724 other_update
=db_nslcmop_update
,
2727 if nslcmop_operation_state
:
2729 await self
.msg
.aiowrite(
2734 "nslcmop_id": nslcmop_id
,
2735 "operationState": nslcmop_operation_state
,
2739 except Exception as e
:
2741 logging_text
+ "kafka_write notification Exception {}".format(e
)
2744 self
.logger
.debug(logging_text
+ "Exit")
2745 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2747 async def _add_vca_relations(
2752 timeout
: int = 3600,
2753 vca_type
: str = None,
2758 # 1. find all relations for this VCA
2759 # 2. wait for other peers related
2763 vca_type
= vca_type
or "lxc_proxy_charm"
2765 # STEP 1: find all relations for this VCA
2768 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2769 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2772 my_vca
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))[vca_index
]
2774 # read all ns-configuration relations
2775 ns_relations
= list()
2776 db_ns_relations
= deep_get(nsd
, ("ns-configuration", "relation"))
2778 for r
in db_ns_relations
:
2779 # check if this VCA is in the relation
2780 if my_vca
.get("member-vnf-index") in (
2781 r
.get("entities")[0].get("id"),
2782 r
.get("entities")[1].get("id"),
2784 ns_relations
.append(r
)
2786 # read all vnf-configuration relations
2787 vnf_relations
= list()
2788 db_vnfd_list
= db_nsr
.get("vnfd-id")
2790 for vnfd
in db_vnfd_list
:
2791 db_vnf_relations
= None
2792 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2793 db_vnf_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
2794 if db_vnf_configuration
:
2795 db_vnf_relations
= db_vnf_configuration
.get("relation", [])
2796 if db_vnf_relations
:
2797 for r
in db_vnf_relations
:
2798 # check if this VCA is in the relation
2799 if my_vca
.get("vdu_id") in (
2800 r
.get("entities")[0].get("id"),
2801 r
.get("entities")[1].get("id"),
2803 vnf_relations
.append(r
)
2805 # if no relations, terminate
2806 if not ns_relations
and not vnf_relations
:
2807 self
.logger
.debug(logging_text
+ " No relations")
2812 + " adding relations\n {}\n {}".format(
2813 ns_relations
, vnf_relations
2822 if now
- start
>= timeout
:
2823 self
.logger
.error(logging_text
+ " : timeout adding relations")
2826 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2827 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2829 # for each defined NS relation, find the VCA's related
2830 for r
in ns_relations
.copy():
2831 from_vca_ee_id
= None
2833 from_vca_endpoint
= None
2834 to_vca_endpoint
= None
2835 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2836 for vca
in vca_list
:
2837 if vca
.get("member-vnf-index") == r
.get("entities")[0].get(
2839 ) and vca
.get("config_sw_installed"):
2840 from_vca_ee_id
= vca
.get("ee_id")
2841 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2842 if vca
.get("member-vnf-index") == r
.get("entities")[1].get(
2844 ) and vca
.get("config_sw_installed"):
2845 to_vca_ee_id
= vca
.get("ee_id")
2846 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2847 if from_vca_ee_id
and to_vca_ee_id
:
2849 await self
.vca_map
[vca_type
].add_relation(
2850 ee_id_1
=from_vca_ee_id
,
2851 ee_id_2
=to_vca_ee_id
,
2852 endpoint_1
=from_vca_endpoint
,
2853 endpoint_2
=to_vca_endpoint
,
2856 # remove entry from relations list
2857 ns_relations
.remove(r
)
2859 # check failed peers
2861 vca_status_list
= db_nsr
.get("configurationStatus")
2863 for i
in range(len(vca_list
)):
2865 vca_status
= vca_status_list
[i
]
2866 if vca
.get("member-vnf-index") == r
.get("entities")[
2869 if vca_status
.get("status") == "BROKEN":
2870 # peer broken: remove relation from list
2871 ns_relations
.remove(r
)
2872 if vca
.get("member-vnf-index") == r
.get("entities")[
2875 if vca_status
.get("status") == "BROKEN":
2876 # peer broken: remove relation from list
2877 ns_relations
.remove(r
)
2882 # for each defined VNF relation, find the VCA's related
2883 for r
in vnf_relations
.copy():
2884 from_vca_ee_id
= None
2886 from_vca_endpoint
= None
2887 to_vca_endpoint
= None
2888 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2889 for vca
in vca_list
:
2890 key_to_check
= "vdu_id"
2891 if vca
.get("vdu_id") is None:
2892 key_to_check
= "vnfd_id"
2893 if vca
.get(key_to_check
) == r
.get("entities")[0].get(
2895 ) and vca
.get("config_sw_installed"):
2896 from_vca_ee_id
= vca
.get("ee_id")
2897 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2898 if vca
.get(key_to_check
) == r
.get("entities")[1].get(
2900 ) and vca
.get("config_sw_installed"):
2901 to_vca_ee_id
= vca
.get("ee_id")
2902 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2903 if from_vca_ee_id
and to_vca_ee_id
:
2905 await self
.vca_map
[vca_type
].add_relation(
2906 ee_id_1
=from_vca_ee_id
,
2907 ee_id_2
=to_vca_ee_id
,
2908 endpoint_1
=from_vca_endpoint
,
2909 endpoint_2
=to_vca_endpoint
,
2912 # remove entry from relations list
2913 vnf_relations
.remove(r
)
2915 # check failed peers
2917 vca_status_list
= db_nsr
.get("configurationStatus")
2919 for i
in range(len(vca_list
)):
2921 vca_status
= vca_status_list
[i
]
2922 if vca
.get("vdu_id") == r
.get("entities")[0].get(
2925 if vca_status
.get("status") == "BROKEN":
2926 # peer broken: remove relation from list
2927 vnf_relations
.remove(r
)
2928 if vca
.get("vdu_id") == r
.get("entities")[1].get(
2931 if vca_status
.get("status") == "BROKEN":
2932 # peer broken: remove relation from list
2933 vnf_relations
.remove(r
)
2939 await asyncio
.sleep(5.0)
2941 if not ns_relations
and not vnf_relations
:
2942 self
.logger
.debug("Relations added")
2947 except Exception as e
:
2948 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
2951 async def _install_kdu(
2959 k8s_instance_info
: dict,
2960 k8params
: dict = None,
2966 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2969 "collection": "nsrs",
2970 "filter": {"_id": nsr_id
},
2971 "path": nsr_db_path
,
2974 if k8s_instance_info
.get("kdu-deployment-name"):
2975 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
2977 kdu_instance
= self
.k8scluster_map
[
2979 ].generate_kdu_instance_name(
2980 db_dict
=db_dict_install
,
2981 kdu_model
=k8s_instance_info
["kdu-model"],
2982 kdu_name
=k8s_instance_info
["kdu-name"],
2985 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
2987 await self
.k8scluster_map
[k8sclustertype
].install(
2988 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2989 kdu_model
=k8s_instance_info
["kdu-model"],
2992 db_dict
=db_dict_install
,
2994 kdu_name
=k8s_instance_info
["kdu-name"],
2995 namespace
=k8s_instance_info
["namespace"],
2996 kdu_instance
=kdu_instance
,
3000 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3003 # Obtain services to obtain management service ip
3004 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3005 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3006 kdu_instance
=kdu_instance
,
3007 namespace
=k8s_instance_info
["namespace"],
3010 # Obtain management service info (if exists)
3011 vnfr_update_dict
= {}
3012 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3014 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3019 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3022 for service
in kdud
.get("service", [])
3023 if service
.get("mgmt-service")
3025 for mgmt_service
in mgmt_services
:
3026 for service
in services
:
3027 if service
["name"].startswith(mgmt_service
["name"]):
3028 # Mgmt service found, Obtain service ip
3029 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3030 if isinstance(ip
, list) and len(ip
) == 1:
3034 "kdur.{}.ip-address".format(kdu_index
)
3037 # Check if must update also mgmt ip at the vnf
3038 service_external_cp
= mgmt_service
.get(
3039 "external-connection-point-ref"
3041 if service_external_cp
:
3043 deep_get(vnfd
, ("mgmt-interface", "cp"))
3044 == service_external_cp
3046 vnfr_update_dict
["ip-address"] = ip
3051 "external-connection-point-ref", ""
3053 == service_external_cp
,
3056 "kdur.{}.ip-address".format(kdu_index
)
3061 "Mgmt service name: {} not found".format(
3062 mgmt_service
["name"]
3066 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3067 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3069 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3072 and kdu_config
.get("initial-config-primitive")
3073 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3075 initial_config_primitive_list
= kdu_config
.get(
3076 "initial-config-primitive"
3078 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3080 for initial_config_primitive
in initial_config_primitive_list
:
3081 primitive_params_
= self
._map
_primitive
_params
(
3082 initial_config_primitive
, {}, {}
3085 await asyncio
.wait_for(
3086 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3087 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3088 kdu_instance
=kdu_instance
,
3089 primitive_name
=initial_config_primitive
["name"],
3090 params
=primitive_params_
,
3091 db_dict
=db_dict_install
,
3097 except Exception as e
:
3098 # Prepare update db with error and raise exception
3101 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3105 vnfr_data
.get("_id"),
3106 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3109 # ignore to keep original exception
3111 # reraise original error
3116 async def deploy_kdus(
3123 task_instantiation_info
,
3125 # Launch kdus if present in the descriptor
3127 k8scluster_id_2_uuic
= {
3128 "helm-chart-v3": {},
3133 async def _get_cluster_id(cluster_id
, cluster_type
):
3134 nonlocal k8scluster_id_2_uuic
3135 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3136 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3138 # check if K8scluster is creating and wait look if previous tasks in process
3139 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3140 "k8scluster", cluster_id
3143 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3144 task_name
, cluster_id
3146 self
.logger
.debug(logging_text
+ text
)
3147 await asyncio
.wait(task_dependency
, timeout
=3600)
3149 db_k8scluster
= self
.db
.get_one(
3150 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3152 if not db_k8scluster
:
3153 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3155 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3157 if cluster_type
== "helm-chart-v3":
3159 # backward compatibility for existing clusters that have not been initialized for helm v3
3160 k8s_credentials
= yaml
.safe_dump(
3161 db_k8scluster
.get("credentials")
3163 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3164 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3166 db_k8scluster_update
= {}
3167 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3168 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3169 db_k8scluster_update
[
3170 "_admin.helm-chart-v3.created"
3172 db_k8scluster_update
[
3173 "_admin.helm-chart-v3.operationalState"
3176 "k8sclusters", cluster_id
, db_k8scluster_update
3178 except Exception as e
:
3181 + "error initializing helm-v3 cluster: {}".format(str(e
))
3184 "K8s cluster '{}' has not been initialized for '{}'".format(
3185 cluster_id
, cluster_type
3190 "K8s cluster '{}' has not been initialized for '{}'".format(
3191 cluster_id
, cluster_type
3194 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3197 logging_text
+= "Deploy kdus: "
3200 db_nsr_update
= {"_admin.deployed.K8s": []}
3201 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3204 updated_cluster_list
= []
3205 updated_v3_cluster_list
= []
3207 for vnfr_data
in db_vnfrs
.values():
3208 vca_id
= self
.get_vca_id(vnfr_data
, {})
3209 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3210 # Step 0: Prepare and set parameters
3211 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3212 vnfd_id
= vnfr_data
.get("vnfd-id")
3213 vnfd_with_id
= find_in_list(
3214 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3218 for kdud
in vnfd_with_id
["kdu"]
3219 if kdud
["name"] == kdur
["kdu-name"]
3221 namespace
= kdur
.get("k8s-namespace")
3222 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3223 if kdur
.get("helm-chart"):
3224 kdumodel
= kdur
["helm-chart"]
3225 # Default version: helm3, if helm-version is v2 assign v2
3226 k8sclustertype
= "helm-chart-v3"
3227 self
.logger
.debug("kdur: {}".format(kdur
))
3229 kdur
.get("helm-version")
3230 and kdur
.get("helm-version") == "v2"
3232 k8sclustertype
= "helm-chart"
3233 elif kdur
.get("juju-bundle"):
3234 kdumodel
= kdur
["juju-bundle"]
3235 k8sclustertype
= "juju-bundle"
3238 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3239 "juju-bundle. Maybe an old NBI version is running".format(
3240 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3243 # check if kdumodel is a file and exists
3245 vnfd_with_id
= find_in_list(
3246 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3248 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3249 if storage
and storage
.get(
3251 ): # may be not present if vnfd has not artifacts
3252 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3253 filename
= "{}/{}/{}s/{}".format(
3259 if self
.fs
.file_exists(
3260 filename
, mode
="file"
3261 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3262 kdumodel
= self
.fs
.path
+ filename
3263 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3265 except Exception: # it is not a file
3268 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3269 step
= "Synchronize repos for k8s cluster '{}'".format(
3272 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3276 k8sclustertype
== "helm-chart"
3277 and cluster_uuid
not in updated_cluster_list
3279 k8sclustertype
== "helm-chart-v3"
3280 and cluster_uuid
not in updated_v3_cluster_list
3282 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3283 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3284 cluster_uuid
=cluster_uuid
3287 if del_repo_list
or added_repo_dict
:
3288 if k8sclustertype
== "helm-chart":
3290 "_admin.helm_charts_added." + item
: None
3291 for item
in del_repo_list
3294 "_admin.helm_charts_added." + item
: name
3295 for item
, name
in added_repo_dict
.items()
3297 updated_cluster_list
.append(cluster_uuid
)
3298 elif k8sclustertype
== "helm-chart-v3":
3300 "_admin.helm_charts_v3_added." + item
: None
3301 for item
in del_repo_list
3304 "_admin.helm_charts_v3_added." + item
: name
3305 for item
, name
in added_repo_dict
.items()
3307 updated_v3_cluster_list
.append(cluster_uuid
)
3309 logging_text
+ "repos synchronized on k8s cluster "
3310 "'{}' to_delete: {}, to_add: {}".format(
3311 k8s_cluster_id
, del_repo_list
, added_repo_dict
3316 {"_id": k8s_cluster_id
},
3322 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3323 vnfr_data
["member-vnf-index-ref"],
3327 k8s_instance_info
= {
3328 "kdu-instance": None,
3329 "k8scluster-uuid": cluster_uuid
,
3330 "k8scluster-type": k8sclustertype
,
3331 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3332 "kdu-name": kdur
["kdu-name"],
3333 "kdu-model": kdumodel
,
3334 "namespace": namespace
,
3335 "kdu-deployment-name": kdu_deployment_name
,
3337 db_path
= "_admin.deployed.K8s.{}".format(index
)
3338 db_nsr_update
[db_path
] = k8s_instance_info
3339 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3340 vnfd_with_id
= find_in_list(
3341 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3343 task
= asyncio
.ensure_future(
3352 k8params
=desc_params
,
3357 self
.lcm_tasks
.register(
3361 "instantiate_KDU-{}".format(index
),
3364 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3370 except (LcmException
, asyncio
.CancelledError
):
3372 except Exception as e
:
3373 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3374 if isinstance(e
, (N2VCException
, DbException
)):
3375 self
.logger
.error(logging_text
+ msg
)
3377 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3378 raise LcmException(msg
)
3381 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3400 task_instantiation_info
,
3403 # launch instantiate_N2VC in a asyncio task and register task object
3404 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3405 # if not found, create one entry and update database
3406 # fill db_nsr._admin.deployed.VCA.<index>
3409 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3411 if "execution-environment-list" in descriptor_config
:
3412 ee_list
= descriptor_config
.get("execution-environment-list", [])
3413 elif "juju" in descriptor_config
:
3414 ee_list
= [descriptor_config
] # ns charms
3415 else: # other types as script are not supported
3418 for ee_item
in ee_list
:
3421 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3422 ee_item
.get("juju"), ee_item
.get("helm-chart")
3425 ee_descriptor_id
= ee_item
.get("id")
3426 if ee_item
.get("juju"):
3427 vca_name
= ee_item
["juju"].get("charm")
3430 if ee_item
["juju"].get("charm") is not None
3433 if ee_item
["juju"].get("cloud") == "k8s":
3434 vca_type
= "k8s_proxy_charm"
3435 elif ee_item
["juju"].get("proxy") is False:
3436 vca_type
= "native_charm"
3437 elif ee_item
.get("helm-chart"):
3438 vca_name
= ee_item
["helm-chart"]
3439 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3442 vca_type
= "helm-v3"
3445 logging_text
+ "skipping non juju neither charm configuration"
3450 for vca_index
, vca_deployed
in enumerate(
3451 db_nsr
["_admin"]["deployed"]["VCA"]
3453 if not vca_deployed
:
3456 vca_deployed
.get("member-vnf-index") == member_vnf_index
3457 and vca_deployed
.get("vdu_id") == vdu_id
3458 and vca_deployed
.get("kdu_name") == kdu_name
3459 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3460 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3464 # not found, create one.
3466 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3469 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3471 target
+= "/kdu/{}".format(kdu_name
)
3473 "target_element": target
,
3474 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3475 "member-vnf-index": member_vnf_index
,
3477 "kdu_name": kdu_name
,
3478 "vdu_count_index": vdu_index
,
3479 "operational-status": "init", # TODO revise
3480 "detailed-status": "", # TODO revise
3481 "step": "initial-deploy", # TODO revise
3483 "vdu_name": vdu_name
,
3485 "ee_descriptor_id": ee_descriptor_id
,
3489 # create VCA and configurationStatus in db
3491 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3492 "configurationStatus.{}".format(vca_index
): dict(),
3494 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3496 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3498 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3499 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3500 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3503 task_n2vc
= asyncio
.ensure_future(
3504 self
.instantiate_N2VC(
3505 logging_text
=logging_text
,
3506 vca_index
=vca_index
,
3512 vdu_index
=vdu_index
,
3513 deploy_params
=deploy_params
,
3514 config_descriptor
=descriptor_config
,
3515 base_folder
=base_folder
,
3516 nslcmop_id
=nslcmop_id
,
3520 ee_config_descriptor
=ee_item
,
3523 self
.lcm_tasks
.register(
3527 "instantiate_N2VC-{}".format(vca_index
),
3530 task_instantiation_info
[
3532 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3533 member_vnf_index
or "", vdu_id
or ""
3537 def _create_nslcmop(nsr_id
, operation
, params
):
3539 Creates a ns-lcm-opp content to be stored at database.
3540 :param nsr_id: internal id of the instance
3541 :param operation: instantiate, terminate, scale, action, ...
3542 :param params: user parameters for the operation
3543 :return: dictionary following SOL005 format
3545 # Raise exception if invalid arguments
3546 if not (nsr_id
and operation
and params
):
3548 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3555 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3556 "operationState": "PROCESSING",
3557 "statusEnteredTime": now
,
3558 "nsInstanceId": nsr_id
,
3559 "lcmOperationType": operation
,
3561 "isAutomaticInvocation": False,
3562 "operationParams": params
,
3563 "isCancelPending": False,
3565 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3566 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3571 def _format_additional_params(self
, params
):
3572 params
= params
or {}
3573 for key
, value
in params
.items():
3574 if str(value
).startswith("!!yaml "):
3575 params
[key
] = yaml
.safe_load(value
[7:])
3578 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3579 primitive
= seq
.get("name")
3580 primitive_params
= {}
3582 "member_vnf_index": vnf_index
,
3583 "primitive": primitive
,
3584 "primitive_params": primitive_params
,
3587 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop record; sub-operations are read from its
        "_admin.operations" list
    :param op_index: position of the sub-operation inside that list
    :return: self.SUBOPERATION_STATUS_SKIP when the sub-operation is already
        "COMPLETED"; otherwise op_index, after flagging it for a retry
    """
    suboperation = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if suboperation.get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists but it is not completed: set it back to
        # 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # The index is handed back so that _ns_execute_primitive() or
        # RO.create_action() can be called from scale() with the arguments
        # extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
3611 # Find a sub-operation where all keys in a matching dictionary must match
3612 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3613 def _find_suboperation(self
, db_nslcmop
, match
):
3614 if db_nslcmop
and match
:
3615 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3616 for i
, op
in enumerate(op_list
):
3617 if all(op
.get(k
) == match
[k
] for k
in match
):
3619 return self
.SUBOPERATION_STATUS_NOT_FOUND
3621 # Update status for a sub-operation given its index
3622 def _update_suboperation_status(
3623 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3625 # Update DB for HA tasks
3626 q_filter
= {"_id": db_nslcmop
["_id"]}
3628 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3629 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3632 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3635 # Add sub-operation, return the index of the added sub-operation
3636 # Optionally, set operationState, detailed-status, and operationType
3637 # Status and type are currently set for 'scale' sub-operations:
3638 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3639 # 'detailed-status' : status message
3640 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3641 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3642 def _add_suboperation(
3650 mapped_primitive_params
,
3651 operationState
=None,
3652 detailed_status
=None,
3655 RO_scaling_info
=None,
3658 return self
.SUBOPERATION_STATUS_NOT_FOUND
3659 # Get the "_admin.operations" list, if it exists
3660 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3661 op_list
= db_nslcmop_admin
.get("operations")
3662 # Create or append to the "_admin.operations" list
3664 "member_vnf_index": vnf_index
,
3666 "vdu_count_index": vdu_count_index
,
3667 "primitive": primitive
,
3668 "primitive_params": mapped_primitive_params
,
3671 new_op
["operationState"] = operationState
3673 new_op
["detailed-status"] = detailed_status
3675 new_op
["lcmOperationType"] = operationType
3677 new_op
["RO_nsr_id"] = RO_nsr_id
3679 new_op
["RO_scaling_info"] = RO_scaling_info
3681 # No existing operations, create key 'operations' with current operation as first list element
3682 db_nslcmop_admin
.update({"operations": [new_op
]})
3683 op_list
= db_nslcmop_admin
.get("operations")
3685 # Existing operations, append operation to list
3686 op_list
.append(new_op
)
3688 db_nslcmop_update
= {"_admin.operations": op_list
}
3689 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3690 op_index
= len(op_list
) - 1
3693 # Helper methods for scale() sub-operations
3695 # pre-scale/post-scale:
3696 # Check for 3 different cases:
3697 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3698 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3699 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3700 def _check_or_add_scale_suboperation(
3704 vnf_config_primitive
,
3708 RO_scaling_info
=None,
3710 # Find this sub-operation
3711 if RO_nsr_id
and RO_scaling_info
:
3712 operationType
= "SCALE-RO"
3714 "member_vnf_index": vnf_index
,
3715 "RO_nsr_id": RO_nsr_id
,
3716 "RO_scaling_info": RO_scaling_info
,
3720 "member_vnf_index": vnf_index
,
3721 "primitive": vnf_config_primitive
,
3722 "primitive_params": primitive_params
,
3723 "lcmOperationType": operationType
,
3725 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3726 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3727 # a. New sub-operation
3728 # The sub-operation does not exist, add it.
3729 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3730 # The following parameters are set to None for all kind of scaling:
3732 vdu_count_index
= None
3734 if RO_nsr_id
and RO_scaling_info
:
3735 vnf_config_primitive
= None
3736 primitive_params
= None
3739 RO_scaling_info
= None
3740 # Initial status for sub-operation
3741 operationState
= "PROCESSING"
3742 detailed_status
= "In progress"
3743 # Add sub-operation for pre/post-scaling (zero or more operations)
3744 self
._add
_suboperation
(
3750 vnf_config_primitive
,
3758 return self
.SUBOPERATION_STATUS_NEW
3760 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3761 # or op_index (operationState != 'COMPLETED')
3762 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3764 # Function to return execution_environment id
3766 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3767 # TODO vdu_index_count
3768 for vca
in vca_deployed_list
:
3769 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3772 async def destroy_N2VC(
3780 exec_primitives
=True,
3785 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3786 :param logging_text:
3788 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3789 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3790 :param vca_index: index in the database _admin.deployed.VCA
3791 :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
3792 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3793 not executed properly
3794 :param scaling_in: True destroys the application, False destroys the model
3795 :return: None or exception
3800 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3801 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3805 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3807 # execute terminate_primitives
3809 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
3810 config_descriptor
.get("terminate-config-primitive"),
3811 vca_deployed
.get("ee_descriptor_id"),
3813 vdu_id
= vca_deployed
.get("vdu_id")
3814 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3815 vdu_name
= vca_deployed
.get("vdu_name")
3816 vnf_index
= vca_deployed
.get("member-vnf-index")
3817 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3818 for seq
in terminate_primitives
:
3819 # For each sequence in list, get primitive and call _ns_execute_primitive()
3820 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
3821 vnf_index
, seq
.get("name")
3823 self
.logger
.debug(logging_text
+ step
)
3824 # Create the primitive for each sequence, i.e. "primitive": "touch"
3825 primitive
= seq
.get("name")
3826 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
3831 self
._add
_suboperation
(
3838 mapped_primitive_params
,
3840 # Sub-operations: Call _ns_execute_primitive() instead of action()
3842 result
, result_detail
= await self
._ns
_execute
_primitive
(
3843 vca_deployed
["ee_id"],
3845 mapped_primitive_params
,
3849 except LcmException
:
3850 # this happens when VCA is not deployed. In this case it is not needed to terminate
3852 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
3853 if result
not in result_ok
:
3855 "terminate_primitive {} for vnf_member_index={} fails with "
3856 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
3858 # set that this VCA do not need terminated
3859 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
3863 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
3866 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
3867 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
3870 await self
.vca_map
[vca_type
].delete_execution_environment(
3871 vca_deployed
["ee_id"],
3872 scaling_in
=scaling_in
,
3877 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
3878 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
3879 namespace
= "." + db_nsr
["_id"]
3881 await self
.n2vc
.delete_namespace(
3882 namespace
=namespace
,
3883 total_timeout
=self
.timeout_charm_delete
,
3886 except N2VCNotFound
: # already deleted. Skip
3888 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
3890 async def _terminate_RO(
3891 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
3894 Terminates a deployment from RO
3895 :param logging_text:
3896 :param nsr_deployed: db_nsr._admin.deployed
3899 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3900 this method will update only the index 2, but it will write on database the concatenated content of the list
3905 ro_nsr_id
= ro_delete_action
= None
3906 if nsr_deployed
and nsr_deployed
.get("RO"):
3907 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
3908 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
3911 stage
[2] = "Deleting ns from VIM."
3912 db_nsr_update
["detailed-status"] = " ".join(stage
)
3913 self
._write
_op
_status
(nslcmop_id
, stage
)
3914 self
.logger
.debug(logging_text
+ stage
[2])
3915 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3916 self
._write
_op
_status
(nslcmop_id
, stage
)
3917 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
3918 ro_delete_action
= desc
["action_id"]
3920 "_admin.deployed.RO.nsr_delete_action_id"
3921 ] = ro_delete_action
3922 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3923 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3924 if ro_delete_action
:
3925 # wait until NS is deleted from VIM
3926 stage
[2] = "Waiting ns deleted from VIM."
3927 detailed_status_old
= None
3931 + " RO_id={} ro_delete_action={}".format(
3932 ro_nsr_id
, ro_delete_action
3935 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3936 self
._write
_op
_status
(nslcmop_id
, stage
)
3938 delete_timeout
= 20 * 60 # 20 minutes
3939 while delete_timeout
> 0:
3940 desc
= await self
.RO
.show(
3942 item_id_name
=ro_nsr_id
,
3943 extra_item
="action",
3944 extra_item_id
=ro_delete_action
,
3948 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
3950 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
3951 if ns_status
== "ERROR":
3952 raise ROclient
.ROClientException(ns_status_info
)
3953 elif ns_status
== "BUILD":
3954 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
3955 elif ns_status
== "ACTIVE":
3956 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3957 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3962 ), "ROclient.check_action_status returns unknown {}".format(
3965 if stage
[2] != detailed_status_old
:
3966 detailed_status_old
= stage
[2]
3967 db_nsr_update
["detailed-status"] = " ".join(stage
)
3968 self
._write
_op
_status
(nslcmop_id
, stage
)
3969 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3970 await asyncio
.sleep(5, loop
=self
.loop
)
3972 else: # delete_timeout <= 0:
3973 raise ROclient
.ROClientException(
3974 "Timeout waiting ns deleted from VIM"
3977 except Exception as e
:
3978 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3980 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
3982 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3983 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3984 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3986 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
3989 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
3991 failed_detail
.append("delete conflict: {}".format(e
))
3994 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
3997 failed_detail
.append("delete error: {}".format(e
))
3999 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4003 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4004 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4006 stage
[2] = "Deleting nsd from RO."
4007 db_nsr_update
["detailed-status"] = " ".join(stage
)
4008 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4009 self
._write
_op
_status
(nslcmop_id
, stage
)
4010 await self
.RO
.delete("nsd", ro_nsd_id
)
4012 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4014 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4015 except Exception as e
:
4017 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4019 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4021 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4024 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4026 failed_detail
.append(
4027 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4029 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4031 failed_detail
.append(
4032 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4034 self
.logger
.error(logging_text
+ failed_detail
[-1])
4036 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4037 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4038 if not vnf_deployed
or not vnf_deployed
["id"]:
4041 ro_vnfd_id
= vnf_deployed
["id"]
4044 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4045 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4047 db_nsr_update
["detailed-status"] = " ".join(stage
)
4048 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4049 self
._write
_op
_status
(nslcmop_id
, stage
)
4050 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4052 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4054 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4055 except Exception as e
:
4057 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4060 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4064 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4067 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4069 failed_detail
.append(
4070 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4072 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4074 failed_detail
.append(
4075 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4077 self
.logger
.error(logging_text
+ failed_detail
[-1])
4080 stage
[2] = "Error deleting from VIM"
4082 stage
[2] = "Deleted from VIM"
4083 db_nsr_update
["detailed-status"] = " ".join(stage
)
4084 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4085 self
._write
_op
_status
(nslcmop_id
, stage
)
4088 raise LcmException("; ".join(failed_detail
))
4090 async def terminate(self
, nsr_id
, nslcmop_id
):
4091 # Try to lock HA task here
4092 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4093 if not task_is_locked_by_me
:
4096 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4097 self
.logger
.debug(logging_text
+ "Enter")
4098 timeout_ns_terminate
= self
.timeout_ns_terminate
4101 operation_params
= None
4103 error_list
= [] # annotates all failed error messages
4104 db_nslcmop_update
= {}
4105 autoremove
= False # autoremove after terminated
4106 tasks_dict_info
= {}
4109 "Stage 1/3: Preparing task.",
4110 "Waiting for previous operations to terminate.",
4113 # ^ contains [stage, step, VIM-status]
4115 # wait for any previous tasks in process
4116 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4118 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4119 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4120 operation_params
= db_nslcmop
.get("operationParams") or {}
4121 if operation_params
.get("timeout_ns_terminate"):
4122 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4123 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4124 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4126 db_nsr_update
["operational-status"] = "terminating"
4127 db_nsr_update
["config-status"] = "terminating"
4128 self
._write
_ns
_status
(
4130 ns_state
="TERMINATING",
4131 current_operation
="TERMINATING",
4132 current_operation_id
=nslcmop_id
,
4133 other_update
=db_nsr_update
,
4135 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4136 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4137 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4140 stage
[1] = "Getting vnf descriptors from db."
4141 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4143 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4145 db_vnfds_from_id
= {}
4146 db_vnfds_from_member_index
= {}
4148 for vnfr
in db_vnfrs_list
:
4149 vnfd_id
= vnfr
["vnfd-id"]
4150 if vnfd_id
not in db_vnfds_from_id
:
4151 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4152 db_vnfds_from_id
[vnfd_id
] = vnfd
4153 db_vnfds_from_member_index
[
4154 vnfr
["member-vnf-index-ref"]
4155 ] = db_vnfds_from_id
[vnfd_id
]
4157 # Destroy individual execution environments when there are terminating primitives.
4158 # Rest of EE will be deleted at once
4159 # TODO - check before calling _destroy_N2VC
4160 # if not operation_params.get("skip_terminate_primitives"):#
4161 # or not vca.get("needed_terminate"):
4162 stage
[0] = "Stage 2/3 execute terminating primitives."
4163 self
.logger
.debug(logging_text
+ stage
[0])
4164 stage
[1] = "Looking execution environment that needs terminate."
4165 self
.logger
.debug(logging_text
+ stage
[1])
4167 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4168 config_descriptor
= None
4169 vca_member_vnf_index
= vca
.get("member-vnf-index")
4170 vca_id
= self
.get_vca_id(
4171 db_vnfrs_dict
.get(vca_member_vnf_index
)
4172 if vca_member_vnf_index
4176 if not vca
or not vca
.get("ee_id"):
4178 if not vca
.get("member-vnf-index"):
4180 config_descriptor
= db_nsr
.get("ns-configuration")
4181 elif vca
.get("vdu_id"):
4182 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4183 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4184 elif vca
.get("kdu_name"):
4185 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4186 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4188 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4189 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4190 vca_type
= vca
.get("type")
4191 exec_terminate_primitives
= not operation_params
.get(
4192 "skip_terminate_primitives"
4193 ) and vca
.get("needed_terminate")
4194 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4195 # pending native charms
4197 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4199 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4200 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4201 task
= asyncio
.ensure_future(
4209 exec_terminate_primitives
,
4213 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4215 # wait for pending tasks of terminate primitives
4219 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4221 error_list
= await self
._wait
_for
_tasks
(
4224 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4228 tasks_dict_info
.clear()
4230 return # raise LcmException("; ".join(error_list))
4232 # remove All execution environments at once
4233 stage
[0] = "Stage 3/3 delete all."
4235 if nsr_deployed
.get("VCA"):
4236 stage
[1] = "Deleting all execution environments."
4237 self
.logger
.debug(logging_text
+ stage
[1])
4238 vca_id
= self
.get_vca_id({}, db_nsr
)
4239 task_delete_ee
= asyncio
.ensure_future(
4241 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4242 timeout
=self
.timeout_charm_delete
,
4245 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4246 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4248 # Delete from k8scluster
4249 stage
[1] = "Deleting KDUs."
4250 self
.logger
.debug(logging_text
+ stage
[1])
4251 # print(nsr_deployed)
4252 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4253 if not kdu
or not kdu
.get("kdu-instance"):
4255 kdu_instance
= kdu
.get("kdu-instance")
4256 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4257 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4258 vca_id
= self
.get_vca_id({}, db_nsr
)
4259 task_delete_kdu_instance
= asyncio
.ensure_future(
4260 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4261 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4262 kdu_instance
=kdu_instance
,
4269 + "Unknown k8s deployment type {}".format(
4270 kdu
.get("k8scluster-type")
4275 task_delete_kdu_instance
4276 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4279 stage
[1] = "Deleting ns from VIM."
4281 task_delete_ro
= asyncio
.ensure_future(
4282 self
._terminate
_ng
_ro
(
4283 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4287 task_delete_ro
= asyncio
.ensure_future(
4289 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4292 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4294 # rest of stuff will be done at finally
4297 ROclient
.ROClientException
,
4302 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4304 except asyncio
.CancelledError
:
4306 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4308 exc
= "Operation was cancelled"
4309 except Exception as e
:
4310 exc
= traceback
.format_exc()
4311 self
.logger
.critical(
4312 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4317 error_list
.append(str(exc
))
4319 # wait for pending tasks
4321 stage
[1] = "Waiting for terminate pending tasks."
4322 self
.logger
.debug(logging_text
+ stage
[1])
4323 error_list
+= await self
._wait
_for
_tasks
(
4326 timeout_ns_terminate
,
4330 stage
[1] = stage
[2] = ""
4331 except asyncio
.CancelledError
:
4332 error_list
.append("Cancelled")
4333 # TODO cancel all tasks
4334 except Exception as exc
:
4335 error_list
.append(str(exc
))
4336 # update status at database
4338 error_detail
= "; ".join(error_list
)
4339 # self.logger.error(logging_text + error_detail)
4340 error_description_nslcmop
= "{} Detail: {}".format(
4341 stage
[0], error_detail
4343 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4344 nslcmop_id
, stage
[0]
4347 db_nsr_update
["operational-status"] = "failed"
4348 db_nsr_update
["detailed-status"] = (
4349 error_description_nsr
+ " Detail: " + error_detail
4351 db_nslcmop_update
["detailed-status"] = error_detail
4352 nslcmop_operation_state
= "FAILED"
4356 error_description_nsr
= error_description_nslcmop
= None
4357 ns_state
= "NOT_INSTANTIATED"
4358 db_nsr_update
["operational-status"] = "terminated"
4359 db_nsr_update
["detailed-status"] = "Done"
4360 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4361 db_nslcmop_update
["detailed-status"] = "Done"
4362 nslcmop_operation_state
= "COMPLETED"
4365 self
._write
_ns
_status
(
4368 current_operation
="IDLE",
4369 current_operation_id
=None,
4370 error_description
=error_description_nsr
,
4371 error_detail
=error_detail
,
4372 other_update
=db_nsr_update
,
4374 self
._write
_op
_status
(
4377 error_message
=error_description_nslcmop
,
4378 operation_state
=nslcmop_operation_state
,
4379 other_update
=db_nslcmop_update
,
4381 if ns_state
== "NOT_INSTANTIATED":
4385 {"nsr-id-ref": nsr_id
},
4386 {"_admin.nsState": "NOT_INSTANTIATED"},
4388 except DbException
as e
:
4391 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4395 if operation_params
:
4396 autoremove
= operation_params
.get("autoremove", False)
4397 if nslcmop_operation_state
:
4399 await self
.msg
.aiowrite(
4404 "nslcmop_id": nslcmop_id
,
4405 "operationState": nslcmop_operation_state
,
4406 "autoremove": autoremove
,
4410 except Exception as e
:
4412 logging_text
+ "kafka_write notification Exception {}".format(e
)
4415 self
.logger
.debug(logging_text
+ "Exit")
4416 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4418 async def _wait_for_tasks(
4419 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4422 error_detail_list
= []
4424 pending_tasks
= list(created_tasks_info
.keys())
4425 num_tasks
= len(pending_tasks
)
4427 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4428 self
._write
_op
_status
(nslcmop_id
, stage
)
4429 while pending_tasks
:
4431 _timeout
= timeout
+ time_start
- time()
4432 done
, pending_tasks
= await asyncio
.wait(
4433 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4435 num_done
+= len(done
)
4436 if not done
: # Timeout
4437 for task
in pending_tasks
:
4438 new_error
= created_tasks_info
[task
] + ": Timeout"
4439 error_detail_list
.append(new_error
)
4440 error_list
.append(new_error
)
4443 if task
.cancelled():
4446 exc
= task
.exception()
4448 if isinstance(exc
, asyncio
.TimeoutError
):
4450 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4451 error_list
.append(created_tasks_info
[task
])
4452 error_detail_list
.append(new_error
)
4459 ROclient
.ROClientException
,
4465 self
.logger
.error(logging_text
+ new_error
)
4467 exc_traceback
= "".join(
4468 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4472 + created_tasks_info
[task
]
4478 logging_text
+ created_tasks_info
[task
] + ": Done"
4480 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4482 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4483 if nsr_id
: # update also nsr
4488 "errorDescription": "Error at: " + ", ".join(error_list
),
4489 "errorDetail": ". ".join(error_detail_list
),
4492 self
._write
_op
_status
(nslcmop_id
, stage
)
4493 return error_detail_list
4496 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4498 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4499 The default-value is used. If it is between < > it look for a value at instantiation_params
4500 :param primitive_desc: portion of VNFD/NSD that describes primitive
4501 :param params: Params provided by user
4502 :param instantiation_params: Instantiation params provided by user
4503 :return: a dictionary with the calculated params
4505 calculated_params
= {}
4506 for parameter
in primitive_desc
.get("parameter", ()):
4507 param_name
= parameter
["name"]
4508 if param_name
in params
:
4509 calculated_params
[param_name
] = params
[param_name
]
4510 elif "default-value" in parameter
or "value" in parameter
:
4511 if "value" in parameter
:
4512 calculated_params
[param_name
] = parameter
["value"]
4514 calculated_params
[param_name
] = parameter
["default-value"]
4516 isinstance(calculated_params
[param_name
], str)
4517 and calculated_params
[param_name
].startswith("<")
4518 and calculated_params
[param_name
].endswith(">")
4520 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4521 calculated_params
[param_name
] = instantiation_params
[
4522 calculated_params
[param_name
][1:-1]
4526 "Parameter {} needed to execute primitive {} not provided".format(
4527 calculated_params
[param_name
], primitive_desc
["name"]
4532 "Parameter {} needed to execute primitive {} not provided".format(
4533 param_name
, primitive_desc
["name"]
4537 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4538 calculated_params
[param_name
] = yaml
.safe_dump(
4539 calculated_params
[param_name
], default_flow_style
=True, width
=256
4541 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4543 ].startswith("!!yaml "):
4544 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4545 if parameter
.get("data-type") == "INTEGER":
4547 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4548 except ValueError: # error converting string to int
4550 "Parameter {} of primitive {} must be integer".format(
4551 param_name
, primitive_desc
["name"]
4554 elif parameter
.get("data-type") == "BOOLEAN":
4555 calculated_params
[param_name
] = not (
4556 (str(calculated_params
[param_name
])).lower() == "false"
4559 # add always ns_config_info if primitive name is config
4560 if primitive_desc
["name"] == "config":
4561 if "ns_config_info" in instantiation_params
:
4562 calculated_params
["ns_config_info"] = instantiation_params
[
4565 return calculated_params
4567 def _look_for_deployed_vca(
4574 ee_descriptor_id
=None,
4576 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4577 for vca
in deployed_vca
:
4580 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4583 vdu_count_index
is not None
4584 and vdu_count_index
!= vca
["vdu_count_index"]
4587 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4589 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4593 # vca_deployed not found
4595 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4596 " is not deployed".format(
4605 ee_id
= vca
.get("ee_id")
4607 "type", "lxc_proxy_charm"
4608 ) # default value for backward compatibility - proxy charm
4611 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4612 "execution environment".format(
4613 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4616 return ee_id
, vca_type
4618 async def _ns_execute_primitive(
4624 retries_interval
=30,
4631 if primitive
== "config":
4632 primitive_params
= {"params": primitive_params
}
4634 vca_type
= vca_type
or "lxc_proxy_charm"
4638 output
= await asyncio
.wait_for(
4639 self
.vca_map
[vca_type
].exec_primitive(
4641 primitive_name
=primitive
,
4642 params_dict
=primitive_params
,
4643 progress_timeout
=self
.timeout_progress_primitive
,
4644 total_timeout
=self
.timeout_primitive
,
4649 timeout
=timeout
or self
.timeout_primitive
,
4653 except asyncio
.CancelledError
:
4655 except Exception as e
: # asyncio.TimeoutError
4656 if isinstance(e
, asyncio
.TimeoutError
):
4661 "Error executing action {} on {} -> {}".format(
4666 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4668 return "FAILED", str(e
)
4670 return "COMPLETED", output
4672 except (LcmException
, asyncio
.CancelledError
):
4674 except Exception as e
:
4675 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record

    When the nsr has deployed K8s entries, self._on_update_k8s_db() is awaited for each of
    them; otherwise self._on_update_n2vc_db() is awaited for each deployed VCA position.
    The task is finally removed from self.lcm_tasks.

    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    deployed_k8s = db_nsr["_admin"]["deployed"]["K8s"]
    if deployed_k8s:
        for k8s_entry in deployed_k8s:
            await self._on_update_k8s_db(
                cluster_uuid=k8s_entry["k8scluster-uuid"],
                kdu_instance=k8s_entry["kdu-instance"],
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=k8s_entry["k8scluster-type"],
            )
    else:
        for vca_position, _unused in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            await self._on_update_n2vc_db(
                "nsrs",
                {"_id": nsr_id},
                "_admin.deployed.VCA.{}.".format(vca_position),
                {},
            )

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an NS action operation.

    The request is read from ``operationParams`` of the nslcmop record
    (primitive, primitive_params, member_vnf_index, vdu_id, kdu_name,
    vdu_count_index, timeout_ns_action). KDU primitives ("upgrade",
    "rollback", "status" or one declared in the KDU configuration) are run
    through the connector found in ``self.k8scluster_map``; everything else
    is run through ``self._ns_execute_primitive``.

    Errors are not propagated: they are recorded as a "FAILED" operation
    state. The NS and operation status are always written back, an
    "actioned" message is published and the task is removed from lcm_tasks.

    :param nsr_id: Id of the nsr
    :param nslcmop_id: Id of the nslcmop
    :return: tuple (operation state, detailed status), or None when the HA
        lock for this operation could not be taken
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    # Records that are only loaded on one of the VNF-level / NS-level paths.
    # They get a default so the other path never reads an unbound local;
    # get_vca_id() is given an empty vnfr for NS-level actions.
    db_vnfr = {}
    db_vnfd = None
    db_nsd = None
    kdu_action = False
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operation_params = db_nslcmop["operationParams"]
        if operation_params.get("primitive_params"):
            # primitive_params is stored as a JSON string
            operation_params["primitive_params"] = json.loads(
                operation_params["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = operation_params.get("member_vnf_index")
        vdu_id = operation_params.get("vdu_id")
        kdu_name = operation_params.get("kdu_name")
        vdu_count_index = operation_params.get("vdu_count_index")
        primitive = operation_params["primitive"]
        primitive_params = operation_params["primitive_params"]
        timeout_ns_action = operation_params.get(
            "timeout_ns_action", self.timeout_primitive
        )

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        # additionalParams is stored as a JSON string
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # upgrade/rollback/status on a KDU need no descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                if vdur is None:
                    # explicit error instead of an AttributeError on None
                    raise LcmException(
                        "VDU '{}' for vnf '{}' not found at vnfr".format(
                            vdu_id, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                if kdur is None:
                    # explicit error instead of an AttributeError on None
                    raise LcmException(
                        "KDU '{}' for vnf '{}' not found at vnfr".format(
                            kdu_name, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        kdu_configuration = (
            get_configuration(db_vnfd, kdu_name) if kdu_name else None
        )
        if kdu_configuration:
            # names of every primitive declared in the KDU configuration
            actions = {
                kdu_primitive["name"]
                for section in ("initial-config-primitive", "config-primitive")
                for kdu_primitive in kdu_configuration.get(section, [])
            }
            kdu_action = primitive_name in actions

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            k8s_connector = self.k8scluster_map[kdu["k8scluster-type"]]
            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    # a model given as parameter overrides the deployed one
                    kdu_model = desc_params.pop("kdu_model")
                else:
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        # keep only the part before ":"
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    k8s_connector.upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    # outer guard is 10 seconds longer than the connector timeout
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    k8s_connector.rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    k8s_connector.status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    k8s_connector.exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            # an empty result from the connector is reported as a failure
            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            # status updates go to the first deployed VCA of this vnf
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + " task Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            db_nslcmop_update[
                "detailed-status"
            ] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                # notification is best effort: log and carry on
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # Returning from finally replaces the bare return of the try body
        # and is the value delivered on every path.
        return nslcmop_operation_state, detailed_status
5062 async def scale(self
, nsr_id
, nslcmop_id
):
5063 # Try to lock HA task here
5064 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5065 if not task_is_locked_by_me
:
5068 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5069 stage
= ["", "", ""]
5070 tasks_dict_info
= {}
5071 # ^ stage, step, VIM progress
5072 self
.logger
.debug(logging_text
+ "Enter")
5073 # get all needed from database
5075 db_nslcmop_update
= {}
5078 # in case of error, indicates what part of scale was failed to put nsr at error status
5079 scale_process
= None
5080 old_operational_status
= ""
5081 old_config_status
= ""
5084 # wait for any previous tasks in process
5085 step
= "Waiting for previous operations to terminate"
5086 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5087 self
._write
_ns
_status
(
5090 current_operation
="SCALING",
5091 current_operation_id
=nslcmop_id
,
5094 step
= "Getting nslcmop from database"
5096 step
+ " after having waited for previous tasks to be completed"
5098 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5100 step
= "Getting nsr from database"
5101 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5102 old_operational_status
= db_nsr
["operational-status"]
5103 old_config_status
= db_nsr
["config-status"]
5105 step
= "Parsing scaling parameters"
5106 db_nsr_update
["operational-status"] = "scaling"
5107 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5108 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5110 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5112 ]["member-vnf-index"]
5113 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5115 ]["scaling-group-descriptor"]
5116 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5117 # for backward compatibility
5118 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5119 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5120 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5121 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5123 step
= "Getting vnfr from database"
5124 db_vnfr
= self
.db
.get_one(
5125 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5128 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5130 step
= "Getting vnfd from database"
5131 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5133 base_folder
= db_vnfd
["_admin"]["storage"]
5135 step
= "Getting scaling-group-descriptor"
5136 scaling_descriptor
= find_in_list(
5137 get_scaling_aspect(db_vnfd
),
5138 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5140 if not scaling_descriptor
:
5142 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5143 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5146 step
= "Sending scale order to VIM"
5147 # TODO check if ns is in a proper status
5149 if not db_nsr
["_admin"].get("scaling-group"):
5154 "_admin.scaling-group": [
5155 {"name": scaling_group
, "nb-scale-op": 0}
5159 admin_scale_index
= 0
5161 for admin_scale_index
, admin_scale_info
in enumerate(
5162 db_nsr
["_admin"]["scaling-group"]
5164 if admin_scale_info
["name"] == scaling_group
:
5165 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5167 else: # not found, set index one plus last element and add new entry with the name
5168 admin_scale_index
+= 1
5170 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5173 vca_scaling_info
= []
5174 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5175 if scaling_type
== "SCALE_OUT":
5176 if "aspect-delta-details" not in scaling_descriptor
:
5178 "Aspect delta details not fount in scaling descriptor {}".format(
5179 scaling_descriptor
["name"]
5182 # count if max-instance-count is reached
5183 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5185 scaling_info
["scaling_direction"] = "OUT"
5186 scaling_info
["vdu-create"] = {}
5187 scaling_info
["kdu-create"] = {}
5188 for delta
in deltas
:
5189 for vdu_delta
in delta
.get("vdu-delta", {}):
5190 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5191 # vdu_index also provides the number of instance of the targeted vdu
5192 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5193 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5197 additional_params
= (
5198 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5201 cloud_init_list
= []
5203 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5204 max_instance_count
= 10
5205 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5206 max_instance_count
= vdu_profile
.get(
5207 "max-number-of-instances", 10
5210 default_instance_num
= get_number_of_instances(
5213 instances_number
= vdu_delta
.get("number-of-instances", 1)
5214 nb_scale_op
+= instances_number
5216 new_instance_count
= nb_scale_op
+ default_instance_num
5217 # Control if new count is over max and vdu count is less than max.
5218 # Then assign new instance count
5219 if new_instance_count
> max_instance_count
> vdu_count
:
5220 instances_number
= new_instance_count
- max_instance_count
5222 instances_number
= instances_number
5224 if new_instance_count
> max_instance_count
:
5226 "reached the limit of {} (max-instance-count) "
5227 "scaling-out operations for the "
5228 "scaling-group-descriptor '{}'".format(
5229 nb_scale_op
, scaling_group
5232 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5234 # TODO Information of its own ip is not available because db_vnfr is not updated.
5235 additional_params
["OSM"] = get_osm_params(
5236 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5238 cloud_init_list
.append(
5239 self
._parse
_cloud
_init
(
5246 vca_scaling_info
.append(
5248 "osm_vdu_id": vdu_delta
["id"],
5249 "member-vnf-index": vnf_index
,
5251 "vdu_index": vdu_index
+ x
,
5254 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5255 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5256 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5257 kdu_name
= kdu_profile
["kdu-name"]
5258 resource_name
= kdu_profile
["resource-name"]
5260 # Might have different kdus in the same delta
5261 # Should have list for each kdu
5262 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5263 scaling_info
["kdu-create"][kdu_name
] = []
5265 kdur
= get_kdur(db_vnfr
, kdu_name
)
5266 if kdur
.get("helm-chart"):
5267 k8s_cluster_type
= "helm-chart-v3"
5268 self
.logger
.debug("kdur: {}".format(kdur
))
5270 kdur
.get("helm-version")
5271 and kdur
.get("helm-version") == "v2"
5273 k8s_cluster_type
= "helm-chart"
5274 raise NotImplementedError
5275 elif kdur
.get("juju-bundle"):
5276 k8s_cluster_type
= "juju-bundle"
5279 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5280 "juju-bundle. Maybe an old NBI version is running".format(
5281 db_vnfr
["member-vnf-index-ref"], kdu_name
5285 max_instance_count
= 10
5286 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5287 max_instance_count
= kdu_profile
.get(
5288 "max-number-of-instances", 10
5291 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5292 deployed_kdu
, _
= get_deployed_kdu(
5293 nsr_deployed
, kdu_name
, vnf_index
5295 if deployed_kdu
is None:
5297 "KDU '{}' for vnf '{}' not deployed".format(
5301 kdu_instance
= deployed_kdu
.get("kdu-instance")
5302 instance_num
= await self
.k8scluster_map
[
5304 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5305 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5306 "number-of-instances", 1
5309 # Control if new count is over max and instance_num is less than max.
5310 # Then assign max instance number to kdu replica count
5311 if kdu_replica_count
> max_instance_count
> instance_num
:
5312 kdu_replica_count
= max_instance_count
5313 if kdu_replica_count
> max_instance_count
:
5315 "reached the limit of {} (max-instance-count) "
5316 "scaling-out operations for the "
5317 "scaling-group-descriptor '{}'".format(
5318 instance_num
, scaling_group
5322 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5323 vca_scaling_info
.append(
5325 "osm_kdu_id": kdu_name
,
5326 "member-vnf-index": vnf_index
,
5328 "kdu_index": instance_num
+ x
- 1,
5331 scaling_info
["kdu-create"][kdu_name
].append(
5333 "member-vnf-index": vnf_index
,
5335 "k8s-cluster-type": k8s_cluster_type
,
5336 "resource-name": resource_name
,
5337 "scale": kdu_replica_count
,
5340 elif scaling_type
== "SCALE_IN":
5341 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5343 scaling_info
["scaling_direction"] = "IN"
5344 scaling_info
["vdu-delete"] = {}
5345 scaling_info
["kdu-delete"] = {}
5347 for delta
in deltas
:
5348 for vdu_delta
in delta
.get("vdu-delta", {}):
5349 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5350 min_instance_count
= 0
5351 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5352 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5353 min_instance_count
= vdu_profile
["min-number-of-instances"]
5355 default_instance_num
= get_number_of_instances(
5356 db_vnfd
, vdu_delta
["id"]
5358 instance_num
= vdu_delta
.get("number-of-instances", 1)
5359 nb_scale_op
-= instance_num
5361 new_instance_count
= nb_scale_op
+ default_instance_num
5363 if new_instance_count
< min_instance_count
< vdu_count
:
5364 instances_number
= min_instance_count
- new_instance_count
5366 instances_number
= instance_num
5368 if new_instance_count
< min_instance_count
:
5370 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5371 "scaling-group-descriptor '{}'".format(
5372 nb_scale_op
, scaling_group
5375 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5376 vca_scaling_info
.append(
5378 "osm_vdu_id": vdu_delta
["id"],
5379 "member-vnf-index": vnf_index
,
5381 "vdu_index": vdu_index
- 1 - x
,
5384 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5385 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5386 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5387 kdu_name
= kdu_profile
["kdu-name"]
5388 resource_name
= kdu_profile
["resource-name"]
5390 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5391 scaling_info
["kdu-delete"][kdu_name
] = []
5393 kdur
= get_kdur(db_vnfr
, kdu_name
)
5394 if kdur
.get("helm-chart"):
5395 k8s_cluster_type
= "helm-chart-v3"
5396 self
.logger
.debug("kdur: {}".format(kdur
))
5398 kdur
.get("helm-version")
5399 and kdur
.get("helm-version") == "v2"
5401 k8s_cluster_type
= "helm-chart"
5402 raise NotImplementedError
5403 elif kdur
.get("juju-bundle"):
5404 k8s_cluster_type
= "juju-bundle"
5407 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5408 "juju-bundle. Maybe an old NBI version is running".format(
5409 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5413 min_instance_count
= 0
5414 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5415 min_instance_count
= kdu_profile
["min-number-of-instances"]
5417 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5418 deployed_kdu
, _
= get_deployed_kdu(
5419 nsr_deployed
, kdu_name
, vnf_index
5421 if deployed_kdu
is None:
5423 "KDU '{}' for vnf '{}' not deployed".format(
5427 kdu_instance
= deployed_kdu
.get("kdu-instance")
5428 instance_num
= await self
.k8scluster_map
[
5430 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5431 kdu_replica_count
= instance_num
- kdu_delta
.get(
5432 "number-of-instances", 1
5435 if kdu_replica_count
< min_instance_count
< instance_num
:
5436 kdu_replica_count
= min_instance_count
5437 if kdu_replica_count
< min_instance_count
:
5439 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5440 "scaling-group-descriptor '{}'".format(
5441 instance_num
, scaling_group
5445 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5446 vca_scaling_info
.append(
5448 "osm_kdu_id": kdu_name
,
5449 "member-vnf-index": vnf_index
,
5451 "kdu_index": instance_num
- x
- 1,
5454 scaling_info
["kdu-delete"][kdu_name
].append(
5456 "member-vnf-index": vnf_index
,
5458 "k8s-cluster-type": k8s_cluster_type
,
5459 "resource-name": resource_name
,
5460 "scale": kdu_replica_count
,
5464 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5465 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5466 if scaling_info
["scaling_direction"] == "IN":
5467 for vdur
in reversed(db_vnfr
["vdur"]):
5468 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5469 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5470 scaling_info
["vdu"].append(
5472 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5473 "vdu_id": vdur
["vdu-id-ref"],
5477 for interface
in vdur
["interfaces"]:
5478 scaling_info
["vdu"][-1]["interface"].append(
5480 "name": interface
["name"],
5481 "ip_address": interface
["ip-address"],
5482 "mac_address": interface
.get("mac-address"),
5485 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5488 step
= "Executing pre-scale vnf-config-primitive"
5489 if scaling_descriptor
.get("scaling-config-action"):
5490 for scaling_config_action
in scaling_descriptor
[
5491 "scaling-config-action"
5494 scaling_config_action
.get("trigger") == "pre-scale-in"
5495 and scaling_type
== "SCALE_IN"
5497 scaling_config_action
.get("trigger") == "pre-scale-out"
5498 and scaling_type
== "SCALE_OUT"
5500 vnf_config_primitive
= scaling_config_action
[
5501 "vnf-config-primitive-name-ref"
5503 step
= db_nslcmop_update
[
5505 ] = "executing pre-scale scaling-config-action '{}'".format(
5506 vnf_config_primitive
5509 # look for primitive
5510 for config_primitive
in (
5511 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5512 ).get("config-primitive", ()):
5513 if config_primitive
["name"] == vnf_config_primitive
:
5517 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5518 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5519 "primitive".format(scaling_group
, vnf_config_primitive
)
5522 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5523 if db_vnfr
.get("additionalParamsForVnf"):
5524 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5526 scale_process
= "VCA"
5527 db_nsr_update
["config-status"] = "configuring pre-scaling"
5528 primitive_params
= self
._map
_primitive
_params
(
5529 config_primitive
, {}, vnfr_params
5532 # Pre-scale retry check: Check if this sub-operation has been executed before
5533 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5536 vnf_config_primitive
,
5540 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5541 # Skip sub-operation
5542 result
= "COMPLETED"
5543 result_detail
= "Done"
5546 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5547 vnf_config_primitive
, result
, result_detail
5551 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5552 # New sub-operation: Get index of this sub-operation
5554 len(db_nslcmop
.get("_admin", {}).get("operations"))
5559 + "vnf_config_primitive={} New sub-operation".format(
5560 vnf_config_primitive
5564 # retry: Get registered params for this existing sub-operation
5565 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5568 vnf_index
= op
.get("member_vnf_index")
5569 vnf_config_primitive
= op
.get("primitive")
5570 primitive_params
= op
.get("primitive_params")
5573 + "vnf_config_primitive={} Sub-operation retry".format(
5574 vnf_config_primitive
5577 # Execute the primitive, either with new (first-time) or registered (reintent) args
5578 ee_descriptor_id
= config_primitive
.get(
5579 "execution-environment-ref"
5581 primitive_name
= config_primitive
.get(
5582 "execution-environment-primitive", vnf_config_primitive
5584 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5585 nsr_deployed
["VCA"],
5586 member_vnf_index
=vnf_index
,
5588 vdu_count_index
=None,
5589 ee_descriptor_id
=ee_descriptor_id
,
5591 result
, result_detail
= await self
._ns
_execute
_primitive
(
5600 + "vnf_config_primitive={} Done with result {} {}".format(
5601 vnf_config_primitive
, result
, result_detail
5604 # Update operationState = COMPLETED | FAILED
5605 self
._update
_suboperation
_status
(
5606 db_nslcmop
, op_index
, result
, result_detail
5609 if result
== "FAILED":
5610 raise LcmException(result_detail
)
5611 db_nsr_update
["config-status"] = old_config_status
5612 scale_process
= None
5616 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5619 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5622 # SCALE-IN VCA - BEGIN
5623 if vca_scaling_info
:
5624 step
= db_nslcmop_update
[
5626 ] = "Deleting the execution environments"
5627 scale_process
= "VCA"
5628 for vca_info
in vca_scaling_info
:
5629 if vca_info
["type"] == "delete":
5630 member_vnf_index
= str(vca_info
["member-vnf-index"])
5632 logging_text
+ "vdu info: {}".format(vca_info
)
5634 if vca_info
.get("osm_vdu_id"):
5635 vdu_id
= vca_info
["osm_vdu_id"]
5636 vdu_index
= int(vca_info
["vdu_index"])
5639 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5640 member_vnf_index
, vdu_id
, vdu_index
5644 kdu_id
= vca_info
["osm_kdu_id"]
5647 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5648 member_vnf_index
, kdu_id
, vdu_index
5650 stage
[2] = step
= "Scaling in VCA"
5651 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5652 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5653 config_update
= db_nsr
["configurationStatus"]
5654 for vca_index
, vca
in enumerate(vca_update
):
5656 (vca
or vca
.get("ee_id"))
5657 and vca
["member-vnf-index"] == member_vnf_index
5658 and vca
["vdu_count_index"] == vdu_index
5660 if vca
.get("vdu_id"):
5661 config_descriptor
= get_configuration(
5662 db_vnfd
, vca
.get("vdu_id")
5664 elif vca
.get("kdu_name"):
5665 config_descriptor
= get_configuration(
5666 db_vnfd
, vca
.get("kdu_name")
5669 config_descriptor
= get_configuration(
5670 db_vnfd
, db_vnfd
["id"]
5672 operation_params
= (
5673 db_nslcmop
.get("operationParams") or {}
5675 exec_terminate_primitives
= not operation_params
.get(
5676 "skip_terminate_primitives"
5677 ) and vca
.get("needed_terminate")
5678 task
= asyncio
.ensure_future(
5687 exec_primitives
=exec_terminate_primitives
,
5691 timeout
=self
.timeout_charm_delete
,
5694 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5697 del vca_update
[vca_index
]
5698 del config_update
[vca_index
]
5699 # wait for pending tasks of terminate primitives
5703 + "Waiting for tasks {}".format(
5704 list(tasks_dict_info
.keys())
5707 error_list
= await self
._wait
_for
_tasks
(
5711 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5716 tasks_dict_info
.clear()
5718 raise LcmException("; ".join(error_list
))
5720 db_vca_and_config_update
= {
5721 "_admin.deployed.VCA": vca_update
,
5722 "configurationStatus": config_update
,
5725 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5727 scale_process
= None
5728 # SCALE-IN VCA - END
5731 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5732 scale_process
= "RO"
5733 if self
.ro_config
.get("ng"):
5734 await self
._scale
_ng
_ro
(
5735 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5737 scaling_info
.pop("vdu-create", None)
5738 scaling_info
.pop("vdu-delete", None)
5740 scale_process
= None
5744 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5745 scale_process
= "KDU"
5746 await self
._scale
_kdu
(
5747 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5749 scaling_info
.pop("kdu-create", None)
5750 scaling_info
.pop("kdu-delete", None)
5752 scale_process
= None
5756 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5758 # SCALE-UP VCA - BEGIN
5759 if vca_scaling_info
:
5760 step
= db_nslcmop_update
[
5762 ] = "Creating new execution environments"
5763 scale_process
= "VCA"
5764 for vca_info
in vca_scaling_info
:
5765 if vca_info
["type"] == "create":
5766 member_vnf_index
= str(vca_info
["member-vnf-index"])
5768 logging_text
+ "vdu info: {}".format(vca_info
)
5770 vnfd_id
= db_vnfr
["vnfd-ref"]
5771 if vca_info
.get("osm_vdu_id"):
5772 vdu_index
= int(vca_info
["vdu_index"])
5773 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5774 if db_vnfr
.get("additionalParamsForVnf"):
5775 deploy_params
.update(
5777 db_vnfr
["additionalParamsForVnf"].copy()
5780 descriptor_config
= get_configuration(
5781 db_vnfd
, db_vnfd
["id"]
5783 if descriptor_config
:
5788 logging_text
=logging_text
5789 + "member_vnf_index={} ".format(member_vnf_index
),
5792 nslcmop_id
=nslcmop_id
,
5798 member_vnf_index
=member_vnf_index
,
5799 vdu_index
=vdu_index
,
5801 deploy_params
=deploy_params
,
5802 descriptor_config
=descriptor_config
,
5803 base_folder
=base_folder
,
5804 task_instantiation_info
=tasks_dict_info
,
5807 vdu_id
= vca_info
["osm_vdu_id"]
5808 vdur
= find_in_list(
5809 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5811 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5812 if vdur
.get("additionalParams"):
5813 deploy_params_vdu
= parse_yaml_strings(
5814 vdur
["additionalParams"]
5817 deploy_params_vdu
= deploy_params
5818 deploy_params_vdu
["OSM"] = get_osm_params(
5819 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5821 if descriptor_config
:
5826 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5827 member_vnf_index
, vdu_id
, vdu_index
5829 stage
[2] = step
= "Scaling out VCA"
5830 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5832 logging_text
=logging_text
5833 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5834 member_vnf_index
, vdu_id
, vdu_index
5838 nslcmop_id
=nslcmop_id
,
5844 member_vnf_index
=member_vnf_index
,
5845 vdu_index
=vdu_index
,
5847 deploy_params
=deploy_params_vdu
,
5848 descriptor_config
=descriptor_config
,
5849 base_folder
=base_folder
,
5850 task_instantiation_info
=tasks_dict_info
,
5854 kdu_name
= vca_info
["osm_kdu_id"]
5855 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5856 if descriptor_config
:
5858 kdu_index
= int(vca_info
["kdu_index"])
5862 for x
in db_vnfr
["kdur"]
5863 if x
["kdu-name"] == kdu_name
5865 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5866 if kdur
.get("additionalParams"):
5867 deploy_params_kdu
= parse_yaml_strings(
5868 kdur
["additionalParams"]
5872 logging_text
=logging_text
,
5875 nslcmop_id
=nslcmop_id
,
5881 member_vnf_index
=member_vnf_index
,
5882 vdu_index
=kdu_index
,
5884 deploy_params
=deploy_params_kdu
,
5885 descriptor_config
=descriptor_config
,
5886 base_folder
=base_folder
,
5887 task_instantiation_info
=tasks_dict_info
,
5890 # SCALE-UP VCA - END
5891 scale_process
= None
5894 # execute primitive service POST-SCALING
5895 step
= "Executing post-scale vnf-config-primitive"
5896 if scaling_descriptor
.get("scaling-config-action"):
5897 for scaling_config_action
in scaling_descriptor
[
5898 "scaling-config-action"
5901 scaling_config_action
.get("trigger") == "post-scale-in"
5902 and scaling_type
== "SCALE_IN"
5904 scaling_config_action
.get("trigger") == "post-scale-out"
5905 and scaling_type
== "SCALE_OUT"
5907 vnf_config_primitive
= scaling_config_action
[
5908 "vnf-config-primitive-name-ref"
5910 step
= db_nslcmop_update
[
5912 ] = "executing post-scale scaling-config-action '{}'".format(
5913 vnf_config_primitive
5916 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5917 if db_vnfr
.get("additionalParamsForVnf"):
5918 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5920 # look for primitive
5921 for config_primitive
in (
5922 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5923 ).get("config-primitive", ()):
5924 if config_primitive
["name"] == vnf_config_primitive
:
5928 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5929 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5930 "config-primitive".format(
5931 scaling_group
, vnf_config_primitive
5934 scale_process
= "VCA"
5935 db_nsr_update
["config-status"] = "configuring post-scaling"
5936 primitive_params
= self
._map
_primitive
_params
(
5937 config_primitive
, {}, vnfr_params
5940 # Post-scale retry check: Check if this sub-operation has been executed before
5941 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5944 vnf_config_primitive
,
5948 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5949 # Skip sub-operation
5950 result
= "COMPLETED"
5951 result_detail
= "Done"
5954 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5955 vnf_config_primitive
, result
, result_detail
5959 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5960 # New sub-operation: Get index of this sub-operation
5962 len(db_nslcmop
.get("_admin", {}).get("operations"))
5967 + "vnf_config_primitive={} New sub-operation".format(
5968 vnf_config_primitive
5972 # retry: Get registered params for this existing sub-operation
5973 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5976 vnf_index
= op
.get("member_vnf_index")
5977 vnf_config_primitive
= op
.get("primitive")
5978 primitive_params
= op
.get("primitive_params")
5981 + "vnf_config_primitive={} Sub-operation retry".format(
5982 vnf_config_primitive
5985 # Execute the primitive, either with new (first-time) or registered (reintent) args
5986 ee_descriptor_id
= config_primitive
.get(
5987 "execution-environment-ref"
5989 primitive_name
= config_primitive
.get(
5990 "execution-environment-primitive", vnf_config_primitive
5992 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5993 nsr_deployed
["VCA"],
5994 member_vnf_index
=vnf_index
,
5996 vdu_count_index
=None,
5997 ee_descriptor_id
=ee_descriptor_id
,
5999 result
, result_detail
= await self
._ns
_execute
_primitive
(
6008 + "vnf_config_primitive={} Done with result {} {}".format(
6009 vnf_config_primitive
, result
, result_detail
6012 # Update operationState = COMPLETED | FAILED
6013 self
._update
_suboperation
_status
(
6014 db_nslcmop
, op_index
, result
, result_detail
6017 if result
== "FAILED":
6018 raise LcmException(result_detail
)
6019 db_nsr_update
["config-status"] = old_config_status
6020 scale_process
= None
6025 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6026 db_nsr_update
["operational-status"] = (
6028 if old_operational_status
== "failed"
6029 else old_operational_status
6031 db_nsr_update
["config-status"] = old_config_status
6034 ROclient
.ROClientException
,
6039 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6041 except asyncio
.CancelledError
:
6043 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6045 exc
= "Operation was cancelled"
6046 except Exception as e
:
6047 exc
= traceback
.format_exc()
6048 self
.logger
.critical(
6049 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6053 self
._write
_ns
_status
(
6056 current_operation
="IDLE",
6057 current_operation_id
=None,
6060 stage
[1] = "Waiting for instantiate pending tasks."
6061 self
.logger
.debug(logging_text
+ stage
[1])
6062 exc
= await self
._wait
_for
_tasks
(
6065 self
.timeout_ns_deploy
,
6073 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6074 nslcmop_operation_state
= "FAILED"
6076 db_nsr_update
["operational-status"] = old_operational_status
6077 db_nsr_update
["config-status"] = old_config_status
6078 db_nsr_update
["detailed-status"] = ""
6080 if "VCA" in scale_process
:
6081 db_nsr_update
["config-status"] = "failed"
6082 if "RO" in scale_process
:
6083 db_nsr_update
["operational-status"] = "failed"
6086 ] = "FAILED scaling nslcmop={} {}: {}".format(
6087 nslcmop_id
, step
, exc
6090 error_description_nslcmop
= None
6091 nslcmop_operation_state
= "COMPLETED"
6092 db_nslcmop_update
["detailed-status"] = "Done"
6094 self
._write
_op
_status
(
6097 error_message
=error_description_nslcmop
,
6098 operation_state
=nslcmop_operation_state
,
6099 other_update
=db_nslcmop_update
,
6102 self
._write
_ns
_status
(
6105 current_operation
="IDLE",
6106 current_operation_id
=None,
6107 other_update
=db_nsr_update
,
6110 if nslcmop_operation_state
:
6114 "nslcmop_id": nslcmop_id
,
6115 "operationState": nslcmop_operation_state
,
6117 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6118 except Exception as e
:
6120 logging_text
+ "kafka_write notification Exception {}".format(e
)
6122 self
.logger
.debug(logging_text
+ "Exit")
6123 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
# Scale the KDUs described in scaling_info through the K8s connector that
# self.k8scluster_map holds for each entry's "k8s-cluster-type".
# The entries come from scaling_info["kdu-create"], falling back to
# scaling_info["kdu-delete"] when the former is missing or empty. Each entry
# is read for the keys "member-vnf-index", "scale", "k8s-cluster-type",
# "resource-name" and "type".
# NOTE(review): several lines of this method are not visible in this excerpt
# (for example the statement that receives the "collection"/"filter"/"path"
# mapping and the trailing arguments of the awaited calls), so the notes
# below only describe what can be read here.
6125 async def _scale_kdu(
6126 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
# NOTE(review): if neither key yields a value this is None and the loop below
# would raise TypeError - presumably callers always supply one; confirm.
6128 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6129 for kdu_name
in _scaling_info
:
6130 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
# Look up the deployed KDU record for this member VNF; the second value
# returned is used below to build the "_admin.deployed.K8s.<index>" path.
6131 deployed_kdu
, index
= get_deployed_kdu(
6132 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6134 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6135 kdu_instance
= deployed_kdu
["kdu-instance"]
6136 scale
= int(kdu_scaling_info
["scale"])
6137 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
# Mapping that points at this KDU's entry inside the "nsrs" document of nsr_id.
6140 "collection": "nsrs",
6141 "filter": {"_id": nsr_id
},
6142 "path": "_admin.deployed.K8s.{}".format(index
),
6145 step
= "scaling application {}".format(
6146 kdu_scaling_info
["resource-name"]
6148 self
.logger
.debug(logging_text
+ step
)
# Entries of type "delete": this branch precedes the scale() call in source
# order. Terminate primitives are only considered when the KDU configuration
# declares them and get_juju_ee_ref() returns None for this KDU.
6150 if kdu_scaling_info
["type"] == "delete":
6151 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6154 and kdu_config
.get("terminate-config-primitive")
6155 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6157 terminate_config_primitive_list
= kdu_config
.get(
6158 "terminate-config-primitive"
# list.sort() works in place, so the list object held by kdu_config is the
# one being reordered (ascending integer "seq").
6160 terminate_config_primitive_list
.sort(
6161 key
=lambda val
: int(val
["seq"])
6165 terminate_config_primitive
6166 ) in terminate_config_primitive_list
:
# Parameters are mapped with two empty dicts as the extra arguments.
6167 primitive_params_
= self
._map
_primitive
_params
(
6168 terminate_config_primitive
, {}, {}
6170 step
= "execute terminate config primitive"
6171 self
.logger
.debug(logging_text
+ step
)
6172 await asyncio
.wait_for(
6173 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6174 cluster_uuid
=cluster_uuid
,
6175 kdu_instance
=kdu_instance
,
6176 primitive_name
=terminate_config_primitive
["name"],
6177 params
=primitive_params_
,
# The scaling request itself, issued on the connector for this cluster type.
# self.timeout_vca_on_error is passed as `timeout`, presumably to
# asyncio.wait_for - the closing lines of the call are not visible here.
6184 await asyncio
.wait_for(
6185 self
.k8scluster_map
[k8s_cluster_type
].scale(
6188 kdu_scaling_info
["resource-name"],
6191 timeout
=self
.timeout_vca_on_error
,
# Entries of type "create": counterpart of the "delete" branch, placed after
# the scale() call and using "initial-config-primitive" instead.
6194 if kdu_scaling_info
["type"] == "create":
6195 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6198 and kdu_config
.get("initial-config-primitive")
6199 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6201 initial_config_primitive_list
= kdu_config
.get(
6202 "initial-config-primitive"
# In-place sort by ascending integer "seq", as in the "delete" branch.
6204 initial_config_primitive_list
.sort(
6205 key
=lambda val
: int(val
["seq"])
6208 for initial_config_primitive
in initial_config_primitive_list
:
6209 primitive_params_
= self
._map
_primitive
_params
(
6210 initial_config_primitive
, {}, {}
6212 step
= "execute initial config primitive"
6213 self
.logger
.debug(logging_text
+ step
)
6214 await asyncio
.wait_for(
6215 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6216 cluster_uuid
=cluster_uuid
,
6217 kdu_instance
=kdu_instance
,
6218 primitive_name
=initial_config_primitive
["name"],
6219 params
=primitive_params_
,
# Handle a VDU scaling request for db_vnfr.
# Reads the NSD of db_nsr, every VNFR whose "nsr-id-ref" is this NS instance
# and the VNFDs those VNFRs reference, obtains the public key from self.n2vc,
# and then awaits self._instantiate_ng_ro().
# NOTE(review): the initialisation of db_vnfrs / db_vnfds, the call that
# receives the "vdu-create" / "vdu-delete" values and the positional arguments
# of _instantiate_ng_ro are on lines not visible in this excerpt.
6226 async def _scale_ng_ro(
6227 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
# The NS instance id comes from the operation record, not from db_nsr.
6229 nsr_id
= db_nslcmop
["nsInstanceId"]
6230 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6233 # read from db: vnfd's for every vnf
6236 # for each vnf in ns, read vnfd
6237 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
# db_vnfrs is keyed by the VNFR's "member-vnf-index-ref".
6238 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6239 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6240 # if we haven't this vnfd, read it from db
# NOTE(review): the check compares each cached vnfd's "id" field with the
# value that is used as "_id" in the query below; confirm both hold the same
# identifier, otherwise the check never matches and the same vnfd is read and
# appended once per vnfr.
6241 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6243 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6244 db_vnfds
.append(vnfd
)
6245 n2vc_key
= self
.n2vc
.get_public_key()
6246 n2vc_key_list
= [n2vc_key
]
# Both values may be None: .get() is used without a default.
6249 vdu_scaling_info
.get("vdu-create"),
6250 vdu_scaling_info
.get("vdu-delete"),
6253 # db_vnfr has been updated, update db_vnfrs to use it
6254 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
# start_deploy is taken at call time; the deployment timeout is the
# instance-wide self.timeout_ns_deploy.
6255 await self
._instantiate
_ng
_ro
(
6265 start_deploy
=time(),
6266 timeout_ns_deploy
=self
.timeout_ns_deploy
,
# Only when something was requested for deletion: a further call receives
# db_vnfr, None, the "vdu-delete" value and mark_delete=False (the callee's
# name is on a line not visible here).
6268 if vdu_scaling_info
.get("vdu-delete"):
6270 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
# Build Prometheus jobs for an execution environment from a template file
# found in artifact_path and hand them to self.prometheus.update().
# Returns the list of job names when update() gives a truthy result; no other
# return value is visible, so in the remaining cases the coroutine presumably
# yields None.
# NOTE(review): the assignments of job_file, job_data, host_port and variables
# are on lines not visible in this excerpt; the notes below cover only what
# can be read here.
6273 async def add_prometheus_metrics(
6274 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
# Guard for deployments without a prometheus client (the guarded statement is
# not visible; presumably an early return).
6276 if not self
.prometheus
:
6278 # look if exist a file called 'prometheus*.j2' and
6279 artifact_content
= self
.fs
.dir_ls(artifact_path
)
# Candidate templates: names starting with "prometheus" and ending in ".j2".
6283 for f
in artifact_content
6284 if f
.startswith("prometheus") and f
.endswith(".j2")
# The file is addressed as the tuple (artifact_path, job_file), read mode.
6290 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
# str.partition keeps everything after the first "." as `service`; when
# ee_id contains no "." that third element is the empty string.
6294 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6295 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
# From here on vnfr_id is the dash-less form; it is used as JOB_NAME and as
# the prefix of generated job names.
6297 vnfr_id
= vnfr_id
.replace("-", "")
# Values made available to the template when the jobs are parsed.
6299 "JOB_NAME": vnfr_id
,
6300 "TARGET_IP": target_ip
,
6301 "EXPORTER_POD_IP": host_name
,
6302 "EXPORTER_POD_PORT": host_port
,
6304 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
6305 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6306 for job
in job_list
:
# A job whose name is not a string, or does not contain vnfr_id, is renamed
# to "<vnfr_id>_<n>" with n drawn by randint(1, 10000), both ends inclusive.
6308 not isinstance(job
.get("job_name"), str)
6309 or vnfr_id
not in job
["job_name"]
6311 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6312 job
["nsr_id"] = nsr_id
# Keyed by job name: jobs sharing a name collapse to the last one in job_list.
6313 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
6314 if await self
.prometheus
.update(job_dict
):
6315 return list(job_dict
.keys())
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA cloud settings stored in a VIM account.

    The values are read from the "config" section of the account returned by
    VimAccountDB.get_vim_account_with_id(); a missing "config" section is
    treated as an empty one.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential); an element is None when its key
             ("vca_cloud" / "vca_cloud_credential") is not present
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA K8s cloud settings stored in a VIM account.

    The values are read from the "config" section of the account returned by
    VimAccountDB.get_vim_account_with_id(); a missing "config" section is
    treated as an empty one.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential); an element is None when its key
             ("vca_k8s_cloud" / "vca_k8s_cloud_credential") is not present
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential