1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
63 from osm_lcm
.data_utils
.nsd
import (
64 get_ns_configuration_relation_list
,
68 from osm_lcm
.data_utils
.vnfd
import (
74 get_ee_sorted_initial_config_primitive_list
,
75 get_ee_sorted_terminate_config_primitive_list
,
77 get_virtual_link_profiles
,
82 get_number_of_instances
,
84 get_kdu_resource_profile
,
85 find_software_version
,
87 from osm_lcm
.data_utils
.list_utils
import find_in_list
88 from osm_lcm
.data_utils
.vnfr
import (
92 get_volumes_from_instantiation_params
,
94 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
95 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
96 from n2vc
.definitions
import RelationEndpoint
97 from n2vc
.k8s_helm_conn
import K8sHelmConnector
98 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
99 from n2vc
.k8s_juju_conn
import K8sJujuConnector
101 from osm_common
.dbbase
import DbException
102 from osm_common
.fsbase
import FsException
104 from osm_lcm
.data_utils
.database
.database
import Database
105 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from osm_lcm
.data_utils
.wim
import (
108 get_target_wim_attrs
,
109 select_feasible_wim_account
,
112 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
113 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
115 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
116 from osm_lcm
.osm_config
import OsmConfigBuilder
117 from osm_lcm
.prometheus
import parse_job
119 from copy
import copy
, deepcopy
120 from time
import time
121 from uuid
import uuid4
123 from random
import randint
125 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
128 class NsLcm(LcmBase
):
129 timeout_scale_on_error
= (
131 ) # Time for charm from first time at blocked,error status to mark as failed
132 timeout_scale_on_error_outer_factor
= 1.05 # Factor in relation to timeout_scale_on_error related to the timeout to be applied within the asyncio.wait_for coroutine
133 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
134 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
135 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
136 timeout_charm_delete
= 10 * 60
137 timeout_primitive
= 30 * 60 # Timeout for primitive execution
138 timeout_primitive_outer_factor
= 1.05 # Factor in relation to timeout_primitive related to the timeout to be applied within the asyncio.wait_for coroutine
139 timeout_ns_update
= 30 * 60 # timeout for ns update
140 timeout_progress_primitive
= (
142 ) # timeout for some progress in a primitive execution
143 timeout_migrate
= 1800 # default global timeout for migrating vnfs
144 timeout_operate
= 1800 # default global timeout for migrating vnfs
145 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
146 SUBOPERATION_STATUS_NOT_FOUND
= -1
147 SUBOPERATION_STATUS_NEW
= -2
148 SUBOPERATION_STATUS_SKIP
= -3
149 task_name_deploy_vca
= "Deploying VCA"
151 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
153 Init, Connect to database, filesystem storage, and messaging
154 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
157 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
159 self
.db
= Database().instance
.db
160 self
.fs
= Filesystem().instance
.fs
162 self
.lcm_tasks
= lcm_tasks
163 self
.timeout
= config
["timeout"]
164 self
.ro_config
= config
["ro_config"]
165 self
.ng_ro
= config
["ro_config"].get("ng")
166 self
.vca_config
= config
["VCA"].copy()
168 # create N2VC connector
169 self
.n2vc
= N2VCJujuConnector(
172 on_update_db
=self
._on
_update
_n
2vc
_db
,
177 self
.conn_helm_ee
= LCMHelmConn(
180 vca_config
=self
.vca_config
,
181 on_update_db
=self
._on
_update
_n
2vc
_db
,
184 self
.k8sclusterhelm2
= K8sHelmConnector(
185 kubectl_command
=self
.vca_config
.get("kubectlpath"),
186 helm_command
=self
.vca_config
.get("helmpath"),
193 self
.k8sclusterhelm3
= K8sHelm3Connector(
194 kubectl_command
=self
.vca_config
.get("kubectlpath"),
195 helm_command
=self
.vca_config
.get("helm3path"),
202 self
.k8sclusterjuju
= K8sJujuConnector(
203 kubectl_command
=self
.vca_config
.get("kubectlpath"),
204 juju_command
=self
.vca_config
.get("jujupath"),
207 on_update_db
=self
._on
_update
_k
8s
_db
,
212 self
.k8scluster_map
= {
213 "helm-chart": self
.k8sclusterhelm2
,
214 "helm-chart-v3": self
.k8sclusterhelm3
,
215 "chart": self
.k8sclusterhelm3
,
216 "juju-bundle": self
.k8sclusterjuju
,
217 "juju": self
.k8sclusterjuju
,
221 "lxc_proxy_charm": self
.n2vc
,
222 "native_charm": self
.n2vc
,
223 "k8s_proxy_charm": self
.n2vc
,
224 "helm": self
.conn_helm_ee
,
225 "helm-v3": self
.conn_helm_ee
,
229 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
231 self
.op_status_map
= {
232 "instantiation": self
.RO
.status
,
233 "termination": self
.RO
.status
,
234 "migrate": self
.RO
.status
,
235 "healing": self
.RO
.recreate_status
,
236 "verticalscale": self
.RO
.status
,
237 "start_stop_rebuild": self
.RO
.status
,
241 def increment_ip_mac(ip_mac
, vm_index
=1):
242 if not isinstance(ip_mac
, str):
245 # try with ipv4 look for last dot
246 i
= ip_mac
.rfind(".")
249 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
250 # try with ipv6 or mac look for last colon. Operate in hex
251 i
= ip_mac
.rfind(":")
254 # format in hex, len can be 2 for mac or 4 for ipv6
255 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
256 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
262 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
264 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
267 # TODO filter RO descriptor fields...
271 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
272 db_dict
["deploymentStatus"] = ro_descriptor
273 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
275 except Exception as e
:
277 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
280 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
282 # remove last dot from path (if exists)
283 if path
.endswith("."):
286 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
287 # .format(table, filter, path, updated_data))
290 nsr_id
= filter.get("_id")
292 # read ns record from database
293 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
294 current_ns_status
= nsr
.get("nsState")
296 # get vca status for NS
297 status_dict
= await self
.n2vc
.get_status(
298 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
303 db_dict
["vcaStatus"] = status_dict
305 # update configurationStatus for this VCA
307 vca_index
= int(path
[path
.rfind(".") + 1 :])
310 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
312 vca_status
= vca_list
[vca_index
].get("status")
314 configuration_status_list
= nsr
.get("configurationStatus")
315 config_status
= configuration_status_list
[vca_index
].get("status")
317 if config_status
== "BROKEN" and vca_status
!= "failed":
318 db_dict
["configurationStatus"][vca_index
] = "READY"
319 elif config_status
!= "BROKEN" and vca_status
== "failed":
320 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
321 except Exception as e
:
322 # not update configurationStatus
323 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
325 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
326 # if nsState = 'DEGRADED' check if all is OK
328 if current_ns_status
in ("READY", "DEGRADED"):
329 error_description
= ""
331 if status_dict
.get("machines"):
332 for machine_id
in status_dict
.get("machines"):
333 machine
= status_dict
.get("machines").get(machine_id
)
334 # check machine agent-status
335 if machine
.get("agent-status"):
336 s
= machine
.get("agent-status").get("status")
339 error_description
+= (
340 "machine {} agent-status={} ; ".format(
344 # check machine instance status
345 if machine
.get("instance-status"):
346 s
= machine
.get("instance-status").get("status")
349 error_description
+= (
350 "machine {} instance-status={} ; ".format(
355 if status_dict
.get("applications"):
356 for app_id
in status_dict
.get("applications"):
357 app
= status_dict
.get("applications").get(app_id
)
358 # check application status
359 if app
.get("status"):
360 s
= app
.get("status").get("status")
363 error_description
+= (
364 "application {} status={} ; ".format(app_id
, s
)
367 if error_description
:
368 db_dict
["errorDescription"] = error_description
369 if current_ns_status
== "READY" and is_degraded
:
370 db_dict
["nsState"] = "DEGRADED"
371 if current_ns_status
== "DEGRADED" and not is_degraded
:
372 db_dict
["nsState"] = "READY"
375 self
.update_db_2("nsrs", nsr_id
, db_dict
)
377 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
379 except Exception as e
:
380 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
382 async def _on_update_k8s_db(
383 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
386 Updating vca status in NSR record
387 :param cluster_uuid: UUID of a k8s cluster
388 :param kdu_instance: The unique name of the KDU instance
389 :param filter: To get nsr_id
390 :cluster_type: The cluster type (juju, k8s)
394 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
395 # .format(cluster_uuid, kdu_instance, filter))
397 nsr_id
= filter.get("_id")
399 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
400 cluster_uuid
=cluster_uuid
,
401 kdu_instance
=kdu_instance
,
403 complete_status
=True,
409 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
412 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
416 self
.update_db_2("nsrs", nsr_id
, db_dict
)
417 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
419 except Exception as e
:
420 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
423 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
426 undefined
=StrictUndefined
,
427 autoescape
=select_autoescape(default_for_string
=True, default
=True),
429 template
= env
.from_string(cloud_init_text
)
430 return template
.render(additional_params
or {})
431 except UndefinedError
as e
:
433 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
434 "file, must be provided in the instantiation parameters inside the "
435 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
437 except (TemplateError
, TemplateNotFound
) as e
:
439 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
444 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
445 cloud_init_content
= cloud_init_file
= None
447 if vdu
.get("cloud-init-file"):
448 base_folder
= vnfd
["_admin"]["storage"]
449 if base_folder
["pkg-dir"]:
450 cloud_init_file
= "{}/{}/cloud_init/{}".format(
451 base_folder
["folder"],
452 base_folder
["pkg-dir"],
453 vdu
["cloud-init-file"],
456 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
457 base_folder
["folder"],
458 vdu
["cloud-init-file"],
460 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
461 cloud_init_content
= ci_file
.read()
462 elif vdu
.get("cloud-init"):
463 cloud_init_content
= vdu
["cloud-init"]
465 return cloud_init_content
466 except FsException
as e
:
468 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
469 vnfd
["id"], vdu
["id"], cloud_init_file
, e
473 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
475 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
477 additional_params
= vdur
.get("additionalParams")
478 return parse_yaml_strings(additional_params
)
480 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
482 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
483 :param vnfd: input vnfd
484 :param new_id: overrides vnf id if provided
485 :param additionalParams: Instantiation params for VNFs provided
486 :param nsrId: Id of the NSR
487 :return: copy of vnfd
489 vnfd_RO
= deepcopy(vnfd
)
490 # remove unused by RO configuration, monitoring, scaling and internal keys
491 vnfd_RO
.pop("_id", None)
492 vnfd_RO
.pop("_admin", None)
493 vnfd_RO
.pop("monitoring-param", None)
494 vnfd_RO
.pop("scaling-group-descriptor", None)
495 vnfd_RO
.pop("kdu", None)
496 vnfd_RO
.pop("k8s-cluster", None)
498 vnfd_RO
["id"] = new_id
500 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
501 for vdu
in get_iterable(vnfd_RO
, "vdu"):
502 vdu
.pop("cloud-init-file", None)
503 vdu
.pop("cloud-init", None)
507 def ip_profile_2_RO(ip_profile
):
508 RO_ip_profile
= deepcopy(ip_profile
)
509 if "dns-server" in RO_ip_profile
:
510 if isinstance(RO_ip_profile
["dns-server"], list):
511 RO_ip_profile
["dns-address"] = []
512 for ds
in RO_ip_profile
.pop("dns-server"):
513 RO_ip_profile
["dns-address"].append(ds
["address"])
515 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
516 if RO_ip_profile
.get("ip-version") == "ipv4":
517 RO_ip_profile
["ip-version"] = "IPv4"
518 if RO_ip_profile
.get("ip-version") == "ipv6":
519 RO_ip_profile
["ip-version"] = "IPv6"
520 if "dhcp-params" in RO_ip_profile
:
521 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
524 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
525 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
526 if db_vim
["_admin"]["operationalState"] != "ENABLED":
528 "VIM={} is not available. operationalState={}".format(
529 vim_account
, db_vim
["_admin"]["operationalState"]
532 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
535 def get_ro_wim_id_for_wim_account(self
, wim_account
):
536 if isinstance(wim_account
, str):
537 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
538 if db_wim
["_admin"]["operationalState"] != "ENABLED":
540 "WIM={} is not available. operationalState={}".format(
541 wim_account
, db_wim
["_admin"]["operationalState"]
544 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
549 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
551 db_vdu_push_list
= []
553 db_update
= {"_admin.modified": time()}
555 for vdu_id
, vdu_count
in vdu_create
.items():
559 for vdur
in reversed(db_vnfr
["vdur"])
560 if vdur
["vdu-id-ref"] == vdu_id
565 # Read the template saved in the db:
567 "No vdur in the database. Using the vdur-template to scale"
569 vdur_template
= db_vnfr
.get("vdur-template")
570 if not vdur_template
:
572 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
576 vdur
= vdur_template
[0]
577 # Delete a template from the database after using it
580 {"_id": db_vnfr
["_id"]},
582 pull
={"vdur-template": {"_id": vdur
["_id"]}},
584 for count
in range(vdu_count
):
585 vdur_copy
= deepcopy(vdur
)
586 vdur_copy
["status"] = "BUILD"
587 vdur_copy
["status-detailed"] = None
588 vdur_copy
["ip-address"] = None
589 vdur_copy
["_id"] = str(uuid4())
590 vdur_copy
["count-index"] += count
+ 1
591 vdur_copy
["id"] = "{}-{}".format(
592 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
594 vdur_copy
.pop("vim_info", None)
595 for iface
in vdur_copy
["interfaces"]:
596 if iface
.get("fixed-ip"):
597 iface
["ip-address"] = self
.increment_ip_mac(
598 iface
["ip-address"], count
+ 1
601 iface
.pop("ip-address", None)
602 if iface
.get("fixed-mac"):
603 iface
["mac-address"] = self
.increment_ip_mac(
604 iface
["mac-address"], count
+ 1
607 iface
.pop("mac-address", None)
611 ) # only first vdu can be managment of vnf
612 db_vdu_push_list
.append(vdur_copy
)
613 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
615 if len(db_vnfr
["vdur"]) == 1:
616 # The scale will move to 0 instances
618 "Scaling to 0 !, creating the template with the last vdur"
620 template_vdur
= [db_vnfr
["vdur"][0]]
621 for vdu_id
, vdu_count
in vdu_delete
.items():
623 indexes_to_delete
= [
625 for iv
in enumerate(db_vnfr
["vdur"])
626 if iv
[1]["vdu-id-ref"] == vdu_id
630 "vdur.{}.status".format(i
): "DELETING"
631 for i
in indexes_to_delete
[-vdu_count
:]
635 # it must be deleted one by one because common.db does not allow otherwise
638 for v
in reversed(db_vnfr
["vdur"])
639 if v
["vdu-id-ref"] == vdu_id
641 for vdu
in vdus_to_delete
[:vdu_count
]:
644 {"_id": db_vnfr
["_id"]},
646 pull
={"vdur": {"_id": vdu
["_id"]}},
650 db_push
["vdur"] = db_vdu_push_list
652 db_push
["vdur-template"] = template_vdur
655 db_vnfr
["vdur-template"] = template_vdur
656 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
657 # modify passed dictionary db_vnfr
658 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
659 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
661 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
663 Updates database nsr with the RO info for the created vld
664 :param ns_update_nsr: dictionary to be filled with the updated info
665 :param db_nsr: content of db_nsr. This is also modified
666 :param nsr_desc_RO: nsr descriptor from RO
667 :return: Nothing, LcmException is raised on errors
670 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
671 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
672 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
674 vld
["vim-id"] = net_RO
.get("vim_net_id")
675 vld
["name"] = net_RO
.get("vim_name")
676 vld
["status"] = net_RO
.get("status")
677 vld
["status-detailed"] = net_RO
.get("error_msg")
678 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
682 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
685 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
687 for db_vnfr
in db_vnfrs
.values():
688 vnfr_update
= {"status": "ERROR"}
689 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
690 if "status" not in vdur
:
691 vdur
["status"] = "ERROR"
692 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
694 vdur
["status-detailed"] = str(error_text
)
696 "vdur.{}.status-detailed".format(vdu_index
)
698 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
699 except DbException
as e
:
700 self
.logger
.error("Cannot update vnf. {}".format(e
))
702 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
704 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
705 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
706 :param nsr_desc_RO: nsr descriptor from RO
707 :return: Nothing, LcmException is raised on errors
709 for vnf_index
, db_vnfr
in db_vnfrs
.items():
710 for vnf_RO
in nsr_desc_RO
["vnfs"]:
711 if vnf_RO
["member_vnf_index"] != vnf_index
:
714 if vnf_RO
.get("ip_address"):
715 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
718 elif not db_vnfr
.get("ip-address"):
719 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
720 raise LcmExceptionNoMgmtIP(
721 "ns member_vnf_index '{}' has no IP address".format(
726 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
727 vdur_RO_count_index
= 0
728 if vdur
.get("pdu-type"):
730 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
731 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
733 if vdur
["count-index"] != vdur_RO_count_index
:
734 vdur_RO_count_index
+= 1
736 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
737 if vdur_RO
.get("ip_address"):
738 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
740 vdur
["ip-address"] = None
741 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
742 vdur
["name"] = vdur_RO
.get("vim_name")
743 vdur
["status"] = vdur_RO
.get("status")
744 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
745 for ifacer
in get_iterable(vdur
, "interfaces"):
746 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
747 if ifacer
["name"] == interface_RO
.get("internal_name"):
748 ifacer
["ip-address"] = interface_RO
.get(
751 ifacer
["mac-address"] = interface_RO
.get(
757 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
758 "from VIM info".format(
759 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
762 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
766 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
768 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
772 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
773 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
774 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
776 vld
["vim-id"] = net_RO
.get("vim_net_id")
777 vld
["name"] = net_RO
.get("vim_name")
778 vld
["status"] = net_RO
.get("status")
779 vld
["status-detailed"] = net_RO
.get("error_msg")
780 vnfr_update
["vld.{}".format(vld_index
)] = vld
784 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
789 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
794 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
799 def _get_ns_config_info(self
, nsr_id
):
801 Generates a mapping between vnf,vdu elements and the N2VC id
802 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
803 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
804 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
805 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
807 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
808 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
810 ns_config_info
= {"osm-config-mapping": mapping
}
811 for vca
in vca_deployed_list
:
812 if not vca
["member-vnf-index"]:
814 if not vca
["vdu_id"]:
815 mapping
[vca
["member-vnf-index"]] = vca
["application"]
819 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
821 ] = vca
["application"]
822 return ns_config_info
824 async def _instantiate_ng_ro(
841 def get_vim_account(vim_account_id
):
843 if vim_account_id
in db_vims
:
844 return db_vims
[vim_account_id
]
845 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
846 db_vims
[vim_account_id
] = db_vim
849 # modify target_vld info with instantiation parameters
850 def parse_vld_instantiation_params(
851 target_vim
, target_vld
, vld_params
, target_sdn
853 if vld_params
.get("ip-profile"):
854 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
857 if vld_params
.get("provider-network"):
858 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
861 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
862 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
866 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
867 # if wim_account_id is specified in vld_params, validate if it is feasible.
868 wim_account_id
, db_wim
= select_feasible_wim_account(
869 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
873 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
874 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
875 # update vld_params with correct WIM account Id
876 vld_params
["wimAccountId"] = wim_account_id
878 target_wim
= "wim:{}".format(wim_account_id
)
879 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
880 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
881 if len(sdn_ports
) > 0:
882 target_vld
["vim_info"][target_wim
] = target_wim_attrs
883 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
886 "Target VLD with WIM data: {:s}".format(str(target_vld
))
889 for param
in ("vim-network-name", "vim-network-id"):
890 if vld_params
.get(param
):
891 if isinstance(vld_params
[param
], dict):
892 for vim
, vim_net
in vld_params
[param
].items():
893 other_target_vim
= "vim:" + vim
895 target_vld
["vim_info"],
896 (other_target_vim
, param
.replace("-", "_")),
899 else: # isinstance str
900 target_vld
["vim_info"][target_vim
][
901 param
.replace("-", "_")
902 ] = vld_params
[param
]
903 if vld_params
.get("common_id"):
904 target_vld
["common_id"] = vld_params
.get("common_id")
906 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
907 def update_ns_vld_target(target
, ns_params
):
908 for vnf_params
in ns_params
.get("vnf", ()):
909 if vnf_params
.get("vimAccountId"):
913 for vnfr
in db_vnfrs
.values()
914 if vnf_params
["member-vnf-index"]
915 == vnfr
["member-vnf-index-ref"]
919 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
922 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
923 target_vld
= find_in_list(
924 get_iterable(vdur
, "interfaces"),
925 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
928 vld_params
= find_in_list(
929 get_iterable(ns_params
, "vld"),
930 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
934 if vnf_params
.get("vimAccountId") not in a_vld
.get(
937 target_vim_network_list
= [
938 v
for _
, v
in a_vld
.get("vim_info").items()
940 target_vim_network_name
= next(
942 item
.get("vim_network_name", "")
943 for item
in target_vim_network_list
948 target
["ns"]["vld"][a_index
].get("vim_info").update(
950 "vim:{}".format(vnf_params
["vimAccountId"]): {
951 "vim_network_name": target_vim_network_name
,
957 for param
in ("vim-network-name", "vim-network-id"):
958 if vld_params
.get(param
) and isinstance(
959 vld_params
[param
], dict
961 for vim
, vim_net
in vld_params
[
964 other_target_vim
= "vim:" + vim
966 target
["ns"]["vld"][a_index
].get(
971 param
.replace("-", "_"),
976 nslcmop_id
= db_nslcmop
["_id"]
978 "name": db_nsr
["name"],
981 "image": deepcopy(db_nsr
["image"]),
982 "flavor": deepcopy(db_nsr
["flavor"]),
983 "action_id": nslcmop_id
,
984 "cloud_init_content": {},
986 for image
in target
["image"]:
987 image
["vim_info"] = {}
988 for flavor
in target
["flavor"]:
989 flavor
["vim_info"] = {}
990 if db_nsr
.get("affinity-or-anti-affinity-group"):
991 target
["affinity-or-anti-affinity-group"] = deepcopy(
992 db_nsr
["affinity-or-anti-affinity-group"]
994 for affinity_or_anti_affinity_group
in target
[
995 "affinity-or-anti-affinity-group"
997 affinity_or_anti_affinity_group
["vim_info"] = {}
999 if db_nslcmop
.get("lcmOperationType") != "instantiate":
1000 # get parameters of instantiation:
1001 db_nslcmop_instantiate
= self
.db
.get_list(
1004 "nsInstanceId": db_nslcmop
["nsInstanceId"],
1005 "lcmOperationType": "instantiate",
1008 ns_params
= db_nslcmop_instantiate
.get("operationParams")
1010 ns_params
= db_nslcmop
.get("operationParams")
1011 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
1012 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1015 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1016 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1019 "name": vld
["name"],
1020 "mgmt-network": vld
.get("mgmt-network", False),
1021 "type": vld
.get("type"),
1024 "vim_network_name": vld
.get("vim-network-name"),
1025 "vim_account_id": ns_params
["vimAccountId"],
1029 # check if this network needs SDN assist
1030 if vld
.get("pci-interfaces"):
1031 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1032 sdnc_id
= db_vim
["config"].get("sdn-controller")
1034 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1035 target_sdn
= "sdn:{}".format(sdnc_id
)
1036 target_vld
["vim_info"][target_sdn
] = {
1038 "target_vim": target_vim
,
1040 "type": vld
.get("type"),
1043 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1044 for nsd_vnf_profile
in nsd_vnf_profiles
:
1045 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1046 if cp
["virtual-link-profile-id"] == vld
["id"]:
1048 "member_vnf:{}.{}".format(
1049 cp
["constituent-cpd-id"][0][
1050 "constituent-base-element-id"
1052 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1054 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1056 # check at nsd descriptor, if there is an ip-profile
1058 nsd_vlp
= find_in_list(
1059 get_virtual_link_profiles(nsd
),
1060 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1065 and nsd_vlp
.get("virtual-link-protocol-data")
1066 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1068 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1071 ip_profile_dest_data
= {}
1072 if "ip-version" in ip_profile_source_data
:
1073 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1076 if "cidr" in ip_profile_source_data
:
1077 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1080 if "gateway-ip" in ip_profile_source_data
:
1081 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1084 if "dhcp-enabled" in ip_profile_source_data
:
1085 ip_profile_dest_data
["dhcp-params"] = {
1086 "enabled": ip_profile_source_data
["dhcp-enabled"]
1088 vld_params
["ip-profile"] = ip_profile_dest_data
1090 # update vld_params with instantiation params
1091 vld_instantiation_params
= find_in_list(
1092 get_iterable(ns_params
, "vld"),
1093 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1095 if vld_instantiation_params
:
1096 vld_params
.update(vld_instantiation_params
)
1097 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1098 target
["ns"]["vld"].append(target_vld
)
1099 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1100 update_ns_vld_target(target
, ns_params
)
1102 for vnfr
in db_vnfrs
.values():
1103 vnfd
= find_in_list(
1104 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1106 vnf_params
= find_in_list(
1107 get_iterable(ns_params
, "vnf"),
1108 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1110 target_vnf
= deepcopy(vnfr
)
1111 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1112 for vld
in target_vnf
.get("vld", ()):
1113 # check if connected to a ns.vld, to fill target'
1114 vnf_cp
= find_in_list(
1115 vnfd
.get("int-virtual-link-desc", ()),
1116 lambda cpd
: cpd
.get("id") == vld
["id"],
1119 ns_cp
= "member_vnf:{}.{}".format(
1120 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1122 if cp2target
.get(ns_cp
):
1123 vld
["target"] = cp2target
[ns_cp
]
1126 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1128 # check if this network needs SDN assist
1130 if vld
.get("pci-interfaces"):
1131 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1132 sdnc_id
= db_vim
["config"].get("sdn-controller")
1134 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1135 target_sdn
= "sdn:{}".format(sdnc_id
)
1136 vld
["vim_info"][target_sdn
] = {
1138 "target_vim": target_vim
,
1140 "type": vld
.get("type"),
1143 # check at vnfd descriptor, if there is an ip-profile
1145 vnfd_vlp
= find_in_list(
1146 get_virtual_link_profiles(vnfd
),
1147 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1151 and vnfd_vlp
.get("virtual-link-protocol-data")
1152 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1154 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1157 ip_profile_dest_data
= {}
1158 if "ip-version" in ip_profile_source_data
:
1159 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1162 if "cidr" in ip_profile_source_data
:
1163 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1166 if "gateway-ip" in ip_profile_source_data
:
1167 ip_profile_dest_data
[
1169 ] = ip_profile_source_data
["gateway-ip"]
1170 if "dhcp-enabled" in ip_profile_source_data
:
1171 ip_profile_dest_data
["dhcp-params"] = {
1172 "enabled": ip_profile_source_data
["dhcp-enabled"]
1175 vld_params
["ip-profile"] = ip_profile_dest_data
1176 # update vld_params with instantiation params
1178 vld_instantiation_params
= find_in_list(
1179 get_iterable(vnf_params
, "internal-vld"),
1180 lambda i_vld
: i_vld
["name"] == vld
["id"],
1182 if vld_instantiation_params
:
1183 vld_params
.update(vld_instantiation_params
)
1184 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1187 for vdur
in target_vnf
.get("vdur", ()):
1188 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1189 continue # This vdu must not be created
1190 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1192 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1195 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1196 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1199 and vdu_configuration
.get("config-access")
1200 and vdu_configuration
.get("config-access").get("ssh-access")
1202 vdur
["ssh-keys"] = ssh_keys_all
1203 vdur
["ssh-access-required"] = vdu_configuration
[
1205 ]["ssh-access"]["required"]
1208 and vnf_configuration
.get("config-access")
1209 and vnf_configuration
.get("config-access").get("ssh-access")
1210 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1212 vdur
["ssh-keys"] = ssh_keys_all
1213 vdur
["ssh-access-required"] = vnf_configuration
[
1215 ]["ssh-access"]["required"]
1216 elif ssh_keys_instantiation
and find_in_list(
1217 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1219 vdur
["ssh-keys"] = ssh_keys_instantiation
1221 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1223 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1225 if vdud
.get("cloud-init-file"):
1226 vdur
["cloud-init"] = "{}:file:{}".format(
1227 vnfd
["_id"], vdud
.get("cloud-init-file")
1229 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1230 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1231 base_folder
= vnfd
["_admin"]["storage"]
1232 if base_folder
["pkg-dir"]:
1233 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1234 base_folder
["folder"],
1235 base_folder
["pkg-dir"],
1236 vdud
.get("cloud-init-file"),
1239 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1240 base_folder
["folder"],
1241 vdud
.get("cloud-init-file"),
1243 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1244 target
["cloud_init_content"][
1247 elif vdud
.get("cloud-init"):
1248 vdur
["cloud-init"] = "{}:vdu:{}".format(
1249 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1251 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1252 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1255 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1256 deploy_params_vdu
= self
._format
_additional
_params
(
1257 vdur
.get("additionalParams") or {}
1259 deploy_params_vdu
["OSM"] = get_osm_params(
1260 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1262 vdur
["additionalParams"] = deploy_params_vdu
1265 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1266 if target_vim
not in ns_flavor
["vim_info"]:
1267 ns_flavor
["vim_info"][target_vim
] = {}
1270 # in case alternative images are provided we must check if they should be applied
1271 # for the vim_type, modify the vim_type taking into account
1272 ns_image_id
= int(vdur
["ns-image-id"])
1273 if vdur
.get("alt-image-ids"):
1274 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1275 vim_type
= db_vim
["vim_type"]
1276 for alt_image_id
in vdur
.get("alt-image-ids"):
1277 ns_alt_image
= target
["image"][int(alt_image_id
)]
1278 if vim_type
== ns_alt_image
.get("vim-type"):
1279 # must use alternative image
1281 "use alternative image id: {}".format(alt_image_id
)
1283 ns_image_id
= alt_image_id
1284 vdur
["ns-image-id"] = ns_image_id
1286 ns_image
= target
["image"][int(ns_image_id
)]
1287 if target_vim
not in ns_image
["vim_info"]:
1288 ns_image
["vim_info"][target_vim
] = {}
1291 if vdur
.get("affinity-or-anti-affinity-group-id"):
1292 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1293 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1294 if target_vim
not in ns_ags
["vim_info"]:
1295 ns_ags
["vim_info"][target_vim
] = {}
1297 vdur
["vim_info"] = {target_vim
: {}}
1298 # instantiation parameters
1300 vdu_instantiation_params
= find_in_list(
1301 get_iterable(vnf_params
, "vdu"),
1302 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1304 if vdu_instantiation_params
:
1305 # Parse the vdu_volumes from the instantiation params
1306 vdu_volumes
= get_volumes_from_instantiation_params(
1307 vdu_instantiation_params
, vdud
1309 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1310 vdur_list
.append(vdur
)
1311 target_vnf
["vdur"] = vdur_list
1312 target
["vnf"].append(target_vnf
)
1314 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1315 desc
= await self
.RO
.deploy(nsr_id
, target
)
1316 self
.logger
.debug("RO return > {}".format(desc
))
1317 action_id
= desc
["action_id"]
1318 await self
._wait
_ng
_ro
(
1325 operation
="instantiation",
1330 "_admin.deployed.RO.operational-status": "running",
1331 "detailed-status": " ".join(stage
),
1333 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1334 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1335 self
._write
_op
_status
(nslcmop_id
, stage
)
1337 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1341 async def _wait_ng_ro(
1351 detailed_status_old
= None
1353 start_time
= start_time
or time()
1354 while time() <= start_time
+ timeout
:
1355 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1356 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1357 if desc_status
["status"] == "FAILED":
1358 raise NgRoException(desc_status
["details"])
1359 elif desc_status
["status"] == "BUILD":
1361 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1362 elif desc_status
["status"] == "DONE":
1364 stage
[2] = "Deployed at VIM"
1367 assert False, "ROclient.check_ns_status returns unknown {}".format(
1368 desc_status
["status"]
1370 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1371 detailed_status_old
= stage
[2]
1372 db_nsr_update
["detailed-status"] = " ".join(stage
)
1373 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1374 self
._write
_op
_status
(nslcmop_id
, stage
)
1375 await asyncio
.sleep(15, loop
=self
.loop
)
1376 else: # timeout_ns_deploy
1377 raise NgRoException("Timeout waiting ns to deploy")
1379 async def _terminate_ng_ro(
1380 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1385 start_deploy
= time()
1392 "action_id": nslcmop_id
,
1394 desc
= await self
.RO
.deploy(nsr_id
, target
)
1395 action_id
= desc
["action_id"]
1396 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1397 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1400 + "ns terminate action at RO. action_id={}".format(action_id
)
1404 delete_timeout
= 20 * 60 # 20 minutes
1405 await self
._wait
_ng
_ro
(
1412 operation
="termination",
1415 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1416 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1418 await self
.RO
.delete(nsr_id
)
1419 except Exception as e
:
1420 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1421 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1422 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1423 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1425 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1427 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1428 failed_detail
.append("delete conflict: {}".format(e
))
1431 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1434 failed_detail
.append("delete error: {}".format(e
))
1437 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1441 stage
[2] = "Error deleting from VIM"
1443 stage
[2] = "Deleted from VIM"
1444 db_nsr_update
["detailed-status"] = " ".join(stage
)
1445 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1446 self
._write
_op
_status
(nslcmop_id
, stage
)
1449 raise LcmException("; ".join(failed_detail
))
1452 async def instantiate_RO(
1466 :param logging_text: preffix text to use at logging
1467 :param nsr_id: nsr identity
1468 :param nsd: database content of ns descriptor
1469 :param db_nsr: database content of ns record
1470 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1472 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1473 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1474 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1475 :return: None or exception
1478 start_deploy
= time()
1479 ns_params
= db_nslcmop
.get("operationParams")
1480 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1481 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1483 timeout_ns_deploy
= self
.timeout
.get(
1484 "ns_deploy", self
.timeout_ns_deploy
1487 # Check for and optionally request placement optimization. Database will be updated if placement activated
1488 stage
[2] = "Waiting for Placement."
1489 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1490 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1491 for vnfr
in db_vnfrs
.values():
1492 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1495 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1497 return await self
._instantiate
_ng
_ro
(
1510 except Exception as e
:
1511 stage
[2] = "ERROR deploying at VIM"
1512 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1514 "Error deploying at VIM {}".format(e
),
1515 exc_info
=not isinstance(
1518 ROclient
.ROClientException
,
1527 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1529 Wait for kdu to be up, get ip address
1530 :param logging_text: prefix use for logging
1534 :return: IP address, K8s services
1537 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1540 while nb_tries
< 360:
1541 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1545 for x
in get_iterable(db_vnfr
, "kdur")
1546 if x
.get("kdu-name") == kdu_name
1552 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1554 if kdur
.get("status"):
1555 if kdur
["status"] in ("READY", "ENABLED"):
1556 return kdur
.get("ip-address"), kdur
.get("services")
1559 "target KDU={} is in error state".format(kdu_name
)
1562 await asyncio
.sleep(10, loop
=self
.loop
)
1564 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1566 async def wait_vm_up_insert_key_ro(
1567 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1570 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1571 :param logging_text: prefix use for logging
1576 :param pub_key: public ssh key to inject, None to skip
1577 :param user: user to apply the public ssh key
1581 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1585 target_vdu_id
= None
1591 if ro_retries
>= 360: # 1 hour
1593 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1596 await asyncio
.sleep(10, loop
=self
.loop
)
1599 if not target_vdu_id
:
1600 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1602 if not vdu_id
: # for the VNF case
1603 if db_vnfr
.get("status") == "ERROR":
1605 "Cannot inject ssh-key because target VNF is in error state"
1607 ip_address
= db_vnfr
.get("ip-address")
1613 for x
in get_iterable(db_vnfr
, "vdur")
1614 if x
.get("ip-address") == ip_address
1622 for x
in get_iterable(db_vnfr
, "vdur")
1623 if x
.get("vdu-id-ref") == vdu_id
1624 and x
.get("count-index") == vdu_index
1630 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1631 ): # If only one, this should be the target vdu
1632 vdur
= db_vnfr
["vdur"][0]
1635 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1636 vnfr_id
, vdu_id
, vdu_index
1639 # New generation RO stores information at "vim_info"
1642 if vdur
.get("vim_info"):
1644 t
for t
in vdur
["vim_info"]
1645 ) # there should be only one key
1646 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1648 vdur
.get("pdu-type")
1649 or vdur
.get("status") == "ACTIVE"
1650 or ng_ro_status
== "ACTIVE"
1652 ip_address
= vdur
.get("ip-address")
1655 target_vdu_id
= vdur
["vdu-id-ref"]
1656 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1658 "Cannot inject ssh-key because target VM is in error state"
1661 if not target_vdu_id
:
1664 # inject public key into machine
1665 if pub_key
and user
:
1666 self
.logger
.debug(logging_text
+ "Inserting RO key")
1667 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1668 if vdur
.get("pdu-type"):
1669 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1672 ro_vm_id
= "{}-{}".format(
1673 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1674 ) # TODO add vdu_index
1678 "action": "inject_ssh_key",
1682 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1684 desc
= await self
.RO
.deploy(nsr_id
, target
)
1685 action_id
= desc
["action_id"]
1686 await self
._wait
_ng
_ro
(
1687 nsr_id
, action_id
, timeout
=600, operation
="instantiation"
1691 # wait until NS is deployed at RO
1693 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1694 ro_nsr_id
= deep_get(
1695 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1699 result_dict
= await self
.RO
.create_action(
1701 item_id_name
=ro_nsr_id
,
1703 "add_public_key": pub_key
,
1708 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1709 if not result_dict
or not isinstance(result_dict
, dict):
1711 "Unknown response from RO when injecting key"
1713 for result
in result_dict
.values():
1714 if result
.get("vim_result") == 200:
1717 raise ROclient
.ROClientException(
1718 "error injecting key: {}".format(
1719 result
.get("description")
1723 except NgRoException
as e
:
1725 "Reaching max tries injecting key. Error: {}".format(e
)
1727 except ROclient
.ROClientException
as e
:
1731 + "error injecting key: {}. Retrying until {} seconds".format(
1738 "Reaching max tries injecting key. Error: {}".format(e
)
1745 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1747 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1749 my_vca
= vca_deployed_list
[vca_index
]
1750 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1751 # vdu or kdu: no dependencies
1755 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1756 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1757 configuration_status_list
= db_nsr
["configurationStatus"]
1758 for index
, vca_deployed
in enumerate(configuration_status_list
):
1759 if index
== vca_index
:
1762 if not my_vca
.get("member-vnf-index") or (
1763 vca_deployed
.get("member-vnf-index")
1764 == my_vca
.get("member-vnf-index")
1766 internal_status
= configuration_status_list
[index
].get("status")
1767 if internal_status
== "READY":
1769 elif internal_status
== "BROKEN":
1771 "Configuration aborted because dependent charm/s has failed"
1776 # no dependencies, return
1778 await asyncio
.sleep(10)
1781 raise LcmException("Configuration aborted because dependent charm/s timeout")
1783 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1786 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1788 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1789 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1792 async def instantiate_N2VC(
1809 ee_config_descriptor
,
1811 nsr_id
= db_nsr
["_id"]
1812 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1813 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1814 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1815 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1817 "collection": "nsrs",
1818 "filter": {"_id": nsr_id
},
1819 "path": db_update_entry
,
1825 element_under_configuration
= nsr_id
1829 vnfr_id
= db_vnfr
["_id"]
1830 osm_config
["osm"]["vnf_id"] = vnfr_id
1832 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1834 if vca_type
== "native_charm":
1837 index_number
= vdu_index
or 0
1840 element_type
= "VNF"
1841 element_under_configuration
= vnfr_id
1842 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1844 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1845 element_type
= "VDU"
1846 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1847 osm_config
["osm"]["vdu_id"] = vdu_id
1849 namespace
+= ".{}".format(kdu_name
)
1850 element_type
= "KDU"
1851 element_under_configuration
= kdu_name
1852 osm_config
["osm"]["kdu_name"] = kdu_name
1855 if base_folder
["pkg-dir"]:
1856 artifact_path
= "{}/{}/{}/{}".format(
1857 base_folder
["folder"],
1858 base_folder
["pkg-dir"],
1861 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1866 artifact_path
= "{}/Scripts/{}/{}/".format(
1867 base_folder
["folder"],
1870 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1875 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1877 # get initial_config_primitive_list that applies to this element
1878 initial_config_primitive_list
= config_descriptor
.get(
1879 "initial-config-primitive"
1883 "Initial config primitive list > {}".format(
1884 initial_config_primitive_list
1888 # add config if not present for NS charm
1889 ee_descriptor_id
= ee_config_descriptor
.get("id")
1890 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1891 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1892 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1896 "Initial config primitive list #2 > {}".format(
1897 initial_config_primitive_list
1900 # n2vc_redesign STEP 3.1
1901 # find old ee_id if exists
1902 ee_id
= vca_deployed
.get("ee_id")
1904 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1905 # create or register execution environment in VCA
1906 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1908 self
._write
_configuration
_status
(
1910 vca_index
=vca_index
,
1912 element_under_configuration
=element_under_configuration
,
1913 element_type
=element_type
,
1916 step
= "create execution environment"
1917 self
.logger
.debug(logging_text
+ step
)
1921 if vca_type
== "k8s_proxy_charm":
1922 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1923 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1924 namespace
=namespace
,
1925 artifact_path
=artifact_path
,
1929 elif vca_type
== "helm" or vca_type
== "helm-v3":
1930 ee_id
, credentials
= await self
.vca_map
[
1932 ].create_execution_environment(
1933 namespace
=namespace
,
1937 artifact_path
=artifact_path
,
1938 chart_model
=vca_name
,
1942 ee_id
, credentials
= await self
.vca_map
[
1944 ].create_execution_environment(
1945 namespace
=namespace
,
1951 elif vca_type
== "native_charm":
1952 step
= "Waiting to VM being up and getting IP address"
1953 self
.logger
.debug(logging_text
+ step
)
1954 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1963 credentials
= {"hostname": rw_mgmt_ip
}
1965 username
= deep_get(
1966 config_descriptor
, ("config-access", "ssh-access", "default-user")
1968 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1969 # merged. Meanwhile let's get username from initial-config-primitive
1970 if not username
and initial_config_primitive_list
:
1971 for config_primitive
in initial_config_primitive_list
:
1972 for param
in config_primitive
.get("parameter", ()):
1973 if param
["name"] == "ssh-username":
1974 username
= param
["value"]
1978 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1979 "'config-access.ssh-access.default-user'"
1981 credentials
["username"] = username
1982 # n2vc_redesign STEP 3.2
1984 self
._write
_configuration
_status
(
1986 vca_index
=vca_index
,
1987 status
="REGISTERING",
1988 element_under_configuration
=element_under_configuration
,
1989 element_type
=element_type
,
1992 step
= "register execution environment {}".format(credentials
)
1993 self
.logger
.debug(logging_text
+ step
)
1994 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1995 credentials
=credentials
,
1996 namespace
=namespace
,
2001 # for compatibility with MON/POL modules, the need model and application name at database
2002 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
2003 ee_id_parts
= ee_id
.split(".")
2004 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
2005 if len(ee_id_parts
) >= 2:
2006 model_name
= ee_id_parts
[0]
2007 application_name
= ee_id_parts
[1]
2008 db_nsr_update
[db_update_entry
+ "model"] = model_name
2009 db_nsr_update
[db_update_entry
+ "application"] = application_name
2011 # n2vc_redesign STEP 3.3
2012 step
= "Install configuration Software"
2014 self
._write
_configuration
_status
(
2016 vca_index
=vca_index
,
2017 status
="INSTALLING SW",
2018 element_under_configuration
=element_under_configuration
,
2019 element_type
=element_type
,
2020 other_update
=db_nsr_update
,
2023 # TODO check if already done
2024 self
.logger
.debug(logging_text
+ step
)
2026 if vca_type
== "native_charm":
2027 config_primitive
= next(
2028 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2031 if config_primitive
:
2032 config
= self
._map
_primitive
_params
(
2033 config_primitive
, {}, deploy_params
2036 if vca_type
== "lxc_proxy_charm":
2037 if element_type
== "NS":
2038 num_units
= db_nsr
.get("config-units") or 1
2039 elif element_type
== "VNF":
2040 num_units
= db_vnfr
.get("config-units") or 1
2041 elif element_type
== "VDU":
2042 for v
in db_vnfr
["vdur"]:
2043 if vdu_id
== v
["vdu-id-ref"]:
2044 num_units
= v
.get("config-units") or 1
2046 if vca_type
!= "k8s_proxy_charm":
2047 await self
.vca_map
[vca_type
].install_configuration_sw(
2049 artifact_path
=artifact_path
,
2052 num_units
=num_units
,
2057 # write in db flag of configuration_sw already installed
2059 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2062 # add relations for this VCA (wait for other peers related with this VCA)
2063 await self
._add
_vca
_relations
(
2064 logging_text
=logging_text
,
2067 vca_index
=vca_index
,
2070 # if SSH access is required, then get execution environment SSH public
2071 # if native charm we have waited already to VM be UP
2072 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2075 # self.logger.debug("get ssh key block")
2077 config_descriptor
, ("config-access", "ssh-access", "required")
2079 # self.logger.debug("ssh key needed")
2080 # Needed to inject a ssh key
2083 ("config-access", "ssh-access", "default-user"),
2085 step
= "Install configuration Software, getting public ssh key"
2086 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2087 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2090 step
= "Insert public key into VM user={} ssh_key={}".format(
2094 # self.logger.debug("no need to get ssh key")
2095 step
= "Waiting to VM being up and getting IP address"
2096 self
.logger
.debug(logging_text
+ step
)
2098 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2101 # n2vc_redesign STEP 5.1
2102 # wait for RO (ip-address) Insert pub_key into VM
2105 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2106 logging_text
, nsr_id
, vnfr_id
, kdu_name
2108 vnfd
= self
.db
.get_one(
2110 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2112 kdu
= get_kdu(vnfd
, kdu_name
)
2114 service
["name"] for service
in get_kdu_services(kdu
)
2116 exposed_services
= []
2117 for service
in services
:
2118 if any(s
in service
["name"] for s
in kdu_services
):
2119 exposed_services
.append(service
)
2120 await self
.vca_map
[vca_type
].exec_primitive(
2122 primitive_name
="config",
2124 "osm-config": json
.dumps(
2126 k8s
={"services": exposed_services
}
2133 # This verification is needed in order to avoid trying to add a public key
2134 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2135 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2136 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2138 elif db_vnfr
.get("vdur"):
2139 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2149 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2151 # store rw_mgmt_ip in deploy params for later replacement
2152 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2154 # n2vc_redesign STEP 6 Execute initial config primitive
2155 step
= "execute initial config primitive"
2157 # wait for dependent primitives execution (NS -> VNF -> VDU)
2158 if initial_config_primitive_list
:
2159 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2161 # stage, in function of element type: vdu, kdu, vnf or ns
2162 my_vca
= vca_deployed_list
[vca_index
]
2163 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2165 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2166 elif my_vca
.get("member-vnf-index"):
2168 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2171 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2173 self
._write
_configuration
_status
(
2174 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2177 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2179 check_if_terminated_needed
= True
2180 for initial_config_primitive
in initial_config_primitive_list
:
2181 # adding information on the vca_deployed if it is a NS execution environment
2182 if not vca_deployed
["member-vnf-index"]:
2183 deploy_params
["ns_config_info"] = json
.dumps(
2184 self
._get
_ns
_config
_info
(nsr_id
)
2186 # TODO check if already done
2187 primitive_params_
= self
._map
_primitive
_params
(
2188 initial_config_primitive
, {}, deploy_params
2191 step
= "execute primitive '{}' params '{}'".format(
2192 initial_config_primitive
["name"], primitive_params_
2194 self
.logger
.debug(logging_text
+ step
)
2195 await self
.vca_map
[vca_type
].exec_primitive(
2197 primitive_name
=initial_config_primitive
["name"],
2198 params_dict
=primitive_params_
,
2203 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2204 if check_if_terminated_needed
:
2205 if config_descriptor
.get("terminate-config-primitive"):
2207 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2209 check_if_terminated_needed
= False
2211 # TODO register in database that primitive is done
2213 # STEP 7 Configure metrics
2214 if vca_type
== "helm" or vca_type
== "helm-v3":
2215 # TODO: review for those cases where the helm chart is a reference and
2216 # is not part of the NF package
2217 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2219 artifact_path
=artifact_path
,
2220 ee_config_descriptor
=ee_config_descriptor
,
2223 target_ip
=rw_mgmt_ip
,
2229 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2232 for job
in prometheus_jobs
:
2235 {"job_name": job
["job_name"]},
2238 fail_on_empty
=False,
2241 step
= "instantiated at VCA"
2242 self
.logger
.debug(logging_text
+ step
)
2244 self
._write
_configuration
_status
(
2245 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2248 except Exception as e
: # TODO not use Exception but N2VC exception
2249 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2251 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2254 "Exception while {} : {}".format(step
, e
), exc_info
=True
2256 self
._write
_configuration
_status
(
2257 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2259 raise LcmException("{} {}".format(step
, e
)) from e
2261 def _write_ns_status(
2265 current_operation
: str,
2266 current_operation_id
: str,
2267 error_description
: str = None,
2268 error_detail
: str = None,
2269 other_update
: dict = None,
2272 Update db_nsr fields.
2275 :param current_operation:
2276 :param current_operation_id:
2277 :param error_description:
2278 :param error_detail:
2279 :param other_update: Other required changes at database if provided, will be cleared
2283 db_dict
= other_update
or {}
2286 ] = current_operation_id
# for backward compatibility
2287 db_dict
["_admin.current-operation"] = current_operation_id
2288 db_dict
["_admin.operation-type"] = (
2289 current_operation
if current_operation
!= "IDLE" else None
2291 db_dict
["currentOperation"] = current_operation
2292 db_dict
["currentOperationID"] = current_operation_id
2293 db_dict
["errorDescription"] = error_description
2294 db_dict
["errorDetail"] = error_detail
2297 db_dict
["nsState"] = ns_state
2298 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2299 except DbException
as e
:
2300 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2302 def _write_op_status(
2306 error_message
: str = None,
2307 queuePosition
: int = 0,
2308 operation_state
: str = None,
2309 other_update
: dict = None,
2312 db_dict
= other_update
or {}
2313 db_dict
["queuePosition"] = queuePosition
2314 if isinstance(stage
, list):
2315 db_dict
["stage"] = stage
[0]
2316 db_dict
["detailed-status"] = " ".join(stage
)
2317 elif stage
is not None:
2318 db_dict
["stage"] = str(stage
)
2320 if error_message
is not None:
2321 db_dict
["errorMessage"] = error_message
2322 if operation_state
is not None:
2323 db_dict
["operationState"] = operation_state
2324 db_dict
["statusEnteredTime"] = time()
2325 self
.update_db_2("nslcmops", op_id
, db_dict
)
2326 except DbException
as e
:
2328 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2331 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2333 nsr_id
= db_nsr
["_id"]
2334 # configurationStatus
2335 config_status
= db_nsr
.get("configurationStatus")
2338 "configurationStatus.{}.status".format(index
): status
2339 for index
, v
in enumerate(config_status
)
2343 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2345 except DbException
as e
:
2347 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2350 def _write_configuration_status(
2355 element_under_configuration
: str = None,
2356 element_type
: str = None,
2357 other_update
: dict = None,
2360 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2361 # .format(vca_index, status))
2364 db_path
= "configurationStatus.{}.".format(vca_index
)
2365 db_dict
= other_update
or {}
2367 db_dict
[db_path
+ "status"] = status
2368 if element_under_configuration
:
2370 db_path
+ "elementUnderConfiguration"
2371 ] = element_under_configuration
2373 db_dict
[db_path
+ "elementType"] = element_type
2374 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2375 except DbException
as e
:
2377 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2378 status
, nsr_id
, vca_index
, e
2382 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2384 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2385 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2386 Database is used because the result can be obtained from a different LCM worker in case of HA.
2387 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2388 :param db_nslcmop: database content of nslcmop
2389 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2390 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2391 computed 'vim-account-id'
2394 nslcmop_id
= db_nslcmop
["_id"]
2395 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2396 if placement_engine
== "PLA":
2398 logging_text
+ "Invoke and wait for placement optimization"
2400 await self
.msg
.aiowrite(
2401 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2403 db_poll_interval
= 5
2404 wait
= db_poll_interval
* 10
2406 while not pla_result
and wait
>= 0:
2407 await asyncio
.sleep(db_poll_interval
)
2408 wait
-= db_poll_interval
2409 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2410 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2414 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2417 for pla_vnf
in pla_result
["vnf"]:
2418 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2419 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2424 {"_id": vnfr
["_id"]},
2425 {"vim-account-id": pla_vnf
["vimAccountId"]},
2428 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2431 def update_nsrs_with_pla_result(self
, params
):
2433 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2435 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2437 except Exception as e
:
2438 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2440 async def instantiate(self
, nsr_id
, nslcmop_id
):
2443 :param nsr_id: ns instance to deploy
2444 :param nslcmop_id: operation to run
2448 # Try to lock HA task here
2449 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2450 if not task_is_locked_by_me
:
2452 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2456 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2457 self
.logger
.debug(logging_text
+ "Enter")
2459 # get all needed from database
2461 # database nsrs record
2464 # database nslcmops record
2467 # update operation on nsrs
2469 # update operation on nslcmops
2470 db_nslcmop_update
= {}
2472 nslcmop_operation_state
= None
2473 db_vnfrs
= {} # vnf's info indexed by member-index
2475 tasks_dict_info
= {} # from task to info text
2479 "Stage 1/5: preparation of the environment.",
2480 "Waiting for previous operations to terminate.",
2483 # ^ stage, step, VIM progress
2485 # wait for any previous tasks in process
2486 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2488 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2489 stage
[1] = "Reading from database."
2490 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2491 db_nsr_update
["detailed-status"] = "creating"
2492 db_nsr_update
["operational-status"] = "init"
2493 self
._write
_ns
_status
(
2495 ns_state
="BUILDING",
2496 current_operation
="INSTANTIATING",
2497 current_operation_id
=nslcmop_id
,
2498 other_update
=db_nsr_update
,
2500 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2502 # read from db: operation
2503 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2504 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2505 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2506 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2507 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2509 ns_params
= db_nslcmop
.get("operationParams")
2510 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2511 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2513 timeout_ns_deploy
= self
.timeout
.get(
2514 "ns_deploy", self
.timeout_ns_deploy
2518 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2519 self
.logger
.debug(logging_text
+ stage
[1])
2520 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2521 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2522 self
.logger
.debug(logging_text
+ stage
[1])
2523 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2524 self
.fs
.sync(db_nsr
["nsd-id"])
2526 # nsr_name = db_nsr["name"] # TODO short-name??
2528 # read from db: vnf's of this ns
2529 stage
[1] = "Getting vnfrs from db."
2530 self
.logger
.debug(logging_text
+ stage
[1])
2531 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2533 # read from db: vnfd's for every vnf
2534 db_vnfds
= [] # every vnfd data
2536 # for each vnf in ns, read vnfd
2537 for vnfr
in db_vnfrs_list
:
2538 if vnfr
.get("kdur"):
2540 for kdur
in vnfr
["kdur"]:
2541 if kdur
.get("additionalParams"):
2542 kdur
["additionalParams"] = json
.loads(
2543 kdur
["additionalParams"]
2545 kdur_list
.append(kdur
)
2546 vnfr
["kdur"] = kdur_list
2548 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2549 vnfd_id
= vnfr
["vnfd-id"]
2550 vnfd_ref
= vnfr
["vnfd-ref"]
2551 self
.fs
.sync(vnfd_id
)
2553 # if we haven't this vnfd, read it from db
2554 if vnfd_id
not in db_vnfds
:
2556 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2559 self
.logger
.debug(logging_text
+ stage
[1])
2560 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2563 db_vnfds
.append(vnfd
)
2565 # Get or generates the _admin.deployed.VCA list
2566 vca_deployed_list
= None
2567 if db_nsr
["_admin"].get("deployed"):
2568 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2569 if vca_deployed_list
is None:
2570 vca_deployed_list
= []
2571 configuration_status_list
= []
2572 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2573 db_nsr_update
["configurationStatus"] = configuration_status_list
2574 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2575 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2576 elif isinstance(vca_deployed_list
, dict):
2577 # maintain backward compatibility. Change a dict to list at database
2578 vca_deployed_list
= list(vca_deployed_list
.values())
2579 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2580 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2583 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2585 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2586 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2588 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2589 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2590 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2592 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2595 # n2vc_redesign STEP 2 Deploy Network Scenario
2596 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2597 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2599 stage
[1] = "Deploying KDUs."
2600 # self.logger.debug(logging_text + "Before deploy_kdus")
2601 # Call to deploy_kdus in case exists the "vdu:kdu" param
2602 await self
.deploy_kdus(
2603 logging_text
=logging_text
,
2605 nslcmop_id
=nslcmop_id
,
2608 task_instantiation_info
=tasks_dict_info
,
2611 stage
[1] = "Getting VCA public key."
2612 # n2vc_redesign STEP 1 Get VCA public ssh-key
2613 # feature 1429. Add n2vc public key to needed VMs
2614 n2vc_key
= self
.n2vc
.get_public_key()
2615 n2vc_key_list
= [n2vc_key
]
2616 if self
.vca_config
.get("public_key"):
2617 n2vc_key_list
.append(self
.vca_config
["public_key"])
2619 stage
[1] = "Deploying NS at VIM."
2620 task_ro
= asyncio
.ensure_future(
2621 self
.instantiate_RO(
2622 logging_text
=logging_text
,
2626 db_nslcmop
=db_nslcmop
,
2629 n2vc_key_list
=n2vc_key_list
,
2633 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2634 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2636 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2637 stage
[1] = "Deploying Execution Environments."
2638 self
.logger
.debug(logging_text
+ stage
[1])
2640 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2641 for vnf_profile
in get_vnf_profiles(nsd
):
2642 vnfd_id
= vnf_profile
["vnfd-id"]
2643 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2644 member_vnf_index
= str(vnf_profile
["id"])
2645 db_vnfr
= db_vnfrs
[member_vnf_index
]
2646 base_folder
= vnfd
["_admin"]["storage"]
2652 # Get additional parameters
2653 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2654 if db_vnfr
.get("additionalParamsForVnf"):
2655 deploy_params
.update(
2656 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2659 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2660 if descriptor_config
:
2662 logging_text
=logging_text
2663 + "member_vnf_index={} ".format(member_vnf_index
),
2666 nslcmop_id
=nslcmop_id
,
2672 member_vnf_index
=member_vnf_index
,
2673 vdu_index
=vdu_index
,
2675 deploy_params
=deploy_params
,
2676 descriptor_config
=descriptor_config
,
2677 base_folder
=base_folder
,
2678 task_instantiation_info
=tasks_dict_info
,
2682 # Deploy charms for each VDU that supports one.
2683 for vdud
in get_vdu_list(vnfd
):
2685 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2686 vdur
= find_in_list(
2687 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2690 if vdur
.get("additionalParams"):
2691 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2693 deploy_params_vdu
= deploy_params
2694 deploy_params_vdu
["OSM"] = get_osm_params(
2695 db_vnfr
, vdu_id
, vdu_count_index
=0
2697 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2699 self
.logger
.debug("VDUD > {}".format(vdud
))
2701 "Descriptor config > {}".format(descriptor_config
)
2703 if descriptor_config
:
2706 for vdu_index
in range(vdud_count
):
2707 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2709 logging_text
=logging_text
2710 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2711 member_vnf_index
, vdu_id
, vdu_index
2715 nslcmop_id
=nslcmop_id
,
2721 member_vnf_index
=member_vnf_index
,
2722 vdu_index
=vdu_index
,
2724 deploy_params
=deploy_params_vdu
,
2725 descriptor_config
=descriptor_config
,
2726 base_folder
=base_folder
,
2727 task_instantiation_info
=tasks_dict_info
,
2730 for kdud
in get_kdu_list(vnfd
):
2731 kdu_name
= kdud
["name"]
2732 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2733 if descriptor_config
:
2738 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2740 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2741 if kdur
.get("additionalParams"):
2742 deploy_params_kdu
.update(
2743 parse_yaml_strings(kdur
["additionalParams"].copy())
2747 logging_text
=logging_text
,
2750 nslcmop_id
=nslcmop_id
,
2756 member_vnf_index
=member_vnf_index
,
2757 vdu_index
=vdu_index
,
2759 deploy_params
=deploy_params_kdu
,
2760 descriptor_config
=descriptor_config
,
2761 base_folder
=base_folder
,
2762 task_instantiation_info
=tasks_dict_info
,
2766 # Check if this NS has a charm configuration
2767 descriptor_config
= nsd
.get("ns-configuration")
2768 if descriptor_config
and descriptor_config
.get("juju"):
2771 member_vnf_index
= None
2777 # Get additional parameters
2778 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2779 if db_nsr
.get("additionalParamsForNs"):
2780 deploy_params
.update(
2781 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2783 base_folder
= nsd
["_admin"]["storage"]
2785 logging_text
=logging_text
,
2788 nslcmop_id
=nslcmop_id
,
2794 member_vnf_index
=member_vnf_index
,
2795 vdu_index
=vdu_index
,
2797 deploy_params
=deploy_params
,
2798 descriptor_config
=descriptor_config
,
2799 base_folder
=base_folder
,
2800 task_instantiation_info
=tasks_dict_info
,
2804 # rest of staff will be done at finally
2807 ROclient
.ROClientException
,
2813 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2816 except asyncio
.CancelledError
:
2818 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2820 exc
= "Operation was cancelled"
2821 except Exception as e
:
2822 exc
= traceback
.format_exc()
2823 self
.logger
.critical(
2824 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2829 error_list
.append(str(exc
))
2831 # wait for pending tasks
2833 stage
[1] = "Waiting for instantiate pending tasks."
2834 self
.logger
.debug(logging_text
+ stage
[1])
2835 error_list
+= await self
._wait
_for
_tasks
(
2843 stage
[1] = stage
[2] = ""
2844 except asyncio
.CancelledError
:
2845 error_list
.append("Cancelled")
2846 # TODO cancel all tasks
2847 except Exception as exc
:
2848 error_list
.append(str(exc
))
2850 # update operation-status
2851 db_nsr_update
["operational-status"] = "running"
2852 # let's begin with VCA 'configured' status (later we can change it)
2853 db_nsr_update
["config-status"] = "configured"
2854 for task
, task_name
in tasks_dict_info
.items():
2855 if not task
.done() or task
.cancelled() or task
.exception():
2856 if task_name
.startswith(self
.task_name_deploy_vca
):
2857 # A N2VC task is pending
2858 db_nsr_update
["config-status"] = "failed"
2860 # RO or KDU task is pending
2861 db_nsr_update
["operational-status"] = "failed"
2863 # update status at database
2865 error_detail
= ". ".join(error_list
)
2866 self
.logger
.error(logging_text
+ error_detail
)
2867 error_description_nslcmop
= "{} Detail: {}".format(
2868 stage
[0], error_detail
2870 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2871 nslcmop_id
, stage
[0]
2874 db_nsr_update
["detailed-status"] = (
2875 error_description_nsr
+ " Detail: " + error_detail
2877 db_nslcmop_update
["detailed-status"] = error_detail
2878 nslcmop_operation_state
= "FAILED"
2882 error_description_nsr
= error_description_nslcmop
= None
2884 db_nsr_update
["detailed-status"] = "Done"
2885 db_nslcmop_update
["detailed-status"] = "Done"
2886 nslcmop_operation_state
= "COMPLETED"
2889 self
._write
_ns
_status
(
2892 current_operation
="IDLE",
2893 current_operation_id
=None,
2894 error_description
=error_description_nsr
,
2895 error_detail
=error_detail
,
2896 other_update
=db_nsr_update
,
2898 self
._write
_op
_status
(
2901 error_message
=error_description_nslcmop
,
2902 operation_state
=nslcmop_operation_state
,
2903 other_update
=db_nslcmop_update
,
2906 if nslcmop_operation_state
:
2908 await self
.msg
.aiowrite(
2913 "nslcmop_id": nslcmop_id
,
2914 "operationState": nslcmop_operation_state
,
2918 except Exception as e
:
2920 logging_text
+ "kafka_write notification Exception {}".format(e
)
2923 self
.logger
.debug(logging_text
+ "Exit")
2924 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2926 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2927 if vnfd_id
not in cached_vnfds
:
2928 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2929 return cached_vnfds
[vnfd_id
]
2931 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2932 if vnf_profile_id
not in cached_vnfrs
:
2933 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2936 "member-vnf-index-ref": vnf_profile_id
,
2937 "nsr-id-ref": nsr_id
,
2940 return cached_vnfrs
[vnf_profile_id
]
2942 def _is_deployed_vca_in_relation(
2943 self
, vca
: DeployedVCA
, relation
: Relation
2946 for endpoint
in (relation
.provider
, relation
.requirer
):
2947 if endpoint
["kdu-resource-profile-id"]:
2950 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2951 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2952 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2958 def _update_ee_relation_data_with_implicit_data(
2959 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2961 ee_relation_data
= safe_get_ee_relation(
2962 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2964 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2965 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2966 "execution-environment-ref"
2968 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2969 vnfd_id
= vnf_profile
["vnfd-id"]
2970 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2973 if ee_relation_level
== EELevel
.VNF
2974 else ee_relation_data
["vdu-profile-id"]
2976 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2979 f
"not execution environments found for ee_relation {ee_relation_data}"
2981 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2982 return ee_relation_data
2984 def _get_ns_relations(
2987 nsd
: Dict
[str, Any
],
2989 cached_vnfds
: Dict
[str, Any
],
2990 ) -> List
[Relation
]:
2992 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2993 for r
in db_ns_relations
:
2994 provider_dict
= None
2995 requirer_dict
= None
2996 if all(key
in r
for key
in ("provider", "requirer")):
2997 provider_dict
= r
["provider"]
2998 requirer_dict
= r
["requirer"]
2999 elif "entities" in r
:
3000 provider_id
= r
["entities"][0]["id"]
3003 "endpoint": r
["entities"][0]["endpoint"],
3005 if provider_id
!= nsd
["id"]:
3006 provider_dict
["vnf-profile-id"] = provider_id
3007 requirer_id
= r
["entities"][1]["id"]
3010 "endpoint": r
["entities"][1]["endpoint"],
3012 if requirer_id
!= nsd
["id"]:
3013 requirer_dict
["vnf-profile-id"] = requirer_id
3016 "provider/requirer or entities must be included in the relation."
3018 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3019 nsr_id
, nsd
, provider_dict
, cached_vnfds
3021 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3022 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3024 provider
= EERelation(relation_provider
)
3025 requirer
= EERelation(relation_requirer
)
3026 relation
= Relation(r
["name"], provider
, requirer
)
3027 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3029 relations
.append(relation
)
3032 def _get_vnf_relations(
3035 nsd
: Dict
[str, Any
],
3037 cached_vnfds
: Dict
[str, Any
],
3038 ) -> List
[Relation
]:
3040 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3041 vnf_profile_id
= vnf_profile
["id"]
3042 vnfd_id
= vnf_profile
["vnfd-id"]
3043 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3044 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3045 for r
in db_vnf_relations
:
3046 provider_dict
= None
3047 requirer_dict
= None
3048 if all(key
in r
for key
in ("provider", "requirer")):
3049 provider_dict
= r
["provider"]
3050 requirer_dict
= r
["requirer"]
3051 elif "entities" in r
:
3052 provider_id
= r
["entities"][0]["id"]
3055 "vnf-profile-id": vnf_profile_id
,
3056 "endpoint": r
["entities"][0]["endpoint"],
3058 if provider_id
!= vnfd_id
:
3059 provider_dict
["vdu-profile-id"] = provider_id
3060 requirer_id
= r
["entities"][1]["id"]
3063 "vnf-profile-id": vnf_profile_id
,
3064 "endpoint": r
["entities"][1]["endpoint"],
3066 if requirer_id
!= vnfd_id
:
3067 requirer_dict
["vdu-profile-id"] = requirer_id
3070 "provider/requirer or entities must be included in the relation."
3072 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3073 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3075 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3076 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3078 provider
= EERelation(relation_provider
)
3079 requirer
= EERelation(relation_requirer
)
3080 relation
= Relation(r
["name"], provider
, requirer
)
3081 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3083 relations
.append(relation
)
3086 def _get_kdu_resource_data(
3088 ee_relation
: EERelation
,
3089 db_nsr
: Dict
[str, Any
],
3090 cached_vnfds
: Dict
[str, Any
],
3091 ) -> DeployedK8sResource
:
3092 nsd
= get_nsd(db_nsr
)
3093 vnf_profiles
= get_vnf_profiles(nsd
)
3094 vnfd_id
= find_in_list(
3096 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3098 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3099 kdu_resource_profile
= get_kdu_resource_profile(
3100 db_vnfd
, ee_relation
.kdu_resource_profile_id
3102 kdu_name
= kdu_resource_profile
["kdu-name"]
3103 deployed_kdu
, _
= get_deployed_kdu(
3104 db_nsr
.get("_admin", ()).get("deployed", ()),
3106 ee_relation
.vnf_profile_id
,
3108 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3111 def _get_deployed_component(
3113 ee_relation
: EERelation
,
3114 db_nsr
: Dict
[str, Any
],
3115 cached_vnfds
: Dict
[str, Any
],
3116 ) -> DeployedComponent
:
3117 nsr_id
= db_nsr
["_id"]
3118 deployed_component
= None
3119 ee_level
= EELevel
.get_level(ee_relation
)
3120 if ee_level
== EELevel
.NS
:
3121 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3123 deployed_component
= DeployedVCA(nsr_id
, vca
)
3124 elif ee_level
== EELevel
.VNF
:
3125 vca
= get_deployed_vca(
3129 "member-vnf-index": ee_relation
.vnf_profile_id
,
3130 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3134 deployed_component
= DeployedVCA(nsr_id
, vca
)
3135 elif ee_level
== EELevel
.VDU
:
3136 vca
= get_deployed_vca(
3139 "vdu_id": ee_relation
.vdu_profile_id
,
3140 "member-vnf-index": ee_relation
.vnf_profile_id
,
3141 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3145 deployed_component
= DeployedVCA(nsr_id
, vca
)
3146 elif ee_level
== EELevel
.KDU
:
3147 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3148 ee_relation
, db_nsr
, cached_vnfds
3150 if kdu_resource_data
:
3151 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3152 return deployed_component
3154 async def _add_relation(
3158 db_nsr
: Dict
[str, Any
],
3159 cached_vnfds
: Dict
[str, Any
],
3160 cached_vnfrs
: Dict
[str, Any
],
3162 deployed_provider
= self
._get
_deployed
_component
(
3163 relation
.provider
, db_nsr
, cached_vnfds
3165 deployed_requirer
= self
._get
_deployed
_component
(
3166 relation
.requirer
, db_nsr
, cached_vnfds
3170 and deployed_requirer
3171 and deployed_provider
.config_sw_installed
3172 and deployed_requirer
.config_sw_installed
3174 provider_db_vnfr
= (
3176 relation
.provider
.nsr_id
,
3177 relation
.provider
.vnf_profile_id
,
3180 if relation
.provider
.vnf_profile_id
3183 requirer_db_vnfr
= (
3185 relation
.requirer
.nsr_id
,
3186 relation
.requirer
.vnf_profile_id
,
3189 if relation
.requirer
.vnf_profile_id
3192 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3193 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3194 provider_relation_endpoint
= RelationEndpoint(
3195 deployed_provider
.ee_id
,
3197 relation
.provider
.endpoint
,
3199 requirer_relation_endpoint
= RelationEndpoint(
3200 deployed_requirer
.ee_id
,
3202 relation
.requirer
.endpoint
,
3204 await self
.vca_map
[vca_type
].add_relation(
3205 provider
=provider_relation_endpoint
,
3206 requirer
=requirer_relation_endpoint
,
3208 # remove entry from relations list
3212 async def _add_vca_relations(
3218 timeout
: int = 3600,
3222 # 1. find all relations for this VCA
3223 # 2. wait for other peers related
3227 # STEP 1: find all relations for this VCA
3230 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3231 nsd
= get_nsd(db_nsr
)
3234 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3235 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3240 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3241 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3243 # if no relations, terminate
3245 self
.logger
.debug(logging_text
+ " No relations")
3248 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3255 if now
- start
>= timeout
:
3256 self
.logger
.error(logging_text
+ " : timeout adding relations")
3259 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3260 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3262 # for each relation, find the VCA's related
3263 for relation
in relations
.copy():
3264 added
= await self
._add
_relation
(
3272 relations
.remove(relation
)
3275 self
.logger
.debug("Relations added")
3277 await asyncio
.sleep(5.0)
3281 except Exception as e
:
3282 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3285 async def _install_kdu(
3293 k8s_instance_info
: dict,
3294 k8params
: dict = None,
3300 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3303 "collection": "nsrs",
3304 "filter": {"_id": nsr_id
},
3305 "path": nsr_db_path
,
3308 if k8s_instance_info
.get("kdu-deployment-name"):
3309 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3311 kdu_instance
= self
.k8scluster_map
[
3313 ].generate_kdu_instance_name(
3314 db_dict
=db_dict_install
,
3315 kdu_model
=k8s_instance_info
["kdu-model"],
3316 kdu_name
=k8s_instance_info
["kdu-name"],
3319 # Update the nsrs table with the kdu-instance value
3323 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3326 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3327 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3328 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3329 # namespace, this first verification could be removed, and the next step would be done for any kind
3331 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3332 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3333 if k8sclustertype
in ("juju", "juju-bundle"):
3334 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3335 # that the user passed a namespace which he wants its KDU to be deployed in)
3341 "_admin.projects_write": k8s_instance_info
["namespace"],
3342 "_admin.projects_read": k8s_instance_info
["namespace"],
3348 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3353 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3355 k8s_instance_info
["namespace"] = kdu_instance
3357 await self
.k8scluster_map
[k8sclustertype
].install(
3358 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3359 kdu_model
=k8s_instance_info
["kdu-model"],
3362 db_dict
=db_dict_install
,
3364 kdu_name
=k8s_instance_info
["kdu-name"],
3365 namespace
=k8s_instance_info
["namespace"],
3366 kdu_instance
=kdu_instance
,
3370 # Obtain services to obtain management service ip
3371 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3372 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3373 kdu_instance
=kdu_instance
,
3374 namespace
=k8s_instance_info
["namespace"],
3377 # Obtain management service info (if exists)
3378 vnfr_update_dict
= {}
3379 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3381 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3386 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3389 for service
in kdud
.get("service", [])
3390 if service
.get("mgmt-service")
3392 for mgmt_service
in mgmt_services
:
3393 for service
in services
:
3394 if service
["name"].startswith(mgmt_service
["name"]):
3395 # Mgmt service found, Obtain service ip
3396 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3397 if isinstance(ip
, list) and len(ip
) == 1:
3401 "kdur.{}.ip-address".format(kdu_index
)
3404 # Check if must update also mgmt ip at the vnf
3405 service_external_cp
= mgmt_service
.get(
3406 "external-connection-point-ref"
3408 if service_external_cp
:
3410 deep_get(vnfd
, ("mgmt-interface", "cp"))
3411 == service_external_cp
3413 vnfr_update_dict
["ip-address"] = ip
3418 "external-connection-point-ref", ""
3420 == service_external_cp
,
3423 "kdur.{}.ip-address".format(kdu_index
)
3428 "Mgmt service name: {} not found".format(
3429 mgmt_service
["name"]
3433 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3434 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3436 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3439 and kdu_config
.get("initial-config-primitive")
3440 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3442 initial_config_primitive_list
= kdu_config
.get(
3443 "initial-config-primitive"
3445 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3447 for initial_config_primitive
in initial_config_primitive_list
:
3448 primitive_params_
= self
._map
_primitive
_params
(
3449 initial_config_primitive
, {}, {}
3452 await asyncio
.wait_for(
3453 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3454 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3455 kdu_instance
=kdu_instance
,
3456 primitive_name
=initial_config_primitive
["name"],
3457 params
=primitive_params_
,
3458 db_dict
=db_dict_install
,
3464 except Exception as e
:
3465 # Prepare update db with error and raise exception
3468 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3472 vnfr_data
.get("_id"),
3473 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3476 # ignore to keep original exception
3478 # reraise original error
3483 async def deploy_kdus(
3490 task_instantiation_info
,
3492 # Launch kdus if present in the descriptor
3494 k8scluster_id_2_uuic
= {
3495 "helm-chart-v3": {},
3500 async def _get_cluster_id(cluster_id
, cluster_type
):
3501 nonlocal k8scluster_id_2_uuic
3502 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3503 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3505 # check if K8scluster is creating and wait look if previous tasks in process
3506 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3507 "k8scluster", cluster_id
3510 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3511 task_name
, cluster_id
3513 self
.logger
.debug(logging_text
+ text
)
3514 await asyncio
.wait(task_dependency
, timeout
=3600)
3516 db_k8scluster
= self
.db
.get_one(
3517 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3519 if not db_k8scluster
:
3520 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3522 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3524 if cluster_type
== "helm-chart-v3":
3526 # backward compatibility for existing clusters that have not been initialized for helm v3
3527 k8s_credentials
= yaml
.safe_dump(
3528 db_k8scluster
.get("credentials")
3530 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3531 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3533 db_k8scluster_update
= {}
3534 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3535 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3536 db_k8scluster_update
[
3537 "_admin.helm-chart-v3.created"
3539 db_k8scluster_update
[
3540 "_admin.helm-chart-v3.operationalState"
3543 "k8sclusters", cluster_id
, db_k8scluster_update
3545 except Exception as e
:
3548 + "error initializing helm-v3 cluster: {}".format(str(e
))
3551 "K8s cluster '{}' has not been initialized for '{}'".format(
3552 cluster_id
, cluster_type
3557 "K8s cluster '{}' has not been initialized for '{}'".format(
3558 cluster_id
, cluster_type
3561 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3564 logging_text
+= "Deploy kdus: "
3567 db_nsr_update
= {"_admin.deployed.K8s": []}
3568 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3571 updated_cluster_list
= []
3572 updated_v3_cluster_list
= []
3574 for vnfr_data
in db_vnfrs
.values():
3575 vca_id
= self
.get_vca_id(vnfr_data
, {})
3576 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3577 # Step 0: Prepare and set parameters
3578 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3579 vnfd_id
= vnfr_data
.get("vnfd-id")
3580 vnfd_with_id
= find_in_list(
3581 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3585 for kdud
in vnfd_with_id
["kdu"]
3586 if kdud
["name"] == kdur
["kdu-name"]
3588 namespace
= kdur
.get("k8s-namespace")
3589 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3590 if kdur
.get("helm-chart"):
3591 kdumodel
= kdur
["helm-chart"]
3592 # Default version: helm3, if helm-version is v2 assign v2
3593 k8sclustertype
= "helm-chart-v3"
3594 self
.logger
.debug("kdur: {}".format(kdur
))
3596 kdur
.get("helm-version")
3597 and kdur
.get("helm-version") == "v2"
3599 k8sclustertype
= "helm-chart"
3600 elif kdur
.get("juju-bundle"):
3601 kdumodel
= kdur
["juju-bundle"]
3602 k8sclustertype
= "juju-bundle"
3605 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3606 "juju-bundle. Maybe an old NBI version is running".format(
3607 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3610 # check if kdumodel is a file and exists
3612 vnfd_with_id
= find_in_list(
3613 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3615 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3616 if storage
: # may be not present if vnfd has not artifacts
3617 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3618 if storage
["pkg-dir"]:
3619 filename
= "{}/{}/{}s/{}".format(
3626 filename
= "{}/Scripts/{}s/{}".format(
3631 if self
.fs
.file_exists(
3632 filename
, mode
="file"
3633 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3634 kdumodel
= self
.fs
.path
+ filename
3635 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3637 except Exception: # it is not a file
3640 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3641 step
= "Synchronize repos for k8s cluster '{}'".format(
3644 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3648 k8sclustertype
== "helm-chart"
3649 and cluster_uuid
not in updated_cluster_list
3651 k8sclustertype
== "helm-chart-v3"
3652 and cluster_uuid
not in updated_v3_cluster_list
3654 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3655 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3656 cluster_uuid
=cluster_uuid
3659 if del_repo_list
or added_repo_dict
:
3660 if k8sclustertype
== "helm-chart":
3662 "_admin.helm_charts_added." + item
: None
3663 for item
in del_repo_list
3666 "_admin.helm_charts_added." + item
: name
3667 for item
, name
in added_repo_dict
.items()
3669 updated_cluster_list
.append(cluster_uuid
)
3670 elif k8sclustertype
== "helm-chart-v3":
3672 "_admin.helm_charts_v3_added." + item
: None
3673 for item
in del_repo_list
3676 "_admin.helm_charts_v3_added." + item
: name
3677 for item
, name
in added_repo_dict
.items()
3679 updated_v3_cluster_list
.append(cluster_uuid
)
3681 logging_text
+ "repos synchronized on k8s cluster "
3682 "'{}' to_delete: {}, to_add: {}".format(
3683 k8s_cluster_id
, del_repo_list
, added_repo_dict
3688 {"_id": k8s_cluster_id
},
3694 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3695 vnfr_data
["member-vnf-index-ref"],
3699 k8s_instance_info
= {
3700 "kdu-instance": None,
3701 "k8scluster-uuid": cluster_uuid
,
3702 "k8scluster-type": k8sclustertype
,
3703 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3704 "kdu-name": kdur
["kdu-name"],
3705 "kdu-model": kdumodel
,
3706 "namespace": namespace
,
3707 "kdu-deployment-name": kdu_deployment_name
,
3709 db_path
= "_admin.deployed.K8s.{}".format(index
)
3710 db_nsr_update
[db_path
] = k8s_instance_info
3711 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3712 vnfd_with_id
= find_in_list(
3713 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3715 task
= asyncio
.ensure_future(
3724 k8params
=desc_params
,
3729 self
.lcm_tasks
.register(
3733 "instantiate_KDU-{}".format(index
),
3736 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3742 except (LcmException
, asyncio
.CancelledError
):
3744 except Exception as e
:
3745 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3746 if isinstance(e
, (N2VCException
, DbException
)):
3747 self
.logger
.error(logging_text
+ msg
)
3749 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3750 raise LcmException(msg
)
3753 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3772 task_instantiation_info
,
3775 # launch instantiate_N2VC in a asyncio task and register task object
3776 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3777 # if not found, create one entry and update database
3778 # fill db_nsr._admin.deployed.VCA.<index>
3781 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3785 get_charm_name
= False
3786 if "execution-environment-list" in descriptor_config
:
3787 ee_list
= descriptor_config
.get("execution-environment-list", [])
3788 elif "juju" in descriptor_config
:
3789 ee_list
= [descriptor_config
] # ns charms
3790 if "execution-environment-list" not in descriptor_config
:
3791 # charm name is only required for ns charms
3792 get_charm_name
= True
3793 else: # other types as script are not supported
3796 for ee_item
in ee_list
:
3799 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3800 ee_item
.get("juju"), ee_item
.get("helm-chart")
3803 ee_descriptor_id
= ee_item
.get("id")
3804 if ee_item
.get("juju"):
3805 vca_name
= ee_item
["juju"].get("charm")
3807 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3810 if ee_item
["juju"].get("charm") is not None
3813 if ee_item
["juju"].get("cloud") == "k8s":
3814 vca_type
= "k8s_proxy_charm"
3815 elif ee_item
["juju"].get("proxy") is False:
3816 vca_type
= "native_charm"
3817 elif ee_item
.get("helm-chart"):
3818 vca_name
= ee_item
["helm-chart"]
3819 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3822 vca_type
= "helm-v3"
3825 logging_text
+ "skipping non juju neither charm configuration"
3830 for vca_index
, vca_deployed
in enumerate(
3831 db_nsr
["_admin"]["deployed"]["VCA"]
3833 if not vca_deployed
:
3836 vca_deployed
.get("member-vnf-index") == member_vnf_index
3837 and vca_deployed
.get("vdu_id") == vdu_id
3838 and vca_deployed
.get("kdu_name") == kdu_name
3839 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3840 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3844 # not found, create one.
3846 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3849 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3851 target
+= "/kdu/{}".format(kdu_name
)
3853 "target_element": target
,
3854 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3855 "member-vnf-index": member_vnf_index
,
3857 "kdu_name": kdu_name
,
3858 "vdu_count_index": vdu_index
,
3859 "operational-status": "init", # TODO revise
3860 "detailed-status": "", # TODO revise
3861 "step": "initial-deploy", # TODO revise
3863 "vdu_name": vdu_name
,
3865 "ee_descriptor_id": ee_descriptor_id
,
3866 "charm_name": charm_name
,
3870 # create VCA and configurationStatus in db
3872 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3873 "configurationStatus.{}".format(vca_index
): dict(),
3875 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3877 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3879 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3880 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3881 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3884 task_n2vc
= asyncio
.ensure_future(
3885 self
.instantiate_N2VC(
3886 logging_text
=logging_text
,
3887 vca_index
=vca_index
,
3893 vdu_index
=vdu_index
,
3894 deploy_params
=deploy_params
,
3895 config_descriptor
=descriptor_config
,
3896 base_folder
=base_folder
,
3897 nslcmop_id
=nslcmop_id
,
3901 ee_config_descriptor
=ee_item
,
3904 self
.lcm_tasks
.register(
3908 "instantiate_N2VC-{}".format(vca_index
),
3911 task_instantiation_info
[
3913 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3914 member_vnf_index
or "", vdu_id
or ""
3918 def _create_nslcmop(nsr_id
, operation
, params
):
3920 Creates a ns-lcm-opp content to be stored at database.
3921 :param nsr_id: internal id of the instance
3922 :param operation: instantiate, terminate, scale, action, ...
3923 :param params: user parameters for the operation
3924 :return: dictionary following SOL005 format
3926 # Raise exception if invalid arguments
3927 if not (nsr_id
and operation
and params
):
3929 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3936 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3937 "operationState": "PROCESSING",
3938 "statusEnteredTime": now
,
3939 "nsInstanceId": nsr_id
,
3940 "lcmOperationType": operation
,
3942 "isAutomaticInvocation": False,
3943 "operationParams": params
,
3944 "isCancelPending": False,
3946 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3947 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3952 def _format_additional_params(self
, params
):
3953 params
= params
or {}
3954 for key
, value
in params
.items():
3955 if str(value
).startswith("!!yaml "):
3956 params
[key
] = yaml
.safe_load(value
[7:])
3959 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3960 primitive
= seq
.get("name")
3961 primitive_params
= {}
3963 "member_vnf_index": vnf_index
,
3964 "primitive": primitive
,
3965 "primitive_params": primitive_params
,
3968 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3972 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3973 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3974 if op
.get("operationState") == "COMPLETED":
3975 # b. Skip sub-operation
3976 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3977 return self
.SUBOPERATION_STATUS_SKIP
3979 # c. retry executing sub-operation
3980 # The sub-operation exists, and operationState != 'COMPLETED'
3981 # Update operationState = 'PROCESSING' to indicate a retry.
3982 operationState
= "PROCESSING"
3983 detailed_status
= "In progress"
3984 self
._update
_suboperation
_status
(
3985 db_nslcmop
, op_index
, operationState
, detailed_status
3987 # Return the sub-operation index
3988 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3989 # with arguments extracted from the sub-operation
3992 # Find a sub-operation where all keys in a matching dictionary must match
3993 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3994 def _find_suboperation(self
, db_nslcmop
, match
):
3995 if db_nslcmop
and match
:
3996 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3997 for i
, op
in enumerate(op_list
):
3998 if all(op
.get(k
) == match
[k
] for k
in match
):
4000 return self
.SUBOPERATION_STATUS_NOT_FOUND
4002 # Update status for a sub-operation given its index
4003 def _update_suboperation_status(
4004 self
, db_nslcmop
, op_index
, operationState
, detailed_status
4006 # Update DB for HA tasks
4007 q_filter
= {"_id": db_nslcmop
["_id"]}
4009 "_admin.operations.{}.operationState".format(op_index
): operationState
,
4010 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
4013 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
4016 # Add sub-operation, return the index of the added sub-operation
4017 # Optionally, set operationState, detailed-status, and operationType
4018 # Status and type are currently set for 'scale' sub-operations:
4019 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4020 # 'detailed-status' : status message
4021 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4022 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4023 def _add_suboperation(
4031 mapped_primitive_params
,
4032 operationState
=None,
4033 detailed_status
=None,
4036 RO_scaling_info
=None,
4039 return self
.SUBOPERATION_STATUS_NOT_FOUND
4040 # Get the "_admin.operations" list, if it exists
4041 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4042 op_list
= db_nslcmop_admin
.get("operations")
4043 # Create or append to the "_admin.operations" list
4045 "member_vnf_index": vnf_index
,
4047 "vdu_count_index": vdu_count_index
,
4048 "primitive": primitive
,
4049 "primitive_params": mapped_primitive_params
,
4052 new_op
["operationState"] = operationState
4054 new_op
["detailed-status"] = detailed_status
4056 new_op
["lcmOperationType"] = operationType
4058 new_op
["RO_nsr_id"] = RO_nsr_id
4060 new_op
["RO_scaling_info"] = RO_scaling_info
4062 # No existing operations, create key 'operations' with current operation as first list element
4063 db_nslcmop_admin
.update({"operations": [new_op
]})
4064 op_list
= db_nslcmop_admin
.get("operations")
4066 # Existing operations, append operation to list
4067 op_list
.append(new_op
)
4069 db_nslcmop_update
= {"_admin.operations": op_list
}
4070 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4071 op_index
= len(op_list
) - 1
4074 # Helper methods for scale() sub-operations
4076 # pre-scale/post-scale:
4077 # Check for 3 different cases:
4078 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4079 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4080 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4081 def _check_or_add_scale_suboperation(
4085 vnf_config_primitive
,
4089 RO_scaling_info
=None,
4091 # Find this sub-operation
4092 if RO_nsr_id
and RO_scaling_info
:
4093 operationType
= "SCALE-RO"
4095 "member_vnf_index": vnf_index
,
4096 "RO_nsr_id": RO_nsr_id
,
4097 "RO_scaling_info": RO_scaling_info
,
4101 "member_vnf_index": vnf_index
,
4102 "primitive": vnf_config_primitive
,
4103 "primitive_params": primitive_params
,
4104 "lcmOperationType": operationType
,
4106 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4107 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4108 # a. New sub-operation
4109 # The sub-operation does not exist, add it.
4110 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4111 # The following parameters are set to None for all kind of scaling:
4113 vdu_count_index
= None
4115 if RO_nsr_id
and RO_scaling_info
:
4116 vnf_config_primitive
= None
4117 primitive_params
= None
4120 RO_scaling_info
= None
4121 # Initial status for sub-operation
4122 operationState
= "PROCESSING"
4123 detailed_status
= "In progress"
4124 # Add sub-operation for pre/post-scaling (zero or more operations)
4125 self
._add
_suboperation
(
4131 vnf_config_primitive
,
4139 return self
.SUBOPERATION_STATUS_NEW
4141 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4142 # or op_index (operationState != 'COMPLETED')
4143 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4145 # Function to return execution_environment id
4147 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4148 # TODO vdu_index_count
4149 for vca
in vca_deployed_list
:
4150 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4153 async def destroy_N2VC(
4161 exec_primitives
=True,
4166 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4167 :param logging_text:
4169 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4170 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4171 :param vca_index: index in the database _admin.deployed.VCA
4172 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4173 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4174 not executed properly
4175 :param scaling_in: True destroys the application, False destroys the model
4176 :return: None or exception
4181 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4182 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4186 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4188 # execute terminate_primitives
4190 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4191 config_descriptor
.get("terminate-config-primitive"),
4192 vca_deployed
.get("ee_descriptor_id"),
4194 vdu_id
= vca_deployed
.get("vdu_id")
4195 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4196 vdu_name
= vca_deployed
.get("vdu_name")
4197 vnf_index
= vca_deployed
.get("member-vnf-index")
4198 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4199 for seq
in terminate_primitives
:
4200 # For each sequence in list, get primitive and call _ns_execute_primitive()
4201 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4202 vnf_index
, seq
.get("name")
4204 self
.logger
.debug(logging_text
+ step
)
4205 # Create the primitive for each sequence, i.e. "primitive": "touch"
4206 primitive
= seq
.get("name")
4207 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4212 self
._add
_suboperation
(
4219 mapped_primitive_params
,
4221 # Sub-operations: Call _ns_execute_primitive() instead of action()
4223 result
, result_detail
= await self
._ns
_execute
_primitive
(
4224 vca_deployed
["ee_id"],
4226 mapped_primitive_params
,
4230 except LcmException
:
4231 # this happens when VCA is not deployed. In this case it is not needed to terminate
4233 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4234 if result
not in result_ok
:
4236 "terminate_primitive {} for vnf_member_index={} fails with "
4237 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4239 # set that this VCA do not need terminated
4240 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4244 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4247 # Delete Prometheus Jobs if any
4248 # This uses NSR_ID, so it will destroy any jobs under this index
4249 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4252 await self
.vca_map
[vca_type
].delete_execution_environment(
4253 vca_deployed
["ee_id"],
4254 scaling_in
=scaling_in
,
4259 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4260 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4261 namespace
= "." + db_nsr
["_id"]
4263 await self
.n2vc
.delete_namespace(
4264 namespace
=namespace
,
4265 total_timeout
=self
.timeout_charm_delete
,
4268 except N2VCNotFound
: # already deleted. Skip
4270 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4272 async def _terminate_RO(
4273 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4276 Terminates a deployment from RO
4277 :param logging_text:
4278 :param nsr_deployed: db_nsr._admin.deployed
4281 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4282 this method will update only the index 2, but it will write on database the concatenated content of the list
4287 ro_nsr_id
= ro_delete_action
= None
4288 if nsr_deployed
and nsr_deployed
.get("RO"):
4289 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4290 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4293 stage
[2] = "Deleting ns from VIM."
4294 db_nsr_update
["detailed-status"] = " ".join(stage
)
4295 self
._write
_op
_status
(nslcmop_id
, stage
)
4296 self
.logger
.debug(logging_text
+ stage
[2])
4297 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4298 self
._write
_op
_status
(nslcmop_id
, stage
)
4299 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4300 ro_delete_action
= desc
["action_id"]
4302 "_admin.deployed.RO.nsr_delete_action_id"
4303 ] = ro_delete_action
4304 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4305 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4306 if ro_delete_action
:
4307 # wait until NS is deleted from VIM
4308 stage
[2] = "Waiting ns deleted from VIM."
4309 detailed_status_old
= None
4313 + " RO_id={} ro_delete_action={}".format(
4314 ro_nsr_id
, ro_delete_action
4317 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4318 self
._write
_op
_status
(nslcmop_id
, stage
)
4320 delete_timeout
= 20 * 60 # 20 minutes
4321 while delete_timeout
> 0:
4322 desc
= await self
.RO
.show(
4324 item_id_name
=ro_nsr_id
,
4325 extra_item
="action",
4326 extra_item_id
=ro_delete_action
,
4330 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4332 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4333 if ns_status
== "ERROR":
4334 raise ROclient
.ROClientException(ns_status_info
)
4335 elif ns_status
== "BUILD":
4336 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4337 elif ns_status
== "ACTIVE":
4338 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4339 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4344 ), "ROclient.check_action_status returns unknown {}".format(
4347 if stage
[2] != detailed_status_old
:
4348 detailed_status_old
= stage
[2]
4349 db_nsr_update
["detailed-status"] = " ".join(stage
)
4350 self
._write
_op
_status
(nslcmop_id
, stage
)
4351 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4352 await asyncio
.sleep(5, loop
=self
.loop
)
4354 else: # delete_timeout <= 0:
4355 raise ROclient
.ROClientException(
4356 "Timeout waiting ns deleted from VIM"
4359 except Exception as e
:
4360 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4362 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4364 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4365 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4366 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4368 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4371 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4373 failed_detail
.append("delete conflict: {}".format(e
))
4376 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4379 failed_detail
.append("delete error: {}".format(e
))
4381 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4385 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4386 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4388 stage
[2] = "Deleting nsd from RO."
4389 db_nsr_update
["detailed-status"] = " ".join(stage
)
4390 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4391 self
._write
_op
_status
(nslcmop_id
, stage
)
4392 await self
.RO
.delete("nsd", ro_nsd_id
)
4394 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4396 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4397 except Exception as e
:
4399 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4401 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4403 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4406 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4408 failed_detail
.append(
4409 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4411 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4413 failed_detail
.append(
4414 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4416 self
.logger
.error(logging_text
+ failed_detail
[-1])
4418 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4419 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4420 if not vnf_deployed
or not vnf_deployed
["id"]:
4423 ro_vnfd_id
= vnf_deployed
["id"]
4426 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4427 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4429 db_nsr_update
["detailed-status"] = " ".join(stage
)
4430 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4431 self
._write
_op
_status
(nslcmop_id
, stage
)
4432 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4434 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4436 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4437 except Exception as e
:
4439 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4442 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4446 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4449 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4451 failed_detail
.append(
4452 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4454 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4456 failed_detail
.append(
4457 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4459 self
.logger
.error(logging_text
+ failed_detail
[-1])
4462 stage
[2] = "Error deleting from VIM"
4464 stage
[2] = "Deleted from VIM"
4465 db_nsr_update
["detailed-status"] = " ".join(stage
)
4466 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4467 self
._write
_op
_status
(nslcmop_id
, stage
)
4470 raise LcmException("; ".join(failed_detail
))
4472 async def terminate(self
, nsr_id
, nslcmop_id
):
4473 # Try to lock HA task here
4474 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4475 if not task_is_locked_by_me
:
4478 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4479 self
.logger
.debug(logging_text
+ "Enter")
4480 timeout_ns_terminate
= self
.timeout_ns_terminate
4483 operation_params
= None
4485 error_list
= [] # annotates all failed error messages
4486 db_nslcmop_update
= {}
4487 autoremove
= False # autoremove after terminated
4488 tasks_dict_info
= {}
4491 "Stage 1/3: Preparing task.",
4492 "Waiting for previous operations to terminate.",
4495 # ^ contains [stage, step, VIM-status]
4497 # wait for any previous tasks in process
4498 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4500 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4501 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4502 operation_params
= db_nslcmop
.get("operationParams") or {}
4503 if operation_params
.get("timeout_ns_terminate"):
4504 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4505 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4506 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4508 db_nsr_update
["operational-status"] = "terminating"
4509 db_nsr_update
["config-status"] = "terminating"
4510 self
._write
_ns
_status
(
4512 ns_state
="TERMINATING",
4513 current_operation
="TERMINATING",
4514 current_operation_id
=nslcmop_id
,
4515 other_update
=db_nsr_update
,
4517 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4518 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4519 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4522 stage
[1] = "Getting vnf descriptors from db."
4523 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4525 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4527 db_vnfds_from_id
= {}
4528 db_vnfds_from_member_index
= {}
4530 for vnfr
in db_vnfrs_list
:
4531 vnfd_id
= vnfr
["vnfd-id"]
4532 if vnfd_id
not in db_vnfds_from_id
:
4533 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4534 db_vnfds_from_id
[vnfd_id
] = vnfd
4535 db_vnfds_from_member_index
[
4536 vnfr
["member-vnf-index-ref"]
4537 ] = db_vnfds_from_id
[vnfd_id
]
4539 # Destroy individual execution environments when there are terminating primitives.
4540 # Rest of EE will be deleted at once
4541 # TODO - check before calling _destroy_N2VC
4542 # if not operation_params.get("skip_terminate_primitives"):#
4543 # or not vca.get("needed_terminate"):
4544 stage
[0] = "Stage 2/3 execute terminating primitives."
4545 self
.logger
.debug(logging_text
+ stage
[0])
4546 stage
[1] = "Looking execution environment that needs terminate."
4547 self
.logger
.debug(logging_text
+ stage
[1])
4549 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4550 config_descriptor
= None
4551 vca_member_vnf_index
= vca
.get("member-vnf-index")
4552 vca_id
= self
.get_vca_id(
4553 db_vnfrs_dict
.get(vca_member_vnf_index
)
4554 if vca_member_vnf_index
4558 if not vca
or not vca
.get("ee_id"):
4560 if not vca
.get("member-vnf-index"):
4562 config_descriptor
= db_nsr
.get("ns-configuration")
4563 elif vca
.get("vdu_id"):
4564 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4565 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4566 elif vca
.get("kdu_name"):
4567 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4568 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4570 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4571 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4572 vca_type
= vca
.get("type")
4573 exec_terminate_primitives
= not operation_params
.get(
4574 "skip_terminate_primitives"
4575 ) and vca
.get("needed_terminate")
4576 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4577 # pending native charms
4579 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4581 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4582 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4583 task
= asyncio
.ensure_future(
4591 exec_terminate_primitives
,
4595 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4597 # wait for pending tasks of terminate primitives
4601 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4603 error_list
= await self
._wait
_for
_tasks
(
4606 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4610 tasks_dict_info
.clear()
4612 return # raise LcmException("; ".join(error_list))
4614 # remove All execution environments at once
4615 stage
[0] = "Stage 3/3 delete all."
4617 if nsr_deployed
.get("VCA"):
4618 stage
[1] = "Deleting all execution environments."
4619 self
.logger
.debug(logging_text
+ stage
[1])
4620 vca_id
= self
.get_vca_id({}, db_nsr
)
4621 task_delete_ee
= asyncio
.ensure_future(
4623 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4624 timeout
=self
.timeout_charm_delete
,
4627 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4628 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4630 # Delete from k8scluster
4631 stage
[1] = "Deleting KDUs."
4632 self
.logger
.debug(logging_text
+ stage
[1])
4633 # print(nsr_deployed)
4634 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4635 if not kdu
or not kdu
.get("kdu-instance"):
4637 kdu_instance
= kdu
.get("kdu-instance")
4638 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4639 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4640 vca_id
= self
.get_vca_id({}, db_nsr
)
4641 task_delete_kdu_instance
= asyncio
.ensure_future(
4642 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4643 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4644 kdu_instance
=kdu_instance
,
4646 namespace
=kdu
.get("namespace"),
4652 + "Unknown k8s deployment type {}".format(
4653 kdu
.get("k8scluster-type")
4658 task_delete_kdu_instance
4659 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4662 stage
[1] = "Deleting ns from VIM."
4664 task_delete_ro
= asyncio
.ensure_future(
4665 self
._terminate
_ng
_ro
(
4666 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4670 task_delete_ro
= asyncio
.ensure_future(
4672 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4675 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4677 # rest of staff will be done at finally
4680 ROclient
.ROClientException
,
4685 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4687 except asyncio
.CancelledError
:
4689 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4691 exc
= "Operation was cancelled"
4692 except Exception as e
:
4693 exc
= traceback
.format_exc()
4694 self
.logger
.critical(
4695 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4700 error_list
.append(str(exc
))
4702 # wait for pending tasks
4704 stage
[1] = "Waiting for terminate pending tasks."
4705 self
.logger
.debug(logging_text
+ stage
[1])
4706 error_list
+= await self
._wait
_for
_tasks
(
4709 timeout_ns_terminate
,
4713 stage
[1] = stage
[2] = ""
4714 except asyncio
.CancelledError
:
4715 error_list
.append("Cancelled")
4716 # TODO cancell all tasks
4717 except Exception as exc
:
4718 error_list
.append(str(exc
))
4719 # update status at database
4721 error_detail
= "; ".join(error_list
)
4722 # self.logger.error(logging_text + error_detail)
4723 error_description_nslcmop
= "{} Detail: {}".format(
4724 stage
[0], error_detail
4726 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4727 nslcmop_id
, stage
[0]
4730 db_nsr_update
["operational-status"] = "failed"
4731 db_nsr_update
["detailed-status"] = (
4732 error_description_nsr
+ " Detail: " + error_detail
4734 db_nslcmop_update
["detailed-status"] = error_detail
4735 nslcmop_operation_state
= "FAILED"
4739 error_description_nsr
= error_description_nslcmop
= None
4740 ns_state
= "NOT_INSTANTIATED"
4741 db_nsr_update
["operational-status"] = "terminated"
4742 db_nsr_update
["detailed-status"] = "Done"
4743 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4744 db_nslcmop_update
["detailed-status"] = "Done"
4745 nslcmop_operation_state
= "COMPLETED"
4748 self
._write
_ns
_status
(
4751 current_operation
="IDLE",
4752 current_operation_id
=None,
4753 error_description
=error_description_nsr
,
4754 error_detail
=error_detail
,
4755 other_update
=db_nsr_update
,
4757 self
._write
_op
_status
(
4760 error_message
=error_description_nslcmop
,
4761 operation_state
=nslcmop_operation_state
,
4762 other_update
=db_nslcmop_update
,
4764 if ns_state
== "NOT_INSTANTIATED":
4768 {"nsr-id-ref": nsr_id
},
4769 {"_admin.nsState": "NOT_INSTANTIATED"},
4771 except DbException
as e
:
4774 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4778 if operation_params
:
4779 autoremove
= operation_params
.get("autoremove", False)
4780 if nslcmop_operation_state
:
4782 await self
.msg
.aiowrite(
4787 "nslcmop_id": nslcmop_id
,
4788 "operationState": nslcmop_operation_state
,
4789 "autoremove": autoremove
,
4793 except Exception as e
:
4795 logging_text
+ "kafka_write notification Exception {}".format(e
)
4798 self
.logger
.debug(logging_text
+ "Exit")
4799 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4801 async def _wait_for_tasks(
4802 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4805 error_detail_list
= []
4807 pending_tasks
= list(created_tasks_info
.keys())
4808 num_tasks
= len(pending_tasks
)
4810 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4811 self
._write
_op
_status
(nslcmop_id
, stage
)
4812 while pending_tasks
:
4814 _timeout
= timeout
+ time_start
- time()
4815 done
, pending_tasks
= await asyncio
.wait(
4816 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4818 num_done
+= len(done
)
4819 if not done
: # Timeout
4820 for task
in pending_tasks
:
4821 new_error
= created_tasks_info
[task
] + ": Timeout"
4822 error_detail_list
.append(new_error
)
4823 error_list
.append(new_error
)
4826 if task
.cancelled():
4829 exc
= task
.exception()
4831 if isinstance(exc
, asyncio
.TimeoutError
):
4833 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4834 error_list
.append(created_tasks_info
[task
])
4835 error_detail_list
.append(new_error
)
4842 ROclient
.ROClientException
,
4848 self
.logger
.error(logging_text
+ new_error
)
4850 exc_traceback
= "".join(
4851 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4855 + created_tasks_info
[task
]
4861 logging_text
+ created_tasks_info
[task
] + ": Done"
4863 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4865 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4866 if nsr_id
: # update also nsr
4871 "errorDescription": "Error at: " + ", ".join(error_list
),
4872 "errorDetail": ". ".join(error_detail_list
),
4875 self
._write
_op
_status
(nslcmop_id
, stage
)
4876 return error_detail_list
4879 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4881 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4882 The default-value is used. If it is between < > it look for a value at instantiation_params
4883 :param primitive_desc: portion of VNFD/NSD that describes primitive
4884 :param params: Params provided by user
4885 :param instantiation_params: Instantiation params provided by user
4886 :return: a dictionary with the calculated params
4888 calculated_params
= {}
4889 for parameter
in primitive_desc
.get("parameter", ()):
4890 param_name
= parameter
["name"]
4891 if param_name
in params
:
4892 calculated_params
[param_name
] = params
[param_name
]
4893 elif "default-value" in parameter
or "value" in parameter
:
4894 if "value" in parameter
:
4895 calculated_params
[param_name
] = parameter
["value"]
4897 calculated_params
[param_name
] = parameter
["default-value"]
4899 isinstance(calculated_params
[param_name
], str)
4900 and calculated_params
[param_name
].startswith("<")
4901 and calculated_params
[param_name
].endswith(">")
4903 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4904 calculated_params
[param_name
] = instantiation_params
[
4905 calculated_params
[param_name
][1:-1]
4909 "Parameter {} needed to execute primitive {} not provided".format(
4910 calculated_params
[param_name
], primitive_desc
["name"]
4915 "Parameter {} needed to execute primitive {} not provided".format(
4916 param_name
, primitive_desc
["name"]
4920 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4921 calculated_params
[param_name
] = yaml
.safe_dump(
4922 calculated_params
[param_name
], default_flow_style
=True, width
=256
4924 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4926 ].startswith("!!yaml "):
4927 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4928 if parameter
.get("data-type") == "INTEGER":
4930 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4931 except ValueError: # error converting string to int
4933 "Parameter {} of primitive {} must be integer".format(
4934 param_name
, primitive_desc
["name"]
4937 elif parameter
.get("data-type") == "BOOLEAN":
4938 calculated_params
[param_name
] = not (
4939 (str(calculated_params
[param_name
])).lower() == "false"
4942 # add always ns_config_info if primitive name is config
4943 if primitive_desc
["name"] == "config":
4944 if "ns_config_info" in instantiation_params
:
4945 calculated_params
["ns_config_info"] = instantiation_params
[
4948 return calculated_params
4950 def _look_for_deployed_vca(
4957 ee_descriptor_id
=None,
4959 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4960 for vca
in deployed_vca
:
4963 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4966 vdu_count_index
is not None
4967 and vdu_count_index
!= vca
["vdu_count_index"]
4970 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4972 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4976 # vca_deployed not found
4978 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4979 " is not deployed".format(
4988 ee_id
= vca
.get("ee_id")
4990 "type", "lxc_proxy_charm"
4991 ) # default value for backward compatibility - proxy charm
4994 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4995 "execution environment".format(
4996 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4999 return ee_id
, vca_type
5001 async def _ns_execute_primitive(
5007 retries_interval
=30,
5014 if primitive
== "config":
5015 primitive_params
= {"params": primitive_params
}
5017 vca_type
= vca_type
or "lxc_proxy_charm"
5021 output
= await asyncio
.wait_for(
5022 self
.vca_map
[vca_type
].exec_primitive(
5024 primitive_name
=primitive
,
5025 params_dict
=primitive_params
,
5026 progress_timeout
=self
.timeout_progress_primitive
,
5027 total_timeout
=self
.timeout_primitive
,
5032 timeout
=timeout
or self
.timeout_primitive
,
5036 except asyncio
.CancelledError
:
5038 except Exception as e
:
5042 "Error executing action {} on {} -> {}".format(
5047 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5049 if isinstance(e
, asyncio
.TimeoutError
):
5051 message
="Timed out waiting for action to complete"
5053 return "FAILED", getattr(e
, "message", repr(e
))
5055 return "COMPLETED", output
5057 except (LcmException
, asyncio
.CancelledError
):
5059 except Exception as e
:
5060 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5062 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5064 Updating the vca_status with latest juju information in nsrs record
5065 :param: nsr_id: Id of the nsr
5066 :param: nslcmop_id: Id of the nslcmop
5070 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5071 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5072 vca_id
= self
.get_vca_id({}, db_nsr
)
5073 if db_nsr
["_admin"]["deployed"]["K8s"]:
5074 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5075 cluster_uuid
, kdu_instance
, cluster_type
= (
5076 k8s
["k8scluster-uuid"],
5077 k8s
["kdu-instance"],
5078 k8s
["k8scluster-type"],
5080 await self
._on
_update
_k
8s
_db
(
5081 cluster_uuid
=cluster_uuid
,
5082 kdu_instance
=kdu_instance
,
5083 filter={"_id": nsr_id
},
5085 cluster_type
=cluster_type
,
5088 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5089 table
, filter = "nsrs", {"_id": nsr_id
}
5090 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5091 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5093 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5094 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5096 async def action(self
, nsr_id
, nslcmop_id
):
5097 # Try to lock HA task here
5098 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5099 if not task_is_locked_by_me
:
5102 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5103 self
.logger
.debug(logging_text
+ "Enter")
5104 # get all needed from database
5108 db_nslcmop_update
= {}
5109 nslcmop_operation_state
= None
5110 error_description_nslcmop
= None
5113 # wait for any previous tasks in process
5114 step
= "Waiting for previous operations to terminate"
5115 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5117 self
._write
_ns
_status
(
5120 current_operation
="RUNNING ACTION",
5121 current_operation_id
=nslcmop_id
,
5124 step
= "Getting information from database"
5125 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5126 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5127 if db_nslcmop
["operationParams"].get("primitive_params"):
5128 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5129 db_nslcmop
["operationParams"]["primitive_params"]
5132 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5133 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5134 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5135 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5136 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5137 primitive
= db_nslcmop
["operationParams"]["primitive"]
5138 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5139 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5140 "timeout_ns_action", self
.timeout_primitive
5144 step
= "Getting vnfr from database"
5145 db_vnfr
= self
.db
.get_one(
5146 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5148 if db_vnfr
.get("kdur"):
5150 for kdur
in db_vnfr
["kdur"]:
5151 if kdur
.get("additionalParams"):
5152 kdur
["additionalParams"] = json
.loads(
5153 kdur
["additionalParams"]
5155 kdur_list
.append(kdur
)
5156 db_vnfr
["kdur"] = kdur_list
5157 step
= "Getting vnfd from database"
5158 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5160 # Sync filesystem before running a primitive
5161 self
.fs
.sync(db_vnfr
["vnfd-id"])
5163 step
= "Getting nsd from database"
5164 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5166 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5167 # for backward compatibility
5168 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5169 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5170 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5171 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5173 # look for primitive
5174 config_primitive_desc
= descriptor_configuration
= None
5176 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5178 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5180 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5182 descriptor_configuration
= db_nsd
.get("ns-configuration")
5184 if descriptor_configuration
and descriptor_configuration
.get(
5187 for config_primitive
in descriptor_configuration
["config-primitive"]:
5188 if config_primitive
["name"] == primitive
:
5189 config_primitive_desc
= config_primitive
5192 if not config_primitive_desc
:
5193 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5195 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5199 primitive_name
= primitive
5200 ee_descriptor_id
= None
5202 primitive_name
= config_primitive_desc
.get(
5203 "execution-environment-primitive", primitive
5205 ee_descriptor_id
= config_primitive_desc
.get(
5206 "execution-environment-ref"
5212 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5214 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5217 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5219 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5221 desc_params
= parse_yaml_strings(
5222 db_vnfr
.get("additionalParamsForVnf")
5225 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5226 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5227 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5229 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5230 actions
.add(primitive
["name"])
5231 for primitive
in kdu_configuration
.get("config-primitive", []):
5232 actions
.add(primitive
["name"])
5234 nsr_deployed
["K8s"],
5235 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5236 and kdu
["member-vnf-index"] == vnf_index
,
5240 if primitive_name
in actions
5241 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5245 # TODO check if ns is in a proper status
5247 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5249 # kdur and desc_params already set from before
5250 if primitive_params
:
5251 desc_params
.update(primitive_params
)
5252 # TODO Check if we will need something at vnf level
5253 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5255 kdu_name
== kdu
["kdu-name"]
5256 and kdu
["member-vnf-index"] == vnf_index
5261 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5264 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5265 msg
= "unknown k8scluster-type '{}'".format(
5266 kdu
.get("k8scluster-type")
5268 raise LcmException(msg
)
5271 "collection": "nsrs",
5272 "filter": {"_id": nsr_id
},
5273 "path": "_admin.deployed.K8s.{}".format(index
),
5277 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5279 step
= "Executing kdu {}".format(primitive_name
)
5280 if primitive_name
== "upgrade":
5281 if desc_params
.get("kdu_model"):
5282 kdu_model
= desc_params
.get("kdu_model")
5283 del desc_params
["kdu_model"]
5285 kdu_model
= kdu
.get("kdu-model")
5286 parts
= kdu_model
.split(sep
=":")
5288 kdu_model
= parts
[0]
5289 if desc_params
.get("kdu_atomic_upgrade"):
5290 atomic_upgrade
= desc_params
.get("kdu_atomic_upgrade").lower() in ("yes", "true", "1")
5291 del desc_params
["kdu_atomic_upgrade"]
5293 atomic_upgrade
= True
5295 detailed_status
= await asyncio
.wait_for(
5296 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5297 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5298 kdu_instance
=kdu
.get("kdu-instance"),
5299 atomic
=atomic_upgrade
,
5300 kdu_model
=kdu_model
,
5303 timeout
=timeout_ns_action
,
5305 timeout
=timeout_ns_action
+ 10,
5308 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5310 elif primitive_name
== "rollback":
5311 detailed_status
= await asyncio
.wait_for(
5312 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5313 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5314 kdu_instance
=kdu
.get("kdu-instance"),
5317 timeout
=timeout_ns_action
,
5319 elif primitive_name
== "status":
5320 detailed_status
= await asyncio
.wait_for(
5321 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5322 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5323 kdu_instance
=kdu
.get("kdu-instance"),
5326 timeout
=timeout_ns_action
,
5329 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5330 kdu
["kdu-name"], nsr_id
5332 params
= self
._map
_primitive
_params
(
5333 config_primitive_desc
, primitive_params
, desc_params
5336 detailed_status
= await asyncio
.wait_for(
5337 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5338 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5339 kdu_instance
=kdu_instance
,
5340 primitive_name
=primitive_name
,
5343 timeout
=timeout_ns_action
,
5346 timeout
=timeout_ns_action
,
5350 nslcmop_operation_state
= "COMPLETED"
5352 detailed_status
= ""
5353 nslcmop_operation_state
= "FAILED"
5355 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5356 nsr_deployed
["VCA"],
5357 member_vnf_index
=vnf_index
,
5359 vdu_count_index
=vdu_count_index
,
5360 ee_descriptor_id
=ee_descriptor_id
,
5362 for vca_index
, vca_deployed
in enumerate(
5363 db_nsr
["_admin"]["deployed"]["VCA"]
5365 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5367 "collection": "nsrs",
5368 "filter": {"_id": nsr_id
},
5369 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5373 nslcmop_operation_state
,
5375 ) = await self
._ns
_execute
_primitive
(
5377 primitive
=primitive_name
,
5378 primitive_params
=self
._map
_primitive
_params
(
5379 config_primitive_desc
, primitive_params
, desc_params
5381 timeout
=timeout_ns_action
,
5387 db_nslcmop_update
["detailed-status"] = detailed_status
5388 error_description_nslcmop
= (
5389 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5393 + "Done with result {} {}".format(
5394 nslcmop_operation_state
, detailed_status
5397 return # database update is called inside finally
5399 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5400 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5402 except asyncio
.CancelledError
:
5404 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5406 exc
= "Operation was cancelled"
5407 except asyncio
.TimeoutError
:
5408 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5410 except Exception as e
:
5411 exc
= traceback
.format_exc()
5412 self
.logger
.critical(
5413 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5422 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5423 nslcmop_operation_state
= "FAILED"
5425 self
._write
_ns
_status
(
5429 ], # TODO check if degraded. For the moment use previous status
5430 current_operation
="IDLE",
5431 current_operation_id
=None,
5432 # error_description=error_description_nsr,
5433 # error_detail=error_detail,
5434 other_update
=db_nsr_update
,
5437 self
._write
_op
_status
(
5440 error_message
=error_description_nslcmop
,
5441 operation_state
=nslcmop_operation_state
,
5442 other_update
=db_nslcmop_update
,
5445 if nslcmop_operation_state
:
5447 await self
.msg
.aiowrite(
5452 "nslcmop_id": nslcmop_id
,
5453 "operationState": nslcmop_operation_state
,
5457 except Exception as e
:
5459 logging_text
+ "kafka_write notification Exception {}".format(e
)
5461 self
.logger
.debug(logging_text
+ "Exit")
5462 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5463 return nslcmop_operation_state
, detailed_status
5465 async def terminate_vdus(
5466 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5468 """This method terminates VDUs
5471 db_vnfr: VNF instance record
5472 member_vnf_index: VNF index to identify the VDUs to be removed
5473 db_nsr: NS instance record
5474 update_db_nslcmops: Nslcmop update record
5476 vca_scaling_info
= []
5477 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5478 scaling_info
["scaling_direction"] = "IN"
5479 scaling_info
["vdu-delete"] = {}
5480 scaling_info
["kdu-delete"] = {}
5481 db_vdur
= db_vnfr
.get("vdur")
5482 vdur_list
= copy(db_vdur
)
5484 for index
, vdu
in enumerate(vdur_list
):
5485 vca_scaling_info
.append(
5487 "osm_vdu_id": vdu
["vdu-id-ref"],
5488 "member-vnf-index": member_vnf_index
,
5490 "vdu_index": count_index
,
5493 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5494 scaling_info
["vdu"].append(
5496 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5497 "vdu_id": vdu
["vdu-id-ref"],
5501 for interface
in vdu
["interfaces"]:
5502 scaling_info
["vdu"][index
]["interface"].append(
5504 "name": interface
["name"],
5505 "ip_address": interface
["ip-address"],
5506 "mac_address": interface
.get("mac-address"),
5509 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5510 stage
[2] = "Terminating VDUs"
5511 if scaling_info
.get("vdu-delete"):
5512 # scale_process = "RO"
5513 if self
.ro_config
.get("ng"):
5514 await self
._scale
_ng
_ro
(
5523 async def remove_vnf(self
, nsr_id
, nslcmop_id
, vnf_instance_id
):
5524 """This method is to Remove VNF instances from NS.
5527 nsr_id: NS instance id
5528 nslcmop_id: nslcmop id of update
5529 vnf_instance_id: id of the VNF instance to be removed
5532 result: (str, str) COMPLETED/FAILED, details
5536 logging_text
= "Task ns={} update ".format(nsr_id
)
5537 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5538 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5539 if check_vnfr_count
> 1:
5540 stage
= ["", "", ""]
5541 step
= "Getting nslcmop from database"
5543 step
+ " after having waited for previous tasks to be completed"
5545 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5546 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5547 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5548 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5549 """ db_vnfr = self.db.get_one(
5550 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5552 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5553 await self
.terminate_vdus(
5562 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5563 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5564 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get(
5565 "constituent-vnfr-ref"
5567 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5568 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5569 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5570 return "COMPLETED", "Done"
5572 step
= "Terminate VNF Failed with"
5574 "{} Cannot terminate the last VNF in this NS.".format(
5578 except (LcmException
, asyncio
.CancelledError
):
5580 except Exception as e
:
5581 self
.logger
.debug("Error removing VNF {}".format(e
))
5582 return "FAILED", "Error removing VNF {}".format(e
)
5584 async def _ns_redeploy_vnf(
5592 """This method updates and redeploys VNF instances
5595 nsr_id: NS instance id
5596 nslcmop_id: nslcmop id
5597 db_vnfd: VNF descriptor
5598 db_vnfr: VNF instance record
5599 db_nsr: NS instance record
5602 result: (str, str) COMPLETED/FAILED, details
5606 stage
= ["", "", ""]
5607 logging_text
= "Task ns={} update ".format(nsr_id
)
5608 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5609 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5611 # Terminate old VNF resources
5612 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5613 await self
.terminate_vdus(
5622 # old_vnfd_id = db_vnfr["vnfd-id"]
5623 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5624 new_db_vnfd
= db_vnfd
5625 # new_vnfd_ref = new_db_vnfd["id"]
5626 # new_vnfd_id = vnfd_id
5630 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5632 "name": cp
.get("id"),
5633 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5634 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5637 new_vnfr_cp
.append(vnf_cp
)
5638 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5639 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5640 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5642 "revision": latest_vnfd_revision
,
5643 "connection-point": new_vnfr_cp
,
5647 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5648 updated_db_vnfr
= self
.db
.get_one(
5650 {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
},
5653 # Instantiate new VNF resources
5654 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5655 vca_scaling_info
= []
5656 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5657 scaling_info
["scaling_direction"] = "OUT"
5658 scaling_info
["vdu-create"] = {}
5659 scaling_info
["kdu-create"] = {}
5660 vdud_instantiate_list
= db_vnfd
["vdu"]
5661 for index
, vdud
in enumerate(vdud_instantiate_list
):
5662 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
5664 additional_params
= (
5665 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5668 cloud_init_list
= []
5670 # TODO Information of its own ip is not available because db_vnfr is not updated.
5671 additional_params
["OSM"] = get_osm_params(
5672 updated_db_vnfr
, vdud
["id"], 1
5674 cloud_init_list
.append(
5675 self
._parse
_cloud
_init
(
5682 vca_scaling_info
.append(
5684 "osm_vdu_id": vdud
["id"],
5685 "member-vnf-index": member_vnf_index
,
5687 "vdu_index": count_index
,
5690 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5691 if self
.ro_config
.get("ng"):
5693 "New Resources to be deployed: {}".format(scaling_info
)
5695 await self
._scale
_ng
_ro
(
5703 return "COMPLETED", "Done"
5704 except (LcmException
, asyncio
.CancelledError
):
5706 except Exception as e
:
5707 self
.logger
.debug("Error updating VNF {}".format(e
))
5708 return "FAILED", "Error updating VNF {}".format(e
)
5710 async def _ns_charm_upgrade(
5716 timeout
: float = None,
5718 """This method upgrade charms in VNF instances
5721 ee_id: Execution environment id
5722 path: Local path to the charm
5724 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5725 timeout: (Float) Timeout for the ns update operation
5728 result: (str, str) COMPLETED/FAILED, details
5731 charm_type
= charm_type
or "lxc_proxy_charm"
5732 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5736 charm_type
=charm_type
,
5737 timeout
=timeout
or self
.timeout_ns_update
,
5741 return "COMPLETED", output
5743 except (LcmException
, asyncio
.CancelledError
):
5746 except Exception as e
:
5748 self
.logger
.debug("Error upgrading charm {}".format(path
))
5750 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5752 async def update(self
, nsr_id
, nslcmop_id
):
5753 """Update NS according to different update types
5755 This method performs upgrade of VNF instances then updates the revision
5756 number in VNF record
5759 nsr_id: Network service will be updated
5760 nslcmop_id: ns lcm operation id
5763 It may raise DbException, LcmException, N2VCException, K8sException
5766 # Try to lock HA task here
5767 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5768 if not task_is_locked_by_me
:
5771 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5772 self
.logger
.debug(logging_text
+ "Enter")
5774 # Set the required variables to be filled up later
5776 db_nslcmop_update
= {}
5778 nslcmop_operation_state
= None
5780 error_description_nslcmop
= ""
5782 change_type
= "updated"
5783 detailed_status
= ""
5786 # wait for any previous tasks in process
5787 step
= "Waiting for previous operations to terminate"
5788 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5789 self
._write
_ns
_status
(
5792 current_operation
="UPDATING",
5793 current_operation_id
=nslcmop_id
,
5796 step
= "Getting nslcmop from database"
5797 db_nslcmop
= self
.db
.get_one(
5798 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5800 update_type
= db_nslcmop
["operationParams"]["updateType"]
5802 step
= "Getting nsr from database"
5803 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5804 old_operational_status
= db_nsr
["operational-status"]
5805 db_nsr_update
["operational-status"] = "updating"
5806 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5807 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5809 if update_type
== "CHANGE_VNFPKG":
5811 # Get the input parameters given through update request
5812 vnf_instance_id
= db_nslcmop
["operationParams"][
5813 "changeVnfPackageData"
5814 ].get("vnfInstanceId")
5816 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5819 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5821 step
= "Getting vnfr from database"
5822 db_vnfr
= self
.db
.get_one(
5823 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5826 step
= "Getting vnfds from database"
5828 latest_vnfd
= self
.db
.get_one(
5829 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5831 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5834 current_vnf_revision
= db_vnfr
.get("revision", 1)
5835 current_vnfd
= self
.db
.get_one(
5837 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5838 fail_on_empty
=False,
5840 # Charm artifact paths will be filled up later
5842 current_charm_artifact_path
,
5843 target_charm_artifact_path
,
5844 charm_artifact_paths
,
5847 step
= "Checking if revision has changed in VNFD"
5848 if current_vnf_revision
!= latest_vnfd_revision
:
5850 change_type
= "policy_updated"
5852 # There is new revision of VNFD, update operation is required
5853 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5854 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5856 step
= "Removing the VNFD packages if they exist in the local path"
5857 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5858 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5860 step
= "Get the VNFD packages from FSMongo"
5861 self
.fs
.sync(from_path
=latest_vnfd_path
)
5862 self
.fs
.sync(from_path
=current_vnfd_path
)
5865 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5867 base_folder
= latest_vnfd
["_admin"]["storage"]
5869 for charm_index
, charm_deployed
in enumerate(
5870 get_iterable(nsr_deployed
, "VCA")
5872 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5874 # Getting charm-id and charm-type
5875 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5876 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5877 charm_type
= charm_deployed
.get("type")
5880 ee_id
= charm_deployed
.get("ee_id")
5882 step
= "Getting descriptor config"
5883 descriptor_config
= get_configuration(
5884 current_vnfd
, current_vnfd
["id"]
5887 if "execution-environment-list" in descriptor_config
:
5888 ee_list
= descriptor_config
.get(
5889 "execution-environment-list", []
5894 # There could be several charm used in the same VNF
5895 for ee_item
in ee_list
:
5896 if ee_item
.get("juju"):
5898 step
= "Getting charm name"
5899 charm_name
= ee_item
["juju"].get("charm")
5901 step
= "Setting Charm artifact paths"
5902 current_charm_artifact_path
.append(
5903 get_charm_artifact_path(
5907 current_vnf_revision
,
5910 target_charm_artifact_path
.append(
5911 get_charm_artifact_path(
5915 latest_vnfd_revision
,
5919 charm_artifact_paths
= zip(
5920 current_charm_artifact_path
, target_charm_artifact_path
5923 step
= "Checking if software version has changed in VNFD"
5924 if find_software_version(current_vnfd
) != find_software_version(
5928 step
= "Checking if existing VNF has charm"
5929 for current_charm_path
, target_charm_path
in list(
5930 charm_artifact_paths
5932 if current_charm_path
:
5934 "Software version change is not supported as VNF instance {} has charm.".format(
5939 # There is no change in the charm package, then redeploy the VNF
5940 # based on new descriptor
5941 step
= "Redeploying VNF"
5942 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5943 (result
, detailed_status
) = await self
._ns
_redeploy
_vnf
(
5944 nsr_id
, nslcmop_id
, latest_vnfd
, db_vnfr
, db_nsr
5946 if result
== "FAILED":
5947 nslcmop_operation_state
= result
5948 error_description_nslcmop
= detailed_status
5949 db_nslcmop_update
["detailed-status"] = detailed_status
5952 + " step {} Done with result {} {}".format(
5953 step
, nslcmop_operation_state
, detailed_status
5958 step
= "Checking if any charm package has changed or not"
5959 for current_charm_path
, target_charm_path
in list(
5960 charm_artifact_paths
5964 and target_charm_path
5965 and self
.check_charm_hash_changed(
5966 current_charm_path
, target_charm_path
5970 step
= "Checking whether VNF uses juju bundle"
5971 if check_juju_bundle_existence(current_vnfd
):
5974 "Charm upgrade is not supported for the instance which"
5975 " uses juju-bundle: {}".format(
5976 check_juju_bundle_existence(current_vnfd
)
5980 step
= "Upgrading Charm"
5984 ) = await self
._ns
_charm
_upgrade
(
5987 charm_type
=charm_type
,
5988 path
=self
.fs
.path
+ target_charm_path
,
5989 timeout
=timeout_seconds
,
5992 if result
== "FAILED":
5993 nslcmop_operation_state
= result
5994 error_description_nslcmop
= detailed_status
5996 db_nslcmop_update
["detailed-status"] = detailed_status
5999 + " step {} Done with result {} {}".format(
6000 step
, nslcmop_operation_state
, detailed_status
6004 step
= "Updating policies"
6005 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6006 result
= "COMPLETED"
6007 detailed_status
= "Done"
6008 db_nslcmop_update
["detailed-status"] = "Done"
6010 # If nslcmop_operation_state is None, so any operation is not failed.
6011 if not nslcmop_operation_state
:
6012 nslcmop_operation_state
= "COMPLETED"
6014 # If update CHANGE_VNFPKG nslcmop_operation is successful
6015 # vnf revision need to be updated
6016 vnfr_update
["revision"] = latest_vnfd_revision
6017 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
6021 + " task Done with result {} {}".format(
6022 nslcmop_operation_state
, detailed_status
6025 elif update_type
== "REMOVE_VNF":
6026 # This part is included in https://osm.etsi.org/gerrit/11876
6027 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
6028 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
6029 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
6030 step
= "Removing VNF"
6031 (result
, detailed_status
) = await self
.remove_vnf(
6032 nsr_id
, nslcmop_id
, vnf_instance_id
6034 if result
== "FAILED":
6035 nslcmop_operation_state
= result
6036 error_description_nslcmop
= detailed_status
6037 db_nslcmop_update
["detailed-status"] = detailed_status
6038 change_type
= "vnf_terminated"
6039 if not nslcmop_operation_state
:
6040 nslcmop_operation_state
= "COMPLETED"
6043 + " task Done with result {} {}".format(
6044 nslcmop_operation_state
, detailed_status
6048 elif update_type
== "OPERATE_VNF":
6049 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"][
6052 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"][
6055 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"][
6058 (result
, detailed_status
) = await self
.rebuild_start_stop(
6059 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
6061 if result
== "FAILED":
6062 nslcmop_operation_state
= result
6063 error_description_nslcmop
= detailed_status
6064 db_nslcmop_update
["detailed-status"] = detailed_status
6065 if not nslcmop_operation_state
:
6066 nslcmop_operation_state
= "COMPLETED"
6069 + " task Done with result {} {}".format(
6070 nslcmop_operation_state
, detailed_status
6074 # If nslcmop_operation_state is None, so any operation is not failed.
6075 # All operations are executed in overall.
6076 if not nslcmop_operation_state
:
6077 nslcmop_operation_state
= "COMPLETED"
6078 db_nsr_update
["operational-status"] = old_operational_status
6080 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6081 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6083 except asyncio
.CancelledError
:
6085 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6087 exc
= "Operation was cancelled"
6088 except asyncio
.TimeoutError
:
6089 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6091 except Exception as e
:
6092 exc
= traceback
.format_exc()
6093 self
.logger
.critical(
6094 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6103 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6104 nslcmop_operation_state
= "FAILED"
6105 db_nsr_update
["operational-status"] = old_operational_status
6107 self
._write
_ns
_status
(
6109 ns_state
=db_nsr
["nsState"],
6110 current_operation
="IDLE",
6111 current_operation_id
=None,
6112 other_update
=db_nsr_update
,
6115 self
._write
_op
_status
(
6118 error_message
=error_description_nslcmop
,
6119 operation_state
=nslcmop_operation_state
,
6120 other_update
=db_nslcmop_update
,
6123 if nslcmop_operation_state
:
6127 "nslcmop_id": nslcmop_id
,
6128 "operationState": nslcmop_operation_state
,
6130 if change_type
in ("vnf_terminated", "policy_updated"):
6131 msg
.update({"vnf_member_index": member_vnf_index
})
6132 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6133 except Exception as e
:
6135 logging_text
+ "kafka_write notification Exception {}".format(e
)
6137 self
.logger
.debug(logging_text
+ "Exit")
6138 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6139 return nslcmop_operation_state
, detailed_status
6141 async def scale(self
, nsr_id
, nslcmop_id
):
6142 # Try to lock HA task here
6143 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6144 if not task_is_locked_by_me
:
6147 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6148 stage
= ["", "", ""]
6149 tasks_dict_info
= {}
6150 # ^ stage, step, VIM progress
6151 self
.logger
.debug(logging_text
+ "Enter")
6152 # get all needed from database
6154 db_nslcmop_update
= {}
6157 # in case of error, indicates what part of scale was failed to put nsr at error status
6158 scale_process
= None
6159 old_operational_status
= ""
6160 old_config_status
= ""
6163 # wait for any previous tasks in process
6164 step
= "Waiting for previous operations to terminate"
6165 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6166 self
._write
_ns
_status
(
6169 current_operation
="SCALING",
6170 current_operation_id
=nslcmop_id
,
6173 step
= "Getting nslcmop from database"
6175 step
+ " after having waited for previous tasks to be completed"
6177 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6179 step
= "Getting nsr from database"
6180 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6181 old_operational_status
= db_nsr
["operational-status"]
6182 old_config_status
= db_nsr
["config-status"]
6184 step
= "Parsing scaling parameters"
6185 db_nsr_update
["operational-status"] = "scaling"
6186 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6187 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6189 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6191 ]["member-vnf-index"]
6192 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6194 ]["scaling-group-descriptor"]
6195 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6196 # for backward compatibility
6197 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6198 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6199 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6200 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6202 step
= "Getting vnfr from database"
6203 db_vnfr
= self
.db
.get_one(
6204 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6207 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6209 step
= "Getting vnfd from database"
6210 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6212 base_folder
= db_vnfd
["_admin"]["storage"]
6214 step
= "Getting scaling-group-descriptor"
6215 scaling_descriptor
= find_in_list(
6216 get_scaling_aspect(db_vnfd
),
6217 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6219 if not scaling_descriptor
:
6221 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6222 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6225 step
= "Sending scale order to VIM"
6226 # TODO check if ns is in a proper status
6228 if not db_nsr
["_admin"].get("scaling-group"):
6233 "_admin.scaling-group": [
6234 {"name": scaling_group
, "nb-scale-op": 0}
6238 admin_scale_index
= 0
6240 for admin_scale_index
, admin_scale_info
in enumerate(
6241 db_nsr
["_admin"]["scaling-group"]
6243 if admin_scale_info
["name"] == scaling_group
:
6244 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6246 else: # not found, set index one plus last element and add new entry with the name
6247 admin_scale_index
+= 1
6249 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6252 vca_scaling_info
= []
6253 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6254 if scaling_type
== "SCALE_OUT":
6255 if "aspect-delta-details" not in scaling_descriptor
:
6257 "Aspect delta details not fount in scaling descriptor {}".format(
6258 scaling_descriptor
["name"]
6261 # count if max-instance-count is reached
6262 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6264 scaling_info
["scaling_direction"] = "OUT"
6265 scaling_info
["vdu-create"] = {}
6266 scaling_info
["kdu-create"] = {}
6267 for delta
in deltas
:
6268 for vdu_delta
in delta
.get("vdu-delta", {}):
6269 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6270 # vdu_index also provides the number of instance of the targeted vdu
6271 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6272 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6276 additional_params
= (
6277 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6280 cloud_init_list
= []
6282 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6283 max_instance_count
= 10
6284 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6285 max_instance_count
= vdu_profile
.get(
6286 "max-number-of-instances", 10
6289 default_instance_num
= get_number_of_instances(
6292 instances_number
= vdu_delta
.get("number-of-instances", 1)
6293 nb_scale_op
+= instances_number
6295 new_instance_count
= nb_scale_op
+ default_instance_num
6296 # Control if new count is over max and vdu count is less than max.
6297 # Then assign new instance count
6298 if new_instance_count
> max_instance_count
> vdu_count
:
6299 instances_number
= new_instance_count
- max_instance_count
6301 instances_number
= instances_number
6303 if new_instance_count
> max_instance_count
:
6305 "reached the limit of {} (max-instance-count) "
6306 "scaling-out operations for the "
6307 "scaling-group-descriptor '{}'".format(
6308 nb_scale_op
, scaling_group
6311 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6313 # TODO Information of its own ip is not available because db_vnfr is not updated.
6314 additional_params
["OSM"] = get_osm_params(
6315 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6317 cloud_init_list
.append(
6318 self
._parse
_cloud
_init
(
6325 vca_scaling_info
.append(
6327 "osm_vdu_id": vdu_delta
["id"],
6328 "member-vnf-index": vnf_index
,
6330 "vdu_index": vdu_index
+ x
,
6333 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6334 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6335 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6336 kdu_name
= kdu_profile
["kdu-name"]
6337 resource_name
= kdu_profile
.get("resource-name", "")
6339 # Might have different kdus in the same delta
6340 # Should have list for each kdu
6341 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6342 scaling_info
["kdu-create"][kdu_name
] = []
6344 kdur
= get_kdur(db_vnfr
, kdu_name
)
6345 if kdur
.get("helm-chart"):
6346 k8s_cluster_type
= "helm-chart-v3"
6347 self
.logger
.debug("kdur: {}".format(kdur
))
6349 kdur
.get("helm-version")
6350 and kdur
.get("helm-version") == "v2"
6352 k8s_cluster_type
= "helm-chart"
6353 elif kdur
.get("juju-bundle"):
6354 k8s_cluster_type
= "juju-bundle"
6357 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6358 "juju-bundle. Maybe an old NBI version is running".format(
6359 db_vnfr
["member-vnf-index-ref"], kdu_name
6363 max_instance_count
= 10
6364 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6365 max_instance_count
= kdu_profile
.get(
6366 "max-number-of-instances", 10
6369 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6370 deployed_kdu
, _
= get_deployed_kdu(
6371 nsr_deployed
, kdu_name
, vnf_index
6373 if deployed_kdu
is None:
6375 "KDU '{}' for vnf '{}' not deployed".format(
6379 kdu_instance
= deployed_kdu
.get("kdu-instance")
6380 instance_num
= await self
.k8scluster_map
[
6386 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6387 kdu_model
=deployed_kdu
.get("kdu-model"),
6389 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6390 "number-of-instances", 1
6393 # Control if new count is over max and instance_num is less than max.
6394 # Then assign max instance number to kdu replica count
6395 if kdu_replica_count
> max_instance_count
> instance_num
:
6396 kdu_replica_count
= max_instance_count
6397 if kdu_replica_count
> max_instance_count
:
6399 "reached the limit of {} (max-instance-count) "
6400 "scaling-out operations for the "
6401 "scaling-group-descriptor '{}'".format(
6402 instance_num
, scaling_group
6406 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6407 vca_scaling_info
.append(
6409 "osm_kdu_id": kdu_name
,
6410 "member-vnf-index": vnf_index
,
6412 "kdu_index": instance_num
+ x
- 1,
6415 scaling_info
["kdu-create"][kdu_name
].append(
6417 "member-vnf-index": vnf_index
,
6419 "k8s-cluster-type": k8s_cluster_type
,
6420 "resource-name": resource_name
,
6421 "scale": kdu_replica_count
,
6424 elif scaling_type
== "SCALE_IN":
6425 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6427 scaling_info
["scaling_direction"] = "IN"
6428 scaling_info
["vdu-delete"] = {}
6429 scaling_info
["kdu-delete"] = {}
6431 for delta
in deltas
:
6432 for vdu_delta
in delta
.get("vdu-delta", {}):
6433 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6434 min_instance_count
= 0
6435 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6436 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6437 min_instance_count
= vdu_profile
["min-number-of-instances"]
6439 default_instance_num
= get_number_of_instances(
6440 db_vnfd
, vdu_delta
["id"]
6442 instance_num
= vdu_delta
.get("number-of-instances", 1)
6443 nb_scale_op
-= instance_num
6445 new_instance_count
= nb_scale_op
+ default_instance_num
6447 if new_instance_count
< min_instance_count
< vdu_count
:
6448 instances_number
= min_instance_count
- new_instance_count
6450 instances_number
= instance_num
6452 if new_instance_count
< min_instance_count
:
6454 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6455 "scaling-group-descriptor '{}'".format(
6456 nb_scale_op
, scaling_group
6459 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6460 vca_scaling_info
.append(
6462 "osm_vdu_id": vdu_delta
["id"],
6463 "member-vnf-index": vnf_index
,
6465 "vdu_index": vdu_index
- 1 - x
,
6468 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6469 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6470 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6471 kdu_name
= kdu_profile
["kdu-name"]
6472 resource_name
= kdu_profile
.get("resource-name", "")
6474 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6475 scaling_info
["kdu-delete"][kdu_name
] = []
6477 kdur
= get_kdur(db_vnfr
, kdu_name
)
6478 if kdur
.get("helm-chart"):
6479 k8s_cluster_type
= "helm-chart-v3"
6480 self
.logger
.debug("kdur: {}".format(kdur
))
6482 kdur
.get("helm-version")
6483 and kdur
.get("helm-version") == "v2"
6485 k8s_cluster_type
= "helm-chart"
6486 elif kdur
.get("juju-bundle"):
6487 k8s_cluster_type
= "juju-bundle"
6490 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6491 "juju-bundle. Maybe an old NBI version is running".format(
6492 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6496 min_instance_count
= 0
6497 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6498 min_instance_count
= kdu_profile
["min-number-of-instances"]
6500 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6501 deployed_kdu
, _
= get_deployed_kdu(
6502 nsr_deployed
, kdu_name
, vnf_index
6504 if deployed_kdu
is None:
6506 "KDU '{}' for vnf '{}' not deployed".format(
6510 kdu_instance
= deployed_kdu
.get("kdu-instance")
6511 instance_num
= await self
.k8scluster_map
[
6517 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6518 kdu_model
=deployed_kdu
.get("kdu-model"),
6520 kdu_replica_count
= instance_num
- kdu_delta
.get(
6521 "number-of-instances", 1
6524 if kdu_replica_count
< min_instance_count
< instance_num
:
6525 kdu_replica_count
= min_instance_count
6526 if kdu_replica_count
< min_instance_count
:
6528 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6529 "scaling-group-descriptor '{}'".format(
6530 instance_num
, scaling_group
6534 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6535 vca_scaling_info
.append(
6537 "osm_kdu_id": kdu_name
,
6538 "member-vnf-index": vnf_index
,
6540 "kdu_index": instance_num
- x
- 1,
6543 scaling_info
["kdu-delete"][kdu_name
].append(
6545 "member-vnf-index": vnf_index
,
6547 "k8s-cluster-type": k8s_cluster_type
,
6548 "resource-name": resource_name
,
6549 "scale": kdu_replica_count
,
6553 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6554 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6555 if scaling_info
["scaling_direction"] == "IN":
6556 for vdur
in reversed(db_vnfr
["vdur"]):
6557 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6558 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6559 scaling_info
["vdu"].append(
6561 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6562 "vdu_id": vdur
["vdu-id-ref"],
6566 for interface
in vdur
["interfaces"]:
6567 scaling_info
["vdu"][-1]["interface"].append(
6569 "name": interface
["name"],
6570 "ip_address": interface
["ip-address"],
6571 "mac_address": interface
.get("mac-address"),
6574 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6577 step
= "Executing pre-scale vnf-config-primitive"
6578 if scaling_descriptor
.get("scaling-config-action"):
6579 for scaling_config_action
in scaling_descriptor
[
6580 "scaling-config-action"
6583 scaling_config_action
.get("trigger") == "pre-scale-in"
6584 and scaling_type
== "SCALE_IN"
6586 scaling_config_action
.get("trigger") == "pre-scale-out"
6587 and scaling_type
== "SCALE_OUT"
6589 vnf_config_primitive
= scaling_config_action
[
6590 "vnf-config-primitive-name-ref"
6592 step
= db_nslcmop_update
[
6594 ] = "executing pre-scale scaling-config-action '{}'".format(
6595 vnf_config_primitive
6598 # look for primitive
6599 for config_primitive
in (
6600 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6601 ).get("config-primitive", ()):
6602 if config_primitive
["name"] == vnf_config_primitive
:
6606 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6607 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6608 "primitive".format(scaling_group
, vnf_config_primitive
)
6611 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6612 if db_vnfr
.get("additionalParamsForVnf"):
6613 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6615 scale_process
= "VCA"
6616 db_nsr_update
["config-status"] = "configuring pre-scaling"
6617 primitive_params
= self
._map
_primitive
_params
(
6618 config_primitive
, {}, vnfr_params
6621 # Pre-scale retry check: Check if this sub-operation has been executed before
6622 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6625 vnf_config_primitive
,
6629 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6630 # Skip sub-operation
6631 result
= "COMPLETED"
6632 result_detail
= "Done"
6635 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6636 vnf_config_primitive
, result
, result_detail
6640 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6641 # New sub-operation: Get index of this sub-operation
6643 len(db_nslcmop
.get("_admin", {}).get("operations"))
6648 + "vnf_config_primitive={} New sub-operation".format(
6649 vnf_config_primitive
6653 # retry: Get registered params for this existing sub-operation
6654 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6657 vnf_index
= op
.get("member_vnf_index")
6658 vnf_config_primitive
= op
.get("primitive")
6659 primitive_params
= op
.get("primitive_params")
6662 + "vnf_config_primitive={} Sub-operation retry".format(
6663 vnf_config_primitive
6666 # Execute the primitive, either with new (first-time) or registered (reintent) args
6667 ee_descriptor_id
= config_primitive
.get(
6668 "execution-environment-ref"
6670 primitive_name
= config_primitive
.get(
6671 "execution-environment-primitive", vnf_config_primitive
6673 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6674 nsr_deployed
["VCA"],
6675 member_vnf_index
=vnf_index
,
6677 vdu_count_index
=None,
6678 ee_descriptor_id
=ee_descriptor_id
,
6680 result
, result_detail
= await self
._ns
_execute
_primitive
(
6689 + "vnf_config_primitive={} Done with result {} {}".format(
6690 vnf_config_primitive
, result
, result_detail
6693 # Update operationState = COMPLETED | FAILED
6694 self
._update
_suboperation
_status
(
6695 db_nslcmop
, op_index
, result
, result_detail
6698 if result
== "FAILED":
6699 raise LcmException(result_detail
)
6700 db_nsr_update
["config-status"] = old_config_status
6701 scale_process
= None
6705 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6708 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6711 # SCALE-IN VCA - BEGIN
6712 if vca_scaling_info
:
6713 step
= db_nslcmop_update
[
6715 ] = "Deleting the execution environments"
6716 scale_process
= "VCA"
6717 for vca_info
in vca_scaling_info
:
6718 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6719 member_vnf_index
= str(vca_info
["member-vnf-index"])
6721 logging_text
+ "vdu info: {}".format(vca_info
)
6723 if vca_info
.get("osm_vdu_id"):
6724 vdu_id
= vca_info
["osm_vdu_id"]
6725 vdu_index
= int(vca_info
["vdu_index"])
6728 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6729 member_vnf_index
, vdu_id
, vdu_index
6731 stage
[2] = step
= "Scaling in VCA"
6732 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6733 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6734 config_update
= db_nsr
["configurationStatus"]
6735 for vca_index
, vca
in enumerate(vca_update
):
6737 (vca
or vca
.get("ee_id"))
6738 and vca
["member-vnf-index"] == member_vnf_index
6739 and vca
["vdu_count_index"] == vdu_index
6741 if vca
.get("vdu_id"):
6742 config_descriptor
= get_configuration(
6743 db_vnfd
, vca
.get("vdu_id")
6745 elif vca
.get("kdu_name"):
6746 config_descriptor
= get_configuration(
6747 db_vnfd
, vca
.get("kdu_name")
6750 config_descriptor
= get_configuration(
6751 db_vnfd
, db_vnfd
["id"]
6753 operation_params
= (
6754 db_nslcmop
.get("operationParams") or {}
6756 exec_terminate_primitives
= not operation_params
.get(
6757 "skip_terminate_primitives"
6758 ) and vca
.get("needed_terminate")
6759 task
= asyncio
.ensure_future(
6768 exec_primitives
=exec_terminate_primitives
,
6772 timeout
=self
.timeout_charm_delete
,
6775 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6778 del vca_update
[vca_index
]
6779 del config_update
[vca_index
]
6780 # wait for pending tasks of terminate primitives
6784 + "Waiting for tasks {}".format(
6785 list(tasks_dict_info
.keys())
6788 error_list
= await self
._wait
_for
_tasks
(
6792 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6797 tasks_dict_info
.clear()
6799 raise LcmException("; ".join(error_list
))
6801 db_vca_and_config_update
= {
6802 "_admin.deployed.VCA": vca_update
,
6803 "configurationStatus": config_update
,
6806 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6808 scale_process
= None
6809 # SCALE-IN VCA - END
6812 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6813 scale_process
= "RO"
6814 if self
.ro_config
.get("ng"):
6815 await self
._scale
_ng
_ro
(
6816 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6818 scaling_info
.pop("vdu-create", None)
6819 scaling_info
.pop("vdu-delete", None)
6821 scale_process
= None
6825 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6826 scale_process
= "KDU"
6827 await self
._scale
_kdu
(
6828 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6830 scaling_info
.pop("kdu-create", None)
6831 scaling_info
.pop("kdu-delete", None)
6833 scale_process
= None
6837 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6839 # SCALE-UP VCA - BEGIN
6840 if vca_scaling_info
:
6841 step
= db_nslcmop_update
[
6843 ] = "Creating new execution environments"
6844 scale_process
= "VCA"
6845 for vca_info
in vca_scaling_info
:
6846 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6847 member_vnf_index
= str(vca_info
["member-vnf-index"])
6849 logging_text
+ "vdu info: {}".format(vca_info
)
6851 vnfd_id
= db_vnfr
["vnfd-ref"]
6852 if vca_info
.get("osm_vdu_id"):
6853 vdu_index
= int(vca_info
["vdu_index"])
6854 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6855 if db_vnfr
.get("additionalParamsForVnf"):
6856 deploy_params
.update(
6858 db_vnfr
["additionalParamsForVnf"].copy()
6861 descriptor_config
= get_configuration(
6862 db_vnfd
, db_vnfd
["id"]
6864 if descriptor_config
:
6869 logging_text
=logging_text
6870 + "member_vnf_index={} ".format(member_vnf_index
),
6873 nslcmop_id
=nslcmop_id
,
6879 member_vnf_index
=member_vnf_index
,
6880 vdu_index
=vdu_index
,
6882 deploy_params
=deploy_params
,
6883 descriptor_config
=descriptor_config
,
6884 base_folder
=base_folder
,
6885 task_instantiation_info
=tasks_dict_info
,
6888 vdu_id
= vca_info
["osm_vdu_id"]
6889 vdur
= find_in_list(
6890 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6892 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6893 if vdur
.get("additionalParams"):
6894 deploy_params_vdu
= parse_yaml_strings(
6895 vdur
["additionalParams"]
6898 deploy_params_vdu
= deploy_params
6899 deploy_params_vdu
["OSM"] = get_osm_params(
6900 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6902 if descriptor_config
:
6907 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6908 member_vnf_index
, vdu_id
, vdu_index
6910 stage
[2] = step
= "Scaling out VCA"
6911 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6913 logging_text
=logging_text
6914 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6915 member_vnf_index
, vdu_id
, vdu_index
6919 nslcmop_id
=nslcmop_id
,
6925 member_vnf_index
=member_vnf_index
,
6926 vdu_index
=vdu_index
,
6928 deploy_params
=deploy_params_vdu
,
6929 descriptor_config
=descriptor_config
,
6930 base_folder
=base_folder
,
6931 task_instantiation_info
=tasks_dict_info
,
6934 # SCALE-UP VCA - END
6935 scale_process
= None
6938 # execute primitive service POST-SCALING
6939 step
= "Executing post-scale vnf-config-primitive"
6940 if scaling_descriptor
.get("scaling-config-action"):
6941 for scaling_config_action
in scaling_descriptor
[
6942 "scaling-config-action"
6945 scaling_config_action
.get("trigger") == "post-scale-in"
6946 and scaling_type
== "SCALE_IN"
6948 scaling_config_action
.get("trigger") == "post-scale-out"
6949 and scaling_type
== "SCALE_OUT"
6951 vnf_config_primitive
= scaling_config_action
[
6952 "vnf-config-primitive-name-ref"
6954 step
= db_nslcmop_update
[
6956 ] = "executing post-scale scaling-config-action '{}'".format(
6957 vnf_config_primitive
6960 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6961 if db_vnfr
.get("additionalParamsForVnf"):
6962 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6964 # look for primitive
6965 for config_primitive
in (
6966 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6967 ).get("config-primitive", ()):
6968 if config_primitive
["name"] == vnf_config_primitive
:
6972 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6973 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6974 "config-primitive".format(
6975 scaling_group
, vnf_config_primitive
6978 scale_process
= "VCA"
6979 db_nsr_update
["config-status"] = "configuring post-scaling"
6980 primitive_params
= self
._map
_primitive
_params
(
6981 config_primitive
, {}, vnfr_params
6984 # Post-scale retry check: Check if this sub-operation has been executed before
6985 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6988 vnf_config_primitive
,
6992 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6993 # Skip sub-operation
6994 result
= "COMPLETED"
6995 result_detail
= "Done"
6998 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6999 vnf_config_primitive
, result
, result_detail
7003 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
7004 # New sub-operation: Get index of this sub-operation
7006 len(db_nslcmop
.get("_admin", {}).get("operations"))
7011 + "vnf_config_primitive={} New sub-operation".format(
7012 vnf_config_primitive
7016 # retry: Get registered params for this existing sub-operation
7017 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
7020 vnf_index
= op
.get("member_vnf_index")
7021 vnf_config_primitive
= op
.get("primitive")
7022 primitive_params
= op
.get("primitive_params")
7025 + "vnf_config_primitive={} Sub-operation retry".format(
7026 vnf_config_primitive
7029 # Execute the primitive, either with new (first-time) or registered (reintent) args
7030 ee_descriptor_id
= config_primitive
.get(
7031 "execution-environment-ref"
7033 primitive_name
= config_primitive
.get(
7034 "execution-environment-primitive", vnf_config_primitive
7036 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
7037 nsr_deployed
["VCA"],
7038 member_vnf_index
=vnf_index
,
7040 vdu_count_index
=None,
7041 ee_descriptor_id
=ee_descriptor_id
,
7043 result
, result_detail
= await self
._ns
_execute
_primitive
(
7052 + "vnf_config_primitive={} Done with result {} {}".format(
7053 vnf_config_primitive
, result
, result_detail
7056 # Update operationState = COMPLETED | FAILED
7057 self
._update
_suboperation
_status
(
7058 db_nslcmop
, op_index
, result
, result_detail
7061 if result
== "FAILED":
7062 raise LcmException(result_detail
)
7063 db_nsr_update
["config-status"] = old_config_status
7064 scale_process
= None
7069 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7070 db_nsr_update
["operational-status"] = (
7072 if old_operational_status
== "failed"
7073 else old_operational_status
7075 db_nsr_update
["config-status"] = old_config_status
7078 ROclient
.ROClientException
,
7083 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7085 except asyncio
.CancelledError
:
7087 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7089 exc
= "Operation was cancelled"
7090 except Exception as e
:
7091 exc
= traceback
.format_exc()
7092 self
.logger
.critical(
7093 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7097 self
._write
_ns
_status
(
7100 current_operation
="IDLE",
7101 current_operation_id
=None,
7104 stage
[1] = "Waiting for instantiate pending tasks."
7105 self
.logger
.debug(logging_text
+ stage
[1])
7106 exc
= await self
._wait
_for
_tasks
(
7109 self
.timeout_ns_deploy
,
7117 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7118 nslcmop_operation_state
= "FAILED"
7120 db_nsr_update
["operational-status"] = old_operational_status
7121 db_nsr_update
["config-status"] = old_config_status
7122 db_nsr_update
["detailed-status"] = ""
7124 if "VCA" in scale_process
:
7125 db_nsr_update
["config-status"] = "failed"
7126 if "RO" in scale_process
:
7127 db_nsr_update
["operational-status"] = "failed"
7130 ] = "FAILED scaling nslcmop={} {}: {}".format(
7131 nslcmop_id
, step
, exc
7134 error_description_nslcmop
= None
7135 nslcmop_operation_state
= "COMPLETED"
7136 db_nslcmop_update
["detailed-status"] = "Done"
7138 self
._write
_op
_status
(
7141 error_message
=error_description_nslcmop
,
7142 operation_state
=nslcmop_operation_state
,
7143 other_update
=db_nslcmop_update
,
7146 self
._write
_ns
_status
(
7149 current_operation
="IDLE",
7150 current_operation_id
=None,
7151 other_update
=db_nsr_update
,
7154 if nslcmop_operation_state
:
7158 "nslcmop_id": nslcmop_id
,
7159 "operationState": nslcmop_operation_state
,
7161 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7162 except Exception as e
:
7164 logging_text
+ "kafka_write notification Exception {}".format(e
)
7166 self
.logger
.debug(logging_text
+ "Exit")
7167 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7169 async def _scale_kdu(
7170 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7172 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7173 for kdu_name
in _scaling_info
:
7174 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7175 deployed_kdu
, index
= get_deployed_kdu(
7176 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7178 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7179 kdu_instance
= deployed_kdu
["kdu-instance"]
7180 kdu_model
= deployed_kdu
.get("kdu-model")
7181 scale
= int(kdu_scaling_info
["scale"])
7182 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7185 "collection": "nsrs",
7186 "filter": {"_id": nsr_id
},
7187 "path": "_admin.deployed.K8s.{}".format(index
),
7190 step
= "scaling application {}".format(
7191 kdu_scaling_info
["resource-name"]
7193 self
.logger
.debug(logging_text
+ step
)
7195 if kdu_scaling_info
["type"] == "delete":
7196 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7199 and kdu_config
.get("terminate-config-primitive")
7200 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7202 terminate_config_primitive_list
= kdu_config
.get(
7203 "terminate-config-primitive"
7205 terminate_config_primitive_list
.sort(
7206 key
=lambda val
: int(val
["seq"])
7210 terminate_config_primitive
7211 ) in terminate_config_primitive_list
:
7212 primitive_params_
= self
._map
_primitive
_params
(
7213 terminate_config_primitive
, {}, {}
7215 step
= "execute terminate config primitive"
7216 self
.logger
.debug(logging_text
+ step
)
7217 await asyncio
.wait_for(
7218 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7219 cluster_uuid
=cluster_uuid
,
7220 kdu_instance
=kdu_instance
,
7221 primitive_name
=terminate_config_primitive
["name"],
7222 params
=primitive_params_
,
7224 total_timeout
=self
.timeout_primitive
,
7227 timeout
=self
.timeout_primitive
7228 * self
.timeout_primitive_outer_factor
,
7231 await asyncio
.wait_for(
7232 self
.k8scluster_map
[k8s_cluster_type
].scale(
7233 kdu_instance
=kdu_instance
,
7235 resource_name
=kdu_scaling_info
["resource-name"],
7236 total_timeout
=self
.timeout_scale_on_error
,
7238 cluster_uuid
=cluster_uuid
,
7239 kdu_model
=kdu_model
,
7243 timeout
=self
.timeout_scale_on_error
7244 * self
.timeout_scale_on_error_outer_factor
,
7247 if kdu_scaling_info
["type"] == "create":
7248 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7251 and kdu_config
.get("initial-config-primitive")
7252 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7254 initial_config_primitive_list
= kdu_config
.get(
7255 "initial-config-primitive"
7257 initial_config_primitive_list
.sort(
7258 key
=lambda val
: int(val
["seq"])
7261 for initial_config_primitive
in initial_config_primitive_list
:
7262 primitive_params_
= self
._map
_primitive
_params
(
7263 initial_config_primitive
, {}, {}
7265 step
= "execute initial config primitive"
7266 self
.logger
.debug(logging_text
+ step
)
7267 await asyncio
.wait_for(
7268 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7269 cluster_uuid
=cluster_uuid
,
7270 kdu_instance
=kdu_instance
,
7271 primitive_name
=initial_config_primitive
["name"],
7272 params
=primitive_params_
,
7279 async def _scale_ng_ro(
7280 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7282 nsr_id
= db_nslcmop
["nsInstanceId"]
7283 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7286 # read from db: vnfd's for every vnf
7289 # for each vnf in ns, read vnfd
7290 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7291 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7292 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7293 # if we haven't this vnfd, read it from db
7294 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7296 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7297 db_vnfds
.append(vnfd
)
7298 n2vc_key
= self
.n2vc
.get_public_key()
7299 n2vc_key_list
= [n2vc_key
]
7302 vdu_scaling_info
.get("vdu-create"),
7303 vdu_scaling_info
.get("vdu-delete"),
7306 # db_vnfr has been updated, update db_vnfrs to use it
7307 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7308 await self
._instantiate
_ng
_ro
(
7318 start_deploy
=time(),
7319 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7321 if vdu_scaling_info
.get("vdu-delete"):
7323 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7326 async def extract_prometheus_scrape_jobs(
7327 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7329 # look if exist a file called 'prometheus*.j2' and
7330 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7334 for f
in artifact_content
7335 if f
.startswith("prometheus") and f
.endswith(".j2")
7341 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7345 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7346 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7348 vnfr_id
= vnfr_id
.replace("-", "")
7350 "JOB_NAME": vnfr_id
,
7351 "TARGET_IP": target_ip
,
7352 "EXPORTER_POD_IP": host_name
,
7353 "EXPORTER_POD_PORT": host_port
,
7355 job_list
= parse_job(job_data
, variables
)
7356 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7357 for job
in job_list
:
7359 not isinstance(job
.get("job_name"), str)
7360 or vnfr_id
not in job
["job_name"]
7362 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7363 job
["nsr_id"] = nsr_id
7364 job
["vnfr_id"] = vnfr_id
7367 async def rebuild_start_stop(
7368 self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
7370 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7371 self
.logger
.info(logging_text
+ "Enter")
7372 stage
= ["Preparing the environment", ""]
7373 # database nsrs record
7377 # in case of error, indicates what part of scale was failed to put nsr at error status
7378 start_deploy
= time()
7380 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7381 vim_account_id
= db_vnfr
.get("vim-account-id")
7382 vim_info_key
= "vim:" + vim_account_id
7383 vdu_id
= additional_param
["vdu_id"]
7384 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7385 vdur
= find_in_list(
7386 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7389 vdu_vim_name
= vdur
["name"]
7390 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7391 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7393 raise LcmException("Target vdu is not found")
7394 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7395 # wait for any previous tasks in process
7396 stage
[1] = "Waiting for previous operations to terminate"
7397 self
.logger
.info(stage
[1])
7398 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7400 stage
[1] = "Reading from database."
7401 self
.logger
.info(stage
[1])
7402 self
._write
_ns
_status
(
7405 current_operation
=operation_type
.upper(),
7406 current_operation_id
=nslcmop_id
,
7408 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7411 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7412 db_nsr_update
["operational-status"] = operation_type
7413 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7417 "vim_vm_id": vim_vm_id
,
7419 "vdu_index": additional_param
["count-index"],
7420 "vdu_id": vdur
["id"],
7421 "target_vim": target_vim
,
7422 "vim_account_id": vim_account_id
,
7425 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7426 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7427 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7428 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7429 self
.logger
.info("response from RO: {}".format(result_dict
))
7430 action_id
= result_dict
["action_id"]
7431 await self
._wait
_ng
_ro
(
7436 self
.timeout_operate
,
7438 "start_stop_rebuild",
7440 return "COMPLETED", "Done"
7441 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7442 self
.logger
.error("Exit Exception {}".format(e
))
7444 except asyncio
.CancelledError
:
7445 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7446 exc
= "Operation was cancelled"
7447 except Exception as e
:
7448 exc
= traceback
.format_exc()
7449 self
.logger
.critical(
7450 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7452 return "FAILED", "Error in operate VNF {}".format(exc
)
7454 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7456 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7458 :param: vim_account_id: VIM Account ID
7460 :return: (cloud_name, cloud_credential)
7462 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7463 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7465 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7467 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7469 :param: vim_account_id: VIM Account ID
7471 :return: (cloud_name, cloud_credential)
7473 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7474 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7476 async def migrate(self
, nsr_id
, nslcmop_id
):
7478 Migrate VNFs and VDUs instances in a NS
7480 :param: nsr_id: NS Instance ID
7481 :param: nslcmop_id: nslcmop ID of migrate
7484 # Try to lock HA task here
7485 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7486 if not task_is_locked_by_me
:
7488 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7489 self
.logger
.debug(logging_text
+ "Enter")
7490 # get all needed from database
7492 db_nslcmop_update
= {}
7493 nslcmop_operation_state
= None
7497 # in case of error, indicates what part of scale was failed to put nsr at error status
7498 start_deploy
= time()
7501 # wait for any previous tasks in process
7502 step
= "Waiting for previous operations to terminate"
7503 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7505 self
._write
_ns
_status
(
7508 current_operation
="MIGRATING",
7509 current_operation_id
=nslcmop_id
,
7511 step
= "Getting nslcmop from database"
7513 step
+ " after having waited for previous tasks to be completed"
7515 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7516 migrate_params
= db_nslcmop
.get("operationParams")
7519 target
.update(migrate_params
)
7520 desc
= await self
.RO
.migrate(nsr_id
, target
)
7521 self
.logger
.debug("RO return > {}".format(desc
))
7522 action_id
= desc
["action_id"]
7523 await self
._wait
_ng
_ro
(
7528 self
.timeout_migrate
,
7529 operation
="migrate",
7531 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7532 self
.logger
.error("Exit Exception {}".format(e
))
7534 except asyncio
.CancelledError
:
7535 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7536 exc
= "Operation was cancelled"
7537 except Exception as e
:
7538 exc
= traceback
.format_exc()
7539 self
.logger
.critical(
7540 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7543 self
._write
_ns
_status
(
7546 current_operation
="IDLE",
7547 current_operation_id
=None,
7550 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7551 nslcmop_operation_state
= "FAILED"
7553 nslcmop_operation_state
= "COMPLETED"
7554 db_nslcmop_update
["detailed-status"] = "Done"
7555 db_nsr_update
["detailed-status"] = "Done"
7557 self
._write
_op
_status
(
7561 operation_state
=nslcmop_operation_state
,
7562 other_update
=db_nslcmop_update
,
7564 if nslcmop_operation_state
:
7568 "nslcmop_id": nslcmop_id
,
7569 "operationState": nslcmop_operation_state
,
7571 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7572 except Exception as e
:
7574 logging_text
+ "kafka_write notification Exception {}".format(e
)
7576 self
.logger
.debug(logging_text
+ "Exit")
7577 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7579 async def heal(self
, nsr_id
, nslcmop_id
):
7583 :param nsr_id: ns instance to heal
7584 :param nslcmop_id: operation to run
7588 # Try to lock HA task here
7589 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7590 if not task_is_locked_by_me
:
7593 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7594 stage
= ["", "", ""]
7595 tasks_dict_info
= {}
7596 # ^ stage, step, VIM progress
7597 self
.logger
.debug(logging_text
+ "Enter")
7598 # get all needed from database
7600 db_nslcmop_update
= {}
7602 db_vnfrs
= {} # vnf's info indexed by _id
7604 old_operational_status
= ""
7605 old_config_status
= ""
7608 # wait for any previous tasks in process
7609 step
= "Waiting for previous operations to terminate"
7610 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7611 self
._write
_ns
_status
(
7614 current_operation
="HEALING",
7615 current_operation_id
=nslcmop_id
,
7618 step
= "Getting nslcmop from database"
7620 step
+ " after having waited for previous tasks to be completed"
7622 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7624 step
= "Getting nsr from database"
7625 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7626 old_operational_status
= db_nsr
["operational-status"]
7627 old_config_status
= db_nsr
["config-status"]
7630 "_admin.deployed.RO.operational-status": "healing",
7632 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7634 step
= "Sending heal order to VIM"
7635 task_ro
= asyncio
.ensure_future(
7637 logging_text
=logging_text
,
7639 db_nslcmop
=db_nslcmop
,
7643 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7644 tasks_dict_info
[task_ro
] = "Healing at VIM"
7648 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7649 self
.logger
.debug(logging_text
+ stage
[1])
7650 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7651 self
.fs
.sync(db_nsr
["nsd-id"])
7653 # read from db: vnfr's of this ns
7654 step
= "Getting vnfrs from db"
7655 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7656 for vnfr
in db_vnfrs_list
:
7657 db_vnfrs
[vnfr
["_id"]] = vnfr
7658 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7660 # Check for each target VNF
7661 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7662 for target_vnf
in target_list
:
7663 # Find this VNF in the list from DB
7664 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7666 db_vnfr
= db_vnfrs
[vnfr_id
]
7667 vnfd_id
= db_vnfr
.get("vnfd-id")
7668 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7669 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7670 base_folder
= vnfd
["_admin"]["storage"]
7675 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7676 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7678 # Check each target VDU and deploy N2VC
7679 target_vdu_list
= target_vnf
.get("additionalParams", {}).get(
7682 if not target_vdu_list
:
7683 # Codigo nuevo para crear diccionario
7684 target_vdu_list
= []
7685 for existing_vdu
in db_vnfr
.get("vdur"):
7686 vdu_name
= existing_vdu
.get("vdu-name", None)
7687 vdu_index
= existing_vdu
.get("count-index", 0)
7688 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get(
7691 vdu_to_be_healed
= {
7693 "count-index": vdu_index
,
7694 "run-day1": vdu_run_day1
,
7696 target_vdu_list
.append(vdu_to_be_healed
)
7697 for target_vdu
in target_vdu_list
:
7698 deploy_params_vdu
= target_vdu
7699 # Set run-day1 vnf level value if not vdu level value exists
7700 if not deploy_params_vdu
.get("run-day1") and target_vnf
[
7703 deploy_params_vdu
["run-day1"] = target_vnf
[
7706 vdu_name
= target_vdu
.get("vdu-id", None)
7707 # TODO: Get vdu_id from vdud.
7709 # For multi instance VDU count-index is mandatory
7710 # For single session VDU count-indes is 0
7711 vdu_index
= target_vdu
.get("count-index", 0)
7713 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7714 stage
[1] = "Deploying Execution Environments."
7715 self
.logger
.debug(logging_text
+ stage
[1])
7717 # VNF Level charm. Normal case when proxy charms.
7718 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7719 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7720 if descriptor_config
:
7721 # Continue if healed machine is management machine
7722 vnf_ip_address
= db_vnfr
.get("ip-address")
7723 target_instance
= None
7724 for instance
in db_vnfr
.get("vdur", None):
7726 instance
["vdu-name"] == vdu_name
7727 and instance
["count-index"] == vdu_index
7729 target_instance
= instance
7731 if vnf_ip_address
== target_instance
.get("ip-address"):
7733 logging_text
=logging_text
7734 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7735 member_vnf_index
, vdu_name
, vdu_index
7739 nslcmop_id
=nslcmop_id
,
7745 member_vnf_index
=member_vnf_index
,
7748 deploy_params
=deploy_params_vdu
,
7749 descriptor_config
=descriptor_config
,
7750 base_folder
=base_folder
,
7751 task_instantiation_info
=tasks_dict_info
,
7755 # VDU Level charm. Normal case with native charms.
7756 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7757 if descriptor_config
:
7759 logging_text
=logging_text
7760 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7761 member_vnf_index
, vdu_name
, vdu_index
7765 nslcmop_id
=nslcmop_id
,
7771 member_vnf_index
=member_vnf_index
,
7772 vdu_index
=vdu_index
,
7774 deploy_params
=deploy_params_vdu
,
7775 descriptor_config
=descriptor_config
,
7776 base_folder
=base_folder
,
7777 task_instantiation_info
=tasks_dict_info
,
7782 ROclient
.ROClientException
,
7787 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7789 except asyncio
.CancelledError
:
7791 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7793 exc
= "Operation was cancelled"
7794 except Exception as e
:
7795 exc
= traceback
.format_exc()
7796 self
.logger
.critical(
7797 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7802 stage
[1] = "Waiting for healing pending tasks."
7803 self
.logger
.debug(logging_text
+ stage
[1])
7804 exc
= await self
._wait
_for
_tasks
(
7807 self
.timeout_ns_deploy
,
7815 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7816 nslcmop_operation_state
= "FAILED"
7818 db_nsr_update
["operational-status"] = old_operational_status
7819 db_nsr_update
["config-status"] = old_config_status
7822 ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id
, step
, exc
)
7823 for task
, task_name
in tasks_dict_info
.items():
7824 if not task
.done() or task
.cancelled() or task
.exception():
7825 if task_name
.startswith(self
.task_name_deploy_vca
):
7826 # A N2VC task is pending
7827 db_nsr_update
["config-status"] = "failed"
7829 # RO task is pending
7830 db_nsr_update
["operational-status"] = "failed"
7832 error_description_nslcmop
= None
7833 nslcmop_operation_state
= "COMPLETED"
7834 db_nslcmop_update
["detailed-status"] = "Done"
7835 db_nsr_update
["detailed-status"] = "Done"
7836 db_nsr_update
["operational-status"] = "running"
7837 db_nsr_update
["config-status"] = "configured"
7839 self
._write
_op
_status
(
7842 error_message
=error_description_nslcmop
,
7843 operation_state
=nslcmop_operation_state
,
7844 other_update
=db_nslcmop_update
,
7847 self
._write
_ns
_status
(
7850 current_operation
="IDLE",
7851 current_operation_id
=None,
7852 other_update
=db_nsr_update
,
7855 if nslcmop_operation_state
:
7859 "nslcmop_id": nslcmop_id
,
7860 "operationState": nslcmop_operation_state
,
7862 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7863 except Exception as e
:
7865 logging_text
+ "kafka_write notification Exception {}".format(e
)
7867 self
.logger
.debug(logging_text
+ "Exit")
7868 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7879 :param logging_text: preffix text to use at logging
7880 :param nsr_id: nsr identity
7881 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7882 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7883 :return: None or exception
7886 def get_vim_account(vim_account_id
):
7888 if vim_account_id
in db_vims
:
7889 return db_vims
[vim_account_id
]
7890 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7891 db_vims
[vim_account_id
] = db_vim
7896 ns_params
= db_nslcmop
.get("operationParams")
7897 if ns_params
and ns_params
.get("timeout_ns_heal"):
7898 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7900 timeout_ns_heal
= self
.timeout
.get("ns_heal", self
.timeout_ns_heal
)
7904 nslcmop_id
= db_nslcmop
["_id"]
7906 "action_id": nslcmop_id
,
7908 self
.logger
.warning(
7909 "db_nslcmop={} and timeout_ns_heal={}".format(
7910 db_nslcmop
, timeout_ns_heal
7913 target
.update(db_nslcmop
.get("operationParams", {}))
7915 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7916 desc
= await self
.RO
.recreate(nsr_id
, target
)
7917 self
.logger
.debug("RO return > {}".format(desc
))
7918 action_id
= desc
["action_id"]
7919 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7920 await self
._wait
_ng
_ro
(
7927 operation
="healing",
7932 "_admin.deployed.RO.operational-status": "running",
7933 "detailed-status": " ".join(stage
),
7935 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7936 self
._write
_op
_status
(nslcmop_id
, stage
)
7938 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7941 except Exception as e
:
7942 stage
[2] = "ERROR healing at VIM"
7943 # self.set_vnfr_at_error(db_vnfrs, str(e))
7945 "Error healing at VIM {}".format(e
),
7946 exc_info
=not isinstance(
7949 ROclient
.ROClientException
,
7975 task_instantiation_info
,
7978 # launch instantiate_N2VC in a asyncio task and register task object
7979 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7980 # if not found, create one entry and update database
7981 # fill db_nsr._admin.deployed.VCA.<index>
7984 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7988 get_charm_name
= False
7989 if "execution-environment-list" in descriptor_config
:
7990 ee_list
= descriptor_config
.get("execution-environment-list", [])
7991 elif "juju" in descriptor_config
:
7992 ee_list
= [descriptor_config
] # ns charms
7993 if "execution-environment-list" not in descriptor_config
:
7994 # charm name is only required for ns charms
7995 get_charm_name
= True
7996 else: # other types as script are not supported
7999 for ee_item
in ee_list
:
8002 + "_deploy_n2vc ee_item juju={}, helm={}".format(
8003 ee_item
.get("juju"), ee_item
.get("helm-chart")
8006 ee_descriptor_id
= ee_item
.get("id")
8007 if ee_item
.get("juju"):
8008 vca_name
= ee_item
["juju"].get("charm")
8010 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
8013 if ee_item
["juju"].get("charm") is not None
8016 if ee_item
["juju"].get("cloud") == "k8s":
8017 vca_type
= "k8s_proxy_charm"
8018 elif ee_item
["juju"].get("proxy") is False:
8019 vca_type
= "native_charm"
8020 elif ee_item
.get("helm-chart"):
8021 vca_name
= ee_item
["helm-chart"]
8022 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
8025 vca_type
= "helm-v3"
8028 logging_text
+ "skipping non juju neither charm configuration"
8033 for vca_index
, vca_deployed
in enumerate(
8034 db_nsr
["_admin"]["deployed"]["VCA"]
8036 if not vca_deployed
:
8039 vca_deployed
.get("member-vnf-index") == member_vnf_index
8040 and vca_deployed
.get("vdu_id") == vdu_id
8041 and vca_deployed
.get("kdu_name") == kdu_name
8042 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
8043 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
8047 # not found, create one.
8049 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
8052 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
8054 target
+= "/kdu/{}".format(kdu_name
)
8056 "target_element": target
,
8057 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
8058 "member-vnf-index": member_vnf_index
,
8060 "kdu_name": kdu_name
,
8061 "vdu_count_index": vdu_index
,
8062 "operational-status": "init", # TODO revise
8063 "detailed-status": "", # TODO revise
8064 "step": "initial-deploy", # TODO revise
8066 "vdu_name": vdu_name
,
8068 "ee_descriptor_id": ee_descriptor_id
,
8069 "charm_name": charm_name
,
8073 # create VCA and configurationStatus in db
8075 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
8076 "configurationStatus.{}".format(vca_index
): dict(),
8078 self
.update_db_2("nsrs", nsr_id
, db_dict
)
8080 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
8082 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
8083 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
8084 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
8087 task_n2vc
= asyncio
.ensure_future(
8089 logging_text
=logging_text
,
8090 vca_index
=vca_index
,
8096 vdu_index
=vdu_index
,
8097 deploy_params
=deploy_params
,
8098 config_descriptor
=descriptor_config
,
8099 base_folder
=base_folder
,
8100 nslcmop_id
=nslcmop_id
,
8104 ee_config_descriptor
=ee_item
,
8107 self
.lcm_tasks
.register(
8111 "instantiate_N2VC-{}".format(vca_index
),
8114 task_instantiation_info
[
8116 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8117 member_vnf_index
or "", vdu_id
or ""
8120 async def heal_N2VC(
8137 ee_config_descriptor
,
8139 nsr_id
= db_nsr
["_id"]
8140 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8141 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8142 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8143 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8145 "collection": "nsrs",
8146 "filter": {"_id": nsr_id
},
8147 "path": db_update_entry
,
8153 element_under_configuration
= nsr_id
8157 vnfr_id
= db_vnfr
["_id"]
8158 osm_config
["osm"]["vnf_id"] = vnfr_id
8160 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8162 if vca_type
== "native_charm":
8165 index_number
= vdu_index
or 0
8168 element_type
= "VNF"
8169 element_under_configuration
= vnfr_id
8170 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8172 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8173 element_type
= "VDU"
8174 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8175 osm_config
["osm"]["vdu_id"] = vdu_id
8177 namespace
+= ".{}".format(kdu_name
)
8178 element_type
= "KDU"
8179 element_under_configuration
= kdu_name
8180 osm_config
["osm"]["kdu_name"] = kdu_name
8183 if base_folder
["pkg-dir"]:
8184 artifact_path
= "{}/{}/{}/{}".format(
8185 base_folder
["folder"],
8186 base_folder
["pkg-dir"],
8189 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8194 artifact_path
= "{}/Scripts/{}/{}/".format(
8195 base_folder
["folder"],
8198 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8203 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8205 # get initial_config_primitive_list that applies to this element
8206 initial_config_primitive_list
= config_descriptor
.get(
8207 "initial-config-primitive"
8211 "Initial config primitive list > {}".format(
8212 initial_config_primitive_list
8216 # add config if not present for NS charm
8217 ee_descriptor_id
= ee_config_descriptor
.get("id")
8218 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8219 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8220 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8224 "Initial config primitive list #2 > {}".format(
8225 initial_config_primitive_list
8228 # n2vc_redesign STEP 3.1
8229 # find old ee_id if exists
8230 ee_id
= vca_deployed
.get("ee_id")
8232 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8233 # create or register execution environment in VCA. Only for native charms when healing
8234 if vca_type
== "native_charm":
8235 step
= "Waiting to VM being up and getting IP address"
8236 self
.logger
.debug(logging_text
+ step
)
8237 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8246 credentials
= {"hostname": rw_mgmt_ip
}
8248 username
= deep_get(
8249 config_descriptor
, ("config-access", "ssh-access", "default-user")
8251 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8252 # merged. Meanwhile let's get username from initial-config-primitive
8253 if not username
and initial_config_primitive_list
:
8254 for config_primitive
in initial_config_primitive_list
:
8255 for param
in config_primitive
.get("parameter", ()):
8256 if param
["name"] == "ssh-username":
8257 username
= param
["value"]
8261 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8262 "'config-access.ssh-access.default-user'"
8264 credentials
["username"] = username
8266 # n2vc_redesign STEP 3.2
8267 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8268 self
._write
_configuration
_status
(
8270 vca_index
=vca_index
,
8271 status
="REGISTERING",
8272 element_under_configuration
=element_under_configuration
,
8273 element_type
=element_type
,
8276 step
= "register execution environment {}".format(credentials
)
8277 self
.logger
.debug(logging_text
+ step
)
8278 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8279 credentials
=credentials
,
8280 namespace
=namespace
,
8285 # update ee_id en db
8287 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8289 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8291 # for compatibility with MON/POL modules, the need model and application name at database
8292 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8293 # Not sure if this need to be done when healing
8295 ee_id_parts = ee_id.split(".")
8296 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8297 if len(ee_id_parts) >= 2:
8298 model_name = ee_id_parts[0]
8299 application_name = ee_id_parts[1]
8300 db_nsr_update[db_update_entry + "model"] = model_name
8301 db_nsr_update[db_update_entry + "application"] = application_name
8304 # n2vc_redesign STEP 3.3
8305 # Install configuration software. Only for native charms.
8306 step
= "Install configuration Software"
8308 self
._write
_configuration
_status
(
8310 vca_index
=vca_index
,
8311 status
="INSTALLING SW",
8312 element_under_configuration
=element_under_configuration
,
8313 element_type
=element_type
,
8314 # other_update=db_nsr_update,
8318 # TODO check if already done
8319 self
.logger
.debug(logging_text
+ step
)
8321 if vca_type
== "native_charm":
8322 config_primitive
= next(
8323 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8326 if config_primitive
:
8327 config
= self
._map
_primitive
_params
(
8328 config_primitive
, {}, deploy_params
8330 await self
.vca_map
[vca_type
].install_configuration_sw(
8332 artifact_path
=artifact_path
,
8340 # write in db flag of configuration_sw already installed
8342 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8345 # Not sure if this need to be done when healing
8347 # add relations for this VCA (wait for other peers related with this VCA)
8348 await self._add_vca_relations(
8349 logging_text=logging_text,
8352 vca_index=vca_index,
8356 # if SSH access is required, then get execution environment SSH public
8357 # if native charm we have waited already to VM be UP
8358 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8361 # self.logger.debug("get ssh key block")
8363 config_descriptor
, ("config-access", "ssh-access", "required")
8365 # self.logger.debug("ssh key needed")
8366 # Needed to inject a ssh key
8369 ("config-access", "ssh-access", "default-user"),
8371 step
= "Install configuration Software, getting public ssh key"
8372 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8373 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8376 step
= "Insert public key into VM user={} ssh_key={}".format(
8380 # self.logger.debug("no need to get ssh key")
8381 step
= "Waiting to VM being up and getting IP address"
8382 self
.logger
.debug(logging_text
+ step
)
8384 # n2vc_redesign STEP 5.1
8385 # wait for RO (ip-address) Insert pub_key into VM
8386 # IMPORTANT: We need do wait for RO to complete healing operation.
8387 await self
._wait
_heal
_ro
(nsr_id
, self
.timeout_ns_heal
)
8390 rw_mgmt_ip
= await self
.wait_kdu_up(
8391 logging_text
, nsr_id
, vnfr_id
, kdu_name
8394 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8404 rw_mgmt_ip
= None # This is for a NS configuration
8406 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8408 # store rw_mgmt_ip in deploy params for later replacement
8409 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8412 # get run-day1 operation parameter
8413 runDay1
= deploy_params
.get("run-day1", False)
8415 "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
, vdu_id
, runDay1
)
8418 # n2vc_redesign STEP 6 Execute initial config primitive
8419 step
= "execute initial config primitive"
8421 # wait for dependent primitives execution (NS -> VNF -> VDU)
8422 if initial_config_primitive_list
:
8423 await self
._wait
_dependent
_n
2vc
(
8424 nsr_id
, vca_deployed_list
, vca_index
8427 # stage, in function of element type: vdu, kdu, vnf or ns
8428 my_vca
= vca_deployed_list
[vca_index
]
8429 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8431 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8432 elif my_vca
.get("member-vnf-index"):
8434 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8437 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8439 self
._write
_configuration
_status
(
8440 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8443 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8445 check_if_terminated_needed
= True
8446 for initial_config_primitive
in initial_config_primitive_list
:
8447 # adding information on the vca_deployed if it is a NS execution environment
8448 if not vca_deployed
["member-vnf-index"]:
8449 deploy_params
["ns_config_info"] = json
.dumps(
8450 self
._get
_ns
_config
_info
(nsr_id
)
8452 # TODO check if already done
8453 primitive_params_
= self
._map
_primitive
_params
(
8454 initial_config_primitive
, {}, deploy_params
8457 step
= "execute primitive '{}' params '{}'".format(
8458 initial_config_primitive
["name"], primitive_params_
8460 self
.logger
.debug(logging_text
+ step
)
8461 await self
.vca_map
[vca_type
].exec_primitive(
8463 primitive_name
=initial_config_primitive
["name"],
8464 params_dict
=primitive_params_
,
8469 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8470 if check_if_terminated_needed
:
8471 if config_descriptor
.get("terminate-config-primitive"):
8475 {db_update_entry
+ "needed_terminate": True},
8477 check_if_terminated_needed
= False
8479 # TODO register in database that primitive is done
8481 # STEP 7 Configure metrics
8482 # Not sure if this need to be done when healing
8484 if vca_type == "helm" or vca_type == "helm-v3":
8485 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8487 artifact_path=artifact_path,
8488 ee_config_descriptor=ee_config_descriptor,
8491 target_ip=rw_mgmt_ip,
8497 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8500 for job in prometheus_jobs:
8503 {"job_name": job["job_name"]},
8506 fail_on_empty=False,
8510 step
= "instantiated at VCA"
8511 self
.logger
.debug(logging_text
+ step
)
8513 self
._write
_configuration
_status
(
8514 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8517 except Exception as e
: # TODO not use Exception but N2VC exception
8518 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8520 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8523 "Exception while {} : {}".format(step
, e
), exc_info
=True
8525 self
._write
_configuration
_status
(
8526 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8528 raise LcmException("{} {}".format(step
, e
)) from e
8530 async def _wait_heal_ro(
8536 while time() <= start_time
+ timeout
:
8537 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8538 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"][
8539 "operational-status"
8541 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8542 if operational_status_ro
!= "healing":
8544 await asyncio
.sleep(15, loop
=self
.loop
)
8545 else: # timeout_ns_deploy
8546 raise NgRoException("Timeout waiting ns to deploy")
8548 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8550 Vertical Scale the VDUs in a NS
8552 :param: nsr_id: NS Instance ID
8553 :param: nslcmop_id: nslcmop ID of migrate
8556 # Try to lock HA task here
8557 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8558 if not task_is_locked_by_me
:
8560 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8561 self
.logger
.debug(logging_text
+ "Enter")
8562 # get all needed from database
8564 db_nslcmop_update
= {}
8565 nslcmop_operation_state
= None
8569 # in case of error, indicates what part of scale was failed to put nsr at error status
8570 start_deploy
= time()
8573 # wait for any previous tasks in process
8574 step
= "Waiting for previous operations to terminate"
8575 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
8577 self
._write
_ns
_status
(
8580 current_operation
="VerticalScale",
8581 current_operation_id
=nslcmop_id
,
8583 step
= "Getting nslcmop from database"
8585 step
+ " after having waited for previous tasks to be completed"
8587 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8588 operationParams
= db_nslcmop
.get("operationParams")
8590 target
.update(operationParams
)
8591 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8592 self
.logger
.debug("RO return > {}".format(desc
))
8593 action_id
= desc
["action_id"]
8594 await self
._wait
_ng
_ro
(
8599 self
.timeout_verticalscale
,
8600 operation
="verticalscale",
8602 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8603 self
.logger
.error("Exit Exception {}".format(e
))
8605 except asyncio
.CancelledError
:
8606 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8607 exc
= "Operation was cancelled"
8608 except Exception as e
:
8609 exc
= traceback
.format_exc()
8610 self
.logger
.critical(
8611 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
8614 self
._write
_ns
_status
(
8617 current_operation
="IDLE",
8618 current_operation_id
=None,
8621 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
8622 nslcmop_operation_state
= "FAILED"
8624 nslcmop_operation_state
= "COMPLETED"
8625 db_nslcmop_update
["detailed-status"] = "Done"
8626 db_nsr_update
["detailed-status"] = "Done"
8628 self
._write
_op
_status
(
8632 operation_state
=nslcmop_operation_state
,
8633 other_update
=db_nslcmop_update
,
8635 if nslcmop_operation_state
:
8639 "nslcmop_id": nslcmop_id
,
8640 "operationState": nslcmop_operation_state
,
8642 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8643 except Exception as e
:
8645 logging_text
+ "kafka_write notification Exception {}".format(e
)
8647 self
.logger
.debug(logging_text
+ "Exit")
8648 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")