1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
106 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
107 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
109 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
110 from osm_lcm
.osm_config
import OsmConfigBuilder
111 from osm_lcm
.prometheus
import parse_job
113 from copy
import copy
, deepcopy
114 from time
import time
115 from uuid
import uuid4
117 from random
import randint
119 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
122 class NsLcm(LcmBase
):
123 timeout_vca_on_error
= (
125 ) # Time for charm from first time at blocked,error status to mark as failed
126 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
127 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
128 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
129 timeout_charm_delete
= 10 * 60
130 timeout_primitive
= 30 * 60 # timeout for primitive execution
131 timeout_ns_update
= 30 * 60 # timeout for ns update
132 timeout_progress_primitive
= (
134 ) # timeout for some progress in a primitive execution
135 timeout_migrate
= 1800 # default global timeout for migrating vnfs
136 timeout_operate
= 1800 # default global timeout for migrating vnfs
137 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
138 SUBOPERATION_STATUS_NOT_FOUND
= -1
139 SUBOPERATION_STATUS_NEW
= -2
140 SUBOPERATION_STATUS_SKIP
= -3
141 task_name_deploy_vca
= "Deploying VCA"
143 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
145 Init, Connect to database, filesystem storage, and messaging
146 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
149 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
151 self
.db
= Database().instance
.db
152 self
.fs
= Filesystem().instance
.fs
154 self
.lcm_tasks
= lcm_tasks
155 self
.timeout
= config
["timeout"]
156 self
.ro_config
= config
["ro_config"]
157 self
.ng_ro
= config
["ro_config"].get("ng")
158 self
.vca_config
= config
["VCA"].copy()
160 # create N2VC connector
161 self
.n2vc
= N2VCJujuConnector(
164 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.conn_helm_ee
= LCMHelmConn(
172 vca_config
=self
.vca_config
,
173 on_update_db
=self
._on
_update
_n
2vc
_db
,
176 self
.k8sclusterhelm2
= K8sHelmConnector(
177 kubectl_command
=self
.vca_config
.get("kubectlpath"),
178 helm_command
=self
.vca_config
.get("helmpath"),
185 self
.k8sclusterhelm3
= K8sHelm3Connector(
186 kubectl_command
=self
.vca_config
.get("kubectlpath"),
187 helm_command
=self
.vca_config
.get("helm3path"),
194 self
.k8sclusterjuju
= K8sJujuConnector(
195 kubectl_command
=self
.vca_config
.get("kubectlpath"),
196 juju_command
=self
.vca_config
.get("jujupath"),
199 on_update_db
=self
._on
_update
_k
8s
_db
,
204 self
.k8scluster_map
= {
205 "helm-chart": self
.k8sclusterhelm2
,
206 "helm-chart-v3": self
.k8sclusterhelm3
,
207 "chart": self
.k8sclusterhelm3
,
208 "juju-bundle": self
.k8sclusterjuju
,
209 "juju": self
.k8sclusterjuju
,
213 "lxc_proxy_charm": self
.n2vc
,
214 "native_charm": self
.n2vc
,
215 "k8s_proxy_charm": self
.n2vc
,
216 "helm": self
.conn_helm_ee
,
217 "helm-v3": self
.conn_helm_ee
,
221 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
223 self
.op_status_map
= {
224 "instantiation": self
.RO
.status
,
225 "termination": self
.RO
.status
,
226 "migrate": self
.RO
.status
,
227 "healing": self
.RO
.recreate_status
,
231 def increment_ip_mac(ip_mac
, vm_index
=1):
232 if not isinstance(ip_mac
, str):
235 # try with ipv4 look for last dot
236 i
= ip_mac
.rfind(".")
239 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
240 # try with ipv6 or mac look for last colon. Operate in hex
241 i
= ip_mac
.rfind(":")
244 # format in hex, len can be 2 for mac or 4 for ipv6
245 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
246 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
252 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
254 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
257 # TODO filter RO descriptor fields...
261 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
262 db_dict
["deploymentStatus"] = ro_descriptor
263 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
265 except Exception as e
:
267 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
270 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
272 # remove last dot from path (if exists)
273 if path
.endswith("."):
276 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
277 # .format(table, filter, path, updated_data))
280 nsr_id
= filter.get("_id")
282 # read ns record from database
283 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
284 current_ns_status
= nsr
.get("nsState")
286 # get vca status for NS
287 status_dict
= await self
.n2vc
.get_status(
288 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
293 db_dict
["vcaStatus"] = status_dict
294 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
296 # update configurationStatus for this VCA
298 vca_index
= int(path
[path
.rfind(".") + 1 :])
301 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
303 vca_status
= vca_list
[vca_index
].get("status")
305 configuration_status_list
= nsr
.get("configurationStatus")
306 config_status
= configuration_status_list
[vca_index
].get("status")
308 if config_status
== "BROKEN" and vca_status
!= "failed":
309 db_dict
["configurationStatus"][vca_index
] = "READY"
310 elif config_status
!= "BROKEN" and vca_status
== "failed":
311 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
312 except Exception as e
:
313 # not update configurationStatus
314 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
316 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
317 # if nsState = 'DEGRADED' check if all is OK
319 if current_ns_status
in ("READY", "DEGRADED"):
320 error_description
= ""
322 if status_dict
.get("machines"):
323 for machine_id
in status_dict
.get("machines"):
324 machine
= status_dict
.get("machines").get(machine_id
)
325 # check machine agent-status
326 if machine
.get("agent-status"):
327 s
= machine
.get("agent-status").get("status")
330 error_description
+= (
331 "machine {} agent-status={} ; ".format(
335 # check machine instance status
336 if machine
.get("instance-status"):
337 s
= machine
.get("instance-status").get("status")
340 error_description
+= (
341 "machine {} instance-status={} ; ".format(
346 if status_dict
.get("applications"):
347 for app_id
in status_dict
.get("applications"):
348 app
= status_dict
.get("applications").get(app_id
)
349 # check application status
350 if app
.get("status"):
351 s
= app
.get("status").get("status")
354 error_description
+= (
355 "application {} status={} ; ".format(app_id
, s
)
358 if error_description
:
359 db_dict
["errorDescription"] = error_description
360 if current_ns_status
== "READY" and is_degraded
:
361 db_dict
["nsState"] = "DEGRADED"
362 if current_ns_status
== "DEGRADED" and not is_degraded
:
363 db_dict
["nsState"] = "READY"
366 self
.update_db_2("nsrs", nsr_id
, db_dict
)
368 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
370 except Exception as e
:
371 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
373 async def _on_update_k8s_db(
374 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
377 Updating vca status in NSR record
378 :param cluster_uuid: UUID of a k8s cluster
379 :param kdu_instance: The unique name of the KDU instance
380 :param filter: To get nsr_id
381 :cluster_type: The cluster type (juju, k8s)
385 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
386 # .format(cluster_uuid, kdu_instance, filter))
388 nsr_id
= filter.get("_id")
390 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
391 cluster_uuid
=cluster_uuid
,
392 kdu_instance
=kdu_instance
,
394 complete_status
=True,
400 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
402 if cluster_type
in ("juju-bundle", "juju"):
403 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
404 # status in a similar way between Juju Bundles and Helm Charts on this side
405 await self
.k8sclusterjuju
.update_vca_status(
406 db_dict
["vcaStatus"],
412 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
416 self
.update_db_2("nsrs", nsr_id
, db_dict
)
417 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
419 except Exception as e
:
420 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
423 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
425 env
= Environment(undefined
=StrictUndefined
)
426 template
= env
.from_string(cloud_init_text
)
427 return template
.render(additional_params
or {})
428 except UndefinedError
as e
:
430 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
431 "file, must be provided in the instantiation parameters inside the "
432 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
434 except (TemplateError
, TemplateNotFound
) as e
:
436 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
441 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
442 cloud_init_content
= cloud_init_file
= None
444 if vdu
.get("cloud-init-file"):
445 base_folder
= vnfd
["_admin"]["storage"]
446 if base_folder
["pkg-dir"]:
447 cloud_init_file
= "{}/{}/cloud_init/{}".format(
448 base_folder
["folder"],
449 base_folder
["pkg-dir"],
450 vdu
["cloud-init-file"],
453 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
454 base_folder
["folder"],
455 vdu
["cloud-init-file"],
457 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
458 cloud_init_content
= ci_file
.read()
459 elif vdu
.get("cloud-init"):
460 cloud_init_content
= vdu
["cloud-init"]
462 return cloud_init_content
463 except FsException
as e
:
465 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
466 vnfd
["id"], vdu
["id"], cloud_init_file
, e
470 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
472 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
474 additional_params
= vdur
.get("additionalParams")
475 return parse_yaml_strings(additional_params
)
477 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
479 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
480 :param vnfd: input vnfd
481 :param new_id: overrides vnf id if provided
482 :param additionalParams: Instantiation params for VNFs provided
483 :param nsrId: Id of the NSR
484 :return: copy of vnfd
486 vnfd_RO
= deepcopy(vnfd
)
487 # remove unused by RO configuration, monitoring, scaling and internal keys
488 vnfd_RO
.pop("_id", None)
489 vnfd_RO
.pop("_admin", None)
490 vnfd_RO
.pop("monitoring-param", None)
491 vnfd_RO
.pop("scaling-group-descriptor", None)
492 vnfd_RO
.pop("kdu", None)
493 vnfd_RO
.pop("k8s-cluster", None)
495 vnfd_RO
["id"] = new_id
497 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
498 for vdu
in get_iterable(vnfd_RO
, "vdu"):
499 vdu
.pop("cloud-init-file", None)
500 vdu
.pop("cloud-init", None)
504 def ip_profile_2_RO(ip_profile
):
505 RO_ip_profile
= deepcopy(ip_profile
)
506 if "dns-server" in RO_ip_profile
:
507 if isinstance(RO_ip_profile
["dns-server"], list):
508 RO_ip_profile
["dns-address"] = []
509 for ds
in RO_ip_profile
.pop("dns-server"):
510 RO_ip_profile
["dns-address"].append(ds
["address"])
512 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
513 if RO_ip_profile
.get("ip-version") == "ipv4":
514 RO_ip_profile
["ip-version"] = "IPv4"
515 if RO_ip_profile
.get("ip-version") == "ipv6":
516 RO_ip_profile
["ip-version"] = "IPv6"
517 if "dhcp-params" in RO_ip_profile
:
518 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
521 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
522 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
523 if db_vim
["_admin"]["operationalState"] != "ENABLED":
525 "VIM={} is not available. operationalState={}".format(
526 vim_account
, db_vim
["_admin"]["operationalState"]
529 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
532 def get_ro_wim_id_for_wim_account(self
, wim_account
):
533 if isinstance(wim_account
, str):
534 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
535 if db_wim
["_admin"]["operationalState"] != "ENABLED":
537 "WIM={} is not available. operationalState={}".format(
538 wim_account
, db_wim
["_admin"]["operationalState"]
541 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
546 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
548 db_vdu_push_list
= []
550 db_update
= {"_admin.modified": time()}
552 for vdu_id
, vdu_count
in vdu_create
.items():
556 for vdur
in reversed(db_vnfr
["vdur"])
557 if vdur
["vdu-id-ref"] == vdu_id
562 # Read the template saved in the db:
564 "No vdur in the database. Using the vdur-template to scale"
566 vdur_template
= db_vnfr
.get("vdur-template")
567 if not vdur_template
:
569 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
573 vdur
= vdur_template
[0]
574 # Delete a template from the database after using it
577 {"_id": db_vnfr
["_id"]},
579 pull
={"vdur-template": {"_id": vdur
["_id"]}},
581 for count
in range(vdu_count
):
582 vdur_copy
= deepcopy(vdur
)
583 vdur_copy
["status"] = "BUILD"
584 vdur_copy
["status-detailed"] = None
585 vdur_copy
["ip-address"] = None
586 vdur_copy
["_id"] = str(uuid4())
587 vdur_copy
["count-index"] += count
+ 1
588 vdur_copy
["id"] = "{}-{}".format(
589 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
591 vdur_copy
.pop("vim_info", None)
592 for iface
in vdur_copy
["interfaces"]:
593 if iface
.get("fixed-ip"):
594 iface
["ip-address"] = self
.increment_ip_mac(
595 iface
["ip-address"], count
+ 1
598 iface
.pop("ip-address", None)
599 if iface
.get("fixed-mac"):
600 iface
["mac-address"] = self
.increment_ip_mac(
601 iface
["mac-address"], count
+ 1
604 iface
.pop("mac-address", None)
608 ) # only first vdu can be managment of vnf
609 db_vdu_push_list
.append(vdur_copy
)
610 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
612 if len(db_vnfr
["vdur"]) == 1:
613 # The scale will move to 0 instances
615 "Scaling to 0 !, creating the template with the last vdur"
617 template_vdur
= [db_vnfr
["vdur"][0]]
618 for vdu_id
, vdu_count
in vdu_delete
.items():
620 indexes_to_delete
= [
622 for iv
in enumerate(db_vnfr
["vdur"])
623 if iv
[1]["vdu-id-ref"] == vdu_id
627 "vdur.{}.status".format(i
): "DELETING"
628 for i
in indexes_to_delete
[-vdu_count
:]
632 # it must be deleted one by one because common.db does not allow otherwise
635 for v
in reversed(db_vnfr
["vdur"])
636 if v
["vdu-id-ref"] == vdu_id
638 for vdu
in vdus_to_delete
[:vdu_count
]:
641 {"_id": db_vnfr
["_id"]},
643 pull
={"vdur": {"_id": vdu
["_id"]}},
647 db_push
["vdur"] = db_vdu_push_list
649 db_push
["vdur-template"] = template_vdur
652 db_vnfr
["vdur-template"] = template_vdur
653 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
654 # modify passed dictionary db_vnfr
655 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
656 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
658 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
660 Updates database nsr with the RO info for the created vld
661 :param ns_update_nsr: dictionary to be filled with the updated info
662 :param db_nsr: content of db_nsr. This is also modified
663 :param nsr_desc_RO: nsr descriptor from RO
664 :return: Nothing, LcmException is raised on errors
667 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
668 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
669 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
671 vld
["vim-id"] = net_RO
.get("vim_net_id")
672 vld
["name"] = net_RO
.get("vim_name")
673 vld
["status"] = net_RO
.get("status")
674 vld
["status-detailed"] = net_RO
.get("error_msg")
675 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
679 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
682 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
684 for db_vnfr
in db_vnfrs
.values():
685 vnfr_update
= {"status": "ERROR"}
686 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
687 if "status" not in vdur
:
688 vdur
["status"] = "ERROR"
689 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
691 vdur
["status-detailed"] = str(error_text
)
693 "vdur.{}.status-detailed".format(vdu_index
)
695 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
696 except DbException
as e
:
697 self
.logger
.error("Cannot update vnf. {}".format(e
))
699 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
701 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
702 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
703 :param nsr_desc_RO: nsr descriptor from RO
704 :return: Nothing, LcmException is raised on errors
706 for vnf_index
, db_vnfr
in db_vnfrs
.items():
707 for vnf_RO
in nsr_desc_RO
["vnfs"]:
708 if vnf_RO
["member_vnf_index"] != vnf_index
:
711 if vnf_RO
.get("ip_address"):
712 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
715 elif not db_vnfr
.get("ip-address"):
716 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
717 raise LcmExceptionNoMgmtIP(
718 "ns member_vnf_index '{}' has no IP address".format(
723 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
724 vdur_RO_count_index
= 0
725 if vdur
.get("pdu-type"):
727 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
728 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
730 if vdur
["count-index"] != vdur_RO_count_index
:
731 vdur_RO_count_index
+= 1
733 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
734 if vdur_RO
.get("ip_address"):
735 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
737 vdur
["ip-address"] = None
738 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
739 vdur
["name"] = vdur_RO
.get("vim_name")
740 vdur
["status"] = vdur_RO
.get("status")
741 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
742 for ifacer
in get_iterable(vdur
, "interfaces"):
743 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
744 if ifacer
["name"] == interface_RO
.get("internal_name"):
745 ifacer
["ip-address"] = interface_RO
.get(
748 ifacer
["mac-address"] = interface_RO
.get(
754 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
755 "from VIM info".format(
756 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
759 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
763 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
765 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
769 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
770 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
771 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
773 vld
["vim-id"] = net_RO
.get("vim_net_id")
774 vld
["name"] = net_RO
.get("vim_name")
775 vld
["status"] = net_RO
.get("status")
776 vld
["status-detailed"] = net_RO
.get("error_msg")
777 vnfr_update
["vld.{}".format(vld_index
)] = vld
781 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
786 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
791 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
796 def _get_ns_config_info(self
, nsr_id
):
798 Generates a mapping between vnf,vdu elements and the N2VC id
799 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
800 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
801 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
802 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
804 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
805 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
807 ns_config_info
= {"osm-config-mapping": mapping
}
808 for vca
in vca_deployed_list
:
809 if not vca
["member-vnf-index"]:
811 if not vca
["vdu_id"]:
812 mapping
[vca
["member-vnf-index"]] = vca
["application"]
816 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
818 ] = vca
["application"]
819 return ns_config_info
821 async def _instantiate_ng_ro(
838 def get_vim_account(vim_account_id
):
840 if vim_account_id
in db_vims
:
841 return db_vims
[vim_account_id
]
842 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
843 db_vims
[vim_account_id
] = db_vim
846 # modify target_vld info with instantiation parameters
847 def parse_vld_instantiation_params(
848 target_vim
, target_vld
, vld_params
, target_sdn
850 if vld_params
.get("ip-profile"):
851 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
854 if vld_params
.get("provider-network"):
855 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
858 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
859 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
862 if vld_params
.get("wimAccountId"):
863 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
864 target_vld
["vim_info"][target_wim
] = {}
865 for param
in ("vim-network-name", "vim-network-id"):
866 if vld_params
.get(param
):
867 if isinstance(vld_params
[param
], dict):
868 for vim
, vim_net
in vld_params
[param
].items():
869 other_target_vim
= "vim:" + vim
871 target_vld
["vim_info"],
872 (other_target_vim
, param
.replace("-", "_")),
875 else: # isinstance str
876 target_vld
["vim_info"][target_vim
][
877 param
.replace("-", "_")
878 ] = vld_params
[param
]
879 if vld_params
.get("common_id"):
880 target_vld
["common_id"] = vld_params
.get("common_id")
882 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
883 def update_ns_vld_target(target
, ns_params
):
884 for vnf_params
in ns_params
.get("vnf", ()):
885 if vnf_params
.get("vimAccountId"):
889 for vnfr
in db_vnfrs
.values()
890 if vnf_params
["member-vnf-index"]
891 == vnfr
["member-vnf-index-ref"]
895 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
896 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
897 target_vld
= find_in_list(
898 get_iterable(vdur
, "interfaces"),
899 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
902 vld_params
= find_in_list(
903 get_iterable(ns_params
, "vld"),
904 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
908 if vnf_params
.get("vimAccountId") not in a_vld
.get(
911 target_vim_network_list
= [
912 v
for _
, v
in a_vld
.get("vim_info").items()
914 target_vim_network_name
= next(
916 item
.get("vim_network_name", "")
917 for item
in target_vim_network_list
922 target
["ns"]["vld"][a_index
].get("vim_info").update(
924 "vim:{}".format(vnf_params
["vimAccountId"]): {
925 "vim_network_name": target_vim_network_name
,
931 for param
in ("vim-network-name", "vim-network-id"):
932 if vld_params
.get(param
) and isinstance(
933 vld_params
[param
], dict
935 for vim
, vim_net
in vld_params
[
938 other_target_vim
= "vim:" + vim
940 target
["ns"]["vld"][a_index
].get(
945 param
.replace("-", "_"),
950 nslcmop_id
= db_nslcmop
["_id"]
952 "name": db_nsr
["name"],
955 "image": deepcopy(db_nsr
["image"]),
956 "flavor": deepcopy(db_nsr
["flavor"]),
957 "action_id": nslcmop_id
,
958 "cloud_init_content": {},
960 for image
in target
["image"]:
961 image
["vim_info"] = {}
962 for flavor
in target
["flavor"]:
963 flavor
["vim_info"] = {}
964 if db_nsr
.get("affinity-or-anti-affinity-group"):
965 target
["affinity-or-anti-affinity-group"] = deepcopy(
966 db_nsr
["affinity-or-anti-affinity-group"]
968 for affinity_or_anti_affinity_group
in target
[
969 "affinity-or-anti-affinity-group"
971 affinity_or_anti_affinity_group
["vim_info"] = {}
973 if db_nslcmop
.get("lcmOperationType") != "instantiate":
974 # get parameters of instantiation:
975 db_nslcmop_instantiate
= self
.db
.get_list(
978 "nsInstanceId": db_nslcmop
["nsInstanceId"],
979 "lcmOperationType": "instantiate",
982 ns_params
= db_nslcmop_instantiate
.get("operationParams")
984 ns_params
= db_nslcmop
.get("operationParams")
985 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
986 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
989 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
990 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
994 "mgmt-network": vld
.get("mgmt-network", False),
995 "type": vld
.get("type"),
998 "vim_network_name": vld
.get("vim-network-name"),
999 "vim_account_id": ns_params
["vimAccountId"],
1003 # check if this network needs SDN assist
1004 if vld
.get("pci-interfaces"):
1005 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1006 sdnc_id
= db_vim
["config"].get("sdn-controller")
1008 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1009 target_sdn
= "sdn:{}".format(sdnc_id
)
1010 target_vld
["vim_info"][target_sdn
] = {
1012 "target_vim": target_vim
,
1014 "type": vld
.get("type"),
1017 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1018 for nsd_vnf_profile
in nsd_vnf_profiles
:
1019 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1020 if cp
["virtual-link-profile-id"] == vld
["id"]:
1022 "member_vnf:{}.{}".format(
1023 cp
["constituent-cpd-id"][0][
1024 "constituent-base-element-id"
1026 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1028 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1030 # check at nsd descriptor, if there is an ip-profile
1032 nsd_vlp
= find_in_list(
1033 get_virtual_link_profiles(nsd
),
1034 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1039 and nsd_vlp
.get("virtual-link-protocol-data")
1040 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1042 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1045 ip_profile_dest_data
= {}
1046 if "ip-version" in ip_profile_source_data
:
1047 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1050 if "cidr" in ip_profile_source_data
:
1051 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1054 if "gateway-ip" in ip_profile_source_data
:
1055 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1058 if "dhcp-enabled" in ip_profile_source_data
:
1059 ip_profile_dest_data
["dhcp-params"] = {
1060 "enabled": ip_profile_source_data
["dhcp-enabled"]
1062 vld_params
["ip-profile"] = ip_profile_dest_data
1064 # update vld_params with instantiation params
1065 vld_instantiation_params
= find_in_list(
1066 get_iterable(ns_params
, "vld"),
1067 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1069 if vld_instantiation_params
:
1070 vld_params
.update(vld_instantiation_params
)
1071 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1072 target
["ns"]["vld"].append(target_vld
)
1073 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1074 update_ns_vld_target(target
, ns_params
)
1076 for vnfr
in db_vnfrs
.values():
1077 vnfd
= find_in_list(
1078 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1080 vnf_params
= find_in_list(
1081 get_iterable(ns_params
, "vnf"),
1082 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1084 target_vnf
= deepcopy(vnfr
)
1085 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1086 for vld
in target_vnf
.get("vld", ()):
1087 # check if connected to a ns.vld, to fill target'
1088 vnf_cp
= find_in_list(
1089 vnfd
.get("int-virtual-link-desc", ()),
1090 lambda cpd
: cpd
.get("id") == vld
["id"],
1093 ns_cp
= "member_vnf:{}.{}".format(
1094 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1096 if cp2target
.get(ns_cp
):
1097 vld
["target"] = cp2target
[ns_cp
]
1100 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1102 # check if this network needs SDN assist
1104 if vld
.get("pci-interfaces"):
1105 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1106 sdnc_id
= db_vim
["config"].get("sdn-controller")
1108 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1109 target_sdn
= "sdn:{}".format(sdnc_id
)
1110 vld
["vim_info"][target_sdn
] = {
1112 "target_vim": target_vim
,
1114 "type": vld
.get("type"),
1117 # check at vnfd descriptor, if there is an ip-profile
1119 vnfd_vlp
= find_in_list(
1120 get_virtual_link_profiles(vnfd
),
1121 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1125 and vnfd_vlp
.get("virtual-link-protocol-data")
1126 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1128 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1131 ip_profile_dest_data
= {}
1132 if "ip-version" in ip_profile_source_data
:
1133 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1136 if "cidr" in ip_profile_source_data
:
1137 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1140 if "gateway-ip" in ip_profile_source_data
:
1141 ip_profile_dest_data
[
1143 ] = ip_profile_source_data
["gateway-ip"]
1144 if "dhcp-enabled" in ip_profile_source_data
:
1145 ip_profile_dest_data
["dhcp-params"] = {
1146 "enabled": ip_profile_source_data
["dhcp-enabled"]
1149 vld_params
["ip-profile"] = ip_profile_dest_data
1150 # update vld_params with instantiation params
1152 vld_instantiation_params
= find_in_list(
1153 get_iterable(vnf_params
, "internal-vld"),
1154 lambda i_vld
: i_vld
["name"] == vld
["id"],
1156 if vld_instantiation_params
:
1157 vld_params
.update(vld_instantiation_params
)
1158 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1161 for vdur
in target_vnf
.get("vdur", ()):
1162 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1163 continue # This vdu must not be created
1164 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1166 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1169 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1170 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1173 and vdu_configuration
.get("config-access")
1174 and vdu_configuration
.get("config-access").get("ssh-access")
1176 vdur
["ssh-keys"] = ssh_keys_all
1177 vdur
["ssh-access-required"] = vdu_configuration
[
1179 ]["ssh-access"]["required"]
1182 and vnf_configuration
.get("config-access")
1183 and vnf_configuration
.get("config-access").get("ssh-access")
1184 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1186 vdur
["ssh-keys"] = ssh_keys_all
1187 vdur
["ssh-access-required"] = vnf_configuration
[
1189 ]["ssh-access"]["required"]
1190 elif ssh_keys_instantiation
and find_in_list(
1191 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1193 vdur
["ssh-keys"] = ssh_keys_instantiation
1195 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1197 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1199 if vdud
.get("cloud-init-file"):
1200 vdur
["cloud-init"] = "{}:file:{}".format(
1201 vnfd
["_id"], vdud
.get("cloud-init-file")
1203 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1204 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1205 base_folder
= vnfd
["_admin"]["storage"]
1206 if base_folder
["pkg-dir"]:
1207 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1208 base_folder
["folder"],
1209 base_folder
["pkg-dir"],
1210 vdud
.get("cloud-init-file"),
1213 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1214 base_folder
["folder"],
1215 vdud
.get("cloud-init-file"),
1217 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1218 target
["cloud_init_content"][
1221 elif vdud
.get("cloud-init"):
1222 vdur
["cloud-init"] = "{}:vdu:{}".format(
1223 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1225 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1226 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1229 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1230 deploy_params_vdu
= self
._format
_additional
_params
(
1231 vdur
.get("additionalParams") or {}
1233 deploy_params_vdu
["OSM"] = get_osm_params(
1234 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1236 vdur
["additionalParams"] = deploy_params_vdu
1239 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1240 if target_vim
not in ns_flavor
["vim_info"]:
1241 ns_flavor
["vim_info"][target_vim
] = {}
1244 # in case alternative images are provided we must check if they should be applied
1245 # for the vim_type, modify the vim_type taking into account
1246 ns_image_id
= int(vdur
["ns-image-id"])
1247 if vdur
.get("alt-image-ids"):
1248 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1249 vim_type
= db_vim
["vim_type"]
1250 for alt_image_id
in vdur
.get("alt-image-ids"):
1251 ns_alt_image
= target
["image"][int(alt_image_id
)]
1252 if vim_type
== ns_alt_image
.get("vim-type"):
1253 # must use alternative image
1255 "use alternative image id: {}".format(alt_image_id
)
1257 ns_image_id
= alt_image_id
1258 vdur
["ns-image-id"] = ns_image_id
1260 ns_image
= target
["image"][int(ns_image_id
)]
1261 if target_vim
not in ns_image
["vim_info"]:
1262 ns_image
["vim_info"][target_vim
] = {}
1265 if vdur
.get("affinity-or-anti-affinity-group-id"):
1266 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1267 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1268 if target_vim
not in ns_ags
["vim_info"]:
1269 ns_ags
["vim_info"][target_vim
] = {}
1271 vdur
["vim_info"] = {target_vim
: {}}
1272 # instantiation parameters
1274 vdu_instantiation_params
= find_in_list(
1275 get_iterable(vnf_params
, "vdu"),
1276 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1278 if vdu_instantiation_params
:
1279 # Parse the vdu_volumes from the instantiation params
1280 vdu_volumes
= get_volumes_from_instantiation_params(
1281 vdu_instantiation_params
, vdud
1283 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1284 vdur_list
.append(vdur
)
1285 target_vnf
["vdur"] = vdur_list
1286 target
["vnf"].append(target_vnf
)
1288 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1289 desc
= await self
.RO
.deploy(nsr_id
, target
)
1290 self
.logger
.debug("RO return > {}".format(desc
))
1291 action_id
= desc
["action_id"]
1292 await self
._wait
_ng
_ro
(
1293 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1294 operation
="instantiation"
1299 "_admin.deployed.RO.operational-status": "running",
1300 "detailed-status": " ".join(stage
),
1302 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1303 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1304 self
._write
_op
_status
(nslcmop_id
, stage
)
1306 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1310 async def _wait_ng_ro(
1320 detailed_status_old
= None
1322 start_time
= start_time
or time()
1323 while time() <= start_time
+ timeout
:
1324 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1325 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1326 if desc_status
["status"] == "FAILED":
1327 raise NgRoException(desc_status
["details"])
1328 elif desc_status
["status"] == "BUILD":
1330 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1331 elif desc_status
["status"] == "DONE":
1333 stage
[2] = "Deployed at VIM"
1336 assert False, "ROclient.check_ns_status returns unknown {}".format(
1337 desc_status
["status"]
1339 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1340 detailed_status_old
= stage
[2]
1341 db_nsr_update
["detailed-status"] = " ".join(stage
)
1342 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1343 self
._write
_op
_status
(nslcmop_id
, stage
)
1344 await asyncio
.sleep(15, loop
=self
.loop
)
1345 else: # timeout_ns_deploy
1346 raise NgRoException("Timeout waiting ns to deploy")
1348 async def _terminate_ng_ro(
1349 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1354 start_deploy
= time()
1361 "action_id": nslcmop_id
,
1363 desc
= await self
.RO
.deploy(nsr_id
, target
)
1364 action_id
= desc
["action_id"]
1365 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1366 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1369 + "ns terminate action at RO. action_id={}".format(action_id
)
1373 delete_timeout
= 20 * 60 # 20 minutes
1374 await self
._wait
_ng
_ro
(
1375 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1376 operation
="termination"
1379 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1380 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1382 await self
.RO
.delete(nsr_id
)
1383 except Exception as e
:
1384 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1385 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1386 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1387 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1389 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1391 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1392 failed_detail
.append("delete conflict: {}".format(e
))
1395 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1398 failed_detail
.append("delete error: {}".format(e
))
1401 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1405 stage
[2] = "Error deleting from VIM"
1407 stage
[2] = "Deleted from VIM"
1408 db_nsr_update
["detailed-status"] = " ".join(stage
)
1409 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1410 self
._write
_op
_status
(nslcmop_id
, stage
)
1413 raise LcmException("; ".join(failed_detail
))
1416 async def instantiate_RO(
1430 :param logging_text: preffix text to use at logging
1431 :param nsr_id: nsr identity
1432 :param nsd: database content of ns descriptor
1433 :param db_nsr: database content of ns record
1434 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1436 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1437 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1438 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1439 :return: None or exception
1442 start_deploy
= time()
1443 ns_params
= db_nslcmop
.get("operationParams")
1444 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1445 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1447 timeout_ns_deploy
= self
.timeout
.get(
1448 "ns_deploy", self
.timeout_ns_deploy
1451 # Check for and optionally request placement optimization. Database will be updated if placement activated
1452 stage
[2] = "Waiting for Placement."
1453 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1454 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1455 for vnfr
in db_vnfrs
.values():
1456 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1459 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1461 return await self
._instantiate
_ng
_ro
(
1474 except Exception as e
:
1475 stage
[2] = "ERROR deploying at VIM"
1476 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1478 "Error deploying at VIM {}".format(e
),
1479 exc_info
=not isinstance(
1482 ROclient
.ROClientException
,
1491 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1493 Wait for kdu to be up, get ip address
1494 :param logging_text: prefix use for logging
1498 :return: IP address, K8s services
1501 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1504 while nb_tries
< 360:
1505 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1509 for x
in get_iterable(db_vnfr
, "kdur")
1510 if x
.get("kdu-name") == kdu_name
1516 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1518 if kdur
.get("status"):
1519 if kdur
["status"] in ("READY", "ENABLED"):
1520 return kdur
.get("ip-address"), kdur
.get("services")
1523 "target KDU={} is in error state".format(kdu_name
)
1526 await asyncio
.sleep(10, loop
=self
.loop
)
1528 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1530 async def wait_vm_up_insert_key_ro(
1531 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1534 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1535 :param logging_text: prefix use for logging
1540 :param pub_key: public ssh key to inject, None to skip
1541 :param user: user to apply the public ssh key
1545 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1549 target_vdu_id
= None
1555 if ro_retries
>= 360: # 1 hour
1557 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1560 await asyncio
.sleep(10, loop
=self
.loop
)
1563 if not target_vdu_id
:
1564 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1566 if not vdu_id
: # for the VNF case
1567 if db_vnfr
.get("status") == "ERROR":
1569 "Cannot inject ssh-key because target VNF is in error state"
1571 ip_address
= db_vnfr
.get("ip-address")
1577 for x
in get_iterable(db_vnfr
, "vdur")
1578 if x
.get("ip-address") == ip_address
1586 for x
in get_iterable(db_vnfr
, "vdur")
1587 if x
.get("vdu-id-ref") == vdu_id
1588 and x
.get("count-index") == vdu_index
1594 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1595 ): # If only one, this should be the target vdu
1596 vdur
= db_vnfr
["vdur"][0]
1599 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1600 vnfr_id
, vdu_id
, vdu_index
1603 # New generation RO stores information at "vim_info"
1606 if vdur
.get("vim_info"):
1608 t
for t
in vdur
["vim_info"]
1609 ) # there should be only one key
1610 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1612 vdur
.get("pdu-type")
1613 or vdur
.get("status") == "ACTIVE"
1614 or ng_ro_status
== "ACTIVE"
1616 ip_address
= vdur
.get("ip-address")
1619 target_vdu_id
= vdur
["vdu-id-ref"]
1620 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1622 "Cannot inject ssh-key because target VM is in error state"
1625 if not target_vdu_id
:
1628 # inject public key into machine
1629 if pub_key
and user
:
1630 self
.logger
.debug(logging_text
+ "Inserting RO key")
1631 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1632 if vdur
.get("pdu-type"):
1633 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1636 ro_vm_id
= "{}-{}".format(
1637 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1638 ) # TODO add vdu_index
1642 "action": "inject_ssh_key",
1646 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1648 desc
= await self
.RO
.deploy(nsr_id
, target
)
1649 action_id
= desc
["action_id"]
1650 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1653 # wait until NS is deployed at RO
1655 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1656 ro_nsr_id
= deep_get(
1657 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1661 result_dict
= await self
.RO
.create_action(
1663 item_id_name
=ro_nsr_id
,
1665 "add_public_key": pub_key
,
1670 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1671 if not result_dict
or not isinstance(result_dict
, dict):
1673 "Unknown response from RO when injecting key"
1675 for result
in result_dict
.values():
1676 if result
.get("vim_result") == 200:
1679 raise ROclient
.ROClientException(
1680 "error injecting key: {}".format(
1681 result
.get("description")
1685 except NgRoException
as e
:
1687 "Reaching max tries injecting key. Error: {}".format(e
)
1689 except ROclient
.ROClientException
as e
:
1693 + "error injecting key: {}. Retrying until {} seconds".format(
1700 "Reaching max tries injecting key. Error: {}".format(e
)
1707 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1709 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1711 my_vca
= vca_deployed_list
[vca_index
]
1712 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1713 # vdu or kdu: no dependencies
1717 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1718 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1719 configuration_status_list
= db_nsr
["configurationStatus"]
1720 for index
, vca_deployed
in enumerate(configuration_status_list
):
1721 if index
== vca_index
:
1724 if not my_vca
.get("member-vnf-index") or (
1725 vca_deployed
.get("member-vnf-index")
1726 == my_vca
.get("member-vnf-index")
1728 internal_status
= configuration_status_list
[index
].get("status")
1729 if internal_status
== "READY":
1731 elif internal_status
== "BROKEN":
1733 "Configuration aborted because dependent charm/s has failed"
1738 # no dependencies, return
1740 await asyncio
.sleep(10)
1743 raise LcmException("Configuration aborted because dependent charm/s timeout")
1745 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1748 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1750 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1751 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1754 async def instantiate_N2VC(
1771 ee_config_descriptor
,
1773 nsr_id
= db_nsr
["_id"]
1774 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1775 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1776 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1777 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1779 "collection": "nsrs",
1780 "filter": {"_id": nsr_id
},
1781 "path": db_update_entry
,
1787 element_under_configuration
= nsr_id
1791 vnfr_id
= db_vnfr
["_id"]
1792 osm_config
["osm"]["vnf_id"] = vnfr_id
1794 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1796 if vca_type
== "native_charm":
1799 index_number
= vdu_index
or 0
1802 element_type
= "VNF"
1803 element_under_configuration
= vnfr_id
1804 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1806 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1807 element_type
= "VDU"
1808 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1809 osm_config
["osm"]["vdu_id"] = vdu_id
1811 namespace
+= ".{}".format(kdu_name
)
1812 element_type
= "KDU"
1813 element_under_configuration
= kdu_name
1814 osm_config
["osm"]["kdu_name"] = kdu_name
1817 if base_folder
["pkg-dir"]:
1818 artifact_path
= "{}/{}/{}/{}".format(
1819 base_folder
["folder"],
1820 base_folder
["pkg-dir"],
1823 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1828 artifact_path
= "{}/Scripts/{}/{}/".format(
1829 base_folder
["folder"],
1832 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1837 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1839 # get initial_config_primitive_list that applies to this element
1840 initial_config_primitive_list
= config_descriptor
.get(
1841 "initial-config-primitive"
1845 "Initial config primitive list > {}".format(
1846 initial_config_primitive_list
1850 # add config if not present for NS charm
1851 ee_descriptor_id
= ee_config_descriptor
.get("id")
1852 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1853 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1854 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1858 "Initial config primitive list #2 > {}".format(
1859 initial_config_primitive_list
1862 # n2vc_redesign STEP 3.1
1863 # find old ee_id if exists
1864 ee_id
= vca_deployed
.get("ee_id")
1866 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1867 # create or register execution environment in VCA
1868 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1870 self
._write
_configuration
_status
(
1872 vca_index
=vca_index
,
1874 element_under_configuration
=element_under_configuration
,
1875 element_type
=element_type
,
1878 step
= "create execution environment"
1879 self
.logger
.debug(logging_text
+ step
)
1883 if vca_type
== "k8s_proxy_charm":
1884 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1885 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1886 namespace
=namespace
,
1887 artifact_path
=artifact_path
,
1891 elif vca_type
== "helm" or vca_type
== "helm-v3":
1892 ee_id
, credentials
= await self
.vca_map
[
1894 ].create_execution_environment(
1895 namespace
=namespace
,
1899 artifact_path
=artifact_path
,
1903 ee_id
, credentials
= await self
.vca_map
[
1905 ].create_execution_environment(
1906 namespace
=namespace
,
1912 elif vca_type
== "native_charm":
1913 step
= "Waiting to VM being up and getting IP address"
1914 self
.logger
.debug(logging_text
+ step
)
1915 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1924 credentials
= {"hostname": rw_mgmt_ip
}
1926 username
= deep_get(
1927 config_descriptor
, ("config-access", "ssh-access", "default-user")
1929 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1930 # merged. Meanwhile let's get username from initial-config-primitive
1931 if not username
and initial_config_primitive_list
:
1932 for config_primitive
in initial_config_primitive_list
:
1933 for param
in config_primitive
.get("parameter", ()):
1934 if param
["name"] == "ssh-username":
1935 username
= param
["value"]
1939 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1940 "'config-access.ssh-access.default-user'"
1942 credentials
["username"] = username
1943 # n2vc_redesign STEP 3.2
1945 self
._write
_configuration
_status
(
1947 vca_index
=vca_index
,
1948 status
="REGISTERING",
1949 element_under_configuration
=element_under_configuration
,
1950 element_type
=element_type
,
1953 step
= "register execution environment {}".format(credentials
)
1954 self
.logger
.debug(logging_text
+ step
)
1955 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1956 credentials
=credentials
,
1957 namespace
=namespace
,
1962 # for compatibility with MON/POL modules, the need model and application name at database
1963 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1964 ee_id_parts
= ee_id
.split(".")
1965 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1966 if len(ee_id_parts
) >= 2:
1967 model_name
= ee_id_parts
[0]
1968 application_name
= ee_id_parts
[1]
1969 db_nsr_update
[db_update_entry
+ "model"] = model_name
1970 db_nsr_update
[db_update_entry
+ "application"] = application_name
1972 # n2vc_redesign STEP 3.3
1973 step
= "Install configuration Software"
1975 self
._write
_configuration
_status
(
1977 vca_index
=vca_index
,
1978 status
="INSTALLING SW",
1979 element_under_configuration
=element_under_configuration
,
1980 element_type
=element_type
,
1981 other_update
=db_nsr_update
,
1984 # TODO check if already done
1985 self
.logger
.debug(logging_text
+ step
)
1987 if vca_type
== "native_charm":
1988 config_primitive
= next(
1989 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1992 if config_primitive
:
1993 config
= self
._map
_primitive
_params
(
1994 config_primitive
, {}, deploy_params
1997 if vca_type
== "lxc_proxy_charm":
1998 if element_type
== "NS":
1999 num_units
= db_nsr
.get("config-units") or 1
2000 elif element_type
== "VNF":
2001 num_units
= db_vnfr
.get("config-units") or 1
2002 elif element_type
== "VDU":
2003 for v
in db_vnfr
["vdur"]:
2004 if vdu_id
== v
["vdu-id-ref"]:
2005 num_units
= v
.get("config-units") or 1
2007 if vca_type
!= "k8s_proxy_charm":
2008 await self
.vca_map
[vca_type
].install_configuration_sw(
2010 artifact_path
=artifact_path
,
2013 num_units
=num_units
,
2018 # write in db flag of configuration_sw already installed
2020 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2023 # add relations for this VCA (wait for other peers related with this VCA)
2024 await self
._add
_vca
_relations
(
2025 logging_text
=logging_text
,
2028 vca_index
=vca_index
,
2031 # if SSH access is required, then get execution environment SSH public
2032 # if native charm we have waited already to VM be UP
2033 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2036 # self.logger.debug("get ssh key block")
2038 config_descriptor
, ("config-access", "ssh-access", "required")
2040 # self.logger.debug("ssh key needed")
2041 # Needed to inject a ssh key
2044 ("config-access", "ssh-access", "default-user"),
2046 step
= "Install configuration Software, getting public ssh key"
2047 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2048 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2051 step
= "Insert public key into VM user={} ssh_key={}".format(
2055 # self.logger.debug("no need to get ssh key")
2056 step
= "Waiting to VM being up and getting IP address"
2057 self
.logger
.debug(logging_text
+ step
)
2059 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2062 # n2vc_redesign STEP 5.1
2063 # wait for RO (ip-address) Insert pub_key into VM
2066 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2067 logging_text
, nsr_id
, vnfr_id
, kdu_name
2069 vnfd
= self
.db
.get_one(
2071 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2073 kdu
= get_kdu(vnfd
, kdu_name
)
2075 service
["name"] for service
in get_kdu_services(kdu
)
2077 exposed_services
= []
2078 for service
in services
:
2079 if any(s
in service
["name"] for s
in kdu_services
):
2080 exposed_services
.append(service
)
2081 await self
.vca_map
[vca_type
].exec_primitive(
2083 primitive_name
="config",
2085 "osm-config": json
.dumps(
2087 k8s
={"services": exposed_services
}
2094 # This verification is needed in order to avoid trying to add a public key
2095 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2096 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2097 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2099 elif db_vnfr
.get('vdur'):
2100 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2110 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2112 # store rw_mgmt_ip in deploy params for later replacement
2113 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2115 # n2vc_redesign STEP 6 Execute initial config primitive
2116 step
= "execute initial config primitive"
2118 # wait for dependent primitives execution (NS -> VNF -> VDU)
2119 if initial_config_primitive_list
:
2120 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2122 # stage, in function of element type: vdu, kdu, vnf or ns
2123 my_vca
= vca_deployed_list
[vca_index
]
2124 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2126 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2127 elif my_vca
.get("member-vnf-index"):
2129 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2132 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2134 self
._write
_configuration
_status
(
2135 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2138 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2140 check_if_terminated_needed
= True
2141 for initial_config_primitive
in initial_config_primitive_list
:
2142 # adding information on the vca_deployed if it is a NS execution environment
2143 if not vca_deployed
["member-vnf-index"]:
2144 deploy_params
["ns_config_info"] = json
.dumps(
2145 self
._get
_ns
_config
_info
(nsr_id
)
2147 # TODO check if already done
2148 primitive_params_
= self
._map
_primitive
_params
(
2149 initial_config_primitive
, {}, deploy_params
2152 step
= "execute primitive '{}' params '{}'".format(
2153 initial_config_primitive
["name"], primitive_params_
2155 self
.logger
.debug(logging_text
+ step
)
2156 await self
.vca_map
[vca_type
].exec_primitive(
2158 primitive_name
=initial_config_primitive
["name"],
2159 params_dict
=primitive_params_
,
2164 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2165 if check_if_terminated_needed
:
2166 if config_descriptor
.get("terminate-config-primitive"):
2168 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2170 check_if_terminated_needed
= False
2172 # TODO register in database that primitive is done
2174 # STEP 7 Configure metrics
2175 if vca_type
== "helm" or vca_type
== "helm-v3":
2176 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2178 artifact_path
=artifact_path
,
2179 ee_config_descriptor
=ee_config_descriptor
,
2182 target_ip
=rw_mgmt_ip
,
2188 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2191 for job
in prometheus_jobs
:
2194 {"job_name": job
["job_name"]},
2197 fail_on_empty
=False,
2200 step
= "instantiated at VCA"
2201 self
.logger
.debug(logging_text
+ step
)
2203 self
._write
_configuration
_status
(
2204 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2207 except Exception as e
: # TODO not use Exception but N2VC exception
2208 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2210 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2213 "Exception while {} : {}".format(step
, e
), exc_info
=True
2215 self
._write
_configuration
_status
(
2216 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2218 raise LcmException("{} {}".format(step
, e
)) from e
2220 def _write_ns_status(
2224 current_operation
: str,
2225 current_operation_id
: str,
2226 error_description
: str = None,
2227 error_detail
: str = None,
2228 other_update
: dict = None,
2231 Update db_nsr fields.
2234 :param current_operation:
2235 :param current_operation_id:
2236 :param error_description:
2237 :param error_detail:
2238 :param other_update: Other required changes at database if provided, will be cleared
2242 db_dict
= other_update
or {}
2245 ] = current_operation_id
# for backward compatibility
2246 db_dict
["_admin.current-operation"] = current_operation_id
2247 db_dict
["_admin.operation-type"] = (
2248 current_operation
if current_operation
!= "IDLE" else None
2250 db_dict
["currentOperation"] = current_operation
2251 db_dict
["currentOperationID"] = current_operation_id
2252 db_dict
["errorDescription"] = error_description
2253 db_dict
["errorDetail"] = error_detail
2256 db_dict
["nsState"] = ns_state
2257 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2258 except DbException
as e
:
2259 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2261 def _write_op_status(
2265 error_message
: str = None,
2266 queuePosition
: int = 0,
2267 operation_state
: str = None,
2268 other_update
: dict = None,
2271 db_dict
= other_update
or {}
2272 db_dict
["queuePosition"] = queuePosition
2273 if isinstance(stage
, list):
2274 db_dict
["stage"] = stage
[0]
2275 db_dict
["detailed-status"] = " ".join(stage
)
2276 elif stage
is not None:
2277 db_dict
["stage"] = str(stage
)
2279 if error_message
is not None:
2280 db_dict
["errorMessage"] = error_message
2281 if operation_state
is not None:
2282 db_dict
["operationState"] = operation_state
2283 db_dict
["statusEnteredTime"] = time()
2284 self
.update_db_2("nslcmops", op_id
, db_dict
)
2285 except DbException
as e
:
2287 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2290 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2292 nsr_id
= db_nsr
["_id"]
2293 # configurationStatus
2294 config_status
= db_nsr
.get("configurationStatus")
2297 "configurationStatus.{}.status".format(index
): status
2298 for index
, v
in enumerate(config_status
)
2302 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2304 except DbException
as e
:
2306 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2309 def _write_configuration_status(
2314 element_under_configuration
: str = None,
2315 element_type
: str = None,
2316 other_update
: dict = None,
2319 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2320 # .format(vca_index, status))
2323 db_path
= "configurationStatus.{}.".format(vca_index
)
2324 db_dict
= other_update
or {}
2326 db_dict
[db_path
+ "status"] = status
2327 if element_under_configuration
:
2329 db_path
+ "elementUnderConfiguration"
2330 ] = element_under_configuration
2332 db_dict
[db_path
+ "elementType"] = element_type
2333 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2334 except DbException
as e
:
2336 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2337 status
, nsr_id
, vca_index
, e
2341 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2343 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2344 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2345 Database is used because the result can be obtained from a different LCM worker in case of HA.
2346 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2347 :param db_nslcmop: database content of nslcmop
2348 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2349 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2350 computed 'vim-account-id'
2353 nslcmop_id
= db_nslcmop
["_id"]
2354 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2355 if placement_engine
== "PLA":
2357 logging_text
+ "Invoke and wait for placement optimization"
2359 await self
.msg
.aiowrite(
2360 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2362 db_poll_interval
= 5
2363 wait
= db_poll_interval
* 10
2365 while not pla_result
and wait
>= 0:
2366 await asyncio
.sleep(db_poll_interval
)
2367 wait
-= db_poll_interval
2368 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2369 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2373 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2376 for pla_vnf
in pla_result
["vnf"]:
2377 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2378 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2383 {"_id": vnfr
["_id"]},
2384 {"vim-account-id": pla_vnf
["vimAccountId"]},
2387 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2390 def update_nsrs_with_pla_result(self
, params
):
2392 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2394 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2396 except Exception as e
:
2397 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2399 async def instantiate(self
, nsr_id
, nslcmop_id
):
2402 :param nsr_id: ns instance to deploy
2403 :param nslcmop_id: operation to run
2407 # Try to lock HA task here
2408 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2409 if not task_is_locked_by_me
:
2411 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2415 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2416 self
.logger
.debug(logging_text
+ "Enter")
2418 # get all needed from database
2420 # database nsrs record
2423 # database nslcmops record
2426 # update operation on nsrs
2428 # update operation on nslcmops
2429 db_nslcmop_update
= {}
2431 nslcmop_operation_state
= None
2432 db_vnfrs
= {} # vnf's info indexed by member-index
2434 tasks_dict_info
= {} # from task to info text
2438 "Stage 1/5: preparation of the environment.",
2439 "Waiting for previous operations to terminate.",
2442 # ^ stage, step, VIM progress
2444 # wait for any previous tasks in process
2445 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2447 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2448 stage
[1] = "Reading from database."
2449 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2450 db_nsr_update
["detailed-status"] = "creating"
2451 db_nsr_update
["operational-status"] = "init"
2452 self
._write
_ns
_status
(
2454 ns_state
="BUILDING",
2455 current_operation
="INSTANTIATING",
2456 current_operation_id
=nslcmop_id
,
2457 other_update
=db_nsr_update
,
2459 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2461 # read from db: operation
2462 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2463 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2464 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2465 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2466 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2468 ns_params
= db_nslcmop
.get("operationParams")
2469 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2470 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2472 timeout_ns_deploy
= self
.timeout
.get(
2473 "ns_deploy", self
.timeout_ns_deploy
2477 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2478 self
.logger
.debug(logging_text
+ stage
[1])
2479 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2480 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2481 self
.logger
.debug(logging_text
+ stage
[1])
2482 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2483 self
.fs
.sync(db_nsr
["nsd-id"])
2485 # nsr_name = db_nsr["name"] # TODO short-name??
2487 # read from db: vnf's of this ns
2488 stage
[1] = "Getting vnfrs from db."
2489 self
.logger
.debug(logging_text
+ stage
[1])
2490 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2492 # read from db: vnfd's for every vnf
2493 db_vnfds
= [] # every vnfd data
2495 # for each vnf in ns, read vnfd
2496 for vnfr
in db_vnfrs_list
:
2497 if vnfr
.get("kdur"):
2499 for kdur
in vnfr
["kdur"]:
2500 if kdur
.get("additionalParams"):
2501 kdur
["additionalParams"] = json
.loads(
2502 kdur
["additionalParams"]
2504 kdur_list
.append(kdur
)
2505 vnfr
["kdur"] = kdur_list
2507 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2508 vnfd_id
= vnfr
["vnfd-id"]
2509 vnfd_ref
= vnfr
["vnfd-ref"]
2510 self
.fs
.sync(vnfd_id
)
2512 # if we haven't this vnfd, read it from db
2513 if vnfd_id
not in db_vnfds
:
2515 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2518 self
.logger
.debug(logging_text
+ stage
[1])
2519 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2522 db_vnfds
.append(vnfd
)
2524 # Get or generates the _admin.deployed.VCA list
2525 vca_deployed_list
= None
2526 if db_nsr
["_admin"].get("deployed"):
2527 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2528 if vca_deployed_list
is None:
2529 vca_deployed_list
= []
2530 configuration_status_list
= []
2531 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2532 db_nsr_update
["configurationStatus"] = configuration_status_list
2533 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2534 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2535 elif isinstance(vca_deployed_list
, dict):
2536 # maintain backward compatibility. Change a dict to list at database
2537 vca_deployed_list
= list(vca_deployed_list
.values())
2538 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2539 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2542 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2544 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2545 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2547 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2548 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2549 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2551 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2554 # n2vc_redesign STEP 2 Deploy Network Scenario
2555 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2556 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2558 stage
[1] = "Deploying KDUs."
2559 # self.logger.debug(logging_text + "Before deploy_kdus")
2560 # Call to deploy_kdus in case exists the "vdu:kdu" param
2561 await self
.deploy_kdus(
2562 logging_text
=logging_text
,
2564 nslcmop_id
=nslcmop_id
,
2567 task_instantiation_info
=tasks_dict_info
,
2570 stage
[1] = "Getting VCA public key."
2571 # n2vc_redesign STEP 1 Get VCA public ssh-key
2572 # feature 1429. Add n2vc public key to needed VMs
2573 n2vc_key
= self
.n2vc
.get_public_key()
2574 n2vc_key_list
= [n2vc_key
]
2575 if self
.vca_config
.get("public_key"):
2576 n2vc_key_list
.append(self
.vca_config
["public_key"])
2578 stage
[1] = "Deploying NS at VIM."
2579 task_ro
= asyncio
.ensure_future(
2580 self
.instantiate_RO(
2581 logging_text
=logging_text
,
2585 db_nslcmop
=db_nslcmop
,
2588 n2vc_key_list
=n2vc_key_list
,
2592 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2593 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2595 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2596 stage
[1] = "Deploying Execution Environments."
2597 self
.logger
.debug(logging_text
+ stage
[1])
2599 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2600 for vnf_profile
in get_vnf_profiles(nsd
):
2601 vnfd_id
= vnf_profile
["vnfd-id"]
2602 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2603 member_vnf_index
= str(vnf_profile
["id"])
2604 db_vnfr
= db_vnfrs
[member_vnf_index
]
2605 base_folder
= vnfd
["_admin"]["storage"]
2611 # Get additional parameters
2612 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2613 if db_vnfr
.get("additionalParamsForVnf"):
2614 deploy_params
.update(
2615 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2618 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2619 if descriptor_config
:
2621 logging_text
=logging_text
2622 + "member_vnf_index={} ".format(member_vnf_index
),
2625 nslcmop_id
=nslcmop_id
,
2631 member_vnf_index
=member_vnf_index
,
2632 vdu_index
=vdu_index
,
2634 deploy_params
=deploy_params
,
2635 descriptor_config
=descriptor_config
,
2636 base_folder
=base_folder
,
2637 task_instantiation_info
=tasks_dict_info
,
2641 # Deploy charms for each VDU that supports one.
2642 for vdud
in get_vdu_list(vnfd
):
2644 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2645 vdur
= find_in_list(
2646 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2649 if vdur
.get("additionalParams"):
2650 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2652 deploy_params_vdu
= deploy_params
2653 deploy_params_vdu
["OSM"] = get_osm_params(
2654 db_vnfr
, vdu_id
, vdu_count_index
=0
2656 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2658 self
.logger
.debug("VDUD > {}".format(vdud
))
2660 "Descriptor config > {}".format(descriptor_config
)
2662 if descriptor_config
:
2665 for vdu_index
in range(vdud_count
):
2666 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2668 logging_text
=logging_text
2669 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2670 member_vnf_index
, vdu_id
, vdu_index
2674 nslcmop_id
=nslcmop_id
,
2680 member_vnf_index
=member_vnf_index
,
2681 vdu_index
=vdu_index
,
2683 deploy_params
=deploy_params_vdu
,
2684 descriptor_config
=descriptor_config
,
2685 base_folder
=base_folder
,
2686 task_instantiation_info
=tasks_dict_info
,
2689 for kdud
in get_kdu_list(vnfd
):
2690 kdu_name
= kdud
["name"]
2691 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2692 if descriptor_config
:
2697 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2699 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2700 if kdur
.get("additionalParams"):
2701 deploy_params_kdu
.update(
2702 parse_yaml_strings(kdur
["additionalParams"].copy())
2706 logging_text
=logging_text
,
2709 nslcmop_id
=nslcmop_id
,
2715 member_vnf_index
=member_vnf_index
,
2716 vdu_index
=vdu_index
,
2718 deploy_params
=deploy_params_kdu
,
2719 descriptor_config
=descriptor_config
,
2720 base_folder
=base_folder
,
2721 task_instantiation_info
=tasks_dict_info
,
2725 # Check if this NS has a charm configuration
2726 descriptor_config
= nsd
.get("ns-configuration")
2727 if descriptor_config
and descriptor_config
.get("juju"):
2730 member_vnf_index
= None
2736 # Get additional parameters
2737 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2738 if db_nsr
.get("additionalParamsForNs"):
2739 deploy_params
.update(
2740 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2742 base_folder
= nsd
["_admin"]["storage"]
2744 logging_text
=logging_text
,
2747 nslcmop_id
=nslcmop_id
,
2753 member_vnf_index
=member_vnf_index
,
2754 vdu_index
=vdu_index
,
2756 deploy_params
=deploy_params
,
2757 descriptor_config
=descriptor_config
,
2758 base_folder
=base_folder
,
2759 task_instantiation_info
=tasks_dict_info
,
2763 # rest of staff will be done at finally
2766 ROclient
.ROClientException
,
2772 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2775 except asyncio
.CancelledError
:
2777 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2779 exc
= "Operation was cancelled"
2780 except Exception as e
:
2781 exc
= traceback
.format_exc()
2782 self
.logger
.critical(
2783 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2788 error_list
.append(str(exc
))
2790 # wait for pending tasks
2792 stage
[1] = "Waiting for instantiate pending tasks."
2793 self
.logger
.debug(logging_text
+ stage
[1])
2794 error_list
+= await self
._wait
_for
_tasks
(
2802 stage
[1] = stage
[2] = ""
2803 except asyncio
.CancelledError
:
2804 error_list
.append("Cancelled")
2805 # TODO cancel all tasks
2806 except Exception as exc
:
2807 error_list
.append(str(exc
))
2809 # update operation-status
2810 db_nsr_update
["operational-status"] = "running"
2811 # let's begin with VCA 'configured' status (later we can change it)
2812 db_nsr_update
["config-status"] = "configured"
2813 for task
, task_name
in tasks_dict_info
.items():
2814 if not task
.done() or task
.cancelled() or task
.exception():
2815 if task_name
.startswith(self
.task_name_deploy_vca
):
2816 # A N2VC task is pending
2817 db_nsr_update
["config-status"] = "failed"
2819 # RO or KDU task is pending
2820 db_nsr_update
["operational-status"] = "failed"
2822 # update status at database
2824 error_detail
= ". ".join(error_list
)
2825 self
.logger
.error(logging_text
+ error_detail
)
2826 error_description_nslcmop
= "{} Detail: {}".format(
2827 stage
[0], error_detail
2829 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2830 nslcmop_id
, stage
[0]
2833 db_nsr_update
["detailed-status"] = (
2834 error_description_nsr
+ " Detail: " + error_detail
2836 db_nslcmop_update
["detailed-status"] = error_detail
2837 nslcmop_operation_state
= "FAILED"
2841 error_description_nsr
= error_description_nslcmop
= None
2843 db_nsr_update
["detailed-status"] = "Done"
2844 db_nslcmop_update
["detailed-status"] = "Done"
2845 nslcmop_operation_state
= "COMPLETED"
2848 self
._write
_ns
_status
(
2851 current_operation
="IDLE",
2852 current_operation_id
=None,
2853 error_description
=error_description_nsr
,
2854 error_detail
=error_detail
,
2855 other_update
=db_nsr_update
,
2857 self
._write
_op
_status
(
2860 error_message
=error_description_nslcmop
,
2861 operation_state
=nslcmop_operation_state
,
2862 other_update
=db_nslcmop_update
,
2865 if nslcmop_operation_state
:
2867 await self
.msg
.aiowrite(
2872 "nslcmop_id": nslcmop_id
,
2873 "operationState": nslcmop_operation_state
,
2877 except Exception as e
:
2879 logging_text
+ "kafka_write notification Exception {}".format(e
)
2882 self
.logger
.debug(logging_text
+ "Exit")
2883 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2885 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2886 if vnfd_id
not in cached_vnfds
:
2887 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2888 return cached_vnfds
[vnfd_id
]
2890 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2891 if vnf_profile_id
not in cached_vnfrs
:
2892 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2895 "member-vnf-index-ref": vnf_profile_id
,
2896 "nsr-id-ref": nsr_id
,
2899 return cached_vnfrs
[vnf_profile_id
]
2901 def _is_deployed_vca_in_relation(
2902 self
, vca
: DeployedVCA
, relation
: Relation
2905 for endpoint
in (relation
.provider
, relation
.requirer
):
2906 if endpoint
["kdu-resource-profile-id"]:
2909 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2910 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2911 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2917 def _update_ee_relation_data_with_implicit_data(
2918 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2920 ee_relation_data
= safe_get_ee_relation(
2921 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2923 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2924 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2925 "execution-environment-ref"
2927 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2928 vnfd_id
= vnf_profile
["vnfd-id"]
2929 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2932 if ee_relation_level
== EELevel
.VNF
2933 else ee_relation_data
["vdu-profile-id"]
2935 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2938 f
"not execution environments found for ee_relation {ee_relation_data}"
2940 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2941 return ee_relation_data
2943 def _get_ns_relations(
2946 nsd
: Dict
[str, Any
],
2948 cached_vnfds
: Dict
[str, Any
],
2949 ) -> List
[Relation
]:
2951 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2952 for r
in db_ns_relations
:
2953 provider_dict
= None
2954 requirer_dict
= None
2955 if all(key
in r
for key
in ("provider", "requirer")):
2956 provider_dict
= r
["provider"]
2957 requirer_dict
= r
["requirer"]
2958 elif "entities" in r
:
2959 provider_id
= r
["entities"][0]["id"]
2962 "endpoint": r
["entities"][0]["endpoint"],
2964 if provider_id
!= nsd
["id"]:
2965 provider_dict
["vnf-profile-id"] = provider_id
2966 requirer_id
= r
["entities"][1]["id"]
2969 "endpoint": r
["entities"][1]["endpoint"],
2971 if requirer_id
!= nsd
["id"]:
2972 requirer_dict
["vnf-profile-id"] = requirer_id
2975 "provider/requirer or entities must be included in the relation."
2977 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2978 nsr_id
, nsd
, provider_dict
, cached_vnfds
2980 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2981 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2983 provider
= EERelation(relation_provider
)
2984 requirer
= EERelation(relation_requirer
)
2985 relation
= Relation(r
["name"], provider
, requirer
)
2986 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2988 relations
.append(relation
)
2991 def _get_vnf_relations(
2994 nsd
: Dict
[str, Any
],
2996 cached_vnfds
: Dict
[str, Any
],
2997 ) -> List
[Relation
]:
2999 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3000 vnf_profile_id
= vnf_profile
["id"]
3001 vnfd_id
= vnf_profile
["vnfd-id"]
3002 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3003 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3004 for r
in db_vnf_relations
:
3005 provider_dict
= None
3006 requirer_dict
= None
3007 if all(key
in r
for key
in ("provider", "requirer")):
3008 provider_dict
= r
["provider"]
3009 requirer_dict
= r
["requirer"]
3010 elif "entities" in r
:
3011 provider_id
= r
["entities"][0]["id"]
3014 "vnf-profile-id": vnf_profile_id
,
3015 "endpoint": r
["entities"][0]["endpoint"],
3017 if provider_id
!= vnfd_id
:
3018 provider_dict
["vdu-profile-id"] = provider_id
3019 requirer_id
= r
["entities"][1]["id"]
3022 "vnf-profile-id": vnf_profile_id
,
3023 "endpoint": r
["entities"][1]["endpoint"],
3025 if requirer_id
!= vnfd_id
:
3026 requirer_dict
["vdu-profile-id"] = requirer_id
3029 "provider/requirer or entities must be included in the relation."
3031 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3032 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3034 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3035 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3037 provider
= EERelation(relation_provider
)
3038 requirer
= EERelation(relation_requirer
)
3039 relation
= Relation(r
["name"], provider
, requirer
)
3040 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3042 relations
.append(relation
)
3045 def _get_kdu_resource_data(
3047 ee_relation
: EERelation
,
3048 db_nsr
: Dict
[str, Any
],
3049 cached_vnfds
: Dict
[str, Any
],
3050 ) -> DeployedK8sResource
:
3051 nsd
= get_nsd(db_nsr
)
3052 vnf_profiles
= get_vnf_profiles(nsd
)
3053 vnfd_id
= find_in_list(
3055 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3057 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3058 kdu_resource_profile
= get_kdu_resource_profile(
3059 db_vnfd
, ee_relation
.kdu_resource_profile_id
3061 kdu_name
= kdu_resource_profile
["kdu-name"]
3062 deployed_kdu
, _
= get_deployed_kdu(
3063 db_nsr
.get("_admin", ()).get("deployed", ()),
3065 ee_relation
.vnf_profile_id
,
3067 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3070 def _get_deployed_component(
3072 ee_relation
: EERelation
,
3073 db_nsr
: Dict
[str, Any
],
3074 cached_vnfds
: Dict
[str, Any
],
3075 ) -> DeployedComponent
:
3076 nsr_id
= db_nsr
["_id"]
3077 deployed_component
= None
3078 ee_level
= EELevel
.get_level(ee_relation
)
3079 if ee_level
== EELevel
.NS
:
3080 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3082 deployed_component
= DeployedVCA(nsr_id
, vca
)
3083 elif ee_level
== EELevel
.VNF
:
3084 vca
= get_deployed_vca(
3088 "member-vnf-index": ee_relation
.vnf_profile_id
,
3089 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3093 deployed_component
= DeployedVCA(nsr_id
, vca
)
3094 elif ee_level
== EELevel
.VDU
:
3095 vca
= get_deployed_vca(
3098 "vdu_id": ee_relation
.vdu_profile_id
,
3099 "member-vnf-index": ee_relation
.vnf_profile_id
,
3100 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3104 deployed_component
= DeployedVCA(nsr_id
, vca
)
3105 elif ee_level
== EELevel
.KDU
:
3106 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3107 ee_relation
, db_nsr
, cached_vnfds
3109 if kdu_resource_data
:
3110 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3111 return deployed_component
3113 async def _add_relation(
3117 db_nsr
: Dict
[str, Any
],
3118 cached_vnfds
: Dict
[str, Any
],
3119 cached_vnfrs
: Dict
[str, Any
],
3121 deployed_provider
= self
._get
_deployed
_component
(
3122 relation
.provider
, db_nsr
, cached_vnfds
3124 deployed_requirer
= self
._get
_deployed
_component
(
3125 relation
.requirer
, db_nsr
, cached_vnfds
3129 and deployed_requirer
3130 and deployed_provider
.config_sw_installed
3131 and deployed_requirer
.config_sw_installed
3133 provider_db_vnfr
= (
3135 relation
.provider
.nsr_id
,
3136 relation
.provider
.vnf_profile_id
,
3139 if relation
.provider
.vnf_profile_id
3142 requirer_db_vnfr
= (
3144 relation
.requirer
.nsr_id
,
3145 relation
.requirer
.vnf_profile_id
,
3148 if relation
.requirer
.vnf_profile_id
3151 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3152 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3153 provider_relation_endpoint
= RelationEndpoint(
3154 deployed_provider
.ee_id
,
3156 relation
.provider
.endpoint
,
3158 requirer_relation_endpoint
= RelationEndpoint(
3159 deployed_requirer
.ee_id
,
3161 relation
.requirer
.endpoint
,
3163 await self
.vca_map
[vca_type
].add_relation(
3164 provider
=provider_relation_endpoint
,
3165 requirer
=requirer_relation_endpoint
,
3167 # remove entry from relations list
3171 async def _add_vca_relations(
3177 timeout
: int = 3600,
3181 # 1. find all relations for this VCA
3182 # 2. wait for other peers related
3186 # STEP 1: find all relations for this VCA
3189 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3190 nsd
= get_nsd(db_nsr
)
3193 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3194 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3199 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3200 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3202 # if no relations, terminate
3204 self
.logger
.debug(logging_text
+ " No relations")
3207 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3214 if now
- start
>= timeout
:
3215 self
.logger
.error(logging_text
+ " : timeout adding relations")
3218 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3219 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3221 # for each relation, find the VCA's related
3222 for relation
in relations
.copy():
3223 added
= await self
._add
_relation
(
3231 relations
.remove(relation
)
3234 self
.logger
.debug("Relations added")
3236 await asyncio
.sleep(5.0)
3240 except Exception as e
:
3241 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3244 async def _install_kdu(
3252 k8s_instance_info
: dict,
3253 k8params
: dict = None,
3259 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3262 "collection": "nsrs",
3263 "filter": {"_id": nsr_id
},
3264 "path": nsr_db_path
,
3267 if k8s_instance_info
.get("kdu-deployment-name"):
3268 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3270 kdu_instance
= self
.k8scluster_map
[
3272 ].generate_kdu_instance_name(
3273 db_dict
=db_dict_install
,
3274 kdu_model
=k8s_instance_info
["kdu-model"],
3275 kdu_name
=k8s_instance_info
["kdu-name"],
3278 # Update the nsrs table with the kdu-instance value
3282 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3285 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3286 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3287 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3288 # namespace, this first verification could be removed, and the next step would be done for any kind
3290 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3291 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3292 if k8sclustertype
in ("juju", "juju-bundle"):
3293 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3294 # that the user passed a namespace which he wants its KDU to be deployed in)
3300 "_admin.projects_write": k8s_instance_info
["namespace"],
3301 "_admin.projects_read": k8s_instance_info
["namespace"],
3307 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3312 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3314 k8s_instance_info
["namespace"] = kdu_instance
3316 await self
.k8scluster_map
[k8sclustertype
].install(
3317 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3318 kdu_model
=k8s_instance_info
["kdu-model"],
3321 db_dict
=db_dict_install
,
3323 kdu_name
=k8s_instance_info
["kdu-name"],
3324 namespace
=k8s_instance_info
["namespace"],
3325 kdu_instance
=kdu_instance
,
3329 # Obtain services to obtain management service ip
3330 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3331 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3332 kdu_instance
=kdu_instance
,
3333 namespace
=k8s_instance_info
["namespace"],
3336 # Obtain management service info (if exists)
3337 vnfr_update_dict
= {}
3338 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3340 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3345 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3348 for service
in kdud
.get("service", [])
3349 if service
.get("mgmt-service")
3351 for mgmt_service
in mgmt_services
:
3352 for service
in services
:
3353 if service
["name"].startswith(mgmt_service
["name"]):
3354 # Mgmt service found, Obtain service ip
3355 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3356 if isinstance(ip
, list) and len(ip
) == 1:
3360 "kdur.{}.ip-address".format(kdu_index
)
3363 # Check if must update also mgmt ip at the vnf
3364 service_external_cp
= mgmt_service
.get(
3365 "external-connection-point-ref"
3367 if service_external_cp
:
3369 deep_get(vnfd
, ("mgmt-interface", "cp"))
3370 == service_external_cp
3372 vnfr_update_dict
["ip-address"] = ip
3377 "external-connection-point-ref", ""
3379 == service_external_cp
,
3382 "kdur.{}.ip-address".format(kdu_index
)
3387 "Mgmt service name: {} not found".format(
3388 mgmt_service
["name"]
3392 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3393 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3395 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3398 and kdu_config
.get("initial-config-primitive")
3399 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3401 initial_config_primitive_list
= kdu_config
.get(
3402 "initial-config-primitive"
3404 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3406 for initial_config_primitive
in initial_config_primitive_list
:
3407 primitive_params_
= self
._map
_primitive
_params
(
3408 initial_config_primitive
, {}, {}
3411 await asyncio
.wait_for(
3412 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3413 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3414 kdu_instance
=kdu_instance
,
3415 primitive_name
=initial_config_primitive
["name"],
3416 params
=primitive_params_
,
3417 db_dict
=db_dict_install
,
3423 except Exception as e
:
3424 # Prepare update db with error and raise exception
3427 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3431 vnfr_data
.get("_id"),
3432 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3435 # ignore to keep original exception
3437 # reraise original error
3442 async def deploy_kdus(
3449 task_instantiation_info
,
3451 # Launch kdus if present in the descriptor
3453 k8scluster_id_2_uuic
= {
3454 "helm-chart-v3": {},
3459 async def _get_cluster_id(cluster_id
, cluster_type
):
3460 nonlocal k8scluster_id_2_uuic
3461 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3462 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3464 # check if K8scluster is creating and wait look if previous tasks in process
3465 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3466 "k8scluster", cluster_id
3469 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3470 task_name
, cluster_id
3472 self
.logger
.debug(logging_text
+ text
)
3473 await asyncio
.wait(task_dependency
, timeout
=3600)
3475 db_k8scluster
= self
.db
.get_one(
3476 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3478 if not db_k8scluster
:
3479 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3481 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3483 if cluster_type
== "helm-chart-v3":
3485 # backward compatibility for existing clusters that have not been initialized for helm v3
3486 k8s_credentials
= yaml
.safe_dump(
3487 db_k8scluster
.get("credentials")
3489 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3490 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3492 db_k8scluster_update
= {}
3493 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3494 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3495 db_k8scluster_update
[
3496 "_admin.helm-chart-v3.created"
3498 db_k8scluster_update
[
3499 "_admin.helm-chart-v3.operationalState"
3502 "k8sclusters", cluster_id
, db_k8scluster_update
3504 except Exception as e
:
3507 + "error initializing helm-v3 cluster: {}".format(str(e
))
3510 "K8s cluster '{}' has not been initialized for '{}'".format(
3511 cluster_id
, cluster_type
3516 "K8s cluster '{}' has not been initialized for '{}'".format(
3517 cluster_id
, cluster_type
3520 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3523 logging_text
+= "Deploy kdus: "
3526 db_nsr_update
= {"_admin.deployed.K8s": []}
3527 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3530 updated_cluster_list
= []
3531 updated_v3_cluster_list
= []
3533 for vnfr_data
in db_vnfrs
.values():
3534 vca_id
= self
.get_vca_id(vnfr_data
, {})
3535 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3536 # Step 0: Prepare and set parameters
3537 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3538 vnfd_id
= vnfr_data
.get("vnfd-id")
3539 vnfd_with_id
= find_in_list(
3540 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3544 for kdud
in vnfd_with_id
["kdu"]
3545 if kdud
["name"] == kdur
["kdu-name"]
3547 namespace
= kdur
.get("k8s-namespace")
3548 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3549 if kdur
.get("helm-chart"):
3550 kdumodel
= kdur
["helm-chart"]
3551 # Default version: helm3, if helm-version is v2 assign v2
3552 k8sclustertype
= "helm-chart-v3"
3553 self
.logger
.debug("kdur: {}".format(kdur
))
3555 kdur
.get("helm-version")
3556 and kdur
.get("helm-version") == "v2"
3558 k8sclustertype
= "helm-chart"
3559 elif kdur
.get("juju-bundle"):
3560 kdumodel
= kdur
["juju-bundle"]
3561 k8sclustertype
= "juju-bundle"
3564 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3565 "juju-bundle. Maybe an old NBI version is running".format(
3566 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3569 # check if kdumodel is a file and exists
3571 vnfd_with_id
= find_in_list(
3572 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3574 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3575 if storage
: # may be not present if vnfd has not artifacts
3576 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3577 if storage
["pkg-dir"]:
3578 filename
= "{}/{}/{}s/{}".format(
3585 filename
= "{}/Scripts/{}s/{}".format(
3590 if self
.fs
.file_exists(
3591 filename
, mode
="file"
3592 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3593 kdumodel
= self
.fs
.path
+ filename
3594 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3596 except Exception: # it is not a file
3599 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3600 step
= "Synchronize repos for k8s cluster '{}'".format(
3603 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3607 k8sclustertype
== "helm-chart"
3608 and cluster_uuid
not in updated_cluster_list
3610 k8sclustertype
== "helm-chart-v3"
3611 and cluster_uuid
not in updated_v3_cluster_list
3613 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3614 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3615 cluster_uuid
=cluster_uuid
3618 if del_repo_list
or added_repo_dict
:
3619 if k8sclustertype
== "helm-chart":
3621 "_admin.helm_charts_added." + item
: None
3622 for item
in del_repo_list
3625 "_admin.helm_charts_added." + item
: name
3626 for item
, name
in added_repo_dict
.items()
3628 updated_cluster_list
.append(cluster_uuid
)
3629 elif k8sclustertype
== "helm-chart-v3":
3631 "_admin.helm_charts_v3_added." + item
: None
3632 for item
in del_repo_list
3635 "_admin.helm_charts_v3_added." + item
: name
3636 for item
, name
in added_repo_dict
.items()
3638 updated_v3_cluster_list
.append(cluster_uuid
)
3640 logging_text
+ "repos synchronized on k8s cluster "
3641 "'{}' to_delete: {}, to_add: {}".format(
3642 k8s_cluster_id
, del_repo_list
, added_repo_dict
3647 {"_id": k8s_cluster_id
},
3653 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3654 vnfr_data
["member-vnf-index-ref"],
3658 k8s_instance_info
= {
3659 "kdu-instance": None,
3660 "k8scluster-uuid": cluster_uuid
,
3661 "k8scluster-type": k8sclustertype
,
3662 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3663 "kdu-name": kdur
["kdu-name"],
3664 "kdu-model": kdumodel
,
3665 "namespace": namespace
,
3666 "kdu-deployment-name": kdu_deployment_name
,
3668 db_path
= "_admin.deployed.K8s.{}".format(index
)
3669 db_nsr_update
[db_path
] = k8s_instance_info
3670 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3671 vnfd_with_id
= find_in_list(
3672 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3674 task
= asyncio
.ensure_future(
3683 k8params
=desc_params
,
3688 self
.lcm_tasks
.register(
3692 "instantiate_KDU-{}".format(index
),
3695 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3701 except (LcmException
, asyncio
.CancelledError
):
3703 except Exception as e
:
3704 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3705 if isinstance(e
, (N2VCException
, DbException
)):
3706 self
.logger
.error(logging_text
+ msg
)
3708 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3709 raise LcmException(msg
)
3712 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3731 task_instantiation_info
,
3734 # launch instantiate_N2VC in a asyncio task and register task object
3735 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3736 # if not found, create one entry and update database
3737 # fill db_nsr._admin.deployed.VCA.<index>
3740 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3742 if "execution-environment-list" in descriptor_config
:
3743 ee_list
= descriptor_config
.get("execution-environment-list", [])
3744 elif "juju" in descriptor_config
:
3745 ee_list
= [descriptor_config
] # ns charms
3746 else: # other types as script are not supported
3749 for ee_item
in ee_list
:
3752 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3753 ee_item
.get("juju"), ee_item
.get("helm-chart")
3756 ee_descriptor_id
= ee_item
.get("id")
3757 if ee_item
.get("juju"):
3758 vca_name
= ee_item
["juju"].get("charm")
3761 if ee_item
["juju"].get("charm") is not None
3764 if ee_item
["juju"].get("cloud") == "k8s":
3765 vca_type
= "k8s_proxy_charm"
3766 elif ee_item
["juju"].get("proxy") is False:
3767 vca_type
= "native_charm"
3768 elif ee_item
.get("helm-chart"):
3769 vca_name
= ee_item
["helm-chart"]
3770 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3773 vca_type
= "helm-v3"
3776 logging_text
+ "skipping non juju neither charm configuration"
3781 for vca_index
, vca_deployed
in enumerate(
3782 db_nsr
["_admin"]["deployed"]["VCA"]
3784 if not vca_deployed
:
3787 vca_deployed
.get("member-vnf-index") == member_vnf_index
3788 and vca_deployed
.get("vdu_id") == vdu_id
3789 and vca_deployed
.get("kdu_name") == kdu_name
3790 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3791 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3795 # not found, create one.
3797 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3800 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3802 target
+= "/kdu/{}".format(kdu_name
)
3804 "target_element": target
,
3805 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3806 "member-vnf-index": member_vnf_index
,
3808 "kdu_name": kdu_name
,
3809 "vdu_count_index": vdu_index
,
3810 "operational-status": "init", # TODO revise
3811 "detailed-status": "", # TODO revise
3812 "step": "initial-deploy", # TODO revise
3814 "vdu_name": vdu_name
,
3816 "ee_descriptor_id": ee_descriptor_id
,
3820 # create VCA and configurationStatus in db
3822 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3823 "configurationStatus.{}".format(vca_index
): dict(),
3825 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3827 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3829 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3830 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3831 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3834 task_n2vc
= asyncio
.ensure_future(
3835 self
.instantiate_N2VC(
3836 logging_text
=logging_text
,
3837 vca_index
=vca_index
,
3843 vdu_index
=vdu_index
,
3844 deploy_params
=deploy_params
,
3845 config_descriptor
=descriptor_config
,
3846 base_folder
=base_folder
,
3847 nslcmop_id
=nslcmop_id
,
3851 ee_config_descriptor
=ee_item
,
3854 self
.lcm_tasks
.register(
3858 "instantiate_N2VC-{}".format(vca_index
),
3861 task_instantiation_info
[
3863 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3864 member_vnf_index
or "", vdu_id
or ""
3868 def _create_nslcmop(nsr_id
, operation
, params
):
3870 Creates a ns-lcm-opp content to be stored at database.
3871 :param nsr_id: internal id of the instance
3872 :param operation: instantiate, terminate, scale, action, ...
3873 :param params: user parameters for the operation
3874 :return: dictionary following SOL005 format
3876 # Raise exception if invalid arguments
3877 if not (nsr_id
and operation
and params
):
3879 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3886 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3887 "operationState": "PROCESSING",
3888 "statusEnteredTime": now
,
3889 "nsInstanceId": nsr_id
,
3890 "lcmOperationType": operation
,
3892 "isAutomaticInvocation": False,
3893 "operationParams": params
,
3894 "isCancelPending": False,
3896 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3897 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3902 def _format_additional_params(self
, params
):
3903 params
= params
or {}
3904 for key
, value
in params
.items():
3905 if str(value
).startswith("!!yaml "):
3906 params
[key
] = yaml
.safe_load(value
[7:])
3909 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3910 primitive
= seq
.get("name")
3911 primitive_params
= {}
3913 "member_vnf_index": vnf_index
,
3914 "primitive": primitive
,
3915 "primitive_params": primitive_params
,
3918 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3922 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3923 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3924 if op
.get("operationState") == "COMPLETED":
3925 # b. Skip sub-operation
3926 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3927 return self
.SUBOPERATION_STATUS_SKIP
3929 # c. retry executing sub-operation
3930 # The sub-operation exists, and operationState != 'COMPLETED'
3931 # Update operationState = 'PROCESSING' to indicate a retry.
3932 operationState
= "PROCESSING"
3933 detailed_status
= "In progress"
3934 self
._update
_suboperation
_status
(
3935 db_nslcmop
, op_index
, operationState
, detailed_status
3937 # Return the sub-operation index
3938 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3939 # with arguments extracted from the sub-operation
3942 # Find a sub-operation where all keys in a matching dictionary must match
3943 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3944 def _find_suboperation(self
, db_nslcmop
, match
):
3945 if db_nslcmop
and match
:
3946 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3947 for i
, op
in enumerate(op_list
):
3948 if all(op
.get(k
) == match
[k
] for k
in match
):
3950 return self
.SUBOPERATION_STATUS_NOT_FOUND
3952 # Update status for a sub-operation given its index
3953 def _update_suboperation_status(
3954 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3956 # Update DB for HA tasks
3957 q_filter
= {"_id": db_nslcmop
["_id"]}
3959 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3960 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3963 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3966 # Add sub-operation, return the index of the added sub-operation
3967 # Optionally, set operationState, detailed-status, and operationType
3968 # Status and type are currently set for 'scale' sub-operations:
3969 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3970 # 'detailed-status' : status message
3971 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3972 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3973 def _add_suboperation(
3981 mapped_primitive_params
,
3982 operationState
=None,
3983 detailed_status
=None,
3986 RO_scaling_info
=None,
3989 return self
.SUBOPERATION_STATUS_NOT_FOUND
3990 # Get the "_admin.operations" list, if it exists
3991 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3992 op_list
= db_nslcmop_admin
.get("operations")
3993 # Create or append to the "_admin.operations" list
3995 "member_vnf_index": vnf_index
,
3997 "vdu_count_index": vdu_count_index
,
3998 "primitive": primitive
,
3999 "primitive_params": mapped_primitive_params
,
4002 new_op
["operationState"] = operationState
4004 new_op
["detailed-status"] = detailed_status
4006 new_op
["lcmOperationType"] = operationType
4008 new_op
["RO_nsr_id"] = RO_nsr_id
4010 new_op
["RO_scaling_info"] = RO_scaling_info
4012 # No existing operations, create key 'operations' with current operation as first list element
4013 db_nslcmop_admin
.update({"operations": [new_op
]})
4014 op_list
= db_nslcmop_admin
.get("operations")
4016 # Existing operations, append operation to list
4017 op_list
.append(new_op
)
4019 db_nslcmop_update
= {"_admin.operations": op_list
}
4020 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4021 op_index
= len(op_list
) - 1
4024 # Helper methods for scale() sub-operations
4026 # pre-scale/post-scale:
4027 # Check for 3 different cases:
4028 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4029 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4030 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4031 def _check_or_add_scale_suboperation(
4035 vnf_config_primitive
,
4039 RO_scaling_info
=None,
4041 # Find this sub-operation
4042 if RO_nsr_id
and RO_scaling_info
:
4043 operationType
= "SCALE-RO"
4045 "member_vnf_index": vnf_index
,
4046 "RO_nsr_id": RO_nsr_id
,
4047 "RO_scaling_info": RO_scaling_info
,
4051 "member_vnf_index": vnf_index
,
4052 "primitive": vnf_config_primitive
,
4053 "primitive_params": primitive_params
,
4054 "lcmOperationType": operationType
,
4056 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4057 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4058 # a. New sub-operation
4059 # The sub-operation does not exist, add it.
4060 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4061 # The following parameters are set to None for all kind of scaling:
4063 vdu_count_index
= None
4065 if RO_nsr_id
and RO_scaling_info
:
4066 vnf_config_primitive
= None
4067 primitive_params
= None
4070 RO_scaling_info
= None
4071 # Initial status for sub-operation
4072 operationState
= "PROCESSING"
4073 detailed_status
= "In progress"
4074 # Add sub-operation for pre/post-scaling (zero or more operations)
4075 self
._add
_suboperation
(
4081 vnf_config_primitive
,
4089 return self
.SUBOPERATION_STATUS_NEW
4091 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4092 # or op_index (operationState != 'COMPLETED')
4093 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4095 # Function to return execution_environment id
4097 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4098 # TODO vdu_index_count
4099 for vca
in vca_deployed_list
:
4100 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
4103 async def destroy_N2VC(
4111 exec_primitives
=True,
4116 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4117 :param logging_text:
4119 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4120 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4121 :param vca_index: index in the database _admin.deployed.VCA
4122 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4123 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4124 not executed properly
4125 :param scaling_in: True destroys the application, False destroys the model
4126 :return: None or exception
4131 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4132 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4136 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4138 # execute terminate_primitives
4140 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4141 config_descriptor
.get("terminate-config-primitive"),
4142 vca_deployed
.get("ee_descriptor_id"),
4144 vdu_id
= vca_deployed
.get("vdu_id")
4145 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4146 vdu_name
= vca_deployed
.get("vdu_name")
4147 vnf_index
= vca_deployed
.get("member-vnf-index")
4148 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4149 for seq
in terminate_primitives
:
4150 # For each sequence in list, get primitive and call _ns_execute_primitive()
4151 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4152 vnf_index
, seq
.get("name")
4154 self
.logger
.debug(logging_text
+ step
)
4155 # Create the primitive for each sequence, i.e. "primitive": "touch"
4156 primitive
= seq
.get("name")
4157 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4162 self
._add
_suboperation
(
4169 mapped_primitive_params
,
4171 # Sub-operations: Call _ns_execute_primitive() instead of action()
4173 result
, result_detail
= await self
._ns
_execute
_primitive
(
4174 vca_deployed
["ee_id"],
4176 mapped_primitive_params
,
4180 except LcmException
:
4181 # this happens when VCA is not deployed. In this case it is not needed to terminate
4183 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4184 if result
not in result_ok
:
4186 "terminate_primitive {} for vnf_member_index={} fails with "
4187 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4189 # set that this VCA do not need terminated
4190 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4194 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4197 # Delete Prometheus Jobs if any
4198 # This uses NSR_ID, so it will destroy any jobs under this index
4199 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4202 await self
.vca_map
[vca_type
].delete_execution_environment(
4203 vca_deployed
["ee_id"],
4204 scaling_in
=scaling_in
,
4209 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4210 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4211 namespace
= "." + db_nsr
["_id"]
4213 await self
.n2vc
.delete_namespace(
4214 namespace
=namespace
,
4215 total_timeout
=self
.timeout_charm_delete
,
4218 except N2VCNotFound
: # already deleted. Skip
4220 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4222 async def _terminate_RO(
4223 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4226 Terminates a deployment from RO
4227 :param logging_text:
4228 :param nsr_deployed: db_nsr._admin.deployed
4231 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4232 this method will update only the index 2, but it will write on database the concatenated content of the list
4237 ro_nsr_id
= ro_delete_action
= None
4238 if nsr_deployed
and nsr_deployed
.get("RO"):
4239 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4240 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4243 stage
[2] = "Deleting ns from VIM."
4244 db_nsr_update
["detailed-status"] = " ".join(stage
)
4245 self
._write
_op
_status
(nslcmop_id
, stage
)
4246 self
.logger
.debug(logging_text
+ stage
[2])
4247 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4248 self
._write
_op
_status
(nslcmop_id
, stage
)
4249 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4250 ro_delete_action
= desc
["action_id"]
4252 "_admin.deployed.RO.nsr_delete_action_id"
4253 ] = ro_delete_action
4254 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4255 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4256 if ro_delete_action
:
4257 # wait until NS is deleted from VIM
4258 stage
[2] = "Waiting ns deleted from VIM."
4259 detailed_status_old
= None
4263 + " RO_id={} ro_delete_action={}".format(
4264 ro_nsr_id
, ro_delete_action
4267 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4268 self
._write
_op
_status
(nslcmop_id
, stage
)
4270 delete_timeout
= 20 * 60 # 20 minutes
4271 while delete_timeout
> 0:
4272 desc
= await self
.RO
.show(
4274 item_id_name
=ro_nsr_id
,
4275 extra_item
="action",
4276 extra_item_id
=ro_delete_action
,
4280 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4282 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4283 if ns_status
== "ERROR":
4284 raise ROclient
.ROClientException(ns_status_info
)
4285 elif ns_status
== "BUILD":
4286 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4287 elif ns_status
== "ACTIVE":
4288 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4289 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4294 ), "ROclient.check_action_status returns unknown {}".format(
4297 if stage
[2] != detailed_status_old
:
4298 detailed_status_old
= stage
[2]
4299 db_nsr_update
["detailed-status"] = " ".join(stage
)
4300 self
._write
_op
_status
(nslcmop_id
, stage
)
4301 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4302 await asyncio
.sleep(5, loop
=self
.loop
)
4304 else: # delete_timeout <= 0:
4305 raise ROclient
.ROClientException(
4306 "Timeout waiting ns deleted from VIM"
4309 except Exception as e
:
4310 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4312 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4314 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4315 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4316 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4318 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4321 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4323 failed_detail
.append("delete conflict: {}".format(e
))
4326 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4329 failed_detail
.append("delete error: {}".format(e
))
4331 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4335 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4336 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4338 stage
[2] = "Deleting nsd from RO."
4339 db_nsr_update
["detailed-status"] = " ".join(stage
)
4340 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4341 self
._write
_op
_status
(nslcmop_id
, stage
)
4342 await self
.RO
.delete("nsd", ro_nsd_id
)
4344 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4346 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4347 except Exception as e
:
4349 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4351 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4353 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4356 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4358 failed_detail
.append(
4359 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4361 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4363 failed_detail
.append(
4364 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4366 self
.logger
.error(logging_text
+ failed_detail
[-1])
4368 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4369 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4370 if not vnf_deployed
or not vnf_deployed
["id"]:
4373 ro_vnfd_id
= vnf_deployed
["id"]
4376 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4377 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4379 db_nsr_update
["detailed-status"] = " ".join(stage
)
4380 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4381 self
._write
_op
_status
(nslcmop_id
, stage
)
4382 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4384 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4386 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4387 except Exception as e
:
4389 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4392 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4396 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4399 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4401 failed_detail
.append(
4402 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4404 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4406 failed_detail
.append(
4407 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4409 self
.logger
.error(logging_text
+ failed_detail
[-1])
4412 stage
[2] = "Error deleting from VIM"
4414 stage
[2] = "Deleted from VIM"
4415 db_nsr_update
["detailed-status"] = " ".join(stage
)
4416 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4417 self
._write
_op
_status
(nslcmop_id
, stage
)
4420 raise LcmException("; ".join(failed_detail
))
4422 async def terminate(self
, nsr_id
, nslcmop_id
):
4423 # Try to lock HA task here
4424 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4425 if not task_is_locked_by_me
:
4428 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4429 self
.logger
.debug(logging_text
+ "Enter")
4430 timeout_ns_terminate
= self
.timeout_ns_terminate
4433 operation_params
= None
4435 error_list
= [] # annotates all failed error messages
4436 db_nslcmop_update
= {}
4437 autoremove
= False # autoremove after terminated
4438 tasks_dict_info
= {}
4441 "Stage 1/3: Preparing task.",
4442 "Waiting for previous operations to terminate.",
4445 # ^ contains [stage, step, VIM-status]
4447 # wait for any previous tasks in process
4448 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4450 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4451 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4452 operation_params
= db_nslcmop
.get("operationParams") or {}
4453 if operation_params
.get("timeout_ns_terminate"):
4454 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4455 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4456 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4458 db_nsr_update
["operational-status"] = "terminating"
4459 db_nsr_update
["config-status"] = "terminating"
4460 self
._write
_ns
_status
(
4462 ns_state
="TERMINATING",
4463 current_operation
="TERMINATING",
4464 current_operation_id
=nslcmop_id
,
4465 other_update
=db_nsr_update
,
4467 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4468 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4469 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4472 stage
[1] = "Getting vnf descriptors from db."
4473 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4475 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4477 db_vnfds_from_id
= {}
4478 db_vnfds_from_member_index
= {}
4480 for vnfr
in db_vnfrs_list
:
4481 vnfd_id
= vnfr
["vnfd-id"]
4482 if vnfd_id
not in db_vnfds_from_id
:
4483 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4484 db_vnfds_from_id
[vnfd_id
] = vnfd
4485 db_vnfds_from_member_index
[
4486 vnfr
["member-vnf-index-ref"]
4487 ] = db_vnfds_from_id
[vnfd_id
]
4489 # Destroy individual execution environments when there are terminating primitives.
4490 # Rest of EE will be deleted at once
4491 # TODO - check before calling _destroy_N2VC
4492 # if not operation_params.get("skip_terminate_primitives"):#
4493 # or not vca.get("needed_terminate"):
4494 stage
[0] = "Stage 2/3 execute terminating primitives."
4495 self
.logger
.debug(logging_text
+ stage
[0])
4496 stage
[1] = "Looking execution environment that needs terminate."
4497 self
.logger
.debug(logging_text
+ stage
[1])
4499 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4500 config_descriptor
= None
4501 vca_member_vnf_index
= vca
.get("member-vnf-index")
4502 vca_id
= self
.get_vca_id(
4503 db_vnfrs_dict
.get(vca_member_vnf_index
)
4504 if vca_member_vnf_index
4508 if not vca
or not vca
.get("ee_id"):
4510 if not vca
.get("member-vnf-index"):
4512 config_descriptor
= db_nsr
.get("ns-configuration")
4513 elif vca
.get("vdu_id"):
4514 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4515 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4516 elif vca
.get("kdu_name"):
4517 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4518 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4520 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4521 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4522 vca_type
= vca
.get("type")
4523 exec_terminate_primitives
= not operation_params
.get(
4524 "skip_terminate_primitives"
4525 ) and vca
.get("needed_terminate")
4526 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4527 # pending native charms
4529 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4531 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4532 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4533 task
= asyncio
.ensure_future(
4541 exec_terminate_primitives
,
4545 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4547 # wait for pending tasks of terminate primitives
4551 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4553 error_list
= await self
._wait
_for
_tasks
(
4556 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4560 tasks_dict_info
.clear()
4562 return # raise LcmException("; ".join(error_list))
4564 # remove All execution environments at once
4565 stage
[0] = "Stage 3/3 delete all."
4567 if nsr_deployed
.get("VCA"):
4568 stage
[1] = "Deleting all execution environments."
4569 self
.logger
.debug(logging_text
+ stage
[1])
4570 vca_id
= self
.get_vca_id({}, db_nsr
)
4571 task_delete_ee
= asyncio
.ensure_future(
4573 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4574 timeout
=self
.timeout_charm_delete
,
4577 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4578 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4580 # Delete from k8scluster
4581 stage
[1] = "Deleting KDUs."
4582 self
.logger
.debug(logging_text
+ stage
[1])
4583 # print(nsr_deployed)
4584 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4585 if not kdu
or not kdu
.get("kdu-instance"):
4587 kdu_instance
= kdu
.get("kdu-instance")
4588 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4589 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4590 vca_id
= self
.get_vca_id({}, db_nsr
)
4591 task_delete_kdu_instance
= asyncio
.ensure_future(
4592 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4593 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4594 kdu_instance
=kdu_instance
,
4596 namespace
=kdu
.get("namespace"),
4602 + "Unknown k8s deployment type {}".format(
4603 kdu
.get("k8scluster-type")
4608 task_delete_kdu_instance
4609 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4612 stage
[1] = "Deleting ns from VIM."
4614 task_delete_ro
= asyncio
.ensure_future(
4615 self
._terminate
_ng
_ro
(
4616 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4620 task_delete_ro
= asyncio
.ensure_future(
4622 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4625 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4627 # rest of staff will be done at finally
4630 ROclient
.ROClientException
,
4635 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4637 except asyncio
.CancelledError
:
4639 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4641 exc
= "Operation was cancelled"
4642 except Exception as e
:
4643 exc
= traceback
.format_exc()
4644 self
.logger
.critical(
4645 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4650 error_list
.append(str(exc
))
4652 # wait for pending tasks
4654 stage
[1] = "Waiting for terminate pending tasks."
4655 self
.logger
.debug(logging_text
+ stage
[1])
4656 error_list
+= await self
._wait
_for
_tasks
(
4659 timeout_ns_terminate
,
4663 stage
[1] = stage
[2] = ""
4664 except asyncio
.CancelledError
:
4665 error_list
.append("Cancelled")
4666 # TODO cancell all tasks
4667 except Exception as exc
:
4668 error_list
.append(str(exc
))
4669 # update status at database
4671 error_detail
= "; ".join(error_list
)
4672 # self.logger.error(logging_text + error_detail)
4673 error_description_nslcmop
= "{} Detail: {}".format(
4674 stage
[0], error_detail
4676 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4677 nslcmop_id
, stage
[0]
4680 db_nsr_update
["operational-status"] = "failed"
4681 db_nsr_update
["detailed-status"] = (
4682 error_description_nsr
+ " Detail: " + error_detail
4684 db_nslcmop_update
["detailed-status"] = error_detail
4685 nslcmop_operation_state
= "FAILED"
4689 error_description_nsr
= error_description_nslcmop
= None
4690 ns_state
= "NOT_INSTANTIATED"
4691 db_nsr_update
["operational-status"] = "terminated"
4692 db_nsr_update
["detailed-status"] = "Done"
4693 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4694 db_nslcmop_update
["detailed-status"] = "Done"
4695 nslcmop_operation_state
= "COMPLETED"
4698 self
._write
_ns
_status
(
4701 current_operation
="IDLE",
4702 current_operation_id
=None,
4703 error_description
=error_description_nsr
,
4704 error_detail
=error_detail
,
4705 other_update
=db_nsr_update
,
4707 self
._write
_op
_status
(
4710 error_message
=error_description_nslcmop
,
4711 operation_state
=nslcmop_operation_state
,
4712 other_update
=db_nslcmop_update
,
4714 if ns_state
== "NOT_INSTANTIATED":
4718 {"nsr-id-ref": nsr_id
},
4719 {"_admin.nsState": "NOT_INSTANTIATED"},
4721 except DbException
as e
:
4724 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4728 if operation_params
:
4729 autoremove
= operation_params
.get("autoremove", False)
4730 if nslcmop_operation_state
:
4732 await self
.msg
.aiowrite(
4737 "nslcmop_id": nslcmop_id
,
4738 "operationState": nslcmop_operation_state
,
4739 "autoremove": autoremove
,
4743 except Exception as e
:
4745 logging_text
+ "kafka_write notification Exception {}".format(e
)
4748 self
.logger
.debug(logging_text
+ "Exit")
4749 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4751 async def _wait_for_tasks(
4752 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4755 error_detail_list
= []
4757 pending_tasks
= list(created_tasks_info
.keys())
4758 num_tasks
= len(pending_tasks
)
4760 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4761 self
._write
_op
_status
(nslcmop_id
, stage
)
4762 while pending_tasks
:
4764 _timeout
= timeout
+ time_start
- time()
4765 done
, pending_tasks
= await asyncio
.wait(
4766 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4768 num_done
+= len(done
)
4769 if not done
: # Timeout
4770 for task
in pending_tasks
:
4771 new_error
= created_tasks_info
[task
] + ": Timeout"
4772 error_detail_list
.append(new_error
)
4773 error_list
.append(new_error
)
4776 if task
.cancelled():
4779 exc
= task
.exception()
4781 if isinstance(exc
, asyncio
.TimeoutError
):
4783 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4784 error_list
.append(created_tasks_info
[task
])
4785 error_detail_list
.append(new_error
)
4792 ROclient
.ROClientException
,
4798 self
.logger
.error(logging_text
+ new_error
)
4800 exc_traceback
= "".join(
4801 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4805 + created_tasks_info
[task
]
4811 logging_text
+ created_tasks_info
[task
] + ": Done"
4813 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4815 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4816 if nsr_id
: # update also nsr
4821 "errorDescription": "Error at: " + ", ".join(error_list
),
4822 "errorDetail": ". ".join(error_detail_list
),
4825 self
._write
_op
_status
(nslcmop_id
, stage
)
4826 return error_detail_list
4829 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4831 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4832 The default-value is used. If it is between < > it look for a value at instantiation_params
4833 :param primitive_desc: portion of VNFD/NSD that describes primitive
4834 :param params: Params provided by user
4835 :param instantiation_params: Instantiation params provided by user
4836 :return: a dictionary with the calculated params
4838 calculated_params
= {}
4839 for parameter
in primitive_desc
.get("parameter", ()):
4840 param_name
= parameter
["name"]
4841 if param_name
in params
:
4842 calculated_params
[param_name
] = params
[param_name
]
4843 elif "default-value" in parameter
or "value" in parameter
:
4844 if "value" in parameter
:
4845 calculated_params
[param_name
] = parameter
["value"]
4847 calculated_params
[param_name
] = parameter
["default-value"]
4849 isinstance(calculated_params
[param_name
], str)
4850 and calculated_params
[param_name
].startswith("<")
4851 and calculated_params
[param_name
].endswith(">")
4853 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4854 calculated_params
[param_name
] = instantiation_params
[
4855 calculated_params
[param_name
][1:-1]
4859 "Parameter {} needed to execute primitive {} not provided".format(
4860 calculated_params
[param_name
], primitive_desc
["name"]
4865 "Parameter {} needed to execute primitive {} not provided".format(
4866 param_name
, primitive_desc
["name"]
4870 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4871 calculated_params
[param_name
] = yaml
.safe_dump(
4872 calculated_params
[param_name
], default_flow_style
=True, width
=256
4874 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4876 ].startswith("!!yaml "):
4877 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4878 if parameter
.get("data-type") == "INTEGER":
4880 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4881 except ValueError: # error converting string to int
4883 "Parameter {} of primitive {} must be integer".format(
4884 param_name
, primitive_desc
["name"]
4887 elif parameter
.get("data-type") == "BOOLEAN":
4888 calculated_params
[param_name
] = not (
4889 (str(calculated_params
[param_name
])).lower() == "false"
4892 # add always ns_config_info if primitive name is config
4893 if primitive_desc
["name"] == "config":
4894 if "ns_config_info" in instantiation_params
:
4895 calculated_params
["ns_config_info"] = instantiation_params
[
4898 return calculated_params
4900 def _look_for_deployed_vca(
4907 ee_descriptor_id
=None,
4909 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4910 for vca
in deployed_vca
:
4913 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4916 vdu_count_index
is not None
4917 and vdu_count_index
!= vca
["vdu_count_index"]
4920 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4922 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4926 # vca_deployed not found
4928 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4929 " is not deployed".format(
4938 ee_id
= vca
.get("ee_id")
4940 "type", "lxc_proxy_charm"
4941 ) # default value for backward compatibility - proxy charm
4944 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4945 "execution environment".format(
4946 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4949 return ee_id
, vca_type
4951 async def _ns_execute_primitive(
4957 retries_interval
=30,
4964 if primitive
== "config":
4965 primitive_params
= {"params": primitive_params
}
4967 vca_type
= vca_type
or "lxc_proxy_charm"
4971 output
= await asyncio
.wait_for(
4972 self
.vca_map
[vca_type
].exec_primitive(
4974 primitive_name
=primitive
,
4975 params_dict
=primitive_params
,
4976 progress_timeout
=self
.timeout_progress_primitive
,
4977 total_timeout
=self
.timeout_primitive
,
4982 timeout
=timeout
or self
.timeout_primitive
,
4986 except asyncio
.CancelledError
:
4988 except Exception as e
: # asyncio.TimeoutError
4989 if isinstance(e
, asyncio
.TimeoutError
):
4994 "Error executing action {} on {} -> {}".format(
4999 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
5001 return "FAILED", str(e
)
5003 return "COMPLETED", output
5005 except (LcmException
, asyncio
.CancelledError
):
5007 except Exception as e
:
5008 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
5010 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
5012 Updating the vca_status with latest juju information in nsrs record
5013 :param: nsr_id: Id of the nsr
5014 :param: nslcmop_id: Id of the nslcmop
5018 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
5019 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5020 vca_id
= self
.get_vca_id({}, db_nsr
)
5021 if db_nsr
["_admin"]["deployed"]["K8s"]:
5022 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
5023 cluster_uuid
, kdu_instance
, cluster_type
= (
5024 k8s
["k8scluster-uuid"],
5025 k8s
["kdu-instance"],
5026 k8s
["k8scluster-type"],
5028 await self
._on
_update
_k
8s
_db
(
5029 cluster_uuid
=cluster_uuid
,
5030 kdu_instance
=kdu_instance
,
5031 filter={"_id": nsr_id
},
5033 cluster_type
=cluster_type
,
5036 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
5037 table
, filter = "nsrs", {"_id": nsr_id
}
5038 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
5039 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
5041 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
5042 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
5044 async def action(self
, nsr_id
, nslcmop_id
):
5045 # Try to lock HA task here
5046 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5047 if not task_is_locked_by_me
:
5050 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5051 self
.logger
.debug(logging_text
+ "Enter")
5052 # get all needed from database
5056 db_nslcmop_update
= {}
5057 nslcmop_operation_state
= None
5058 error_description_nslcmop
= None
5061 # wait for any previous tasks in process
5062 step
= "Waiting for previous operations to terminate"
5063 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5065 self
._write
_ns
_status
(
5068 current_operation
="RUNNING ACTION",
5069 current_operation_id
=nslcmop_id
,
5072 step
= "Getting information from database"
5073 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5074 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5075 if db_nslcmop
["operationParams"].get("primitive_params"):
5076 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5077 db_nslcmop
["operationParams"]["primitive_params"]
5080 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5081 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5082 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5083 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5084 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5085 primitive
= db_nslcmop
["operationParams"]["primitive"]
5086 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5087 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5088 "timeout_ns_action", self
.timeout_primitive
5092 step
= "Getting vnfr from database"
5093 db_vnfr
= self
.db
.get_one(
5094 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5096 if db_vnfr
.get("kdur"):
5098 for kdur
in db_vnfr
["kdur"]:
5099 if kdur
.get("additionalParams"):
5100 kdur
["additionalParams"] = json
.loads(
5101 kdur
["additionalParams"]
5103 kdur_list
.append(kdur
)
5104 db_vnfr
["kdur"] = kdur_list
5105 step
= "Getting vnfd from database"
5106 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5108 # Sync filesystem before running a primitive
5109 self
.fs
.sync(db_vnfr
["vnfd-id"])
5111 step
= "Getting nsd from database"
5112 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5114 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5115 # for backward compatibility
5116 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5117 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5118 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5119 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5121 # look for primitive
5122 config_primitive_desc
= descriptor_configuration
= None
5124 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5126 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5128 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5130 descriptor_configuration
= db_nsd
.get("ns-configuration")
5132 if descriptor_configuration
and descriptor_configuration
.get(
5135 for config_primitive
in descriptor_configuration
["config-primitive"]:
5136 if config_primitive
["name"] == primitive
:
5137 config_primitive_desc
= config_primitive
5140 if not config_primitive_desc
:
5141 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5143 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5147 primitive_name
= primitive
5148 ee_descriptor_id
= None
5150 primitive_name
= config_primitive_desc
.get(
5151 "execution-environment-primitive", primitive
5153 ee_descriptor_id
= config_primitive_desc
.get(
5154 "execution-environment-ref"
5160 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5162 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5165 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5167 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5169 desc_params
= parse_yaml_strings(
5170 db_vnfr
.get("additionalParamsForVnf")
5173 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5174 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5175 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5177 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5178 actions
.add(primitive
["name"])
5179 for primitive
in kdu_configuration
.get("config-primitive", []):
5180 actions
.add(primitive
["name"])
5182 nsr_deployed
["K8s"],
5183 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5184 and kdu
["member-vnf-index"] == vnf_index
,
5188 if primitive_name
in actions
5189 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5193 # TODO check if ns is in a proper status
5195 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5197 # kdur and desc_params already set from before
5198 if primitive_params
:
5199 desc_params
.update(primitive_params
)
5200 # TODO Check if we will need something at vnf level
5201 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5203 kdu_name
== kdu
["kdu-name"]
5204 and kdu
["member-vnf-index"] == vnf_index
5209 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5212 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5213 msg
= "unknown k8scluster-type '{}'".format(
5214 kdu
.get("k8scluster-type")
5216 raise LcmException(msg
)
5219 "collection": "nsrs",
5220 "filter": {"_id": nsr_id
},
5221 "path": "_admin.deployed.K8s.{}".format(index
),
5225 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5227 step
= "Executing kdu {}".format(primitive_name
)
5228 if primitive_name
== "upgrade":
5229 if desc_params
.get("kdu_model"):
5230 kdu_model
= desc_params
.get("kdu_model")
5231 del desc_params
["kdu_model"]
5233 kdu_model
= kdu
.get("kdu-model")
5234 parts
= kdu_model
.split(sep
=":")
5236 kdu_model
= parts
[0]
5238 detailed_status
= await asyncio
.wait_for(
5239 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5240 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5241 kdu_instance
=kdu
.get("kdu-instance"),
5243 kdu_model
=kdu_model
,
5246 timeout
=timeout_ns_action
,
5248 timeout
=timeout_ns_action
+ 10,
5251 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5253 elif primitive_name
== "rollback":
5254 detailed_status
= await asyncio
.wait_for(
5255 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5256 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5257 kdu_instance
=kdu
.get("kdu-instance"),
5260 timeout
=timeout_ns_action
,
5262 elif primitive_name
== "status":
5263 detailed_status
= await asyncio
.wait_for(
5264 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5265 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5266 kdu_instance
=kdu
.get("kdu-instance"),
5269 timeout
=timeout_ns_action
,
5272 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5273 kdu
["kdu-name"], nsr_id
5275 params
= self
._map
_primitive
_params
(
5276 config_primitive_desc
, primitive_params
, desc_params
5279 detailed_status
= await asyncio
.wait_for(
5280 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5281 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5282 kdu_instance
=kdu_instance
,
5283 primitive_name
=primitive_name
,
5286 timeout
=timeout_ns_action
,
5289 timeout
=timeout_ns_action
,
5293 nslcmop_operation_state
= "COMPLETED"
5295 detailed_status
= ""
5296 nslcmop_operation_state
= "FAILED"
5298 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5299 nsr_deployed
["VCA"],
5300 member_vnf_index
=vnf_index
,
5302 vdu_count_index
=vdu_count_index
,
5303 ee_descriptor_id
=ee_descriptor_id
,
5305 for vca_index
, vca_deployed
in enumerate(
5306 db_nsr
["_admin"]["deployed"]["VCA"]
5308 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5310 "collection": "nsrs",
5311 "filter": {"_id": nsr_id
},
5312 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5316 nslcmop_operation_state
,
5318 ) = await self
._ns
_execute
_primitive
(
5320 primitive
=primitive_name
,
5321 primitive_params
=self
._map
_primitive
_params
(
5322 config_primitive_desc
, primitive_params
, desc_params
5324 timeout
=timeout_ns_action
,
5330 db_nslcmop_update
["detailed-status"] = detailed_status
5331 error_description_nslcmop
= (
5332 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5336 + " task Done with result {} {}".format(
5337 nslcmop_operation_state
, detailed_status
5340 return # database update is called inside finally
5342 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5343 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5345 except asyncio
.CancelledError
:
5347 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5349 exc
= "Operation was cancelled"
5350 except asyncio
.TimeoutError
:
5351 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5353 except Exception as e
:
5354 exc
= traceback
.format_exc()
5355 self
.logger
.critical(
5356 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5365 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5366 nslcmop_operation_state
= "FAILED"
5368 self
._write
_ns
_status
(
5372 ], # TODO check if degraded. For the moment use previous status
5373 current_operation
="IDLE",
5374 current_operation_id
=None,
5375 # error_description=error_description_nsr,
5376 # error_detail=error_detail,
5377 other_update
=db_nsr_update
,
5380 self
._write
_op
_status
(
5383 error_message
=error_description_nslcmop
,
5384 operation_state
=nslcmop_operation_state
,
5385 other_update
=db_nslcmop_update
,
5388 if nslcmop_operation_state
:
5390 await self
.msg
.aiowrite(
5395 "nslcmop_id": nslcmop_id
,
5396 "operationState": nslcmop_operation_state
,
5400 except Exception as e
:
5402 logging_text
+ "kafka_write notification Exception {}".format(e
)
5404 self
.logger
.debug(logging_text
+ "Exit")
5405 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5406 return nslcmop_operation_state
, detailed_status
5408 async def terminate_vdus(
5409 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5411 """This method terminates VDUs
5414 db_vnfr: VNF instance record
5415 member_vnf_index: VNF index to identify the VDUs to be removed
5416 db_nsr: NS instance record
5417 update_db_nslcmops: Nslcmop update record
5419 vca_scaling_info
= []
5420 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5421 scaling_info
["scaling_direction"] = "IN"
5422 scaling_info
["vdu-delete"] = {}
5423 scaling_info
["kdu-delete"] = {}
5424 db_vdur
= db_vnfr
.get("vdur")
5425 vdur_list
= copy(db_vdur
)
5427 for index
, vdu
in enumerate(vdur_list
):
5428 vca_scaling_info
.append(
5430 "osm_vdu_id": vdu
["vdu-id-ref"],
5431 "member-vnf-index": member_vnf_index
,
5433 "vdu_index": count_index
,
5435 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5436 scaling_info
["vdu"].append(
5438 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5439 "vdu_id": vdu
["vdu-id-ref"],
5442 for interface
in vdu
["interfaces"]:
5443 scaling_info
["vdu"][index
]["interface"].append(
5445 "name": interface
["name"],
5446 "ip_address": interface
["ip-address"],
5447 "mac_address": interface
.get("mac-address"),
5449 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5450 stage
[2] = "Terminating VDUs"
5451 if scaling_info
.get("vdu-delete"):
5452 # scale_process = "RO"
5453 if self
.ro_config
.get("ng"):
5454 await self
._scale
_ng
_ro
(
5455 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5458 async def remove_vnf(
5459 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5461 """This method is to Remove VNF instances from NS.
5464 nsr_id: NS instance id
5465 nslcmop_id: nslcmop id of update
5466 vnf_instance_id: id of the VNF instance to be removed
5469 result: (str, str) COMPLETED/FAILED, details
5473 logging_text
= "Task ns={} update ".format(nsr_id
)
5474 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5475 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5476 if check_vnfr_count
> 1:
5477 stage
= ["", "", ""]
5478 step
= "Getting nslcmop from database"
5479 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5480 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5481 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5482 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5483 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5484 """ db_vnfr = self.db.get_one(
5485 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5487 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5488 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5490 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5491 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5492 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5493 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5494 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5495 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5496 return "COMPLETED", "Done"
5498 step
= "Terminate VNF Failed with"
5499 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5501 except (LcmException
, asyncio
.CancelledError
):
5503 except Exception as e
:
5504 self
.logger
.debug("Error removing VNF {}".format(e
))
5505 return "FAILED", "Error removing VNF {}".format(e
)
5507 async def _ns_redeploy_vnf(
5508 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5510 """This method updates and redeploys VNF instances
5513 nsr_id: NS instance id
5514 nslcmop_id: nslcmop id
5515 db_vnfd: VNF descriptor
5516 db_vnfr: VNF instance record
5517 db_nsr: NS instance record
5520 result: (str, str) COMPLETED/FAILED, details
5524 stage
= ["", "", ""]
5525 logging_text
= "Task ns={} update ".format(nsr_id
)
5526 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5527 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5529 # Terminate old VNF resources
5530 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5531 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5533 # old_vnfd_id = db_vnfr["vnfd-id"]
5534 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5535 new_db_vnfd
= db_vnfd
5536 # new_vnfd_ref = new_db_vnfd["id"]
5537 # new_vnfd_id = vnfd_id
5541 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5543 "name": cp
.get("id"),
5544 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5545 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5548 new_vnfr_cp
.append(vnf_cp
)
5549 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5550 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5551 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5552 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5553 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5554 updated_db_vnfr
= self
.db
.get_one(
5555 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5558 # Instantiate new VNF resources
5559 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5560 vca_scaling_info
= []
5561 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5562 scaling_info
["scaling_direction"] = "OUT"
5563 scaling_info
["vdu-create"] = {}
5564 scaling_info
["kdu-create"] = {}
5565 vdud_instantiate_list
= db_vnfd
["vdu"]
5566 for index
, vdud
in enumerate(vdud_instantiate_list
):
5567 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5571 additional_params
= (
5572 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5575 cloud_init_list
= []
5577 # TODO Information of its own ip is not available because db_vnfr is not updated.
5578 additional_params
["OSM"] = get_osm_params(
5579 updated_db_vnfr
, vdud
["id"], 1
5581 cloud_init_list
.append(
5582 self
._parse
_cloud
_init
(
5589 vca_scaling_info
.append(
5591 "osm_vdu_id": vdud
["id"],
5592 "member-vnf-index": member_vnf_index
,
5594 "vdu_index": count_index
,
5597 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5598 if self
.ro_config
.get("ng"):
5600 "New Resources to be deployed: {}".format(scaling_info
))
5601 await self
._scale
_ng
_ro
(
5602 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5604 return "COMPLETED", "Done"
5605 except (LcmException
, asyncio
.CancelledError
):
5607 except Exception as e
:
5608 self
.logger
.debug("Error updating VNF {}".format(e
))
5609 return "FAILED", "Error updating VNF {}".format(e
)
5611 async def _ns_charm_upgrade(
5617 timeout
: float = None,
5619 """This method upgrade charms in VNF instances
5622 ee_id: Execution environment id
5623 path: Local path to the charm
5625 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5626 timeout: (Float) Timeout for the ns update operation
5629 result: (str, str) COMPLETED/FAILED, details
5632 charm_type
= charm_type
or "lxc_proxy_charm"
5633 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5637 charm_type
=charm_type
,
5638 timeout
=timeout
or self
.timeout_ns_update
,
5642 return "COMPLETED", output
5644 except (LcmException
, asyncio
.CancelledError
):
5647 except Exception as e
:
5649 self
.logger
.debug("Error upgrading charm {}".format(path
))
5651 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5653 async def update(self
, nsr_id
, nslcmop_id
):
5654 """Update NS according to different update types
5656 This method performs upgrade of VNF instances then updates the revision
5657 number in VNF record
5660 nsr_id: Network service will be updated
5661 nslcmop_id: ns lcm operation id
5664 It may raise DbException, LcmException, N2VCException, K8sException
5667 # Try to lock HA task here
5668 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5669 if not task_is_locked_by_me
:
5672 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5673 self
.logger
.debug(logging_text
+ "Enter")
5675 # Set the required variables to be filled up later
5677 db_nslcmop_update
= {}
5679 nslcmop_operation_state
= None
5681 error_description_nslcmop
= ""
5683 change_type
= "updated"
5684 detailed_status
= ""
5687 # wait for any previous tasks in process
5688 step
= "Waiting for previous operations to terminate"
5689 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5690 self
._write
_ns
_status
(
5693 current_operation
="UPDATING",
5694 current_operation_id
=nslcmop_id
,
5697 step
= "Getting nslcmop from database"
5698 db_nslcmop
= self
.db
.get_one(
5699 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5701 update_type
= db_nslcmop
["operationParams"]["updateType"]
5703 step
= "Getting nsr from database"
5704 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5705 old_operational_status
= db_nsr
["operational-status"]
5706 db_nsr_update
["operational-status"] = "updating"
5707 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5708 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5710 if update_type
== "CHANGE_VNFPKG":
5712 # Get the input parameters given through update request
5713 vnf_instance_id
= db_nslcmop
["operationParams"][
5714 "changeVnfPackageData"
5715 ].get("vnfInstanceId")
5717 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5720 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5722 step
= "Getting vnfr from database"
5723 db_vnfr
= self
.db
.get_one(
5724 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5727 step
= "Getting vnfds from database"
5729 latest_vnfd
= self
.db
.get_one(
5730 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5732 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5735 current_vnf_revision
= db_vnfr
.get("revision", 1)
5736 current_vnfd
= self
.db
.get_one(
5738 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5739 fail_on_empty
=False,
5741 # Charm artifact paths will be filled up later
5743 current_charm_artifact_path
,
5744 target_charm_artifact_path
,
5745 charm_artifact_paths
,
5748 step
= "Checking if revision has changed in VNFD"
5749 if current_vnf_revision
!= latest_vnfd_revision
:
5751 change_type
= "policy_updated"
5753 # There is new revision of VNFD, update operation is required
5754 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5755 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5757 step
= "Removing the VNFD packages if they exist in the local path"
5758 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5759 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5761 step
= "Get the VNFD packages from FSMongo"
5762 self
.fs
.sync(from_path
=latest_vnfd_path
)
5763 self
.fs
.sync(from_path
=current_vnfd_path
)
5766 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5768 base_folder
= latest_vnfd
["_admin"]["storage"]
5770 for charm_index
, charm_deployed
in enumerate(
5771 get_iterable(nsr_deployed
, "VCA")
5773 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5775 # Getting charm-id and charm-type
5776 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5777 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5778 charm_type
= charm_deployed
.get("type")
5781 ee_id
= charm_deployed
.get("ee_id")
5783 step
= "Getting descriptor config"
5784 descriptor_config
= get_configuration(
5785 current_vnfd
, current_vnfd
["id"]
5788 if "execution-environment-list" in descriptor_config
:
5789 ee_list
= descriptor_config
.get(
5790 "execution-environment-list", []
5795 # There could be several charm used in the same VNF
5796 for ee_item
in ee_list
:
5797 if ee_item
.get("juju"):
5799 step
= "Getting charm name"
5800 charm_name
= ee_item
["juju"].get("charm")
5802 step
= "Setting Charm artifact paths"
5803 current_charm_artifact_path
.append(
5804 get_charm_artifact_path(
5808 current_vnf_revision
,
5811 target_charm_artifact_path
.append(
5812 get_charm_artifact_path(
5816 latest_vnfd_revision
,
5820 charm_artifact_paths
= zip(
5821 current_charm_artifact_path
, target_charm_artifact_path
5824 step
= "Checking if software version has changed in VNFD"
5825 if find_software_version(current_vnfd
) != find_software_version(
5829 step
= "Checking if existing VNF has charm"
5830 for current_charm_path
, target_charm_path
in list(
5831 charm_artifact_paths
5833 if current_charm_path
:
5835 "Software version change is not supported as VNF instance {} has charm.".format(
5840 # There is no change in the charm package, then redeploy the VNF
5841 # based on new descriptor
5842 step
= "Redeploying VNF"
5843 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5847 ) = await self
._ns
_redeploy
_vnf
(
5854 if result
== "FAILED":
5855 nslcmop_operation_state
= result
5856 error_description_nslcmop
= detailed_status
5857 db_nslcmop_update
["detailed-status"] = detailed_status
5860 + " step {} Done with result {} {}".format(
5861 step
, nslcmop_operation_state
, detailed_status
5866 step
= "Checking if any charm package has changed or not"
5867 for current_charm_path
, target_charm_path
in list(
5868 charm_artifact_paths
5872 and target_charm_path
5873 and self
.check_charm_hash_changed(
5874 current_charm_path
, target_charm_path
5878 step
= "Checking whether VNF uses juju bundle"
5879 if check_juju_bundle_existence(current_vnfd
):
5882 "Charm upgrade is not supported for the instance which"
5883 " uses juju-bundle: {}".format(
5884 check_juju_bundle_existence(current_vnfd
)
5888 step
= "Upgrading Charm"
5892 ) = await self
._ns
_charm
_upgrade
(
5895 charm_type
=charm_type
,
5896 path
=self
.fs
.path
+ target_charm_path
,
5897 timeout
=timeout_seconds
,
5900 if result
== "FAILED":
5901 nslcmop_operation_state
= result
5902 error_description_nslcmop
= detailed_status
5904 db_nslcmop_update
["detailed-status"] = detailed_status
5907 + " step {} Done with result {} {}".format(
5908 step
, nslcmop_operation_state
, detailed_status
5912 step
= "Updating policies"
5913 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5914 result
= "COMPLETED"
5915 detailed_status
= "Done"
5916 db_nslcmop_update
["detailed-status"] = "Done"
5918 # If nslcmop_operation_state is None, so any operation is not failed.
5919 if not nslcmop_operation_state
:
5920 nslcmop_operation_state
= "COMPLETED"
5922 # If update CHANGE_VNFPKG nslcmop_operation is successful
5923 # vnf revision need to be updated
5924 vnfr_update
["revision"] = latest_vnfd_revision
5925 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5929 + " task Done with result {} {}".format(
5930 nslcmop_operation_state
, detailed_status
5933 elif update_type
== "REMOVE_VNF":
5934 # This part is included in https://osm.etsi.org/gerrit/11876
5935 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5936 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5937 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5938 step
= "Removing VNF"
5939 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5940 if result
== "FAILED":
5941 nslcmop_operation_state
= result
5942 error_description_nslcmop
= detailed_status
5943 db_nslcmop_update
["detailed-status"] = detailed_status
5944 change_type
= "vnf_terminated"
5945 if not nslcmop_operation_state
:
5946 nslcmop_operation_state
= "COMPLETED"
5949 + " task Done with result {} {}".format(
5950 nslcmop_operation_state
, detailed_status
5954 elif update_type
== "OPERATE_VNF":
5955 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5956 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5957 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5958 (result
, detailed_status
) = await self
.rebuild_start_stop(
5959 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5961 if result
== "FAILED":
5962 nslcmop_operation_state
= result
5963 error_description_nslcmop
= detailed_status
5964 db_nslcmop_update
["detailed-status"] = detailed_status
5965 if not nslcmop_operation_state
:
5966 nslcmop_operation_state
= "COMPLETED"
5969 + " task Done with result {} {}".format(
5970 nslcmop_operation_state
, detailed_status
5974 # If nslcmop_operation_state is None, so any operation is not failed.
5975 # All operations are executed in overall.
5976 if not nslcmop_operation_state
:
5977 nslcmop_operation_state
= "COMPLETED"
5978 db_nsr_update
["operational-status"] = old_operational_status
5980 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5981 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5983 except asyncio
.CancelledError
:
5985 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5987 exc
= "Operation was cancelled"
5988 except asyncio
.TimeoutError
:
5989 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5991 except Exception as e
:
5992 exc
= traceback
.format_exc()
5993 self
.logger
.critical(
5994 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6003 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6004 nslcmop_operation_state
= "FAILED"
6005 db_nsr_update
["operational-status"] = old_operational_status
6007 self
._write
_ns
_status
(
6009 ns_state
=db_nsr
["nsState"],
6010 current_operation
="IDLE",
6011 current_operation_id
=None,
6012 other_update
=db_nsr_update
,
6015 self
._write
_op
_status
(
6018 error_message
=error_description_nslcmop
,
6019 operation_state
=nslcmop_operation_state
,
6020 other_update
=db_nslcmop_update
,
6023 if nslcmop_operation_state
:
6027 "nslcmop_id": nslcmop_id
,
6028 "operationState": nslcmop_operation_state
,
6030 if change_type
in ("vnf_terminated", "policy_updated"):
6031 msg
.update({"vnf_member_index": member_vnf_index
})
6032 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6033 except Exception as e
:
6035 logging_text
+ "kafka_write notification Exception {}".format(e
)
6037 self
.logger
.debug(logging_text
+ "Exit")
6038 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6039 return nslcmop_operation_state
, detailed_status
6041 async def scale(self
, nsr_id
, nslcmop_id
):
6042 # Try to lock HA task here
6043 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6044 if not task_is_locked_by_me
:
6047 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6048 stage
= ["", "", ""]
6049 tasks_dict_info
= {}
6050 # ^ stage, step, VIM progress
6051 self
.logger
.debug(logging_text
+ "Enter")
6052 # get all needed from database
6054 db_nslcmop_update
= {}
6057 # in case of error, indicates what part of scale was failed to put nsr at error status
6058 scale_process
= None
6059 old_operational_status
= ""
6060 old_config_status
= ""
6063 # wait for any previous tasks in process
6064 step
= "Waiting for previous operations to terminate"
6065 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6066 self
._write
_ns
_status
(
6069 current_operation
="SCALING",
6070 current_operation_id
=nslcmop_id
,
6073 step
= "Getting nslcmop from database"
6075 step
+ " after having waited for previous tasks to be completed"
6077 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6079 step
= "Getting nsr from database"
6080 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6081 old_operational_status
= db_nsr
["operational-status"]
6082 old_config_status
= db_nsr
["config-status"]
6084 step
= "Parsing scaling parameters"
6085 db_nsr_update
["operational-status"] = "scaling"
6086 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6087 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6089 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6091 ]["member-vnf-index"]
6092 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6094 ]["scaling-group-descriptor"]
6095 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6096 # for backward compatibility
6097 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6098 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6099 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6100 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6102 step
= "Getting vnfr from database"
6103 db_vnfr
= self
.db
.get_one(
6104 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6107 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6109 step
= "Getting vnfd from database"
6110 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6112 base_folder
= db_vnfd
["_admin"]["storage"]
6114 step
= "Getting scaling-group-descriptor"
6115 scaling_descriptor
= find_in_list(
6116 get_scaling_aspect(db_vnfd
),
6117 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6119 if not scaling_descriptor
:
6121 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6122 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6125 step
= "Sending scale order to VIM"
6126 # TODO check if ns is in a proper status
6128 if not db_nsr
["_admin"].get("scaling-group"):
6133 "_admin.scaling-group": [
6134 {"name": scaling_group
, "nb-scale-op": 0}
6138 admin_scale_index
= 0
6140 for admin_scale_index
, admin_scale_info
in enumerate(
6141 db_nsr
["_admin"]["scaling-group"]
6143 if admin_scale_info
["name"] == scaling_group
:
6144 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6146 else: # not found, set index one plus last element and add new entry with the name
6147 admin_scale_index
+= 1
6149 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6152 vca_scaling_info
= []
6153 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6154 if scaling_type
== "SCALE_OUT":
6155 if "aspect-delta-details" not in scaling_descriptor
:
6157 "Aspect delta details not fount in scaling descriptor {}".format(
6158 scaling_descriptor
["name"]
6161 # count if max-instance-count is reached
6162 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6164 scaling_info
["scaling_direction"] = "OUT"
6165 scaling_info
["vdu-create"] = {}
6166 scaling_info
["kdu-create"] = {}
6167 for delta
in deltas
:
6168 for vdu_delta
in delta
.get("vdu-delta", {}):
6169 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6170 # vdu_index also provides the number of instance of the targeted vdu
6171 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6172 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6176 additional_params
= (
6177 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6180 cloud_init_list
= []
6182 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6183 max_instance_count
= 10
6184 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6185 max_instance_count
= vdu_profile
.get(
6186 "max-number-of-instances", 10
6189 default_instance_num
= get_number_of_instances(
6192 instances_number
= vdu_delta
.get("number-of-instances", 1)
6193 nb_scale_op
+= instances_number
6195 new_instance_count
= nb_scale_op
+ default_instance_num
6196 # Control if new count is over max and vdu count is less than max.
6197 # Then assign new instance count
6198 if new_instance_count
> max_instance_count
> vdu_count
:
6199 instances_number
= new_instance_count
- max_instance_count
6201 instances_number
= instances_number
6203 if new_instance_count
> max_instance_count
:
6205 "reached the limit of {} (max-instance-count) "
6206 "scaling-out operations for the "
6207 "scaling-group-descriptor '{}'".format(
6208 nb_scale_op
, scaling_group
6211 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6213 # TODO Information of its own ip is not available because db_vnfr is not updated.
6214 additional_params
["OSM"] = get_osm_params(
6215 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6217 cloud_init_list
.append(
6218 self
._parse
_cloud
_init
(
6225 vca_scaling_info
.append(
6227 "osm_vdu_id": vdu_delta
["id"],
6228 "member-vnf-index": vnf_index
,
6230 "vdu_index": vdu_index
+ x
,
6233 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6234 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6235 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6236 kdu_name
= kdu_profile
["kdu-name"]
6237 resource_name
= kdu_profile
.get("resource-name", "")
6239 # Might have different kdus in the same delta
6240 # Should have list for each kdu
6241 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6242 scaling_info
["kdu-create"][kdu_name
] = []
6244 kdur
= get_kdur(db_vnfr
, kdu_name
)
6245 if kdur
.get("helm-chart"):
6246 k8s_cluster_type
= "helm-chart-v3"
6247 self
.logger
.debug("kdur: {}".format(kdur
))
6249 kdur
.get("helm-version")
6250 and kdur
.get("helm-version") == "v2"
6252 k8s_cluster_type
= "helm-chart"
6253 elif kdur
.get("juju-bundle"):
6254 k8s_cluster_type
= "juju-bundle"
6257 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6258 "juju-bundle. Maybe an old NBI version is running".format(
6259 db_vnfr
["member-vnf-index-ref"], kdu_name
6263 max_instance_count
= 10
6264 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6265 max_instance_count
= kdu_profile
.get(
6266 "max-number-of-instances", 10
6269 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6270 deployed_kdu
, _
= get_deployed_kdu(
6271 nsr_deployed
, kdu_name
, vnf_index
6273 if deployed_kdu
is None:
6275 "KDU '{}' for vnf '{}' not deployed".format(
6279 kdu_instance
= deployed_kdu
.get("kdu-instance")
6280 instance_num
= await self
.k8scluster_map
[
6286 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6287 kdu_model
=deployed_kdu
.get("kdu-model"),
6289 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6290 "number-of-instances", 1
6293 # Control if new count is over max and instance_num is less than max.
6294 # Then assign max instance number to kdu replica count
6295 if kdu_replica_count
> max_instance_count
> instance_num
:
6296 kdu_replica_count
= max_instance_count
6297 if kdu_replica_count
> max_instance_count
:
6299 "reached the limit of {} (max-instance-count) "
6300 "scaling-out operations for the "
6301 "scaling-group-descriptor '{}'".format(
6302 instance_num
, scaling_group
6306 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6307 vca_scaling_info
.append(
6309 "osm_kdu_id": kdu_name
,
6310 "member-vnf-index": vnf_index
,
6312 "kdu_index": instance_num
+ x
- 1,
6315 scaling_info
["kdu-create"][kdu_name
].append(
6317 "member-vnf-index": vnf_index
,
6319 "k8s-cluster-type": k8s_cluster_type
,
6320 "resource-name": resource_name
,
6321 "scale": kdu_replica_count
,
6324 elif scaling_type
== "SCALE_IN":
6325 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6327 scaling_info
["scaling_direction"] = "IN"
6328 scaling_info
["vdu-delete"] = {}
6329 scaling_info
["kdu-delete"] = {}
6331 for delta
in deltas
:
6332 for vdu_delta
in delta
.get("vdu-delta", {}):
6333 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6334 min_instance_count
= 0
6335 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6336 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6337 min_instance_count
= vdu_profile
["min-number-of-instances"]
6339 default_instance_num
= get_number_of_instances(
6340 db_vnfd
, vdu_delta
["id"]
6342 instance_num
= vdu_delta
.get("number-of-instances", 1)
6343 nb_scale_op
-= instance_num
6345 new_instance_count
= nb_scale_op
+ default_instance_num
6347 if new_instance_count
< min_instance_count
< vdu_count
:
6348 instances_number
= min_instance_count
- new_instance_count
6350 instances_number
= instance_num
6352 if new_instance_count
< min_instance_count
:
6354 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6355 "scaling-group-descriptor '{}'".format(
6356 nb_scale_op
, scaling_group
6359 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6360 vca_scaling_info
.append(
6362 "osm_vdu_id": vdu_delta
["id"],
6363 "member-vnf-index": vnf_index
,
6365 "vdu_index": vdu_index
- 1 - x
,
6368 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6369 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6370 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6371 kdu_name
= kdu_profile
["kdu-name"]
6372 resource_name
= kdu_profile
.get("resource-name", "")
6374 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6375 scaling_info
["kdu-delete"][kdu_name
] = []
6377 kdur
= get_kdur(db_vnfr
, kdu_name
)
6378 if kdur
.get("helm-chart"):
6379 k8s_cluster_type
= "helm-chart-v3"
6380 self
.logger
.debug("kdur: {}".format(kdur
))
6382 kdur
.get("helm-version")
6383 and kdur
.get("helm-version") == "v2"
6385 k8s_cluster_type
= "helm-chart"
6386 elif kdur
.get("juju-bundle"):
6387 k8s_cluster_type
= "juju-bundle"
6390 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6391 "juju-bundle. Maybe an old NBI version is running".format(
6392 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6396 min_instance_count
= 0
6397 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6398 min_instance_count
= kdu_profile
["min-number-of-instances"]
6400 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6401 deployed_kdu
, _
= get_deployed_kdu(
6402 nsr_deployed
, kdu_name
, vnf_index
6404 if deployed_kdu
is None:
6406 "KDU '{}' for vnf '{}' not deployed".format(
6410 kdu_instance
= deployed_kdu
.get("kdu-instance")
6411 instance_num
= await self
.k8scluster_map
[
6417 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6418 kdu_model
=deployed_kdu
.get("kdu-model"),
6420 kdu_replica_count
= instance_num
- kdu_delta
.get(
6421 "number-of-instances", 1
6424 if kdu_replica_count
< min_instance_count
< instance_num
:
6425 kdu_replica_count
= min_instance_count
6426 if kdu_replica_count
< min_instance_count
:
6428 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6429 "scaling-group-descriptor '{}'".format(
6430 instance_num
, scaling_group
6434 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6435 vca_scaling_info
.append(
6437 "osm_kdu_id": kdu_name
,
6438 "member-vnf-index": vnf_index
,
6440 "kdu_index": instance_num
- x
- 1,
6443 scaling_info
["kdu-delete"][kdu_name
].append(
6445 "member-vnf-index": vnf_index
,
6447 "k8s-cluster-type": k8s_cluster_type
,
6448 "resource-name": resource_name
,
6449 "scale": kdu_replica_count
,
6453 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6454 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6455 if scaling_info
["scaling_direction"] == "IN":
6456 for vdur
in reversed(db_vnfr
["vdur"]):
6457 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6458 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6459 scaling_info
["vdu"].append(
6461 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6462 "vdu_id": vdur
["vdu-id-ref"],
6466 for interface
in vdur
["interfaces"]:
6467 scaling_info
["vdu"][-1]["interface"].append(
6469 "name": interface
["name"],
6470 "ip_address": interface
["ip-address"],
6471 "mac_address": interface
.get("mac-address"),
6474 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6477 step
= "Executing pre-scale vnf-config-primitive"
6478 if scaling_descriptor
.get("scaling-config-action"):
6479 for scaling_config_action
in scaling_descriptor
[
6480 "scaling-config-action"
6483 scaling_config_action
.get("trigger") == "pre-scale-in"
6484 and scaling_type
== "SCALE_IN"
6486 scaling_config_action
.get("trigger") == "pre-scale-out"
6487 and scaling_type
== "SCALE_OUT"
6489 vnf_config_primitive
= scaling_config_action
[
6490 "vnf-config-primitive-name-ref"
6492 step
= db_nslcmop_update
[
6494 ] = "executing pre-scale scaling-config-action '{}'".format(
6495 vnf_config_primitive
6498 # look for primitive
6499 for config_primitive
in (
6500 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6501 ).get("config-primitive", ()):
6502 if config_primitive
["name"] == vnf_config_primitive
:
6506 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6507 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6508 "primitive".format(scaling_group
, vnf_config_primitive
)
6511 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6512 if db_vnfr
.get("additionalParamsForVnf"):
6513 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6515 scale_process
= "VCA"
6516 db_nsr_update
["config-status"] = "configuring pre-scaling"
6517 primitive_params
= self
._map
_primitive
_params
(
6518 config_primitive
, {}, vnfr_params
6521 # Pre-scale retry check: Check if this sub-operation has been executed before
6522 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6525 vnf_config_primitive
,
6529 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6530 # Skip sub-operation
6531 result
= "COMPLETED"
6532 result_detail
= "Done"
6535 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6536 vnf_config_primitive
, result
, result_detail
6540 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6541 # New sub-operation: Get index of this sub-operation
6543 len(db_nslcmop
.get("_admin", {}).get("operations"))
6548 + "vnf_config_primitive={} New sub-operation".format(
6549 vnf_config_primitive
6553 # retry: Get registered params for this existing sub-operation
6554 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6557 vnf_index
= op
.get("member_vnf_index")
6558 vnf_config_primitive
= op
.get("primitive")
6559 primitive_params
= op
.get("primitive_params")
6562 + "vnf_config_primitive={} Sub-operation retry".format(
6563 vnf_config_primitive
6566 # Execute the primitive, either with new (first-time) or registered (reintent) args
6567 ee_descriptor_id
= config_primitive
.get(
6568 "execution-environment-ref"
6570 primitive_name
= config_primitive
.get(
6571 "execution-environment-primitive", vnf_config_primitive
6573 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6574 nsr_deployed
["VCA"],
6575 member_vnf_index
=vnf_index
,
6577 vdu_count_index
=None,
6578 ee_descriptor_id
=ee_descriptor_id
,
6580 result
, result_detail
= await self
._ns
_execute
_primitive
(
6589 + "vnf_config_primitive={} Done with result {} {}".format(
6590 vnf_config_primitive
, result
, result_detail
6593 # Update operationState = COMPLETED | FAILED
6594 self
._update
_suboperation
_status
(
6595 db_nslcmop
, op_index
, result
, result_detail
6598 if result
== "FAILED":
6599 raise LcmException(result_detail
)
6600 db_nsr_update
["config-status"] = old_config_status
6601 scale_process
= None
6605 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6608 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6611 # SCALE-IN VCA - BEGIN
6612 if vca_scaling_info
:
6613 step
= db_nslcmop_update
[
6615 ] = "Deleting the execution environments"
6616 scale_process
= "VCA"
6617 for vca_info
in vca_scaling_info
:
6618 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6619 member_vnf_index
= str(vca_info
["member-vnf-index"])
6621 logging_text
+ "vdu info: {}".format(vca_info
)
6623 if vca_info
.get("osm_vdu_id"):
6624 vdu_id
= vca_info
["osm_vdu_id"]
6625 vdu_index
= int(vca_info
["vdu_index"])
6628 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6629 member_vnf_index
, vdu_id
, vdu_index
6631 stage
[2] = step
= "Scaling in VCA"
6632 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6633 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6634 config_update
= db_nsr
["configurationStatus"]
6635 for vca_index
, vca
in enumerate(vca_update
):
6637 (vca
or vca
.get("ee_id"))
6638 and vca
["member-vnf-index"] == member_vnf_index
6639 and vca
["vdu_count_index"] == vdu_index
6641 if vca
.get("vdu_id"):
6642 config_descriptor
= get_configuration(
6643 db_vnfd
, vca
.get("vdu_id")
6645 elif vca
.get("kdu_name"):
6646 config_descriptor
= get_configuration(
6647 db_vnfd
, vca
.get("kdu_name")
6650 config_descriptor
= get_configuration(
6651 db_vnfd
, db_vnfd
["id"]
6653 operation_params
= (
6654 db_nslcmop
.get("operationParams") or {}
6656 exec_terminate_primitives
= not operation_params
.get(
6657 "skip_terminate_primitives"
6658 ) and vca
.get("needed_terminate")
6659 task
= asyncio
.ensure_future(
6668 exec_primitives
=exec_terminate_primitives
,
6672 timeout
=self
.timeout_charm_delete
,
6675 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6678 del vca_update
[vca_index
]
6679 del config_update
[vca_index
]
6680 # wait for pending tasks of terminate primitives
6684 + "Waiting for tasks {}".format(
6685 list(tasks_dict_info
.keys())
6688 error_list
= await self
._wait
_for
_tasks
(
6692 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6697 tasks_dict_info
.clear()
6699 raise LcmException("; ".join(error_list
))
6701 db_vca_and_config_update
= {
6702 "_admin.deployed.VCA": vca_update
,
6703 "configurationStatus": config_update
,
6706 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6708 scale_process
= None
6709 # SCALE-IN VCA - END
6712 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6713 scale_process
= "RO"
6714 if self
.ro_config
.get("ng"):
6715 await self
._scale
_ng
_ro
(
6716 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6718 scaling_info
.pop("vdu-create", None)
6719 scaling_info
.pop("vdu-delete", None)
6721 scale_process
= None
6725 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6726 scale_process
= "KDU"
6727 await self
._scale
_kdu
(
6728 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6730 scaling_info
.pop("kdu-create", None)
6731 scaling_info
.pop("kdu-delete", None)
6733 scale_process
= None
6737 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6739 # SCALE-UP VCA - BEGIN
6740 if vca_scaling_info
:
6741 step
= db_nslcmop_update
[
6743 ] = "Creating new execution environments"
6744 scale_process
= "VCA"
6745 for vca_info
in vca_scaling_info
:
6746 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6747 member_vnf_index
= str(vca_info
["member-vnf-index"])
6749 logging_text
+ "vdu info: {}".format(vca_info
)
6751 vnfd_id
= db_vnfr
["vnfd-ref"]
6752 if vca_info
.get("osm_vdu_id"):
6753 vdu_index
= int(vca_info
["vdu_index"])
6754 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6755 if db_vnfr
.get("additionalParamsForVnf"):
6756 deploy_params
.update(
6758 db_vnfr
["additionalParamsForVnf"].copy()
6761 descriptor_config
= get_configuration(
6762 db_vnfd
, db_vnfd
["id"]
6764 if descriptor_config
:
6769 logging_text
=logging_text
6770 + "member_vnf_index={} ".format(member_vnf_index
),
6773 nslcmop_id
=nslcmop_id
,
6779 member_vnf_index
=member_vnf_index
,
6780 vdu_index
=vdu_index
,
6782 deploy_params
=deploy_params
,
6783 descriptor_config
=descriptor_config
,
6784 base_folder
=base_folder
,
6785 task_instantiation_info
=tasks_dict_info
,
6788 vdu_id
= vca_info
["osm_vdu_id"]
6789 vdur
= find_in_list(
6790 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6792 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6793 if vdur
.get("additionalParams"):
6794 deploy_params_vdu
= parse_yaml_strings(
6795 vdur
["additionalParams"]
6798 deploy_params_vdu
= deploy_params
6799 deploy_params_vdu
["OSM"] = get_osm_params(
6800 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6802 if descriptor_config
:
6807 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6808 member_vnf_index
, vdu_id
, vdu_index
6810 stage
[2] = step
= "Scaling out VCA"
6811 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6813 logging_text
=logging_text
6814 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6815 member_vnf_index
, vdu_id
, vdu_index
6819 nslcmop_id
=nslcmop_id
,
6825 member_vnf_index
=member_vnf_index
,
6826 vdu_index
=vdu_index
,
6828 deploy_params
=deploy_params_vdu
,
6829 descriptor_config
=descriptor_config
,
6830 base_folder
=base_folder
,
6831 task_instantiation_info
=tasks_dict_info
,
6834 # SCALE-UP VCA - END
6835 scale_process
= None
6838 # execute primitive service POST-SCALING
6839 step
= "Executing post-scale vnf-config-primitive"
6840 if scaling_descriptor
.get("scaling-config-action"):
6841 for scaling_config_action
in scaling_descriptor
[
6842 "scaling-config-action"
6845 scaling_config_action
.get("trigger") == "post-scale-in"
6846 and scaling_type
== "SCALE_IN"
6848 scaling_config_action
.get("trigger") == "post-scale-out"
6849 and scaling_type
== "SCALE_OUT"
6851 vnf_config_primitive
= scaling_config_action
[
6852 "vnf-config-primitive-name-ref"
6854 step
= db_nslcmop_update
[
6856 ] = "executing post-scale scaling-config-action '{}'".format(
6857 vnf_config_primitive
6860 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6861 if db_vnfr
.get("additionalParamsForVnf"):
6862 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6864 # look for primitive
6865 for config_primitive
in (
6866 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6867 ).get("config-primitive", ()):
6868 if config_primitive
["name"] == vnf_config_primitive
:
6872 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6873 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6874 "config-primitive".format(
6875 scaling_group
, vnf_config_primitive
6878 scale_process
= "VCA"
6879 db_nsr_update
["config-status"] = "configuring post-scaling"
6880 primitive_params
= self
._map
_primitive
_params
(
6881 config_primitive
, {}, vnfr_params
6884 # Post-scale retry check: Check if this sub-operation has been executed before
6885 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6888 vnf_config_primitive
,
6892 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6893 # Skip sub-operation
6894 result
= "COMPLETED"
6895 result_detail
= "Done"
6898 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6899 vnf_config_primitive
, result
, result_detail
6903 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6904 # New sub-operation: Get index of this sub-operation
6906 len(db_nslcmop
.get("_admin", {}).get("operations"))
6911 + "vnf_config_primitive={} New sub-operation".format(
6912 vnf_config_primitive
6916 # retry: Get registered params for this existing sub-operation
6917 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6920 vnf_index
= op
.get("member_vnf_index")
6921 vnf_config_primitive
= op
.get("primitive")
6922 primitive_params
= op
.get("primitive_params")
6925 + "vnf_config_primitive={} Sub-operation retry".format(
6926 vnf_config_primitive
6929 # Execute the primitive, either with new (first-time) or registered (reintent) args
6930 ee_descriptor_id
= config_primitive
.get(
6931 "execution-environment-ref"
6933 primitive_name
= config_primitive
.get(
6934 "execution-environment-primitive", vnf_config_primitive
6936 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6937 nsr_deployed
["VCA"],
6938 member_vnf_index
=vnf_index
,
6940 vdu_count_index
=None,
6941 ee_descriptor_id
=ee_descriptor_id
,
6943 result
, result_detail
= await self
._ns
_execute
_primitive
(
6952 + "vnf_config_primitive={} Done with result {} {}".format(
6953 vnf_config_primitive
, result
, result_detail
6956 # Update operationState = COMPLETED | FAILED
6957 self
._update
_suboperation
_status
(
6958 db_nslcmop
, op_index
, result
, result_detail
6961 if result
== "FAILED":
6962 raise LcmException(result_detail
)
6963 db_nsr_update
["config-status"] = old_config_status
6964 scale_process
= None
6969 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6970 db_nsr_update
["operational-status"] = (
6972 if old_operational_status
== "failed"
6973 else old_operational_status
6975 db_nsr_update
["config-status"] = old_config_status
6978 ROclient
.ROClientException
,
6983 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6985 except asyncio
.CancelledError
:
6987 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6989 exc
= "Operation was cancelled"
6990 except Exception as e
:
6991 exc
= traceback
.format_exc()
6992 self
.logger
.critical(
6993 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6997 self
._write
_ns
_status
(
7000 current_operation
="IDLE",
7001 current_operation_id
=None,
7004 stage
[1] = "Waiting for instantiate pending tasks."
7005 self
.logger
.debug(logging_text
+ stage
[1])
7006 exc
= await self
._wait
_for
_tasks
(
7009 self
.timeout_ns_deploy
,
7017 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7018 nslcmop_operation_state
= "FAILED"
7020 db_nsr_update
["operational-status"] = old_operational_status
7021 db_nsr_update
["config-status"] = old_config_status
7022 db_nsr_update
["detailed-status"] = ""
7024 if "VCA" in scale_process
:
7025 db_nsr_update
["config-status"] = "failed"
7026 if "RO" in scale_process
:
7027 db_nsr_update
["operational-status"] = "failed"
7030 ] = "FAILED scaling nslcmop={} {}: {}".format(
7031 nslcmop_id
, step
, exc
7034 error_description_nslcmop
= None
7035 nslcmop_operation_state
= "COMPLETED"
7036 db_nslcmop_update
["detailed-status"] = "Done"
7038 self
._write
_op
_status
(
7041 error_message
=error_description_nslcmop
,
7042 operation_state
=nslcmop_operation_state
,
7043 other_update
=db_nslcmop_update
,
7046 self
._write
_ns
_status
(
7049 current_operation
="IDLE",
7050 current_operation_id
=None,
7051 other_update
=db_nsr_update
,
7054 if nslcmop_operation_state
:
7058 "nslcmop_id": nslcmop_id
,
7059 "operationState": nslcmop_operation_state
,
7061 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7062 except Exception as e
:
7064 logging_text
+ "kafka_write notification Exception {}".format(e
)
7066 self
.logger
.debug(logging_text
+ "Exit")
7067 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7069 async def _scale_kdu(
7070 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7072 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7073 for kdu_name
in _scaling_info
:
7074 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7075 deployed_kdu
, index
= get_deployed_kdu(
7076 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7078 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7079 kdu_instance
= deployed_kdu
["kdu-instance"]
7080 kdu_model
= deployed_kdu
.get("kdu-model")
7081 scale
= int(kdu_scaling_info
["scale"])
7082 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7085 "collection": "nsrs",
7086 "filter": {"_id": nsr_id
},
7087 "path": "_admin.deployed.K8s.{}".format(index
),
7090 step
= "scaling application {}".format(
7091 kdu_scaling_info
["resource-name"]
7093 self
.logger
.debug(logging_text
+ step
)
7095 if kdu_scaling_info
["type"] == "delete":
7096 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7099 and kdu_config
.get("terminate-config-primitive")
7100 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7102 terminate_config_primitive_list
= kdu_config
.get(
7103 "terminate-config-primitive"
7105 terminate_config_primitive_list
.sort(
7106 key
=lambda val
: int(val
["seq"])
7110 terminate_config_primitive
7111 ) in terminate_config_primitive_list
:
7112 primitive_params_
= self
._map
_primitive
_params
(
7113 terminate_config_primitive
, {}, {}
7115 step
= "execute terminate config primitive"
7116 self
.logger
.debug(logging_text
+ step
)
7117 await asyncio
.wait_for(
7118 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7119 cluster_uuid
=cluster_uuid
,
7120 kdu_instance
=kdu_instance
,
7121 primitive_name
=terminate_config_primitive
["name"],
7122 params
=primitive_params_
,
7129 await asyncio
.wait_for(
7130 self
.k8scluster_map
[k8s_cluster_type
].scale(
7133 kdu_scaling_info
["resource-name"],
7135 cluster_uuid
=cluster_uuid
,
7136 kdu_model
=kdu_model
,
7140 timeout
=self
.timeout_vca_on_error
,
7143 if kdu_scaling_info
["type"] == "create":
7144 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7147 and kdu_config
.get("initial-config-primitive")
7148 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7150 initial_config_primitive_list
= kdu_config
.get(
7151 "initial-config-primitive"
7153 initial_config_primitive_list
.sort(
7154 key
=lambda val
: int(val
["seq"])
7157 for initial_config_primitive
in initial_config_primitive_list
:
7158 primitive_params_
= self
._map
_primitive
_params
(
7159 initial_config_primitive
, {}, {}
7161 step
= "execute initial config primitive"
7162 self
.logger
.debug(logging_text
+ step
)
7163 await asyncio
.wait_for(
7164 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7165 cluster_uuid
=cluster_uuid
,
7166 kdu_instance
=kdu_instance
,
7167 primitive_name
=initial_config_primitive
["name"],
7168 params
=primitive_params_
,
7175 async def _scale_ng_ro(
7176 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7178 nsr_id
= db_nslcmop
["nsInstanceId"]
7179 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7182 # read from db: vnfd's for every vnf
7185 # for each vnf in ns, read vnfd
7186 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7187 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7188 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7189 # if we haven't this vnfd, read it from db
7190 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7192 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7193 db_vnfds
.append(vnfd
)
7194 n2vc_key
= self
.n2vc
.get_public_key()
7195 n2vc_key_list
= [n2vc_key
]
7198 vdu_scaling_info
.get("vdu-create"),
7199 vdu_scaling_info
.get("vdu-delete"),
7202 # db_vnfr has been updated, update db_vnfrs to use it
7203 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7204 await self
._instantiate
_ng
_ro
(
7214 start_deploy
=time(),
7215 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7217 if vdu_scaling_info
.get("vdu-delete"):
7219 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7222 async def extract_prometheus_scrape_jobs(
7223 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7225 # look if exist a file called 'prometheus*.j2' and
7226 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7230 for f
in artifact_content
7231 if f
.startswith("prometheus") and f
.endswith(".j2")
7237 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7241 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7242 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7244 vnfr_id
= vnfr_id
.replace("-", "")
7246 "JOB_NAME": vnfr_id
,
7247 "TARGET_IP": target_ip
,
7248 "EXPORTER_POD_IP": host_name
,
7249 "EXPORTER_POD_PORT": host_port
,
7251 job_list
= parse_job(job_data
, variables
)
7252 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7253 for job
in job_list
:
7255 not isinstance(job
.get("job_name"), str)
7256 or vnfr_id
not in job
["job_name"]
7258 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7259 job
["nsr_id"] = nsr_id
7260 job
["vnfr_id"] = vnfr_id
7263 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7264 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7265 self
.logger
.info(logging_text
+ "Enter")
7266 stage
= ["Preparing the environment", ""]
7267 # database nsrs record
7271 # in case of error, indicates what part of scale was failed to put nsr at error status
7272 start_deploy
= time()
7274 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7275 vim_account_id
= db_vnfr
.get("vim-account-id")
7276 vim_info_key
= "vim:" + vim_account_id
7277 vdur
= find_in_list(
7278 db_vnfr
["vdur"], lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7281 vdu_vim_name
= vdur
["name"]
7282 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7283 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7284 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7285 # wait for any previous tasks in process
7286 stage
[1] = "Waiting for previous operations to terminate"
7287 self
.logger
.info(stage
[1])
7288 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7290 stage
[1] = "Reading from database."
7291 self
.logger
.info(stage
[1])
7292 self
._write
_ns
_status
(
7295 current_operation
=operation_type
.upper(),
7296 current_operation_id
=nslcmop_id
7298 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7301 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7302 db_nsr_update
["operational-status"] = operation_type
7303 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7307 "vim_vm_id": vim_vm_id
,
7309 "vdu_index": additional_param
["count-index"],
7310 "vdu_id": vdur
["id"],
7311 "target_vim": target_vim
,
7312 "vim_account_id": vim_account_id
7315 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7316 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7317 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7318 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7319 self
.logger
.info("response from RO: {}".format(result_dict
))
7320 action_id
= result_dict
["action_id"]
7321 await self
._wait
_ng
_ro
(
7322 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_operate
7324 return "COMPLETED", "Done"
7325 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7326 self
.logger
.error("Exit Exception {}".format(e
))
7328 except asyncio
.CancelledError
:
7329 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7330 exc
= "Operation was cancelled"
7331 except Exception as e
:
7332 exc
= traceback
.format_exc()
7333 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7334 return "FAILED", "Error in operate VNF {}".format(exc
)
7336 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7338 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7340 :param: vim_account_id: VIM Account ID
7342 :return: (cloud_name, cloud_credential)
7344 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7345 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7347 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7349 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7351 :param: vim_account_id: VIM Account ID
7353 :return: (cloud_name, cloud_credential)
7355 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7356 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7358 async def migrate(self
, nsr_id
, nslcmop_id
):
7360 Migrate VNFs and VDUs instances in a NS
7362 :param: nsr_id: NS Instance ID
7363 :param: nslcmop_id: nslcmop ID of migrate
7366 # Try to lock HA task here
7367 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7368 if not task_is_locked_by_me
:
7370 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7371 self
.logger
.debug(logging_text
+ "Enter")
7372 # get all needed from database
7374 db_nslcmop_update
= {}
7375 nslcmop_operation_state
= None
7379 # in case of error, indicates what part of scale was failed to put nsr at error status
7380 start_deploy
= time()
7383 # wait for any previous tasks in process
7384 step
= "Waiting for previous operations to terminate"
7385 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7387 self
._write
_ns
_status
(
7390 current_operation
="MIGRATING",
7391 current_operation_id
=nslcmop_id
,
7393 step
= "Getting nslcmop from database"
7395 step
+ " after having waited for previous tasks to be completed"
7397 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7398 migrate_params
= db_nslcmop
.get("operationParams")
7401 target
.update(migrate_params
)
7402 desc
= await self
.RO
.migrate(nsr_id
, target
)
7403 self
.logger
.debug("RO return > {}".format(desc
))
7404 action_id
= desc
["action_id"]
7405 await self
._wait
_ng
_ro
(
7406 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
,
7409 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7410 self
.logger
.error("Exit Exception {}".format(e
))
7412 except asyncio
.CancelledError
:
7413 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7414 exc
= "Operation was cancelled"
7415 except Exception as e
:
7416 exc
= traceback
.format_exc()
7417 self
.logger
.critical(
7418 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7421 self
._write
_ns
_status
(
7424 current_operation
="IDLE",
7425 current_operation_id
=None,
7428 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7429 nslcmop_operation_state
= "FAILED"
7431 nslcmop_operation_state
= "COMPLETED"
7432 db_nslcmop_update
["detailed-status"] = "Done"
7433 db_nsr_update
["detailed-status"] = "Done"
7435 self
._write
_op
_status
(
7439 operation_state
=nslcmop_operation_state
,
7440 other_update
=db_nslcmop_update
,
7442 if nslcmop_operation_state
:
7446 "nslcmop_id": nslcmop_id
,
7447 "operationState": nslcmop_operation_state
,
7449 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7450 except Exception as e
:
7452 logging_text
+ "kafka_write notification Exception {}".format(e
)
7454 self
.logger
.debug(logging_text
+ "Exit")
7455 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7458 async def heal(self
, nsr_id
, nslcmop_id
):
7462 :param nsr_id: ns instance to heal
7463 :param nslcmop_id: operation to run
7467 # Try to lock HA task here
7468 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7469 if not task_is_locked_by_me
:
7472 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7473 stage
= ["", "", ""]
7474 tasks_dict_info
= {}
7475 # ^ stage, step, VIM progress
7476 self
.logger
.debug(logging_text
+ "Enter")
7477 # get all needed from database
7479 db_nslcmop_update
= {}
7481 db_vnfrs
= {} # vnf's info indexed by _id
7483 old_operational_status
= ""
7484 old_config_status
= ""
7487 # wait for any previous tasks in process
7488 step
= "Waiting for previous operations to terminate"
7489 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7490 self
._write
_ns
_status
(
7493 current_operation
="HEALING",
7494 current_operation_id
=nslcmop_id
,
7497 step
= "Getting nslcmop from database"
7499 step
+ " after having waited for previous tasks to be completed"
7501 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7503 step
= "Getting nsr from database"
7504 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7505 old_operational_status
= db_nsr
["operational-status"]
7506 old_config_status
= db_nsr
["config-status"]
7509 "_admin.deployed.RO.operational-status": "healing",
7511 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7513 step
= "Sending heal order to VIM"
7514 task_ro
= asyncio
.ensure_future(
7516 logging_text
=logging_text
,
7518 db_nslcmop
=db_nslcmop
,
7522 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7523 tasks_dict_info
[task_ro
] = "Healing at VIM"
7527 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7528 self
.logger
.debug(logging_text
+ stage
[1])
7529 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7530 self
.fs
.sync(db_nsr
["nsd-id"])
7532 # read from db: vnfr's of this ns
7533 step
= "Getting vnfrs from db"
7534 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7535 for vnfr
in db_vnfrs_list
:
7536 db_vnfrs
[vnfr
["_id"]] = vnfr
7537 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7539 # Check for each target VNF
7540 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7541 for target_vnf
in target_list
:
7542 # Find this VNF in the list from DB
7543 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7545 db_vnfr
= db_vnfrs
[vnfr_id
]
7546 vnfd_id
= db_vnfr
.get("vnfd-id")
7547 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7548 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7549 base_folder
= vnfd
["_admin"]["storage"]
7554 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7555 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7557 # Check each target VDU and deploy N2VC
7558 for target_vdu
in target_vnf
["additionalParams"].get("vdu", None):
7559 deploy_params_vdu
= target_vdu
7560 # Set run-day1 vnf level value if not vdu level value exists
7561 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7562 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7563 vdu_name
= target_vdu
.get("vdu-id", None)
7564 # TODO: Get vdu_id from vdud.
7566 # For multi instance VDU count-index is mandatory
7567 # For single session VDU count-indes is 0
7568 vdu_index
= target_vdu
.get("count-index",0)
7570 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7571 stage
[1] = "Deploying Execution Environments."
7572 self
.logger
.debug(logging_text
+ stage
[1])
7574 # VNF Level charm. Normal case when proxy charms.
7575 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7576 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7577 if descriptor_config
:
7578 # Continue if healed machine is management machine
7579 vnf_ip_address
= db_vnfr
.get("ip-address")
7580 target_instance
= None
7581 for instance
in db_vnfr
.get("vdur", None):
7582 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7583 target_instance
= instance
7585 if vnf_ip_address
== target_instance
.get("ip-address"):
7587 logging_text
=logging_text
7588 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7589 member_vnf_index
, vdu_name
, vdu_index
7593 nslcmop_id
=nslcmop_id
,
7599 member_vnf_index
=member_vnf_index
,
7602 deploy_params
=deploy_params_vdu
,
7603 descriptor_config
=descriptor_config
,
7604 base_folder
=base_folder
,
7605 task_instantiation_info
=tasks_dict_info
,
7609 # VDU Level charm. Normal case with native charms.
7610 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7611 if descriptor_config
:
7613 logging_text
=logging_text
7614 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7615 member_vnf_index
, vdu_name
, vdu_index
7619 nslcmop_id
=nslcmop_id
,
7625 member_vnf_index
=member_vnf_index
,
7626 vdu_index
=vdu_index
,
7628 deploy_params
=deploy_params_vdu
,
7629 descriptor_config
=descriptor_config
,
7630 base_folder
=base_folder
,
7631 task_instantiation_info
=tasks_dict_info
,
7636 ROclient
.ROClientException
,
7641 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7643 except asyncio
.CancelledError
:
7645 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7647 exc
= "Operation was cancelled"
7648 except Exception as e
:
7649 exc
= traceback
.format_exc()
7650 self
.logger
.critical(
7651 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7656 stage
[1] = "Waiting for healing pending tasks."
7657 self
.logger
.debug(logging_text
+ stage
[1])
7658 exc
= await self
._wait
_for
_tasks
(
7661 self
.timeout_ns_deploy
,
7669 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7670 nslcmop_operation_state
= "FAILED"
7672 db_nsr_update
["operational-status"] = old_operational_status
7673 db_nsr_update
["config-status"] = old_config_status
7676 ] = "FAILED healing nslcmop={} {}: {}".format(
7677 nslcmop_id
, step
, exc
7679 for task
, task_name
in tasks_dict_info
.items():
7680 if not task
.done() or task
.cancelled() or task
.exception():
7681 if task_name
.startswith(self
.task_name_deploy_vca
):
7682 # A N2VC task is pending
7683 db_nsr_update
["config-status"] = "failed"
7685 # RO task is pending
7686 db_nsr_update
["operational-status"] = "failed"
7688 error_description_nslcmop
= None
7689 nslcmop_operation_state
= "COMPLETED"
7690 db_nslcmop_update
["detailed-status"] = "Done"
7691 db_nsr_update
["detailed-status"] = "Done"
7692 db_nsr_update
["operational-status"] = "running"
7693 db_nsr_update
["config-status"] = "configured"
7695 self
._write
_op
_status
(
7698 error_message
=error_description_nslcmop
,
7699 operation_state
=nslcmop_operation_state
,
7700 other_update
=db_nslcmop_update
,
7703 self
._write
_ns
_status
(
7706 current_operation
="IDLE",
7707 current_operation_id
=None,
7708 other_update
=db_nsr_update
,
7711 if nslcmop_operation_state
:
7715 "nslcmop_id": nslcmop_id
,
7716 "operationState": nslcmop_operation_state
,
7718 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7719 except Exception as e
:
7721 logging_text
+ "kafka_write notification Exception {}".format(e
)
7723 self
.logger
.debug(logging_text
+ "Exit")
7724 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7735 :param logging_text: preffix text to use at logging
7736 :param nsr_id: nsr identity
7737 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7738 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7739 :return: None or exception
7741 def get_vim_account(vim_account_id
):
7743 if vim_account_id
in db_vims
:
7744 return db_vims
[vim_account_id
]
7745 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7746 db_vims
[vim_account_id
] = db_vim
7751 ns_params
= db_nslcmop
.get("operationParams")
7752 if ns_params
and ns_params
.get("timeout_ns_heal"):
7753 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7755 timeout_ns_heal
= self
.timeout
.get(
7756 "ns_heal", self
.timeout_ns_heal
7761 nslcmop_id
= db_nslcmop
["_id"]
7763 "action_id": nslcmop_id
,
7765 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7766 target
.update(db_nslcmop
.get("operationParams", {}))
7768 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7769 desc
= await self
.RO
.recreate(nsr_id
, target
)
7770 self
.logger
.debug("RO return > {}".format(desc
))
7771 action_id
= desc
["action_id"]
7772 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7773 await self
._wait
_ng
_ro
(
7774 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7780 "_admin.deployed.RO.operational-status": "running",
7781 "detailed-status": " ".join(stage
),
7783 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7784 self
._write
_op
_status
(nslcmop_id
, stage
)
7786 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7789 except Exception as e
:
7790 stage
[2] = "ERROR healing at VIM"
7791 #self.set_vnfr_at_error(db_vnfrs, str(e))
7793 "Error healing at VIM {}".format(e
),
7794 exc_info
=not isinstance(
7797 ROclient
.ROClientException
,
7823 task_instantiation_info
,
7826 # launch instantiate_N2VC in a asyncio task and register task object
7827 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7828 # if not found, create one entry and update database
7829 # fill db_nsr._admin.deployed.VCA.<index>
7832 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7834 if "execution-environment-list" in descriptor_config
:
7835 ee_list
= descriptor_config
.get("execution-environment-list", [])
7836 elif "juju" in descriptor_config
:
7837 ee_list
= [descriptor_config
] # ns charms
7838 else: # other types as script are not supported
7841 for ee_item
in ee_list
:
7844 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7845 ee_item
.get("juju"), ee_item
.get("helm-chart")
7848 ee_descriptor_id
= ee_item
.get("id")
7849 if ee_item
.get("juju"):
7850 vca_name
= ee_item
["juju"].get("charm")
7853 if ee_item
["juju"].get("charm") is not None
7856 if ee_item
["juju"].get("cloud") == "k8s":
7857 vca_type
= "k8s_proxy_charm"
7858 elif ee_item
["juju"].get("proxy") is False:
7859 vca_type
= "native_charm"
7860 elif ee_item
.get("helm-chart"):
7861 vca_name
= ee_item
["helm-chart"]
7862 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7865 vca_type
= "helm-v3"
7868 logging_text
+ "skipping non juju neither charm configuration"
7873 for vca_index
, vca_deployed
in enumerate(
7874 db_nsr
["_admin"]["deployed"]["VCA"]
7876 if not vca_deployed
:
7879 vca_deployed
.get("member-vnf-index") == member_vnf_index
7880 and vca_deployed
.get("vdu_id") == vdu_id
7881 and vca_deployed
.get("kdu_name") == kdu_name
7882 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7883 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7887 # not found, create one.
7889 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7892 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7894 target
+= "/kdu/{}".format(kdu_name
)
7896 "target_element": target
,
7897 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7898 "member-vnf-index": member_vnf_index
,
7900 "kdu_name": kdu_name
,
7901 "vdu_count_index": vdu_index
,
7902 "operational-status": "init", # TODO revise
7903 "detailed-status": "", # TODO revise
7904 "step": "initial-deploy", # TODO revise
7906 "vdu_name": vdu_name
,
7908 "ee_descriptor_id": ee_descriptor_id
,
7912 # create VCA and configurationStatus in db
7914 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7915 "configurationStatus.{}".format(vca_index
): dict(),
7917 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7919 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7921 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7922 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7923 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7926 task_n2vc
= asyncio
.ensure_future(
7928 logging_text
=logging_text
,
7929 vca_index
=vca_index
,
7935 vdu_index
=vdu_index
,
7936 deploy_params
=deploy_params
,
7937 config_descriptor
=descriptor_config
,
7938 base_folder
=base_folder
,
7939 nslcmop_id
=nslcmop_id
,
7943 ee_config_descriptor
=ee_item
,
7946 self
.lcm_tasks
.register(
7950 "instantiate_N2VC-{}".format(vca_index
),
7953 task_instantiation_info
[
7955 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7956 member_vnf_index
or "", vdu_id
or ""
7959 async def heal_N2VC(
7976 ee_config_descriptor
,
7978 nsr_id
= db_nsr
["_id"]
7979 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7980 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
7981 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
7982 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
7984 "collection": "nsrs",
7985 "filter": {"_id": nsr_id
},
7986 "path": db_update_entry
,
7992 element_under_configuration
= nsr_id
7996 vnfr_id
= db_vnfr
["_id"]
7997 osm_config
["osm"]["vnf_id"] = vnfr_id
7999 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8001 if vca_type
== "native_charm":
8004 index_number
= vdu_index
or 0
8007 element_type
= "VNF"
8008 element_under_configuration
= vnfr_id
8009 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8011 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8012 element_type
= "VDU"
8013 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8014 osm_config
["osm"]["vdu_id"] = vdu_id
8016 namespace
+= ".{}".format(kdu_name
)
8017 element_type
= "KDU"
8018 element_under_configuration
= kdu_name
8019 osm_config
["osm"]["kdu_name"] = kdu_name
8022 if base_folder
["pkg-dir"]:
8023 artifact_path
= "{}/{}/{}/{}".format(
8024 base_folder
["folder"],
8025 base_folder
["pkg-dir"],
8028 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8033 artifact_path
= "{}/Scripts/{}/{}/".format(
8034 base_folder
["folder"],
8037 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8042 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8044 # get initial_config_primitive_list that applies to this element
8045 initial_config_primitive_list
= config_descriptor
.get(
8046 "initial-config-primitive"
8050 "Initial config primitive list > {}".format(
8051 initial_config_primitive_list
8055 # add config if not present for NS charm
8056 ee_descriptor_id
= ee_config_descriptor
.get("id")
8057 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8058 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8059 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8063 "Initial config primitive list #2 > {}".format(
8064 initial_config_primitive_list
8067 # n2vc_redesign STEP 3.1
8068 # find old ee_id if exists
8069 ee_id
= vca_deployed
.get("ee_id")
8071 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8072 # create or register execution environment in VCA. Only for native charms when healing
8073 if vca_type
== "native_charm":
8074 step
= "Waiting to VM being up and getting IP address"
8075 self
.logger
.debug(logging_text
+ step
)
8076 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8085 credentials
= {"hostname": rw_mgmt_ip
}
8087 username
= deep_get(
8088 config_descriptor
, ("config-access", "ssh-access", "default-user")
8090 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8091 # merged. Meanwhile let's get username from initial-config-primitive
8092 if not username
and initial_config_primitive_list
:
8093 for config_primitive
in initial_config_primitive_list
:
8094 for param
in config_primitive
.get("parameter", ()):
8095 if param
["name"] == "ssh-username":
8096 username
= param
["value"]
8100 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8101 "'config-access.ssh-access.default-user'"
8103 credentials
["username"] = username
8105 # n2vc_redesign STEP 3.2
8106 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8107 self
._write
_configuration
_status
(
8109 vca_index
=vca_index
,
8110 status
="REGISTERING",
8111 element_under_configuration
=element_under_configuration
,
8112 element_type
=element_type
,
8115 step
= "register execution environment {}".format(credentials
)
8116 self
.logger
.debug(logging_text
+ step
)
8117 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8118 credentials
=credentials
,
8119 namespace
=namespace
,
8124 # update ee_id en db
8126 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8128 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8130 # for compatibility with MON/POL modules, the need model and application name at database
8131 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8132 # Not sure if this need to be done when healing
8134 ee_id_parts = ee_id.split(".")
8135 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8136 if len(ee_id_parts) >= 2:
8137 model_name = ee_id_parts[0]
8138 application_name = ee_id_parts[1]
8139 db_nsr_update[db_update_entry + "model"] = model_name
8140 db_nsr_update[db_update_entry + "application"] = application_name
8143 # n2vc_redesign STEP 3.3
8144 # Install configuration software. Only for native charms.
8145 step
= "Install configuration Software"
8147 self
._write
_configuration
_status
(
8149 vca_index
=vca_index
,
8150 status
="INSTALLING SW",
8151 element_under_configuration
=element_under_configuration
,
8152 element_type
=element_type
,
8153 #other_update=db_nsr_update,
8157 # TODO check if already done
8158 self
.logger
.debug(logging_text
+ step
)
8160 if vca_type
== "native_charm":
8161 config_primitive
= next(
8162 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8165 if config_primitive
:
8166 config
= self
._map
_primitive
_params
(
8167 config_primitive
, {}, deploy_params
8169 await self
.vca_map
[vca_type
].install_configuration_sw(
8171 artifact_path
=artifact_path
,
8179 # write in db flag of configuration_sw already installed
8181 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8184 # Not sure if this need to be done when healing
8186 # add relations for this VCA (wait for other peers related with this VCA)
8187 await self._add_vca_relations(
8188 logging_text=logging_text,
8191 vca_index=vca_index,
8195 # if SSH access is required, then get execution environment SSH public
8196 # if native charm we have waited already to VM be UP
8197 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8200 # self.logger.debug("get ssh key block")
8202 config_descriptor
, ("config-access", "ssh-access", "required")
8204 # self.logger.debug("ssh key needed")
8205 # Needed to inject a ssh key
8208 ("config-access", "ssh-access", "default-user"),
8210 step
= "Install configuration Software, getting public ssh key"
8211 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8212 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8215 step
= "Insert public key into VM user={} ssh_key={}".format(
8219 # self.logger.debug("no need to get ssh key")
8220 step
= "Waiting to VM being up and getting IP address"
8221 self
.logger
.debug(logging_text
+ step
)
8223 # n2vc_redesign STEP 5.1
8224 # wait for RO (ip-address) Insert pub_key into VM
8225 # IMPORTANT: We need do wait for RO to complete healing operation.
8226 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8229 rw_mgmt_ip
= await self
.wait_kdu_up(
8230 logging_text
, nsr_id
, vnfr_id
, kdu_name
8233 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8243 rw_mgmt_ip
= None # This is for a NS configuration
8245 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8247 # store rw_mgmt_ip in deploy params for later replacement
8248 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8251 # get run-day1 operation parameter
8252 runDay1
= deploy_params
.get("run-day1",False)
8253 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8255 # n2vc_redesign STEP 6 Execute initial config primitive
8256 step
= "execute initial config primitive"
8258 # wait for dependent primitives execution (NS -> VNF -> VDU)
8259 if initial_config_primitive_list
:
8260 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8262 # stage, in function of element type: vdu, kdu, vnf or ns
8263 my_vca
= vca_deployed_list
[vca_index
]
8264 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8266 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8267 elif my_vca
.get("member-vnf-index"):
8269 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8272 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8274 self
._write
_configuration
_status
(
8275 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8278 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8280 check_if_terminated_needed
= True
8281 for initial_config_primitive
in initial_config_primitive_list
:
8282 # adding information on the vca_deployed if it is a NS execution environment
8283 if not vca_deployed
["member-vnf-index"]:
8284 deploy_params
["ns_config_info"] = json
.dumps(
8285 self
._get
_ns
_config
_info
(nsr_id
)
8287 # TODO check if already done
8288 primitive_params_
= self
._map
_primitive
_params
(
8289 initial_config_primitive
, {}, deploy_params
8292 step
= "execute primitive '{}' params '{}'".format(
8293 initial_config_primitive
["name"], primitive_params_
8295 self
.logger
.debug(logging_text
+ step
)
8296 await self
.vca_map
[vca_type
].exec_primitive(
8298 primitive_name
=initial_config_primitive
["name"],
8299 params_dict
=primitive_params_
,
8304 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8305 if check_if_terminated_needed
:
8306 if config_descriptor
.get("terminate-config-primitive"):
8308 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8310 check_if_terminated_needed
= False
8312 # TODO register in database that primitive is done
8314 # STEP 7 Configure metrics
8315 # Not sure if this need to be done when healing
8317 if vca_type == "helm" or vca_type == "helm-v3":
8318 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8320 artifact_path=artifact_path,
8321 ee_config_descriptor=ee_config_descriptor,
8324 target_ip=rw_mgmt_ip,
8330 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8333 for job in prometheus_jobs:
8336 {"job_name": job["job_name"]},
8339 fail_on_empty=False,
8343 step
= "instantiated at VCA"
8344 self
.logger
.debug(logging_text
+ step
)
8346 self
._write
_configuration
_status
(
8347 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8350 except Exception as e
: # TODO not use Exception but N2VC exception
8351 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8353 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8356 "Exception while {} : {}".format(step
, e
), exc_info
=True
8358 self
._write
_configuration
_status
(
8359 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8361 raise LcmException("{} {}".format(step
, e
)) from e
8363 async def _wait_heal_ro(
8369 while time() <= start_time
+ timeout
:
8370 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
8371 operational_status_ro
= db_nsr
["_admin"]["deployed"]["RO"]["operational-status"]
8372 self
.logger
.debug("Wait Heal RO > {}".format(operational_status_ro
))
8373 if operational_status_ro
!= "healing":
8375 await asyncio
.sleep(15, loop
=self
.loop
)
8376 else: # timeout_ns_deploy
8377 raise NgRoException("Timeout waiting ns to deploy")
8379 async def vertical_scale(self
, nsr_id
, nslcmop_id
):
8381 Vertical Scale the VDUs in a NS
8383 :param: nsr_id: NS Instance ID
8384 :param: nslcmop_id: nslcmop ID of migrate
8387 # Try to lock HA task here
8388 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
8389 if not task_is_locked_by_me
:
8391 logging_text
= "Task ns={} vertical scale ".format(nsr_id
)
8392 self
.logger
.debug(logging_text
+ "Enter")
8393 # get all needed from database
8395 db_nslcmop_update
= {}
8396 nslcmop_operation_state
= None
8400 # in case of error, indicates what part of scale was failed to put nsr at error status
8401 start_deploy
= time()
8404 # wait for any previous tasks in process
8405 step
= "Waiting for previous operations to terminate"
8406 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
8408 self
._write
_ns
_status
(
8411 current_operation
="VerticalScale",
8412 current_operation_id
=nslcmop_id
8414 step
= "Getting nslcmop from database"
8415 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
8416 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
8417 operationParams
= db_nslcmop
.get("operationParams")
8419 target
.update(operationParams
)
8420 desc
= await self
.RO
.vertical_scale(nsr_id
, target
)
8421 self
.logger
.debug("RO return > {}".format(desc
))
8422 action_id
= desc
["action_id"]
8423 await self
._wait
_ng
_ro
(
8424 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_verticalscale
8426 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
8427 self
.logger
.error("Exit Exception {}".format(e
))
8429 except asyncio
.CancelledError
:
8430 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
8431 exc
= "Operation was cancelled"
8432 except Exception as e
:
8433 exc
= traceback
.format_exc()
8434 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
8436 self
._write
_ns
_status
(
8439 current_operation
="IDLE",
8440 current_operation_id
=None,
8445 ] = "FAILED {}: {}".format(step
, exc
)
8446 nslcmop_operation_state
= "FAILED"
8448 nslcmop_operation_state
= "COMPLETED"
8449 db_nslcmop_update
["detailed-status"] = "Done"
8450 db_nsr_update
["detailed-status"] = "Done"
8452 self
._write
_op
_status
(
8456 operation_state
=nslcmop_operation_state
,
8457 other_update
=db_nslcmop_update
,
8459 if nslcmop_operation_state
:
8463 "nslcmop_id": nslcmop_id
,
8464 "operationState": nslcmop_operation_state
,
8466 await self
.msg
.aiowrite("ns", "verticalscaled", msg
, loop
=self
.loop
)
8467 except Exception as e
:
8469 logging_text
+ "kafka_write notification Exception {}".format(e
)
8471 self
.logger
.debug(logging_text
+ "Exit")
8472 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_verticalscale")