1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
33 from osm_lcm
import ROclient
34 from osm_lcm
.data_utils
.nsr
import get_deployed_kdu
35 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
36 from osm_lcm
.lcm_utils
import (
44 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
45 from osm_lcm
.data_utils
.vnfd
import (
48 get_ee_sorted_initial_config_primitive_list
,
49 get_ee_sorted_terminate_config_primitive_list
,
51 get_virtual_link_profiles
,
56 get_number_of_instances
,
60 from osm_lcm
.data_utils
.list_utils
import find_in_list
61 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
62 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
63 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
64 from n2vc
.k8s_helm_conn
import K8sHelmConnector
65 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
66 from n2vc
.k8s_juju_conn
import K8sJujuConnector
68 from osm_common
.dbbase
import DbException
69 from osm_common
.fsbase
import FsException
71 from osm_lcm
.data_utils
.database
.database
import Database
72 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
74 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
75 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
77 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
79 from copy
import copy
, deepcopy
81 from uuid
import uuid4
83 from random
import randint
85 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
89 timeout_vca_on_error
= (
91 ) # Time for charm from first time at blocked,error status to mark as failed
92 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
93 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
94 timeout_charm_delete
= 10 * 60
95 timeout_primitive
= 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive
= (
98 ) # timeout for some progress in a primitive execution
100 SUBOPERATION_STATUS_NOT_FOUND
= -1
101 SUBOPERATION_STATUS_NEW
= -2
102 SUBOPERATION_STATUS_SKIP
= -3
103 task_name_deploy_vca
= "Deploying VCA"
105 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
107 Init, Connect to database, filesystem storage, and messaging
108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
111 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
113 self
.db
= Database().instance
.db
114 self
.fs
= Filesystem().instance
.fs
116 self
.lcm_tasks
= lcm_tasks
117 self
.timeout
= config
["timeout"]
118 self
.ro_config
= config
["ro_config"]
119 self
.ng_ro
= config
["ro_config"].get("ng")
120 self
.vca_config
= config
["VCA"].copy()
122 # create N2VC connector
123 self
.n2vc
= N2VCJujuConnector(
126 on_update_db
=self
._on
_update
_n
2vc
_db
,
131 self
.conn_helm_ee
= LCMHelmConn(
134 vca_config
=self
.vca_config
,
135 on_update_db
=self
._on
_update
_n
2vc
_db
,
138 self
.k8sclusterhelm2
= K8sHelmConnector(
139 kubectl_command
=self
.vca_config
.get("kubectlpath"),
140 helm_command
=self
.vca_config
.get("helmpath"),
147 self
.k8sclusterhelm3
= K8sHelm3Connector(
148 kubectl_command
=self
.vca_config
.get("kubectlpath"),
149 helm_command
=self
.vca_config
.get("helm3path"),
156 self
.k8sclusterjuju
= K8sJujuConnector(
157 kubectl_command
=self
.vca_config
.get("kubectlpath"),
158 juju_command
=self
.vca_config
.get("jujupath"),
161 on_update_db
=self
._on
_update
_k
8s
_db
,
166 self
.k8scluster_map
= {
167 "helm-chart": self
.k8sclusterhelm2
,
168 "helm-chart-v3": self
.k8sclusterhelm3
,
169 "chart": self
.k8sclusterhelm3
,
170 "juju-bundle": self
.k8sclusterjuju
,
171 "juju": self
.k8sclusterjuju
,
175 "lxc_proxy_charm": self
.n2vc
,
176 "native_charm": self
.n2vc
,
177 "k8s_proxy_charm": self
.n2vc
,
178 "helm": self
.conn_helm_ee
,
179 "helm-v3": self
.conn_helm_ee
,
182 self
.prometheus
= prometheus
185 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """Return ``ip_mac`` with its last group increased by ``vm_index``.

    An address containing a dot is treated as IPv4: the text after the last
    dot is read as a decimal number. Otherwise the address is split at the
    last colon (IPv6 or MAC) and the final group is read as hexadecimal and
    written back zero-padded to its original width.

    :param ip_mac: address as text; a non-string value is returned untouched
    :param vm_index: amount added to the last group
    :return: the incremented address, or None if no last group can be parsed
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1  # step over the separator so the slices exclude it
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except (TypeError, ValueError):
        # last group is not a number (or vm_index is not an int): give up
        pass
    return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """Write the RO descriptor into the ``deploymentStatus`` of an nsrs record.

    The write goes through ``self.update_db_2``. Any exception is logged and
    swallowed, so a failed status write never propagates to the caller.

    :param nsrs_id: identifier of the nsrs record to update
    :param ro_descriptor: descriptor stored as-is under ``deploymentStatus``
    :return: None
    """
    try:
        # TODO filter RO descriptor fields...
        db_dict = {"deploymentStatus": ro_descriptor}
        self.update_db_2("nsrs", nsrs_id, db_dict)
    except Exception as e:
        # best effort on purpose: report the problem and carry on
        self.logger.warning(
            "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
        )
227 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
229 # remove last dot from path (if exists)
230 if path
.endswith("."):
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
237 nsr_id
= filter.get("_id")
239 # read ns record from database
240 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
241 current_ns_status
= nsr
.get("nsState")
243 # get vca status for NS
244 status_dict
= await self
.n2vc
.get_status(
245 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
250 db_dict
["vcaStatus"] = status_dict
251 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
253 # update configurationStatus for this VCA
255 vca_index
= int(path
[path
.rfind(".") + 1 :])
258 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
260 vca_status
= vca_list
[vca_index
].get("status")
262 configuration_status_list
= nsr
.get("configurationStatus")
263 config_status
= configuration_status_list
[vca_index
].get("status")
265 if config_status
== "BROKEN" and vca_status
!= "failed":
266 db_dict
["configurationStatus"][vca_index
] = "READY"
267 elif config_status
!= "BROKEN" and vca_status
== "failed":
268 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
269 except Exception as e
:
270 # not update configurationStatus
271 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
276 if current_ns_status
in ("READY", "DEGRADED"):
277 error_description
= ""
279 if status_dict
.get("machines"):
280 for machine_id
in status_dict
.get("machines"):
281 machine
= status_dict
.get("machines").get(machine_id
)
282 # check machine agent-status
283 if machine
.get("agent-status"):
284 s
= machine
.get("agent-status").get("status")
287 error_description
+= (
288 "machine {} agent-status={} ; ".format(
292 # check machine instance status
293 if machine
.get("instance-status"):
294 s
= machine
.get("instance-status").get("status")
297 error_description
+= (
298 "machine {} instance-status={} ; ".format(
303 if status_dict
.get("applications"):
304 for app_id
in status_dict
.get("applications"):
305 app
= status_dict
.get("applications").get(app_id
)
306 # check application status
307 if app
.get("status"):
308 s
= app
.get("status").get("status")
311 error_description
+= (
312 "application {} status={} ; ".format(app_id
, s
)
315 if error_description
:
316 db_dict
["errorDescription"] = error_description
317 if current_ns_status
== "READY" and is_degraded
:
318 db_dict
["nsState"] = "DEGRADED"
319 if current_ns_status
== "DEGRADED" and not is_degraded
:
320 db_dict
["nsState"] = "READY"
323 self
.update_db_2("nsrs", nsr_id
, db_dict
)
325 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
327 except Exception as e
:
328 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
330 async def _on_update_k8s_db(
331 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
341 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
342 # .format(cluster_uuid, kdu_instance, filter))
345 nsr_id
= filter.get("_id")
347 # get vca status for NS
348 vca_status
= await self
.k8sclusterjuju
.status_kdu(
351 complete_status
=True,
357 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
359 await self
.k8sclusterjuju
.update_vca_status(
360 db_dict
["vcaStatus"],
366 self
.update_db_2("nsrs", nsr_id
, db_dict
)
368 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
370 except Exception as e
:
371 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """Render a cloud-init text as a Jinja2 template.

    The template is built with ``StrictUndefined``, so a variable that is
    not present in ``additional_params`` is reported as an error.

    :param cloud_init_text: cloud-init content, possibly with Jinja2 variables
    :param additional_params: variables for the template; None means no variables
    :param vnfd_id: vnfd id, used only in error messages
    :param vdu_id: vdu id, used only in error messages
    :return: the rendered text
    :raises LcmException: on an undefined variable or any other template error
    """
    try:
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        # chain the original error so the Jinja2 traceback is not lost
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
def _get_vdu_cloud_init_content(self, vdu, vnfd):
    """Return the cloud-init text of a vdu, from the package file or inline.

    ``cloud-init-file`` takes precedence: the file is read through
    ``self.fs`` from ``<folder>/<pkg-dir>/cloud_init/<file>``, where folder
    and pkg-dir come from ``vnfd["_admin"]["storage"]``. Otherwise the inline
    ``cloud-init`` value is used.

    :param vdu: vdu descriptor
    :param vnfd: vnfd descriptor that owns the vdu
    :return: the cloud-init text, or None when the vdu declares none
    :raises LcmException: when the storage layer raises FsException
    """
    cloud_init_content = cloud_init_file = None
    try:
        if vdu.get("cloud-init-file"):
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                vdu["cloud-init-file"],
            )
            with self.fs.file_open(cloud_init_file, "r") as ci_file:
                cloud_init_content = ci_file.read()
        elif vdu.get("cloud-init"):
            cloud_init_content = vdu["cloud-init"]

        return cloud_init_content
    except FsException as e:
        raise LcmException(
            "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
                vnfd["id"], vdu["id"], cloud_init_file, e
            )
        ) from e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """Return the parsed ``additionalParams`` of the first vdur of a vdu.

    :param db_vnfr: vnfr record; its ``vdur`` list is searched
    :param vdu_id: value matched against each vdur's ``vdu-id-ref``
    :return: the result of ``parse_yaml_strings`` on the vdur's
        ``additionalParams`` (None is passed when there is no matching vdur
        or it has no such key)
    """
    vdur = next(
        # "or ()" keeps a vnfr with no vdur list from raising TypeError
        (vdur for vdur in db_vnfr.get("vdur") or () if vdu_id == vdur["vdu-id-ref"]),
        {},
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd; it is deep-copied and never modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided (not used here)
    :param nsrId: Id of the NSR (not used here)
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # the cloud-init entries are dropped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """Translate an OSM ip-profile into the key names used by RO.

    Works on a deep copy; ``ip_profile`` itself is left untouched.

    * ``dns-server`` becomes ``dns-address``: a list of entries is reduced
      to the list of their ``address`` values, any other value is moved as is
    * ``ip-version`` ``ipv4``/``ipv6`` becomes ``IPv4``/``IPv6``
    * ``dhcp-params`` is renamed to ``dhcp``

    :param ip_profile: ip-profile dictionary
    :return: the translated copy
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    elif RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
def _get_ro_vim_id_for_vim_account(self, vim_account):
    """Return the RO identifier stored for a VIM account.

    :param vim_account: ``_id`` of the record in ``vim_accounts``
    :return: value of ``_admin.deployed.RO`` of that record
    :raises LcmException: when ``_admin.operationalState`` is not ``ENABLED``
    """
    db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
    if db_vim["_admin"]["operationalState"] != "ENABLED":
        raise LcmException(
            "VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]
            )
        )
    RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
    return RO_vim_id
def get_ro_wim_id_for_wim_account(self, wim_account):
    """Return the RO identifier stored for a WIM account.

    Only a string is looked up in the database. Any other value (for
    example None or a boolean) is handed back unchanged.

    :param wim_account: ``_id`` of the record in ``wim_accounts``, or a non-string value
    :return: value of ``_admin.deployed.RO-account`` of the record, or
        ``wim_account`` itself when it is not a string
    :raises LcmException: when ``_admin.operationalState`` is not ``ENABLED``
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    if db_wim["_admin"]["operationalState"] != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, db_wim["_admin"]["operationalState"]
            )
        )
    RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
    return RO_wim_id
492 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
494 db_vdu_push_list
= []
496 db_update
= {"_admin.modified": time()}
498 for vdu_id
, vdu_count
in vdu_create
.items():
502 for vdur
in reversed(db_vnfr
["vdur"])
503 if vdur
["vdu-id-ref"] == vdu_id
508 # Read the template saved in the db:
509 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
510 vdur_template
= db_vnfr
.get("vdur-template")
511 if not vdur_template
:
513 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
517 vdur
= vdur_template
[0]
518 #Delete a template from the database after using it
519 self
.db
.set_one("vnfrs",
520 {"_id": db_vnfr
["_id"]},
522 pull
={"vdur-template": {"_id": vdur
['_id']}}
524 for count
in range(vdu_count
):
525 vdur_copy
= deepcopy(vdur
)
526 vdur_copy
["status"] = "BUILD"
527 vdur_copy
["status-detailed"] = None
528 vdur_copy
["ip-address"] = None
529 vdur_copy
["_id"] = str(uuid4())
530 vdur_copy
["count-index"] += count
+ 1
531 vdur_copy
["id"] = "{}-{}".format(
532 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
534 vdur_copy
.pop("vim_info", None)
535 for iface
in vdur_copy
["interfaces"]:
536 if iface
.get("fixed-ip"):
537 iface
["ip-address"] = self
.increment_ip_mac(
538 iface
["ip-address"], count
+ 1
541 iface
.pop("ip-address", None)
542 if iface
.get("fixed-mac"):
543 iface
["mac-address"] = self
.increment_ip_mac(
544 iface
["mac-address"], count
+ 1
547 iface
.pop("mac-address", None)
551 ) # only first vdu can be managment of vnf
552 db_vdu_push_list
.append(vdur_copy
)
553 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
555 if len(db_vnfr
["vdur"]) == 1:
556 # The scale will move to 0 instances
557 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
558 template_vdur
= [db_vnfr
["vdur"][0]]
559 for vdu_id
, vdu_count
in vdu_delete
.items():
561 indexes_to_delete
= [
563 for iv
in enumerate(db_vnfr
["vdur"])
564 if iv
[1]["vdu-id-ref"] == vdu_id
568 "vdur.{}.status".format(i
): "DELETING"
569 for i
in indexes_to_delete
[-vdu_count
:]
573 # it must be deleted one by one because common.db does not allow otherwise
576 for v
in reversed(db_vnfr
["vdur"])
577 if v
["vdu-id-ref"] == vdu_id
579 for vdu
in vdus_to_delete
[:vdu_count
]:
582 {"_id": db_vnfr
["_id"]},
584 pull
={"vdur": {"_id": vdu
["_id"]}},
588 db_push
["vdur"] = db_vdu_push_list
590 db_push
["vdur-template"] = template_vdur
593 db_vnfr
["vdur-template"] = template_vdur
594 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
595 # modify passed dictionary db_vnfr
596 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
597 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """

    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            # first RO net that belongs to this vld: copy its VIM info
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # the inner loop finished without a match
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
623 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
625 for db_vnfr
in db_vnfrs
.values():
626 vnfr_update
= {"status": "ERROR"}
627 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
628 if "status" not in vdur
:
629 vdur
["status"] = "ERROR"
630 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
632 vdur
["status-detailed"] = str(error_text
)
634 "vdur.{}.status-detailed".format(vdu_index
)
636 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
637 except DbException
as e
:
638 self
.logger
.error("Cannot update vnf. {}".format(e
))
640 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
642 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
643 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
647 for vnf_index
, db_vnfr
in db_vnfrs
.items():
648 for vnf_RO
in nsr_desc_RO
["vnfs"]:
649 if vnf_RO
["member_vnf_index"] != vnf_index
:
652 if vnf_RO
.get("ip_address"):
653 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
656 elif not db_vnfr
.get("ip-address"):
657 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
658 raise LcmExceptionNoMgmtIP(
659 "ns member_vnf_index '{}' has no IP address".format(
664 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
665 vdur_RO_count_index
= 0
666 if vdur
.get("pdu-type"):
668 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
669 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
671 if vdur
["count-index"] != vdur_RO_count_index
:
672 vdur_RO_count_index
+= 1
674 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
675 if vdur_RO
.get("ip_address"):
676 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
678 vdur
["ip-address"] = None
679 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
680 vdur
["name"] = vdur_RO
.get("vim_name")
681 vdur
["status"] = vdur_RO
.get("status")
682 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
683 for ifacer
in get_iterable(vdur
, "interfaces"):
684 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
685 if ifacer
["name"] == interface_RO
.get("internal_name"):
686 ifacer
["ip-address"] = interface_RO
.get(
689 ifacer
["mac-address"] = interface_RO
.get(
695 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
696 "from VIM info".format(
697 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
700 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
704 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
706 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
710 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
711 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
712 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
714 vld
["vim-id"] = net_RO
.get("vim_net_id")
715 vld
["name"] = net_RO
.get("vim_name")
716 vld
["status"] = net_RO
.get("status")
717 vld
["status-detailed"] = net_RO
.get("error_msg")
718 vnfr_update
["vld.{}".format(vld_index
)] = vld
722 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
727 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
732 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
def _get_ns_config_info(self, nsr_id):
    """
    Generates a mapping between vnf,vdu elements and the N2VC id
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    mapping = {}
    ns_config_info = {"osm-config-mapping": mapping}
    for vca in vca_deployed_list:
        if not vca["member-vnf-index"]:
            # entry without member-vnf-index: nothing to map
            continue
        if not vca["vdu_id"]:
            mapping[vca["member-vnf-index"]] = vca["application"]
        else:
            mapping[
                "{}.{}.{}".format(
                    vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
                )
            ] = vca["application"]
    return ns_config_info
762 async def _instantiate_ng_ro(
779 def get_vim_account(vim_account_id
):
781 if vim_account_id
in db_vims
:
782 return db_vims
[vim_account_id
]
783 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
784 db_vims
[vim_account_id
] = db_vim
787 # modify target_vld info with instantiation parameters
788 def parse_vld_instantiation_params(
789 target_vim
, target_vld
, vld_params
, target_sdn
791 if vld_params
.get("ip-profile"):
792 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
795 if vld_params
.get("provider-network"):
796 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
799 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
800 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
803 if vld_params
.get("wimAccountId"):
804 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
805 target_vld
["vim_info"][target_wim
] = {}
806 for param
in ("vim-network-name", "vim-network-id"):
807 if vld_params
.get(param
):
808 if isinstance(vld_params
[param
], dict):
809 for vim
, vim_net
in vld_params
[param
].items():
810 other_target_vim
= "vim:" + vim
812 target_vld
["vim_info"],
813 (other_target_vim
, param
.replace("-", "_")),
816 else: # isinstance str
817 target_vld
["vim_info"][target_vim
][
818 param
.replace("-", "_")
819 ] = vld_params
[param
]
820 if vld_params
.get("common_id"):
821 target_vld
["common_id"] = vld_params
.get("common_id")
823 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
824 def update_ns_vld_target(target
, ns_params
):
825 for vnf_params
in ns_params
.get("vnf", ()):
826 if vnf_params
.get("vimAccountId"):
830 for vnfr
in db_vnfrs
.values()
831 if vnf_params
["member-vnf-index"]
832 == vnfr
["member-vnf-index-ref"]
836 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
837 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
838 target_vld
= find_in_list(
839 get_iterable(vdur
, "interfaces"),
840 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
843 if vnf_params
.get("vimAccountId") not in a_vld
.get(
846 target
["ns"]["vld"][a_index
].get("vim_info").update(
848 "vim:{}".format(vnf_params
["vimAccountId"]): {
849 "vim_network_name": ""
854 nslcmop_id
= db_nslcmop
["_id"]
856 "name": db_nsr
["name"],
859 "image": deepcopy(db_nsr
["image"]),
860 "flavor": deepcopy(db_nsr
["flavor"]),
861 "action_id": nslcmop_id
,
862 "cloud_init_content": {},
864 for image
in target
["image"]:
865 image
["vim_info"] = {}
866 for flavor
in target
["flavor"]:
867 flavor
["vim_info"] = {}
868 if db_nsr
.get("affinity-or-anti-affinity-group"):
869 target
["affinity-or-anti-affinity-group"] = deepcopy(db_nsr
["affinity-or-anti-affinity-group"])
870 for affinity_or_anti_affinity_group
in target
["affinity-or-anti-affinity-group"]:
871 affinity_or_anti_affinity_group
["vim_info"] = {}
873 if db_nslcmop
.get("lcmOperationType") != "instantiate":
874 # get parameters of instantiation:
875 db_nslcmop_instantiate
= self
.db
.get_list(
878 "nsInstanceId": db_nslcmop
["nsInstanceId"],
879 "lcmOperationType": "instantiate",
882 ns_params
= db_nslcmop_instantiate
.get("operationParams")
884 ns_params
= db_nslcmop
.get("operationParams")
885 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
886 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
889 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
890 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
894 "mgmt-network": vld
.get("mgmt-network", False),
895 "type": vld
.get("type"),
898 "vim_network_name": vld
.get("vim-network-name"),
899 "vim_account_id": ns_params
["vimAccountId"],
903 # check if this network needs SDN assist
904 if vld
.get("pci-interfaces"):
905 db_vim
= get_vim_account(ns_params
["vimAccountId"])
906 sdnc_id
= db_vim
["config"].get("sdn-controller")
908 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
909 target_sdn
= "sdn:{}".format(sdnc_id
)
910 target_vld
["vim_info"][target_sdn
] = {
912 "target_vim": target_vim
,
914 "type": vld
.get("type"),
917 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
918 for nsd_vnf_profile
in nsd_vnf_profiles
:
919 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
920 if cp
["virtual-link-profile-id"] == vld
["id"]:
922 "member_vnf:{}.{}".format(
923 cp
["constituent-cpd-id"][0][
924 "constituent-base-element-id"
926 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
928 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
930 # check at nsd descriptor, if there is an ip-profile
932 nsd_vlp
= find_in_list(
933 get_virtual_link_profiles(nsd
),
934 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
939 and nsd_vlp
.get("virtual-link-protocol-data")
940 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
942 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
945 ip_profile_dest_data
= {}
946 if "ip-version" in ip_profile_source_data
:
947 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
950 if "cidr" in ip_profile_source_data
:
951 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
954 if "gateway-ip" in ip_profile_source_data
:
955 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
958 if "dhcp-enabled" in ip_profile_source_data
:
959 ip_profile_dest_data
["dhcp-params"] = {
960 "enabled": ip_profile_source_data
["dhcp-enabled"]
962 vld_params
["ip-profile"] = ip_profile_dest_data
964 # update vld_params with instantiation params
965 vld_instantiation_params
= find_in_list(
966 get_iterable(ns_params
, "vld"),
967 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
969 if vld_instantiation_params
:
970 vld_params
.update(vld_instantiation_params
)
971 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
972 target
["ns"]["vld"].append(target_vld
)
973 # Update the target ns_vld if vnf vim_account is overridden by instantiation params
974 update_ns_vld_target(target
, ns_params
)
976 for vnfr
in db_vnfrs
.values():
978 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
980 vnf_params
= find_in_list(
981 get_iterable(ns_params
, "vnf"),
982 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
984 target_vnf
= deepcopy(vnfr
)
985 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
986 for vld
in target_vnf
.get("vld", ()):
987 # check if connected to a ns.vld, to fill target'
988 vnf_cp
= find_in_list(
989 vnfd
.get("int-virtual-link-desc", ()),
990 lambda cpd
: cpd
.get("id") == vld
["id"],
993 ns_cp
= "member_vnf:{}.{}".format(
994 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
996 if cp2target
.get(ns_cp
):
997 vld
["target"] = cp2target
[ns_cp
]
1000 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1002 # check if this network needs SDN assist
1004 if vld
.get("pci-interfaces"):
1005 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1006 sdnc_id
= db_vim
["config"].get("sdn-controller")
1008 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1009 target_sdn
= "sdn:{}".format(sdnc_id
)
1010 vld
["vim_info"][target_sdn
] = {
1012 "target_vim": target_vim
,
1014 "type": vld
.get("type"),
1017 # check at vnfd descriptor, if there is an ip-profile
1019 vnfd_vlp
= find_in_list(
1020 get_virtual_link_profiles(vnfd
),
1021 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1025 and vnfd_vlp
.get("virtual-link-protocol-data")
1026 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1028 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1031 ip_profile_dest_data
= {}
1032 if "ip-version" in ip_profile_source_data
:
1033 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1036 if "cidr" in ip_profile_source_data
:
1037 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1040 if "gateway-ip" in ip_profile_source_data
:
1041 ip_profile_dest_data
[
1043 ] = ip_profile_source_data
["gateway-ip"]
1044 if "dhcp-enabled" in ip_profile_source_data
:
1045 ip_profile_dest_data
["dhcp-params"] = {
1046 "enabled": ip_profile_source_data
["dhcp-enabled"]
1049 vld_params
["ip-profile"] = ip_profile_dest_data
1050 # update vld_params with instantiation params
1052 vld_instantiation_params
= find_in_list(
1053 get_iterable(vnf_params
, "internal-vld"),
1054 lambda i_vld
: i_vld
["name"] == vld
["id"],
1056 if vld_instantiation_params
:
1057 vld_params
.update(vld_instantiation_params
)
1058 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1061 for vdur
in target_vnf
.get("vdur", ()):
1062 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1063 continue # This vdu must not be created
1064 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1066 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1069 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1070 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1073 and vdu_configuration
.get("config-access")
1074 and vdu_configuration
.get("config-access").get("ssh-access")
1076 vdur
["ssh-keys"] = ssh_keys_all
1077 vdur
["ssh-access-required"] = vdu_configuration
[
1079 ]["ssh-access"]["required"]
1082 and vnf_configuration
.get("config-access")
1083 and vnf_configuration
.get("config-access").get("ssh-access")
1084 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1086 vdur
["ssh-keys"] = ssh_keys_all
1087 vdur
["ssh-access-required"] = vnf_configuration
[
1089 ]["ssh-access"]["required"]
1090 elif ssh_keys_instantiation
and find_in_list(
1091 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1093 vdur
["ssh-keys"] = ssh_keys_instantiation
1095 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1097 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1099 if vdud
.get("cloud-init-file"):
1100 vdur
["cloud-init"] = "{}:file:{}".format(
1101 vnfd
["_id"], vdud
.get("cloud-init-file")
1103 # read file and put content at target.cloud_init_content, so that ng_ro does not need the shared package system
1104 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1105 base_folder
= vnfd
["_admin"]["storage"]
1106 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1107 base_folder
["folder"],
1108 base_folder
["pkg-dir"],
1109 vdud
.get("cloud-init-file"),
1111 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1112 target
["cloud_init_content"][
1115 elif vdud
.get("cloud-init"):
1116 vdur
["cloud-init"] = "{}:vdu:{}".format(
1117 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1119 # put content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1120 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1123 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1124 deploy_params_vdu
= self
._format
_additional
_params
(
1125 vdur
.get("additionalParams") or {}
1127 deploy_params_vdu
["OSM"] = get_osm_params(
1128 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1130 vdur
["additionalParams"] = deploy_params_vdu
1133 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1134 if target_vim
not in ns_flavor
["vim_info"]:
1135 ns_flavor
["vim_info"][target_vim
] = {}
1138 # in case alternative images are provided we must check if they should be applied
1139 # for the vim_type, modify the vim_type taking into account
1140 ns_image_id
= int(vdur
["ns-image-id"])
1141 if vdur
.get("alt-image-ids"):
1142 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1143 vim_type
= db_vim
["vim_type"]
1144 for alt_image_id
in vdur
.get("alt-image-ids"):
1145 ns_alt_image
= target
["image"][int(alt_image_id
)]
1146 if vim_type
== ns_alt_image
.get("vim-type"):
1147 # must use alternative image
1149 "use alternative image id: {}".format(alt_image_id
)
1151 ns_image_id
= alt_image_id
1152 vdur
["ns-image-id"] = ns_image_id
1154 ns_image
= target
["image"][int(ns_image_id
)]
1155 if target_vim
not in ns_image
["vim_info"]:
1156 ns_image
["vim_info"][target_vim
] = {}
1159 if vdur
.get("affinity-or-anti-affinity-group-id"):
1160 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1161 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1162 if target_vim
not in ns_ags
["vim_info"]:
1163 ns_ags
["vim_info"][target_vim
] = {}
1165 vdur
["vim_info"] = {target_vim
: {}}
1166 # instantiation parameters
1168 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1169 # vdud["id"]), None)
1170 vdur_list
.append(vdur
)
1171 target_vnf
["vdur"] = vdur_list
1172 target
["vnf"].append(target_vnf
)
1174 desc
= await self
.RO
.deploy(nsr_id
, target
)
1175 self
.logger
.debug("RO return > {}".format(desc
))
1176 action_id
= desc
["action_id"]
1177 await self
._wait
_ng
_ro
(
1178 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1183 "_admin.deployed.RO.operational-status": "running",
1184 "detailed-status": " ".join(stage
),
1186 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1187 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1188 self
._write
_op
_status
(nslcmop_id
, stage
)
1190 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1194 async def _wait_ng_ro(
1203 detailed_status_old
= None
1205 start_time
= start_time
or time()
1206 while time() <= start_time
+ timeout
:
1207 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1208 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1209 if desc_status
["status"] == "FAILED":
1210 raise NgRoException(desc_status
["details"])
1211 elif desc_status
["status"] == "BUILD":
1213 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1214 elif desc_status
["status"] == "DONE":
1216 stage
[2] = "Deployed at VIM"
1219 assert False, "ROclient.check_ns_status returns unknown {}".format(
1220 desc_status
["status"]
1222 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1223 detailed_status_old
= stage
[2]
1224 db_nsr_update
["detailed-status"] = " ".join(stage
)
1225 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1226 self
._write
_op
_status
(nslcmop_id
, stage
)
1227 await asyncio
.sleep(15, loop
=self
.loop
)
1228 else: # timeout_ns_deploy
1229 raise NgRoException("Timeout waiting ns to deploy")
1231 async def _terminate_ng_ro(
1232 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1237 start_deploy
= time()
1244 "action_id": nslcmop_id
,
1246 desc
= await self
.RO
.deploy(nsr_id
, target
)
1247 action_id
= desc
["action_id"]
1248 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1249 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1252 + "ns terminate action at RO. action_id={}".format(action_id
)
1256 delete_timeout
= 20 * 60 # 20 minutes
1257 await self
._wait
_ng
_ro
(
1258 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1261 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1262 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1264 await self
.RO
.delete(nsr_id
)
1265 except Exception as e
:
1266 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1267 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1268 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1269 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1271 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1273 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1274 failed_detail
.append("delete conflict: {}".format(e
))
1277 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1280 failed_detail
.append("delete error: {}".format(e
))
1283 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1287 stage
[2] = "Error deleting from VIM"
1289 stage
[2] = "Deleted from VIM"
1290 db_nsr_update
["detailed-status"] = " ".join(stage
)
1291 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1292 self
._write
_op
_status
(nslcmop_id
, stage
)
1295 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(
    self,
    logging_text,
    nsr_id,
    nsd,
    db_nsr,
    db_nslcmop,
    db_vnfrs,
    db_vnfds,
    n2vc_key_list,
    stage,
):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs. Only its values are iterated here, reading 'vim-account-id'
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get(
                "ns_deploy", self.timeout_ns_deploy
            )

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
            last_vnfr = None
            for last_vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == last_vnfr["vim-account-id"]:
                    break
            else:
                # this was a "==" comparison whose result was discarded, so the ns vim account was never
                # changed. The guard avoids using an unbound name when there are no vnfrs at all
                if last_vnfr is not None:
                    ns_params["vimAccountId"] = last_vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(
            logging_text,
            nsr_id,
            nsd,
            db_nsr,
            db_nslcmop,
            db_vnfrs,
            db_vnfds,
            n2vc_key_list,
            stage,
            start_deploy,
            timeout_ns_deploy,
        )
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # the traceback is logged only for exceptions outside the known set below
        # NOTE(review): only ROclient.ROClientException is visible in the reviewed extract; confirm the
        # remaining members of this tuple against the original
        self.logger.error(
            "Error deploying at VIM {}".format(e),
            exc_info=not isinstance(
                e,
                (
                    ROclient.ROClientException,
                    LcmException,
                    DbException,
                    NgRoException,
                ),
            ),
        )
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity (not read by the code of this method)
    :param vnfr_id: _id of the vnfr that contains the kdur
    :param kdu_name: name matched against the "kdu-name" of every kdur of the vnfr
    :return: IP address. Raises LcmException if the kdur is not found, if it reports a status other than
        READY or ENABLED, or if no status is reported after all the tries
    """
    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    max_tries = 360  # the vnfr is read again every 10 seconds
    for _ in range(max_tries):
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next(
            (
                x
                for x in get_iterable(db_vnfr, "kdur")
                if x.get("kdu-name") == kdu_name
            ),
            None,
        )
        if not kdur:
            raise LcmException(
                "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
            )
        status = kdur.get("status")
        if status in ("READY", "ENABLED"):
            return kdur.get("ip-address")
        if status:
            # any other non-empty status is handled as an error
            raise LcmException("target KDU={} is in error state".format(kdu_name))

        # the "loop" argument of asyncio.sleep was removed in python 3.10; the running loop is used
        await asyncio.sleep(10)
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1412 async def wait_vm_up_insert_key_ro(
1413 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1416 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1417 :param logging_text: prefix use for logging
1422 :param pub_key: public ssh key to inject, None to skip
1423 :param user: user to apply the public ssh key
1427 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1431 target_vdu_id
= None
1437 if ro_retries
>= 360: # 1 hour
1439 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1442 await asyncio
.sleep(10, loop
=self
.loop
)
1445 if not target_vdu_id
:
1446 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1448 if not vdu_id
: # for the VNF case
1449 if db_vnfr
.get("status") == "ERROR":
1451 "Cannot inject ssh-key because target VNF is in error state"
1453 ip_address
= db_vnfr
.get("ip-address")
1459 for x
in get_iterable(db_vnfr
, "vdur")
1460 if x
.get("ip-address") == ip_address
1468 for x
in get_iterable(db_vnfr
, "vdur")
1469 if x
.get("vdu-id-ref") == vdu_id
1470 and x
.get("count-index") == vdu_index
1476 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1477 ): # If only one, this should be the target vdu
1478 vdur
= db_vnfr
["vdur"][0]
1481 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1482 vnfr_id
, vdu_id
, vdu_index
1485 # New generation RO stores information at "vim_info"
1488 if vdur
.get("vim_info"):
1490 t
for t
in vdur
["vim_info"]
1491 ) # there should be only one key
1492 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1494 vdur
.get("pdu-type")
1495 or vdur
.get("status") == "ACTIVE"
1496 or ng_ro_status
== "ACTIVE"
1498 ip_address
= vdur
.get("ip-address")
1501 target_vdu_id
= vdur
["vdu-id-ref"]
1502 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1504 "Cannot inject ssh-key because target VM is in error state"
1507 if not target_vdu_id
:
1510 # inject public key into machine
1511 if pub_key
and user
:
1512 self
.logger
.debug(logging_text
+ "Inserting RO key")
1513 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1514 if vdur
.get("pdu-type"):
1515 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1518 ro_vm_id
= "{}-{}".format(
1519 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1520 ) # TODO add vdu_index
1524 "action": "inject_ssh_key",
1528 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1530 desc
= await self
.RO
.deploy(nsr_id
, target
)
1531 action_id
= desc
["action_id"]
1532 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1535 # wait until NS is deployed at RO
1537 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1538 ro_nsr_id
= deep_get(
1539 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1543 result_dict
= await self
.RO
.create_action(
1545 item_id_name
=ro_nsr_id
,
1547 "add_public_key": pub_key
,
1552 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1553 if not result_dict
or not isinstance(result_dict
, dict):
1555 "Unknown response from RO when injecting key"
1557 for result
in result_dict
.values():
1558 if result
.get("vim_result") == 200:
1561 raise ROclient
.ROClientException(
1562 "error injecting key: {}".format(
1563 result
.get("description")
1567 except NgRoException
as e
:
1569 "Reaching max tries injecting key. Error: {}".format(e
)
1571 except ROclient
.ROClientException
as e
:
1575 + "error injecting key: {}. Retrying until {} seconds".format(
1582 "Reaching max tries injecting key. Error: {}".format(e
)
1589 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1591 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1593 my_vca
= vca_deployed_list
[vca_index
]
1594 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1595 # vdu or kdu: no dependencies
1599 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1600 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1601 configuration_status_list
= db_nsr
["configurationStatus"]
1602 for index
, vca_deployed
in enumerate(configuration_status_list
):
1603 if index
== vca_index
:
1606 if not my_vca
.get("member-vnf-index") or (
1607 vca_deployed
.get("member-vnf-index")
1608 == my_vca
.get("member-vnf-index")
1610 internal_status
= configuration_status_list
[index
].get("status")
1611 if internal_status
== "READY":
1613 elif internal_status
== "BROKEN":
1615 "Configuration aborted because dependent charm/s has failed"
1620 # no dependencies, return
1622 await asyncio
.sleep(10)
1625 raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
    """
    Obtain the VCA identity to use for an element

    :param db_vnfr: database content of the vnf record. When not empty, its "vca-id" is the result
    :param db_nsr: database content of the ns record. Only consulted when db_vnfr is empty: the "vca" of the
        vim account at instantiate_params.vimAccountId is the result
    :return: vca identity, or None if it is not found
    """
    # NOTE(review): the if/elif structure is reconstructed from a partial extract -- confirm
    if db_vnfr:
        return deep_get(db_vnfr, ("vca-id",))
    if not db_nsr:
        return None
    vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    return vim_account.get("vca")
1636 async def instantiate_N2VC(
1653 ee_config_descriptor
,
1655 nsr_id
= db_nsr
["_id"]
1656 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1657 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1658 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1659 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1661 "collection": "nsrs",
1662 "filter": {"_id": nsr_id
},
1663 "path": db_update_entry
,
1669 element_under_configuration
= nsr_id
1673 vnfr_id
= db_vnfr
["_id"]
1674 osm_config
["osm"]["vnf_id"] = vnfr_id
1676 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1678 if vca_type
== "native_charm":
1681 index_number
= vdu_index
or 0
1684 element_type
= "VNF"
1685 element_under_configuration
= vnfr_id
1686 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1688 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1689 element_type
= "VDU"
1690 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1691 osm_config
["osm"]["vdu_id"] = vdu_id
1693 namespace
+= ".{}".format(kdu_name
)
1694 element_type
= "KDU"
1695 element_under_configuration
= kdu_name
1696 osm_config
["osm"]["kdu_name"] = kdu_name
1699 artifact_path
= "{}/{}/{}/{}".format(
1700 base_folder
["folder"],
1701 base_folder
["pkg-dir"],
1703 if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1708 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1710 # get initial_config_primitive_list that applies to this element
1711 initial_config_primitive_list
= config_descriptor
.get(
1712 "initial-config-primitive"
1716 "Initial config primitive list > {}".format(
1717 initial_config_primitive_list
1721 # add config if not present for NS charm
1722 ee_descriptor_id
= ee_config_descriptor
.get("id")
1723 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1724 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1725 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1729 "Initial config primitive list #2 > {}".format(
1730 initial_config_primitive_list
1733 # n2vc_redesign STEP 3.1
1734 # find old ee_id if exists
1735 ee_id
= vca_deployed
.get("ee_id")
1737 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1738 # create or register execution environment in VCA
1739 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1741 self
._write
_configuration
_status
(
1743 vca_index
=vca_index
,
1745 element_under_configuration
=element_under_configuration
,
1746 element_type
=element_type
,
1749 step
= "create execution environment"
1750 self
.logger
.debug(logging_text
+ step
)
1754 if vca_type
== "k8s_proxy_charm":
1755 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1756 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1757 namespace
=namespace
,
1758 artifact_path
=artifact_path
,
1762 elif vca_type
== "helm" or vca_type
== "helm-v3":
1763 ee_id
, credentials
= await self
.vca_map
[
1765 ].create_execution_environment(
1766 namespace
=namespace
,
1770 artifact_path
=artifact_path
,
1774 ee_id
, credentials
= await self
.vca_map
[
1776 ].create_execution_environment(
1777 namespace
=namespace
,
1783 elif vca_type
== "native_charm":
1784 step
= "Waiting to VM being up and getting IP address"
1785 self
.logger
.debug(logging_text
+ step
)
1786 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1795 credentials
= {"hostname": rw_mgmt_ip
}
1797 username
= deep_get(
1798 config_descriptor
, ("config-access", "ssh-access", "default-user")
1800 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1801 # merged. Meanwhile let's get username from initial-config-primitive
1802 if not username
and initial_config_primitive_list
:
1803 for config_primitive
in initial_config_primitive_list
:
1804 for param
in config_primitive
.get("parameter", ()):
1805 if param
["name"] == "ssh-username":
1806 username
= param
["value"]
1810 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1811 "'config-access.ssh-access.default-user'"
1813 credentials
["username"] = username
1814 # n2vc_redesign STEP 3.2
1816 self
._write
_configuration
_status
(
1818 vca_index
=vca_index
,
1819 status
="REGISTERING",
1820 element_under_configuration
=element_under_configuration
,
1821 element_type
=element_type
,
1824 step
= "register execution environment {}".format(credentials
)
1825 self
.logger
.debug(logging_text
+ step
)
1826 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1827 credentials
=credentials
,
1828 namespace
=namespace
,
1833 # for compatibility with MON/POL modules, the need model and application name at database
1834 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1835 ee_id_parts
= ee_id
.split(".")
1836 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1837 if len(ee_id_parts
) >= 2:
1838 model_name
= ee_id_parts
[0]
1839 application_name
= ee_id_parts
[1]
1840 db_nsr_update
[db_update_entry
+ "model"] = model_name
1841 db_nsr_update
[db_update_entry
+ "application"] = application_name
1843 # n2vc_redesign STEP 3.3
1844 step
= "Install configuration Software"
1846 self
._write
_configuration
_status
(
1848 vca_index
=vca_index
,
1849 status
="INSTALLING SW",
1850 element_under_configuration
=element_under_configuration
,
1851 element_type
=element_type
,
1852 other_update
=db_nsr_update
,
1855 # TODO check if already done
1856 self
.logger
.debug(logging_text
+ step
)
1858 if vca_type
== "native_charm":
1859 config_primitive
= next(
1860 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1863 if config_primitive
:
1864 config
= self
._map
_primitive
_params
(
1865 config_primitive
, {}, deploy_params
1868 if vca_type
== "lxc_proxy_charm":
1869 if element_type
== "NS":
1870 num_units
= db_nsr
.get("config-units") or 1
1871 elif element_type
== "VNF":
1872 num_units
= db_vnfr
.get("config-units") or 1
1873 elif element_type
== "VDU":
1874 for v
in db_vnfr
["vdur"]:
1875 if vdu_id
== v
["vdu-id-ref"]:
1876 num_units
= v
.get("config-units") or 1
1878 if vca_type
!= "k8s_proxy_charm":
1879 await self
.vca_map
[vca_type
].install_configuration_sw(
1881 artifact_path
=artifact_path
,
1884 num_units
=num_units
,
1889 # write in db flag of configuration_sw already installed
1891 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1894 # add relations for this VCA (wait for other peers related with this VCA)
1895 await self
._add
_vca
_relations
(
1896 logging_text
=logging_text
,
1898 vca_index
=vca_index
,
1903 # if SSH access is required, then get execution environment SSH public
1904 # if native charm we have waited already to VM be UP
1905 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1908 # self.logger.debug("get ssh key block")
1910 config_descriptor
, ("config-access", "ssh-access", "required")
1912 # self.logger.debug("ssh key needed")
1913 # Needed to inject a ssh key
1916 ("config-access", "ssh-access", "default-user"),
1918 step
= "Install configuration Software, getting public ssh key"
1919 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1920 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1923 step
= "Insert public key into VM user={} ssh_key={}".format(
1927 # self.logger.debug("no need to get ssh key")
1928 step
= "Waiting to VM being up and getting IP address"
1929 self
.logger
.debug(logging_text
+ step
)
1931 # n2vc_redesign STEP 5.1
1932 # wait for RO (ip-address) Insert pub_key into VM
1935 rw_mgmt_ip
= await self
.wait_kdu_up(
1936 logging_text
, nsr_id
, vnfr_id
, kdu_name
1939 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1949 rw_mgmt_ip
= None # This is for a NS configuration
1951 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
1953 # store rw_mgmt_ip in deploy params for later replacement
1954 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1956 # n2vc_redesign STEP 6 Execute initial config primitive
1957 step
= "execute initial config primitive"
1959 # wait for dependent primitives execution (NS -> VNF -> VDU)
1960 if initial_config_primitive_list
:
1961 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1963 # stage, in function of element type: vdu, kdu, vnf or ns
1964 my_vca
= vca_deployed_list
[vca_index
]
1965 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1967 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
1968 elif my_vca
.get("member-vnf-index"):
1970 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
1973 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
1975 self
._write
_configuration
_status
(
1976 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
1979 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
1981 check_if_terminated_needed
= True
1982 for initial_config_primitive
in initial_config_primitive_list
:
1983 # adding information on the vca_deployed if it is a NS execution environment
1984 if not vca_deployed
["member-vnf-index"]:
1985 deploy_params
["ns_config_info"] = json
.dumps(
1986 self
._get
_ns
_config
_info
(nsr_id
)
1988 # TODO check if already done
1989 primitive_params_
= self
._map
_primitive
_params
(
1990 initial_config_primitive
, {}, deploy_params
1993 step
= "execute primitive '{}' params '{}'".format(
1994 initial_config_primitive
["name"], primitive_params_
1996 self
.logger
.debug(logging_text
+ step
)
1997 await self
.vca_map
[vca_type
].exec_primitive(
1999 primitive_name
=initial_config_primitive
["name"],
2000 params_dict
=primitive_params_
,
2005 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2006 if check_if_terminated_needed
:
2007 if config_descriptor
.get("terminate-config-primitive"):
2009 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2011 check_if_terminated_needed
= False
2013 # TODO register in database that primitive is done
2015 # STEP 7 Configure metrics
2016 if vca_type
== "helm" or vca_type
== "helm-v3":
2017 prometheus_jobs
= await self
.add_prometheus_metrics(
2019 artifact_path
=artifact_path
,
2020 ee_config_descriptor
=ee_config_descriptor
,
2023 target_ip
=rw_mgmt_ip
,
2029 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2032 step
= "instantiated at VCA"
2033 self
.logger
.debug(logging_text
+ step
)
2035 self
._write
_configuration
_status
(
2036 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2039 except Exception as e
: # TODO not use Exception but N2VC exception
2040 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2042 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2045 "Exception while {} : {}".format(step
, e
), exc_info
=True
2047 self
._write
_configuration
_status
(
2048 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2050 raise LcmException("{} {}".format(step
, e
)) from e
2052 def _write_ns_status(
2056 current_operation
: str,
2057 current_operation_id
: str,
2058 error_description
: str = None,
2059 error_detail
: str = None,
2060 other_update
: dict = None,
2063 Update db_nsr fields.
2066 :param current_operation:
2067 :param current_operation_id:
2068 :param error_description:
2069 :param error_detail:
2070 :param other_update: Other required changes at database if provided, will be cleared
2074 db_dict
= other_update
or {}
2077 ] = current_operation_id
# for backward compatibility
2078 db_dict
["_admin.current-operation"] = current_operation_id
2079 db_dict
["_admin.operation-type"] = (
2080 current_operation
if current_operation
!= "IDLE" else None
2082 db_dict
["currentOperation"] = current_operation
2083 db_dict
["currentOperationID"] = current_operation_id
2084 db_dict
["errorDescription"] = error_description
2085 db_dict
["errorDetail"] = error_detail
2088 db_dict
["nsState"] = ns_state
2089 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2090 except DbException
as e
:
2091 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2093 def _write_op_status(
2097 error_message
: str = None,
2098 queuePosition
: int = 0,
2099 operation_state
: str = None,
2100 other_update
: dict = None,
2103 db_dict
= other_update
or {}
2104 db_dict
["queuePosition"] = queuePosition
2105 if isinstance(stage
, list):
2106 db_dict
["stage"] = stage
[0]
2107 db_dict
["detailed-status"] = " ".join(stage
)
2108 elif stage
is not None:
2109 db_dict
["stage"] = str(stage
)
2111 if error_message
is not None:
2112 db_dict
["errorMessage"] = error_message
2113 if operation_state
is not None:
2114 db_dict
["operationState"] = operation_state
2115 db_dict
["statusEnteredTime"] = time()
2116 self
.update_db_2("nslcmops", op_id
, db_dict
)
2117 except DbException
as e
:
2119 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2122 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2124 nsr_id
= db_nsr
["_id"]
2125 # configurationStatus
2126 config_status
= db_nsr
.get("configurationStatus")
2129 "configurationStatus.{}.status".format(index
): status
2130 for index
, v
in enumerate(config_status
)
2134 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2136 except DbException
as e
:
2138 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2141 def _write_configuration_status(
2146 element_under_configuration
: str = None,
2147 element_type
: str = None,
2148 other_update
: dict = None,
2151 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2152 # .format(vca_index, status))
2155 db_path
= "configurationStatus.{}.".format(vca_index
)
2156 db_dict
= other_update
or {}
2158 db_dict
[db_path
+ "status"] = status
2159 if element_under_configuration
:
2161 db_path
+ "elementUnderConfiguration"
2162 ] = element_under_configuration
2164 db_dict
[db_path
+ "elementType"] = element_type
2165 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2166 except DbException
as e
:
2168 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2169 status
, nsr_id
, vca_index
, e
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). If it is decided by an external tool, the
    request is sent through the message bus and the result is awaited at database (nslcmops _admin.pla).
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'. Raises LcmException if no result is found at database in time
    """
    nslcmop_id = db_nslcmop["_id"]
    if deep_get(db_nslcmop, ("operationParams", "placement-engine")) != "PLA":
        # no external placement requested: nothing is modified
        return False

    self.logger.debug(
        logging_text + "Invoke and wait for placement optimization"
    )
    await self.msg.aiowrite(
        "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
    )

    # poll the nslcmop record until the placement result appears or the waiting budget is consumed
    poll_interval = 5
    remaining = poll_interval * 10
    pla_result = None
    while not pla_result and remaining >= 0:
        await asyncio.sleep(poll_interval)
        remaining -= poll_interval
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

    if not pla_result:
        raise LcmException(
            "Placement timeout for nslcmopId={}".format(nslcmop_id)
        )

    changed = False
    for pla_vnf in pla_result["vnf"]:
        vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
        new_vim_account = pla_vnf.get("vimAccountId")
        if new_vim_account and vnfr:
            changed = True
            self.db.set_one(
                "vnfrs",
                {"_id": vnfr["_id"]},
                {"vim-account-id": new_vim_account},
            )
            # keep the in-memory vnfr consistent with the database
            vnfr["vim-account-id"] = new_vim_account
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Write the placement result contained in params at the "nslcmops" collection

    :param params: dictionary whose "placement" item is stored at "_admin.pla" of the operation with
        identity placement.nslcmopId
    :return: None. Any exception is logged as a warning and not propagated
    """
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(
            "Update failed for nslcmop_id={}:{}".format(nslcmop_id, e)
        )
2231 async def instantiate(self
, nsr_id
, nslcmop_id
):
2234 :param nsr_id: ns instance to deploy
2235 :param nslcmop_id: operation to run
2239 # Try to lock HA task here
2240 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2241 if not task_is_locked_by_me
:
2243 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2247 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2248 self
.logger
.debug(logging_text
+ "Enter")
2250 # get all needed from database
2252 # database nsrs record
2255 # database nslcmops record
2258 # update operation on nsrs
2260 # update operation on nslcmops
2261 db_nslcmop_update
= {}
2263 nslcmop_operation_state
= None
2264 db_vnfrs
= {} # vnf's info indexed by member-index
2266 tasks_dict_info
= {} # from task to info text
2270 "Stage 1/5: preparation of the environment.",
2271 "Waiting for previous operations to terminate.",
2274 # ^ stage, step, VIM progress
2276 # wait for any previous tasks in process
2277 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2279 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2280 stage
[1] = "Reading from database."
2281 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2282 db_nsr_update
["detailed-status"] = "creating"
2283 db_nsr_update
["operational-status"] = "init"
2284 self
._write
_ns
_status
(
2286 ns_state
="BUILDING",
2287 current_operation
="INSTANTIATING",
2288 current_operation_id
=nslcmop_id
,
2289 other_update
=db_nsr_update
,
2291 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2293 # read from db: operation
2294 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2295 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2296 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2297 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2298 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2300 ns_params
= db_nslcmop
.get("operationParams")
2301 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2302 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2304 timeout_ns_deploy
= self
.timeout
.get(
2305 "ns_deploy", self
.timeout_ns_deploy
2309 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2310 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2311 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2312 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2313 self
.fs
.sync(db_nsr
["nsd-id"])
2315 # nsr_name = db_nsr["name"] # TODO short-name??
2317 # read from db: vnf's of this ns
2318 stage
[1] = "Getting vnfrs from db."
2319 self
.logger
.debug(logging_text
+ stage
[1])
2320 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2322 # read from db: vnfd's for every vnf
2323 db_vnfds
= [] # every vnfd data
2325 # for each vnf in ns, read vnfd
2326 for vnfr
in db_vnfrs_list
:
2327 if vnfr
.get("kdur"):
2329 for kdur
in vnfr
["kdur"]:
2330 if kdur
.get("additionalParams"):
2331 kdur
["additionalParams"] = json
.loads(
2332 kdur
["additionalParams"]
2334 kdur_list
.append(kdur
)
2335 vnfr
["kdur"] = kdur_list
2337 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2338 vnfd_id
= vnfr
["vnfd-id"]
2339 vnfd_ref
= vnfr
["vnfd-ref"]
2340 self
.fs
.sync(vnfd_id
)
2342 # if we haven't this vnfd, read it from db
2343 if vnfd_id
not in db_vnfds
:
2345 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2348 self
.logger
.debug(logging_text
+ stage
[1])
2349 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2352 db_vnfds
.append(vnfd
)
2354 # Get or generates the _admin.deployed.VCA list
2355 vca_deployed_list
= None
2356 if db_nsr
["_admin"].get("deployed"):
2357 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2358 if vca_deployed_list
is None:
2359 vca_deployed_list
= []
2360 configuration_status_list
= []
2361 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2362 db_nsr_update
["configurationStatus"] = configuration_status_list
2363 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2364 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2365 elif isinstance(vca_deployed_list
, dict):
2366 # maintain backward compatibility. Change a dict to list at database
2367 vca_deployed_list
= list(vca_deployed_list
.values())
2368 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2369 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2372 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2374 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2375 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2377 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2378 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2379 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2381 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2384 # n2vc_redesign STEP 2 Deploy Network Scenario
2385 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2386 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2388 stage
[1] = "Deploying KDUs."
2389 # self.logger.debug(logging_text + "Before deploy_kdus")
2390 # Call to deploy_kdus in case exists the "vdu:kdu" param
2391 await self
.deploy_kdus(
2392 logging_text
=logging_text
,
2394 nslcmop_id
=nslcmop_id
,
2397 task_instantiation_info
=tasks_dict_info
,
2400 stage
[1] = "Getting VCA public key."
2401 # n2vc_redesign STEP 1 Get VCA public ssh-key
2402 # feature 1429. Add n2vc public key to needed VMs
2403 n2vc_key
= self
.n2vc
.get_public_key()
2404 n2vc_key_list
= [n2vc_key
]
2405 if self
.vca_config
.get("public_key"):
2406 n2vc_key_list
.append(self
.vca_config
["public_key"])
2408 stage
[1] = "Deploying NS at VIM."
2409 task_ro
= asyncio
.ensure_future(
2410 self
.instantiate_RO(
2411 logging_text
=logging_text
,
2415 db_nslcmop
=db_nslcmop
,
2418 n2vc_key_list
=n2vc_key_list
,
2422 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2423 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2425 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2426 stage
[1] = "Deploying Execution Environments."
2427 self
.logger
.debug(logging_text
+ stage
[1])
2429 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2430 for vnf_profile
in get_vnf_profiles(nsd
):
2431 vnfd_id
= vnf_profile
["vnfd-id"]
2432 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2433 member_vnf_index
= str(vnf_profile
["id"])
2434 db_vnfr
= db_vnfrs
[member_vnf_index
]
2435 base_folder
= vnfd
["_admin"]["storage"]
2441 # Get additional parameters
2442 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2443 if db_vnfr
.get("additionalParamsForVnf"):
2444 deploy_params
.update(
2445 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2448 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2449 if descriptor_config
:
2451 logging_text
=logging_text
2452 + "member_vnf_index={} ".format(member_vnf_index
),
2455 nslcmop_id
=nslcmop_id
,
2461 member_vnf_index
=member_vnf_index
,
2462 vdu_index
=vdu_index
,
2464 deploy_params
=deploy_params
,
2465 descriptor_config
=descriptor_config
,
2466 base_folder
=base_folder
,
2467 task_instantiation_info
=tasks_dict_info
,
2471 # Deploy charms for each VDU that supports one.
2472 for vdud
in get_vdu_list(vnfd
):
2474 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2475 vdur
= find_in_list(
2476 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2479 if vdur
.get("additionalParams"):
2480 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2482 deploy_params_vdu
= deploy_params
2483 deploy_params_vdu
["OSM"] = get_osm_params(
2484 db_vnfr
, vdu_id
, vdu_count_index
=0
2486 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2488 self
.logger
.debug("VDUD > {}".format(vdud
))
2490 "Descriptor config > {}".format(descriptor_config
)
2492 if descriptor_config
:
2495 for vdu_index
in range(vdud_count
):
2496 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2498 logging_text
=logging_text
2499 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2500 member_vnf_index
, vdu_id
, vdu_index
2504 nslcmop_id
=nslcmop_id
,
2510 member_vnf_index
=member_vnf_index
,
2511 vdu_index
=vdu_index
,
2513 deploy_params
=deploy_params_vdu
,
2514 descriptor_config
=descriptor_config
,
2515 base_folder
=base_folder
,
2516 task_instantiation_info
=tasks_dict_info
,
2519 for kdud
in get_kdu_list(vnfd
):
2520 kdu_name
= kdud
["name"]
2521 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2522 if descriptor_config
:
2527 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2529 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2530 if kdur
.get("additionalParams"):
2531 deploy_params_kdu
.update(
2532 parse_yaml_strings(kdur
["additionalParams"].copy())
2536 logging_text
=logging_text
,
2539 nslcmop_id
=nslcmop_id
,
2545 member_vnf_index
=member_vnf_index
,
2546 vdu_index
=vdu_index
,
2548 deploy_params
=deploy_params_kdu
,
2549 descriptor_config
=descriptor_config
,
2550 base_folder
=base_folder
,
2551 task_instantiation_info
=tasks_dict_info
,
2555 # Check if this NS has a charm configuration
2556 descriptor_config
= nsd
.get("ns-configuration")
2557 if descriptor_config
and descriptor_config
.get("juju"):
2560 member_vnf_index
= None
2566 # Get additional parameters
2567 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2568 if db_nsr
.get("additionalParamsForNs"):
2569 deploy_params
.update(
2570 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2572 base_folder
= nsd
["_admin"]["storage"]
2574 logging_text
=logging_text
,
2577 nslcmop_id
=nslcmop_id
,
2583 member_vnf_index
=member_vnf_index
,
2584 vdu_index
=vdu_index
,
2586 deploy_params
=deploy_params
,
2587 descriptor_config
=descriptor_config
,
2588 base_folder
=base_folder
,
2589 task_instantiation_info
=tasks_dict_info
,
2593 # rest of staff will be done at finally
2596 ROclient
.ROClientException
,
2602 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2605 except asyncio
.CancelledError
:
2607 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2609 exc
= "Operation was cancelled"
2610 except Exception as e
:
2611 exc
= traceback
.format_exc()
2612 self
.logger
.critical(
2613 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2618 error_list
.append(str(exc
))
2620 # wait for pending tasks
2622 stage
[1] = "Waiting for instantiate pending tasks."
2623 self
.logger
.debug(logging_text
+ stage
[1])
2624 error_list
+= await self
._wait
_for
_tasks
(
2632 stage
[1] = stage
[2] = ""
2633 except asyncio
.CancelledError
:
2634 error_list
.append("Cancelled")
2635 # TODO cancel all tasks
2636 except Exception as exc
:
2637 error_list
.append(str(exc
))
2639 # update operation-status
2640 db_nsr_update
["operational-status"] = "running"
2641 # let's begin with VCA 'configured' status (later we can change it)
2642 db_nsr_update
["config-status"] = "configured"
2643 for task
, task_name
in tasks_dict_info
.items():
2644 if not task
.done() or task
.cancelled() or task
.exception():
2645 if task_name
.startswith(self
.task_name_deploy_vca
):
2646 # A N2VC task is pending
2647 db_nsr_update
["config-status"] = "failed"
2649 # RO or KDU task is pending
2650 db_nsr_update
["operational-status"] = "failed"
2652 # update status at database
2654 error_detail
= ". ".join(error_list
)
2655 self
.logger
.error(logging_text
+ error_detail
)
2656 error_description_nslcmop
= "{} Detail: {}".format(
2657 stage
[0], error_detail
2659 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2660 nslcmop_id
, stage
[0]
2663 db_nsr_update
["detailed-status"] = (
2664 error_description_nsr
+ " Detail: " + error_detail
2666 db_nslcmop_update
["detailed-status"] = error_detail
2667 nslcmop_operation_state
= "FAILED"
2671 error_description_nsr
= error_description_nslcmop
= None
2673 db_nsr_update
["detailed-status"] = "Done"
2674 db_nslcmop_update
["detailed-status"] = "Done"
2675 nslcmop_operation_state
= "COMPLETED"
2678 self
._write
_ns
_status
(
2681 current_operation
="IDLE",
2682 current_operation_id
=None,
2683 error_description
=error_description_nsr
,
2684 error_detail
=error_detail
,
2685 other_update
=db_nsr_update
,
2687 self
._write
_op
_status
(
2690 error_message
=error_description_nslcmop
,
2691 operation_state
=nslcmop_operation_state
,
2692 other_update
=db_nslcmop_update
,
2695 if nslcmop_operation_state
:
2697 await self
.msg
.aiowrite(
2702 "nslcmop_id": nslcmop_id
,
2703 "operationState": nslcmop_operation_state
,
2707 except Exception as e
:
2709 logging_text
+ "kafka_write notification Exception {}".format(e
)
2712 self
.logger
.debug(logging_text
+ "Exit")
2713 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2715 async def _add_vca_relations(
2720 timeout
: int = 3600,
2721 vca_type
: str = None,
2726 # 1. find all relations for this VCA
2727 # 2. wait for other peers related
2731 vca_type
= vca_type
or "lxc_proxy_charm"
2733 # STEP 1: find all relations for this VCA
2736 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2737 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2740 my_vca
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))[vca_index
]
2742 # read all ns-configuration relations
2743 ns_relations
= list()
2744 db_ns_relations
= deep_get(nsd
, ("ns-configuration", "relation"))
2746 for r
in db_ns_relations
:
2747 # check if this VCA is in the relation
2748 if my_vca
.get("member-vnf-index") in (
2749 r
.get("entities")[0].get("id"),
2750 r
.get("entities")[1].get("id"),
2752 ns_relations
.append(r
)
2754 # read all vnf-configuration relations
2755 vnf_relations
= list()
2756 db_vnfd_list
= db_nsr
.get("vnfd-id")
2758 for vnfd
in db_vnfd_list
:
2759 db_vnf_relations
= None
2760 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2761 db_vnf_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
2762 if db_vnf_configuration
:
2763 db_vnf_relations
= db_vnf_configuration
.get("relation", [])
2764 if db_vnf_relations
:
2765 for r
in db_vnf_relations
:
2766 # check if this VCA is in the relation
2767 if my_vca
.get("vdu_id") in (
2768 r
.get("entities")[0].get("id"),
2769 r
.get("entities")[1].get("id"),
2771 vnf_relations
.append(r
)
2773 # if no relations, terminate
2774 if not ns_relations
and not vnf_relations
:
2775 self
.logger
.debug(logging_text
+ " No relations")
2780 + " adding relations\n {}\n {}".format(
2781 ns_relations
, vnf_relations
2790 if now
- start
>= timeout
:
2791 self
.logger
.error(logging_text
+ " : timeout adding relations")
2794 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2795 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2797 # for each defined NS relation, find the VCA's related
2798 for r
in ns_relations
.copy():
2799 from_vca_ee_id
= None
2801 from_vca_endpoint
= None
2802 to_vca_endpoint
= None
2803 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2804 for vca
in vca_list
:
2805 if vca
.get("member-vnf-index") == r
.get("entities")[0].get(
2807 ) and vca
.get("config_sw_installed"):
2808 from_vca_ee_id
= vca
.get("ee_id")
2809 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2810 if vca
.get("member-vnf-index") == r
.get("entities")[1].get(
2812 ) and vca
.get("config_sw_installed"):
2813 to_vca_ee_id
= vca
.get("ee_id")
2814 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2815 if from_vca_ee_id
and to_vca_ee_id
:
2817 await self
.vca_map
[vca_type
].add_relation(
2818 ee_id_1
=from_vca_ee_id
,
2819 ee_id_2
=to_vca_ee_id
,
2820 endpoint_1
=from_vca_endpoint
,
2821 endpoint_2
=to_vca_endpoint
,
2824 # remove entry from relations list
2825 ns_relations
.remove(r
)
2827 # check failed peers
2829 vca_status_list
= db_nsr
.get("configurationStatus")
2831 for i
in range(len(vca_list
)):
2833 vca_status
= vca_status_list
[i
]
2834 if vca
.get("member-vnf-index") == r
.get("entities")[
2837 if vca_status
.get("status") == "BROKEN":
2838 # peer broken: remove relation from list
2839 ns_relations
.remove(r
)
2840 if vca
.get("member-vnf-index") == r
.get("entities")[
2843 if vca_status
.get("status") == "BROKEN":
2844 # peer broken: remove relation from list
2845 ns_relations
.remove(r
)
2850 # for each defined VNF relation, find the VCA's related
2851 for r
in vnf_relations
.copy():
2852 from_vca_ee_id
= None
2854 from_vca_endpoint
= None
2855 to_vca_endpoint
= None
2856 vca_list
= deep_get(db_nsr
, ("_admin", "deployed", "VCA"))
2857 for vca
in vca_list
:
2858 key_to_check
= "vdu_id"
2859 if vca
.get("vdu_id") is None:
2860 key_to_check
= "vnfd_id"
2861 if vca
.get(key_to_check
) == r
.get("entities")[0].get(
2863 ) and vca
.get("config_sw_installed"):
2864 from_vca_ee_id
= vca
.get("ee_id")
2865 from_vca_endpoint
= r
.get("entities")[0].get("endpoint")
2866 if vca
.get(key_to_check
) == r
.get("entities")[1].get(
2868 ) and vca
.get("config_sw_installed"):
2869 to_vca_ee_id
= vca
.get("ee_id")
2870 to_vca_endpoint
= r
.get("entities")[1].get("endpoint")
2871 if from_vca_ee_id
and to_vca_ee_id
:
2873 await self
.vca_map
[vca_type
].add_relation(
2874 ee_id_1
=from_vca_ee_id
,
2875 ee_id_2
=to_vca_ee_id
,
2876 endpoint_1
=from_vca_endpoint
,
2877 endpoint_2
=to_vca_endpoint
,
2880 # remove entry from relations list
2881 vnf_relations
.remove(r
)
2883 # check failed peers
2885 vca_status_list
= db_nsr
.get("configurationStatus")
2887 for i
in range(len(vca_list
)):
2889 vca_status
= vca_status_list
[i
]
2890 if vca
.get("vdu_id") == r
.get("entities")[0].get(
2893 if vca_status
.get("status") == "BROKEN":
2894 # peer broken: remove relation from list
2895 vnf_relations
.remove(r
)
2896 if vca
.get("vdu_id") == r
.get("entities")[1].get(
2899 if vca_status
.get("status") == "BROKEN":
2900 # peer broken: remove relation from list
2901 vnf_relations
.remove(r
)
2907 await asyncio
.sleep(5.0)
2909 if not ns_relations
and not vnf_relations
:
2910 self
.logger
.debug("Relations added")
2915 except Exception as e
:
2916 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
2919 async def _install_kdu(
2927 k8s_instance_info
: dict,
2928 k8params
: dict = None,
2934 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2937 "collection": "nsrs",
2938 "filter": {"_id": nsr_id
},
2939 "path": nsr_db_path
,
2942 if k8s_instance_info
.get("kdu-deployment-name"):
2943 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
2945 kdu_instance
= self
.k8scluster_map
[
2947 ].generate_kdu_instance_name(
2948 db_dict
=db_dict_install
,
2949 kdu_model
=k8s_instance_info
["kdu-model"],
2950 kdu_name
=k8s_instance_info
["kdu-name"],
2953 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
2955 await self
.k8scluster_map
[k8sclustertype
].install(
2956 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2957 kdu_model
=k8s_instance_info
["kdu-model"],
2960 db_dict
=db_dict_install
,
2962 kdu_name
=k8s_instance_info
["kdu-name"],
2963 namespace
=k8s_instance_info
["namespace"],
2964 kdu_instance
=kdu_instance
,
2968 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
2971 # Obtain services to obtain management service ip
2972 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2973 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2974 kdu_instance
=kdu_instance
,
2975 namespace
=k8s_instance_info
["namespace"],
2978 # Obtain management service info (if exists)
2979 vnfr_update_dict
= {}
2980 kdu_config
= get_configuration(vnfd
, kdud
["name"])
2982 target_ee_list
= kdu_config
.get("execution-environment-list", [])
2987 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2990 for service
in kdud
.get("service", [])
2991 if service
.get("mgmt-service")
2993 for mgmt_service
in mgmt_services
:
2994 for service
in services
:
2995 if service
["name"].startswith(mgmt_service
["name"]):
2996 # Mgmt service found, Obtain service ip
2997 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2998 if isinstance(ip
, list) and len(ip
) == 1:
3002 "kdur.{}.ip-address".format(kdu_index
)
3005 # Check if must update also mgmt ip at the vnf
3006 service_external_cp
= mgmt_service
.get(
3007 "external-connection-point-ref"
3009 if service_external_cp
:
3011 deep_get(vnfd
, ("mgmt-interface", "cp"))
3012 == service_external_cp
3014 vnfr_update_dict
["ip-address"] = ip
3019 "external-connection-point-ref", ""
3021 == service_external_cp
,
3024 "kdur.{}.ip-address".format(kdu_index
)
3029 "Mgmt service name: {} not found".format(
3030 mgmt_service
["name"]
3034 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3035 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3037 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3040 and kdu_config
.get("initial-config-primitive")
3041 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3043 initial_config_primitive_list
= kdu_config
.get(
3044 "initial-config-primitive"
3046 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3048 for initial_config_primitive
in initial_config_primitive_list
:
3049 primitive_params_
= self
._map
_primitive
_params
(
3050 initial_config_primitive
, {}, {}
3053 await asyncio
.wait_for(
3054 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3055 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3056 kdu_instance
=kdu_instance
,
3057 primitive_name
=initial_config_primitive
["name"],
3058 params
=primitive_params_
,
3059 db_dict
=db_dict_install
,
3065 except Exception as e
:
3066 # Prepare update db with error and raise exception
3069 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3073 vnfr_data
.get("_id"),
3074 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3077 # ignore to keep original exception
3079 # reraise original error
3084 async def deploy_kdus(
3091 task_instantiation_info
,
3093 # Launch kdus if present in the descriptor
3095 k8scluster_id_2_uuic
= {
3096 "helm-chart-v3": {},
3101 async def _get_cluster_id(cluster_id
, cluster_type
):
3102 nonlocal k8scluster_id_2_uuic
3103 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3104 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3106 # check if K8scluster is creating and wait look if previous tasks in process
3107 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3108 "k8scluster", cluster_id
3111 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3112 task_name
, cluster_id
3114 self
.logger
.debug(logging_text
+ text
)
3115 await asyncio
.wait(task_dependency
, timeout
=3600)
3117 db_k8scluster
= self
.db
.get_one(
3118 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3120 if not db_k8scluster
:
3121 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3123 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3125 if cluster_type
== "helm-chart-v3":
3127 # backward compatibility for existing clusters that have not been initialized for helm v3
3128 k8s_credentials
= yaml
.safe_dump(
3129 db_k8scluster
.get("credentials")
3131 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3132 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3134 db_k8scluster_update
= {}
3135 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3136 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3137 db_k8scluster_update
[
3138 "_admin.helm-chart-v3.created"
3140 db_k8scluster_update
[
3141 "_admin.helm-chart-v3.operationalState"
3144 "k8sclusters", cluster_id
, db_k8scluster_update
3146 except Exception as e
:
3149 + "error initializing helm-v3 cluster: {}".format(str(e
))
3152 "K8s cluster '{}' has not been initialized for '{}'".format(
3153 cluster_id
, cluster_type
3158 "K8s cluster '{}' has not been initialized for '{}'".format(
3159 cluster_id
, cluster_type
3162 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3165 logging_text
+= "Deploy kdus: "
3168 db_nsr_update
= {"_admin.deployed.K8s": []}
3169 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3172 updated_cluster_list
= []
3173 updated_v3_cluster_list
= []
3175 for vnfr_data
in db_vnfrs
.values():
3176 vca_id
= self
.get_vca_id(vnfr_data
, {})
3177 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3178 # Step 0: Prepare and set parameters
3179 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3180 vnfd_id
= vnfr_data
.get("vnfd-id")
3181 vnfd_with_id
= find_in_list(
3182 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3186 for kdud
in vnfd_with_id
["kdu"]
3187 if kdud
["name"] == kdur
["kdu-name"]
3189 namespace
= kdur
.get("k8s-namespace")
3190 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3191 if kdur
.get("helm-chart"):
3192 kdumodel
= kdur
["helm-chart"]
3193 # Default version: helm3, if helm-version is v2 assign v2
3194 k8sclustertype
= "helm-chart-v3"
3195 self
.logger
.debug("kdur: {}".format(kdur
))
3197 kdur
.get("helm-version")
3198 and kdur
.get("helm-version") == "v2"
3200 k8sclustertype
= "helm-chart"
3201 elif kdur
.get("juju-bundle"):
3202 kdumodel
= kdur
["juju-bundle"]
3203 k8sclustertype
= "juju-bundle"
3206 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3207 "juju-bundle. Maybe an old NBI version is running".format(
3208 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3211 # check if kdumodel is a file and exists
3213 vnfd_with_id
= find_in_list(
3214 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3216 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3217 if storage
and storage
.get(
3219 ): # may be not present if vnfd has not artifacts
3220 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3221 filename
= "{}/{}/{}s/{}".format(
3227 if self
.fs
.file_exists(
3228 filename
, mode
="file"
3229 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3230 kdumodel
= self
.fs
.path
+ filename
3231 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3233 except Exception: # it is not a file
3236 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3237 step
= "Synchronize repos for k8s cluster '{}'".format(
3240 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3244 k8sclustertype
== "helm-chart"
3245 and cluster_uuid
not in updated_cluster_list
3247 k8sclustertype
== "helm-chart-v3"
3248 and cluster_uuid
not in updated_v3_cluster_list
3250 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3251 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3252 cluster_uuid
=cluster_uuid
3255 if del_repo_list
or added_repo_dict
:
3256 if k8sclustertype
== "helm-chart":
3258 "_admin.helm_charts_added." + item
: None
3259 for item
in del_repo_list
3262 "_admin.helm_charts_added." + item
: name
3263 for item
, name
in added_repo_dict
.items()
3265 updated_cluster_list
.append(cluster_uuid
)
3266 elif k8sclustertype
== "helm-chart-v3":
3268 "_admin.helm_charts_v3_added." + item
: None
3269 for item
in del_repo_list
3272 "_admin.helm_charts_v3_added." + item
: name
3273 for item
, name
in added_repo_dict
.items()
3275 updated_v3_cluster_list
.append(cluster_uuid
)
3277 logging_text
+ "repos synchronized on k8s cluster "
3278 "'{}' to_delete: {}, to_add: {}".format(
3279 k8s_cluster_id
, del_repo_list
, added_repo_dict
3284 {"_id": k8s_cluster_id
},
3290 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3291 vnfr_data
["member-vnf-index-ref"],
3295 k8s_instance_info
= {
3296 "kdu-instance": None,
3297 "k8scluster-uuid": cluster_uuid
,
3298 "k8scluster-type": k8sclustertype
,
3299 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3300 "kdu-name": kdur
["kdu-name"],
3301 "kdu-model": kdumodel
,
3302 "namespace": namespace
,
3303 "kdu-deployment-name": kdu_deployment_name
,
3305 db_path
= "_admin.deployed.K8s.{}".format(index
)
3306 db_nsr_update
[db_path
] = k8s_instance_info
3307 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3308 vnfd_with_id
= find_in_list(
3309 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3311 task
= asyncio
.ensure_future(
3320 k8params
=desc_params
,
3325 self
.lcm_tasks
.register(
3329 "instantiate_KDU-{}".format(index
),
3332 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3338 except (LcmException
, asyncio
.CancelledError
):
3340 except Exception as e
:
3341 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3342 if isinstance(e
, (N2VCException
, DbException
)):
3343 self
.logger
.error(logging_text
+ msg
)
3345 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3346 raise LcmException(msg
)
3349 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3368 task_instantiation_info
,
3371 # launch instantiate_N2VC in a asyncio task and register task object
3372 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3373 # if not found, create one entry and update database
3374 # fill db_nsr._admin.deployed.VCA.<index>
3377 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3379 if "execution-environment-list" in descriptor_config
:
3380 ee_list
= descriptor_config
.get("execution-environment-list", [])
3381 elif "juju" in descriptor_config
:
3382 ee_list
= [descriptor_config
] # ns charms
3383 else: # other types as script are not supported
3386 for ee_item
in ee_list
:
3389 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3390 ee_item
.get("juju"), ee_item
.get("helm-chart")
3393 ee_descriptor_id
= ee_item
.get("id")
3394 if ee_item
.get("juju"):
3395 vca_name
= ee_item
["juju"].get("charm")
3398 if ee_item
["juju"].get("charm") is not None
3401 if ee_item
["juju"].get("cloud") == "k8s":
3402 vca_type
= "k8s_proxy_charm"
3403 elif ee_item
["juju"].get("proxy") is False:
3404 vca_type
= "native_charm"
3405 elif ee_item
.get("helm-chart"):
3406 vca_name
= ee_item
["helm-chart"]
3407 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3410 vca_type
= "helm-v3"
3413 logging_text
+ "skipping non juju neither charm configuration"
3418 for vca_index
, vca_deployed
in enumerate(
3419 db_nsr
["_admin"]["deployed"]["VCA"]
3421 if not vca_deployed
:
3424 vca_deployed
.get("member-vnf-index") == member_vnf_index
3425 and vca_deployed
.get("vdu_id") == vdu_id
3426 and vca_deployed
.get("kdu_name") == kdu_name
3427 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3428 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3432 # not found, create one.
3434 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3437 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3439 target
+= "/kdu/{}".format(kdu_name
)
3441 "target_element": target
,
3442 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3443 "member-vnf-index": member_vnf_index
,
3445 "kdu_name": kdu_name
,
3446 "vdu_count_index": vdu_index
,
3447 "operational-status": "init", # TODO revise
3448 "detailed-status": "", # TODO revise
3449 "step": "initial-deploy", # TODO revise
3451 "vdu_name": vdu_name
,
3453 "ee_descriptor_id": ee_descriptor_id
,
3457 # create VCA and configurationStatus in db
3459 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3460 "configurationStatus.{}".format(vca_index
): dict(),
3462 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3464 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3466 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3467 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3468 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3471 task_n2vc
= asyncio
.ensure_future(
3472 self
.instantiate_N2VC(
3473 logging_text
=logging_text
,
3474 vca_index
=vca_index
,
3480 vdu_index
=vdu_index
,
3481 deploy_params
=deploy_params
,
3482 config_descriptor
=descriptor_config
,
3483 base_folder
=base_folder
,
3484 nslcmop_id
=nslcmop_id
,
3488 ee_config_descriptor
=ee_item
,
3491 self
.lcm_tasks
.register(
3495 "instantiate_N2VC-{}".format(vca_index
),
3498 task_instantiation_info
[
3500 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3501 member_vnf_index
or "", vdu_id
or ""
3505 def _create_nslcmop(nsr_id
, operation
, params
):
3507 Creates a ns-lcm-opp content to be stored at database.
3508 :param nsr_id: internal id of the instance
3509 :param operation: instantiate, terminate, scale, action, ...
3510 :param params: user parameters for the operation
3511 :return: dictionary following SOL005 format
3513 # Raise exception if invalid arguments
3514 if not (nsr_id
and operation
and params
):
3516 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3523 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3524 "operationState": "PROCESSING",
3525 "statusEnteredTime": now
,
3526 "nsInstanceId": nsr_id
,
3527 "lcmOperationType": operation
,
3529 "isAutomaticInvocation": False,
3530 "operationParams": params
,
3531 "isCancelPending": False,
3533 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3534 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3539 def _format_additional_params(self
, params
):
3540 params
= params
or {}
3541 for key
, value
in params
.items():
3542 if str(value
).startswith("!!yaml "):
3543 params
[key
] = yaml
.safe_load(value
[7:])
3546 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3547 primitive
= seq
.get("name")
3548 primitive_params
= {}
3550 "member_vnf_index": vnf_index
,
3551 "primitive": primitive
,
3552 "primitive_params": primitive_params
,
3555 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """Decide whether an already-registered sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop record; the sub-operation is read from its
        ``_admin.operations`` list
    :param op_index: position of the sub-operation inside that list
    :return: ``self.SUBOPERATION_STATUS_SKIP`` when the sub-operation is
        already ``COMPLETED``; otherwise ``op_index``, after its status has
        been reset to ``PROCESSING`` / ``In progress``
    """
    suboperation = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    already_completed = suboperation.get("operationState") == "COMPLETED"
    if already_completed:
        # b. Skip sub-operation:
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. Retry executing the sub-operation:
    # it exists and its operationState != 'COMPLETED', so mark it as
    # 'PROCESSING' again to indicate a retry.
    self._update_suboperation_status(
        db_nslcmop, op_index, "PROCESSING", "In progress"
    )
    # Return the sub-operation index:
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
3579 # Find a sub-operation where all keys in a matching dictionary must match
3580 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3581 def _find_suboperation(self
, db_nslcmop
, match
):
3582 if db_nslcmop
and match
:
3583 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3584 for i
, op
in enumerate(op_list
):
3585 if all(op
.get(k
) == match
[k
] for k
in match
):
3587 return self
.SUBOPERATION_STATUS_NOT_FOUND
3589 # Update status for a sub-operation given its index
3590 def _update_suboperation_status(
3591 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3593 # Update DB for HA tasks
3594 q_filter
= {"_id": db_nslcmop
["_id"]}
3596 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3597 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3600 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3603 # Add sub-operation, return the index of the added sub-operation
3604 # Optionally, set operationState, detailed-status, and operationType
3605 # Status and type are currently set for 'scale' sub-operations:
3606 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3607 # 'detailed-status' : status message
3608 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3609 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3610 def _add_suboperation(
3618 mapped_primitive_params
,
3619 operationState
=None,
3620 detailed_status
=None,
3623 RO_scaling_info
=None,
3626 return self
.SUBOPERATION_STATUS_NOT_FOUND
3627 # Get the "_admin.operations" list, if it exists
3628 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3629 op_list
= db_nslcmop_admin
.get("operations")
3630 # Create or append to the "_admin.operations" list
3632 "member_vnf_index": vnf_index
,
3634 "vdu_count_index": vdu_count_index
,
3635 "primitive": primitive
,
3636 "primitive_params": mapped_primitive_params
,
3639 new_op
["operationState"] = operationState
3641 new_op
["detailed-status"] = detailed_status
3643 new_op
["lcmOperationType"] = operationType
3645 new_op
["RO_nsr_id"] = RO_nsr_id
3647 new_op
["RO_scaling_info"] = RO_scaling_info
3649 # No existing operations, create key 'operations' with current operation as first list element
3650 db_nslcmop_admin
.update({"operations": [new_op
]})
3651 op_list
= db_nslcmop_admin
.get("operations")
3653 # Existing operations, append operation to list
3654 op_list
.append(new_op
)
3656 db_nslcmop_update
= {"_admin.operations": op_list
}
3657 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3658 op_index
= len(op_list
) - 1
3661 # Helper methods for scale() sub-operations
3663 # pre-scale/post-scale:
3664 # Check for 3 different cases:
3665 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3666 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3667 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3668 def _check_or_add_scale_suboperation(
3672 vnf_config_primitive
,
3676 RO_scaling_info
=None,
3678 # Find this sub-operation
3679 if RO_nsr_id
and RO_scaling_info
:
3680 operationType
= "SCALE-RO"
3682 "member_vnf_index": vnf_index
,
3683 "RO_nsr_id": RO_nsr_id
,
3684 "RO_scaling_info": RO_scaling_info
,
3688 "member_vnf_index": vnf_index
,
3689 "primitive": vnf_config_primitive
,
3690 "primitive_params": primitive_params
,
3691 "lcmOperationType": operationType
,
3693 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3694 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3695 # a. New sub-operation
3696 # The sub-operation does not exist, add it.
3697 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3698 # The following parameters are set to None for all kind of scaling:
3700 vdu_count_index
= None
3702 if RO_nsr_id
and RO_scaling_info
:
3703 vnf_config_primitive
= None
3704 primitive_params
= None
3707 RO_scaling_info
= None
3708 # Initial status for sub-operation
3709 operationState
= "PROCESSING"
3710 detailed_status
= "In progress"
3711 # Add sub-operation for pre/post-scaling (zero or more operations)
3712 self
._add
_suboperation
(
3718 vnf_config_primitive
,
3726 return self
.SUBOPERATION_STATUS_NEW
3728 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3729 # or op_index (operationState != 'COMPLETED')
3730 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3732 # Function to return execution_environment id
3734 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3735 # TODO vdu_index_count
3736 for vca
in vca_deployed_list
:
3737 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop record; sub-operations are added to it and its "nsInstanceId" is used to update nsrs
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed through to the primitive execution and to the execution environment deletion
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # default value when the record has no "type"
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                # any other result aborts the remaining terminate primitives
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # remove the prometheus jobs registered for this VCA, if any
    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
3845 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
3846 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
3847 namespace
= "." + db_nsr
["_id"]
3849 await self
.n2vc
.delete_namespace(
3850 namespace
=namespace
,
3851 total_timeout
=self
.timeout_charm_delete
,
3854 except N2VCNotFound
: # already deleted. Skip
3856 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO
    :param logging_text: prefix added to every log message
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: _id of the nsrs record to update
    :param nslcmop_id: _id of the nslcmops record to update
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raise LcmException: with all collected failures when ns, nsd or any vnfd cannot be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    assert (
                        False
                    ), "ROclient.check_action_status returns unknown {}".format(
                        ns_status
                    )
                # write the progress only when the text has changed
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # The 'loop' argument of asyncio.sleep() was removed in
                # Python 3.10; the running loop is always used.
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd (only attempted when the ns deletion did not fail)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds (only attempted when nothing has failed so far)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[
                    2
                ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a NS: execute the terminate primitives, delete the execution
    environments and the KDUs, remove the deployment from the VIM and write
    the final status on the database.
    Errors are not raised: they are collected and stored in the nsrs and
    nslcmops records, and a notification is written through self.msg.
    :param nsr_id: _id of the nsrs record
    :param nslcmop_id: _id of the nslcmops record of this operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = [
        "Stage 1/3: Preparing task.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {
            db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
        }
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[
                vnfr["member-vnf-index-ref"]
            ] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            # Skip empty entries before reading anything from them; reading
            # first would fail on a None entry and abort the termination.
            if not vca or not vca.get("ee_id"):
                continue
            config_descriptor = None
            vca_member_vnf_index = vca.get("member-vnf-index")
            vca_id = self.get_vca_id(
                db_vnfrs_dict.get(vca_member_vnf_index)
                if vca_member_vnf_index
                else None,
                db_nsr,
            )
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = not operation_params.get(
                "skip_terminate_primitives"
            ) and vca.get("needed_terminate")
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = (
                True if vca_type in ("helm", "helm-v3", "native_charm") else False
            )
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(
                logging_text
                + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
            )
            error_list = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                min(self.timeout_charm_delete, timeout_ns_terminate),
                stage,
                nslcmop_id,
            )
            tasks_dict_info.clear()
            if error_list:
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            vca_id = self.get_vca_id({}, db_nsr)
            task_delete_ee = asyncio.ensure_future(
                asyncio.wait_for(
                    self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                    timeout=self.timeout_charm_delete,
                )
            )
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                    )
                )
            else:
                self.logger.error(
                    logging_text
                    + "Unknown k8s deployment type {}".format(
                        kdu.get("k8scluster-type")
                    )
                )
                continue
            tasks_dict_info[
                task_delete_kdu_instance
            ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_terminate,
                    stage,
                    nslcmop_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as exc:
            error_list.append(str(exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )
            except DbException as e:
                # Logger.warn() is a deprecated alias of Logger.warning()
                self.logger.warning(
                    logging_text
                    + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                        nsr_id, e
                    )
                )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "terminated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "autoremove": autoremove,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """
    Wait for a set of tasks, writing the progress on the operation status
    :param logging_text: prefix added to every log message
    :param created_tasks_info: dictionary task -> text describing the task
    :param timeout: overall time to wait, counted from the call to this method
    :param stage: list of strings; index 1 is overwritten with "<done>/<total>." plus the errors
    :param nslcmop_id: _id of the nslcmops record where the stage is written
    :param nsr_id: if provided, errorDescription/errorDetail are also written to this nsrs record
    :return: list with one error text per failed, cancelled or timed out task
    """
    time_start = time()
    error_detail_list = []
    error_list = []
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)
    while pending_tasks:
        new_error = None
        # remaining time of the overall timeout
        _timeout = timeout + time_start - time()
        done, pending_tasks = await asyncio.wait(
            pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
        )
        num_done += len(done)
        if not done:  # Timeout
            for task in pending_tasks:
                new_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(new_error)
                error_list.append(new_error)
            # the loop is left here, so the stage/nsr update below is not
            # executed for the timed out tasks
            break
        for task in done:
            if task.cancelled():
                exc = "Cancelled"
            else:
                exc = task.exception()
            if exc:
                if isinstance(exc, asyncio.TimeoutError):
                    exc = "Timeout"
                new_error = created_tasks_info[task] + ": {}".format(exc)
                error_list.append(created_tasks_info[task])
                error_detail_list.append(new_error)
                # these types are logged with the message only; anything
                # else is logged with its full traceback
                if isinstance(
                    exc,
                    (
                        str,
                        DbException,
                        N2VCException,
                        ROclient.ROClientException,
                        LcmException,
                        K8sException,
                        NgRoException,
                    ),
                ):
                    self.logger.error(logging_text + new_error)
                else:
                    exc_traceback = "".join(
                        traceback.format_exception(None, exc, exc.__traceback__)
                    )
                    self.logger.error(
                        logging_text
                        + created_tasks_info[task]
                        + " "
                        + exc_traceback
                    )
            else:
                self.logger.debug(
                    logging_text + created_tasks_info[task] + ": Done"
                )
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if new_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
4464 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4466 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4467 The default-value is used. If it is between < > it look for a value at instantiation_params
4468 :param primitive_desc: portion of VNFD/NSD that describes primitive
4469 :param params: Params provided by user
4470 :param instantiation_params: Instantiation params provided by user
4471 :return: a dictionary with the calculated params
4473 calculated_params
= {}
4474 for parameter
in primitive_desc
.get("parameter", ()):
4475 param_name
= parameter
["name"]
4476 if param_name
in params
:
4477 calculated_params
[param_name
] = params
[param_name
]
4478 elif "default-value" in parameter
or "value" in parameter
:
4479 if "value" in parameter
:
4480 calculated_params
[param_name
] = parameter
["value"]
4482 calculated_params
[param_name
] = parameter
["default-value"]
4484 isinstance(calculated_params
[param_name
], str)
4485 and calculated_params
[param_name
].startswith("<")
4486 and calculated_params
[param_name
].endswith(">")
4488 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4489 calculated_params
[param_name
] = instantiation_params
[
4490 calculated_params
[param_name
][1:-1]
4494 "Parameter {} needed to execute primitive {} not provided".format(
4495 calculated_params
[param_name
], primitive_desc
["name"]
4500 "Parameter {} needed to execute primitive {} not provided".format(
4501 param_name
, primitive_desc
["name"]
4505 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4506 calculated_params
[param_name
] = yaml
.safe_dump(
4507 calculated_params
[param_name
], default_flow_style
=True, width
=256
4509 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4511 ].startswith("!!yaml "):
4512 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4513 if parameter
.get("data-type") == "INTEGER":
4515 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4516 except ValueError: # error converting string to int
4518 "Parameter {} of primitive {} must be integer".format(
4519 param_name
, primitive_desc
["name"]
4522 elif parameter
.get("data-type") == "BOOLEAN":
4523 calculated_params
[param_name
] = not (
4524 (str(calculated_params
[param_name
])).lower() == "false"
4527 # add always ns_config_info if primitive name is config
4528 if primitive_desc
["name"] == "config":
4529 if "ns_config_info" in instantiation_params
:
4530 calculated_params
["ns_config_info"] = instantiation_params
[
4533 return calculated_params
4535 def _look_for_deployed_vca(
4542 ee_descriptor_id
=None,
4544 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4545 for vca
in deployed_vca
:
4548 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4551 vdu_count_index
is not None
4552 and vdu_count_index
!= vca
["vdu_count_index"]
4555 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4557 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4561 # vca_deployed not found
4563 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4564 " is not deployed".format(
4573 ee_id
= vca
.get("ee_id")
4575 "type", "lxc_proxy_charm"
4576 ) # default value for backward compatibility - proxy charm
4579 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4580 "execution environment".format(
4581 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4584 return ee_id
, vca_type
4586 async def _ns_execute_primitive(
4592 retries_interval
=30,
4599 if primitive
== "config":
4600 primitive_params
= {"params": primitive_params
}
4602 vca_type
= vca_type
or "lxc_proxy_charm"
4606 output
= await asyncio
.wait_for(
4607 self
.vca_map
[vca_type
].exec_primitive(
4609 primitive_name
=primitive
,
4610 params_dict
=primitive_params
,
4611 progress_timeout
=self
.timeout_progress_primitive
,
4612 total_timeout
=self
.timeout_primitive
,
4617 timeout
=timeout
or self
.timeout_primitive
,
4621 except asyncio
.CancelledError
:
4623 except Exception as e
: # asyncio.TimeoutError
4624 if isinstance(e
, asyncio
.TimeoutError
):
4629 "Error executing action {} on {} -> {}".format(
4634 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4636 return "FAILED", str(e
)
4638 return "COMPLETED", output
4640 except (LcmException
, asyncio
.CancelledError
):
4642 except Exception as e
:
4643 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refresh the vca_status stored in the nsrs record: through the K8s update
    callback when the NS has deployed K8s entries, otherwise through the N2VC
    update callback for every deployed VCA
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    task_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(task_text + "Enter")
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    if db_nsr["_admin"]["deployed"]["K8s"]:
        for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
            cluster_uuid = k8s["k8scluster-uuid"]
            kdu_instance = k8s["kdu-instance"]
            await self._on_update_k8s_db(
                cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
            )
    else:
        for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            await self._on_update_n2vc_db(
                "nsrs",
                {"_id": nsr_id},
                "_admin.deployed.VCA.{}.".format(vca_index),
                {},
            )

    self.logger.debug(task_text + "Exit")
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4671 async def action(self
, nsr_id
, nslcmop_id
):
# Coroutine that runs one NS "action" operation (nslcmop_id) on the network
# service nsr_id. It reads the operation parameters from the "nslcmops"
# record, executes the requested primitive either on a deployed KDU (through
# self.k8scluster_map) or on a deployed VCA (through
# self._ns_execute_primitive), and writes the outcome with
# self._write_op_status. The visible return statement yields the tuple
# (nslcmop_operation_state, detailed_status).
# NOTE(review): this listing has elided lines (the embedded original line
# numbers are not contiguous), so not every try/else/finally/raise line and
# closing bracket is visible here. Comments below only describe what the
# visible lines show.
4672 # Try to lock HA task here
4673 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
# NOTE(review): the body of this branch is not visible in this listing;
# presumably the coroutine stops here when the lock was not obtained — confirm.
4674 if not task_is_locked_by_me
:
# Prefix shared by every log message emitted for this task.
4677 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4678 self
.logger
.debug(logging_text
+ "Enter")
4679 # get all needed from database
# Fields accumulated here are passed as other_update to _write_op_status.
4683 db_nslcmop_update
= {}
4684 nslcmop_operation_state
= None
4685 error_description_nslcmop
= None
4688 # wait for any previous tasks in process
# 'step' names the phase in progress; it is interpolated into the log and
# error messages built in the exception handling further down.
4689 step
= "Waiting for previous operations to terminate"
4690 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
# Flag the NS as running this operation.
4692 self
._write
_ns
_status
(
4695 current_operation
="RUNNING ACTION",
4696 current_operation_id
=nslcmop_id
,
4699 step
= "Getting information from database"
4700 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4701 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
# When primitive_params is present it is decoded from JSON in place.
4702 if db_nslcmop
["operationParams"].get("primitive_params"):
4703 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4704 db_nslcmop
["operationParams"]["primitive_params"]
# Target selectors are optional (.get); "primitive" and "primitive_params"
# are read with [] and therefore must exist in operationParams.
4707 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4708 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4709 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4710 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4711 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4712 primitive
= db_nslcmop
["operationParams"]["primitive"]
4713 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
# Per-operation timeout, defaulting to self.timeout_primitive.
4714 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4715 "timeout_ns_action", self
.timeout_primitive
# NOTE(review): the conditions selecting between the vnfr/vnfd lookup and the
# nsd lookup below are not visible in this listing.
4719 step
= "Getting vnfr from database"
4720 db_vnfr
= self
.db
.get_one(
4721 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
# Decode each kdur's additionalParams from JSON and store the resulting
# records back into db_vnfr["kdur"].
4723 if db_vnfr
.get("kdur"):
4725 for kdur
in db_vnfr
["kdur"]:
4726 if kdur
.get("additionalParams"):
4727 kdur
["additionalParams"] = json
.loads(
4728 kdur
["additionalParams"]
4730 kdur_list
.append(kdur
)
4731 db_vnfr
["kdur"] = kdur_list
4732 step
= "Getting vnfd from database"
4733 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4735 step
= "Getting nsd from database"
4736 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4738 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
4739 # for backward compatibility
# A dict-shaped "VCA" entry is converted to a list of its values and the
# converted list is written to the "nsrs" record.
# NOTE(review): the initialisation of db_nsr_update is not visible in this listing.
4740 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4741 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4742 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4743 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4745 # look for primitive
4746 config_primitive_desc
= descriptor_configuration
= None
# The configuration comes from the vdu, the kdu, the vnf itself
# (db_vnfd["id"]) or the nsd's "ns-configuration"; the conditions choosing
# among these four assignments are not visible in this listing.
4748 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
4750 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
4752 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
4754 descriptor_configuration
= db_nsd
.get("ns-configuration")
# Select the config-primitive whose name equals the requested primitive.
4756 if descriptor_configuration
and descriptor_configuration
.get(
4759 for config_primitive
in descriptor_configuration
["config-primitive"]:
4760 if config_primitive
["name"] == primitive
:
4761 config_primitive_desc
= config_primitive
# Without a matching descriptor entry, only "upgrade", "rollback" and
# "status" on a KDU pass the check below; the statement that consumes the
# "not found" message is not visible here (presumably a raise — confirm).
4764 if not config_primitive_desc
:
4765 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
4767 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4771 primitive_name
= primitive
4772 ee_descriptor_id
= None
# With a descriptor entry, the primitive may be mapped to a different
# execution-environment primitive name and may reference an execution
# environment.
4774 primitive_name
= config_primitive_desc
.get(
4775 "execution-environment-primitive", primitive
4777 ee_descriptor_id
= config_primitive_desc
.get(
4778 "execution-environment-ref"
# desc_params is taken from the additional params of the matching vdur, the
# matching kdur, the vnfr, or the nsr; the selecting conditions are not
# visible in this listing.
4784 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
4786 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
4789 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
4791 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
4793 desc_params
= parse_yaml_strings(
4794 db_vnfr
.get("additionalParamsForVnf")
4797 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
# Collect the names of the KDU's initial-config and config primitives.
# kdu_action records whether primitive_name is one of them.
# NOTE(review): both loops reuse the name 'primitive', rebinding the variable
# that was read from operationParams above.
4798 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
4799 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
4801 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
4802 actions
.add(primitive
["name"])
4803 for primitive
in kdu_configuration
.get("config-primitive", []):
4804 actions
.add(primitive
["name"])
4805 kdu_action
= True if primitive_name
in actions
else False
4807 # TODO check if ns is in a proper status
4809 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
4811 # kdur and desc_params already set from before
# Parameters supplied with the operation override the descriptor-level ones.
4812 if primitive_params
:
4813 desc_params
.update(primitive_params
)
4814 # TODO Check if we will need something at vnf level
# Look for the deployed K8s entry matching kdu_name and vnf_index; 'index'
# is used below to build the database path of that entry.
4815 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
4817 kdu_name
== kdu
["kdu-name"]
4818 and kdu
["member-vnf-index"] == vnf_index
4823 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
# The cluster type must be a key of self.k8scluster_map, whose values are the
# connectors called below.
4826 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
4827 msg
= "unknown k8scluster-type '{}'".format(
4828 kdu
.get("k8scluster-type")
4830 raise LcmException(msg
)
# Database location ("nsrs" record, K8s entry 'index') for this KDU.
4833 "collection": "nsrs",
4834 "filter": {"_id": nsr_id
},
4835 "path": "_admin.deployed.K8s.{}".format(index
),
4839 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
4841 step
= "Executing kdu {}".format(primitive_name
)
# upgrade: a "kdu_model" entry in desc_params is used and then removed from
# desc_params; the other visible assignment takes the deployed "kdu-model"
# and keeps the text before ':' (its guarding conditions are not visible).
4842 if primitive_name
== "upgrade":
4843 if desc_params
.get("kdu_model"):
4844 kdu_model
= desc_params
.get("kdu_model")
4845 del desc_params
["kdu_model"]
4847 kdu_model
= kdu
.get("kdu-model")
4848 parts
= kdu_model
.split(sep
=":")
4850 kdu_model
= parts
[0]
# Two timeouts appear for the upgrade call: timeout_ns_action and
# timeout_ns_action + 10 (presumably the connector's and wait_for's
# respectively — the closing brackets are not visible, confirm).
4852 detailed_status
= await asyncio
.wait_for(
4853 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
4854 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4855 kdu_instance
=kdu
.get("kdu-instance"),
4857 kdu_model
=kdu_model
,
4860 timeout
=timeout_ns_action
,
4862 timeout
=timeout_ns_action
+ 10,
4865 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
# rollback: delegated to the connector's rollback().
4867 elif primitive_name
== "rollback":
4868 detailed_status
= await asyncio
.wait_for(
4869 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
4870 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4871 kdu_instance
=kdu
.get("kdu-instance"),
4874 timeout
=timeout_ns_action
,
# status: delegated to the connector's status_kdu().
4876 elif primitive_name
== "status":
4877 detailed_status
= await asyncio
.wait_for(
4878 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
4879 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4880 kdu_instance
=kdu
.get("kdu-instance"),
4883 timeout
=timeout_ns_action
,
# Any other primitive goes through exec_primitive(); when no "kdu-instance"
# is recorded the instance name falls back to "<kdu-name>-<nsr_id>".
4886 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
4887 kdu
["kdu-name"], nsr_id
4889 params
= self
._map
_primitive
_params
(
4890 config_primitive_desc
, primitive_params
, desc_params
4893 detailed_status
= await asyncio
.wait_for(
4894 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
4895 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4896 kdu_instance
=kdu_instance
,
4897 primitive_name
=primitive_name
,
4900 timeout
=timeout_ns_action
,
4903 timeout
=timeout_ns_action
,
# The KDU result is mapped to "COMPLETED", or to "FAILED" with an empty
# detailed_status; the deciding condition is not visible in this listing.
4907 nslcmop_operation_state
= "COMPLETED"
4909 detailed_status
= ""
4910 nslcmop_operation_state
= "FAILED"
# VCA path: find the deployed execution environment for the target.
4912 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
4913 nsr_deployed
["VCA"],
4914 member_vnf_index
=vnf_index
,
4916 vdu_count_index
=vdu_count_index
,
4917 ee_descriptor_id
=ee_descriptor_id
,
# Database location of a VCA entry whose member-vnf-index equals vnf_index.
4919 for vca_index
, vca_deployed
in enumerate(
4920 db_nsr
["_admin"]["deployed"]["VCA"]
4922 if vca_deployed
.get("member-vnf-index") == vnf_index
:
4924 "collection": "nsrs",
4925 "filter": {"_id": nsr_id
},
4926 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
# _ns_execute_primitive supplies the operation state directly.
4930 nslcmop_operation_state
,
4932 ) = await self
._ns
_execute
_primitive
(
4934 primitive
=primitive_name
,
4935 primitive_params
=self
._map
_primitive
_params
(
4936 config_primitive_desc
, primitive_params
, desc_params
4938 timeout
=timeout_ns_action
,
# The error message is only populated for a FAILED result.
4944 db_nslcmop_update
["detailed-status"] = detailed_status
4945 error_description_nslcmop
= (
4946 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
4950 + " task Done with result {} {}".format(
4951 nslcmop_operation_state
, detailed_status
4954 return # database update is called inside finally
# Database, LCM, N2VC and K8s errors are logged at error level.
4956 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
4957 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4959 except asyncio
.CancelledError
:
4961 logging_text
+ "Cancelled Exception while '{}'".format(step
)
4963 exc
= "Operation was cancelled"
4964 except asyncio
.TimeoutError
:
4965 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
# Any other exception keeps the formatted traceback in exc and is logged at
# critical level.
4967 except Exception as e
:
4968 exc
= traceback
.format_exc()
4969 self
.logger
.critical(
4970 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
# The failing step and the error are combined into the operation's error
# description, and the operation is marked FAILED.
4979 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4980 nslcmop_operation_state
= "FAILED"
# The NS is set back to IDLE with no current operation.
4982 self
._write
_ns
_status
(
4986 ], # TODO check if degraded. For the moment use previous status
4987 current_operation
="IDLE",
4988 current_operation_id
=None,
4989 # error_description=error_description_nsr,
4990 # error_detail=error_detail,
4991 other_update
=db_nsr_update
,
# Persist the final state, error message and accumulated updates.
4994 self
._write
_op
_status
(
4997 error_message
=error_description_nslcmop
,
4998 operation_state
=nslcmop_operation_state
,
4999 other_update
=db_nslcmop_update
,
# When a final state exists, publish it through self.msg.aiowrite; an
# exception raised while notifying is caught just below.
5002 if nslcmop_operation_state
:
5004 await self
.msg
.aiowrite(
5009 "nslcmop_id": nslcmop_id
,
5010 "operationState": nslcmop_operation_state
,
5014 except Exception as e
:
5016 logging_text
+ "kafka_write notification Exception {}".format(e
)
5018 self
.logger
.debug(logging_text
+ "Exit")
# Unregister this task ("ns_action") from lcm_tasks before returning.
5019 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5020 return nslcmop_operation_state
, detailed_status
5022 async def scale(self
, nsr_id
, nslcmop_id
):
5023 # Try to lock HA task here
5024 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5025 if not task_is_locked_by_me
:
5028 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5029 stage
= ["", "", ""]
5030 tasks_dict_info
= {}
5031 # ^ stage, step, VIM progress
5032 self
.logger
.debug(logging_text
+ "Enter")
5033 # get all needed from database
5035 db_nslcmop_update
= {}
5038 # in case of error, indicates what part of scale was failed to put nsr at error status
5039 scale_process
= None
5040 old_operational_status
= ""
5041 old_config_status
= ""
5044 # wait for any previous tasks in process
5045 step
= "Waiting for previous operations to terminate"
5046 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5047 self
._write
_ns
_status
(
5050 current_operation
="SCALING",
5051 current_operation_id
=nslcmop_id
,
5054 step
= "Getting nslcmop from database"
5056 step
+ " after having waited for previous tasks to be completed"
5058 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5060 step
= "Getting nsr from database"
5061 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5062 old_operational_status
= db_nsr
["operational-status"]
5063 old_config_status
= db_nsr
["config-status"]
5065 step
= "Parsing scaling parameters"
5066 db_nsr_update
["operational-status"] = "scaling"
5067 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5068 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5070 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5072 ]["member-vnf-index"]
5073 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5075 ]["scaling-group-descriptor"]
5076 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5077 # for backward compatibility
5078 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5079 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5080 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5081 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5083 step
= "Getting vnfr from database"
5084 db_vnfr
= self
.db
.get_one(
5085 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5088 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5090 step
= "Getting vnfd from database"
5091 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5093 base_folder
= db_vnfd
["_admin"]["storage"]
5095 step
= "Getting scaling-group-descriptor"
5096 scaling_descriptor
= find_in_list(
5097 get_scaling_aspect(db_vnfd
),
5098 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5100 if not scaling_descriptor
:
5102 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5103 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5106 step
= "Sending scale order to VIM"
5107 # TODO check if ns is in a proper status
5109 if not db_nsr
["_admin"].get("scaling-group"):
5114 "_admin.scaling-group": [
5115 {"name": scaling_group
, "nb-scale-op": 0}
5119 admin_scale_index
= 0
5121 for admin_scale_index
, admin_scale_info
in enumerate(
5122 db_nsr
["_admin"]["scaling-group"]
5124 if admin_scale_info
["name"] == scaling_group
:
5125 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
5127 else: # not found, set index one plus last element and add new entry with the name
5128 admin_scale_index
+= 1
5130 "_admin.scaling-group.{}.name".format(admin_scale_index
)
5133 vca_scaling_info
= []
5134 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
5135 if scaling_type
== "SCALE_OUT":
5136 if "aspect-delta-details" not in scaling_descriptor
:
5138 "Aspect delta details not fount in scaling descriptor {}".format(
5139 scaling_descriptor
["name"]
5142 # count if max-instance-count is reached
5143 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5145 scaling_info
["scaling_direction"] = "OUT"
5146 scaling_info
["vdu-create"] = {}
5147 scaling_info
["kdu-create"] = {}
5148 for delta
in deltas
:
5149 for vdu_delta
in delta
.get("vdu-delta", {}):
5150 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
5151 # vdu_index also provides the number of instance of the targeted vdu
5152 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5153 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5157 additional_params
= (
5158 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
5161 cloud_init_list
= []
5163 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5164 max_instance_count
= 10
5165 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
5166 max_instance_count
= vdu_profile
.get(
5167 "max-number-of-instances", 10
5170 default_instance_num
= get_number_of_instances(
5173 instances_number
= vdu_delta
.get("number-of-instances", 1)
5174 nb_scale_op
+= instances_number
5176 new_instance_count
= nb_scale_op
+ default_instance_num
5177 # Control if new count is over max and vdu count is less than max.
5178 # Then assign new instance count
5179 if new_instance_count
> max_instance_count
> vdu_count
:
5180 instances_number
= new_instance_count
- max_instance_count
5182 instances_number
= instances_number
5184 if new_instance_count
> max_instance_count
:
5186 "reached the limit of {} (max-instance-count) "
5187 "scaling-out operations for the "
5188 "scaling-group-descriptor '{}'".format(
5189 nb_scale_op
, scaling_group
5192 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5194 # TODO Information of its own ip is not available because db_vnfr is not updated.
5195 additional_params
["OSM"] = get_osm_params(
5196 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
5198 cloud_init_list
.append(
5199 self
._parse
_cloud
_init
(
5206 vca_scaling_info
.append(
5208 "osm_vdu_id": vdu_delta
["id"],
5209 "member-vnf-index": vnf_index
,
5211 "vdu_index": vdu_index
+ x
,
5214 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
5215 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5216 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5217 kdu_name
= kdu_profile
["kdu-name"]
5218 resource_name
= kdu_profile
["resource-name"]
5220 # Might have different kdus in the same delta
5221 # Should have list for each kdu
5222 if not scaling_info
["kdu-create"].get(kdu_name
, None):
5223 scaling_info
["kdu-create"][kdu_name
] = []
5225 kdur
= get_kdur(db_vnfr
, kdu_name
)
5226 if kdur
.get("helm-chart"):
5227 k8s_cluster_type
= "helm-chart-v3"
5228 self
.logger
.debug("kdur: {}".format(kdur
))
5230 kdur
.get("helm-version")
5231 and kdur
.get("helm-version") == "v2"
5233 k8s_cluster_type
= "helm-chart"
5234 raise NotImplementedError
5235 elif kdur
.get("juju-bundle"):
5236 k8s_cluster_type
= "juju-bundle"
5239 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5240 "juju-bundle. Maybe an old NBI version is running".format(
5241 db_vnfr
["member-vnf-index-ref"], kdu_name
5245 max_instance_count
= 10
5246 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
5247 max_instance_count
= kdu_profile
.get(
5248 "max-number-of-instances", 10
5251 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
5252 deployed_kdu
, _
= get_deployed_kdu(
5253 nsr_deployed
, kdu_name
, vnf_index
5255 if deployed_kdu
is None:
5257 "KDU '{}' for vnf '{}' not deployed".format(
5261 kdu_instance
= deployed_kdu
.get("kdu-instance")
5262 instance_num
= await self
.k8scluster_map
[
5264 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5265 kdu_replica_count
= instance_num
+ kdu_delta
.get(
5266 "number-of-instances", 1
5269 # Control if new count is over max and instance_num is less than max.
5270 # Then assign max instance number to kdu replica count
5271 if kdu_replica_count
> max_instance_count
> instance_num
:
5272 kdu_replica_count
= max_instance_count
5273 if kdu_replica_count
> max_instance_count
:
5275 "reached the limit of {} (max-instance-count) "
5276 "scaling-out operations for the "
5277 "scaling-group-descriptor '{}'".format(
5278 instance_num
, scaling_group
5282 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5283 vca_scaling_info
.append(
5285 "osm_kdu_id": kdu_name
,
5286 "member-vnf-index": vnf_index
,
5288 "kdu_index": instance_num
+ x
- 1,
5291 scaling_info
["kdu-create"][kdu_name
].append(
5293 "member-vnf-index": vnf_index
,
5295 "k8s-cluster-type": k8s_cluster_type
,
5296 "resource-name": resource_name
,
5297 "scale": kdu_replica_count
,
5300 elif scaling_type
== "SCALE_IN":
5301 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
5303 scaling_info
["scaling_direction"] = "IN"
5304 scaling_info
["vdu-delete"] = {}
5305 scaling_info
["kdu-delete"] = {}
5307 for delta
in deltas
:
5308 for vdu_delta
in delta
.get("vdu-delta", {}):
5309 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
5310 min_instance_count
= 0
5311 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
5312 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
5313 min_instance_count
= vdu_profile
["min-number-of-instances"]
5315 default_instance_num
= get_number_of_instances(
5316 db_vnfd
, vdu_delta
["id"]
5318 instance_num
= vdu_delta
.get("number-of-instances", 1)
5319 nb_scale_op
-= instance_num
5321 new_instance_count
= nb_scale_op
+ default_instance_num
5323 if new_instance_count
< min_instance_count
< vdu_count
:
5324 instances_number
= min_instance_count
- new_instance_count
5326 instances_number
= instance_num
5328 if new_instance_count
< min_instance_count
:
5330 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5331 "scaling-group-descriptor '{}'".format(
5332 nb_scale_op
, scaling_group
5335 for x
in range(vdu_delta
.get("number-of-instances", 1)):
5336 vca_scaling_info
.append(
5338 "osm_vdu_id": vdu_delta
["id"],
5339 "member-vnf-index": vnf_index
,
5341 "vdu_index": vdu_index
- 1 - x
,
5344 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
5345 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
5346 kdu_profile
= get_kdu_profile(db_vnfd
, kdu_delta
["id"])
5347 kdu_name
= kdu_profile
["kdu-name"]
5348 resource_name
= kdu_profile
["resource-name"]
5350 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
5351 scaling_info
["kdu-delete"][kdu_name
] = []
5353 kdur
= get_kdur(db_vnfr
, kdu_name
)
5354 if kdur
.get("helm-chart"):
5355 k8s_cluster_type
= "helm-chart-v3"
5356 self
.logger
.debug("kdur: {}".format(kdur
))
5358 kdur
.get("helm-version")
5359 and kdur
.get("helm-version") == "v2"
5361 k8s_cluster_type
= "helm-chart"
5362 raise NotImplementedError
5363 elif kdur
.get("juju-bundle"):
5364 k8s_cluster_type
= "juju-bundle"
5367 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5368 "juju-bundle. Maybe an old NBI version is running".format(
5369 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
5373 min_instance_count
= 0
5374 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
5375 min_instance_count
= kdu_profile
["min-number-of-instances"]
5377 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
5378 deployed_kdu
, _
= get_deployed_kdu(
5379 nsr_deployed
, kdu_name
, vnf_index
5381 if deployed_kdu
is None:
5383 "KDU '{}' for vnf '{}' not deployed".format(
5387 kdu_instance
= deployed_kdu
.get("kdu-instance")
5388 instance_num
= await self
.k8scluster_map
[
5390 ].get_scale_count(resource_name
, kdu_instance
, vca_id
=vca_id
)
5391 kdu_replica_count
= instance_num
- kdu_delta
.get(
5392 "number-of-instances", 1
5395 if kdu_replica_count
< min_instance_count
< instance_num
:
5396 kdu_replica_count
= min_instance_count
5397 if kdu_replica_count
< min_instance_count
:
5399 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5400 "scaling-group-descriptor '{}'".format(
5401 instance_num
, scaling_group
5405 for x
in range(kdu_delta
.get("number-of-instances", 1)):
5406 vca_scaling_info
.append(
5408 "osm_kdu_id": kdu_name
,
5409 "member-vnf-index": vnf_index
,
5411 "kdu_index": instance_num
- x
- 1,
5414 scaling_info
["kdu-delete"][kdu_name
].append(
5416 "member-vnf-index": vnf_index
,
5418 "k8s-cluster-type": k8s_cluster_type
,
5419 "resource-name": resource_name
,
5420 "scale": kdu_replica_count
,
5424 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5425 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
5426 if scaling_info
["scaling_direction"] == "IN":
5427 for vdur
in reversed(db_vnfr
["vdur"]):
5428 if vdu_delete
.get(vdur
["vdu-id-ref"]):
5429 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
5430 scaling_info
["vdu"].append(
5432 "name": vdur
.get("name") or vdur
.get("vdu-name"),
5433 "vdu_id": vdur
["vdu-id-ref"],
5437 for interface
in vdur
["interfaces"]:
5438 scaling_info
["vdu"][-1]["interface"].append(
5440 "name": interface
["name"],
5441 "ip_address": interface
["ip-address"],
5442 "mac_address": interface
.get("mac-address"),
5445 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5448 step
= "Executing pre-scale vnf-config-primitive"
5449 if scaling_descriptor
.get("scaling-config-action"):
5450 for scaling_config_action
in scaling_descriptor
[
5451 "scaling-config-action"
5454 scaling_config_action
.get("trigger") == "pre-scale-in"
5455 and scaling_type
== "SCALE_IN"
5457 scaling_config_action
.get("trigger") == "pre-scale-out"
5458 and scaling_type
== "SCALE_OUT"
5460 vnf_config_primitive
= scaling_config_action
[
5461 "vnf-config-primitive-name-ref"
5463 step
= db_nslcmop_update
[
5465 ] = "executing pre-scale scaling-config-action '{}'".format(
5466 vnf_config_primitive
5469 # look for primitive
5470 for config_primitive
in (
5471 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5472 ).get("config-primitive", ()):
5473 if config_primitive
["name"] == vnf_config_primitive
:
5477 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5478 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5479 "primitive".format(scaling_group
, vnf_config_primitive
)
5482 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5483 if db_vnfr
.get("additionalParamsForVnf"):
5484 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5486 scale_process
= "VCA"
5487 db_nsr_update
["config-status"] = "configuring pre-scaling"
5488 primitive_params
= self
._map
_primitive
_params
(
5489 config_primitive
, {}, vnfr_params
5492 # Pre-scale retry check: Check if this sub-operation has been executed before
5493 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5496 vnf_config_primitive
,
5500 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5501 # Skip sub-operation
5502 result
= "COMPLETED"
5503 result_detail
= "Done"
5506 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5507 vnf_config_primitive
, result
, result_detail
5511 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5512 # New sub-operation: Get index of this sub-operation
5514 len(db_nslcmop
.get("_admin", {}).get("operations"))
5519 + "vnf_config_primitive={} New sub-operation".format(
5520 vnf_config_primitive
5524 # retry: Get registered params for this existing sub-operation
5525 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5528 vnf_index
= op
.get("member_vnf_index")
5529 vnf_config_primitive
= op
.get("primitive")
5530 primitive_params
= op
.get("primitive_params")
5533 + "vnf_config_primitive={} Sub-operation retry".format(
5534 vnf_config_primitive
5537 # Execute the primitive, either with new (first-time) or registered (reintent) args
5538 ee_descriptor_id
= config_primitive
.get(
5539 "execution-environment-ref"
5541 primitive_name
= config_primitive
.get(
5542 "execution-environment-primitive", vnf_config_primitive
5544 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5545 nsr_deployed
["VCA"],
5546 member_vnf_index
=vnf_index
,
5548 vdu_count_index
=None,
5549 ee_descriptor_id
=ee_descriptor_id
,
5551 result
, result_detail
= await self
._ns
_execute
_primitive
(
5560 + "vnf_config_primitive={} Done with result {} {}".format(
5561 vnf_config_primitive
, result
, result_detail
5564 # Update operationState = COMPLETED | FAILED
5565 self
._update
_suboperation
_status
(
5566 db_nslcmop
, op_index
, result
, result_detail
5569 if result
== "FAILED":
5570 raise LcmException(result_detail
)
5571 db_nsr_update
["config-status"] = old_config_status
5572 scale_process
= None
5576 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
5579 "_admin.scaling-group.{}.time".format(admin_scale_index
)
5582 # SCALE-IN VCA - BEGIN
5583 if vca_scaling_info
:
5584 step
= db_nslcmop_update
[
5586 ] = "Deleting the execution environments"
5587 scale_process
= "VCA"
5588 for vca_info
in vca_scaling_info
:
5589 if vca_info
["type"] == "delete":
5590 member_vnf_index
= str(vca_info
["member-vnf-index"])
5592 logging_text
+ "vdu info: {}".format(vca_info
)
5594 if vca_info
.get("osm_vdu_id"):
5595 vdu_id
= vca_info
["osm_vdu_id"]
5596 vdu_index
= int(vca_info
["vdu_index"])
5599 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5600 member_vnf_index
, vdu_id
, vdu_index
5604 kdu_id
= vca_info
["osm_kdu_id"]
5607 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5608 member_vnf_index
, kdu_id
, vdu_index
5610 stage
[2] = step
= "Scaling in VCA"
5611 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5612 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
5613 config_update
= db_nsr
["configurationStatus"]
5614 for vca_index
, vca
in enumerate(vca_update
):
5616 (vca
or vca
.get("ee_id"))
5617 and vca
["member-vnf-index"] == member_vnf_index
5618 and vca
["vdu_count_index"] == vdu_index
5620 if vca
.get("vdu_id"):
5621 config_descriptor
= get_configuration(
5622 db_vnfd
, vca
.get("vdu_id")
5624 elif vca
.get("kdu_name"):
5625 config_descriptor
= get_configuration(
5626 db_vnfd
, vca
.get("kdu_name")
5629 config_descriptor
= get_configuration(
5630 db_vnfd
, db_vnfd
["id"]
5632 operation_params
= (
5633 db_nslcmop
.get("operationParams") or {}
5635 exec_terminate_primitives
= not operation_params
.get(
5636 "skip_terminate_primitives"
5637 ) and vca
.get("needed_terminate")
5638 task
= asyncio
.ensure_future(
5647 exec_primitives
=exec_terminate_primitives
,
5651 timeout
=self
.timeout_charm_delete
,
5654 tasks_dict_info
[task
] = "Terminating VCA {}".format(
5657 del vca_update
[vca_index
]
5658 del config_update
[vca_index
]
5659 # wait for pending tasks of terminate primitives
5663 + "Waiting for tasks {}".format(
5664 list(tasks_dict_info
.keys())
5667 error_list
= await self
._wait
_for
_tasks
(
5671 self
.timeout_charm_delete
, self
.timeout_ns_terminate
5676 tasks_dict_info
.clear()
5678 raise LcmException("; ".join(error_list
))
5680 db_vca_and_config_update
= {
5681 "_admin.deployed.VCA": vca_update
,
5682 "configurationStatus": config_update
,
5685 "nsrs", db_nsr
["_id"], db_vca_and_config_update
5687 scale_process
= None
5688 # SCALE-IN VCA - END
5691 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
5692 scale_process
= "RO"
5693 if self
.ro_config
.get("ng"):
5694 await self
._scale
_ng
_ro
(
5695 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
5697 scaling_info
.pop("vdu-create", None)
5698 scaling_info
.pop("vdu-delete", None)
5700 scale_process
= None
5704 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
5705 scale_process
= "KDU"
5706 await self
._scale
_kdu
(
5707 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
5709 scaling_info
.pop("kdu-create", None)
5710 scaling_info
.pop("kdu-delete", None)
5712 scale_process
= None
5716 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5718 # SCALE-UP VCA - BEGIN
5719 if vca_scaling_info
:
5720 step
= db_nslcmop_update
[
5722 ] = "Creating new execution environments"
5723 scale_process
= "VCA"
5724 for vca_info
in vca_scaling_info
:
5725 if vca_info
["type"] == "create":
5726 member_vnf_index
= str(vca_info
["member-vnf-index"])
5728 logging_text
+ "vdu info: {}".format(vca_info
)
5730 vnfd_id
= db_vnfr
["vnfd-ref"]
5731 if vca_info
.get("osm_vdu_id"):
5732 vdu_index
= int(vca_info
["vdu_index"])
5733 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
5734 if db_vnfr
.get("additionalParamsForVnf"):
5735 deploy_params
.update(
5737 db_vnfr
["additionalParamsForVnf"].copy()
5740 descriptor_config
= get_configuration(
5741 db_vnfd
, db_vnfd
["id"]
5743 if descriptor_config
:
5748 logging_text
=logging_text
5749 + "member_vnf_index={} ".format(member_vnf_index
),
5752 nslcmop_id
=nslcmop_id
,
5758 member_vnf_index
=member_vnf_index
,
5759 vdu_index
=vdu_index
,
5761 deploy_params
=deploy_params
,
5762 descriptor_config
=descriptor_config
,
5763 base_folder
=base_folder
,
5764 task_instantiation_info
=tasks_dict_info
,
5767 vdu_id
= vca_info
["osm_vdu_id"]
5768 vdur
= find_in_list(
5769 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
5771 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
5772 if vdur
.get("additionalParams"):
5773 deploy_params_vdu
= parse_yaml_strings(
5774 vdur
["additionalParams"]
5777 deploy_params_vdu
= deploy_params
5778 deploy_params_vdu
["OSM"] = get_osm_params(
5779 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
5781 if descriptor_config
:
5786 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5787 member_vnf_index
, vdu_id
, vdu_index
5789 stage
[2] = step
= "Scaling out VCA"
5790 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
5792 logging_text
=logging_text
5793 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5794 member_vnf_index
, vdu_id
, vdu_index
5798 nslcmop_id
=nslcmop_id
,
5804 member_vnf_index
=member_vnf_index
,
5805 vdu_index
=vdu_index
,
5807 deploy_params
=deploy_params_vdu
,
5808 descriptor_config
=descriptor_config
,
5809 base_folder
=base_folder
,
5810 task_instantiation_info
=tasks_dict_info
,
5814 kdu_name
= vca_info
["osm_kdu_id"]
5815 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
5816 if descriptor_config
:
5818 kdu_index
= int(vca_info
["kdu_index"])
5822 for x
in db_vnfr
["kdur"]
5823 if x
["kdu-name"] == kdu_name
5825 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
5826 if kdur
.get("additionalParams"):
5827 deploy_params_kdu
= parse_yaml_strings(
5828 kdur
["additionalParams"]
5832 logging_text
=logging_text
,
5835 nslcmop_id
=nslcmop_id
,
5841 member_vnf_index
=member_vnf_index
,
5842 vdu_index
=kdu_index
,
5844 deploy_params
=deploy_params_kdu
,
5845 descriptor_config
=descriptor_config
,
5846 base_folder
=base_folder
,
5847 task_instantiation_info
=tasks_dict_info
,
5850 # SCALE-UP VCA - END
5851 scale_process
= None
5854 # execute primitive service POST-SCALING
5855 step
= "Executing post-scale vnf-config-primitive"
5856 if scaling_descriptor
.get("scaling-config-action"):
5857 for scaling_config_action
in scaling_descriptor
[
5858 "scaling-config-action"
5861 scaling_config_action
.get("trigger") == "post-scale-in"
5862 and scaling_type
== "SCALE_IN"
5864 scaling_config_action
.get("trigger") == "post-scale-out"
5865 and scaling_type
== "SCALE_OUT"
5867 vnf_config_primitive
= scaling_config_action
[
5868 "vnf-config-primitive-name-ref"
5870 step
= db_nslcmop_update
[
5872 ] = "executing post-scale scaling-config-action '{}'".format(
5873 vnf_config_primitive
5876 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
5877 if db_vnfr
.get("additionalParamsForVnf"):
5878 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
5880 # look for primitive
5881 for config_primitive
in (
5882 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
5883 ).get("config-primitive", ()):
5884 if config_primitive
["name"] == vnf_config_primitive
:
5888 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5889 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5890 "config-primitive".format(
5891 scaling_group
, vnf_config_primitive
5894 scale_process
= "VCA"
5895 db_nsr_update
["config-status"] = "configuring post-scaling"
5896 primitive_params
= self
._map
_primitive
_params
(
5897 config_primitive
, {}, vnfr_params
5900 # Post-scale retry check: Check if this sub-operation has been executed before
5901 op_index
= self
._check
_or
_add
_scale
_suboperation
(
5904 vnf_config_primitive
,
5908 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
5909 # Skip sub-operation
5910 result
= "COMPLETED"
5911 result_detail
= "Done"
5914 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5915 vnf_config_primitive
, result
, result_detail
5919 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
5920 # New sub-operation: Get index of this sub-operation
5922 len(db_nslcmop
.get("_admin", {}).get("operations"))
5927 + "vnf_config_primitive={} New sub-operation".format(
5928 vnf_config_primitive
5932 # retry: Get registered params for this existing sub-operation
5933 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
5936 vnf_index
= op
.get("member_vnf_index")
5937 vnf_config_primitive
= op
.get("primitive")
5938 primitive_params
= op
.get("primitive_params")
5941 + "vnf_config_primitive={} Sub-operation retry".format(
5942 vnf_config_primitive
5945 # Execute the primitive, either with new (first-time) or registered (retry) args
5946 ee_descriptor_id
= config_primitive
.get(
5947 "execution-environment-ref"
5949 primitive_name
= config_primitive
.get(
5950 "execution-environment-primitive", vnf_config_primitive
5952 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5953 nsr_deployed
["VCA"],
5954 member_vnf_index
=vnf_index
,
5956 vdu_count_index
=None,
5957 ee_descriptor_id
=ee_descriptor_id
,
5959 result
, result_detail
= await self
._ns
_execute
_primitive
(
5968 + "vnf_config_primitive={} Done with result {} {}".format(
5969 vnf_config_primitive
, result
, result_detail
5972 # Update operationState = COMPLETED | FAILED
5973 self
._update
_suboperation
_status
(
5974 db_nslcmop
, op_index
, result
, result_detail
5977 if result
== "FAILED":
5978 raise LcmException(result_detail
)
5979 db_nsr_update
["config-status"] = old_config_status
5980 scale_process
= None
5985 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
5986 db_nsr_update
["operational-status"] = (
5988 if old_operational_status
== "failed"
5989 else old_operational_status
5991 db_nsr_update
["config-status"] = old_config_status
5994 ROclient
.ROClientException
,
5999 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6001 except asyncio
.CancelledError
:
6003 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6005 exc
= "Operation was cancelled"
6006 except Exception as e
:
6007 exc
= traceback
.format_exc()
6008 self
.logger
.critical(
6009 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6013 self
._write
_ns
_status
(
6016 current_operation
="IDLE",
6017 current_operation_id
=None,
6020 stage
[1] = "Waiting for instantiate pending tasks."
6021 self
.logger
.debug(logging_text
+ stage
[1])
6022 exc
= await self
._wait
_for
_tasks
(
6025 self
.timeout_ns_deploy
,
6033 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6034 nslcmop_operation_state
= "FAILED"
6036 db_nsr_update
["operational-status"] = old_operational_status
6037 db_nsr_update
["config-status"] = old_config_status
6038 db_nsr_update
["detailed-status"] = ""
6040 if "VCA" in scale_process
:
6041 db_nsr_update
["config-status"] = "failed"
6042 if "RO" in scale_process
:
6043 db_nsr_update
["operational-status"] = "failed"
6046 ] = "FAILED scaling nslcmop={} {}: {}".format(
6047 nslcmop_id
, step
, exc
6050 error_description_nslcmop
= None
6051 nslcmop_operation_state
= "COMPLETED"
6052 db_nslcmop_update
["detailed-status"] = "Done"
6054 self
._write
_op
_status
(
6057 error_message
=error_description_nslcmop
,
6058 operation_state
=nslcmop_operation_state
,
6059 other_update
=db_nslcmop_update
,
6062 self
._write
_ns
_status
(
6065 current_operation
="IDLE",
6066 current_operation_id
=None,
6067 other_update
=db_nsr_update
,
6070 if nslcmop_operation_state
:
6074 "nslcmop_id": nslcmop_id
,
6075 "operationState": nslcmop_operation_state
,
6077 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6078 except Exception as e
:
6080 logging_text
+ "kafka_write notification Exception {}".format(e
)
6082 self
.logger
.debug(logging_text
+ "Exit")
6083 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6085 async def _scale_kdu(
6086 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6088 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6089 for kdu_name
in _scaling_info
:
6090 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6091 deployed_kdu
, index
= get_deployed_kdu(
6092 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6094 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6095 kdu_instance
= deployed_kdu
["kdu-instance"]
6096 scale
= int(kdu_scaling_info
["scale"])
6097 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6100 "collection": "nsrs",
6101 "filter": {"_id": nsr_id
},
6102 "path": "_admin.deployed.K8s.{}".format(index
),
6105 step
= "scaling application {}".format(
6106 kdu_scaling_info
["resource-name"]
6108 self
.logger
.debug(logging_text
+ step
)
6110 if kdu_scaling_info
["type"] == "delete":
6111 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6114 and kdu_config
.get("terminate-config-primitive")
6115 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6117 terminate_config_primitive_list
= kdu_config
.get(
6118 "terminate-config-primitive"
6120 terminate_config_primitive_list
.sort(
6121 key
=lambda val
: int(val
["seq"])
6125 terminate_config_primitive
6126 ) in terminate_config_primitive_list
:
6127 primitive_params_
= self
._map
_primitive
_params
(
6128 terminate_config_primitive
, {}, {}
6130 step
= "execute terminate config primitive"
6131 self
.logger
.debug(logging_text
+ step
)
6132 await asyncio
.wait_for(
6133 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6134 cluster_uuid
=cluster_uuid
,
6135 kdu_instance
=kdu_instance
,
6136 primitive_name
=terminate_config_primitive
["name"],
6137 params
=primitive_params_
,
6144 await asyncio
.wait_for(
6145 self
.k8scluster_map
[k8s_cluster_type
].scale(
6148 kdu_scaling_info
["resource-name"],
6151 timeout
=self
.timeout_vca_on_error
,
6154 if kdu_scaling_info
["type"] == "create":
6155 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6158 and kdu_config
.get("initial-config-primitive")
6159 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6161 initial_config_primitive_list
= kdu_config
.get(
6162 "initial-config-primitive"
6164 initial_config_primitive_list
.sort(
6165 key
=lambda val
: int(val
["seq"])
6168 for initial_config_primitive
in initial_config_primitive_list
:
6169 primitive_params_
= self
._map
_primitive
_params
(
6170 initial_config_primitive
, {}, {}
6172 step
= "execute initial config primitive"
6173 self
.logger
.debug(logging_text
+ step
)
6174 await asyncio
.wait_for(
6175 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6176 cluster_uuid
=cluster_uuid
,
6177 kdu_instance
=kdu_instance
,
6178 primitive_name
=initial_config_primitive
["name"],
6179 params
=primitive_params_
,
6186 async def _scale_ng_ro(
6187 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
6189 nsr_id
= db_nslcmop
["nsInstanceId"]
6190 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
6193 # read from db: vnfd's for every vnf
6196 # for each vnf in ns, read vnfd
6197 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
6198 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
6199 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
6200 # if we haven't this vnfd, read it from db
6201 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
6203 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
6204 db_vnfds
.append(vnfd
)
6205 n2vc_key
= self
.n2vc
.get_public_key()
6206 n2vc_key_list
= [n2vc_key
]
6209 vdu_scaling_info
.get("vdu-create"),
6210 vdu_scaling_info
.get("vdu-delete"),
6213 # db_vnfr has been updated, update db_vnfrs to use it
6214 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
6215 await self
._instantiate
_ng
_ro
(
6225 start_deploy
=time(),
6226 timeout_ns_deploy
=self
.timeout_ns_deploy
,
6228 if vdu_scaling_info
.get("vdu-delete"):
6230 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
6233 async def add_prometheus_metrics(
6234 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
6236 if not self
.prometheus
:
6238 # look for a file called 'prometheus*.j2' among the artifact contents
6239 artifact_content
= self
.fs
.dir_ls(artifact_path
)
6243 for f
in artifact_content
6244 if f
.startswith("prometheus") and f
.endswith(".j2")
6250 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
6254 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
6255 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
6257 vnfr_id
= vnfr_id
.replace("-", "")
6259 "JOB_NAME": vnfr_id
,
6260 "TARGET_IP": target_ip
,
6261 "EXPORTER_POD_IP": host_name
,
6262 "EXPORTER_POD_PORT": host_port
,
6264 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
6265 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6266 for job
in job_list
:
6268 not isinstance(job
.get("job_name"), str)
6269 or vnfr_id
not in job
["job_name"]
6271 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
6272 job
["nsr_id"] = nsr_id
6273 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
6274 if await self
.prometheus
.update(job_dict
):
6275 return list(job_dict
.keys())
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Return the VCA cloud and VCA cloud credential stored for a VIM account.

    The VIM account is fetched through VimAccountDB and both values are read
    from its "config" section (an empty config is assumed when the account
    has no "config" key).

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), read from the "vca_cloud" and
        "vca_cloud_credential" config keys; each element is None when its
        key is absent
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vca_settings = vim_account.get("config", {})
    cloud_name = vca_settings.get("vca_cloud")
    cloud_credential = vca_settings.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Return the VCA K8s cloud and VCA K8s cloud credential stored for a VIM account.

    The VIM account is fetched through VimAccountDB and both values are read
    from its "config" section (an empty config is assumed when the account
    has no "config" key).

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential), read from the "vca_k8s_cloud" and
        "vca_k8s_cloud_credential" config keys; each element is None when its
        key is absent
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    k8s_settings = vim_account.get("config", {})
    # Keys are looked up in this order so the tuple is (cloud, credential).
    wanted_keys = ("vca_k8s_cloud", "vca_k8s_cloud_credential")
    return tuple(k8s_settings.get(key) for key in wanted_keys)