1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
, get_kdur
88 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
89 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
90 from n2vc
.definitions
import RelationEndpoint
91 from n2vc
.k8s_helm_conn
import K8sHelmConnector
92 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
93 from n2vc
.k8s_juju_conn
import K8sJujuConnector
95 from osm_common
.dbbase
import DbException
96 from osm_common
.fsbase
import FsException
98 from osm_lcm
.data_utils
.database
.database
import Database
99 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
101 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
102 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
104 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
105 from osm_lcm
.osm_config
import OsmConfigBuilder
106 from osm_lcm
.prometheus
import parse_job
108 from copy
import copy
, deepcopy
109 from time
import time
110 from uuid
import uuid4
112 from random
import randint
114 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
117 class NsLcm(LcmBase
):
118 timeout_vca_on_error
= (
120 ) # Time for charm from first time at blocked,error status to mark as failed
121 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
122 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
123 timeout_charm_delete
= 10 * 60
124 timeout_primitive
= 30 * 60 # timeout for primitive execution
125 timeout_ns_update
= 30 * 60 # timeout for ns update
126 timeout_progress_primitive
= (
128 ) # timeout for some progress in a primitive execution
129 timeout_migrate
= 1800 # default global timeout for migrating vnfs
131 SUBOPERATION_STATUS_NOT_FOUND
= -1
132 SUBOPERATION_STATUS_NEW
= -2
133 SUBOPERATION_STATUS_SKIP
= -3
134 task_name_deploy_vca
= "Deploying VCA"
136 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
138 Init, Connect to database, filesystem storage, and messaging
139 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
142 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
144 self
.db
= Database().instance
.db
145 self
.fs
= Filesystem().instance
.fs
147 self
.lcm_tasks
= lcm_tasks
148 self
.timeout
= config
["timeout"]
149 self
.ro_config
= config
["ro_config"]
150 self
.ng_ro
= config
["ro_config"].get("ng")
151 self
.vca_config
= config
["VCA"].copy()
153 # create N2VC connector
154 self
.n2vc
= N2VCJujuConnector(
157 on_update_db
=self
._on
_update
_n
2vc
_db
,
162 self
.conn_helm_ee
= LCMHelmConn(
165 vca_config
=self
.vca_config
,
166 on_update_db
=self
._on
_update
_n
2vc
_db
,
169 self
.k8sclusterhelm2
= K8sHelmConnector(
170 kubectl_command
=self
.vca_config
.get("kubectlpath"),
171 helm_command
=self
.vca_config
.get("helmpath"),
178 self
.k8sclusterhelm3
= K8sHelm3Connector(
179 kubectl_command
=self
.vca_config
.get("kubectlpath"),
180 helm_command
=self
.vca_config
.get("helm3path"),
187 self
.k8sclusterjuju
= K8sJujuConnector(
188 kubectl_command
=self
.vca_config
.get("kubectlpath"),
189 juju_command
=self
.vca_config
.get("jujupath"),
192 on_update_db
=self
._on
_update
_k
8s
_db
,
197 self
.k8scluster_map
= {
198 "helm-chart": self
.k8sclusterhelm2
,
199 "helm-chart-v3": self
.k8sclusterhelm3
,
200 "chart": self
.k8sclusterhelm3
,
201 "juju-bundle": self
.k8sclusterjuju
,
202 "juju": self
.k8sclusterjuju
,
206 "lxc_proxy_charm": self
.n2vc
,
207 "native_charm": self
.n2vc
,
208 "k8s_proxy_charm": self
.n2vc
,
209 "helm": self
.conn_helm_ee
,
210 "helm-v3": self
.conn_helm_ee
,
214 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
217 def increment_ip_mac(ip_mac
, vm_index
=1):
218 if not isinstance(ip_mac
, str):
221 # try with ipv4 look for last dot
222 i
= ip_mac
.rfind(".")
225 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
226 # try with ipv6 or mac look for last colon. Operate in hex
227 i
= ip_mac
.rfind(":")
230 # format in hex, len can be 2 for mac or 4 for ipv6
231 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
232 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
238 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
240 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
243 # TODO filter RO descriptor fields...
247 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
248 db_dict
["deploymentStatus"] = ro_descriptor
249 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
251 except Exception as e
:
253 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
256 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
258 # remove last dot from path (if exists)
259 if path
.endswith("."):
262 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
263 # .format(table, filter, path, updated_data))
266 nsr_id
= filter.get("_id")
268 # read ns record from database
269 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
270 current_ns_status
= nsr
.get("nsState")
272 # get vca status for NS
273 status_dict
= await self
.n2vc
.get_status(
274 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
279 db_dict
["vcaStatus"] = status_dict
280 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
282 # update configurationStatus for this VCA
284 vca_index
= int(path
[path
.rfind(".") + 1 :])
287 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
289 vca_status
= vca_list
[vca_index
].get("status")
291 configuration_status_list
= nsr
.get("configurationStatus")
292 config_status
= configuration_status_list
[vca_index
].get("status")
294 if config_status
== "BROKEN" and vca_status
!= "failed":
295 db_dict
["configurationStatus"][vca_index
] = "READY"
296 elif config_status
!= "BROKEN" and vca_status
== "failed":
297 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
298 except Exception as e
:
299 # not update configurationStatus
300 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
302 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
303 # if nsState = 'DEGRADED' check if all is OK
305 if current_ns_status
in ("READY", "DEGRADED"):
306 error_description
= ""
308 if status_dict
.get("machines"):
309 for machine_id
in status_dict
.get("machines"):
310 machine
= status_dict
.get("machines").get(machine_id
)
311 # check machine agent-status
312 if machine
.get("agent-status"):
313 s
= machine
.get("agent-status").get("status")
316 error_description
+= (
317 "machine {} agent-status={} ; ".format(
321 # check machine instance status
322 if machine
.get("instance-status"):
323 s
= machine
.get("instance-status").get("status")
326 error_description
+= (
327 "machine {} instance-status={} ; ".format(
332 if status_dict
.get("applications"):
333 for app_id
in status_dict
.get("applications"):
334 app
= status_dict
.get("applications").get(app_id
)
335 # check application status
336 if app
.get("status"):
337 s
= app
.get("status").get("status")
340 error_description
+= (
341 "application {} status={} ; ".format(app_id
, s
)
344 if error_description
:
345 db_dict
["errorDescription"] = error_description
346 if current_ns_status
== "READY" and is_degraded
:
347 db_dict
["nsState"] = "DEGRADED"
348 if current_ns_status
== "DEGRADED" and not is_degraded
:
349 db_dict
["nsState"] = "READY"
352 self
.update_db_2("nsrs", nsr_id
, db_dict
)
354 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
356 except Exception as e
:
357 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
359 async def _on_update_k8s_db(
360 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
363 Updating vca status in NSR record
364 :param cluster_uuid: UUID of a k8s cluster
365 :param kdu_instance: The unique name of the KDU instance
366 :param filter: To get nsr_id
367 :cluster_type: The cluster type (juju, k8s)
371 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
372 # .format(cluster_uuid, kdu_instance, filter))
374 nsr_id
= filter.get("_id")
376 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
377 cluster_uuid
=cluster_uuid
,
378 kdu_instance
=kdu_instance
,
380 complete_status
=True,
386 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
388 if cluster_type
in ("juju-bundle", "juju"):
389 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
390 # status in a similar way between Juju Bundles and Helm Charts on this side
391 await self
.k8sclusterjuju
.update_vca_status(
392 db_dict
["vcaStatus"],
398 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
402 self
.update_db_2("nsrs", nsr_id
, db_dict
)
403 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
405 except Exception as e
:
406 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
409 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
411 env
= Environment(undefined
=StrictUndefined
)
412 template
= env
.from_string(cloud_init_text
)
413 return template
.render(additional_params
or {})
414 except UndefinedError
as e
:
416 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
417 "file, must be provided in the instantiation parameters inside the "
418 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
420 except (TemplateError
, TemplateNotFound
) as e
:
422 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
427 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
428 cloud_init_content
= cloud_init_file
= None
430 if vdu
.get("cloud-init-file"):
431 base_folder
= vnfd
["_admin"]["storage"]
432 if base_folder
["pkg-dir"]:
433 cloud_init_file
= "{}/{}/cloud_init/{}".format(
434 base_folder
["folder"],
435 base_folder
["pkg-dir"],
436 vdu
["cloud-init-file"],
439 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
440 base_folder
["folder"],
441 vdu
["cloud-init-file"],
443 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
444 cloud_init_content
= ci_file
.read()
445 elif vdu
.get("cloud-init"):
446 cloud_init_content
= vdu
["cloud-init"]
448 return cloud_init_content
449 except FsException
as e
:
451 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
452 vnfd
["id"], vdu
["id"], cloud_init_file
, e
456 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
458 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]),
461 additional_params
= vdur
.get("additionalParams")
462 return parse_yaml_strings(additional_params
)
464 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
466 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
467 :param vnfd: input vnfd
468 :param new_id: overrides vnf id if provided
469 :param additionalParams: Instantiation params for VNFs provided
470 :param nsrId: Id of the NSR
471 :return: copy of vnfd
473 vnfd_RO
= deepcopy(vnfd
)
474 # remove unused by RO configuration, monitoring, scaling and internal keys
475 vnfd_RO
.pop("_id", None)
476 vnfd_RO
.pop("_admin", None)
477 vnfd_RO
.pop("monitoring-param", None)
478 vnfd_RO
.pop("scaling-group-descriptor", None)
479 vnfd_RO
.pop("kdu", None)
480 vnfd_RO
.pop("k8s-cluster", None)
482 vnfd_RO
["id"] = new_id
484 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
485 for vdu
in get_iterable(vnfd_RO
, "vdu"):
486 vdu
.pop("cloud-init-file", None)
487 vdu
.pop("cloud-init", None)
491 def ip_profile_2_RO(ip_profile
):
492 RO_ip_profile
= deepcopy(ip_profile
)
493 if "dns-server" in RO_ip_profile
:
494 if isinstance(RO_ip_profile
["dns-server"], list):
495 RO_ip_profile
["dns-address"] = []
496 for ds
in RO_ip_profile
.pop("dns-server"):
497 RO_ip_profile
["dns-address"].append(ds
["address"])
499 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
500 if RO_ip_profile
.get("ip-version") == "ipv4":
501 RO_ip_profile
["ip-version"] = "IPv4"
502 if RO_ip_profile
.get("ip-version") == "ipv6":
503 RO_ip_profile
["ip-version"] = "IPv6"
504 if "dhcp-params" in RO_ip_profile
:
505 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
508 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
509 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
510 if db_vim
["_admin"]["operationalState"] != "ENABLED":
512 "VIM={} is not available. operationalState={}".format(
513 vim_account
, db_vim
["_admin"]["operationalState"]
516 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
519 def get_ro_wim_id_for_wim_account(self
, wim_account
):
520 if isinstance(wim_account
, str):
521 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
522 if db_wim
["_admin"]["operationalState"] != "ENABLED":
524 "WIM={} is not available. operationalState={}".format(
525 wim_account
, db_wim
["_admin"]["operationalState"]
528 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
533 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
535 db_vdu_push_list
= []
537 db_update
= {"_admin.modified": time()}
539 for vdu_id
, vdu_count
in vdu_create
.items():
543 for vdur
in reversed(db_vnfr
["vdur"])
544 if vdur
["vdu-id-ref"] == vdu_id
549 # Read the template saved in the db:
550 self
.logger
.debug(f
"No vdur in the database. Using the vdur-template to scale")
551 vdur_template
= db_vnfr
.get("vdur-template")
552 if not vdur_template
:
554 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
558 vdur
= vdur_template
[0]
559 #Delete a template from the database after using it
560 self
.db
.set_one("vnfrs",
561 {"_id": db_vnfr
["_id"]},
563 pull
={"vdur-template": {"_id": vdur
['_id']}}
565 for count
in range(vdu_count
):
566 vdur_copy
= deepcopy(vdur
)
567 vdur_copy
["status"] = "BUILD"
568 vdur_copy
["status-detailed"] = None
569 vdur_copy
["ip-address"] = None
570 vdur_copy
["_id"] = str(uuid4())
571 vdur_copy
["count-index"] += count
+ 1
572 vdur_copy
["id"] = "{}-{}".format(
573 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
575 vdur_copy
.pop("vim_info", None)
576 for iface
in vdur_copy
["interfaces"]:
577 if iface
.get("fixed-ip"):
578 iface
["ip-address"] = self
.increment_ip_mac(
579 iface
["ip-address"], count
+ 1
582 iface
.pop("ip-address", None)
583 if iface
.get("fixed-mac"):
584 iface
["mac-address"] = self
.increment_ip_mac(
585 iface
["mac-address"], count
+ 1
588 iface
.pop("mac-address", None)
592 ) # only first vdu can be managment of vnf
593 db_vdu_push_list
.append(vdur_copy
)
594 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
596 if len(db_vnfr
["vdur"]) == 1:
597 # The scale will move to 0 instances
598 self
.logger
.debug(f
"Scaling to 0 !, creating the template with the last vdur")
599 template_vdur
= [db_vnfr
["vdur"][0]]
600 for vdu_id
, vdu_count
in vdu_delete
.items():
602 indexes_to_delete
= [
604 for iv
in enumerate(db_vnfr
["vdur"])
605 if iv
[1]["vdu-id-ref"] == vdu_id
609 "vdur.{}.status".format(i
): "DELETING"
610 for i
in indexes_to_delete
[-vdu_count
:]
614 # it must be deleted one by one because common.db does not allow otherwise
617 for v
in reversed(db_vnfr
["vdur"])
618 if v
["vdu-id-ref"] == vdu_id
620 for vdu
in vdus_to_delete
[:vdu_count
]:
623 {"_id": db_vnfr
["_id"]},
625 pull
={"vdur": {"_id": vdu
["_id"]}},
629 db_push
["vdur"] = db_vdu_push_list
631 db_push
["vdur-template"] = template_vdur
634 db_vnfr
["vdur-template"] = template_vdur
635 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
636 # modify passed dictionary db_vnfr
637 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
638 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
640 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
642 Updates database nsr with the RO info for the created vld
643 :param ns_update_nsr: dictionary to be filled with the updated info
644 :param db_nsr: content of db_nsr. This is also modified
645 :param nsr_desc_RO: nsr descriptor from RO
646 :return: Nothing, LcmException is raised on errors
649 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
650 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
651 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
653 vld
["vim-id"] = net_RO
.get("vim_net_id")
654 vld
["name"] = net_RO
.get("vim_name")
655 vld
["status"] = net_RO
.get("status")
656 vld
["status-detailed"] = net_RO
.get("error_msg")
657 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
661 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
664 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
666 for db_vnfr
in db_vnfrs
.values():
667 vnfr_update
= {"status": "ERROR"}
668 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
669 if "status" not in vdur
:
670 vdur
["status"] = "ERROR"
671 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
673 vdur
["status-detailed"] = str(error_text
)
675 "vdur.{}.status-detailed".format(vdu_index
)
677 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
678 except DbException
as e
:
679 self
.logger
.error("Cannot update vnf. {}".format(e
))
681 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
683 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
684 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
685 :param nsr_desc_RO: nsr descriptor from RO
686 :return: Nothing, LcmException is raised on errors
688 for vnf_index
, db_vnfr
in db_vnfrs
.items():
689 for vnf_RO
in nsr_desc_RO
["vnfs"]:
690 if vnf_RO
["member_vnf_index"] != vnf_index
:
693 if vnf_RO
.get("ip_address"):
694 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
697 elif not db_vnfr
.get("ip-address"):
698 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
699 raise LcmExceptionNoMgmtIP(
700 "ns member_vnf_index '{}' has no IP address".format(
705 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
706 vdur_RO_count_index
= 0
707 if vdur
.get("pdu-type"):
709 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
710 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
712 if vdur
["count-index"] != vdur_RO_count_index
:
713 vdur_RO_count_index
+= 1
715 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
716 if vdur_RO
.get("ip_address"):
717 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
719 vdur
["ip-address"] = None
720 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
721 vdur
["name"] = vdur_RO
.get("vim_name")
722 vdur
["status"] = vdur_RO
.get("status")
723 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
724 for ifacer
in get_iterable(vdur
, "interfaces"):
725 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
726 if ifacer
["name"] == interface_RO
.get("internal_name"):
727 ifacer
["ip-address"] = interface_RO
.get(
730 ifacer
["mac-address"] = interface_RO
.get(
736 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
737 "from VIM info".format(
738 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
741 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
745 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
747 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
751 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
752 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
753 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
755 vld
["vim-id"] = net_RO
.get("vim_net_id")
756 vld
["name"] = net_RO
.get("vim_name")
757 vld
["status"] = net_RO
.get("status")
758 vld
["status-detailed"] = net_RO
.get("error_msg")
759 vnfr_update
["vld.{}".format(vld_index
)] = vld
763 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
768 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
773 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
778 def _get_ns_config_info(self
, nsr_id
):
780 Generates a mapping between vnf,vdu elements and the N2VC id
781 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
782 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
783 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
784 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
786 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
787 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
789 ns_config_info
= {"osm-config-mapping": mapping
}
790 for vca
in vca_deployed_list
:
791 if not vca
["member-vnf-index"]:
793 if not vca
["vdu_id"]:
794 mapping
[vca
["member-vnf-index"]] = vca
["application"]
798 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
800 ] = vca
["application"]
801 return ns_config_info
803 async def _instantiate_ng_ro(
820 def get_vim_account(vim_account_id
):
822 if vim_account_id
in db_vims
:
823 return db_vims
[vim_account_id
]
824 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
825 db_vims
[vim_account_id
] = db_vim
828 # modify target_vld info with instantiation parameters
829 def parse_vld_instantiation_params(
830 target_vim
, target_vld
, vld_params
, target_sdn
832 if vld_params
.get("ip-profile"):
833 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
836 if vld_params
.get("provider-network"):
837 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
840 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
841 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
844 if vld_params
.get("wimAccountId"):
845 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
846 target_vld
["vim_info"][target_wim
] = {}
847 for param
in ("vim-network-name", "vim-network-id"):
848 if vld_params
.get(param
):
849 if isinstance(vld_params
[param
], dict):
850 for vim
, vim_net
in vld_params
[param
].items():
851 other_target_vim
= "vim:" + vim
853 target_vld
["vim_info"],
854 (other_target_vim
, param
.replace("-", "_")),
857 else: # isinstance str
858 target_vld
["vim_info"][target_vim
][
859 param
.replace("-", "_")
860 ] = vld_params
[param
]
861 if vld_params
.get("common_id"):
862 target_vld
["common_id"] = vld_params
.get("common_id")
864 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
865 def update_ns_vld_target(target
, ns_params
):
866 for vnf_params
in ns_params
.get("vnf", ()):
867 if vnf_params
.get("vimAccountId"):
871 for vnfr
in db_vnfrs
.values()
872 if vnf_params
["member-vnf-index"]
873 == vnfr
["member-vnf-index-ref"]
877 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
878 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
879 target_vld
= find_in_list(
880 get_iterable(vdur
, "interfaces"),
881 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
884 if vnf_params
.get("vimAccountId") not in a_vld
.get(
887 target
["ns"]["vld"][a_index
].get("vim_info").update(
889 "vim:{}".format(vnf_params
["vimAccountId"]): {
890 "vim_network_name": ""
895 nslcmop_id
= db_nslcmop
["_id"]
897 "name": db_nsr
["name"],
900 "image": deepcopy(db_nsr
["image"]),
901 "flavor": deepcopy(db_nsr
["flavor"]),
902 "action_id": nslcmop_id
,
903 "cloud_init_content": {},
905 for image
in target
["image"]:
906 image
["vim_info"] = {}
907 for flavor
in target
["flavor"]:
908 flavor
["vim_info"] = {}
909 if db_nsr
.get("affinity-or-anti-affinity-group"):
910 target
["affinity-or-anti-affinity-group"] = deepcopy(
911 db_nsr
["affinity-or-anti-affinity-group"]
913 for affinity_or_anti_affinity_group
in target
[
914 "affinity-or-anti-affinity-group"
916 affinity_or_anti_affinity_group
["vim_info"] = {}
918 if db_nslcmop
.get("lcmOperationType") != "instantiate":
919 # get parameters of instantiation:
920 db_nslcmop_instantiate
= self
.db
.get_list(
923 "nsInstanceId": db_nslcmop
["nsInstanceId"],
924 "lcmOperationType": "instantiate",
927 ns_params
= db_nslcmop_instantiate
.get("operationParams")
929 ns_params
= db_nslcmop
.get("operationParams")
930 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
931 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
934 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
935 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
939 "mgmt-network": vld
.get("mgmt-network", False),
940 "type": vld
.get("type"),
943 "vim_network_name": vld
.get("vim-network-name"),
944 "vim_account_id": ns_params
["vimAccountId"],
948 # check if this network needs SDN assist
949 if vld
.get("pci-interfaces"):
950 db_vim
= get_vim_account(ns_params
["vimAccountId"])
951 sdnc_id
= db_vim
["config"].get("sdn-controller")
953 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
954 target_sdn
= "sdn:{}".format(sdnc_id
)
955 target_vld
["vim_info"][target_sdn
] = {
957 "target_vim": target_vim
,
959 "type": vld
.get("type"),
962 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
963 for nsd_vnf_profile
in nsd_vnf_profiles
:
964 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
965 if cp
["virtual-link-profile-id"] == vld
["id"]:
967 "member_vnf:{}.{}".format(
968 cp
["constituent-cpd-id"][0][
969 "constituent-base-element-id"
971 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
973 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
975 # check at nsd descriptor, if there is an ip-profile
977 nsd_vlp
= find_in_list(
978 get_virtual_link_profiles(nsd
),
979 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
984 and nsd_vlp
.get("virtual-link-protocol-data")
985 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
987 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
990 ip_profile_dest_data
= {}
991 if "ip-version" in ip_profile_source_data
:
992 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
995 if "cidr" in ip_profile_source_data
:
996 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
999 if "gateway-ip" in ip_profile_source_data
:
1000 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1003 if "dhcp-enabled" in ip_profile_source_data
:
1004 ip_profile_dest_data
["dhcp-params"] = {
1005 "enabled": ip_profile_source_data
["dhcp-enabled"]
1007 vld_params
["ip-profile"] = ip_profile_dest_data
1009 # update vld_params with instantiation params
1010 vld_instantiation_params
= find_in_list(
1011 get_iterable(ns_params
, "vld"),
1012 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1014 if vld_instantiation_params
:
1015 vld_params
.update(vld_instantiation_params
)
1016 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1017 target
["ns"]["vld"].append(target_vld
)
1018 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1019 update_ns_vld_target(target
, ns_params
)
1021 for vnfr
in db_vnfrs
.values():
1022 vnfd
= find_in_list(
1023 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1025 vnf_params
= find_in_list(
1026 get_iterable(ns_params
, "vnf"),
1027 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1029 target_vnf
= deepcopy(vnfr
)
1030 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1031 for vld
in target_vnf
.get("vld", ()):
1032 # check if connected to a ns.vld, to fill target'
1033 vnf_cp
= find_in_list(
1034 vnfd
.get("int-virtual-link-desc", ()),
1035 lambda cpd
: cpd
.get("id") == vld
["id"],
1038 ns_cp
= "member_vnf:{}.{}".format(
1039 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1041 if cp2target
.get(ns_cp
):
1042 vld
["target"] = cp2target
[ns_cp
]
1045 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1047 # check if this network needs SDN assist
1049 if vld
.get("pci-interfaces"):
1050 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1051 sdnc_id
= db_vim
["config"].get("sdn-controller")
1053 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1054 target_sdn
= "sdn:{}".format(sdnc_id
)
1055 vld
["vim_info"][target_sdn
] = {
1057 "target_vim": target_vim
,
1059 "type": vld
.get("type"),
1062 # check at vnfd descriptor, if there is an ip-profile
1064 vnfd_vlp
= find_in_list(
1065 get_virtual_link_profiles(vnfd
),
1066 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1070 and vnfd_vlp
.get("virtual-link-protocol-data")
1071 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1073 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1076 ip_profile_dest_data
= {}
1077 if "ip-version" in ip_profile_source_data
:
1078 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1081 if "cidr" in ip_profile_source_data
:
1082 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1085 if "gateway-ip" in ip_profile_source_data
:
1086 ip_profile_dest_data
[
1088 ] = ip_profile_source_data
["gateway-ip"]
1089 if "dhcp-enabled" in ip_profile_source_data
:
1090 ip_profile_dest_data
["dhcp-params"] = {
1091 "enabled": ip_profile_source_data
["dhcp-enabled"]
1094 vld_params
["ip-profile"] = ip_profile_dest_data
1095 # update vld_params with instantiation params
1097 vld_instantiation_params
= find_in_list(
1098 get_iterable(vnf_params
, "internal-vld"),
1099 lambda i_vld
: i_vld
["name"] == vld
["id"],
1101 if vld_instantiation_params
:
1102 vld_params
.update(vld_instantiation_params
)
1103 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1106 for vdur
in target_vnf
.get("vdur", ()):
1107 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1108 continue # This vdu must not be created
1109 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1111 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1114 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1115 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1118 and vdu_configuration
.get("config-access")
1119 and vdu_configuration
.get("config-access").get("ssh-access")
1121 vdur
["ssh-keys"] = ssh_keys_all
1122 vdur
["ssh-access-required"] = vdu_configuration
[
1124 ]["ssh-access"]["required"]
1127 and vnf_configuration
.get("config-access")
1128 and vnf_configuration
.get("config-access").get("ssh-access")
1129 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1131 vdur
["ssh-keys"] = ssh_keys_all
1132 vdur
["ssh-access-required"] = vnf_configuration
[
1134 ]["ssh-access"]["required"]
1135 elif ssh_keys_instantiation
and find_in_list(
1136 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1138 vdur
["ssh-keys"] = ssh_keys_instantiation
1140 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1142 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1144 if vdud
.get("cloud-init-file"):
1145 vdur
["cloud-init"] = "{}:file:{}".format(
1146 vnfd
["_id"], vdud
.get("cloud-init-file")
1148 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1149 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1150 base_folder
= vnfd
["_admin"]["storage"]
1151 if base_folder
["pkg-dir"]:
1152 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1153 base_folder
["folder"],
1154 base_folder
["pkg-dir"],
1155 vdud
.get("cloud-init-file"),
1158 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1159 base_folder
["folder"],
1160 vdud
.get("cloud-init-file"),
1162 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1163 target
["cloud_init_content"][
1166 elif vdud
.get("cloud-init"):
1167 vdur
["cloud-init"] = "{}:vdu:{}".format(
1168 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1170 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1171 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1174 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1175 deploy_params_vdu
= self
._format
_additional
_params
(
1176 vdur
.get("additionalParams") or {}
1178 deploy_params_vdu
["OSM"] = get_osm_params(
1179 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1181 vdur
["additionalParams"] = deploy_params_vdu
1184 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1185 if target_vim
not in ns_flavor
["vim_info"]:
1186 ns_flavor
["vim_info"][target_vim
] = {}
1189 # in case alternative images are provided we must check if they should be applied
1190 # for the vim_type, modify the vim_type taking into account
1191 ns_image_id
= int(vdur
["ns-image-id"])
1192 if vdur
.get("alt-image-ids"):
1193 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1194 vim_type
= db_vim
["vim_type"]
1195 for alt_image_id
in vdur
.get("alt-image-ids"):
1196 ns_alt_image
= target
["image"][int(alt_image_id
)]
1197 if vim_type
== ns_alt_image
.get("vim-type"):
1198 # must use alternative image
1200 "use alternative image id: {}".format(alt_image_id
)
1202 ns_image_id
= alt_image_id
1203 vdur
["ns-image-id"] = ns_image_id
1205 ns_image
= target
["image"][int(ns_image_id
)]
1206 if target_vim
not in ns_image
["vim_info"]:
1207 ns_image
["vim_info"][target_vim
] = {}
1210 if vdur
.get("affinity-or-anti-affinity-group-id"):
1211 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1212 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1213 if target_vim
not in ns_ags
["vim_info"]:
1214 ns_ags
["vim_info"][target_vim
] = {}
1216 vdur
["vim_info"] = {target_vim
: {}}
1217 # instantiation parameters
1219 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1220 # vdud["id"]), None)
1221 vdur_list
.append(vdur
)
1222 target_vnf
["vdur"] = vdur_list
1223 target
["vnf"].append(target_vnf
)
1225 desc
= await self
.RO
.deploy(nsr_id
, target
)
1226 self
.logger
.debug("RO return > {}".format(desc
))
1227 action_id
= desc
["action_id"]
1228 await self
._wait
_ng
_ro
(
1229 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
1234 "_admin.deployed.RO.operational-status": "running",
1235 "detailed-status": " ".join(stage
),
1237 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1238 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1239 self
._write
_op
_status
(nslcmop_id
, stage
)
1241 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1245 async def _wait_ng_ro(
1254 detailed_status_old
= None
1256 start_time
= start_time
or time()
1257 while time() <= start_time
+ timeout
:
1258 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1259 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1260 if desc_status
["status"] == "FAILED":
1261 raise NgRoException(desc_status
["details"])
1262 elif desc_status
["status"] == "BUILD":
1264 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1265 elif desc_status
["status"] == "DONE":
1267 stage
[2] = "Deployed at VIM"
1270 assert False, "ROclient.check_ns_status returns unknown {}".format(
1271 desc_status
["status"]
1273 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1274 detailed_status_old
= stage
[2]
1275 db_nsr_update
["detailed-status"] = " ".join(stage
)
1276 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1277 self
._write
_op
_status
(nslcmop_id
, stage
)
1278 await asyncio
.sleep(15, loop
=self
.loop
)
1279 else: # timeout_ns_deploy
1280 raise NgRoException("Timeout waiting ns to deploy")
1282 async def _terminate_ng_ro(
1283 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1288 start_deploy
= time()
1295 "action_id": nslcmop_id
,
1297 desc
= await self
.RO
.deploy(nsr_id
, target
)
1298 action_id
= desc
["action_id"]
1299 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1300 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1303 + "ns terminate action at RO. action_id={}".format(action_id
)
1307 delete_timeout
= 20 * 60 # 20 minutes
1308 await self
._wait
_ng
_ro
(
1309 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
1312 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1313 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1315 await self
.RO
.delete(nsr_id
)
1316 except Exception as e
:
1317 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1318 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1319 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1320 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1322 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1324 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1325 failed_detail
.append("delete conflict: {}".format(e
))
1328 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1331 failed_detail
.append("delete error: {}".format(e
))
1334 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1338 stage
[2] = "Error deleting from VIM"
1340 stage
[2] = "Deleted from VIM"
1341 db_nsr_update
["detailed-status"] = " ".join(stage
)
1342 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1343 self
._write
_op
_status
(nslcmop_id
, stage
)
1346 raise LcmException("; ".join(failed_detail
))
1349 async def instantiate_RO(
1363 :param logging_text: preffix text to use at logging
1364 :param nsr_id: nsr identity
1365 :param nsd: database content of ns descriptor
1366 :param db_nsr: database content of ns record
1367 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1369 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1370 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1371 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1372 :return: None or exception
1375 start_deploy
= time()
1376 ns_params
= db_nslcmop
.get("operationParams")
1377 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1378 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1380 timeout_ns_deploy
= self
.timeout
.get(
1381 "ns_deploy", self
.timeout_ns_deploy
1384 # Check for and optionally request placement optimization. Database will be updated if placement activated
1385 stage
[2] = "Waiting for Placement."
1386 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1387 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1388 for vnfr
in db_vnfrs
.values():
1389 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1392 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1394 return await self
._instantiate
_ng
_ro
(
1407 except Exception as e
:
1408 stage
[2] = "ERROR deploying at VIM"
1409 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1411 "Error deploying at VIM {}".format(e
),
1412 exc_info
=not isinstance(
1415 ROclient
.ROClientException
,
1424 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1426 Wait for kdu to be up, get ip address
1427 :param logging_text: prefix use for logging
1431 :return: IP address, K8s services
1434 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1437 while nb_tries
< 360:
1438 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1442 for x
in get_iterable(db_vnfr
, "kdur")
1443 if x
.get("kdu-name") == kdu_name
1449 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1451 if kdur
.get("status"):
1452 if kdur
["status"] in ("READY", "ENABLED"):
1453 return kdur
.get("ip-address"), kdur
.get("services")
1456 "target KDU={} is in error state".format(kdu_name
)
1459 await asyncio
.sleep(10, loop
=self
.loop
)
1461 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1463 async def wait_vm_up_insert_key_ro(
1464 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1467 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1468 :param logging_text: prefix use for logging
1473 :param pub_key: public ssh key to inject, None to skip
1474 :param user: user to apply the public ssh key
1478 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1482 target_vdu_id
= None
1488 if ro_retries
>= 360: # 1 hour
1490 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1493 await asyncio
.sleep(10, loop
=self
.loop
)
1496 if not target_vdu_id
:
1497 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1499 if not vdu_id
: # for the VNF case
1500 if db_vnfr
.get("status") == "ERROR":
1502 "Cannot inject ssh-key because target VNF is in error state"
1504 ip_address
= db_vnfr
.get("ip-address")
1510 for x
in get_iterable(db_vnfr
, "vdur")
1511 if x
.get("ip-address") == ip_address
1519 for x
in get_iterable(db_vnfr
, "vdur")
1520 if x
.get("vdu-id-ref") == vdu_id
1521 and x
.get("count-index") == vdu_index
1527 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1528 ): # If only one, this should be the target vdu
1529 vdur
= db_vnfr
["vdur"][0]
1532 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1533 vnfr_id
, vdu_id
, vdu_index
1536 # New generation RO stores information at "vim_info"
1539 if vdur
.get("vim_info"):
1541 t
for t
in vdur
["vim_info"]
1542 ) # there should be only one key
1543 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1545 vdur
.get("pdu-type")
1546 or vdur
.get("status") == "ACTIVE"
1547 or ng_ro_status
== "ACTIVE"
1549 ip_address
= vdur
.get("ip-address")
1552 target_vdu_id
= vdur
["vdu-id-ref"]
1553 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1555 "Cannot inject ssh-key because target VM is in error state"
1558 if not target_vdu_id
:
1561 # inject public key into machine
1562 if pub_key
and user
:
1563 self
.logger
.debug(logging_text
+ "Inserting RO key")
1564 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1565 if vdur
.get("pdu-type"):
1566 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1569 ro_vm_id
= "{}-{}".format(
1570 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1571 ) # TODO add vdu_index
1575 "action": "inject_ssh_key",
1579 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1581 desc
= await self
.RO
.deploy(nsr_id
, target
)
1582 action_id
= desc
["action_id"]
1583 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1586 # wait until NS is deployed at RO
1588 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1589 ro_nsr_id
= deep_get(
1590 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1594 result_dict
= await self
.RO
.create_action(
1596 item_id_name
=ro_nsr_id
,
1598 "add_public_key": pub_key
,
1603 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1604 if not result_dict
or not isinstance(result_dict
, dict):
1606 "Unknown response from RO when injecting key"
1608 for result
in result_dict
.values():
1609 if result
.get("vim_result") == 200:
1612 raise ROclient
.ROClientException(
1613 "error injecting key: {}".format(
1614 result
.get("description")
1618 except NgRoException
as e
:
1620 "Reaching max tries injecting key. Error: {}".format(e
)
1622 except ROclient
.ROClientException
as e
:
1626 + "error injecting key: {}. Retrying until {} seconds".format(
1633 "Reaching max tries injecting key. Error: {}".format(e
)
1640 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1642 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1644 my_vca
= vca_deployed_list
[vca_index
]
1645 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1646 # vdu or kdu: no dependencies
1650 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1651 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1652 configuration_status_list
= db_nsr
["configurationStatus"]
1653 for index
, vca_deployed
in enumerate(configuration_status_list
):
1654 if index
== vca_index
:
1657 if not my_vca
.get("member-vnf-index") or (
1658 vca_deployed
.get("member-vnf-index")
1659 == my_vca
.get("member-vnf-index")
1661 internal_status
= configuration_status_list
[index
].get("status")
1662 if internal_status
== "READY":
1664 elif internal_status
== "BROKEN":
1666 "Configuration aborted because dependent charm/s has failed"
1671 # no dependencies, return
1673 await asyncio
.sleep(10)
1676 raise LcmException("Configuration aborted because dependent charm/s timeout")
1678 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1681 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1683 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1684 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1687 async def instantiate_N2VC(
1704 ee_config_descriptor
,
1706 nsr_id
= db_nsr
["_id"]
1707 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1708 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1709 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1710 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1712 "collection": "nsrs",
1713 "filter": {"_id": nsr_id
},
1714 "path": db_update_entry
,
1720 element_under_configuration
= nsr_id
1724 vnfr_id
= db_vnfr
["_id"]
1725 osm_config
["osm"]["vnf_id"] = vnfr_id
1727 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1729 if vca_type
== "native_charm":
1732 index_number
= vdu_index
or 0
1735 element_type
= "VNF"
1736 element_under_configuration
= vnfr_id
1737 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1739 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1740 element_type
= "VDU"
1741 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1742 osm_config
["osm"]["vdu_id"] = vdu_id
1744 namespace
+= ".{}".format(kdu_name
)
1745 element_type
= "KDU"
1746 element_under_configuration
= kdu_name
1747 osm_config
["osm"]["kdu_name"] = kdu_name
1750 if base_folder
["pkg-dir"]:
1751 artifact_path
= "{}/{}/{}/{}".format(
1752 base_folder
["folder"],
1753 base_folder
["pkg-dir"],
1756 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1761 artifact_path
= "{}/Scripts/{}/{}/".format(
1762 base_folder
["folder"],
1765 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1770 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1772 # get initial_config_primitive_list that applies to this element
1773 initial_config_primitive_list
= config_descriptor
.get(
1774 "initial-config-primitive"
1778 "Initial config primitive list > {}".format(
1779 initial_config_primitive_list
1783 # add config if not present for NS charm
1784 ee_descriptor_id
= ee_config_descriptor
.get("id")
1785 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1786 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1787 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1791 "Initial config primitive list #2 > {}".format(
1792 initial_config_primitive_list
1795 # n2vc_redesign STEP 3.1
1796 # find old ee_id if exists
1797 ee_id
= vca_deployed
.get("ee_id")
1799 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1800 # create or register execution environment in VCA
1801 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1803 self
._write
_configuration
_status
(
1805 vca_index
=vca_index
,
1807 element_under_configuration
=element_under_configuration
,
1808 element_type
=element_type
,
1811 step
= "create execution environment"
1812 self
.logger
.debug(logging_text
+ step
)
1816 if vca_type
== "k8s_proxy_charm":
1817 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1818 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1819 namespace
=namespace
,
1820 artifact_path
=artifact_path
,
1824 elif vca_type
== "helm" or vca_type
== "helm-v3":
1825 ee_id
, credentials
= await self
.vca_map
[
1827 ].create_execution_environment(
1828 namespace
=namespace
,
1832 artifact_path
=artifact_path
,
1836 ee_id
, credentials
= await self
.vca_map
[
1838 ].create_execution_environment(
1839 namespace
=namespace
,
1845 elif vca_type
== "native_charm":
1846 step
= "Waiting to VM being up and getting IP address"
1847 self
.logger
.debug(logging_text
+ step
)
1848 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1857 credentials
= {"hostname": rw_mgmt_ip
}
1859 username
= deep_get(
1860 config_descriptor
, ("config-access", "ssh-access", "default-user")
1862 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1863 # merged. Meanwhile let's get username from initial-config-primitive
1864 if not username
and initial_config_primitive_list
:
1865 for config_primitive
in initial_config_primitive_list
:
1866 for param
in config_primitive
.get("parameter", ()):
1867 if param
["name"] == "ssh-username":
1868 username
= param
["value"]
1872 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1873 "'config-access.ssh-access.default-user'"
1875 credentials
["username"] = username
1876 # n2vc_redesign STEP 3.2
1878 self
._write
_configuration
_status
(
1880 vca_index
=vca_index
,
1881 status
="REGISTERING",
1882 element_under_configuration
=element_under_configuration
,
1883 element_type
=element_type
,
1886 step
= "register execution environment {}".format(credentials
)
1887 self
.logger
.debug(logging_text
+ step
)
1888 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1889 credentials
=credentials
,
1890 namespace
=namespace
,
1895 # for compatibility with MON/POL modules, the need model and application name at database
1896 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1897 ee_id_parts
= ee_id
.split(".")
1898 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1899 if len(ee_id_parts
) >= 2:
1900 model_name
= ee_id_parts
[0]
1901 application_name
= ee_id_parts
[1]
1902 db_nsr_update
[db_update_entry
+ "model"] = model_name
1903 db_nsr_update
[db_update_entry
+ "application"] = application_name
1905 # n2vc_redesign STEP 3.3
1906 step
= "Install configuration Software"
1908 self
._write
_configuration
_status
(
1910 vca_index
=vca_index
,
1911 status
="INSTALLING SW",
1912 element_under_configuration
=element_under_configuration
,
1913 element_type
=element_type
,
1914 other_update
=db_nsr_update
,
1917 # TODO check if already done
1918 self
.logger
.debug(logging_text
+ step
)
1920 if vca_type
== "native_charm":
1921 config_primitive
= next(
1922 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1925 if config_primitive
:
1926 config
= self
._map
_primitive
_params
(
1927 config_primitive
, {}, deploy_params
1930 if vca_type
== "lxc_proxy_charm":
1931 if element_type
== "NS":
1932 num_units
= db_nsr
.get("config-units") or 1
1933 elif element_type
== "VNF":
1934 num_units
= db_vnfr
.get("config-units") or 1
1935 elif element_type
== "VDU":
1936 for v
in db_vnfr
["vdur"]:
1937 if vdu_id
== v
["vdu-id-ref"]:
1938 num_units
= v
.get("config-units") or 1
1940 if vca_type
!= "k8s_proxy_charm":
1941 await self
.vca_map
[vca_type
].install_configuration_sw(
1943 artifact_path
=artifact_path
,
1946 num_units
=num_units
,
1951 # write in db flag of configuration_sw already installed
1953 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
1956 # add relations for this VCA (wait for other peers related with this VCA)
1957 await self
._add
_vca
_relations
(
1958 logging_text
=logging_text
,
1961 vca_index
=vca_index
,
1964 # if SSH access is required, then get execution environment SSH public
1965 # if native charm we have waited already to VM be UP
1966 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1969 # self.logger.debug("get ssh key block")
1971 config_descriptor
, ("config-access", "ssh-access", "required")
1973 # self.logger.debug("ssh key needed")
1974 # Needed to inject a ssh key
1977 ("config-access", "ssh-access", "default-user"),
1979 step
= "Install configuration Software, getting public ssh key"
1980 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
1981 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
1984 step
= "Insert public key into VM user={} ssh_key={}".format(
1988 # self.logger.debug("no need to get ssh key")
1989 step
= "Waiting to VM being up and getting IP address"
1990 self
.logger
.debug(logging_text
+ step
)
1992 # n2vc_redesign STEP 5.1
1993 # wait for RO (ip-address) Insert pub_key into VM
1996 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
1997 logging_text
, nsr_id
, vnfr_id
, kdu_name
1999 vnfd
= self
.db
.get_one(
2001 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2003 kdu
= get_kdu(vnfd
, kdu_name
)
2005 service
["name"] for service
in get_kdu_services(kdu
)
2007 exposed_services
= []
2008 for service
in services
:
2009 if any(s
in service
["name"] for s
in kdu_services
):
2010 exposed_services
.append(service
)
2011 await self
.vca_map
[vca_type
].exec_primitive(
2013 primitive_name
="config",
2015 "osm-config": json
.dumps(
2017 k8s
={"services": exposed_services
}
2024 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2035 rw_mgmt_ip
= None # This is for a NS configuration
2037 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2039 # store rw_mgmt_ip in deploy params for later replacement
2040 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2042 # n2vc_redesign STEP 6 Execute initial config primitive
2043 step
= "execute initial config primitive"
2045 # wait for dependent primitives execution (NS -> VNF -> VDU)
2046 if initial_config_primitive_list
:
2047 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2049 # stage, in function of element type: vdu, kdu, vnf or ns
2050 my_vca
= vca_deployed_list
[vca_index
]
2051 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2053 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2054 elif my_vca
.get("member-vnf-index"):
2056 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2059 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2061 self
._write
_configuration
_status
(
2062 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2065 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2067 check_if_terminated_needed
= True
2068 for initial_config_primitive
in initial_config_primitive_list
:
2069 # adding information on the vca_deployed if it is a NS execution environment
2070 if not vca_deployed
["member-vnf-index"]:
2071 deploy_params
["ns_config_info"] = json
.dumps(
2072 self
._get
_ns
_config
_info
(nsr_id
)
2074 # TODO check if already done
2075 primitive_params_
= self
._map
_primitive
_params
(
2076 initial_config_primitive
, {}, deploy_params
2079 step
= "execute primitive '{}' params '{}'".format(
2080 initial_config_primitive
["name"], primitive_params_
2082 self
.logger
.debug(logging_text
+ step
)
2083 await self
.vca_map
[vca_type
].exec_primitive(
2085 primitive_name
=initial_config_primitive
["name"],
2086 params_dict
=primitive_params_
,
2091 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2092 if check_if_terminated_needed
:
2093 if config_descriptor
.get("terminate-config-primitive"):
2095 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2097 check_if_terminated_needed
= False
2099 # TODO register in database that primitive is done
2101 # STEP 7 Configure metrics
2102 if vca_type
== "helm" or vca_type
== "helm-v3":
2103 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2105 artifact_path
=artifact_path
,
2106 ee_config_descriptor
=ee_config_descriptor
,
2109 target_ip
=rw_mgmt_ip
,
2115 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2118 for job
in prometheus_jobs
:
2121 {"job_name": job
["job_name"]},
2124 fail_on_empty
=False,
2127 step
= "instantiated at VCA"
2128 self
.logger
.debug(logging_text
+ step
)
2130 self
._write
_configuration
_status
(
2131 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2134 except Exception as e
: # TODO not use Exception but N2VC exception
2135 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2137 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2140 "Exception while {} : {}".format(step
, e
), exc_info
=True
2142 self
._write
_configuration
_status
(
2143 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2145 raise LcmException("{} {}".format(step
, e
)) from e
2147 def _write_ns_status(
2151 current_operation
: str,
2152 current_operation_id
: str,
2153 error_description
: str = None,
2154 error_detail
: str = None,
2155 other_update
: dict = None,
2158 Update db_nsr fields.
2161 :param current_operation:
2162 :param current_operation_id:
2163 :param error_description:
2164 :param error_detail:
2165 :param other_update: Other required changes at database if provided, will be cleared
2169 db_dict
= other_update
or {}
2172 ] = current_operation_id
# for backward compatibility
2173 db_dict
["_admin.current-operation"] = current_operation_id
2174 db_dict
["_admin.operation-type"] = (
2175 current_operation
if current_operation
!= "IDLE" else None
2177 db_dict
["currentOperation"] = current_operation
2178 db_dict
["currentOperationID"] = current_operation_id
2179 db_dict
["errorDescription"] = error_description
2180 db_dict
["errorDetail"] = error_detail
2183 db_dict
["nsState"] = ns_state
2184 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2185 except DbException
as e
:
2186 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2188 def _write_op_status(
2192 error_message
: str = None,
2193 queuePosition
: int = 0,
2194 operation_state
: str = None,
2195 other_update
: dict = None,
2198 db_dict
= other_update
or {}
2199 db_dict
["queuePosition"] = queuePosition
2200 if isinstance(stage
, list):
2201 db_dict
["stage"] = stage
[0]
2202 db_dict
["detailed-status"] = " ".join(stage
)
2203 elif stage
is not None:
2204 db_dict
["stage"] = str(stage
)
2206 if error_message
is not None:
2207 db_dict
["errorMessage"] = error_message
2208 if operation_state
is not None:
2209 db_dict
["operationState"] = operation_state
2210 db_dict
["statusEnteredTime"] = time()
2211 self
.update_db_2("nslcmops", op_id
, db_dict
)
2212 except DbException
as e
:
2214 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2217 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2219 nsr_id
= db_nsr
["_id"]
2220 # configurationStatus
2221 config_status
= db_nsr
.get("configurationStatus")
2224 "configurationStatus.{}.status".format(index
): status
2225 for index
, v
in enumerate(config_status
)
2229 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2231 except DbException
as e
:
2233 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2236 def _write_configuration_status(
2241 element_under_configuration
: str = None,
2242 element_type
: str = None,
2243 other_update
: dict = None,
2246 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2247 # .format(vca_index, status))
2250 db_path
= "configurationStatus.{}.".format(vca_index
)
2251 db_dict
= other_update
or {}
2253 db_dict
[db_path
+ "status"] = status
2254 if element_under_configuration
:
2256 db_path
+ "elementUnderConfiguration"
2257 ] = element_under_configuration
2259 db_dict
[db_path
+ "elementType"] = element_type
2260 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2261 except DbException
as e
:
2263 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2264 status
, nsr_id
, vca_index
, e
2268 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2270 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2271 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2272 Database is used because the result can be obtained from a different LCM worker in case of HA.
2273 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2274 :param db_nslcmop: database content of nslcmop
2275 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2276 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2277 computed 'vim-account-id'
2280 nslcmop_id
= db_nslcmop
["_id"]
2281 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2282 if placement_engine
== "PLA":
2284 logging_text
+ "Invoke and wait for placement optimization"
2286 await self
.msg
.aiowrite(
2287 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2289 db_poll_interval
= 5
2290 wait
= db_poll_interval
* 10
2292 while not pla_result
and wait
>= 0:
2293 await asyncio
.sleep(db_poll_interval
)
2294 wait
-= db_poll_interval
2295 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2296 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2300 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2303 for pla_vnf
in pla_result
["vnf"]:
2304 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2305 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2310 {"_id": vnfr
["_id"]},
2311 {"vim-account-id": pla_vnf
["vimAccountId"]},
2314 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2317 def update_nsrs_with_pla_result(self
, params
):
2319 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2321 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2323 except Exception as e
:
2324 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2326 async def instantiate(self
, nsr_id
, nslcmop_id
):
2329 :param nsr_id: ns instance to deploy
2330 :param nslcmop_id: operation to run
2334 # Try to lock HA task here
2335 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2336 if not task_is_locked_by_me
:
2338 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2342 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2343 self
.logger
.debug(logging_text
+ "Enter")
2345 # get all needed from database
2347 # database nsrs record
2350 # database nslcmops record
2353 # update operation on nsrs
2355 # update operation on nslcmops
2356 db_nslcmop_update
= {}
2358 nslcmop_operation_state
= None
2359 db_vnfrs
= {} # vnf's info indexed by member-index
2361 tasks_dict_info
= {} # from task to info text
2365 "Stage 1/5: preparation of the environment.",
2366 "Waiting for previous operations to terminate.",
2369 # ^ stage, step, VIM progress
2371 # wait for any previous tasks in process
2372 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2374 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2375 stage
[1] = "Reading from database."
2376 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2377 db_nsr_update
["detailed-status"] = "creating"
2378 db_nsr_update
["operational-status"] = "init"
2379 self
._write
_ns
_status
(
2381 ns_state
="BUILDING",
2382 current_operation
="INSTANTIATING",
2383 current_operation_id
=nslcmop_id
,
2384 other_update
=db_nsr_update
,
2386 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2388 # read from db: operation
2389 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2390 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2391 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2392 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2393 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2395 ns_params
= db_nslcmop
.get("operationParams")
2396 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2397 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2399 timeout_ns_deploy
= self
.timeout
.get(
2400 "ns_deploy", self
.timeout_ns_deploy
2404 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2405 self
.logger
.debug(logging_text
+ stage
[1])
2406 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2407 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2408 self
.logger
.debug(logging_text
+ stage
[1])
2409 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2410 self
.fs
.sync(db_nsr
["nsd-id"])
2412 # nsr_name = db_nsr["name"] # TODO short-name??
2414 # read from db: vnf's of this ns
2415 stage
[1] = "Getting vnfrs from db."
2416 self
.logger
.debug(logging_text
+ stage
[1])
2417 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2419 # read from db: vnfd's for every vnf
2420 db_vnfds
= [] # every vnfd data
2422 # for each vnf in ns, read vnfd
2423 for vnfr
in db_vnfrs_list
:
2424 if vnfr
.get("kdur"):
2426 for kdur
in vnfr
["kdur"]:
2427 if kdur
.get("additionalParams"):
2428 kdur
["additionalParams"] = json
.loads(
2429 kdur
["additionalParams"]
2431 kdur_list
.append(kdur
)
2432 vnfr
["kdur"] = kdur_list
2434 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2435 vnfd_id
= vnfr
["vnfd-id"]
2436 vnfd_ref
= vnfr
["vnfd-ref"]
2437 self
.fs
.sync(vnfd_id
)
2439 # if we haven't this vnfd, read it from db
2440 if vnfd_id
not in db_vnfds
:
2442 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2445 self
.logger
.debug(logging_text
+ stage
[1])
2446 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2449 db_vnfds
.append(vnfd
)
2451 # Get or generates the _admin.deployed.VCA list
2452 vca_deployed_list
= None
2453 if db_nsr
["_admin"].get("deployed"):
2454 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2455 if vca_deployed_list
is None:
2456 vca_deployed_list
= []
2457 configuration_status_list
= []
2458 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2459 db_nsr_update
["configurationStatus"] = configuration_status_list
2460 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2461 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2462 elif isinstance(vca_deployed_list
, dict):
2463 # maintain backward compatibility. Change a dict to list at database
2464 vca_deployed_list
= list(vca_deployed_list
.values())
2465 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2466 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2469 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2471 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2472 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2474 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2475 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2476 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2478 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2481 # n2vc_redesign STEP 2 Deploy Network Scenario
2482 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2483 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2485 stage
[1] = "Deploying KDUs."
2486 # self.logger.debug(logging_text + "Before deploy_kdus")
2487 # Call to deploy_kdus in case exists the "vdu:kdu" param
2488 await self
.deploy_kdus(
2489 logging_text
=logging_text
,
2491 nslcmop_id
=nslcmop_id
,
2494 task_instantiation_info
=tasks_dict_info
,
2497 stage
[1] = "Getting VCA public key."
2498 # n2vc_redesign STEP 1 Get VCA public ssh-key
2499 # feature 1429. Add n2vc public key to needed VMs
2500 n2vc_key
= self
.n2vc
.get_public_key()
2501 n2vc_key_list
= [n2vc_key
]
2502 if self
.vca_config
.get("public_key"):
2503 n2vc_key_list
.append(self
.vca_config
["public_key"])
2505 stage
[1] = "Deploying NS at VIM."
2506 task_ro
= asyncio
.ensure_future(
2507 self
.instantiate_RO(
2508 logging_text
=logging_text
,
2512 db_nslcmop
=db_nslcmop
,
2515 n2vc_key_list
=n2vc_key_list
,
2519 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2520 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2522 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2523 stage
[1] = "Deploying Execution Environments."
2524 self
.logger
.debug(logging_text
+ stage
[1])
2526 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2527 for vnf_profile
in get_vnf_profiles(nsd
):
2528 vnfd_id
= vnf_profile
["vnfd-id"]
2529 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2530 member_vnf_index
= str(vnf_profile
["id"])
2531 db_vnfr
= db_vnfrs
[member_vnf_index
]
2532 base_folder
= vnfd
["_admin"]["storage"]
2538 # Get additional parameters
2539 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2540 if db_vnfr
.get("additionalParamsForVnf"):
2541 deploy_params
.update(
2542 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2545 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2546 if descriptor_config
:
2548 logging_text
=logging_text
2549 + "member_vnf_index={} ".format(member_vnf_index
),
2552 nslcmop_id
=nslcmop_id
,
2558 member_vnf_index
=member_vnf_index
,
2559 vdu_index
=vdu_index
,
2561 deploy_params
=deploy_params
,
2562 descriptor_config
=descriptor_config
,
2563 base_folder
=base_folder
,
2564 task_instantiation_info
=tasks_dict_info
,
2568 # Deploy charms for each VDU that supports one.
2569 for vdud
in get_vdu_list(vnfd
):
2571 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2572 vdur
= find_in_list(
2573 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2576 if vdur
.get("additionalParams"):
2577 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2579 deploy_params_vdu
= deploy_params
2580 deploy_params_vdu
["OSM"] = get_osm_params(
2581 db_vnfr
, vdu_id
, vdu_count_index
=0
2583 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2585 self
.logger
.debug("VDUD > {}".format(vdud
))
2587 "Descriptor config > {}".format(descriptor_config
)
2589 if descriptor_config
:
2592 for vdu_index
in range(vdud_count
):
2593 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2595 logging_text
=logging_text
2596 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2597 member_vnf_index
, vdu_id
, vdu_index
2601 nslcmop_id
=nslcmop_id
,
2607 member_vnf_index
=member_vnf_index
,
2608 vdu_index
=vdu_index
,
2610 deploy_params
=deploy_params_vdu
,
2611 descriptor_config
=descriptor_config
,
2612 base_folder
=base_folder
,
2613 task_instantiation_info
=tasks_dict_info
,
2616 for kdud
in get_kdu_list(vnfd
):
2617 kdu_name
= kdud
["name"]
2618 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2619 if descriptor_config
:
2624 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2626 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2627 if kdur
.get("additionalParams"):
2628 deploy_params_kdu
.update(
2629 parse_yaml_strings(kdur
["additionalParams"].copy())
2633 logging_text
=logging_text
,
2636 nslcmop_id
=nslcmop_id
,
2642 member_vnf_index
=member_vnf_index
,
2643 vdu_index
=vdu_index
,
2645 deploy_params
=deploy_params_kdu
,
2646 descriptor_config
=descriptor_config
,
2647 base_folder
=base_folder
,
2648 task_instantiation_info
=tasks_dict_info
,
2652 # Check if this NS has a charm configuration
2653 descriptor_config
= nsd
.get("ns-configuration")
2654 if descriptor_config
and descriptor_config
.get("juju"):
2657 member_vnf_index
= None
2663 # Get additional parameters
2664 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2665 if db_nsr
.get("additionalParamsForNs"):
2666 deploy_params
.update(
2667 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2669 base_folder
= nsd
["_admin"]["storage"]
2671 logging_text
=logging_text
,
2674 nslcmop_id
=nslcmop_id
,
2680 member_vnf_index
=member_vnf_index
,
2681 vdu_index
=vdu_index
,
2683 deploy_params
=deploy_params
,
2684 descriptor_config
=descriptor_config
,
2685 base_folder
=base_folder
,
2686 task_instantiation_info
=tasks_dict_info
,
2690 # rest of staff will be done at finally
2693 ROclient
.ROClientException
,
2699 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2702 except asyncio
.CancelledError
:
2704 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2706 exc
= "Operation was cancelled"
2707 except Exception as e
:
2708 exc
= traceback
.format_exc()
2709 self
.logger
.critical(
2710 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2715 error_list
.append(str(exc
))
2717 # wait for pending tasks
2719 stage
[1] = "Waiting for instantiate pending tasks."
2720 self
.logger
.debug(logging_text
+ stage
[1])
2721 error_list
+= await self
._wait
_for
_tasks
(
2729 stage
[1] = stage
[2] = ""
2730 except asyncio
.CancelledError
:
2731 error_list
.append("Cancelled")
2732 # TODO cancel all tasks
2733 except Exception as exc
:
2734 error_list
.append(str(exc
))
2736 # update operation-status
2737 db_nsr_update
["operational-status"] = "running"
2738 # let's begin with VCA 'configured' status (later we can change it)
2739 db_nsr_update
["config-status"] = "configured"
2740 for task
, task_name
in tasks_dict_info
.items():
2741 if not task
.done() or task
.cancelled() or task
.exception():
2742 if task_name
.startswith(self
.task_name_deploy_vca
):
2743 # A N2VC task is pending
2744 db_nsr_update
["config-status"] = "failed"
2746 # RO or KDU task is pending
2747 db_nsr_update
["operational-status"] = "failed"
2749 # update status at database
2751 error_detail
= ". ".join(error_list
)
2752 self
.logger
.error(logging_text
+ error_detail
)
2753 error_description_nslcmop
= "{} Detail: {}".format(
2754 stage
[0], error_detail
2756 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2757 nslcmop_id
, stage
[0]
2760 db_nsr_update
["detailed-status"] = (
2761 error_description_nsr
+ " Detail: " + error_detail
2763 db_nslcmop_update
["detailed-status"] = error_detail
2764 nslcmop_operation_state
= "FAILED"
2768 error_description_nsr
= error_description_nslcmop
= None
2770 db_nsr_update
["detailed-status"] = "Done"
2771 db_nslcmop_update
["detailed-status"] = "Done"
2772 nslcmop_operation_state
= "COMPLETED"
2775 self
._write
_ns
_status
(
2778 current_operation
="IDLE",
2779 current_operation_id
=None,
2780 error_description
=error_description_nsr
,
2781 error_detail
=error_detail
,
2782 other_update
=db_nsr_update
,
2784 self
._write
_op
_status
(
2787 error_message
=error_description_nslcmop
,
2788 operation_state
=nslcmop_operation_state
,
2789 other_update
=db_nslcmop_update
,
2792 if nslcmop_operation_state
:
2794 await self
.msg
.aiowrite(
2799 "nslcmop_id": nslcmop_id
,
2800 "operationState": nslcmop_operation_state
,
2804 except Exception as e
:
2806 logging_text
+ "kafka_write notification Exception {}".format(e
)
2809 self
.logger
.debug(logging_text
+ "Exit")
2810 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2812 def _get_vnfd(self
, vnfd_id
: str, cached_vnfds
: Dict
[str, Any
]):
2813 if vnfd_id
not in cached_vnfds
:
2814 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
2815 return cached_vnfds
[vnfd_id
]
2817 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2818 if vnf_profile_id
not in cached_vnfrs
:
2819 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2822 "member-vnf-index-ref": vnf_profile_id
,
2823 "nsr-id-ref": nsr_id
,
2826 return cached_vnfrs
[vnf_profile_id
]
2828 def _is_deployed_vca_in_relation(
2829 self
, vca
: DeployedVCA
, relation
: Relation
2832 for endpoint
in (relation
.provider
, relation
.requirer
):
2833 if endpoint
["kdu-resource-profile-id"]:
2836 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2837 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2838 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2844 def _update_ee_relation_data_with_implicit_data(
2845 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2847 ee_relation_data
= safe_get_ee_relation(
2848 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2850 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2851 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2852 "execution-environment-ref"
2854 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2855 vnfd_id
= vnf_profile
["vnfd-id"]
2856 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2859 if ee_relation_level
== EELevel
.VNF
2860 else ee_relation_data
["vdu-profile-id"]
2862 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2865 f
"not execution environments found for ee_relation {ee_relation_data}"
2867 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2868 return ee_relation_data
2870 def _get_ns_relations(
2873 nsd
: Dict
[str, Any
],
2875 cached_vnfds
: Dict
[str, Any
],
2876 ) -> List
[Relation
]:
2878 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2879 for r
in db_ns_relations
:
2880 provider_dict
= None
2881 requirer_dict
= None
2882 if all(key
in r
for key
in ("provider", "requirer")):
2883 provider_dict
= r
["provider"]
2884 requirer_dict
= r
["requirer"]
2885 elif "entities" in r
:
2886 provider_id
= r
["entities"][0]["id"]
2889 "endpoint": r
["entities"][0]["endpoint"],
2891 if provider_id
!= nsd
["id"]:
2892 provider_dict
["vnf-profile-id"] = provider_id
2893 requirer_id
= r
["entities"][1]["id"]
2896 "endpoint": r
["entities"][1]["endpoint"],
2898 if requirer_id
!= nsd
["id"]:
2899 requirer_dict
["vnf-profile-id"] = requirer_id
2902 "provider/requirer or entities must be included in the relation."
2904 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2905 nsr_id
, nsd
, provider_dict
, cached_vnfds
2907 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2908 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2910 provider
= EERelation(relation_provider
)
2911 requirer
= EERelation(relation_requirer
)
2912 relation
= Relation(r
["name"], provider
, requirer
)
2913 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2915 relations
.append(relation
)
2918 def _get_vnf_relations(
2921 nsd
: Dict
[str, Any
],
2923 cached_vnfds
: Dict
[str, Any
],
2924 ) -> List
[Relation
]:
2926 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2927 vnf_profile_id
= vnf_profile
["id"]
2928 vnfd_id
= vnf_profile
["vnfd-id"]
2929 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2930 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
2931 for r
in db_vnf_relations
:
2932 provider_dict
= None
2933 requirer_dict
= None
2934 if all(key
in r
for key
in ("provider", "requirer")):
2935 provider_dict
= r
["provider"]
2936 requirer_dict
= r
["requirer"]
2937 elif "entities" in r
:
2938 provider_id
= r
["entities"][0]["id"]
2941 "vnf-profile-id": vnf_profile_id
,
2942 "endpoint": r
["entities"][0]["endpoint"],
2944 if provider_id
!= vnfd_id
:
2945 provider_dict
["vdu-profile-id"] = provider_id
2946 requirer_id
= r
["entities"][1]["id"]
2949 "vnf-profile-id": vnf_profile_id
,
2950 "endpoint": r
["entities"][1]["endpoint"],
2952 if requirer_id
!= vnfd_id
:
2953 requirer_dict
["vdu-profile-id"] = requirer_id
2956 "provider/requirer or entities must be included in the relation."
2958 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2959 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2961 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2962 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
2964 provider
= EERelation(relation_provider
)
2965 requirer
= EERelation(relation_requirer
)
2966 relation
= Relation(r
["name"], provider
, requirer
)
2967 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2969 relations
.append(relation
)
2972 def _get_kdu_resource_data(
2974 ee_relation
: EERelation
,
2975 db_nsr
: Dict
[str, Any
],
2976 cached_vnfds
: Dict
[str, Any
],
2977 ) -> DeployedK8sResource
:
2978 nsd
= get_nsd(db_nsr
)
2979 vnf_profiles
= get_vnf_profiles(nsd
)
2980 vnfd_id
= find_in_list(
2982 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
2984 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2985 kdu_resource_profile
= get_kdu_resource_profile(
2986 db_vnfd
, ee_relation
.kdu_resource_profile_id
2988 kdu_name
= kdu_resource_profile
["kdu-name"]
2989 deployed_kdu
, _
= get_deployed_kdu(
2990 db_nsr
.get("_admin", ()).get("deployed", ()),
2992 ee_relation
.vnf_profile_id
,
2994 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
2997 def _get_deployed_component(
2999 ee_relation
: EERelation
,
3000 db_nsr
: Dict
[str, Any
],
3001 cached_vnfds
: Dict
[str, Any
],
3002 ) -> DeployedComponent
:
3003 nsr_id
= db_nsr
["_id"]
3004 deployed_component
= None
3005 ee_level
= EELevel
.get_level(ee_relation
)
3006 if ee_level
== EELevel
.NS
:
3007 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3009 deployed_component
= DeployedVCA(nsr_id
, vca
)
3010 elif ee_level
== EELevel
.VNF
:
3011 vca
= get_deployed_vca(
3015 "member-vnf-index": ee_relation
.vnf_profile_id
,
3016 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3020 deployed_component
= DeployedVCA(nsr_id
, vca
)
3021 elif ee_level
== EELevel
.VDU
:
3022 vca
= get_deployed_vca(
3025 "vdu_id": ee_relation
.vdu_profile_id
,
3026 "member-vnf-index": ee_relation
.vnf_profile_id
,
3027 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3031 deployed_component
= DeployedVCA(nsr_id
, vca
)
3032 elif ee_level
== EELevel
.KDU
:
3033 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3034 ee_relation
, db_nsr
, cached_vnfds
3036 if kdu_resource_data
:
3037 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3038 return deployed_component
3040 async def _add_relation(
3044 db_nsr
: Dict
[str, Any
],
3045 cached_vnfds
: Dict
[str, Any
],
3046 cached_vnfrs
: Dict
[str, Any
],
3048 deployed_provider
= self
._get
_deployed
_component
(
3049 relation
.provider
, db_nsr
, cached_vnfds
3051 deployed_requirer
= self
._get
_deployed
_component
(
3052 relation
.requirer
, db_nsr
, cached_vnfds
3056 and deployed_requirer
3057 and deployed_provider
.config_sw_installed
3058 and deployed_requirer
.config_sw_installed
3060 provider_db_vnfr
= (
3062 relation
.provider
.nsr_id
,
3063 relation
.provider
.vnf_profile_id
,
3066 if relation
.provider
.vnf_profile_id
3069 requirer_db_vnfr
= (
3071 relation
.requirer
.nsr_id
,
3072 relation
.requirer
.vnf_profile_id
,
3075 if relation
.requirer
.vnf_profile_id
3078 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3079 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3080 provider_relation_endpoint
= RelationEndpoint(
3081 deployed_provider
.ee_id
,
3083 relation
.provider
.endpoint
,
3085 requirer_relation_endpoint
= RelationEndpoint(
3086 deployed_requirer
.ee_id
,
3088 relation
.requirer
.endpoint
,
3090 await self
.vca_map
[vca_type
].add_relation(
3091 provider
=provider_relation_endpoint
,
3092 requirer
=requirer_relation_endpoint
,
3094 # remove entry from relations list
3098 async def _add_vca_relations(
3104 timeout
: int = 3600,
3108 # 1. find all relations for this VCA
3109 # 2. wait for other peers related
3113 # STEP 1: find all relations for this VCA
3116 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3117 nsd
= get_nsd(db_nsr
)
3120 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3121 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3126 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3127 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3129 # if no relations, terminate
3131 self
.logger
.debug(logging_text
+ " No relations")
3134 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3141 if now
- start
>= timeout
:
3142 self
.logger
.error(logging_text
+ " : timeout adding relations")
3145 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3146 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3148 # for each relation, find the VCA's related
3149 for relation
in relations
.copy():
3150 added
= await self
._add
_relation
(
3158 relations
.remove(relation
)
3161 self
.logger
.debug("Relations added")
3163 await asyncio
.sleep(5.0)
3167 except Exception as e
:
3168 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3171 async def _install_kdu(
3179 k8s_instance_info
: dict,
3180 k8params
: dict = None,
3186 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3189 "collection": "nsrs",
3190 "filter": {"_id": nsr_id
},
3191 "path": nsr_db_path
,
3194 if k8s_instance_info
.get("kdu-deployment-name"):
3195 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3197 kdu_instance
= self
.k8scluster_map
[
3199 ].generate_kdu_instance_name(
3200 db_dict
=db_dict_install
,
3201 kdu_model
=k8s_instance_info
["kdu-model"],
3202 kdu_name
=k8s_instance_info
["kdu-name"],
3205 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3207 await self
.k8scluster_map
[k8sclustertype
].install(
3208 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3209 kdu_model
=k8s_instance_info
["kdu-model"],
3212 db_dict
=db_dict_install
,
3214 kdu_name
=k8s_instance_info
["kdu-name"],
3215 namespace
=k8s_instance_info
["namespace"],
3216 kdu_instance
=kdu_instance
,
3220 "nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
}
3223 # Obtain services to obtain management service ip
3224 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3225 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3226 kdu_instance
=kdu_instance
,
3227 namespace
=k8s_instance_info
["namespace"],
3230 # Obtain management service info (if exists)
3231 vnfr_update_dict
= {}
3232 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3234 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3239 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3242 for service
in kdud
.get("service", [])
3243 if service
.get("mgmt-service")
3245 for mgmt_service
in mgmt_services
:
3246 for service
in services
:
3247 if service
["name"].startswith(mgmt_service
["name"]):
3248 # Mgmt service found, Obtain service ip
3249 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3250 if isinstance(ip
, list) and len(ip
) == 1:
3254 "kdur.{}.ip-address".format(kdu_index
)
3257 # Check if must update also mgmt ip at the vnf
3258 service_external_cp
= mgmt_service
.get(
3259 "external-connection-point-ref"
3261 if service_external_cp
:
3263 deep_get(vnfd
, ("mgmt-interface", "cp"))
3264 == service_external_cp
3266 vnfr_update_dict
["ip-address"] = ip
3271 "external-connection-point-ref", ""
3273 == service_external_cp
,
3276 "kdur.{}.ip-address".format(kdu_index
)
3281 "Mgmt service name: {} not found".format(
3282 mgmt_service
["name"]
3286 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3287 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3289 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3292 and kdu_config
.get("initial-config-primitive")
3293 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3295 initial_config_primitive_list
= kdu_config
.get(
3296 "initial-config-primitive"
3298 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3300 for initial_config_primitive
in initial_config_primitive_list
:
3301 primitive_params_
= self
._map
_primitive
_params
(
3302 initial_config_primitive
, {}, {}
3305 await asyncio
.wait_for(
3306 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3307 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3308 kdu_instance
=kdu_instance
,
3309 primitive_name
=initial_config_primitive
["name"],
3310 params
=primitive_params_
,
3311 db_dict
=db_dict_install
,
3317 except Exception as e
:
3318 # Prepare update db with error and raise exception
3321 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3325 vnfr_data
.get("_id"),
3326 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3329 # ignore to keep original exception
3331 # reraise original error
3336 async def deploy_kdus(
3343 task_instantiation_info
,
3345 # Launch kdus if present in the descriptor
3347 k8scluster_id_2_uuic
= {
3348 "helm-chart-v3": {},
3353 async def _get_cluster_id(cluster_id
, cluster_type
):
3354 nonlocal k8scluster_id_2_uuic
3355 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3356 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3358 # check if K8scluster is creating and wait look if previous tasks in process
3359 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3360 "k8scluster", cluster_id
3363 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3364 task_name
, cluster_id
3366 self
.logger
.debug(logging_text
+ text
)
3367 await asyncio
.wait(task_dependency
, timeout
=3600)
3369 db_k8scluster
= self
.db
.get_one(
3370 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3372 if not db_k8scluster
:
3373 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3375 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3377 if cluster_type
== "helm-chart-v3":
3379 # backward compatibility for existing clusters that have not been initialized for helm v3
3380 k8s_credentials
= yaml
.safe_dump(
3381 db_k8scluster
.get("credentials")
3383 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3384 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3386 db_k8scluster_update
= {}
3387 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3388 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3389 db_k8scluster_update
[
3390 "_admin.helm-chart-v3.created"
3392 db_k8scluster_update
[
3393 "_admin.helm-chart-v3.operationalState"
3396 "k8sclusters", cluster_id
, db_k8scluster_update
3398 except Exception as e
:
3401 + "error initializing helm-v3 cluster: {}".format(str(e
))
3404 "K8s cluster '{}' has not been initialized for '{}'".format(
3405 cluster_id
, cluster_type
3410 "K8s cluster '{}' has not been initialized for '{}'".format(
3411 cluster_id
, cluster_type
3414 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3417 logging_text
+= "Deploy kdus: "
3420 db_nsr_update
= {"_admin.deployed.K8s": []}
3421 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3424 updated_cluster_list
= []
3425 updated_v3_cluster_list
= []
3427 for vnfr_data
in db_vnfrs
.values():
3428 vca_id
= self
.get_vca_id(vnfr_data
, {})
3429 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3430 # Step 0: Prepare and set parameters
3431 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3432 vnfd_id
= vnfr_data
.get("vnfd-id")
3433 vnfd_with_id
= find_in_list(
3434 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3438 for kdud
in vnfd_with_id
["kdu"]
3439 if kdud
["name"] == kdur
["kdu-name"]
3441 namespace
= kdur
.get("k8s-namespace")
3442 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3443 if kdur
.get("helm-chart"):
3444 kdumodel
= kdur
["helm-chart"]
3445 # Default version: helm3, if helm-version is v2 assign v2
3446 k8sclustertype
= "helm-chart-v3"
3447 self
.logger
.debug("kdur: {}".format(kdur
))
3449 kdur
.get("helm-version")
3450 and kdur
.get("helm-version") == "v2"
3452 k8sclustertype
= "helm-chart"
3453 elif kdur
.get("juju-bundle"):
3454 kdumodel
= kdur
["juju-bundle"]
3455 k8sclustertype
= "juju-bundle"
3458 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3459 "juju-bundle. Maybe an old NBI version is running".format(
3460 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3463 # check if kdumodel is a file and exists
3465 vnfd_with_id
= find_in_list(
3466 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3468 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3469 if storage
: # may be not present if vnfd has not artifacts
3470 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3471 if storage
["pkg-dir"]:
3472 filename
= "{}/{}/{}s/{}".format(
3479 filename
= "{}/Scripts/{}s/{}".format(
3484 if self
.fs
.file_exists(
3485 filename
, mode
="file"
3486 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3487 kdumodel
= self
.fs
.path
+ filename
3488 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3490 except Exception: # it is not a file
3493 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3494 step
= "Synchronize repos for k8s cluster '{}'".format(
3497 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3501 k8sclustertype
== "helm-chart"
3502 and cluster_uuid
not in updated_cluster_list
3504 k8sclustertype
== "helm-chart-v3"
3505 and cluster_uuid
not in updated_v3_cluster_list
3507 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3508 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3509 cluster_uuid
=cluster_uuid
3512 if del_repo_list
or added_repo_dict
:
3513 if k8sclustertype
== "helm-chart":
3515 "_admin.helm_charts_added." + item
: None
3516 for item
in del_repo_list
3519 "_admin.helm_charts_added." + item
: name
3520 for item
, name
in added_repo_dict
.items()
3522 updated_cluster_list
.append(cluster_uuid
)
3523 elif k8sclustertype
== "helm-chart-v3":
3525 "_admin.helm_charts_v3_added." + item
: None
3526 for item
in del_repo_list
3529 "_admin.helm_charts_v3_added." + item
: name
3530 for item
, name
in added_repo_dict
.items()
3532 updated_v3_cluster_list
.append(cluster_uuid
)
3534 logging_text
+ "repos synchronized on k8s cluster "
3535 "'{}' to_delete: {}, to_add: {}".format(
3536 k8s_cluster_id
, del_repo_list
, added_repo_dict
3541 {"_id": k8s_cluster_id
},
3547 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3548 vnfr_data
["member-vnf-index-ref"],
3552 k8s_instance_info
= {
3553 "kdu-instance": None,
3554 "k8scluster-uuid": cluster_uuid
,
3555 "k8scluster-type": k8sclustertype
,
3556 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3557 "kdu-name": kdur
["kdu-name"],
3558 "kdu-model": kdumodel
,
3559 "namespace": namespace
,
3560 "kdu-deployment-name": kdu_deployment_name
,
3562 db_path
= "_admin.deployed.K8s.{}".format(index
)
3563 db_nsr_update
[db_path
] = k8s_instance_info
3564 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3565 vnfd_with_id
= find_in_list(
3566 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3568 task
= asyncio
.ensure_future(
3577 k8params
=desc_params
,
3582 self
.lcm_tasks
.register(
3586 "instantiate_KDU-{}".format(index
),
3589 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3595 except (LcmException
, asyncio
.CancelledError
):
3597 except Exception as e
:
3598 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3599 if isinstance(e
, (N2VCException
, DbException
)):
3600 self
.logger
.error(logging_text
+ msg
)
3602 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3603 raise LcmException(msg
)
3606 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3625 task_instantiation_info
,
3628 # launch instantiate_N2VC in a asyncio task and register task object
3629 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3630 # if not found, create one entry and update database
3631 # fill db_nsr._admin.deployed.VCA.<index>
3634 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3636 if "execution-environment-list" in descriptor_config
:
3637 ee_list
= descriptor_config
.get("execution-environment-list", [])
3638 elif "juju" in descriptor_config
:
3639 ee_list
= [descriptor_config
] # ns charms
3640 else: # other types as script are not supported
3643 for ee_item
in ee_list
:
3646 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3647 ee_item
.get("juju"), ee_item
.get("helm-chart")
3650 ee_descriptor_id
= ee_item
.get("id")
3651 if ee_item
.get("juju"):
3652 vca_name
= ee_item
["juju"].get("charm")
3655 if ee_item
["juju"].get("charm") is not None
3658 if ee_item
["juju"].get("cloud") == "k8s":
3659 vca_type
= "k8s_proxy_charm"
3660 elif ee_item
["juju"].get("proxy") is False:
3661 vca_type
= "native_charm"
3662 elif ee_item
.get("helm-chart"):
3663 vca_name
= ee_item
["helm-chart"]
3664 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3667 vca_type
= "helm-v3"
3670 logging_text
+ "skipping non juju neither charm configuration"
3675 for vca_index
, vca_deployed
in enumerate(
3676 db_nsr
["_admin"]["deployed"]["VCA"]
3678 if not vca_deployed
:
3681 vca_deployed
.get("member-vnf-index") == member_vnf_index
3682 and vca_deployed
.get("vdu_id") == vdu_id
3683 and vca_deployed
.get("kdu_name") == kdu_name
3684 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3685 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3689 # not found, create one.
3691 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3694 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3696 target
+= "/kdu/{}".format(kdu_name
)
3698 "target_element": target
,
3699 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3700 "member-vnf-index": member_vnf_index
,
3702 "kdu_name": kdu_name
,
3703 "vdu_count_index": vdu_index
,
3704 "operational-status": "init", # TODO revise
3705 "detailed-status": "", # TODO revise
3706 "step": "initial-deploy", # TODO revise
3708 "vdu_name": vdu_name
,
3710 "ee_descriptor_id": ee_descriptor_id
,
3714 # create VCA and configurationStatus in db
3716 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3717 "configurationStatus.{}".format(vca_index
): dict(),
3719 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3721 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3723 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3724 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3725 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3728 task_n2vc
= asyncio
.ensure_future(
3729 self
.instantiate_N2VC(
3730 logging_text
=logging_text
,
3731 vca_index
=vca_index
,
3737 vdu_index
=vdu_index
,
3738 deploy_params
=deploy_params
,
3739 config_descriptor
=descriptor_config
,
3740 base_folder
=base_folder
,
3741 nslcmop_id
=nslcmop_id
,
3745 ee_config_descriptor
=ee_item
,
3748 self
.lcm_tasks
.register(
3752 "instantiate_N2VC-{}".format(vca_index
),
3755 task_instantiation_info
[
3757 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3758 member_vnf_index
or "", vdu_id
or ""
3762 def _create_nslcmop(nsr_id
, operation
, params
):
3764 Creates a ns-lcm-opp content to be stored at database.
3765 :param nsr_id: internal id of the instance
3766 :param operation: instantiate, terminate, scale, action, ...
3767 :param params: user parameters for the operation
3768 :return: dictionary following SOL005 format
3770 # Raise exception if invalid arguments
3771 if not (nsr_id
and operation
and params
):
3773 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3780 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3781 "operationState": "PROCESSING",
3782 "statusEnteredTime": now
,
3783 "nsInstanceId": nsr_id
,
3784 "lcmOperationType": operation
,
3786 "isAutomaticInvocation": False,
3787 "operationParams": params
,
3788 "isCancelPending": False,
3790 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3791 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3796 def _format_additional_params(self
, params
):
3797 params
= params
or {}
3798 for key
, value
in params
.items():
3799 if str(value
).startswith("!!yaml "):
3800 params
[key
] = yaml
.safe_load(value
[7:])
3803 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3804 primitive
= seq
.get("name")
3805 primitive_params
= {}
3807 "member_vnf_index": vnf_index
,
3808 "primitive": primitive
,
3809 "primitive_params": primitive_params
,
3812 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
3816 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
3817 op
= deep_get(db_nslcmop
, ("_admin", "operations"), [])[op_index
]
3818 if op
.get("operationState") == "COMPLETED":
3819 # b. Skip sub-operation
3820 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3821 return self
.SUBOPERATION_STATUS_SKIP
3823 # c. retry executing sub-operation
3824 # The sub-operation exists, and operationState != 'COMPLETED'
3825 # Update operationState = 'PROCESSING' to indicate a retry.
3826 operationState
= "PROCESSING"
3827 detailed_status
= "In progress"
3828 self
._update
_suboperation
_status
(
3829 db_nslcmop
, op_index
, operationState
, detailed_status
3831 # Return the sub-operation index
3832 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3833 # with arguments extracted from the sub-operation
3836 # Find a sub-operation where all keys in a matching dictionary must match
3837 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3838 def _find_suboperation(self
, db_nslcmop
, match
):
3839 if db_nslcmop
and match
:
3840 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3841 for i
, op
in enumerate(op_list
):
3842 if all(op
.get(k
) == match
[k
] for k
in match
):
3844 return self
.SUBOPERATION_STATUS_NOT_FOUND
3846 # Update status for a sub-operation given its index
3847 def _update_suboperation_status(
3848 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3850 # Update DB for HA tasks
3851 q_filter
= {"_id": db_nslcmop
["_id"]}
3853 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3854 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3857 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3860 # Add sub-operation, return the index of the added sub-operation
3861 # Optionally, set operationState, detailed-status, and operationType
3862 # Status and type are currently set for 'scale' sub-operations:
3863 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3864 # 'detailed-status' : status message
3865 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3866 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3867 def _add_suboperation(
3875 mapped_primitive_params
,
3876 operationState
=None,
3877 detailed_status
=None,
3880 RO_scaling_info
=None,
3883 return self
.SUBOPERATION_STATUS_NOT_FOUND
3884 # Get the "_admin.operations" list, if it exists
3885 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3886 op_list
= db_nslcmop_admin
.get("operations")
3887 # Create or append to the "_admin.operations" list
3889 "member_vnf_index": vnf_index
,
3891 "vdu_count_index": vdu_count_index
,
3892 "primitive": primitive
,
3893 "primitive_params": mapped_primitive_params
,
3896 new_op
["operationState"] = operationState
3898 new_op
["detailed-status"] = detailed_status
3900 new_op
["lcmOperationType"] = operationType
3902 new_op
["RO_nsr_id"] = RO_nsr_id
3904 new_op
["RO_scaling_info"] = RO_scaling_info
3906 # No existing operations, create key 'operations' with current operation as first list element
3907 db_nslcmop_admin
.update({"operations": [new_op
]})
3908 op_list
= db_nslcmop_admin
.get("operations")
3910 # Existing operations, append operation to list
3911 op_list
.append(new_op
)
3913 db_nslcmop_update
= {"_admin.operations": op_list
}
3914 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
3915 op_index
= len(op_list
) - 1
3918 # Helper methods for scale() sub-operations
3920 # pre-scale/post-scale:
3921 # Check for 3 different cases:
3922 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3923 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3924 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3925 def _check_or_add_scale_suboperation(
3929 vnf_config_primitive
,
3933 RO_scaling_info
=None,
3935 # Find this sub-operation
3936 if RO_nsr_id
and RO_scaling_info
:
3937 operationType
= "SCALE-RO"
3939 "member_vnf_index": vnf_index
,
3940 "RO_nsr_id": RO_nsr_id
,
3941 "RO_scaling_info": RO_scaling_info
,
3945 "member_vnf_index": vnf_index
,
3946 "primitive": vnf_config_primitive
,
3947 "primitive_params": primitive_params
,
3948 "lcmOperationType": operationType
,
3950 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3951 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3952 # a. New sub-operation
3953 # The sub-operation does not exist, add it.
3954 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3955 # The following parameters are set to None for all kind of scaling:
3957 vdu_count_index
= None
3959 if RO_nsr_id
and RO_scaling_info
:
3960 vnf_config_primitive
= None
3961 primitive_params
= None
3964 RO_scaling_info
= None
3965 # Initial status for sub-operation
3966 operationState
= "PROCESSING"
3967 detailed_status
= "In progress"
3968 # Add sub-operation for pre/post-scaling (zero or more operations)
3969 self
._add
_suboperation
(
3975 vnf_config_primitive
,
3983 return self
.SUBOPERATION_STATUS_NEW
3985 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3986 # or op_index (operationState != 'COMPLETED')
3987 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3989 # Function to return execution_environment id
3991 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3992 # TODO vdu_index_count
3993 for vca
in vca_deployed_list
:
3994 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3997 async def destroy_N2VC(
4005 exec_primitives
=True,
4010 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
4011 :param logging_text:
4013 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
4014 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4015 :param vca_index: index in the database _admin.deployed.VCA
4016 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
4017 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
4018 not executed properly
4019 :param scaling_in: True destroys the application, False destroys the model
4020 :return: None or exception
4025 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4026 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
4030 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
4032 # execute terminate_primitives
4034 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
4035 config_descriptor
.get("terminate-config-primitive"),
4036 vca_deployed
.get("ee_descriptor_id"),
4038 vdu_id
= vca_deployed
.get("vdu_id")
4039 vdu_count_index
= vca_deployed
.get("vdu_count_index")
4040 vdu_name
= vca_deployed
.get("vdu_name")
4041 vnf_index
= vca_deployed
.get("member-vnf-index")
4042 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
4043 for seq
in terminate_primitives
:
4044 # For each sequence in list, get primitive and call _ns_execute_primitive()
4045 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
4046 vnf_index
, seq
.get("name")
4048 self
.logger
.debug(logging_text
+ step
)
4049 # Create the primitive for each sequence, i.e. "primitive": "touch"
4050 primitive
= seq
.get("name")
4051 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(
4056 self
._add
_suboperation
(
4063 mapped_primitive_params
,
4065 # Sub-operations: Call _ns_execute_primitive() instead of action()
4067 result
, result_detail
= await self
._ns
_execute
_primitive
(
4068 vca_deployed
["ee_id"],
4070 mapped_primitive_params
,
4074 except LcmException
:
4075 # this happens when VCA is not deployed. In this case it is not needed to terminate
4077 result_ok
= ["COMPLETED", "PARTIALLY_COMPLETED"]
4078 if result
not in result_ok
:
4080 "terminate_primitive {} for vnf_member_index={} fails with "
4081 "error {}".format(seq
.get("name"), vnf_index
, result_detail
)
4083 # set that this VCA do not need terminated
4084 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(
4088 "nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False}
4091 # Delete Prometheus Jobs if any
4092 # This uses NSR_ID, so it will destroy any jobs under this index
4093 self
.db
.del_list("prometheus_jobs", {"nsr_id": db_nslcmop
["nsInstanceId"]})
4096 await self
.vca_map
[vca_type
].delete_execution_environment(
4097 vca_deployed
["ee_id"],
4098 scaling_in
=scaling_in
,
4103 async def _delete_all_N2VC(self
, db_nsr
: dict, vca_id
: str = None):
4104 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="TERMINATING")
4105 namespace
= "." + db_nsr
["_id"]
4107 await self
.n2vc
.delete_namespace(
4108 namespace
=namespace
,
4109 total_timeout
=self
.timeout_charm_delete
,
4112 except N2VCNotFound
: # already deleted. Skip
4114 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
="DELETED")
4116 async def _terminate_RO(
4117 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4120 Terminates a deployment from RO
4121 :param logging_text:
4122 :param nsr_deployed: db_nsr._admin.deployed
4125 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4126 this method will update only the index 2, but it will write on database the concatenated content of the list
4131 ro_nsr_id
= ro_delete_action
= None
4132 if nsr_deployed
and nsr_deployed
.get("RO"):
4133 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4134 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4137 stage
[2] = "Deleting ns from VIM."
4138 db_nsr_update
["detailed-status"] = " ".join(stage
)
4139 self
._write
_op
_status
(nslcmop_id
, stage
)
4140 self
.logger
.debug(logging_text
+ stage
[2])
4141 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4142 self
._write
_op
_status
(nslcmop_id
, stage
)
4143 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4144 ro_delete_action
= desc
["action_id"]
4146 "_admin.deployed.RO.nsr_delete_action_id"
4147 ] = ro_delete_action
4148 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4149 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4150 if ro_delete_action
:
4151 # wait until NS is deleted from VIM
4152 stage
[2] = "Waiting ns deleted from VIM."
4153 detailed_status_old
= None
4157 + " RO_id={} ro_delete_action={}".format(
4158 ro_nsr_id
, ro_delete_action
4161 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4162 self
._write
_op
_status
(nslcmop_id
, stage
)
4164 delete_timeout
= 20 * 60 # 20 minutes
4165 while delete_timeout
> 0:
4166 desc
= await self
.RO
.show(
4168 item_id_name
=ro_nsr_id
,
4169 extra_item
="action",
4170 extra_item_id
=ro_delete_action
,
4174 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4176 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4177 if ns_status
== "ERROR":
4178 raise ROclient
.ROClientException(ns_status_info
)
4179 elif ns_status
== "BUILD":
4180 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4181 elif ns_status
== "ACTIVE":
4182 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4183 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4188 ), "ROclient.check_action_status returns unknown {}".format(
4191 if stage
[2] != detailed_status_old
:
4192 detailed_status_old
= stage
[2]
4193 db_nsr_update
["detailed-status"] = " ".join(stage
)
4194 self
._write
_op
_status
(nslcmop_id
, stage
)
4195 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4196 await asyncio
.sleep(5, loop
=self
.loop
)
4198 else: # delete_timeout <= 0:
4199 raise ROclient
.ROClientException(
4200 "Timeout waiting ns deleted from VIM"
4203 except Exception as e
:
4204 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4206 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4208 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4209 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4210 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4212 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4215 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4217 failed_detail
.append("delete conflict: {}".format(e
))
4220 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4223 failed_detail
.append("delete error: {}".format(e
))
4225 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4229 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4230 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4232 stage
[2] = "Deleting nsd from RO."
4233 db_nsr_update
["detailed-status"] = " ".join(stage
)
4234 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4235 self
._write
_op
_status
(nslcmop_id
, stage
)
4236 await self
.RO
.delete("nsd", ro_nsd_id
)
4238 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4240 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4241 except Exception as e
:
4243 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4245 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4247 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4250 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4252 failed_detail
.append(
4253 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4255 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4257 failed_detail
.append(
4258 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4260 self
.logger
.error(logging_text
+ failed_detail
[-1])
4262 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4263 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4264 if not vnf_deployed
or not vnf_deployed
["id"]:
4267 ro_vnfd_id
= vnf_deployed
["id"]
4270 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4271 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4273 db_nsr_update
["detailed-status"] = " ".join(stage
)
4274 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4275 self
._write
_op
_status
(nslcmop_id
, stage
)
4276 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4278 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4280 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4281 except Exception as e
:
4283 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4286 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4290 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4293 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4295 failed_detail
.append(
4296 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4298 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4300 failed_detail
.append(
4301 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4303 self
.logger
.error(logging_text
+ failed_detail
[-1])
4306 stage
[2] = "Error deleting from VIM"
4308 stage
[2] = "Deleted from VIM"
4309 db_nsr_update
["detailed-status"] = " ".join(stage
)
4310 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4311 self
._write
_op
_status
(nslcmop_id
, stage
)
4314 raise LcmException("; ".join(failed_detail
))
4316 async def terminate(self
, nsr_id
, nslcmop_id
):
4317 # Try to lock HA task here
4318 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4319 if not task_is_locked_by_me
:
4322 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4323 self
.logger
.debug(logging_text
+ "Enter")
4324 timeout_ns_terminate
= self
.timeout_ns_terminate
4327 operation_params
= None
4329 error_list
= [] # annotates all failed error messages
4330 db_nslcmop_update
= {}
4331 autoremove
= False # autoremove after terminated
4332 tasks_dict_info
= {}
4335 "Stage 1/3: Preparing task.",
4336 "Waiting for previous operations to terminate.",
4339 # ^ contains [stage, step, VIM-status]
4341 # wait for any previous tasks in process
4342 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4344 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4345 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4346 operation_params
= db_nslcmop
.get("operationParams") or {}
4347 if operation_params
.get("timeout_ns_terminate"):
4348 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4349 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4350 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4352 db_nsr_update
["operational-status"] = "terminating"
4353 db_nsr_update
["config-status"] = "terminating"
4354 self
._write
_ns
_status
(
4356 ns_state
="TERMINATING",
4357 current_operation
="TERMINATING",
4358 current_operation_id
=nslcmop_id
,
4359 other_update
=db_nsr_update
,
4361 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4362 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4363 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4366 stage
[1] = "Getting vnf descriptors from db."
4367 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4369 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4371 db_vnfds_from_id
= {}
4372 db_vnfds_from_member_index
= {}
4374 for vnfr
in db_vnfrs_list
:
4375 vnfd_id
= vnfr
["vnfd-id"]
4376 if vnfd_id
not in db_vnfds_from_id
:
4377 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4378 db_vnfds_from_id
[vnfd_id
] = vnfd
4379 db_vnfds_from_member_index
[
4380 vnfr
["member-vnf-index-ref"]
4381 ] = db_vnfds_from_id
[vnfd_id
]
4383 # Destroy individual execution environments when there are terminating primitives.
4384 # Rest of EE will be deleted at once
4385 # TODO - check before calling _destroy_N2VC
4386 # if not operation_params.get("skip_terminate_primitives"):#
4387 # or not vca.get("needed_terminate"):
4388 stage
[0] = "Stage 2/3 execute terminating primitives."
4389 self
.logger
.debug(logging_text
+ stage
[0])
4390 stage
[1] = "Looking execution environment that needs terminate."
4391 self
.logger
.debug(logging_text
+ stage
[1])
4393 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4394 config_descriptor
= None
4395 vca_member_vnf_index
= vca
.get("member-vnf-index")
4396 vca_id
= self
.get_vca_id(
4397 db_vnfrs_dict
.get(vca_member_vnf_index
)
4398 if vca_member_vnf_index
4402 if not vca
or not vca
.get("ee_id"):
4404 if not vca
.get("member-vnf-index"):
4406 config_descriptor
= db_nsr
.get("ns-configuration")
4407 elif vca
.get("vdu_id"):
4408 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4409 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4410 elif vca
.get("kdu_name"):
4411 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4412 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4414 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4415 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4416 vca_type
= vca
.get("type")
4417 exec_terminate_primitives
= not operation_params
.get(
4418 "skip_terminate_primitives"
4419 ) and vca
.get("needed_terminate")
4420 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4421 # pending native charms
4423 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4425 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4426 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4427 task
= asyncio
.ensure_future(
4435 exec_terminate_primitives
,
4439 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4441 # wait for pending tasks of terminate primitives
4445 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4447 error_list
= await self
._wait
_for
_tasks
(
4450 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4454 tasks_dict_info
.clear()
4456 return # raise LcmException("; ".join(error_list))
4458 # remove All execution environments at once
4459 stage
[0] = "Stage 3/3 delete all."
4461 if nsr_deployed
.get("VCA"):
4462 stage
[1] = "Deleting all execution environments."
4463 self
.logger
.debug(logging_text
+ stage
[1])
4464 vca_id
= self
.get_vca_id({}, db_nsr
)
4465 task_delete_ee
= asyncio
.ensure_future(
4467 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4468 timeout
=self
.timeout_charm_delete
,
4471 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4472 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4474 # Delete from k8scluster
4475 stage
[1] = "Deleting KDUs."
4476 self
.logger
.debug(logging_text
+ stage
[1])
4477 # print(nsr_deployed)
4478 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4479 if not kdu
or not kdu
.get("kdu-instance"):
4481 kdu_instance
= kdu
.get("kdu-instance")
4482 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4483 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4484 vca_id
= self
.get_vca_id({}, db_nsr
)
4485 task_delete_kdu_instance
= asyncio
.ensure_future(
4486 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4487 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4488 kdu_instance
=kdu_instance
,
4490 namespace
=kdu
.get("namespace"),
4496 + "Unknown k8s deployment type {}".format(
4497 kdu
.get("k8scluster-type")
4502 task_delete_kdu_instance
4503 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4506 stage
[1] = "Deleting ns from VIM."
4508 task_delete_ro
= asyncio
.ensure_future(
4509 self
._terminate
_ng
_ro
(
4510 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4514 task_delete_ro
= asyncio
.ensure_future(
4516 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4519 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4521 # rest of staff will be done at finally
4524 ROclient
.ROClientException
,
4529 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4531 except asyncio
.CancelledError
:
4533 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4535 exc
= "Operation was cancelled"
4536 except Exception as e
:
4537 exc
= traceback
.format_exc()
4538 self
.logger
.critical(
4539 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4544 error_list
.append(str(exc
))
4546 # wait for pending tasks
4548 stage
[1] = "Waiting for terminate pending tasks."
4549 self
.logger
.debug(logging_text
+ stage
[1])
4550 error_list
+= await self
._wait
_for
_tasks
(
4553 timeout_ns_terminate
,
4557 stage
[1] = stage
[2] = ""
4558 except asyncio
.CancelledError
:
4559 error_list
.append("Cancelled")
4560 # TODO cancell all tasks
4561 except Exception as exc
:
4562 error_list
.append(str(exc
))
4563 # update status at database
4565 error_detail
= "; ".join(error_list
)
4566 # self.logger.error(logging_text + error_detail)
4567 error_description_nslcmop
= "{} Detail: {}".format(
4568 stage
[0], error_detail
4570 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4571 nslcmop_id
, stage
[0]
4574 db_nsr_update
["operational-status"] = "failed"
4575 db_nsr_update
["detailed-status"] = (
4576 error_description_nsr
+ " Detail: " + error_detail
4578 db_nslcmop_update
["detailed-status"] = error_detail
4579 nslcmop_operation_state
= "FAILED"
4583 error_description_nsr
= error_description_nslcmop
= None
4584 ns_state
= "NOT_INSTANTIATED"
4585 db_nsr_update
["operational-status"] = "terminated"
4586 db_nsr_update
["detailed-status"] = "Done"
4587 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4588 db_nslcmop_update
["detailed-status"] = "Done"
4589 nslcmop_operation_state
= "COMPLETED"
4592 self
._write
_ns
_status
(
4595 current_operation
="IDLE",
4596 current_operation_id
=None,
4597 error_description
=error_description_nsr
,
4598 error_detail
=error_detail
,
4599 other_update
=db_nsr_update
,
4601 self
._write
_op
_status
(
4604 error_message
=error_description_nslcmop
,
4605 operation_state
=nslcmop_operation_state
,
4606 other_update
=db_nslcmop_update
,
4608 if ns_state
== "NOT_INSTANTIATED":
4612 {"nsr-id-ref": nsr_id
},
4613 {"_admin.nsState": "NOT_INSTANTIATED"},
4615 except DbException
as e
:
4618 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4622 if operation_params
:
4623 autoremove
= operation_params
.get("autoremove", False)
4624 if nslcmop_operation_state
:
4626 await self
.msg
.aiowrite(
4631 "nslcmop_id": nslcmop_id
,
4632 "operationState": nslcmop_operation_state
,
4633 "autoremove": autoremove
,
4637 except Exception as e
:
4639 logging_text
+ "kafka_write notification Exception {}".format(e
)
4642 self
.logger
.debug(logging_text
+ "Exit")
4643 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
4645 async def _wait_for_tasks(
4646 self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None
4649 error_detail_list
= []
4651 pending_tasks
= list(created_tasks_info
.keys())
4652 num_tasks
= len(pending_tasks
)
4654 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4655 self
._write
_op
_status
(nslcmop_id
, stage
)
4656 while pending_tasks
:
4658 _timeout
= timeout
+ time_start
- time()
4659 done
, pending_tasks
= await asyncio
.wait(
4660 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
4662 num_done
+= len(done
)
4663 if not done
: # Timeout
4664 for task
in pending_tasks
:
4665 new_error
= created_tasks_info
[task
] + ": Timeout"
4666 error_detail_list
.append(new_error
)
4667 error_list
.append(new_error
)
4670 if task
.cancelled():
4673 exc
= task
.exception()
4675 if isinstance(exc
, asyncio
.TimeoutError
):
4677 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
4678 error_list
.append(created_tasks_info
[task
])
4679 error_detail_list
.append(new_error
)
4686 ROclient
.ROClientException
,
4692 self
.logger
.error(logging_text
+ new_error
)
4694 exc_traceback
= "".join(
4695 traceback
.format_exception(None, exc
, exc
.__traceback
__)
4699 + created_tasks_info
[task
]
4705 logging_text
+ created_tasks_info
[task
] + ": Done"
4707 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
4709 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
4710 if nsr_id
: # update also nsr
4715 "errorDescription": "Error at: " + ", ".join(error_list
),
4716 "errorDetail": ". ".join(error_detail_list
),
4719 self
._write
_op
_status
(nslcmop_id
, stage
)
4720 return error_detail_list
4723 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
4725 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4726 The default-value is used. If it is between < > it look for a value at instantiation_params
4727 :param primitive_desc: portion of VNFD/NSD that describes primitive
4728 :param params: Params provided by user
4729 :param instantiation_params: Instantiation params provided by user
4730 :return: a dictionary with the calculated params
4732 calculated_params
= {}
4733 for parameter
in primitive_desc
.get("parameter", ()):
4734 param_name
= parameter
["name"]
4735 if param_name
in params
:
4736 calculated_params
[param_name
] = params
[param_name
]
4737 elif "default-value" in parameter
or "value" in parameter
:
4738 if "value" in parameter
:
4739 calculated_params
[param_name
] = parameter
["value"]
4741 calculated_params
[param_name
] = parameter
["default-value"]
4743 isinstance(calculated_params
[param_name
], str)
4744 and calculated_params
[param_name
].startswith("<")
4745 and calculated_params
[param_name
].endswith(">")
4747 if calculated_params
[param_name
][1:-1] in instantiation_params
:
4748 calculated_params
[param_name
] = instantiation_params
[
4749 calculated_params
[param_name
][1:-1]
4753 "Parameter {} needed to execute primitive {} not provided".format(
4754 calculated_params
[param_name
], primitive_desc
["name"]
4759 "Parameter {} needed to execute primitive {} not provided".format(
4760 param_name
, primitive_desc
["name"]
4764 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
4765 calculated_params
[param_name
] = yaml
.safe_dump(
4766 calculated_params
[param_name
], default_flow_style
=True, width
=256
4768 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[
4770 ].startswith("!!yaml "):
4771 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
4772 if parameter
.get("data-type") == "INTEGER":
4774 calculated_params
[param_name
] = int(calculated_params
[param_name
])
4775 except ValueError: # error converting string to int
4777 "Parameter {} of primitive {} must be integer".format(
4778 param_name
, primitive_desc
["name"]
4781 elif parameter
.get("data-type") == "BOOLEAN":
4782 calculated_params
[param_name
] = not (
4783 (str(calculated_params
[param_name
])).lower() == "false"
4786 # add always ns_config_info if primitive name is config
4787 if primitive_desc
["name"] == "config":
4788 if "ns_config_info" in instantiation_params
:
4789 calculated_params
["ns_config_info"] = instantiation_params
[
4792 return calculated_params
4794 def _look_for_deployed_vca(
4801 ee_descriptor_id
=None,
4803 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4804 for vca
in deployed_vca
:
4807 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
4810 vdu_count_index
is not None
4811 and vdu_count_index
!= vca
["vdu_count_index"]
4814 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
4816 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
4820 # vca_deployed not found
4822 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4823 " is not deployed".format(
4832 ee_id
= vca
.get("ee_id")
4834 "type", "lxc_proxy_charm"
4835 ) # default value for backward compatibility - proxy charm
4838 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4839 "execution environment".format(
4840 member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
4843 return ee_id
, vca_type
4845 async def _ns_execute_primitive(
4851 retries_interval
=30,
4858 if primitive
== "config":
4859 primitive_params
= {"params": primitive_params
}
4861 vca_type
= vca_type
or "lxc_proxy_charm"
4865 output
= await asyncio
.wait_for(
4866 self
.vca_map
[vca_type
].exec_primitive(
4868 primitive_name
=primitive
,
4869 params_dict
=primitive_params
,
4870 progress_timeout
=self
.timeout_progress_primitive
,
4871 total_timeout
=self
.timeout_primitive
,
4876 timeout
=timeout
or self
.timeout_primitive
,
4880 except asyncio
.CancelledError
:
4882 except Exception as e
: # asyncio.TimeoutError
4883 if isinstance(e
, asyncio
.TimeoutError
):
4888 "Error executing action {} on {} -> {}".format(
4893 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
4895 return "FAILED", str(e
)
4897 return "COMPLETED", output
4899 except (LcmException
, asyncio
.CancelledError
):
4901 except Exception as e
:
4902 return "FAIL", "Error executing action {}: {}".format(primitive
, e
)
4904 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
4906 Updating the vca_status with latest juju information in nsrs record
4907 :param: nsr_id: Id of the nsr
4908 :param: nslcmop_id: Id of the nslcmop
4912 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
4913 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4914 vca_id
= self
.get_vca_id({}, db_nsr
)
4915 if db_nsr
["_admin"]["deployed"]["K8s"]:
4916 for _
, k8s
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
4917 cluster_uuid
, kdu_instance
, cluster_type
= (
4918 k8s
["k8scluster-uuid"],
4919 k8s
["kdu-instance"],
4920 k8s
["k8scluster-type"],
4922 await self
._on
_update
_k
8s
_db
(
4923 cluster_uuid
=cluster_uuid
,
4924 kdu_instance
=kdu_instance
,
4925 filter={"_id": nsr_id
},
4927 cluster_type
=cluster_type
,
4930 for vca_index
, _
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
4931 table
, filter = "nsrs", {"_id": nsr_id
}
4932 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
4933 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
4935 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
4936 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
4938 async def action(self
, nsr_id
, nslcmop_id
):
4939 # Try to lock HA task here
4940 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4941 if not task_is_locked_by_me
:
4944 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
4945 self
.logger
.debug(logging_text
+ "Enter")
4946 # get all needed from database
4950 db_nslcmop_update
= {}
4951 nslcmop_operation_state
= None
4952 error_description_nslcmop
= None
4955 # wait for any previous tasks in process
4956 step
= "Waiting for previous operations to terminate"
4957 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4959 self
._write
_ns
_status
(
4962 current_operation
="RUNNING ACTION",
4963 current_operation_id
=nslcmop_id
,
4966 step
= "Getting information from database"
4967 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4968 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4969 if db_nslcmop
["operationParams"].get("primitive_params"):
4970 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
4971 db_nslcmop
["operationParams"]["primitive_params"]
4974 nsr_deployed
= db_nsr
["_admin"].get("deployed")
4975 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4976 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
4977 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
4978 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
4979 primitive
= db_nslcmop
["operationParams"]["primitive"]
4980 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
4981 timeout_ns_action
= db_nslcmop
["operationParams"].get(
4982 "timeout_ns_action", self
.timeout_primitive
4986 step
= "Getting vnfr from database"
4987 db_vnfr
= self
.db
.get_one(
4988 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
4990 if db_vnfr
.get("kdur"):
4992 for kdur
in db_vnfr
["kdur"]:
4993 if kdur
.get("additionalParams"):
4994 kdur
["additionalParams"] = json
.loads(
4995 kdur
["additionalParams"]
4997 kdur_list
.append(kdur
)
4998 db_vnfr
["kdur"] = kdur_list
4999 step
= "Getting vnfd from database"
5000 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5002 # Sync filesystem before running a primitive
5003 self
.fs
.sync(db_vnfr
["vnfd-id"])
5005 step
= "Getting nsd from database"
5006 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5008 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5009 # for backward compatibility
5010 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5011 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5012 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5013 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5015 # look for primitive
5016 config_primitive_desc
= descriptor_configuration
= None
5018 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5020 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5022 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5024 descriptor_configuration
= db_nsd
.get("ns-configuration")
5026 if descriptor_configuration
and descriptor_configuration
.get(
5029 for config_primitive
in descriptor_configuration
["config-primitive"]:
5030 if config_primitive
["name"] == primitive
:
5031 config_primitive_desc
= config_primitive
5034 if not config_primitive_desc
:
5035 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5037 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5041 primitive_name
= primitive
5042 ee_descriptor_id
= None
5044 primitive_name
= config_primitive_desc
.get(
5045 "execution-environment-primitive", primitive
5047 ee_descriptor_id
= config_primitive_desc
.get(
5048 "execution-environment-ref"
5054 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5056 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5059 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5061 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5063 desc_params
= parse_yaml_strings(
5064 db_vnfr
.get("additionalParamsForVnf")
5067 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5068 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5069 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5071 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5072 actions
.add(primitive
["name"])
5073 for primitive
in kdu_configuration
.get("config-primitive", []):
5074 actions
.add(primitive
["name"])
5076 nsr_deployed
["K8s"],
5077 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5078 and kdu
["member-vnf-index"] == vnf_index
,
5082 if primitive_name
in actions
5083 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5087 # TODO check if ns is in a proper status
5089 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5091 # kdur and desc_params already set from before
5092 if primitive_params
:
5093 desc_params
.update(primitive_params
)
5094 # TODO Check if we will need something at vnf level
5095 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5097 kdu_name
== kdu
["kdu-name"]
5098 and kdu
["member-vnf-index"] == vnf_index
5103 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5106 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5107 msg
= "unknown k8scluster-type '{}'".format(
5108 kdu
.get("k8scluster-type")
5110 raise LcmException(msg
)
5113 "collection": "nsrs",
5114 "filter": {"_id": nsr_id
},
5115 "path": "_admin.deployed.K8s.{}".format(index
),
5119 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5121 step
= "Executing kdu {}".format(primitive_name
)
5122 if primitive_name
== "upgrade":
5123 if desc_params
.get("kdu_model"):
5124 kdu_model
= desc_params
.get("kdu_model")
5125 del desc_params
["kdu_model"]
5127 kdu_model
= kdu
.get("kdu-model")
5128 parts
= kdu_model
.split(sep
=":")
5130 kdu_model
= parts
[0]
5132 detailed_status
= await asyncio
.wait_for(
5133 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5134 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5135 kdu_instance
=kdu
.get("kdu-instance"),
5137 kdu_model
=kdu_model
,
5140 timeout
=timeout_ns_action
,
5142 timeout
=timeout_ns_action
+ 10,
5145 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5147 elif primitive_name
== "rollback":
5148 detailed_status
= await asyncio
.wait_for(
5149 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5150 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5151 kdu_instance
=kdu
.get("kdu-instance"),
5154 timeout
=timeout_ns_action
,
5156 elif primitive_name
== "status":
5157 detailed_status
= await asyncio
.wait_for(
5158 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5159 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5160 kdu_instance
=kdu
.get("kdu-instance"),
5163 timeout
=timeout_ns_action
,
5166 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5167 kdu
["kdu-name"], nsr_id
5169 params
= self
._map
_primitive
_params
(
5170 config_primitive_desc
, primitive_params
, desc_params
5173 detailed_status
= await asyncio
.wait_for(
5174 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5175 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5176 kdu_instance
=kdu_instance
,
5177 primitive_name
=primitive_name
,
5180 timeout
=timeout_ns_action
,
5183 timeout
=timeout_ns_action
,
5187 nslcmop_operation_state
= "COMPLETED"
5189 detailed_status
= ""
5190 nslcmop_operation_state
= "FAILED"
5192 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5193 nsr_deployed
["VCA"],
5194 member_vnf_index
=vnf_index
,
5196 vdu_count_index
=vdu_count_index
,
5197 ee_descriptor_id
=ee_descriptor_id
,
5199 for vca_index
, vca_deployed
in enumerate(
5200 db_nsr
["_admin"]["deployed"]["VCA"]
5202 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5204 "collection": "nsrs",
5205 "filter": {"_id": nsr_id
},
5206 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5210 nslcmop_operation_state
,
5212 ) = await self
._ns
_execute
_primitive
(
5214 primitive
=primitive_name
,
5215 primitive_params
=self
._map
_primitive
_params
(
5216 config_primitive_desc
, primitive_params
, desc_params
5218 timeout
=timeout_ns_action
,
5224 db_nslcmop_update
["detailed-status"] = detailed_status
5225 error_description_nslcmop
= (
5226 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5230 + " task Done with result {} {}".format(
5231 nslcmop_operation_state
, detailed_status
5234 return # database update is called inside finally
5236 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5237 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5239 except asyncio
.CancelledError
:
5241 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5243 exc
= "Operation was cancelled"
5244 except asyncio
.TimeoutError
:
5245 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5247 except Exception as e
:
5248 exc
= traceback
.format_exc()
5249 self
.logger
.critical(
5250 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5259 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5260 nslcmop_operation_state
= "FAILED"
5262 self
._write
_ns
_status
(
5266 ], # TODO check if degraded. For the moment use previous status
5267 current_operation
="IDLE",
5268 current_operation_id
=None,
5269 # error_description=error_description_nsr,
5270 # error_detail=error_detail,
5271 other_update
=db_nsr_update
,
5274 self
._write
_op
_status
(
5277 error_message
=error_description_nslcmop
,
5278 operation_state
=nslcmop_operation_state
,
5279 other_update
=db_nslcmop_update
,
5282 if nslcmop_operation_state
:
5284 await self
.msg
.aiowrite(
5289 "nslcmop_id": nslcmop_id
,
5290 "operationState": nslcmop_operation_state
,
5294 except Exception as e
:
5296 logging_text
+ "kafka_write notification Exception {}".format(e
)
5298 self
.logger
.debug(logging_text
+ "Exit")
5299 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5300 return nslcmop_operation_state
, detailed_status
5302 async def terminate_vdus(
5303 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5305 """This method terminates VDUs
5308 db_vnfr: VNF instance record
5309 member_vnf_index: VNF index to identify the VDUs to be removed
5310 db_nsr: NS instance record
5311 update_db_nslcmops: Nslcmop update record
5313 vca_scaling_info
= []
5314 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5315 scaling_info
["scaling_direction"] = "IN"
5316 scaling_info
["vdu-delete"] = {}
5317 scaling_info
["kdu-delete"] = {}
5318 db_vdur
= db_vnfr
.get("vdur")
5319 vdur_list
= copy(db_vdur
)
5321 for index
, vdu
in enumerate(vdur_list
):
5322 vca_scaling_info
.append(
5324 "osm_vdu_id": vdu
["vdu-id-ref"],
5325 "member-vnf-index": member_vnf_index
,
5327 "vdu_index": count_index
,
5329 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5330 scaling_info
["vdu"].append(
5332 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5333 "vdu_id": vdu
["vdu-id-ref"],
5336 for interface
in vdu
["interfaces"]:
5337 scaling_info
["vdu"][index
]["interface"].append(
5339 "name": interface
["name"],
5340 "ip_address": interface
["ip-address"],
5341 "mac_address": interface
.get("mac-address"),
5343 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5344 stage
[2] = "Terminating VDUs"
5345 if scaling_info
.get("vdu-delete"):
5346 # scale_process = "RO"
5347 if self
.ro_config
.get("ng"):
5348 await self
._scale
_ng
_ro
(
5349 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5352 async def remove_vnf(
5353 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5355 """This method is to Remove VNF instances from NS.
5358 nsr_id: NS instance id
5359 nslcmop_id: nslcmop id of update
5360 vnf_instance_id: id of the VNF instance to be removed
5363 result: (str, str) COMPLETED/FAILED, details
5367 logging_text
= "Task ns={} update ".format(nsr_id
)
5368 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5369 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5370 if check_vnfr_count
> 1:
5371 stage
= ["", "", ""]
5372 step
= "Getting nslcmop from database"
5373 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5374 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5375 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5376 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5377 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5378 """ db_vnfr = self.db.get_one(
5379 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5381 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5382 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5384 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5385 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5386 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5387 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5388 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5389 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5390 return "COMPLETED", "Done"
5392 step
= "Terminate VNF Failed with"
5393 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5395 except (LcmException
, asyncio
.CancelledError
):
5397 except Exception as e
:
5398 self
.logger
.debug("Error removing VNF {}".format(e
))
5399 return "FAILED", "Error removing VNF {}".format(e
)
5401 async def _ns_redeploy_vnf(
5402 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5404 """This method updates and redeploys VNF instances
5407 nsr_id: NS instance id
5408 nslcmop_id: nslcmop id
5409 db_vnfd: VNF descriptor
5410 db_vnfr: VNF instance record
5411 db_nsr: NS instance record
5414 result: (str, str) COMPLETED/FAILED, details
5418 stage
= ["", "", ""]
5419 logging_text
= "Task ns={} update ".format(nsr_id
)
5420 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5421 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5423 # Terminate old VNF resources
5424 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5425 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5427 # old_vnfd_id = db_vnfr["vnfd-id"]
5428 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5429 new_db_vnfd
= db_vnfd
5430 # new_vnfd_ref = new_db_vnfd["id"]
5431 # new_vnfd_id = vnfd_id
5435 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5437 "name": cp
.get("id"),
5438 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5439 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5442 new_vnfr_cp
.append(vnf_cp
)
5443 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5444 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5445 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5446 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5447 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5448 updated_db_vnfr
= self
.db
.get_one(
5449 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5452 # Instantiate new VNF resources
5453 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5454 vca_scaling_info
= []
5455 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5456 scaling_info
["scaling_direction"] = "OUT"
5457 scaling_info
["vdu-create"] = {}
5458 scaling_info
["kdu-create"] = {}
5459 vdud_instantiate_list
= db_vnfd
["vdu"]
5460 for index
, vdud
in enumerate(vdud_instantiate_list
):
5461 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5465 additional_params
= (
5466 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5469 cloud_init_list
= []
5471 # TODO Information of its own ip is not available because db_vnfr is not updated.
5472 additional_params
["OSM"] = get_osm_params(
5473 updated_db_vnfr
, vdud
["id"], 1
5475 cloud_init_list
.append(
5476 self
._parse
_cloud
_init
(
5483 vca_scaling_info
.append(
5485 "osm_vdu_id": vdud
["id"],
5486 "member-vnf-index": member_vnf_index
,
5488 "vdu_index": count_index
,
5491 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5492 if self
.ro_config
.get("ng"):
5494 "New Resources to be deployed: {}".format(scaling_info
))
5495 await self
._scale
_ng
_ro
(
5496 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5498 return "COMPLETED", "Done"
5499 except (LcmException
, asyncio
.CancelledError
):
5501 except Exception as e
:
5502 self
.logger
.debug("Error updating VNF {}".format(e
))
5503 return "FAILED", "Error updating VNF {}".format(e
)
5505 async def _ns_charm_upgrade(
5511 timeout
: float = None,
5513 """This method upgrade charms in VNF instances
5516 ee_id: Execution environment id
5517 path: Local path to the charm
5519 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5520 timeout: (Float) Timeout for the ns update operation
5523 result: (str, str) COMPLETED/FAILED, details
5526 charm_type
= charm_type
or "lxc_proxy_charm"
5527 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5531 charm_type
=charm_type
,
5532 timeout
=timeout
or self
.timeout_ns_update
,
5536 return "COMPLETED", output
5538 except (LcmException
, asyncio
.CancelledError
):
5541 except Exception as e
:
5543 self
.logger
.debug("Error upgrading charm {}".format(path
))
5545 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5547 async def update(self
, nsr_id
, nslcmop_id
):
5548 """Update NS according to different update types
5550 This method performs upgrade of VNF instances then updates the revision
5551 number in VNF record
5554 nsr_id: Network service will be updated
5555 nslcmop_id: ns lcm operation id
5558 It may raise DbException, LcmException, N2VCException, K8sException
5561 # Try to lock HA task here
5562 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5563 if not task_is_locked_by_me
:
5566 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5567 self
.logger
.debug(logging_text
+ "Enter")
5569 # Set the required variables to be filled up later
5571 db_nslcmop_update
= {}
5573 nslcmop_operation_state
= None
5575 error_description_nslcmop
= ""
5577 change_type
= "updated"
5578 detailed_status
= ""
5581 # wait for any previous tasks in process
5582 step
= "Waiting for previous operations to terminate"
5583 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5584 self
._write
_ns
_status
(
5587 current_operation
="UPDATING",
5588 current_operation_id
=nslcmop_id
,
5591 step
= "Getting nslcmop from database"
5592 db_nslcmop
= self
.db
.get_one(
5593 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5595 update_type
= db_nslcmop
["operationParams"]["updateType"]
5597 step
= "Getting nsr from database"
5598 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5599 old_operational_status
= db_nsr
["operational-status"]
5600 db_nsr_update
["operational-status"] = "updating"
5601 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5602 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5604 if update_type
== "CHANGE_VNFPKG":
5606 # Get the input parameters given through update request
5607 vnf_instance_id
= db_nslcmop
["operationParams"][
5608 "changeVnfPackageData"
5609 ].get("vnfInstanceId")
5611 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5614 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5616 step
= "Getting vnfr from database"
5617 db_vnfr
= self
.db
.get_one(
5618 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5621 step
= "Getting vnfds from database"
5623 latest_vnfd
= self
.db
.get_one(
5624 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5626 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5629 current_vnf_revision
= db_vnfr
.get("revision", 1)
5630 current_vnfd
= self
.db
.get_one(
5632 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5633 fail_on_empty
=False,
5635 # Charm artifact paths will be filled up later
5637 current_charm_artifact_path
,
5638 target_charm_artifact_path
,
5639 charm_artifact_paths
,
5642 step
= "Checking if revision has changed in VNFD"
5643 if current_vnf_revision
!= latest_vnfd_revision
:
5645 change_type
= "policy_updated"
5647 # There is new revision of VNFD, update operation is required
5648 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5649 latest_vnfd_path
= vnfd_id
5651 step
= "Removing the VNFD packages if they exist in the local path"
5652 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5653 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5655 step
= "Get the VNFD packages from FSMongo"
5656 self
.fs
.sync(from_path
=latest_vnfd_path
)
5657 self
.fs
.sync(from_path
=current_vnfd_path
)
5660 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5662 base_folder
= latest_vnfd
["_admin"]["storage"]
5664 for charm_index
, charm_deployed
in enumerate(
5665 get_iterable(nsr_deployed
, "VCA")
5667 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5669 # Getting charm-id and charm-type
5670 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5671 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5672 charm_type
= charm_deployed
.get("type")
5675 ee_id
= charm_deployed
.get("ee_id")
5677 step
= "Getting descriptor config"
5678 descriptor_config
= get_configuration(
5679 current_vnfd
, current_vnfd
["id"]
5682 if "execution-environment-list" in descriptor_config
:
5683 ee_list
= descriptor_config
.get(
5684 "execution-environment-list", []
5689 # There could be several charm used in the same VNF
5690 for ee_item
in ee_list
:
5691 if ee_item
.get("juju"):
5693 step
= "Getting charm name"
5694 charm_name
= ee_item
["juju"].get("charm")
5696 step
= "Setting Charm artifact paths"
5697 current_charm_artifact_path
.append(
5698 get_charm_artifact_path(
5702 current_vnf_revision
,
5705 target_charm_artifact_path
.append(
5706 get_charm_artifact_path(
5713 charm_artifact_paths
= zip(
5714 current_charm_artifact_path
, target_charm_artifact_path
5717 step
= "Checking if software version has changed in VNFD"
5718 if find_software_version(current_vnfd
) != find_software_version(
5722 step
= "Checking if existing VNF has charm"
5723 for current_charm_path
, target_charm_path
in list(
5724 charm_artifact_paths
5726 if current_charm_path
:
5728 "Software version change is not supported as VNF instance {} has charm.".format(
5733 # There is no change in the charm package, then redeploy the VNF
5734 # based on new descriptor
5735 step
= "Redeploying VNF"
5736 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5740 ) = await self
._ns
_redeploy
_vnf
(
5747 if result
== "FAILED":
5748 nslcmop_operation_state
= result
5749 error_description_nslcmop
= detailed_status
5750 db_nslcmop_update
["detailed-status"] = detailed_status
5753 + " step {} Done with result {} {}".format(
5754 step
, nslcmop_operation_state
, detailed_status
5759 step
= "Checking if any charm package has changed or not"
5760 for current_charm_path
, target_charm_path
in list(
5761 charm_artifact_paths
5765 and target_charm_path
5766 and self
.check_charm_hash_changed(
5767 current_charm_path
, target_charm_path
5771 step
= "Checking whether VNF uses juju bundle"
5772 if check_juju_bundle_existence(current_vnfd
):
5775 "Charm upgrade is not supported for the instance which"
5776 " uses juju-bundle: {}".format(
5777 check_juju_bundle_existence(current_vnfd
)
5781 step
= "Upgrading Charm"
5785 ) = await self
._ns
_charm
_upgrade
(
5788 charm_type
=charm_type
,
5789 path
=self
.fs
.path
+ target_charm_path
,
5790 timeout
=timeout_seconds
,
5793 if result
== "FAILED":
5794 nslcmop_operation_state
= result
5795 error_description_nslcmop
= detailed_status
5797 db_nslcmop_update
["detailed-status"] = detailed_status
5800 + " step {} Done with result {} {}".format(
5801 step
, nslcmop_operation_state
, detailed_status
5805 step
= "Updating policies"
5806 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5807 result
= "COMPLETED"
5808 detailed_status
= "Done"
5809 db_nslcmop_update
["detailed-status"] = "Done"
5811 # If nslcmop_operation_state is None, so any operation is not failed.
5812 if not nslcmop_operation_state
:
5813 nslcmop_operation_state
= "COMPLETED"
5815 # If update CHANGE_VNFPKG nslcmop_operation is successful
5816 # vnf revision need to be updated
5817 vnfr_update
["revision"] = latest_vnfd_revision
5818 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5822 + " task Done with result {} {}".format(
5823 nslcmop_operation_state
, detailed_status
5826 elif update_type
== "REMOVE_VNF":
5827 # This part is included in https://osm.etsi.org/gerrit/11876
5828 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5829 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5830 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5831 step
= "Removing VNF"
5832 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5833 if result
== "FAILED":
5834 nslcmop_operation_state
= result
5835 error_description_nslcmop
= detailed_status
5836 db_nslcmop_update
["detailed-status"] = detailed_status
5837 change_type
= "vnf_terminated"
5838 if not nslcmop_operation_state
:
5839 nslcmop_operation_state
= "COMPLETED"
5842 + " task Done with result {} {}".format(
5843 nslcmop_operation_state
, detailed_status
5847 # If nslcmop_operation_state is None, so any operation is not failed.
5848 # All operations are executed in overall.
5849 if not nslcmop_operation_state
:
5850 nslcmop_operation_state
= "COMPLETED"
5851 db_nsr_update
["operational-status"] = old_operational_status
5853 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5854 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5856 except asyncio
.CancelledError
:
5858 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5860 exc
= "Operation was cancelled"
5861 except asyncio
.TimeoutError
:
5862 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5864 except Exception as e
:
5865 exc
= traceback
.format_exc()
5866 self
.logger
.critical(
5867 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5876 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5877 nslcmop_operation_state
= "FAILED"
5878 db_nsr_update
["operational-status"] = old_operational_status
5880 self
._write
_ns
_status
(
5882 ns_state
=db_nsr
["nsState"],
5883 current_operation
="IDLE",
5884 current_operation_id
=None,
5885 other_update
=db_nsr_update
,
5888 self
._write
_op
_status
(
5891 error_message
=error_description_nslcmop
,
5892 operation_state
=nslcmop_operation_state
,
5893 other_update
=db_nslcmop_update
,
5896 if nslcmop_operation_state
:
5900 "nslcmop_id": nslcmop_id
,
5901 "operationState": nslcmop_operation_state
,
5903 if change_type
in ("vnf_terminated", "policy_updated"):
5904 msg
.update({"vnf_member_index": member_vnf_index
})
5905 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
5906 except Exception as e
:
5908 logging_text
+ "kafka_write notification Exception {}".format(e
)
5910 self
.logger
.debug(logging_text
+ "Exit")
5911 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
5912 return nslcmop_operation_state
, detailed_status
5914 async def scale(self
, nsr_id
, nslcmop_id
):
5915 # Try to lock HA task here
5916 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5917 if not task_is_locked_by_me
:
5920 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
5921 stage
= ["", "", ""]
5922 tasks_dict_info
= {}
5923 # ^ stage, step, VIM progress
5924 self
.logger
.debug(logging_text
+ "Enter")
5925 # get all needed from database
5927 db_nslcmop_update
= {}
5930 # in case of error, indicates what part of scale was failed to put nsr at error status
5931 scale_process
= None
5932 old_operational_status
= ""
5933 old_config_status
= ""
5936 # wait for any previous tasks in process
5937 step
= "Waiting for previous operations to terminate"
5938 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5939 self
._write
_ns
_status
(
5942 current_operation
="SCALING",
5943 current_operation_id
=nslcmop_id
,
5946 step
= "Getting nslcmop from database"
5948 step
+ " after having waited for previous tasks to be completed"
5950 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5952 step
= "Getting nsr from database"
5953 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5954 old_operational_status
= db_nsr
["operational-status"]
5955 old_config_status
= db_nsr
["config-status"]
5957 step
= "Parsing scaling parameters"
5958 db_nsr_update
["operational-status"] = "scaling"
5959 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5960 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5962 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
5964 ]["member-vnf-index"]
5965 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
5967 ]["scaling-group-descriptor"]
5968 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
5969 # for backward compatibility
5970 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5971 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5972 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5973 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5975 step
= "Getting vnfr from database"
5976 db_vnfr
= self
.db
.get_one(
5977 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5980 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5982 step
= "Getting vnfd from database"
5983 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5985 base_folder
= db_vnfd
["_admin"]["storage"]
5987 step
= "Getting scaling-group-descriptor"
5988 scaling_descriptor
= find_in_list(
5989 get_scaling_aspect(db_vnfd
),
5990 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
5992 if not scaling_descriptor
:
5994 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5995 "at vnfd:scaling-group-descriptor".format(scaling_group
)
5998 step
= "Sending scale order to VIM"
5999 # TODO check if ns is in a proper status
6001 if not db_nsr
["_admin"].get("scaling-group"):
6006 "_admin.scaling-group": [
6007 {"name": scaling_group
, "nb-scale-op": 0}
6011 admin_scale_index
= 0
6013 for admin_scale_index
, admin_scale_info
in enumerate(
6014 db_nsr
["_admin"]["scaling-group"]
6016 if admin_scale_info
["name"] == scaling_group
:
6017 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6019 else: # not found, set index one plus last element and add new entry with the name
6020 admin_scale_index
+= 1
6022 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6025 vca_scaling_info
= []
6026 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6027 if scaling_type
== "SCALE_OUT":
6028 if "aspect-delta-details" not in scaling_descriptor
:
6030 "Aspect delta details not fount in scaling descriptor {}".format(
6031 scaling_descriptor
["name"]
6034 # count if max-instance-count is reached
6035 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6037 scaling_info
["scaling_direction"] = "OUT"
6038 scaling_info
["vdu-create"] = {}
6039 scaling_info
["kdu-create"] = {}
6040 for delta
in deltas
:
6041 for vdu_delta
in delta
.get("vdu-delta", {}):
6042 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6043 # vdu_index also provides the number of instance of the targeted vdu
6044 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6045 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6049 additional_params
= (
6050 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6053 cloud_init_list
= []
6055 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6056 max_instance_count
= 10
6057 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6058 max_instance_count
= vdu_profile
.get(
6059 "max-number-of-instances", 10
6062 default_instance_num
= get_number_of_instances(
6065 instances_number
= vdu_delta
.get("number-of-instances", 1)
6066 nb_scale_op
+= instances_number
6068 new_instance_count
= nb_scale_op
+ default_instance_num
6069 # Control if new count is over max and vdu count is less than max.
6070 # Then assign new instance count
6071 if new_instance_count
> max_instance_count
> vdu_count
:
6072 instances_number
= new_instance_count
- max_instance_count
6074 instances_number
= instances_number
6076 if new_instance_count
> max_instance_count
:
6078 "reached the limit of {} (max-instance-count) "
6079 "scaling-out operations for the "
6080 "scaling-group-descriptor '{}'".format(
6081 nb_scale_op
, scaling_group
6084 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6086 # TODO Information of its own ip is not available because db_vnfr is not updated.
6087 additional_params
["OSM"] = get_osm_params(
6088 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6090 cloud_init_list
.append(
6091 self
._parse
_cloud
_init
(
6098 vca_scaling_info
.append(
6100 "osm_vdu_id": vdu_delta
["id"],
6101 "member-vnf-index": vnf_index
,
6103 "vdu_index": vdu_index
+ x
,
6106 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6107 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6108 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6109 kdu_name
= kdu_profile
["kdu-name"]
6110 resource_name
= kdu_profile
.get("resource-name", "")
6112 # Might have different kdus in the same delta
6113 # Should have list for each kdu
6114 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6115 scaling_info
["kdu-create"][kdu_name
] = []
6117 kdur
= get_kdur(db_vnfr
, kdu_name
)
6118 if kdur
.get("helm-chart"):
6119 k8s_cluster_type
= "helm-chart-v3"
6120 self
.logger
.debug("kdur: {}".format(kdur
))
6122 kdur
.get("helm-version")
6123 and kdur
.get("helm-version") == "v2"
6125 k8s_cluster_type
= "helm-chart"
6126 elif kdur
.get("juju-bundle"):
6127 k8s_cluster_type
= "juju-bundle"
6130 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6131 "juju-bundle. Maybe an old NBI version is running".format(
6132 db_vnfr
["member-vnf-index-ref"], kdu_name
6136 max_instance_count
= 10
6137 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6138 max_instance_count
= kdu_profile
.get(
6139 "max-number-of-instances", 10
6142 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6143 deployed_kdu
, _
= get_deployed_kdu(
6144 nsr_deployed
, kdu_name
, vnf_index
6146 if deployed_kdu
is None:
6148 "KDU '{}' for vnf '{}' not deployed".format(
6152 kdu_instance
= deployed_kdu
.get("kdu-instance")
6153 instance_num
= await self
.k8scluster_map
[
6159 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6160 kdu_model
=deployed_kdu
.get("kdu-model"),
6162 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6163 "number-of-instances", 1
6166 # Control if new count is over max and instance_num is less than max.
6167 # Then assign max instance number to kdu replica count
6168 if kdu_replica_count
> max_instance_count
> instance_num
:
6169 kdu_replica_count
= max_instance_count
6170 if kdu_replica_count
> max_instance_count
:
6172 "reached the limit of {} (max-instance-count) "
6173 "scaling-out operations for the "
6174 "scaling-group-descriptor '{}'".format(
6175 instance_num
, scaling_group
6179 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6180 vca_scaling_info
.append(
6182 "osm_kdu_id": kdu_name
,
6183 "member-vnf-index": vnf_index
,
6185 "kdu_index": instance_num
+ x
- 1,
6188 scaling_info
["kdu-create"][kdu_name
].append(
6190 "member-vnf-index": vnf_index
,
6192 "k8s-cluster-type": k8s_cluster_type
,
6193 "resource-name": resource_name
,
6194 "scale": kdu_replica_count
,
6197 elif scaling_type
== "SCALE_IN":
6198 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6200 scaling_info
["scaling_direction"] = "IN"
6201 scaling_info
["vdu-delete"] = {}
6202 scaling_info
["kdu-delete"] = {}
6204 for delta
in deltas
:
6205 for vdu_delta
in delta
.get("vdu-delta", {}):
6206 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6207 min_instance_count
= 0
6208 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6209 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6210 min_instance_count
= vdu_profile
["min-number-of-instances"]
6212 default_instance_num
= get_number_of_instances(
6213 db_vnfd
, vdu_delta
["id"]
6215 instance_num
= vdu_delta
.get("number-of-instances", 1)
6216 nb_scale_op
-= instance_num
6218 new_instance_count
= nb_scale_op
+ default_instance_num
6220 if new_instance_count
< min_instance_count
< vdu_count
:
6221 instances_number
= min_instance_count
- new_instance_count
6223 instances_number
= instance_num
6225 if new_instance_count
< min_instance_count
:
6227 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6228 "scaling-group-descriptor '{}'".format(
6229 nb_scale_op
, scaling_group
6232 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6233 vca_scaling_info
.append(
6235 "osm_vdu_id": vdu_delta
["id"],
6236 "member-vnf-index": vnf_index
,
6238 "vdu_index": vdu_index
- 1 - x
,
6241 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6242 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6243 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6244 kdu_name
= kdu_profile
["kdu-name"]
6245 resource_name
= kdu_profile
.get("resource-name", "")
6247 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6248 scaling_info
["kdu-delete"][kdu_name
] = []
6250 kdur
= get_kdur(db_vnfr
, kdu_name
)
6251 if kdur
.get("helm-chart"):
6252 k8s_cluster_type
= "helm-chart-v3"
6253 self
.logger
.debug("kdur: {}".format(kdur
))
6255 kdur
.get("helm-version")
6256 and kdur
.get("helm-version") == "v2"
6258 k8s_cluster_type
= "helm-chart"
6259 elif kdur
.get("juju-bundle"):
6260 k8s_cluster_type
= "juju-bundle"
6263 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6264 "juju-bundle. Maybe an old NBI version is running".format(
6265 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6269 min_instance_count
= 0
6270 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6271 min_instance_count
= kdu_profile
["min-number-of-instances"]
6273 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6274 deployed_kdu
, _
= get_deployed_kdu(
6275 nsr_deployed
, kdu_name
, vnf_index
6277 if deployed_kdu
is None:
6279 "KDU '{}' for vnf '{}' not deployed".format(
6283 kdu_instance
= deployed_kdu
.get("kdu-instance")
6284 instance_num
= await self
.k8scluster_map
[
6290 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6291 kdu_model
=deployed_kdu
.get("kdu-model"),
6293 kdu_replica_count
= instance_num
- kdu_delta
.get(
6294 "number-of-instances", 1
6297 if kdu_replica_count
< min_instance_count
< instance_num
:
6298 kdu_replica_count
= min_instance_count
6299 if kdu_replica_count
< min_instance_count
:
6301 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6302 "scaling-group-descriptor '{}'".format(
6303 instance_num
, scaling_group
6307 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6308 vca_scaling_info
.append(
6310 "osm_kdu_id": kdu_name
,
6311 "member-vnf-index": vnf_index
,
6313 "kdu_index": instance_num
- x
- 1,
6316 scaling_info
["kdu-delete"][kdu_name
].append(
6318 "member-vnf-index": vnf_index
,
6320 "k8s-cluster-type": k8s_cluster_type
,
6321 "resource-name": resource_name
,
6322 "scale": kdu_replica_count
,
6326 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6327 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6328 if scaling_info
["scaling_direction"] == "IN":
6329 for vdur
in reversed(db_vnfr
["vdur"]):
6330 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6331 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6332 scaling_info
["vdu"].append(
6334 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6335 "vdu_id": vdur
["vdu-id-ref"],
6339 for interface
in vdur
["interfaces"]:
6340 scaling_info
["vdu"][-1]["interface"].append(
6342 "name": interface
["name"],
6343 "ip_address": interface
["ip-address"],
6344 "mac_address": interface
.get("mac-address"),
6347 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6350 step
= "Executing pre-scale vnf-config-primitive"
6351 if scaling_descriptor
.get("scaling-config-action"):
6352 for scaling_config_action
in scaling_descriptor
[
6353 "scaling-config-action"
6356 scaling_config_action
.get("trigger") == "pre-scale-in"
6357 and scaling_type
== "SCALE_IN"
6359 scaling_config_action
.get("trigger") == "pre-scale-out"
6360 and scaling_type
== "SCALE_OUT"
6362 vnf_config_primitive
= scaling_config_action
[
6363 "vnf-config-primitive-name-ref"
6365 step
= db_nslcmop_update
[
6367 ] = "executing pre-scale scaling-config-action '{}'".format(
6368 vnf_config_primitive
6371 # look for primitive
6372 for config_primitive
in (
6373 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6374 ).get("config-primitive", ()):
6375 if config_primitive
["name"] == vnf_config_primitive
:
6379 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6380 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6381 "primitive".format(scaling_group
, vnf_config_primitive
)
6384 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6385 if db_vnfr
.get("additionalParamsForVnf"):
6386 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6388 scale_process
= "VCA"
6389 db_nsr_update
["config-status"] = "configuring pre-scaling"
6390 primitive_params
= self
._map
_primitive
_params
(
6391 config_primitive
, {}, vnfr_params
6394 # Pre-scale retry check: Check if this sub-operation has been executed before
6395 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6398 vnf_config_primitive
,
6402 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6403 # Skip sub-operation
6404 result
= "COMPLETED"
6405 result_detail
= "Done"
6408 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6409 vnf_config_primitive
, result
, result_detail
6413 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6414 # New sub-operation: Get index of this sub-operation
6416 len(db_nslcmop
.get("_admin", {}).get("operations"))
6421 + "vnf_config_primitive={} New sub-operation".format(
6422 vnf_config_primitive
6426 # retry: Get registered params for this existing sub-operation
6427 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6430 vnf_index
= op
.get("member_vnf_index")
6431 vnf_config_primitive
= op
.get("primitive")
6432 primitive_params
= op
.get("primitive_params")
6435 + "vnf_config_primitive={} Sub-operation retry".format(
6436 vnf_config_primitive
6439 # Execute the primitive, either with new (first-time) or registered (reintent) args
6440 ee_descriptor_id
= config_primitive
.get(
6441 "execution-environment-ref"
6443 primitive_name
= config_primitive
.get(
6444 "execution-environment-primitive", vnf_config_primitive
6446 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6447 nsr_deployed
["VCA"],
6448 member_vnf_index
=vnf_index
,
6450 vdu_count_index
=None,
6451 ee_descriptor_id
=ee_descriptor_id
,
6453 result
, result_detail
= await self
._ns
_execute
_primitive
(
6462 + "vnf_config_primitive={} Done with result {} {}".format(
6463 vnf_config_primitive
, result
, result_detail
6466 # Update operationState = COMPLETED | FAILED
6467 self
._update
_suboperation
_status
(
6468 db_nslcmop
, op_index
, result
, result_detail
6471 if result
== "FAILED":
6472 raise LcmException(result_detail
)
6473 db_nsr_update
["config-status"] = old_config_status
6474 scale_process
= None
6478 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6481 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6484 # SCALE-IN VCA - BEGIN
6485 if vca_scaling_info
:
6486 step
= db_nslcmop_update
[
6488 ] = "Deleting the execution environments"
6489 scale_process
= "VCA"
6490 for vca_info
in vca_scaling_info
:
6491 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6492 member_vnf_index
= str(vca_info
["member-vnf-index"])
6494 logging_text
+ "vdu info: {}".format(vca_info
)
6496 if vca_info
.get("osm_vdu_id"):
6497 vdu_id
= vca_info
["osm_vdu_id"]
6498 vdu_index
= int(vca_info
["vdu_index"])
6501 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6502 member_vnf_index
, vdu_id
, vdu_index
6504 stage
[2] = step
= "Scaling in VCA"
6505 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6506 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6507 config_update
= db_nsr
["configurationStatus"]
6508 for vca_index
, vca
in enumerate(vca_update
):
6510 (vca
or vca
.get("ee_id"))
6511 and vca
["member-vnf-index"] == member_vnf_index
6512 and vca
["vdu_count_index"] == vdu_index
6514 if vca
.get("vdu_id"):
6515 config_descriptor
= get_configuration(
6516 db_vnfd
, vca
.get("vdu_id")
6518 elif vca
.get("kdu_name"):
6519 config_descriptor
= get_configuration(
6520 db_vnfd
, vca
.get("kdu_name")
6523 config_descriptor
= get_configuration(
6524 db_vnfd
, db_vnfd
["id"]
6526 operation_params
= (
6527 db_nslcmop
.get("operationParams") or {}
6529 exec_terminate_primitives
= not operation_params
.get(
6530 "skip_terminate_primitives"
6531 ) and vca
.get("needed_terminate")
6532 task
= asyncio
.ensure_future(
6541 exec_primitives
=exec_terminate_primitives
,
6545 timeout
=self
.timeout_charm_delete
,
6548 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6551 del vca_update
[vca_index
]
6552 del config_update
[vca_index
]
6553 # wait for pending tasks of terminate primitives
6557 + "Waiting for tasks {}".format(
6558 list(tasks_dict_info
.keys())
6561 error_list
= await self
._wait
_for
_tasks
(
6565 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6570 tasks_dict_info
.clear()
6572 raise LcmException("; ".join(error_list
))
6574 db_vca_and_config_update
= {
6575 "_admin.deployed.VCA": vca_update
,
6576 "configurationStatus": config_update
,
6579 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6581 scale_process
= None
6582 # SCALE-IN VCA - END
6585 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6586 scale_process
= "RO"
6587 if self
.ro_config
.get("ng"):
6588 await self
._scale
_ng
_ro
(
6589 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6591 scaling_info
.pop("vdu-create", None)
6592 scaling_info
.pop("vdu-delete", None)
6594 scale_process
= None
6598 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6599 scale_process
= "KDU"
6600 await self
._scale
_kdu
(
6601 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6603 scaling_info
.pop("kdu-create", None)
6604 scaling_info
.pop("kdu-delete", None)
6606 scale_process
= None
6610 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6612 # SCALE-UP VCA - BEGIN
6613 if vca_scaling_info
:
6614 step
= db_nslcmop_update
[
6616 ] = "Creating new execution environments"
6617 scale_process
= "VCA"
6618 for vca_info
in vca_scaling_info
:
6619 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6620 member_vnf_index
= str(vca_info
["member-vnf-index"])
6622 logging_text
+ "vdu info: {}".format(vca_info
)
6624 vnfd_id
= db_vnfr
["vnfd-ref"]
6625 if vca_info
.get("osm_vdu_id"):
6626 vdu_index
= int(vca_info
["vdu_index"])
6627 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6628 if db_vnfr
.get("additionalParamsForVnf"):
6629 deploy_params
.update(
6631 db_vnfr
["additionalParamsForVnf"].copy()
6634 descriptor_config
= get_configuration(
6635 db_vnfd
, db_vnfd
["id"]
6637 if descriptor_config
:
6642 logging_text
=logging_text
6643 + "member_vnf_index={} ".format(member_vnf_index
),
6646 nslcmop_id
=nslcmop_id
,
6652 member_vnf_index
=member_vnf_index
,
6653 vdu_index
=vdu_index
,
6655 deploy_params
=deploy_params
,
6656 descriptor_config
=descriptor_config
,
6657 base_folder
=base_folder
,
6658 task_instantiation_info
=tasks_dict_info
,
6661 vdu_id
= vca_info
["osm_vdu_id"]
6662 vdur
= find_in_list(
6663 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6665 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6666 if vdur
.get("additionalParams"):
6667 deploy_params_vdu
= parse_yaml_strings(
6668 vdur
["additionalParams"]
6671 deploy_params_vdu
= deploy_params
6672 deploy_params_vdu
["OSM"] = get_osm_params(
6673 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6675 if descriptor_config
:
6680 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6681 member_vnf_index
, vdu_id
, vdu_index
6683 stage
[2] = step
= "Scaling out VCA"
6684 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6686 logging_text
=logging_text
6687 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6688 member_vnf_index
, vdu_id
, vdu_index
6692 nslcmop_id
=nslcmop_id
,
6698 member_vnf_index
=member_vnf_index
,
6699 vdu_index
=vdu_index
,
6701 deploy_params
=deploy_params_vdu
,
6702 descriptor_config
=descriptor_config
,
6703 base_folder
=base_folder
,
6704 task_instantiation_info
=tasks_dict_info
,
6707 # SCALE-UP VCA - END
6708 scale_process
= None
6711 # execute primitive service POST-SCALING
6712 step
= "Executing post-scale vnf-config-primitive"
6713 if scaling_descriptor
.get("scaling-config-action"):
6714 for scaling_config_action
in scaling_descriptor
[
6715 "scaling-config-action"
6718 scaling_config_action
.get("trigger") == "post-scale-in"
6719 and scaling_type
== "SCALE_IN"
6721 scaling_config_action
.get("trigger") == "post-scale-out"
6722 and scaling_type
== "SCALE_OUT"
6724 vnf_config_primitive
= scaling_config_action
[
6725 "vnf-config-primitive-name-ref"
6727 step
= db_nslcmop_update
[
6729 ] = "executing post-scale scaling-config-action '{}'".format(
6730 vnf_config_primitive
6733 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6734 if db_vnfr
.get("additionalParamsForVnf"):
6735 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6737 # look for primitive
6738 for config_primitive
in (
6739 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6740 ).get("config-primitive", ()):
6741 if config_primitive
["name"] == vnf_config_primitive
:
6745 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6746 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6747 "config-primitive".format(
6748 scaling_group
, vnf_config_primitive
6751 scale_process
= "VCA"
6752 db_nsr_update
["config-status"] = "configuring post-scaling"
6753 primitive_params
= self
._map
_primitive
_params
(
6754 config_primitive
, {}, vnfr_params
6757 # Post-scale retry check: Check if this sub-operation has been executed before
6758 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6761 vnf_config_primitive
,
6765 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6766 # Skip sub-operation
6767 result
= "COMPLETED"
6768 result_detail
= "Done"
6771 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6772 vnf_config_primitive
, result
, result_detail
6776 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6777 # New sub-operation: Get index of this sub-operation
6779 len(db_nslcmop
.get("_admin", {}).get("operations"))
6784 + "vnf_config_primitive={} New sub-operation".format(
6785 vnf_config_primitive
6789 # retry: Get registered params for this existing sub-operation
6790 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6793 vnf_index
= op
.get("member_vnf_index")
6794 vnf_config_primitive
= op
.get("primitive")
6795 primitive_params
= op
.get("primitive_params")
6798 + "vnf_config_primitive={} Sub-operation retry".format(
6799 vnf_config_primitive
6802 # Execute the primitive, either with new (first-time) or registered (reintent) args
6803 ee_descriptor_id
= config_primitive
.get(
6804 "execution-environment-ref"
6806 primitive_name
= config_primitive
.get(
6807 "execution-environment-primitive", vnf_config_primitive
6809 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6810 nsr_deployed
["VCA"],
6811 member_vnf_index
=vnf_index
,
6813 vdu_count_index
=None,
6814 ee_descriptor_id
=ee_descriptor_id
,
6816 result
, result_detail
= await self
._ns
_execute
_primitive
(
6825 + "vnf_config_primitive={} Done with result {} {}".format(
6826 vnf_config_primitive
, result
, result_detail
6829 # Update operationState = COMPLETED | FAILED
6830 self
._update
_suboperation
_status
(
6831 db_nslcmop
, op_index
, result
, result_detail
6834 if result
== "FAILED":
6835 raise LcmException(result_detail
)
6836 db_nsr_update
["config-status"] = old_config_status
6837 scale_process
= None
6842 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6843 db_nsr_update
["operational-status"] = (
6845 if old_operational_status
== "failed"
6846 else old_operational_status
6848 db_nsr_update
["config-status"] = old_config_status
6851 ROclient
.ROClientException
,
6856 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6858 except asyncio
.CancelledError
:
6860 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6862 exc
= "Operation was cancelled"
6863 except Exception as e
:
6864 exc
= traceback
.format_exc()
6865 self
.logger
.critical(
6866 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6870 self
._write
_ns
_status
(
6873 current_operation
="IDLE",
6874 current_operation_id
=None,
6877 stage
[1] = "Waiting for instantiate pending tasks."
6878 self
.logger
.debug(logging_text
+ stage
[1])
6879 exc
= await self
._wait
_for
_tasks
(
6882 self
.timeout_ns_deploy
,
6890 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6891 nslcmop_operation_state
= "FAILED"
6893 db_nsr_update
["operational-status"] = old_operational_status
6894 db_nsr_update
["config-status"] = old_config_status
6895 db_nsr_update
["detailed-status"] = ""
6897 if "VCA" in scale_process
:
6898 db_nsr_update
["config-status"] = "failed"
6899 if "RO" in scale_process
:
6900 db_nsr_update
["operational-status"] = "failed"
6903 ] = "FAILED scaling nslcmop={} {}: {}".format(
6904 nslcmop_id
, step
, exc
6907 error_description_nslcmop
= None
6908 nslcmop_operation_state
= "COMPLETED"
6909 db_nslcmop_update
["detailed-status"] = "Done"
6911 self
._write
_op
_status
(
6914 error_message
=error_description_nslcmop
,
6915 operation_state
=nslcmop_operation_state
,
6916 other_update
=db_nslcmop_update
,
6919 self
._write
_ns
_status
(
6922 current_operation
="IDLE",
6923 current_operation_id
=None,
6924 other_update
=db_nsr_update
,
6927 if nslcmop_operation_state
:
6931 "nslcmop_id": nslcmop_id
,
6932 "operationState": nslcmop_operation_state
,
6934 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
6935 except Exception as e
:
6937 logging_text
+ "kafka_write notification Exception {}".format(e
)
6939 self
.logger
.debug(logging_text
+ "Exit")
6940 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
6942 async def _scale_kdu(
6943 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6945 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
6946 for kdu_name
in _scaling_info
:
6947 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
6948 deployed_kdu
, index
= get_deployed_kdu(
6949 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
6951 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
6952 kdu_instance
= deployed_kdu
["kdu-instance"]
6953 kdu_model
= deployed_kdu
.get("kdu-model")
6954 scale
= int(kdu_scaling_info
["scale"])
6955 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
6958 "collection": "nsrs",
6959 "filter": {"_id": nsr_id
},
6960 "path": "_admin.deployed.K8s.{}".format(index
),
6963 step
= "scaling application {}".format(
6964 kdu_scaling_info
["resource-name"]
6966 self
.logger
.debug(logging_text
+ step
)
6968 if kdu_scaling_info
["type"] == "delete":
6969 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
6972 and kdu_config
.get("terminate-config-primitive")
6973 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
6975 terminate_config_primitive_list
= kdu_config
.get(
6976 "terminate-config-primitive"
6978 terminate_config_primitive_list
.sort(
6979 key
=lambda val
: int(val
["seq"])
6983 terminate_config_primitive
6984 ) in terminate_config_primitive_list
:
6985 primitive_params_
= self
._map
_primitive
_params
(
6986 terminate_config_primitive
, {}, {}
6988 step
= "execute terminate config primitive"
6989 self
.logger
.debug(logging_text
+ step
)
6990 await asyncio
.wait_for(
6991 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
6992 cluster_uuid
=cluster_uuid
,
6993 kdu_instance
=kdu_instance
,
6994 primitive_name
=terminate_config_primitive
["name"],
6995 params
=primitive_params_
,
7002 await asyncio
.wait_for(
7003 self
.k8scluster_map
[k8s_cluster_type
].scale(
7006 kdu_scaling_info
["resource-name"],
7008 cluster_uuid
=cluster_uuid
,
7009 kdu_model
=kdu_model
,
7013 timeout
=self
.timeout_vca_on_error
,
7016 if kdu_scaling_info
["type"] == "create":
7017 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7020 and kdu_config
.get("initial-config-primitive")
7021 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7023 initial_config_primitive_list
= kdu_config
.get(
7024 "initial-config-primitive"
7026 initial_config_primitive_list
.sort(
7027 key
=lambda val
: int(val
["seq"])
7030 for initial_config_primitive
in initial_config_primitive_list
:
7031 primitive_params_
= self
._map
_primitive
_params
(
7032 initial_config_primitive
, {}, {}
7034 step
= "execute initial config primitive"
7035 self
.logger
.debug(logging_text
+ step
)
7036 await asyncio
.wait_for(
7037 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7038 cluster_uuid
=cluster_uuid
,
7039 kdu_instance
=kdu_instance
,
7040 primitive_name
=initial_config_primitive
["name"],
7041 params
=primitive_params_
,
7048 async def _scale_ng_ro(
7049 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7051 nsr_id
= db_nslcmop
["nsInstanceId"]
7052 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7055 # read from db: vnfd's for every vnf
7058 # for each vnf in ns, read vnfd
7059 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7060 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7061 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7062 # if we haven't this vnfd, read it from db
7063 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7065 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7066 db_vnfds
.append(vnfd
)
7067 n2vc_key
= self
.n2vc
.get_public_key()
7068 n2vc_key_list
= [n2vc_key
]
7071 vdu_scaling_info
.get("vdu-create"),
7072 vdu_scaling_info
.get("vdu-delete"),
7075 # db_vnfr has been updated, update db_vnfrs to use it
7076 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7077 await self
._instantiate
_ng
_ro
(
7087 start_deploy
=time(),
7088 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7090 if vdu_scaling_info
.get("vdu-delete"):
7092 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7095 async def extract_prometheus_scrape_jobs(
7096 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7098 # look if exist a file called 'prometheus*.j2' and
7099 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7103 for f
in artifact_content
7104 if f
.startswith("prometheus") and f
.endswith(".j2")
7110 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7114 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7115 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7117 vnfr_id
= vnfr_id
.replace("-", "")
7119 "JOB_NAME": vnfr_id
,
7120 "TARGET_IP": target_ip
,
7121 "EXPORTER_POD_IP": host_name
,
7122 "EXPORTER_POD_PORT": host_port
,
7124 job_list
= parse_job(job_data
, variables
)
7125 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7126 for job
in job_list
:
7128 not isinstance(job
.get("job_name"), str)
7129 or vnfr_id
not in job
["job_name"]
7131 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7132 job
["nsr_id"] = nsr_id
7133 job
["vnfr_id"] = vnfr_id
7136 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7138 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7140 :param: vim_account_id: VIM Account ID
7142 :return: (cloud_name, cloud_credential)
7144 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7145 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7147 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7149 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7151 :param: vim_account_id: VIM Account ID
7153 :return: (cloud_name, cloud_credential)
7155 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7156 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
7158 async def migrate(self
, nsr_id
, nslcmop_id
):
7160 Migrate VNFs and VDUs instances in a NS
7162 :param: nsr_id: NS Instance ID
7163 :param: nslcmop_id: nslcmop ID of migrate
7166 # Try to lock HA task here
7167 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7168 if not task_is_locked_by_me
:
7170 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7171 self
.logger
.debug(logging_text
+ "Enter")
7172 # get all needed from database
7174 db_nslcmop_update
= {}
7175 nslcmop_operation_state
= None
7179 # in case of error, indicates what part of scale was failed to put nsr at error status
7180 start_deploy
= time()
7183 # wait for any previous tasks in process
7184 step
= "Waiting for previous operations to terminate"
7185 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7187 self
._write
_ns
_status
(
7190 current_operation
="MIGRATING",
7191 current_operation_id
=nslcmop_id
7193 step
= "Getting nslcmop from database"
7194 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
7195 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7196 migrate_params
= db_nslcmop
.get("operationParams")
7199 target
.update(migrate_params
)
7200 desc
= await self
.RO
.migrate(nsr_id
, target
)
7201 self
.logger
.debug("RO return > {}".format(desc
))
7202 action_id
= desc
["action_id"]
7203 await self
._wait
_ng
_ro
(
7204 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
7206 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7207 self
.logger
.error("Exit Exception {}".format(e
))
7209 except asyncio
.CancelledError
:
7210 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7211 exc
= "Operation was cancelled"
7212 except Exception as e
:
7213 exc
= traceback
.format_exc()
7214 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7216 self
._write
_ns
_status
(
7219 current_operation
="IDLE",
7220 current_operation_id
=None,
7225 ] = "FAILED {}: {}".format(step
, exc
)
7226 nslcmop_operation_state
= "FAILED"
7228 nslcmop_operation_state
= "COMPLETED"
7229 db_nslcmop_update
["detailed-status"] = "Done"
7230 db_nsr_update
["detailed-status"] = "Done"
7232 self
._write
_op
_status
(
7236 operation_state
=nslcmop_operation_state
,
7237 other_update
=db_nslcmop_update
,
7239 if nslcmop_operation_state
:
7243 "nslcmop_id": nslcmop_id
,
7244 "operationState": nslcmop_operation_state
,
7246 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7247 except Exception as e
:
7249 logging_text
+ "kafka_write notification Exception {}".format(e
)
7251 self
.logger
.debug(logging_text
+ "Exit")
7252 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")