1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
36 from osm_lcm
import ROclient
37 from osm_lcm
.data_utils
.nsr
import (
40 get_deployed_vca_list
,
43 from osm_lcm
.data_utils
.vca
import (
52 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
53 from osm_lcm
.lcm_utils
import (
60 check_juju_bundle_existence
,
61 get_charm_artifact_path
,
63 from osm_lcm
.data_utils
.nsd
import (
64 get_ns_configuration_relation_list
,
68 from osm_lcm
.data_utils
.vnfd
import (
74 get_ee_sorted_initial_config_primitive_list
,
75 get_ee_sorted_terminate_config_primitive_list
,
77 get_virtual_link_profiles
,
82 get_number_of_instances
,
84 get_kdu_resource_profile
,
85 find_software_version
,
87 from osm_lcm
.data_utils
.list_utils
import find_in_list
88 from osm_lcm
.data_utils
.vnfr
import (
92 get_volumes_from_instantiation_params
,
94 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
95 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
96 from n2vc
.definitions
import RelationEndpoint
97 from n2vc
.k8s_helm_conn
import K8sHelmConnector
98 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
99 from n2vc
.k8s_juju_conn
import K8sJujuConnector
101 from osm_common
.dbbase
import DbException
102 from osm_common
.fsbase
import FsException
104 from osm_lcm
.data_utils
.database
.database
import Database
105 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
107 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
108 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
110 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
111 from osm_lcm
.osm_config
import OsmConfigBuilder
112 from osm_lcm
.prometheus
import parse_job
114 from copy
import copy
, deepcopy
115 from time
import time
116 from uuid
import uuid4
118 from random
import randint
120 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
123 class NsLcm(LcmBase
):
124 timeout_vca_on_error
= (
126 ) # Time for charm from first time at blocked,error status to mark as failed
127 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
128 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
129 timeout_ns_heal
= 1800 # default global timeout for un deployment a ns
130 timeout_charm_delete
= 10 * 60
131 timeout_primitive
= 30 * 60 # timeout for primitive execution
132 timeout_ns_update
= 30 * 60 # timeout for ns update
133 timeout_progress_primitive
= (
135 ) # timeout for some progress in a primitive execution
136 timeout_migrate
= 1800 # default global timeout for migrating vnfs
137 timeout_operate
= 1800 # default global timeout for migrating vnfs
138 timeout_verticalscale
= 1800 # default global timeout for Vertical Sclaing
139 SUBOPERATION_STATUS_NOT_FOUND
= -1
140 SUBOPERATION_STATUS_NEW
= -2
141 SUBOPERATION_STATUS_SKIP
= -3
142 task_name_deploy_vca
= "Deploying VCA"
144 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
146 Init, Connect to database, filesystem storage, and messaging
147 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
150 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
152 self
.db
= Database().instance
.db
153 self
.fs
= Filesystem().instance
.fs
155 self
.lcm_tasks
= lcm_tasks
156 self
.timeout
= config
["timeout"]
157 self
.ro_config
= config
["ro_config"]
158 self
.ng_ro
= config
["ro_config"].get("ng")
159 self
.vca_config
= config
["VCA"].copy()
161 # create N2VC connector
162 self
.n2vc
= N2VCJujuConnector(
165 on_update_db
=self
._on
_update
_n
2vc
_db
,
170 self
.conn_helm_ee
= LCMHelmConn(
173 vca_config
=self
.vca_config
,
174 on_update_db
=self
._on
_update
_n
2vc
_db
,
177 self
.k8sclusterhelm2
= K8sHelmConnector(
178 kubectl_command
=self
.vca_config
.get("kubectlpath"),
179 helm_command
=self
.vca_config
.get("helmpath"),
186 self
.k8sclusterhelm3
= K8sHelm3Connector(
187 kubectl_command
=self
.vca_config
.get("kubectlpath"),
188 helm_command
=self
.vca_config
.get("helm3path"),
195 self
.k8sclusterjuju
= K8sJujuConnector(
196 kubectl_command
=self
.vca_config
.get("kubectlpath"),
197 juju_command
=self
.vca_config
.get("jujupath"),
200 on_update_db
=self
._on
_update
_k
8s
_db
,
205 self
.k8scluster_map
= {
206 "helm-chart": self
.k8sclusterhelm2
,
207 "helm-chart-v3": self
.k8sclusterhelm3
,
208 "chart": self
.k8sclusterhelm3
,
209 "juju-bundle": self
.k8sclusterjuju
,
210 "juju": self
.k8sclusterjuju
,
214 "lxc_proxy_charm": self
.n2vc
,
215 "native_charm": self
.n2vc
,
216 "k8s_proxy_charm": self
.n2vc
,
217 "helm": self
.conn_helm_ee
,
218 "helm-v3": self
.conn_helm_ee
,
222 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
224 self
.op_status_map
= {
225 "instantiation": self
.RO
.status
,
226 "termination": self
.RO
.status
,
227 "migrate": self
.RO
.status
,
228 "healing": self
.RO
.recreate_status
,
229 "verticalscale": self
.RO
.status
,
230 "start_stop_rebuild": self
.RO
.status
,
234 def increment_ip_mac(ip_mac
, vm_index
=1):
235 if not isinstance(ip_mac
, str):
238 # try with ipv4 look for last dot
239 i
= ip_mac
.rfind(".")
242 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
243 # try with ipv6 or mac look for last colon. Operate in hex
244 i
= ip_mac
.rfind(":")
247 # format in hex, len can be 2 for mac or 4 for ipv6
248 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(
249 ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
255 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
257 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
260 # TODO filter RO descriptor fields...
264 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
265 db_dict
["deploymentStatus"] = ro_descriptor
266 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
268 except Exception as e
:
270 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
273 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
275 # remove last dot from path (if exists)
276 if path
.endswith("."):
279 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
280 # .format(table, filter, path, updated_data))
283 nsr_id
= filter.get("_id")
285 # read ns record from database
286 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
287 current_ns_status
= nsr
.get("nsState")
289 # get vca status for NS
290 status_dict
= await self
.n2vc
.get_status(
291 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
296 db_dict
["vcaStatus"] = status_dict
298 # update configurationStatus for this VCA
300 vca_index
= int(path
[path
.rfind(".") + 1 :])
303 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
305 vca_status
= vca_list
[vca_index
].get("status")
307 configuration_status_list
= nsr
.get("configurationStatus")
308 config_status
= configuration_status_list
[vca_index
].get("status")
310 if config_status
== "BROKEN" and vca_status
!= "failed":
311 db_dict
["configurationStatus"][vca_index
] = "READY"
312 elif config_status
!= "BROKEN" and vca_status
== "failed":
313 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
314 except Exception as e
:
315 # not update configurationStatus
316 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
318 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
319 # if nsState = 'DEGRADED' check if all is OK
321 if current_ns_status
in ("READY", "DEGRADED"):
322 error_description
= ""
324 if status_dict
.get("machines"):
325 for machine_id
in status_dict
.get("machines"):
326 machine
= status_dict
.get("machines").get(machine_id
)
327 # check machine agent-status
328 if machine
.get("agent-status"):
329 s
= machine
.get("agent-status").get("status")
332 error_description
+= (
333 "machine {} agent-status={} ; ".format(
337 # check machine instance status
338 if machine
.get("instance-status"):
339 s
= machine
.get("instance-status").get("status")
342 error_description
+= (
343 "machine {} instance-status={} ; ".format(
348 if status_dict
.get("applications"):
349 for app_id
in status_dict
.get("applications"):
350 app
= status_dict
.get("applications").get(app_id
)
351 # check application status
352 if app
.get("status"):
353 s
= app
.get("status").get("status")
356 error_description
+= (
357 "application {} status={} ; ".format(app_id
, s
)
360 if error_description
:
361 db_dict
["errorDescription"] = error_description
362 if current_ns_status
== "READY" and is_degraded
:
363 db_dict
["nsState"] = "DEGRADED"
364 if current_ns_status
== "DEGRADED" and not is_degraded
:
365 db_dict
["nsState"] = "READY"
368 self
.update_db_2("nsrs", nsr_id
, db_dict
)
370 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
372 except Exception as e
:
373 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
375 async def _on_update_k8s_db(
376 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
379 Updating vca status in NSR record
380 :param cluster_uuid: UUID of a k8s cluster
381 :param kdu_instance: The unique name of the KDU instance
382 :param filter: To get nsr_id
383 :cluster_type: The cluster type (juju, k8s)
387 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
388 # .format(cluster_uuid, kdu_instance, filter))
390 nsr_id
= filter.get("_id")
392 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
393 cluster_uuid
=cluster_uuid
,
394 kdu_instance
=kdu_instance
,
396 complete_status
=True,
402 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
405 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
409 self
.update_db_2("nsrs", nsr_id
, db_dict
)
410 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
412 except Exception as e
:
413 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
416 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
419 undefined
=StrictUndefined
,
420 autoescape
=select_autoescape(default_for_string
=True, default
=True),
422 template
= env
.from_string(cloud_init_text
)
423 return template
.render(additional_params
or {})
424 except UndefinedError
as e
:
426 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
427 "file, must be provided in the instantiation parameters inside the "
428 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
)
430 except (TemplateError
, TemplateNotFound
) as e
:
432 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
437 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
438 cloud_init_content
= cloud_init_file
= None
440 if vdu
.get("cloud-init-file"):
441 base_folder
= vnfd
["_admin"]["storage"]
442 if base_folder
["pkg-dir"]:
443 cloud_init_file
= "{}/{}/cloud_init/{}".format(
444 base_folder
["folder"],
445 base_folder
["pkg-dir"],
446 vdu
["cloud-init-file"],
449 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
450 base_folder
["folder"],
451 vdu
["cloud-init-file"],
453 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
454 cloud_init_content
= ci_file
.read()
455 elif vdu
.get("cloud-init"):
456 cloud_init_content
= vdu
["cloud-init"]
458 return cloud_init_content
459 except FsException
as e
:
461 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
462 vnfd
["id"], vdu
["id"], cloud_init_file
, e
466 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
468 (vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"]), {}
470 additional_params
= vdur
.get("additionalParams")
471 return parse_yaml_strings(additional_params
)
473 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
475 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
476 :param vnfd: input vnfd
477 :param new_id: overrides vnf id if provided
478 :param additionalParams: Instantiation params for VNFs provided
479 :param nsrId: Id of the NSR
480 :return: copy of vnfd
482 vnfd_RO
= deepcopy(vnfd
)
483 # remove unused by RO configuration, monitoring, scaling and internal keys
484 vnfd_RO
.pop("_id", None)
485 vnfd_RO
.pop("_admin", None)
486 vnfd_RO
.pop("monitoring-param", None)
487 vnfd_RO
.pop("scaling-group-descriptor", None)
488 vnfd_RO
.pop("kdu", None)
489 vnfd_RO
.pop("k8s-cluster", None)
491 vnfd_RO
["id"] = new_id
493 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
494 for vdu
in get_iterable(vnfd_RO
, "vdu"):
495 vdu
.pop("cloud-init-file", None)
496 vdu
.pop("cloud-init", None)
500 def ip_profile_2_RO(ip_profile
):
501 RO_ip_profile
= deepcopy(ip_profile
)
502 if "dns-server" in RO_ip_profile
:
503 if isinstance(RO_ip_profile
["dns-server"], list):
504 RO_ip_profile
["dns-address"] = []
505 for ds
in RO_ip_profile
.pop("dns-server"):
506 RO_ip_profile
["dns-address"].append(ds
["address"])
508 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
509 if RO_ip_profile
.get("ip-version") == "ipv4":
510 RO_ip_profile
["ip-version"] = "IPv4"
511 if RO_ip_profile
.get("ip-version") == "ipv6":
512 RO_ip_profile
["ip-version"] = "IPv6"
513 if "dhcp-params" in RO_ip_profile
:
514 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
517 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
518 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
519 if db_vim
["_admin"]["operationalState"] != "ENABLED":
521 "VIM={} is not available. operationalState={}".format(
522 vim_account
, db_vim
["_admin"]["operationalState"]
525 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
528 def get_ro_wim_id_for_wim_account(self
, wim_account
):
529 if isinstance(wim_account
, str):
530 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
531 if db_wim
["_admin"]["operationalState"] != "ENABLED":
533 "WIM={} is not available. operationalState={}".format(
534 wim_account
, db_wim
["_admin"]["operationalState"]
537 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
542 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
544 db_vdu_push_list
= []
546 db_update
= {"_admin.modified": time()}
548 for vdu_id
, vdu_count
in vdu_create
.items():
552 for vdur
in reversed(db_vnfr
["vdur"])
553 if vdur
["vdu-id-ref"] == vdu_id
558 # Read the template saved in the db:
560 "No vdur in the database. Using the vdur-template to scale"
562 vdur_template
= db_vnfr
.get("vdur-template")
563 if not vdur_template
:
565 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
569 vdur
= vdur_template
[0]
570 # Delete a template from the database after using it
573 {"_id": db_vnfr
["_id"]},
575 pull
={"vdur-template": {"_id": vdur
["_id"]}},
577 for count
in range(vdu_count
):
578 vdur_copy
= deepcopy(vdur
)
579 vdur_copy
["status"] = "BUILD"
580 vdur_copy
["status-detailed"] = None
581 vdur_copy
["ip-address"] = None
582 vdur_copy
["_id"] = str(uuid4())
583 vdur_copy
["count-index"] += count
+ 1
584 vdur_copy
["id"] = "{}-{}".format(
585 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
587 vdur_copy
.pop("vim_info", None)
588 for iface
in vdur_copy
["interfaces"]:
589 if iface
.get("fixed-ip"):
590 iface
["ip-address"] = self
.increment_ip_mac(
591 iface
["ip-address"], count
+ 1
594 iface
.pop("ip-address", None)
595 if iface
.get("fixed-mac"):
596 iface
["mac-address"] = self
.increment_ip_mac(
597 iface
["mac-address"], count
+ 1
600 iface
.pop("mac-address", None)
604 ) # only first vdu can be managment of vnf
605 db_vdu_push_list
.append(vdur_copy
)
606 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
608 if len(db_vnfr
["vdur"]) == 1:
609 # The scale will move to 0 instances
611 "Scaling to 0 !, creating the template with the last vdur"
613 template_vdur
= [db_vnfr
["vdur"][0]]
614 for vdu_id
, vdu_count
in vdu_delete
.items():
616 indexes_to_delete
= [
618 for iv
in enumerate(db_vnfr
["vdur"])
619 if iv
[1]["vdu-id-ref"] == vdu_id
623 "vdur.{}.status".format(i
): "DELETING"
624 for i
in indexes_to_delete
[-vdu_count
:]
628 # it must be deleted one by one because common.db does not allow otherwise
631 for v
in reversed(db_vnfr
["vdur"])
632 if v
["vdu-id-ref"] == vdu_id
634 for vdu
in vdus_to_delete
[:vdu_count
]:
637 {"_id": db_vnfr
["_id"]},
639 pull
={"vdur": {"_id": vdu
["_id"]}},
643 db_push
["vdur"] = db_vdu_push_list
645 db_push
["vdur-template"] = template_vdur
648 db_vnfr
["vdur-template"] = template_vdur
649 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
650 # modify passed dictionary db_vnfr
651 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
652 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
654 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
656 Updates database nsr with the RO info for the created vld
657 :param ns_update_nsr: dictionary to be filled with the updated info
658 :param db_nsr: content of db_nsr. This is also modified
659 :param nsr_desc_RO: nsr descriptor from RO
660 :return: Nothing, LcmException is raised on errors
663 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
664 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
665 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
667 vld
["vim-id"] = net_RO
.get("vim_net_id")
668 vld
["name"] = net_RO
.get("vim_name")
669 vld
["status"] = net_RO
.get("status")
670 vld
["status-detailed"] = net_RO
.get("error_msg")
671 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
675 "ns_update_nsr: Not found vld={} at RO info".format(vld
["id"])
678 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
680 for db_vnfr
in db_vnfrs
.values():
681 vnfr_update
= {"status": "ERROR"}
682 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
683 if "status" not in vdur
:
684 vdur
["status"] = "ERROR"
685 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
687 vdur
["status-detailed"] = str(error_text
)
689 "vdur.{}.status-detailed".format(vdu_index
)
691 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
692 except DbException
as e
:
693 self
.logger
.error("Cannot update vnf. {}".format(e
))
695 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
697 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
698 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
699 :param nsr_desc_RO: nsr descriptor from RO
700 :return: Nothing, LcmException is raised on errors
702 for vnf_index
, db_vnfr
in db_vnfrs
.items():
703 for vnf_RO
in nsr_desc_RO
["vnfs"]:
704 if vnf_RO
["member_vnf_index"] != vnf_index
:
707 if vnf_RO
.get("ip_address"):
708 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
711 elif not db_vnfr
.get("ip-address"):
712 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
713 raise LcmExceptionNoMgmtIP(
714 "ns member_vnf_index '{}' has no IP address".format(
719 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
720 vdur_RO_count_index
= 0
721 if vdur
.get("pdu-type"):
723 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
724 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
726 if vdur
["count-index"] != vdur_RO_count_index
:
727 vdur_RO_count_index
+= 1
729 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
730 if vdur_RO
.get("ip_address"):
731 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
733 vdur
["ip-address"] = None
734 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
735 vdur
["name"] = vdur_RO
.get("vim_name")
736 vdur
["status"] = vdur_RO
.get("status")
737 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
738 for ifacer
in get_iterable(vdur
, "interfaces"):
739 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
740 if ifacer
["name"] == interface_RO
.get("internal_name"):
741 ifacer
["ip-address"] = interface_RO
.get(
744 ifacer
["mac-address"] = interface_RO
.get(
750 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
751 "from VIM info".format(
752 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
755 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
759 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
761 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
765 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
766 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
767 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
769 vld
["vim-id"] = net_RO
.get("vim_net_id")
770 vld
["name"] = net_RO
.get("vim_name")
771 vld
["status"] = net_RO
.get("status")
772 vld
["status-detailed"] = net_RO
.get("error_msg")
773 vnfr_update
["vld.{}".format(vld_index
)] = vld
777 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
782 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
787 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
792 def _get_ns_config_info(self
, nsr_id
):
794 Generates a mapping between vnf,vdu elements and the N2VC id
795 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
796 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
797 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
798 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
800 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
801 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
803 ns_config_info
= {"osm-config-mapping": mapping
}
804 for vca
in vca_deployed_list
:
805 if not vca
["member-vnf-index"]:
807 if not vca
["vdu_id"]:
808 mapping
[vca
["member-vnf-index"]] = vca
["application"]
812 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
814 ] = vca
["application"]
815 return ns_config_info
817 async def _instantiate_ng_ro(
834 def get_vim_account(vim_account_id
):
836 if vim_account_id
in db_vims
:
837 return db_vims
[vim_account_id
]
838 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
839 db_vims
[vim_account_id
] = db_vim
842 # modify target_vld info with instantiation parameters
843 def parse_vld_instantiation_params(
844 target_vim
, target_vld
, vld_params
, target_sdn
846 if vld_params
.get("ip-profile"):
847 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
850 if vld_params
.get("provider-network"):
851 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
854 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
855 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
858 if vld_params
.get("wimAccountId"):
859 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
860 target_vld
["vim_info"][target_wim
] = {}
861 for param
in ("vim-network-name", "vim-network-id"):
862 if vld_params
.get(param
):
863 if isinstance(vld_params
[param
], dict):
864 for vim
, vim_net
in vld_params
[param
].items():
865 other_target_vim
= "vim:" + vim
867 target_vld
["vim_info"],
868 (other_target_vim
, param
.replace("-", "_")),
871 else: # isinstance str
872 target_vld
["vim_info"][target_vim
][
873 param
.replace("-", "_")
874 ] = vld_params
[param
]
875 if vld_params
.get("common_id"):
876 target_vld
["common_id"] = vld_params
.get("common_id")
878 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
879 def update_ns_vld_target(target
, ns_params
):
880 for vnf_params
in ns_params
.get("vnf", ()):
881 if vnf_params
.get("vimAccountId"):
885 for vnfr
in db_vnfrs
.values()
886 if vnf_params
["member-vnf-index"]
887 == vnfr
["member-vnf-index-ref"]
891 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
892 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
893 target_vld
= find_in_list(
894 get_iterable(vdur
, "interfaces"),
895 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
898 vld_params
= find_in_list(
899 get_iterable(ns_params
, "vld"),
900 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
904 if vnf_params
.get("vimAccountId") not in a_vld
.get(
907 target_vim_network_list
= [
908 v
for _
, v
in a_vld
.get("vim_info").items()
910 target_vim_network_name
= next(
912 item
.get("vim_network_name", "")
913 for item
in target_vim_network_list
918 target
["ns"]["vld"][a_index
].get("vim_info").update(
920 "vim:{}".format(vnf_params
["vimAccountId"]): {
921 "vim_network_name": target_vim_network_name
,
927 for param
in ("vim-network-name", "vim-network-id"):
928 if vld_params
.get(param
) and isinstance(
929 vld_params
[param
], dict
931 for vim
, vim_net
in vld_params
[
934 other_target_vim
= "vim:" + vim
936 target
["ns"]["vld"][a_index
].get(
941 param
.replace("-", "_"),
946 nslcmop_id
= db_nslcmop
["_id"]
948 "name": db_nsr
["name"],
951 "image": deepcopy(db_nsr
["image"]),
952 "flavor": deepcopy(db_nsr
["flavor"]),
953 "action_id": nslcmop_id
,
954 "cloud_init_content": {},
956 for image
in target
["image"]:
957 image
["vim_info"] = {}
958 for flavor
in target
["flavor"]:
959 flavor
["vim_info"] = {}
960 if db_nsr
.get("affinity-or-anti-affinity-group"):
961 target
["affinity-or-anti-affinity-group"] = deepcopy(
962 db_nsr
["affinity-or-anti-affinity-group"]
964 for affinity_or_anti_affinity_group
in target
[
965 "affinity-or-anti-affinity-group"
967 affinity_or_anti_affinity_group
["vim_info"] = {}
969 if db_nslcmop
.get("lcmOperationType") != "instantiate":
970 # get parameters of instantiation:
971 db_nslcmop_instantiate
= self
.db
.get_list(
974 "nsInstanceId": db_nslcmop
["nsInstanceId"],
975 "lcmOperationType": "instantiate",
978 ns_params
= db_nslcmop_instantiate
.get("operationParams")
980 ns_params
= db_nslcmop
.get("operationParams")
981 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
982 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
985 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
986 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
990 "mgmt-network": vld
.get("mgmt-network", False),
991 "type": vld
.get("type"),
994 "vim_network_name": vld
.get("vim-network-name"),
995 "vim_account_id": ns_params
["vimAccountId"],
999 # check if this network needs SDN assist
1000 if vld
.get("pci-interfaces"):
1001 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1002 sdnc_id
= db_vim
["config"].get("sdn-controller")
1004 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1005 target_sdn
= "sdn:{}".format(sdnc_id
)
1006 target_vld
["vim_info"][target_sdn
] = {
1008 "target_vim": target_vim
,
1010 "type": vld
.get("type"),
1013 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1014 for nsd_vnf_profile
in nsd_vnf_profiles
:
1015 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1016 if cp
["virtual-link-profile-id"] == vld
["id"]:
1018 "member_vnf:{}.{}".format(
1019 cp
["constituent-cpd-id"][0][
1020 "constituent-base-element-id"
1022 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1024 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1026 # check at nsd descriptor, if there is an ip-profile
1028 nsd_vlp
= find_in_list(
1029 get_virtual_link_profiles(nsd
),
1030 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1035 and nsd_vlp
.get("virtual-link-protocol-data")
1036 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1038 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1041 ip_profile_dest_data
= {}
1042 if "ip-version" in ip_profile_source_data
:
1043 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1046 if "cidr" in ip_profile_source_data
:
1047 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1050 if "gateway-ip" in ip_profile_source_data
:
1051 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1054 if "dhcp-enabled" in ip_profile_source_data
:
1055 ip_profile_dest_data
["dhcp-params"] = {
1056 "enabled": ip_profile_source_data
["dhcp-enabled"]
1058 vld_params
["ip-profile"] = ip_profile_dest_data
1060 # update vld_params with instantiation params
1061 vld_instantiation_params
= find_in_list(
1062 get_iterable(ns_params
, "vld"),
1063 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1065 if vld_instantiation_params
:
1066 vld_params
.update(vld_instantiation_params
)
1067 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1068 target
["ns"]["vld"].append(target_vld
)
1069 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1070 update_ns_vld_target(target
, ns_params
)
1072 for vnfr
in db_vnfrs
.values():
1073 vnfd
= find_in_list(
1074 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1076 vnf_params
= find_in_list(
1077 get_iterable(ns_params
, "vnf"),
1078 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1080 target_vnf
= deepcopy(vnfr
)
1081 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1082 for vld
in target_vnf
.get("vld", ()):
1083 # check if connected to a ns.vld, to fill target'
1084 vnf_cp
= find_in_list(
1085 vnfd
.get("int-virtual-link-desc", ()),
1086 lambda cpd
: cpd
.get("id") == vld
["id"],
1089 ns_cp
= "member_vnf:{}.{}".format(
1090 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1092 if cp2target
.get(ns_cp
):
1093 vld
["target"] = cp2target
[ns_cp
]
1096 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1098 # check if this network needs SDN assist
1100 if vld
.get("pci-interfaces"):
1101 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1102 sdnc_id
= db_vim
["config"].get("sdn-controller")
1104 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1105 target_sdn
= "sdn:{}".format(sdnc_id
)
1106 vld
["vim_info"][target_sdn
] = {
1108 "target_vim": target_vim
,
1110 "type": vld
.get("type"),
1113 # check at vnfd descriptor, if there is an ip-profile
1115 vnfd_vlp
= find_in_list(
1116 get_virtual_link_profiles(vnfd
),
1117 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1121 and vnfd_vlp
.get("virtual-link-protocol-data")
1122 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1124 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1127 ip_profile_dest_data
= {}
1128 if "ip-version" in ip_profile_source_data
:
1129 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1132 if "cidr" in ip_profile_source_data
:
1133 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1136 if "gateway-ip" in ip_profile_source_data
:
1137 ip_profile_dest_data
[
1139 ] = ip_profile_source_data
["gateway-ip"]
1140 if "dhcp-enabled" in ip_profile_source_data
:
1141 ip_profile_dest_data
["dhcp-params"] = {
1142 "enabled": ip_profile_source_data
["dhcp-enabled"]
1145 vld_params
["ip-profile"] = ip_profile_dest_data
1146 # update vld_params with instantiation params
1148 vld_instantiation_params
= find_in_list(
1149 get_iterable(vnf_params
, "internal-vld"),
1150 lambda i_vld
: i_vld
["name"] == vld
["id"],
1152 if vld_instantiation_params
:
1153 vld_params
.update(vld_instantiation_params
)
1154 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1157 for vdur
in target_vnf
.get("vdur", ()):
1158 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1159 continue # This vdu must not be created
1160 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1162 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1165 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1166 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1169 and vdu_configuration
.get("config-access")
1170 and vdu_configuration
.get("config-access").get("ssh-access")
1172 vdur
["ssh-keys"] = ssh_keys_all
1173 vdur
["ssh-access-required"] = vdu_configuration
[
1175 ]["ssh-access"]["required"]
1178 and vnf_configuration
.get("config-access")
1179 and vnf_configuration
.get("config-access").get("ssh-access")
1180 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1182 vdur
["ssh-keys"] = ssh_keys_all
1183 vdur
["ssh-access-required"] = vnf_configuration
[
1185 ]["ssh-access"]["required"]
1186 elif ssh_keys_instantiation
and find_in_list(
1187 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1189 vdur
["ssh-keys"] = ssh_keys_instantiation
1191 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1193 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1195 if vdud
.get("cloud-init-file"):
1196 vdur
["cloud-init"] = "{}:file:{}".format(
1197 vnfd
["_id"], vdud
.get("cloud-init-file")
1199 # read the file and put its content at target.cloud_init_content, so that ng_ro does not need to use the shared package system
1200 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1201 base_folder
= vnfd
["_admin"]["storage"]
1202 if base_folder
["pkg-dir"]:
1203 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1204 base_folder
["folder"],
1205 base_folder
["pkg-dir"],
1206 vdud
.get("cloud-init-file"),
1209 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1210 base_folder
["folder"],
1211 vdud
.get("cloud-init-file"),
1213 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1214 target
["cloud_init_content"][
1217 elif vdud
.get("cloud-init"):
1218 vdur
["cloud-init"] = "{}:vdu:{}".format(
1219 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1221 # put the content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1222 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1225 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1226 deploy_params_vdu
= self
._format
_additional
_params
(
1227 vdur
.get("additionalParams") or {}
1229 deploy_params_vdu
["OSM"] = get_osm_params(
1230 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1232 vdur
["additionalParams"] = deploy_params_vdu
1235 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1236 if target_vim
not in ns_flavor
["vim_info"]:
1237 ns_flavor
["vim_info"][target_vim
] = {}
1240 # in case alternative images are provided we must check if they should be applied
1241 # for the vim_type, modify the vim_type taking into account
1242 ns_image_id
= int(vdur
["ns-image-id"])
1243 if vdur
.get("alt-image-ids"):
1244 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1245 vim_type
= db_vim
["vim_type"]
1246 for alt_image_id
in vdur
.get("alt-image-ids"):
1247 ns_alt_image
= target
["image"][int(alt_image_id
)]
1248 if vim_type
== ns_alt_image
.get("vim-type"):
1249 # must use alternative image
1251 "use alternative image id: {}".format(alt_image_id
)
1253 ns_image_id
= alt_image_id
1254 vdur
["ns-image-id"] = ns_image_id
1256 ns_image
= target
["image"][int(ns_image_id
)]
1257 if target_vim
not in ns_image
["vim_info"]:
1258 ns_image
["vim_info"][target_vim
] = {}
1261 if vdur
.get("affinity-or-anti-affinity-group-id"):
1262 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1263 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1264 if target_vim
not in ns_ags
["vim_info"]:
1265 ns_ags
["vim_info"][target_vim
] = {}
1267 vdur
["vim_info"] = {target_vim
: {}}
1268 # instantiation parameters
1270 vdu_instantiation_params
= find_in_list(
1271 get_iterable(vnf_params
, "vdu"),
1272 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1274 if vdu_instantiation_params
:
1275 # Parse the vdu_volumes from the instantiation params
1276 vdu_volumes
= get_volumes_from_instantiation_params(
1277 vdu_instantiation_params
, vdud
1279 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1280 vdur_list
.append(vdur
)
1281 target_vnf
["vdur"] = vdur_list
1282 target
["vnf"].append(target_vnf
)
1284 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1285 desc
= await self
.RO
.deploy(nsr_id
, target
)
1286 self
.logger
.debug("RO return > {}".format(desc
))
1287 action_id
= desc
["action_id"]
1288 await self
._wait
_ng
_ro
(
1289 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1290 operation
="instantiation"
1295 "_admin.deployed.RO.operational-status": "running",
1296 "detailed-status": " ".join(stage
),
1298 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1299 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1300 self
._write
_op
_status
(nslcmop_id
, stage
)
1302 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1306 async def _wait_ng_ro(
1316 detailed_status_old
= None
1318 start_time
= start_time
or time()
1319 while time() <= start_time
+ timeout
:
1320 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1321 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1322 if desc_status
["status"] == "FAILED":
1323 raise NgRoException(desc_status
["details"])
1324 elif desc_status
["status"] == "BUILD":
1326 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1327 elif desc_status
["status"] == "DONE":
1329 stage
[2] = "Deployed at VIM"
1332 assert False, "ROclient.check_ns_status returns unknown {}".format(
1333 desc_status
["status"]
1335 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1336 detailed_status_old
= stage
[2]
1337 db_nsr_update
["detailed-status"] = " ".join(stage
)
1338 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1339 self
._write
_op
_status
(nslcmop_id
, stage
)
1340 await asyncio
.sleep(15, loop
=self
.loop
)
1341 else: # timeout_ns_deploy
1342 raise NgRoException("Timeout waiting ns to deploy")
1344 async def _terminate_ng_ro(
1345 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1350 start_deploy
= time()
1357 "action_id": nslcmop_id
,
1359 desc
= await self
.RO
.deploy(nsr_id
, target
)
1360 action_id
= desc
["action_id"]
1361 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1362 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1365 + "ns terminate action at RO. action_id={}".format(action_id
)
1369 delete_timeout
= 20 * 60 # 20 minutes
1370 await self
._wait
_ng
_ro
(
1371 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1372 operation
="termination"
1375 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1376 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1378 await self
.RO
.delete(nsr_id
)
1379 except Exception as e
:
1380 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1381 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1382 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1383 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1385 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1387 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1388 failed_detail
.append("delete conflict: {}".format(e
))
1391 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1394 failed_detail
.append("delete error: {}".format(e
))
1397 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1401 stage
[2] = "Error deleting from VIM"
1403 stage
[2] = "Deleted from VIM"
1404 db_nsr_update
["detailed-status"] = " ".join(stage
)
1405 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1406 self
._write
_op
_status
(nslcmop_id
, stage
)
1409 raise LcmException("; ".join(failed_detail
))
1412 async def instantiate_RO(
1426 :param logging_text: preffix text to use at logging
1427 :param nsr_id: nsr identity
1428 :param nsd: database content of ns descriptor
1429 :param db_nsr: database content of ns record
1430 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1432 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1433 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1434 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1435 :return: None or exception
1438 start_deploy
= time()
1439 ns_params
= db_nslcmop
.get("operationParams")
1440 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1441 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1443 timeout_ns_deploy
= self
.timeout
.get(
1444 "ns_deploy", self
.timeout_ns_deploy
1447 # Check for and optionally request placement optimization. Database will be updated if placement activated
1448 stage
[2] = "Waiting for Placement."
1449 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1450 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1451 for vnfr
in db_vnfrs
.values():
1452 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1455 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1457 return await self
._instantiate
_ng
_ro
(
1470 except Exception as e
:
1471 stage
[2] = "ERROR deploying at VIM"
1472 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1474 "Error deploying at VIM {}".format(e
),
1475 exc_info
=not isinstance(
1478 ROclient
.ROClientException
,
1487 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1489 Wait for kdu to be up, get ip address
1490 :param logging_text: prefix use for logging
1494 :return: IP address, K8s services
1497 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1500 while nb_tries
< 360:
1501 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1505 for x
in get_iterable(db_vnfr
, "kdur")
1506 if x
.get("kdu-name") == kdu_name
1512 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1514 if kdur
.get("status"):
1515 if kdur
["status"] in ("READY", "ENABLED"):
1516 return kdur
.get("ip-address"), kdur
.get("services")
1519 "target KDU={} is in error state".format(kdu_name
)
1522 await asyncio
.sleep(10, loop
=self
.loop
)
1524 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1526 async def wait_vm_up_insert_key_ro(
1527 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1530 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1531 :param logging_text: prefix use for logging
1536 :param pub_key: public ssh key to inject, None to skip
1537 :param user: user to apply the public ssh key
1541 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1545 target_vdu_id
= None
1551 if ro_retries
>= 360: # 1 hour
1553 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1556 await asyncio
.sleep(10, loop
=self
.loop
)
1559 if not target_vdu_id
:
1560 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1562 if not vdu_id
: # for the VNF case
1563 if db_vnfr
.get("status") == "ERROR":
1565 "Cannot inject ssh-key because target VNF is in error state"
1567 ip_address
= db_vnfr
.get("ip-address")
1573 for x
in get_iterable(db_vnfr
, "vdur")
1574 if x
.get("ip-address") == ip_address
1582 for x
in get_iterable(db_vnfr
, "vdur")
1583 if x
.get("vdu-id-ref") == vdu_id
1584 and x
.get("count-index") == vdu_index
1590 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1591 ): # If only one, this should be the target vdu
1592 vdur
= db_vnfr
["vdur"][0]
1595 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1596 vnfr_id
, vdu_id
, vdu_index
1599 # New generation RO stores information at "vim_info"
1602 if vdur
.get("vim_info"):
1604 t
for t
in vdur
["vim_info"]
1605 ) # there should be only one key
1606 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1608 vdur
.get("pdu-type")
1609 or vdur
.get("status") == "ACTIVE"
1610 or ng_ro_status
== "ACTIVE"
1612 ip_address
= vdur
.get("ip-address")
1615 target_vdu_id
= vdur
["vdu-id-ref"]
1616 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1618 "Cannot inject ssh-key because target VM is in error state"
1621 if not target_vdu_id
:
1624 # inject public key into machine
1625 if pub_key
and user
:
1626 self
.logger
.debug(logging_text
+ "Inserting RO key")
1627 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1628 if vdur
.get("pdu-type"):
1629 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1632 ro_vm_id
= "{}-{}".format(
1633 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1634 ) # TODO add vdu_index
1638 "action": "inject_ssh_key",
1642 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1644 desc
= await self
.RO
.deploy(nsr_id
, target
)
1645 action_id
= desc
["action_id"]
1646 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1649 # wait until NS is deployed at RO
1651 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1652 ro_nsr_id
= deep_get(
1653 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1657 result_dict
= await self
.RO
.create_action(
1659 item_id_name
=ro_nsr_id
,
1661 "add_public_key": pub_key
,
1666 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1667 if not result_dict
or not isinstance(result_dict
, dict):
1669 "Unknown response from RO when injecting key"
1671 for result
in result_dict
.values():
1672 if result
.get("vim_result") == 200:
1675 raise ROclient
.ROClientException(
1676 "error injecting key: {}".format(
1677 result
.get("description")
1681 except NgRoException
as e
:
1683 "Reaching max tries injecting key. Error: {}".format(e
)
1685 except ROclient
.ROClientException
as e
:
1689 + "error injecting key: {}. Retrying until {} seconds".format(
1696 "Reaching max tries injecting key. Error: {}".format(e
)
1703 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1705 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1707 my_vca
= vca_deployed_list
[vca_index
]
1708 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1709 # vdu or kdu: no dependencies
1713 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1714 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1715 configuration_status_list
= db_nsr
["configurationStatus"]
1716 for index
, vca_deployed
in enumerate(configuration_status_list
):
1717 if index
== vca_index
:
1720 if not my_vca
.get("member-vnf-index") or (
1721 vca_deployed
.get("member-vnf-index")
1722 == my_vca
.get("member-vnf-index")
1724 internal_status
= configuration_status_list
[index
].get("status")
1725 if internal_status
== "READY":
1727 elif internal_status
== "BROKEN":
1729 "Configuration aborted because dependent charm/s has failed"
1734 # no dependencies, return
1736 await asyncio
.sleep(10)
1739 raise LcmException("Configuration aborted because dependent charm/s timeout")
1741 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1744 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1746 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1747 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1750 async def instantiate_N2VC(
1767 ee_config_descriptor
,
1769 nsr_id
= db_nsr
["_id"]
1770 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1771 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1772 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1773 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1775 "collection": "nsrs",
1776 "filter": {"_id": nsr_id
},
1777 "path": db_update_entry
,
1783 element_under_configuration
= nsr_id
1787 vnfr_id
= db_vnfr
["_id"]
1788 osm_config
["osm"]["vnf_id"] = vnfr_id
1790 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1792 if vca_type
== "native_charm":
1795 index_number
= vdu_index
or 0
1798 element_type
= "VNF"
1799 element_under_configuration
= vnfr_id
1800 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1802 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1803 element_type
= "VDU"
1804 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1805 osm_config
["osm"]["vdu_id"] = vdu_id
1807 namespace
+= ".{}".format(kdu_name
)
1808 element_type
= "KDU"
1809 element_under_configuration
= kdu_name
1810 osm_config
["osm"]["kdu_name"] = kdu_name
1813 if base_folder
["pkg-dir"]:
1814 artifact_path
= "{}/{}/{}/{}".format(
1815 base_folder
["folder"],
1816 base_folder
["pkg-dir"],
1819 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1824 artifact_path
= "{}/Scripts/{}/{}/".format(
1825 base_folder
["folder"],
1828 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1833 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1835 # get initial_config_primitive_list that applies to this element
1836 initial_config_primitive_list
= config_descriptor
.get(
1837 "initial-config-primitive"
1841 "Initial config primitive list > {}".format(
1842 initial_config_primitive_list
1846 # add config if not present for NS charm
1847 ee_descriptor_id
= ee_config_descriptor
.get("id")
1848 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1849 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1850 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1854 "Initial config primitive list #2 > {}".format(
1855 initial_config_primitive_list
1858 # n2vc_redesign STEP 3.1
1859 # find old ee_id if exists
1860 ee_id
= vca_deployed
.get("ee_id")
1862 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1863 # create or register execution environment in VCA
1864 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1866 self
._write
_configuration
_status
(
1868 vca_index
=vca_index
,
1870 element_under_configuration
=element_under_configuration
,
1871 element_type
=element_type
,
1874 step
= "create execution environment"
1875 self
.logger
.debug(logging_text
+ step
)
1879 if vca_type
== "k8s_proxy_charm":
1880 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1881 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1882 namespace
=namespace
,
1883 artifact_path
=artifact_path
,
1887 elif vca_type
== "helm" or vca_type
== "helm-v3":
1888 ee_id
, credentials
= await self
.vca_map
[
1890 ].create_execution_environment(
1891 namespace
=namespace
,
1895 artifact_path
=artifact_path
,
1899 ee_id
, credentials
= await self
.vca_map
[
1901 ].create_execution_environment(
1902 namespace
=namespace
,
1908 elif vca_type
== "native_charm":
1909 step
= "Waiting to VM being up and getting IP address"
1910 self
.logger
.debug(logging_text
+ step
)
1911 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1920 credentials
= {"hostname": rw_mgmt_ip
}
1922 username
= deep_get(
1923 config_descriptor
, ("config-access", "ssh-access", "default-user")
1925 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1926 # merged. Meanwhile let's get username from initial-config-primitive
1927 if not username
and initial_config_primitive_list
:
1928 for config_primitive
in initial_config_primitive_list
:
1929 for param
in config_primitive
.get("parameter", ()):
1930 if param
["name"] == "ssh-username":
1931 username
= param
["value"]
1935 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1936 "'config-access.ssh-access.default-user'"
1938 credentials
["username"] = username
1939 # n2vc_redesign STEP 3.2
1941 self
._write
_configuration
_status
(
1943 vca_index
=vca_index
,
1944 status
="REGISTERING",
1945 element_under_configuration
=element_under_configuration
,
1946 element_type
=element_type
,
1949 step
= "register execution environment {}".format(credentials
)
1950 self
.logger
.debug(logging_text
+ step
)
1951 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1952 credentials
=credentials
,
1953 namespace
=namespace
,
1958 # for compatibility with MON/POL modules, they need the model and application name at database
1959 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1960 ee_id_parts
= ee_id
.split(".")
1961 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1962 if len(ee_id_parts
) >= 2:
1963 model_name
= ee_id_parts
[0]
1964 application_name
= ee_id_parts
[1]
1965 db_nsr_update
[db_update_entry
+ "model"] = model_name
1966 db_nsr_update
[db_update_entry
+ "application"] = application_name
1968 # n2vc_redesign STEP 3.3
1969 step
= "Install configuration Software"
1971 self
._write
_configuration
_status
(
1973 vca_index
=vca_index
,
1974 status
="INSTALLING SW",
1975 element_under_configuration
=element_under_configuration
,
1976 element_type
=element_type
,
1977 other_update
=db_nsr_update
,
1980 # TODO check if already done
1981 self
.logger
.debug(logging_text
+ step
)
1983 if vca_type
== "native_charm":
1984 config_primitive
= next(
1985 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
1988 if config_primitive
:
1989 config
= self
._map
_primitive
_params
(
1990 config_primitive
, {}, deploy_params
1993 if vca_type
== "lxc_proxy_charm":
1994 if element_type
== "NS":
1995 num_units
= db_nsr
.get("config-units") or 1
1996 elif element_type
== "VNF":
1997 num_units
= db_vnfr
.get("config-units") or 1
1998 elif element_type
== "VDU":
1999 for v
in db_vnfr
["vdur"]:
2000 if vdu_id
== v
["vdu-id-ref"]:
2001 num_units
= v
.get("config-units") or 1
2003 if vca_type
!= "k8s_proxy_charm":
2004 await self
.vca_map
[vca_type
].install_configuration_sw(
2006 artifact_path
=artifact_path
,
2009 num_units
=num_units
,
2014 # write in db flag of configuration_sw already installed
2016 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2019 # add relations for this VCA (wait for other peers related with this VCA)
2020 await self
._add
_vca
_relations
(
2021 logging_text
=logging_text
,
2024 vca_index
=vca_index
,
2027 # if SSH access is required, then get execution environment SSH public
2028 # if native charm we have waited already to VM be UP
2029 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2032 # self.logger.debug("get ssh key block")
2034 config_descriptor
, ("config-access", "ssh-access", "required")
2036 # self.logger.debug("ssh key needed")
2037 # Needed to inject a ssh key
2040 ("config-access", "ssh-access", "default-user"),
2042 step
= "Install configuration Software, getting public ssh key"
2043 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2044 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2047 step
= "Insert public key into VM user={} ssh_key={}".format(
2051 # self.logger.debug("no need to get ssh key")
2052 step
= "Waiting to VM being up and getting IP address"
2053 self
.logger
.debug(logging_text
+ step
)
2055 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2058 # n2vc_redesign STEP 5.1
2059 # wait for RO (ip-address) Insert pub_key into VM
2062 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2063 logging_text
, nsr_id
, vnfr_id
, kdu_name
2065 vnfd
= self
.db
.get_one(
2067 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2069 kdu
= get_kdu(vnfd
, kdu_name
)
2071 service
["name"] for service
in get_kdu_services(kdu
)
2073 exposed_services
= []
2074 for service
in services
:
2075 if any(s
in service
["name"] for s
in kdu_services
):
2076 exposed_services
.append(service
)
2077 await self
.vca_map
[vca_type
].exec_primitive(
2079 primitive_name
="config",
2081 "osm-config": json
.dumps(
2083 k8s
={"services": exposed_services
}
2090 # This verification is needed in order to avoid trying to add a public key
2091 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2092 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2093 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2095 elif db_vnfr
.get('vdur'):
2096 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2106 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2108 # store rw_mgmt_ip in deploy params for later replacement
2109 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2111 # n2vc_redesign STEP 6 Execute initial config primitive
2112 step
= "execute initial config primitive"
2114 # wait for dependent primitives execution (NS -> VNF -> VDU)
2115 if initial_config_primitive_list
:
2116 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2118 # stage, in function of element type: vdu, kdu, vnf or ns
2119 my_vca
= vca_deployed_list
[vca_index
]
2120 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2122 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2123 elif my_vca
.get("member-vnf-index"):
2125 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2128 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2130 self
._write
_configuration
_status
(
2131 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2134 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2136 check_if_terminated_needed
= True
2137 for initial_config_primitive
in initial_config_primitive_list
:
2138 # adding information on the vca_deployed if it is a NS execution environment
2139 if not vca_deployed
["member-vnf-index"]:
2140 deploy_params
["ns_config_info"] = json
.dumps(
2141 self
._get
_ns
_config
_info
(nsr_id
)
2143 # TODO check if already done
2144 primitive_params_
= self
._map
_primitive
_params
(
2145 initial_config_primitive
, {}, deploy_params
2148 step
= "execute primitive '{}' params '{}'".format(
2149 initial_config_primitive
["name"], primitive_params_
2151 self
.logger
.debug(logging_text
+ step
)
2152 await self
.vca_map
[vca_type
].exec_primitive(
2154 primitive_name
=initial_config_primitive
["name"],
2155 params_dict
=primitive_params_
,
2160 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2161 if check_if_terminated_needed
:
2162 if config_descriptor
.get("terminate-config-primitive"):
2164 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2166 check_if_terminated_needed
= False
2168 # TODO register in database that primitive is done
2170 # STEP 7 Configure metrics
2171 if vca_type
== "helm" or vca_type
== "helm-v3":
2172 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2174 artifact_path
=artifact_path
,
2175 ee_config_descriptor
=ee_config_descriptor
,
2178 target_ip
=rw_mgmt_ip
,
2184 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2187 for job
in prometheus_jobs
:
2190 {"job_name": job
["job_name"]},
2193 fail_on_empty
=False,
2196 step
= "instantiated at VCA"
2197 self
.logger
.debug(logging_text
+ step
)
2199 self
._write
_configuration
_status
(
2200 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2203 except Exception as e
: # TODO not use Exception but N2VC exception
2204 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2206 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2209 "Exception while {} : {}".format(step
, e
), exc_info
=True
2211 self
._write
_configuration
_status
(
2212 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2214 raise LcmException("{} {}".format(step
, e
)) from e
2216 def _write_ns_status(
2220 current_operation
: str,
2221 current_operation_id
: str,
2222 error_description
: str = None,
2223 error_detail
: str = None,
2224 other_update
: dict = None,
2227 Update db_nsr fields.
2230 :param current_operation:
2231 :param current_operation_id:
2232 :param error_description:
2233 :param error_detail:
2234 :param other_update: Other required changes at database if provided, will be cleared
2238 db_dict
= other_update
or {}
2241 ] = current_operation_id
# for backward compatibility
2242 db_dict
["_admin.current-operation"] = current_operation_id
2243 db_dict
["_admin.operation-type"] = (
2244 current_operation
if current_operation
!= "IDLE" else None
2246 db_dict
["currentOperation"] = current_operation
2247 db_dict
["currentOperationID"] = current_operation_id
2248 db_dict
["errorDescription"] = error_description
2249 db_dict
["errorDetail"] = error_detail
2252 db_dict
["nsState"] = ns_state
2253 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2254 except DbException
as e
:
2255 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2257 def _write_op_status(
2261 error_message
: str = None,
2262 queuePosition
: int = 0,
2263 operation_state
: str = None,
2264 other_update
: dict = None,
2267 db_dict
= other_update
or {}
2268 db_dict
["queuePosition"] = queuePosition
2269 if isinstance(stage
, list):
2270 db_dict
["stage"] = stage
[0]
2271 db_dict
["detailed-status"] = " ".join(stage
)
2272 elif stage
is not None:
2273 db_dict
["stage"] = str(stage
)
2275 if error_message
is not None:
2276 db_dict
["errorMessage"] = error_message
2277 if operation_state
is not None:
2278 db_dict
["operationState"] = operation_state
2279 db_dict
["statusEnteredTime"] = time()
2280 self
.update_db_2("nslcmops", op_id
, db_dict
)
2281 except DbException
as e
:
2283 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2286 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2288 nsr_id
= db_nsr
["_id"]
2289 # configurationStatus
2290 config_status
= db_nsr
.get("configurationStatus")
2293 "configurationStatus.{}.status".format(index
): status
2294 for index
, v
in enumerate(config_status
)
2298 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2300 except DbException
as e
:
2302 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2305 def _write_configuration_status(
2310 element_under_configuration
: str = None,
2311 element_type
: str = None,
2312 other_update
: dict = None,
2315 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2316 # .format(vca_index, status))
2319 db_path
= "configurationStatus.{}.".format(vca_index
)
2320 db_dict
= other_update
or {}
2322 db_dict
[db_path
+ "status"] = status
2323 if element_under_configuration
:
2325 db_path
+ "elementUnderConfiguration"
2326 ] = element_under_configuration
2328 db_dict
[db_path
+ "elementType"] = element_type
2329 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2330 except DbException
as e
:
2332 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2333 status
, nsr_id
, vca_index
, e
2337 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2339 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2340 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2341 Database is used because the result can be obtained from a different LCM worker in case of HA.
2342 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2343 :param db_nslcmop: database content of nslcmop
2344 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2345 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2346 computed 'vim-account-id'
2349 nslcmop_id
= db_nslcmop
["_id"]
2350 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2351 if placement_engine
== "PLA":
2353 logging_text
+ "Invoke and wait for placement optimization"
2355 await self
.msg
.aiowrite(
2356 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2358 db_poll_interval
= 5
2359 wait
= db_poll_interval
* 10
2361 while not pla_result
and wait
>= 0:
2362 await asyncio
.sleep(db_poll_interval
)
2363 wait
-= db_poll_interval
2364 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2365 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2369 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2372 for pla_vnf
in pla_result
["vnf"]:
2373 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2374 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2379 {"_id": vnfr
["_id"]},
2380 {"vim-account-id": pla_vnf
["vimAccountId"]},
2383 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2386 def update_nsrs_with_pla_result(self
, params
):
2388 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2390 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2392 except Exception as e
:
2393 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2395 async def instantiate(self
, nsr_id
, nslcmop_id
):
2398 :param nsr_id: ns instance to deploy
2399 :param nslcmop_id: operation to run
2403 # Try to lock HA task here
2404 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2405 if not task_is_locked_by_me
:
2407 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2411 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2412 self
.logger
.debug(logging_text
+ "Enter")
2414 # get all needed from database
2416 # database nsrs record
2419 # database nslcmops record
2422 # update operation on nsrs
2424 # update operation on nslcmops
2425 db_nslcmop_update
= {}
2427 nslcmop_operation_state
= None
2428 db_vnfrs
= {} # vnf's info indexed by member-index
2430 tasks_dict_info
= {} # from task to info text
2434 "Stage 1/5: preparation of the environment.",
2435 "Waiting for previous operations to terminate.",
2438 # ^ stage, step, VIM progress
2440 # wait for any previous tasks in process
2441 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2443 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2444 stage
[1] = "Reading from database."
2445 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2446 db_nsr_update
["detailed-status"] = "creating"
2447 db_nsr_update
["operational-status"] = "init"
2448 self
._write
_ns
_status
(
2450 ns_state
="BUILDING",
2451 current_operation
="INSTANTIATING",
2452 current_operation_id
=nslcmop_id
,
2453 other_update
=db_nsr_update
,
2455 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2457 # read from db: operation
2458 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2459 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2460 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2461 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2462 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2464 ns_params
= db_nslcmop
.get("operationParams")
2465 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2466 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2468 timeout_ns_deploy
= self
.timeout
.get(
2469 "ns_deploy", self
.timeout_ns_deploy
2473 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2474 self
.logger
.debug(logging_text
+ stage
[1])
2475 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2476 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2477 self
.logger
.debug(logging_text
+ stage
[1])
2478 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2479 self
.fs
.sync(db_nsr
["nsd-id"])
2481 # nsr_name = db_nsr["name"] # TODO short-name??
2483 # read from db: vnf's of this ns
2484 stage
[1] = "Getting vnfrs from db."
2485 self
.logger
.debug(logging_text
+ stage
[1])
2486 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2488 # read from db: vnfd's for every vnf
2489 db_vnfds
= [] # every vnfd data
2491 # for each vnf in ns, read vnfd
2492 for vnfr
in db_vnfrs_list
:
2493 if vnfr
.get("kdur"):
2495 for kdur
in vnfr
["kdur"]:
2496 if kdur
.get("additionalParams"):
2497 kdur
["additionalParams"] = json
.loads(
2498 kdur
["additionalParams"]
2500 kdur_list
.append(kdur
)
2501 vnfr
["kdur"] = kdur_list
2503 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2504 vnfd_id
= vnfr
["vnfd-id"]
2505 vnfd_ref
= vnfr
["vnfd-ref"]
2506 self
.fs
.sync(vnfd_id
)
2508 # if we haven't this vnfd, read it from db
2509 if vnfd_id
not in db_vnfds
:
2511 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2514 self
.logger
.debug(logging_text
+ stage
[1])
2515 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2518 db_vnfds
.append(vnfd
)
2520 # Get or generate the _admin.deployed.VCA list
2521 vca_deployed_list
= None
2522 if db_nsr
["_admin"].get("deployed"):
2523 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2524 if vca_deployed_list
is None:
2525 vca_deployed_list
= []
2526 configuration_status_list
= []
2527 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2528 db_nsr_update
["configurationStatus"] = configuration_status_list
2529 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2530 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2531 elif isinstance(vca_deployed_list
, dict):
2532 # maintain backward compatibility. Change a dict to list at database
2533 vca_deployed_list
= list(vca_deployed_list
.values())
2534 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2535 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2538 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2540 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2541 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2543 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2544 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2545 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2547 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2550 # n2vc_redesign STEP 2 Deploy Network Scenario
2551 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2552 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2554 stage
[1] = "Deploying KDUs."
2555 # self.logger.debug(logging_text + "Before deploy_kdus")
2556 # Call deploy_kdus in case the "vdu:kdu" param exists
2557 await self
.deploy_kdus(
2558 logging_text
=logging_text
,
2560 nslcmop_id
=nslcmop_id
,
2563 task_instantiation_info
=tasks_dict_info
,
2566 stage
[1] = "Getting VCA public key."
2567 # n2vc_redesign STEP 1 Get VCA public ssh-key
2568 # feature 1429. Add n2vc public key to needed VMs
2569 n2vc_key
= self
.n2vc
.get_public_key()
2570 n2vc_key_list
= [n2vc_key
]
2571 if self
.vca_config
.get("public_key"):
2572 n2vc_key_list
.append(self
.vca_config
["public_key"])
2574 stage
[1] = "Deploying NS at VIM."
2575 task_ro
= asyncio
.ensure_future(
2576 self
.instantiate_RO(
2577 logging_text
=logging_text
,
2581 db_nslcmop
=db_nslcmop
,
2584 n2vc_key_list
=n2vc_key_list
,
2588 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2589 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2591 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2592 stage
[1] = "Deploying Execution Environments."
2593 self
.logger
.debug(logging_text
+ stage
[1])
2595 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2596 for vnf_profile
in get_vnf_profiles(nsd
):
2597 vnfd_id
= vnf_profile
["vnfd-id"]
2598 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2599 member_vnf_index
= str(vnf_profile
["id"])
2600 db_vnfr
= db_vnfrs
[member_vnf_index
]
2601 base_folder
= vnfd
["_admin"]["storage"]
2607 # Get additional parameters
2608 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2609 if db_vnfr
.get("additionalParamsForVnf"):
2610 deploy_params
.update(
2611 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2614 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2615 if descriptor_config
:
2617 logging_text
=logging_text
2618 + "member_vnf_index={} ".format(member_vnf_index
),
2621 nslcmop_id
=nslcmop_id
,
2627 member_vnf_index
=member_vnf_index
,
2628 vdu_index
=vdu_index
,
2630 deploy_params
=deploy_params
,
2631 descriptor_config
=descriptor_config
,
2632 base_folder
=base_folder
,
2633 task_instantiation_info
=tasks_dict_info
,
2637 # Deploy charms for each VDU that supports one.
2638 for vdud
in get_vdu_list(vnfd
):
2640 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2641 vdur
= find_in_list(
2642 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2645 if vdur
.get("additionalParams"):
2646 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2648 deploy_params_vdu
= deploy_params
2649 deploy_params_vdu
["OSM"] = get_osm_params(
2650 db_vnfr
, vdu_id
, vdu_count_index
=0
2652 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2654 self
.logger
.debug("VDUD > {}".format(vdud
))
2656 "Descriptor config > {}".format(descriptor_config
)
2658 if descriptor_config
:
2661 for vdu_index
in range(vdud_count
):
2662 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2664 logging_text
=logging_text
2665 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2666 member_vnf_index
, vdu_id
, vdu_index
2670 nslcmop_id
=nslcmop_id
,
2676 member_vnf_index
=member_vnf_index
,
2677 vdu_index
=vdu_index
,
2679 deploy_params
=deploy_params_vdu
,
2680 descriptor_config
=descriptor_config
,
2681 base_folder
=base_folder
,
2682 task_instantiation_info
=tasks_dict_info
,
2685 for kdud
in get_kdu_list(vnfd
):
2686 kdu_name
= kdud
["name"]
2687 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2688 if descriptor_config
:
2693 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2695 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2696 if kdur
.get("additionalParams"):
2697 deploy_params_kdu
.update(
2698 parse_yaml_strings(kdur
["additionalParams"].copy())
2702 logging_text
=logging_text
,
2705 nslcmop_id
=nslcmop_id
,
2711 member_vnf_index
=member_vnf_index
,
2712 vdu_index
=vdu_index
,
2714 deploy_params
=deploy_params_kdu
,
2715 descriptor_config
=descriptor_config
,
2716 base_folder
=base_folder
,
2717 task_instantiation_info
=tasks_dict_info
,
2721 # Check if this NS has a charm configuration
2722 descriptor_config
= nsd
.get("ns-configuration")
2723 if descriptor_config
and descriptor_config
.get("juju"):
2726 member_vnf_index
= None
2732 # Get additional parameters
2733 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2734 if db_nsr
.get("additionalParamsForNs"):
2735 deploy_params
.update(
2736 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2738 base_folder
= nsd
["_admin"]["storage"]
2740 logging_text
=logging_text
,
2743 nslcmop_id
=nslcmop_id
,
2749 member_vnf_index
=member_vnf_index
,
2750 vdu_index
=vdu_index
,
2752 deploy_params
=deploy_params
,
2753 descriptor_config
=descriptor_config
,
2754 base_folder
=base_folder
,
2755 task_instantiation_info
=tasks_dict_info
,
2759 # the rest of the stuff will be done in finally
2762 ROclient
.ROClientException
,
2768 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2771 except asyncio
.CancelledError
:
2773 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2775 exc
= "Operation was cancelled"
2776 except Exception as e
:
2777 exc
= traceback
.format_exc()
2778 self
.logger
.critical(
2779 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2784 error_list
.append(str(exc
))
2786 # wait for pending tasks
2788 stage
[1] = "Waiting for instantiate pending tasks."
2789 self
.logger
.debug(logging_text
+ stage
[1])
2790 error_list
+= await self
._wait
_for
_tasks
(
2798 stage
[1] = stage
[2] = ""
2799 except asyncio
.CancelledError
:
2800 error_list
.append("Cancelled")
2801 # TODO cancel all tasks
2802 except Exception as exc
:
2803 error_list
.append(str(exc
))
2805 # update operation-status
2806 db_nsr_update
["operational-status"] = "running"
2807 # let's begin with VCA 'configured' status (later we can change it)
2808 db_nsr_update
["config-status"] = "configured"
2809 for task
, task_name
in tasks_dict_info
.items():
2810 if not task
.done() or task
.cancelled() or task
.exception():
2811 if task_name
.startswith(self
.task_name_deploy_vca
):
2812 # A N2VC task is pending
2813 db_nsr_update
["config-status"] = "failed"
2815 # RO or KDU task is pending
2816 db_nsr_update
["operational-status"] = "failed"
2818 # update status at database
2820 error_detail
= ". ".join(error_list
)
2821 self
.logger
.error(logging_text
+ error_detail
)
2822 error_description_nslcmop
= "{} Detail: {}".format(
2823 stage
[0], error_detail
2825 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2826 nslcmop_id
, stage
[0]
2829 db_nsr_update
["detailed-status"] = (
2830 error_description_nsr
+ " Detail: " + error_detail
2832 db_nslcmop_update
["detailed-status"] = error_detail
2833 nslcmop_operation_state
= "FAILED"
2837 error_description_nsr
= error_description_nslcmop
= None
2839 db_nsr_update
["detailed-status"] = "Done"
2840 db_nslcmop_update
["detailed-status"] = "Done"
2841 nslcmop_operation_state
= "COMPLETED"
2844 self
._write
_ns
_status
(
2847 current_operation
="IDLE",
2848 current_operation_id
=None,
2849 error_description
=error_description_nsr
,
2850 error_detail
=error_detail
,
2851 other_update
=db_nsr_update
,
2853 self
._write
_op
_status
(
2856 error_message
=error_description_nslcmop
,
2857 operation_state
=nslcmop_operation_state
,
2858 other_update
=db_nslcmop_update
,
2861 if nslcmop_operation_state
:
2863 await self
.msg
.aiowrite(
2868 "nslcmop_id": nslcmop_id
,
2869 "operationState": nslcmop_operation_state
,
2873 except Exception as e
:
2875 logging_text
+ "kafka_write notification Exception {}".format(e
)
2878 self
.logger
.debug(logging_text
+ "Exit")
2879 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
    """
    Return the vnfd identified by vnfd_id, querying the database only when it is not cached yet.
    :param vnfd_id: value matched against the "id" field of the "vnfds" collection; also the cache key
    :param cached_vnfds: dict used as cache, indexed by vnfd_id. A descriptor read from the database
        is stored in it, so the dict passed by the caller is modified
    :return: the entry of cached_vnfds for vnfd_id
    """
    # Cache hit: nothing to read from the database
    if vnfd_id in cached_vnfds:
        return cached_vnfds[vnfd_id]

    # Cache miss: read the descriptor once and remember it for later calls
    db_vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
    cached_vnfds[vnfd_id] = db_vnfd
    return cached_vnfds[vnfd_id]
2886 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2887 if vnf_profile_id
not in cached_vnfrs
:
2888 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2891 "member-vnf-index-ref": vnf_profile_id
,
2892 "nsr-id-ref": nsr_id
,
2895 return cached_vnfrs
[vnf_profile_id
]
2897 def _is_deployed_vca_in_relation(
2898 self
, vca
: DeployedVCA
, relation
: Relation
2901 for endpoint
in (relation
.provider
, relation
.requirer
):
2902 if endpoint
["kdu-resource-profile-id"]:
2905 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2906 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2907 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2913 def _update_ee_relation_data_with_implicit_data(
2914 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2916 ee_relation_data
= safe_get_ee_relation(
2917 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2919 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2920 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2921 "execution-environment-ref"
2923 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2924 vnfd_id
= vnf_profile
["vnfd-id"]
2925 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2928 if ee_relation_level
== EELevel
.VNF
2929 else ee_relation_data
["vdu-profile-id"]
2931 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2934 f
"not execution environments found for ee_relation {ee_relation_data}"
2936 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2937 return ee_relation_data
2939 def _get_ns_relations(
2942 nsd
: Dict
[str, Any
],
2944 cached_vnfds
: Dict
[str, Any
],
2945 ) -> List
[Relation
]:
2947 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2948 for r
in db_ns_relations
:
2949 provider_dict
= None
2950 requirer_dict
= None
2951 if all(key
in r
for key
in ("provider", "requirer")):
2952 provider_dict
= r
["provider"]
2953 requirer_dict
= r
["requirer"]
2954 elif "entities" in r
:
2955 provider_id
= r
["entities"][0]["id"]
2958 "endpoint": r
["entities"][0]["endpoint"],
2960 if provider_id
!= nsd
["id"]:
2961 provider_dict
["vnf-profile-id"] = provider_id
2962 requirer_id
= r
["entities"][1]["id"]
2965 "endpoint": r
["entities"][1]["endpoint"],
2967 if requirer_id
!= nsd
["id"]:
2968 requirer_dict
["vnf-profile-id"] = requirer_id
2971 "provider/requirer or entities must be included in the relation."
2973 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2974 nsr_id
, nsd
, provider_dict
, cached_vnfds
2976 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
2977 nsr_id
, nsd
, requirer_dict
, cached_vnfds
2979 provider
= EERelation(relation_provider
)
2980 requirer
= EERelation(relation_requirer
)
2981 relation
= Relation(r
["name"], provider
, requirer
)
2982 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
2984 relations
.append(relation
)
2987 def _get_vnf_relations(
2990 nsd
: Dict
[str, Any
],
2992 cached_vnfds
: Dict
[str, Any
],
2993 ) -> List
[Relation
]:
2995 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
2996 vnf_profile_id
= vnf_profile
["id"]
2997 vnfd_id
= vnf_profile
["vnfd-id"]
2998 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
2999 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3000 for r
in db_vnf_relations
:
3001 provider_dict
= None
3002 requirer_dict
= None
3003 if all(key
in r
for key
in ("provider", "requirer")):
3004 provider_dict
= r
["provider"]
3005 requirer_dict
= r
["requirer"]
3006 elif "entities" in r
:
3007 provider_id
= r
["entities"][0]["id"]
3010 "vnf-profile-id": vnf_profile_id
,
3011 "endpoint": r
["entities"][0]["endpoint"],
3013 if provider_id
!= vnfd_id
:
3014 provider_dict
["vdu-profile-id"] = provider_id
3015 requirer_id
= r
["entities"][1]["id"]
3018 "vnf-profile-id": vnf_profile_id
,
3019 "endpoint": r
["entities"][1]["endpoint"],
3021 if requirer_id
!= vnfd_id
:
3022 requirer_dict
["vdu-profile-id"] = requirer_id
3025 "provider/requirer or entities must be included in the relation."
3027 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3028 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3030 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3031 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3033 provider
= EERelation(relation_provider
)
3034 requirer
= EERelation(relation_requirer
)
3035 relation
= Relation(r
["name"], provider
, requirer
)
3036 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3038 relations
.append(relation
)
3041 def _get_kdu_resource_data(
3043 ee_relation
: EERelation
,
3044 db_nsr
: Dict
[str, Any
],
3045 cached_vnfds
: Dict
[str, Any
],
3046 ) -> DeployedK8sResource
:
3047 nsd
= get_nsd(db_nsr
)
3048 vnf_profiles
= get_vnf_profiles(nsd
)
3049 vnfd_id
= find_in_list(
3051 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3053 db_vnfd
= self
._get
_vnfd
(vnfd_id
, cached_vnfds
)
3054 kdu_resource_profile
= get_kdu_resource_profile(
3055 db_vnfd
, ee_relation
.kdu_resource_profile_id
3057 kdu_name
= kdu_resource_profile
["kdu-name"]
3058 deployed_kdu
, _
= get_deployed_kdu(
3059 db_nsr
.get("_admin", ()).get("deployed", ()),
3061 ee_relation
.vnf_profile_id
,
3063 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3066 def _get_deployed_component(
3068 ee_relation
: EERelation
,
3069 db_nsr
: Dict
[str, Any
],
3070 cached_vnfds
: Dict
[str, Any
],
3071 ) -> DeployedComponent
:
3072 nsr_id
= db_nsr
["_id"]
3073 deployed_component
= None
3074 ee_level
= EELevel
.get_level(ee_relation
)
3075 if ee_level
== EELevel
.NS
:
3076 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3078 deployed_component
= DeployedVCA(nsr_id
, vca
)
3079 elif ee_level
== EELevel
.VNF
:
3080 vca
= get_deployed_vca(
3084 "member-vnf-index": ee_relation
.vnf_profile_id
,
3085 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3089 deployed_component
= DeployedVCA(nsr_id
, vca
)
3090 elif ee_level
== EELevel
.VDU
:
3091 vca
= get_deployed_vca(
3094 "vdu_id": ee_relation
.vdu_profile_id
,
3095 "member-vnf-index": ee_relation
.vnf_profile_id
,
3096 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3100 deployed_component
= DeployedVCA(nsr_id
, vca
)
3101 elif ee_level
== EELevel
.KDU
:
3102 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3103 ee_relation
, db_nsr
, cached_vnfds
3105 if kdu_resource_data
:
3106 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3107 return deployed_component
3109 async def _add_relation(
3113 db_nsr
: Dict
[str, Any
],
3114 cached_vnfds
: Dict
[str, Any
],
3115 cached_vnfrs
: Dict
[str, Any
],
3117 deployed_provider
= self
._get
_deployed
_component
(
3118 relation
.provider
, db_nsr
, cached_vnfds
3120 deployed_requirer
= self
._get
_deployed
_component
(
3121 relation
.requirer
, db_nsr
, cached_vnfds
3125 and deployed_requirer
3126 and deployed_provider
.config_sw_installed
3127 and deployed_requirer
.config_sw_installed
3129 provider_db_vnfr
= (
3131 relation
.provider
.nsr_id
,
3132 relation
.provider
.vnf_profile_id
,
3135 if relation
.provider
.vnf_profile_id
3138 requirer_db_vnfr
= (
3140 relation
.requirer
.nsr_id
,
3141 relation
.requirer
.vnf_profile_id
,
3144 if relation
.requirer
.vnf_profile_id
3147 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3148 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3149 provider_relation_endpoint
= RelationEndpoint(
3150 deployed_provider
.ee_id
,
3152 relation
.provider
.endpoint
,
3154 requirer_relation_endpoint
= RelationEndpoint(
3155 deployed_requirer
.ee_id
,
3157 relation
.requirer
.endpoint
,
3159 await self
.vca_map
[vca_type
].add_relation(
3160 provider
=provider_relation_endpoint
,
3161 requirer
=requirer_relation_endpoint
,
3163 # remove entry from relations list
3167 async def _add_vca_relations(
3173 timeout
: int = 3600,
3177 # 1. find all relations for this VCA
3178 # 2. wait for other peers related
3182 # STEP 1: find all relations for this VCA
3185 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3186 nsd
= get_nsd(db_nsr
)
3189 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3190 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3195 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3196 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3198 # if no relations, terminate
3200 self
.logger
.debug(logging_text
+ " No relations")
3203 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3210 if now
- start
>= timeout
:
3211 self
.logger
.error(logging_text
+ " : timeout adding relations")
3214 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3215 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3217 # for each relation, find the VCA's related
3218 for relation
in relations
.copy():
3219 added
= await self
._add
_relation
(
3227 relations
.remove(relation
)
3230 self
.logger
.debug("Relations added")
3232 await asyncio
.sleep(5.0)
3236 except Exception as e
:
3237 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3240 async def _install_kdu(
3248 k8s_instance_info
: dict,
3249 k8params
: dict = None,
3255 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3258 "collection": "nsrs",
3259 "filter": {"_id": nsr_id
},
3260 "path": nsr_db_path
,
3263 if k8s_instance_info
.get("kdu-deployment-name"):
3264 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3266 kdu_instance
= self
.k8scluster_map
[
3268 ].generate_kdu_instance_name(
3269 db_dict
=db_dict_install
,
3270 kdu_model
=k8s_instance_info
["kdu-model"],
3271 kdu_name
=k8s_instance_info
["kdu-name"],
3274 # Update the nsrs table with the kdu-instance value
3278 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3281 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3282 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3283 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3284 # namespace, this first verification could be removed, and the next step would be done for any kind
3286 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3287 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3288 if k8sclustertype
in ("juju", "juju-bundle"):
3289 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
3290 # that the user passed a namespace in which they want the KDU to be deployed)
3296 "_admin.projects_write": k8s_instance_info
["namespace"],
3297 "_admin.projects_read": k8s_instance_info
["namespace"],
3303 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3308 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3310 k8s_instance_info
["namespace"] = kdu_instance
3312 await self
.k8scluster_map
[k8sclustertype
].install(
3313 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3314 kdu_model
=k8s_instance_info
["kdu-model"],
3317 db_dict
=db_dict_install
,
3319 kdu_name
=k8s_instance_info
["kdu-name"],
3320 namespace
=k8s_instance_info
["namespace"],
3321 kdu_instance
=kdu_instance
,
3325 # Obtain services to obtain management service ip
3326 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3327 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3328 kdu_instance
=kdu_instance
,
3329 namespace
=k8s_instance_info
["namespace"],
3332 # Obtain management service info (if exists)
3333 vnfr_update_dict
= {}
3334 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3336 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3341 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3344 for service
in kdud
.get("service", [])
3345 if service
.get("mgmt-service")
3347 for mgmt_service
in mgmt_services
:
3348 for service
in services
:
3349 if service
["name"].startswith(mgmt_service
["name"]):
3350 # Mgmt service found, Obtain service ip
3351 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3352 if isinstance(ip
, list) and len(ip
) == 1:
3356 "kdur.{}.ip-address".format(kdu_index
)
3359 # Check if must update also mgmt ip at the vnf
3360 service_external_cp
= mgmt_service
.get(
3361 "external-connection-point-ref"
3363 if service_external_cp
:
3365 deep_get(vnfd
, ("mgmt-interface", "cp"))
3366 == service_external_cp
3368 vnfr_update_dict
["ip-address"] = ip
3373 "external-connection-point-ref", ""
3375 == service_external_cp
,
3378 "kdur.{}.ip-address".format(kdu_index
)
3383 "Mgmt service name: {} not found".format(
3384 mgmt_service
["name"]
3388 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3389 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3391 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3394 and kdu_config
.get("initial-config-primitive")
3395 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3397 initial_config_primitive_list
= kdu_config
.get(
3398 "initial-config-primitive"
3400 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3402 for initial_config_primitive
in initial_config_primitive_list
:
3403 primitive_params_
= self
._map
_primitive
_params
(
3404 initial_config_primitive
, {}, {}
3407 await asyncio
.wait_for(
3408 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3409 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3410 kdu_instance
=kdu_instance
,
3411 primitive_name
=initial_config_primitive
["name"],
3412 params
=primitive_params_
,
3413 db_dict
=db_dict_install
,
3419 except Exception as e
:
3420 # Prepare update db with error and raise exception
3423 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3427 vnfr_data
.get("_id"),
3428 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3431 # ignore to keep original exception
3433 # reraise original error
3438 async def deploy_kdus(
3445 task_instantiation_info
,
3447 # Launch kdus if present in the descriptor
3449 k8scluster_id_2_uuic
= {
3450 "helm-chart-v3": {},
3455 async def _get_cluster_id(cluster_id
, cluster_type
):
3456 nonlocal k8scluster_id_2_uuic
3457 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3458 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3460 # check if the K8scluster is being created and wait for any previous related tasks in process
3461 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3462 "k8scluster", cluster_id
3465 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3466 task_name
, cluster_id
3468 self
.logger
.debug(logging_text
+ text
)
3469 await asyncio
.wait(task_dependency
, timeout
=3600)
3471 db_k8scluster
= self
.db
.get_one(
3472 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3474 if not db_k8scluster
:
3475 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3477 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3479 if cluster_type
== "helm-chart-v3":
3481 # backward compatibility for existing clusters that have not been initialized for helm v3
3482 k8s_credentials
= yaml
.safe_dump(
3483 db_k8scluster
.get("credentials")
3485 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3486 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3488 db_k8scluster_update
= {}
3489 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3490 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3491 db_k8scluster_update
[
3492 "_admin.helm-chart-v3.created"
3494 db_k8scluster_update
[
3495 "_admin.helm-chart-v3.operationalState"
3498 "k8sclusters", cluster_id
, db_k8scluster_update
3500 except Exception as e
:
3503 + "error initializing helm-v3 cluster: {}".format(str(e
))
3506 "K8s cluster '{}' has not been initialized for '{}'".format(
3507 cluster_id
, cluster_type
3512 "K8s cluster '{}' has not been initialized for '{}'".format(
3513 cluster_id
, cluster_type
3516 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3519 logging_text
+= "Deploy kdus: "
3522 db_nsr_update
= {"_admin.deployed.K8s": []}
3523 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3526 updated_cluster_list
= []
3527 updated_v3_cluster_list
= []
3529 for vnfr_data
in db_vnfrs
.values():
3530 vca_id
= self
.get_vca_id(vnfr_data
, {})
3531 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3532 # Step 0: Prepare and set parameters
3533 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3534 vnfd_id
= vnfr_data
.get("vnfd-id")
3535 vnfd_with_id
= find_in_list(
3536 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3540 for kdud
in vnfd_with_id
["kdu"]
3541 if kdud
["name"] == kdur
["kdu-name"]
3543 namespace
= kdur
.get("k8s-namespace")
3544 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3545 if kdur
.get("helm-chart"):
3546 kdumodel
= kdur
["helm-chart"]
3547 # Default version: helm3, if helm-version is v2 assign v2
3548 k8sclustertype
= "helm-chart-v3"
3549 self
.logger
.debug("kdur: {}".format(kdur
))
3551 kdur
.get("helm-version")
3552 and kdur
.get("helm-version") == "v2"
3554 k8sclustertype
= "helm-chart"
3555 elif kdur
.get("juju-bundle"):
3556 kdumodel
= kdur
["juju-bundle"]
3557 k8sclustertype
= "juju-bundle"
3560 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3561 "juju-bundle. Maybe an old NBI version is running".format(
3562 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3565 # check if kdumodel is a file and exists
3567 vnfd_with_id
= find_in_list(
3568 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3570 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3571 if storage
: # may be not present if vnfd has not artifacts
3572 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3573 if storage
["pkg-dir"]:
3574 filename
= "{}/{}/{}s/{}".format(
3581 filename
= "{}/Scripts/{}s/{}".format(
3586 if self
.fs
.file_exists(
3587 filename
, mode
="file"
3588 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3589 kdumodel
= self
.fs
.path
+ filename
3590 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3592 except Exception: # it is not a file
3595 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3596 step
= "Synchronize repos for k8s cluster '{}'".format(
3599 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3603 k8sclustertype
== "helm-chart"
3604 and cluster_uuid
not in updated_cluster_list
3606 k8sclustertype
== "helm-chart-v3"
3607 and cluster_uuid
not in updated_v3_cluster_list
3609 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3610 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3611 cluster_uuid
=cluster_uuid
3614 if del_repo_list
or added_repo_dict
:
3615 if k8sclustertype
== "helm-chart":
3617 "_admin.helm_charts_added." + item
: None
3618 for item
in del_repo_list
3621 "_admin.helm_charts_added." + item
: name
3622 for item
, name
in added_repo_dict
.items()
3624 updated_cluster_list
.append(cluster_uuid
)
3625 elif k8sclustertype
== "helm-chart-v3":
3627 "_admin.helm_charts_v3_added." + item
: None
3628 for item
in del_repo_list
3631 "_admin.helm_charts_v3_added." + item
: name
3632 for item
, name
in added_repo_dict
.items()
3634 updated_v3_cluster_list
.append(cluster_uuid
)
3636 logging_text
+ "repos synchronized on k8s cluster "
3637 "'{}' to_delete: {}, to_add: {}".format(
3638 k8s_cluster_id
, del_repo_list
, added_repo_dict
3643 {"_id": k8s_cluster_id
},
3649 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3650 vnfr_data
["member-vnf-index-ref"],
3654 k8s_instance_info
= {
3655 "kdu-instance": None,
3656 "k8scluster-uuid": cluster_uuid
,
3657 "k8scluster-type": k8sclustertype
,
3658 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3659 "kdu-name": kdur
["kdu-name"],
3660 "kdu-model": kdumodel
,
3661 "namespace": namespace
,
3662 "kdu-deployment-name": kdu_deployment_name
,
3664 db_path
= "_admin.deployed.K8s.{}".format(index
)
3665 db_nsr_update
[db_path
] = k8s_instance_info
3666 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3667 vnfd_with_id
= find_in_list(
3668 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3670 task
= asyncio
.ensure_future(
3679 k8params
=desc_params
,
3684 self
.lcm_tasks
.register(
3688 "instantiate_KDU-{}".format(index
),
3691 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3697 except (LcmException
, asyncio
.CancelledError
):
3699 except Exception as e
:
3700 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3701 if isinstance(e
, (N2VCException
, DbException
)):
3702 self
.logger
.error(logging_text
+ msg
)
3704 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3705 raise LcmException(msg
)
3708 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3727 task_instantiation_info
,
3730 # launch instantiate_N2VC in a asyncio task and register task object
3731 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3732 # if not found, create one entry and update database
3733 # fill db_nsr._admin.deployed.VCA.<index>
3736 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3740 get_charm_name
= False
3741 if "execution-environment-list" in descriptor_config
:
3742 ee_list
= descriptor_config
.get("execution-environment-list", [])
3743 elif "juju" in descriptor_config
:
3744 ee_list
= [descriptor_config
] # ns charms
3745 if "execution-environment-list" not in descriptor_config
:
3746 # charm name is only required for ns charms
3747 get_charm_name
= True
3748 else: # other types as script are not supported
3751 for ee_item
in ee_list
:
3754 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3755 ee_item
.get("juju"), ee_item
.get("helm-chart")
3758 ee_descriptor_id
= ee_item
.get("id")
3759 if ee_item
.get("juju"):
3760 vca_name
= ee_item
["juju"].get("charm")
3762 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
3765 if ee_item
["juju"].get("charm") is not None
3768 if ee_item
["juju"].get("cloud") == "k8s":
3769 vca_type
= "k8s_proxy_charm"
3770 elif ee_item
["juju"].get("proxy") is False:
3771 vca_type
= "native_charm"
3772 elif ee_item
.get("helm-chart"):
3773 vca_name
= ee_item
["helm-chart"]
3774 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3777 vca_type
= "helm-v3"
3780 logging_text
+ "skipping non juju neither charm configuration"
3785 for vca_index
, vca_deployed
in enumerate(
3786 db_nsr
["_admin"]["deployed"]["VCA"]
3788 if not vca_deployed
:
3791 vca_deployed
.get("member-vnf-index") == member_vnf_index
3792 and vca_deployed
.get("vdu_id") == vdu_id
3793 and vca_deployed
.get("kdu_name") == kdu_name
3794 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3795 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3799 # not found, create one.
3801 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3804 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3806 target
+= "/kdu/{}".format(kdu_name
)
3808 "target_element": target
,
3809 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3810 "member-vnf-index": member_vnf_index
,
3812 "kdu_name": kdu_name
,
3813 "vdu_count_index": vdu_index
,
3814 "operational-status": "init", # TODO revise
3815 "detailed-status": "", # TODO revise
3816 "step": "initial-deploy", # TODO revise
3818 "vdu_name": vdu_name
,
3820 "ee_descriptor_id": ee_descriptor_id
,
3821 "charm_name": charm_name
,
3825 # create VCA and configurationStatus in db
3827 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3828 "configurationStatus.{}".format(vca_index
): dict(),
3830 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3832 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3834 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3835 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3836 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3839 task_n2vc
= asyncio
.ensure_future(
3840 self
.instantiate_N2VC(
3841 logging_text
=logging_text
,
3842 vca_index
=vca_index
,
3848 vdu_index
=vdu_index
,
3849 deploy_params
=deploy_params
,
3850 config_descriptor
=descriptor_config
,
3851 base_folder
=base_folder
,
3852 nslcmop_id
=nslcmop_id
,
3856 ee_config_descriptor
=ee_item
,
3859 self
.lcm_tasks
.register(
3863 "instantiate_N2VC-{}".format(vca_index
),
3866 task_instantiation_info
[
3868 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3869 member_vnf_index
or "", vdu_id
or ""
3873 def _create_nslcmop(nsr_id
, operation
, params
):
3875 Creates a ns-lcm-opp content to be stored at database.
3876 :param nsr_id: internal id of the instance
3877 :param operation: instantiate, terminate, scale, action, ...
3878 :param params: user parameters for the operation
3879 :return: dictionary following SOL005 format
3881 # Raise exception if invalid arguments
3882 if not (nsr_id
and operation
and params
):
3884 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3891 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3892 "operationState": "PROCESSING",
3893 "statusEnteredTime": now
,
3894 "nsInstanceId": nsr_id
,
3895 "lcmOperationType": operation
,
3897 "isAutomaticInvocation": False,
3898 "operationParams": params
,
3899 "isCancelPending": False,
3901 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3902 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3907 def _format_additional_params(self
, params
):
3908 params
= params
or {}
3909 for key
, value
in params
.items():
3910 if str(value
).startswith("!!yaml "):
3911 params
[key
] = yaml
.safe_load(value
[7:])
3914 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3915 primitive
= seq
.get("name")
3916 primitive_params
= {}
3918 "member_vnf_index": vnf_index
,
3919 "primitive": primitive
,
3920 "primitive_params": primitive_params
,
3923 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop record holding the list "_admin.operations"
    :param op_index: position of the sub-operation inside that list
    :return: self.SUBOPERATION_STATUS_SKIP when the sub-operation is already
        "COMPLETED"; otherwise op_index, after marking it as "PROCESSING"
    """
    registered = deep_get(db_nslcmop, ("_admin", "operations"), [])
    suboperation = registered[op_index]

    if suboperation.get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index

    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
3947 # Find a sub-operation where all keys in a matching dictionary must match
3948 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3949 def _find_suboperation(self
, db_nslcmop
, match
):
3950 if db_nslcmop
and match
:
3951 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3952 for i
, op
in enumerate(op_list
):
3953 if all(op
.get(k
) == match
[k
] for k
in match
):
3955 return self
.SUBOPERATION_STATUS_NOT_FOUND
3957 # Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """
    Write at database the state and detailed status of one sub-operation.

    Only the database is written; the db_nslcmop dictionary received is not modified.

    :param db_nslcmop: nslcmop record; only its "_id" is used, to select the document
    :param op_index: position of the sub-operation inside "_admin.operations"
    :param operationState: value stored at "operationState"
    :param detailed_status: value stored at "detailed-status"
    :return: None
    """
    # Update DB for HA tasks
    selector = {"_id": db_nslcmop["_id"]}
    changes = {}
    changes["_admin.operations.{}.operationState".format(op_index)] = operationState
    changes["_admin.operations.{}.detailed-status".format(op_index)] = detailed_status
    self.db.set_one(
        "nslcmops", q_filter=selector, update_dict=changes, fail_on_empty=False
    )
3971 # Add sub-operation, return the index of the added sub-operation
3972 # Optionally, set operationState, detailed-status, and operationType
3973 # Status and type are currently set for 'scale' sub-operations:
3974 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3975 # 'detailed-status' : status message
3976 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3977 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3978 def _add_suboperation(
3986 mapped_primitive_params
,
3987 operationState
=None,
3988 detailed_status
=None,
3991 RO_scaling_info
=None,
3994 return self
.SUBOPERATION_STATUS_NOT_FOUND
3995 # Get the "_admin.operations" list, if it exists
3996 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
3997 op_list
= db_nslcmop_admin
.get("operations")
3998 # Create or append to the "_admin.operations" list
4000 "member_vnf_index": vnf_index
,
4002 "vdu_count_index": vdu_count_index
,
4003 "primitive": primitive
,
4004 "primitive_params": mapped_primitive_params
,
4007 new_op
["operationState"] = operationState
4009 new_op
["detailed-status"] = detailed_status
4011 new_op
["lcmOperationType"] = operationType
4013 new_op
["RO_nsr_id"] = RO_nsr_id
4015 new_op
["RO_scaling_info"] = RO_scaling_info
4017 # No existing operations, create key 'operations' with current operation as first list element
4018 db_nslcmop_admin
.update({"operations": [new_op
]})
4019 op_list
= db_nslcmop_admin
.get("operations")
4021 # Existing operations, append operation to list
4022 op_list
.append(new_op
)
4024 db_nslcmop_update
= {"_admin.operations": op_list
}
4025 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4026 op_index
= len(op_list
) - 1
4029 # Helper methods for scale() sub-operations
4031 # pre-scale/post-scale:
4032 # Check for 3 different cases:
4033 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4034 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4035 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4036 def _check_or_add_scale_suboperation(
4040 vnf_config_primitive
,
4044 RO_scaling_info
=None,
4046 # Find this sub-operation
4047 if RO_nsr_id
and RO_scaling_info
:
4048 operationType
= "SCALE-RO"
4050 "member_vnf_index": vnf_index
,
4051 "RO_nsr_id": RO_nsr_id
,
4052 "RO_scaling_info": RO_scaling_info
,
4056 "member_vnf_index": vnf_index
,
4057 "primitive": vnf_config_primitive
,
4058 "primitive_params": primitive_params
,
4059 "lcmOperationType": operationType
,
4061 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4062 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4063 # a. New sub-operation
4064 # The sub-operation does not exist, add it.
4065 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4066 # The following parameters are set to None for all kind of scaling:
4068 vdu_count_index
= None
4070 if RO_nsr_id
and RO_scaling_info
:
4071 vnf_config_primitive
= None
4072 primitive_params
= None
4075 RO_scaling_info
= None
4076 # Initial status for sub-operation
4077 operationState
= "PROCESSING"
4078 detailed_status
= "In progress"
4079 # Add sub-operation for pre/post-scaling (zero or more operations)
4080 self
._add
_suboperation
(
4086 vnf_config_primitive
,
4094 return self
.SUBOPERATION_STATUS_NEW
4096 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4097 # or op_index (operationState != 'COMPLETED')
4098 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4100 # Function to return execution_environment id
4102 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4103 # TODO vdu_index_count
4104 for vca
in vca_deployed_list
:
4105 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message
    :param db_nslcmop: nslcmop record; sub-operations are added to it and its "nsInstanceId" is used
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed as is to the primitive execution and to the execution environment deletion
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, primitive
                )
                self.logger.debug(logging_text + step)
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                if result in ("COMPLETED", "PARTIALLY_COMPLETED"):
                    continue
                raise LcmException(
                    "terminate_primitive {} for vnf_member_index={} fails with "
                    "error {}".format(primitive, vnf_index, result_detail)
                )
            # All the primitives have been processed without a failure result:
            # record at database that this VCA does not need terminate any more
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if not destroy_ee:
        return
    await self.vca_map[vca_type].delete_execution_environment(
        vca_deployed["ee_id"],
        scaling_in=scaling_in,
        vca_type=vca_type,
        vca_id=vca_id,
    )
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete at once the whole VCA namespace of a ns.

    The configuration status of the ns is written as "TERMINATING" before the
    deletion and as "DELETED" after it. A namespace that is not found is
    treated as already deleted.

    :param db_nsr: ns record; its "_id" prefixed with "." is the namespace to delete
    :param vca_id: passed as is to the namespace deletion
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    ns_namespace = "." + db_nsr["_id"]
    deletion_args = {
        "namespace": ns_namespace,
        "total_timeout": self.timeout_charm_delete,
        "vca_id": vca_id,
    }
    try:
        await self.n2vc.delete_namespace(**deletion_args)
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4227 async def _terminate_RO(
4228 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4231 Terminates a deployment from RO
4232 :param logging_text:
4233 :param nsr_deployed: db_nsr._admin.deployed
4236 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4237 this method will update only the index 2, but it will write on database the concatenated content of the list
4242 ro_nsr_id
= ro_delete_action
= None
4243 if nsr_deployed
and nsr_deployed
.get("RO"):
4244 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
4245 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
4248 stage
[2] = "Deleting ns from VIM."
4249 db_nsr_update
["detailed-status"] = " ".join(stage
)
4250 self
._write
_op
_status
(nslcmop_id
, stage
)
4251 self
.logger
.debug(logging_text
+ stage
[2])
4252 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4253 self
._write
_op
_status
(nslcmop_id
, stage
)
4254 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
4255 ro_delete_action
= desc
["action_id"]
4257 "_admin.deployed.RO.nsr_delete_action_id"
4258 ] = ro_delete_action
4259 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4260 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4261 if ro_delete_action
:
4262 # wait until NS is deleted from VIM
4263 stage
[2] = "Waiting ns deleted from VIM."
4264 detailed_status_old
= None
4268 + " RO_id={} ro_delete_action={}".format(
4269 ro_nsr_id
, ro_delete_action
4272 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4273 self
._write
_op
_status
(nslcmop_id
, stage
)
4275 delete_timeout
= 20 * 60 # 20 minutes
4276 while delete_timeout
> 0:
4277 desc
= await self
.RO
.show(
4279 item_id_name
=ro_nsr_id
,
4280 extra_item
="action",
4281 extra_item_id
=ro_delete_action
,
4285 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4287 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4288 if ns_status
== "ERROR":
4289 raise ROclient
.ROClientException(ns_status_info
)
4290 elif ns_status
== "BUILD":
4291 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
4292 elif ns_status
== "ACTIVE":
4293 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4294 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4299 ), "ROclient.check_action_status returns unknown {}".format(
4302 if stage
[2] != detailed_status_old
:
4303 detailed_status_old
= stage
[2]
4304 db_nsr_update
["detailed-status"] = " ".join(stage
)
4305 self
._write
_op
_status
(nslcmop_id
, stage
)
4306 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4307 await asyncio
.sleep(5, loop
=self
.loop
)
4309 else: # delete_timeout <= 0:
4310 raise ROclient
.ROClientException(
4311 "Timeout waiting ns deleted from VIM"
4314 except Exception as e
:
4315 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4317 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4319 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
4320 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
4321 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
4323 logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
)
4326 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4328 failed_detail
.append("delete conflict: {}".format(e
))
4331 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
)
4334 failed_detail
.append("delete error: {}".format(e
))
4336 logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
)
4340 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
4341 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
4343 stage
[2] = "Deleting nsd from RO."
4344 db_nsr_update
["detailed-status"] = " ".join(stage
)
4345 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4346 self
._write
_op
_status
(nslcmop_id
, stage
)
4347 await self
.RO
.delete("nsd", ro_nsd_id
)
4349 logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
)
4351 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4352 except Exception as e
:
4354 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4356 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
4358 logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
)
4361 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4363 failed_detail
.append(
4364 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
)
4366 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4368 failed_detail
.append(
4369 "ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
)
4371 self
.logger
.error(logging_text
+ failed_detail
[-1])
4373 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
4374 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
4375 if not vnf_deployed
or not vnf_deployed
["id"]:
4378 ro_vnfd_id
= vnf_deployed
["id"]
4381 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4382 vnf_deployed
["member-vnf-index"], ro_vnfd_id
4384 db_nsr_update
["detailed-status"] = " ".join(stage
)
4385 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4386 self
._write
_op
_status
(nslcmop_id
, stage
)
4387 await self
.RO
.delete("vnfd", ro_vnfd_id
)
4389 logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
)
4391 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
4392 except Exception as e
:
4394 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404
4397 "_admin.deployed.RO.vnfd.{}.id".format(index
)
4401 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
)
4404 isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409
4406 failed_detail
.append(
4407 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
)
4409 self
.logger
.debug(logging_text
+ failed_detail
[-1])
4411 failed_detail
.append(
4412 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
)
4414 self
.logger
.error(logging_text
+ failed_detail
[-1])
4417 stage
[2] = "Error deleting from VIM"
4419 stage
[2] = "Deleted from VIM"
4420 db_nsr_update
["detailed-status"] = " ".join(stage
)
4421 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4422 self
._write
_op
_status
(nslcmop_id
, stage
)
4425 raise LcmException("; ".join(failed_detail
))
4427 async def terminate(self
, nsr_id
, nslcmop_id
):
4428 # Try to lock HA task here
4429 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4430 if not task_is_locked_by_me
:
4433 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4434 self
.logger
.debug(logging_text
+ "Enter")
4435 timeout_ns_terminate
= self
.timeout_ns_terminate
4438 operation_params
= None
4440 error_list
= [] # annotates all failed error messages
4441 db_nslcmop_update
= {}
4442 autoremove
= False # autoremove after terminated
4443 tasks_dict_info
= {}
4446 "Stage 1/3: Preparing task.",
4447 "Waiting for previous operations to terminate.",
4450 # ^ contains [stage, step, VIM-status]
4452 # wait for any previous tasks in process
4453 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4455 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4456 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4457 operation_params
= db_nslcmop
.get("operationParams") or {}
4458 if operation_params
.get("timeout_ns_terminate"):
4459 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4460 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4461 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4463 db_nsr_update
["operational-status"] = "terminating"
4464 db_nsr_update
["config-status"] = "terminating"
4465 self
._write
_ns
_status
(
4467 ns_state
="TERMINATING",
4468 current_operation
="TERMINATING",
4469 current_operation_id
=nslcmop_id
,
4470 other_update
=db_nsr_update
,
4472 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4473 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4474 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4477 stage
[1] = "Getting vnf descriptors from db."
4478 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4480 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4482 db_vnfds_from_id
= {}
4483 db_vnfds_from_member_index
= {}
4485 for vnfr
in db_vnfrs_list
:
4486 vnfd_id
= vnfr
["vnfd-id"]
4487 if vnfd_id
not in db_vnfds_from_id
:
4488 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4489 db_vnfds_from_id
[vnfd_id
] = vnfd
4490 db_vnfds_from_member_index
[
4491 vnfr
["member-vnf-index-ref"]
4492 ] = db_vnfds_from_id
[vnfd_id
]
4494 # Destroy individual execution environments when there are terminating primitives.
4495 # Rest of EE will be deleted at once
4496 # TODO - check before calling _destroy_N2VC
4497 # if not operation_params.get("skip_terminate_primitives"):#
4498 # or not vca.get("needed_terminate"):
4499 stage
[0] = "Stage 2/3 execute terminating primitives."
4500 self
.logger
.debug(logging_text
+ stage
[0])
4501 stage
[1] = "Looking execution environment that needs terminate."
4502 self
.logger
.debug(logging_text
+ stage
[1])
4504 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4505 config_descriptor
= None
4506 vca_member_vnf_index
= vca
.get("member-vnf-index")
4507 vca_id
= self
.get_vca_id(
4508 db_vnfrs_dict
.get(vca_member_vnf_index
)
4509 if vca_member_vnf_index
4513 if not vca
or not vca
.get("ee_id"):
4515 if not vca
.get("member-vnf-index"):
4517 config_descriptor
= db_nsr
.get("ns-configuration")
4518 elif vca
.get("vdu_id"):
4519 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4520 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4521 elif vca
.get("kdu_name"):
4522 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4523 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4525 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4526 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4527 vca_type
= vca
.get("type")
4528 exec_terminate_primitives
= not operation_params
.get(
4529 "skip_terminate_primitives"
4530 ) and vca
.get("needed_terminate")
4531 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4532 # pending native charms
4534 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4536 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4537 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4538 task
= asyncio
.ensure_future(
4546 exec_terminate_primitives
,
4550 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4552 # wait for pending tasks of terminate primitives
4556 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4558 error_list
= await self
._wait
_for
_tasks
(
4561 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4565 tasks_dict_info
.clear()
4567 return # raise LcmException("; ".join(error_list))
4569 # remove All execution environments at once
4570 stage
[0] = "Stage 3/3 delete all."
4572 if nsr_deployed
.get("VCA"):
4573 stage
[1] = "Deleting all execution environments."
4574 self
.logger
.debug(logging_text
+ stage
[1])
4575 vca_id
= self
.get_vca_id({}, db_nsr
)
4576 task_delete_ee
= asyncio
.ensure_future(
4578 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4579 timeout
=self
.timeout_charm_delete
,
4582 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4583 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4585 # Delete from k8scluster
4586 stage
[1] = "Deleting KDUs."
4587 self
.logger
.debug(logging_text
+ stage
[1])
4588 # print(nsr_deployed)
4589 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4590 if not kdu
or not kdu
.get("kdu-instance"):
4592 kdu_instance
= kdu
.get("kdu-instance")
4593 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4594 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4595 vca_id
= self
.get_vca_id({}, db_nsr
)
4596 task_delete_kdu_instance
= asyncio
.ensure_future(
4597 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4598 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4599 kdu_instance
=kdu_instance
,
4601 namespace
=kdu
.get("namespace"),
4607 + "Unknown k8s deployment type {}".format(
4608 kdu
.get("k8scluster-type")
4613 task_delete_kdu_instance
4614 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4617 stage
[1] = "Deleting ns from VIM."
4619 task_delete_ro
= asyncio
.ensure_future(
4620 self
._terminate
_ng
_ro
(
4621 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4625 task_delete_ro
= asyncio
.ensure_future(
4627 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4630 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4632 # rest of staff will be done at finally
4635 ROclient
.ROClientException
,
4640 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4642 except asyncio
.CancelledError
:
4644 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4646 exc
= "Operation was cancelled"
4647 except Exception as e
:
4648 exc
= traceback
.format_exc()
4649 self
.logger
.critical(
4650 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4655 error_list
.append(str(exc
))
4657 # wait for pending tasks
4659 stage
[1] = "Waiting for terminate pending tasks."
4660 self
.logger
.debug(logging_text
+ stage
[1])
4661 error_list
+= await self
._wait
_for
_tasks
(
4664 timeout_ns_terminate
,
4668 stage
[1] = stage
[2] = ""
4669 except asyncio
.CancelledError
:
4670 error_list
.append("Cancelled")
4671 # TODO cancell all tasks
4672 except Exception as exc
:
4673 error_list
.append(str(exc
))
4674 # update status at database
4676 error_detail
= "; ".join(error_list
)
4677 # self.logger.error(logging_text + error_detail)
4678 error_description_nslcmop
= "{} Detail: {}".format(
4679 stage
[0], error_detail
4681 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4682 nslcmop_id
, stage
[0]
4685 db_nsr_update
["operational-status"] = "failed"
4686 db_nsr_update
["detailed-status"] = (
4687 error_description_nsr
+ " Detail: " + error_detail
4689 db_nslcmop_update
["detailed-status"] = error_detail
4690 nslcmop_operation_state
= "FAILED"
4694 error_description_nsr
= error_description_nslcmop
= None
4695 ns_state
= "NOT_INSTANTIATED"
4696 db_nsr_update
["operational-status"] = "terminated"
4697 db_nsr_update
["detailed-status"] = "Done"
4698 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4699 db_nslcmop_update
["detailed-status"] = "Done"
4700 nslcmop_operation_state
= "COMPLETED"
4703 self
._write
_ns
_status
(
4706 current_operation
="IDLE",
4707 current_operation_id
=None,
4708 error_description
=error_description_nsr
,
4709 error_detail
=error_detail
,
4710 other_update
=db_nsr_update
,
4712 self
._write
_op
_status
(
4715 error_message
=error_description_nslcmop
,
4716 operation_state
=nslcmop_operation_state
,
4717 other_update
=db_nslcmop_update
,
4719 if ns_state
== "NOT_INSTANTIATED":
4723 {"nsr-id-ref": nsr_id
},
4724 {"_admin.nsState": "NOT_INSTANTIATED"},
4726 except DbException
as e
:
4729 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4733 if operation_params
:
4734 autoremove
= operation_params
.get("autoremove", False)
4735 if nslcmop_operation_state
:
4737 await self
.msg
.aiowrite(
4742 "nslcmop_id": nslcmop_id
,
4743 "operationState": nslcmop_operation_state
,
4744 "autoremove": autoremove
,
4748 except Exception as e
:
4750 logging_text
+ "kafka_write notification Exception {}".format(e
)
4753 self
.logger
.debug(logging_text
+ "Exit")
4754 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """
    Wait for a group of asyncio tasks and report progress/errors while they finish.

    :param logging_text: prefix used for every log entry
    :param created_tasks_info: dict whose keys are the tasks to wait for and whose
        values are the descriptive texts used in log and error messages
    :param timeout: total number of seconds allowed, counted from this call
    :param stage: list; item 1 is overwritten with "<done>/<total>." plus the errors
    :param nslcmop_id: operation id handed over to self._write_op_status
    :param nsr_id: when provided, errors are also written at "nsrs" with update_db_2
    :return: list of "<task text>: <error>" strings, empty when every task succeeded
    """
    started_at = time()
    error_detail_list = []
    error_list = []
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)

    while pending_tasks:
        # only errors raised in the current round trigger the error report below
        new_error = None
        remaining = timeout + started_at - time()
        done, pending_tasks = await asyncio.wait(
            pending_tasks, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
        )
        num_done += len(done)

        if not done:  # Timeout
            # every task still pending is reported as timed out; stage is left as is
            for task in pending_tasks:
                new_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(new_error)
                error_list.append(new_error)
            break

        for task in done:
            task_text = created_tasks_info[task]
            exc = "Cancelled" if task.cancelled() else task.exception()
            if not exc:
                self.logger.debug(logging_text + task_text + ": Done")
                continue

            if isinstance(exc, asyncio.TimeoutError):
                exc = "Timeout"
            new_error = task_text + ": {}".format(exc)
            error_list.append(task_text)
            error_detail_list.append(new_error)
            if isinstance(
                exc,
                (
                    str,
                    DbException,
                    N2VCException,
                    ROclient.ROClientException,
                    LcmException,
                    K8sException,
                    NgRoException,
                ),
            ):
                # expected error types: the message is enough
                self.logger.error(logging_text + new_error)
            else:
                # unexpected error: log the complete traceback
                exc_traceback = "".join(
                    traceback.format_exception(None, exc, exc.__traceback__)
                )
                self.logger.error(
                    logging_text + task_text + " " + exc_traceback
                )

        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if new_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive.

    For each parameter declared at the descriptor the value is taken, in this order,
    from: the params provided by the user, the descriptor "value", the descriptor
    "default-value". A descriptor value written between < > is a reference that is
    looked up at instantiation_params.
    Values that are dict/list/tuple are dumped to a yaml string; strings starting with
    "!!yaml " get this prefix removed; "INTEGER" and "BOOLEAN" data-types are converted.

    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as no params
    :param instantiation_params: Instantiation params provided by user. None is treated
        as no params
    :return: a dictionary with the calculated params
    :raises LcmException: if a needed parameter is not provided or an INTEGER parameter
        cannot be converted
    """
    # Callers may pass None when the user has not provided anything
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "default-value" in parameter or "value" in parameter:
            # "value" takes precedence over "default-value"
            if "value" in parameter:
                value = parameter["value"]
            else:
                value = parameter["default-value"]
            if (
                isinstance(value, str)
                and value.startswith("<")
                and value.endswith(">")
            ):
                reference = value[1:-1]
                if reference not in instantiation_params:
                    raise LcmException(
                        "Parameter {} needed to execute primitive {} not provided".format(
                            value, primitive_desc["name"]
                        )
                    )
                value = instantiation_params[reference]
        else:
            raise LcmException(
                "Parameter {} needed to execute primitive {} not provided".format(
                    param_name, primitive_desc["name"]
                )
            )

        if isinstance(value, (dict, list, tuple)):
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            value = value[7:]

        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except (TypeError, ValueError):  # error converting to int (e.g. None, "abc")
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(
                        param_name, primitive_desc["name"]
                    )
                )
        elif data_type == "BOOLEAN":
            # everything but the text "false" (case insensitive) is considered True
            value = str(value).lower() != "false"
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config":
        if "ns_config_info" in instantiation_params:
            calculated_params["ns_config_info"] = instantiation_params[
                "ns_config_info"
            ]
    return calculated_params
def _look_for_deployed_vca(
    self,
    deployed_vca,
    member_vnf_index,
    vdu_id,
    vdu_count_index,
    kdu_name=None,
    ee_descriptor_id=None,
):
    """
    Find the deployed VCA record for an action and return its execution environment.

    Empty entries of deployed_vca are ignored. member_vnf_index and vdu_id must always
    match; vdu_count_index is compared only when it is not None; kdu_name and
    ee_descriptor_id are compared only when provided. The first match is used.

    :return: tuple (ee_id, vca_type). vca_type is "lxc_proxy_charm" when the record
        does not contain "type"
    :raises LcmException: if not found or there is not any ee_id
    """
    found = None
    for candidate in deployed_vca:
        if not candidate:
            continue
        if member_vnf_index != candidate["member-vnf-index"]:
            continue
        if vdu_id != candidate["vdu_id"]:
            continue
        if (
            vdu_count_index is not None
            and vdu_count_index != candidate["vdu_count_index"]
        ):
            continue
        if kdu_name and kdu_name != candidate["kdu_name"]:
            continue
        if ee_descriptor_id and ee_descriptor_id != candidate["ee_descriptor_id"]:
            continue
        found = candidate
        break

    if found is None:
        # vca_deployed not found
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
            " is not deployed".format(
                member_vnf_index,
                vdu_id,
                vdu_count_index,
                kdu_name,
                ee_descriptor_id,
            )
        )

    # default type for backward compatibility - proxy charm
    vca_type = found.get("type", "lxc_proxy_charm")
    ee_id = found.get("ee_id")
    if ee_id:
        return ee_id, vca_type
    raise LcmException(
        "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
        "execution environment".format(
            member_vnf_index, vdu_id, kdu_name, vdu_count_index
        )
    )
async def _ns_execute_primitive(
    self,
    ee_id,
    primitive,
    primitive_params,
    retries=0,
    retries_interval=30,
    timeout=None,
    vca_type=None,
    db_dict=None,
    vca_id: str = None,
) -> (str, str):
    """
    Execute a primitive at an execution environment, retrying on failure.

    :param ee_id: execution environment id
    :param primitive: primitive name. For "config" the params are wrapped as
        {"params": primitive_params}
    :param primitive_params: params handed over to the connector as params_dict
    :param retries: number of additional attempts after a failed execution
    :param retries_interval: seconds to wait between attempts
    :param timeout: seconds allowed for each attempt; self.timeout_primitive if not set
    :param vca_type: key of self.vca_map; "lxc_proxy_charm" if not set
    :param db_dict: passed as is to the connector
    :param vca_id: passed as is to the connector
    :return: ("COMPLETED", output) on success; ("FAILED", error text) when all the
        attempts fail; ("FAIL", error text) on any other unexpected error
    :raises LcmException, asyncio.CancelledError: propagated as they are
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    ),
                    timeout=timeout or self.timeout_primitive,
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                if isinstance(e, asyncio.TimeoutError):
                    e = "Timeout"
                retries -= 1
                if retries >= 0:
                    self.logger.debug(
                        "Error executing action {} on {} -> {}".format(
                            primitive, ee_id, e
                        )
                    )
                    # wait and retry. The "loop" argument of asyncio.sleep was removed
                    # in Python 3.10; passing it made every retry end with a TypeError
                    await asyncio.sleep(retries_interval)
                else:
                    return "FAILED", str(e)

        return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # NOTE(review): this returns "FAIL" whereas the retry path returns "FAILED";
        # kept unchanged because callers may rely on it - confirm
        return "FAIL", "Error executing action {}: {}".format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refresh the VCA status stored at the nsrs record.

    When the NS record contains deployed K8s entries, self._on_update_k8s_db is
    awaited for each one of them; otherwise self._on_update_n2vc_db is awaited for
    each deployed VCA entry. Finally the task is removed from self.lcm_tasks.

    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """
    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    deployed = db_nsr["_admin"]["deployed"]
    k8s_entries = deployed["K8s"]

    if not k8s_entries:
        for vca_index in range(len(deployed["VCA"])):
            vca_path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, vca_path, {})
    else:
        for k8s in k8s_entries:
            await self._on_update_k8s_db(
                cluster_uuid=k8s["k8scluster-uuid"],
                kdu_instance=k8s["kdu-instance"],
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=k8s["k8scluster-type"],
            )

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive (action) described at an "nslcmops" record.

    The target is chosen from the operation params: a KDU (upgrade, rollback, status
    or a primitive of the KDU configuration when the cluster type is not helm) is
    executed through self.k8scluster_map; anything else is executed at the deployed
    VCA through self._ns_execute_primitive.
    The result is always written at database and notified by kafka inside "finally".

    :param nsr_id: id of the NS record
    :param nslcmop_id: id of the operation record
    :return: None if the HA lock is not obtained, otherwise the tuple
        (nslcmop_operation_state, detailed_status)
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id,
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        if db_nslcmop["operationParams"].get("primitive_params"):
            db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                db_nslcmop["operationParams"]["primitive_params"]
            )

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get(
            "timeout_ns_action", self.timeout_primitive
        )

        # NS level actions have no vnfr/vnfd. Define the names anyway so that the
        # code below does not end with an UnboundLocalError. An empty vnfr is what
        # vca_status_refresh provides to get_vca_id as well
        db_vnfr = {}
        db_vnfd = None
        db_nsd = None
        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
            )
            if db_vnfr.get("kdur"):
                # additionalParams of each kdur is stored as a json text
                kdur_list = []
                for kdur in db_vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                db_vnfr["kdur"] = kdur_list
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

            # Sync filesystem before running a primitive
            self.fs.sync(db_vnfr["vnfd-id"])
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get(
            "config-primitive"
        ):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only the built-in KDU operations are allowed without a descriptor entry
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException(
                    "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                        primitive
                    )
                )
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get(
                "execution-environment-primitive", primitive
            )
            ee_descriptor_id = config_primitive_desc.get(
                "execution-environment-ref"
            )

        if vnf_index:
            if vdu_id:
                vdur = next(
                    (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                )
                if vdur is None:
                    raise LcmException(
                        "vdu_id '{}' not found at vnfr of vnf '{}'".format(
                            vdu_id, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next(
                    (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                )
                if kdur is None:
                    raise LcmException(
                        "kdu_name '{}' not found at vnfr of vnf '{}'".format(
                            kdu_name, vnf_index
                        )
                    )
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(
                    db_vnfr.get("additionalParamsForVnf")
                )
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        # True when the primitive belongs to the KDU configuration and the KDU is
        # not deployed with helm
        kdu_action = False
        if kdu_name and get_configuration(db_vnfd, kdu_name):
            kdu_configuration = get_configuration(db_vnfd, kdu_name)
            actions = set()
            for kdu_primitive in kdu_configuration.get("initial-config-primitive", []):
                actions.add(kdu_primitive["name"])
            for kdu_primitive in kdu_configuration.get("config-primitive", []):
                actions.add(kdu_primitive["name"])
            kdu = find_in_list(
                nsr_deployed["K8s"],
                lambda kdu: kdu_name == kdu["kdu-name"]
                and kdu["member-vnf-index"] == vnf_index,
            )
            kdu_action = (
                True
                if primitive_name in actions
                and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
                else False
            )

        # TODO check if ns is in a proper status
        if kdu_name and (
            primitive_name in ("upgrade", "rollback", "status") or kdu_action
        ):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if (
                    kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index
                ):
                    break
            else:
                raise LcmException(
                    "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                )

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(
                    kdu.get("k8scluster-type")
                )
                raise LcmException(msg)

            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            self.logger.debug(
                logging_text
                + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
            )
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    # a deployed model "<name>:<version>" is upgraded by its name
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True,
                        kdu_model=kdu_model,
                        params=desc_params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                    ),
                    timeout=timeout_ns_action + 10,
                )
                self.logger.debug(
                    logging_text + " Upgrade of kdu {} done".format(detailed_status)
                )
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict,
                    ),
                    timeout=timeout_ns_action,
                )
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                    kdu["kdu-name"], nsr_id
                )
                params = self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                )

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params,
                        db_dict=db_dict,
                        timeout=timeout_ns_action,
                        vca_id=vca_id,
                    ),
                    timeout=timeout_ns_action,
                )

            if detailed_status:
                nslcmop_operation_state = "COMPLETED"
            else:
                detailed_status = ""
                nslcmop_operation_state = "FAILED"
        else:
            ee_id, vca_type = self._look_for_deployed_vca(
                nsr_deployed["VCA"],
                member_vnf_index=vnf_index,
                vdu_id=vdu_id,
                vdu_count_index=vdu_count_index,
                ee_descriptor_id=ee_descriptor_id,
            )
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if vca_deployed.get("member-vnf-index") == vnf_index:
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": "_admin.deployed.VCA.{}.".format(vca_index),
                    }
                    break
            (
                nslcmop_operation_state,
                detailed_status,
            ) = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(
                    config_primitive_desc, primitive_params, desc_params
                ),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_dict,
                vca_id=vca_id,
            )

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = (
            detailed_status if nslcmop_operation_state == "FAILED" else ""
        )
        self.logger.debug(
            logging_text
            + " task Done with result {} {}".format(
                nslcmop_operation_state, detailed_status
            )
        )
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            detailed_status = "FAILED {}: {}".format(step, exc)
            error_description_nslcmop = detailed_status
            db_nslcmop_update["detailed-status"] = detailed_status
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr[
                    "nsState"
                ],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update,
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "actioned",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # returning from finally makes this tuple the result also on the error paths
        return nslcmop_operation_state, detailed_status
async def terminate_vdus(
    self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
):
    """This method terminates VDUs

    A scale-in ("IN") request is built with the content of db_vnfr["vdur"] and, when
    self.ro_config has "ng" set, it is handed over to self._scale_ng_ro.

    Args:
        db_vnfr: VNF instance record
        member_vnf_index: VNF index to identify the VDUs to be removed
        db_nsr: NS instance record
        update_db_nslcmops: Nslcmop update record
        stage: list; item 2 is set to "Terminating VDUs"
        logging_text: prefix for log entries, passed to self._scale_ng_ro
    """
    vca_scaling_info = []
    scaling_info = {
        "scaling_group_name": "vdu_autoscale",
        "vdu": [],
        "kdu": [],
        "scaling_direction": "IN",
        "vdu-delete": {},
        "kdu-delete": {},
    }
    vdur_list = copy(db_vnfr.get("vdur"))
    count_index = 0
    for vdu in vdur_list:
        vdu_ref = vdu["vdu-id-ref"]
        vca_scaling_info.append(
            {
                "osm_vdu_id": vdu_ref,
                "member-vnf-index": member_vnf_index,
                "type": "delete",
                "vdu_index": count_index,
            }
        )
        scaling_info["vdu-delete"][vdu_ref] = count_index
        vdu_entry = {
            "name": vdu.get("name") or vdu.get("vdu-name"),
            "vdu_id": vdu_ref,
            "interface": [],
        }
        scaling_info["vdu"].append(vdu_entry)
        vdu_entry["interface"].extend(
            {
                "name": interface["name"],
                "ip_address": interface["ip-address"],
                "mac_address": interface.get("mac-address"),
            }
            for interface in vdu["interfaces"]
        )
        # NOTE(review): the scaling request is issued once per vdur, with the info
        # accumulated so far - confirm this is inside the loop on purpose
        self.logger.info("NS update scaling info{}".format(scaling_info))
        stage[2] = "Terminating VDUs"
        # scale_process = "RO"
        if scaling_info.get("vdu-delete") and self.ro_config.get("ng"):
            await self._scale_ng_ro(
                logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
            )
async def remove_vnf(
    self, nsr_id, nslcmop_id, vnf_instance_id
):
    """This method is to Remove VNF instances from NS.

    The VDUs of the VNF are terminated, the VNF is removed from the
    "constituent-vnfr-ref" of the NS record and its vnfr is deleted from database.
    The last VNF of a NS cannot be removed.

    Args:
        nsr_id: NS instance id
        nslcmop_id: nslcmop id of update
        vnf_instance_id: id of the VNF instance to be removed

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException: if the VNF is the only one left at the NS
    """
    try:
        logging_text = "Task ns={} update ".format(nsr_id)
        check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
        self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
        if check_vnfr_count <= 1:
            raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
                vnf_instance_id))

        stage = ["", "", ""]
        self.logger.debug(
            "Getting nslcmop from database"
            + " after having waited for previous tasks to be completed"
        )
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
        )

        constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
        constituent_vnfr.remove(db_vnfr.get("_id"))
        # the NS record is written only once; the former second write of the very
        # same content after deleting the vnfr was redundant
        self.update_db_2(
            "nsrs", nsr_id, {"constituent-vnfr-ref": constituent_vnfr}
        )
        self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error removing VNF {}".format(e))
        return "FAILED", "Error removing VNF {}".format(e)
async def _ns_redeploy_vnf(
    self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr,
):
    """This method updates and redeploys VNF instances

    The current VDUs are terminated, the vnfr is updated with the revision and
    connection points of db_vnfd and with the "newVdur" of the operation params, and
    a scale-out ("OUT") request with the VDUs of db_vnfd is handed over to
    self._scale_ng_ro.

    Args:
        nsr_id: NS instance id
        nslcmop_id:   nslcmop id
        db_vnfd: VNF descriptor
        db_vnfr: VNF instance record
        db_nsr: NS instance record

    Returns:
        result: (str, str) COMPLETED/FAILED, details
    """
    try:
        count_index = 0
        stage = ["", "", ""]
        logging_text = "Task ns={} update ".format(nsr_id)
        latest_vnfd_revision = db_vnfd["_admin"].get("revision")
        member_vnf_index = db_vnfr["member-vnf-index-ref"]

        # Terminate old VNF resources
        update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        await self.terminate_vdus(
            db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
        )

        # Create VDUR: connection points come from the descriptor, the vdur list
        # comes already calculated at the operation params
        new_vnfr_cp = [
            {
                "name": cp.get("id"),
                "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                "id": cp.get("id"),
            }
            for cp in db_vnfd.get("ext-cpd", ())
        ]
        new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
        new_vnfr_update = {
            "revision": latest_vnfd_revision,
            "connection-point": new_vnfr_cp,
            "vdur": new_vdur,
            "ip-address": "",
        }
        self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
        updated_db_vnfr = self.db.get_one(
            "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}
        )

        # Instantiate new VNF resources
        vca_scaling_info = []
        scaling_info = {
            "scaling_group_name": "vdu_autoscale",
            "vdu": [],
            "kdu": [],
            "scaling_direction": "OUT",
            "vdu-create": {},
            "kdu-create": {},
        }
        for vdud in db_vnfd["vdu"]:
            vdud_id = vdud["id"]
            cloud_init_text = self._get_vdu_cloud_init_content(
                vdud, db_vnfd
            )
            cloud_init_list = []
            if cloud_init_text:
                additional_params = (
                    self._get_vdu_additional_params(updated_db_vnfr, vdud_id)
                    or {}
                )
                # TODO Information of its own ip is not available because db_vnfr is not updated.
                additional_params["OSM"] = get_osm_params(
                    updated_db_vnfr, vdud_id, 1
                )
                cloud_init_list.append(
                    self._parse_cloud_init(
                        cloud_init_text,
                        additional_params,
                        db_vnfd["id"],
                        vdud_id,
                    )
                )
                vca_scaling_info.append(
                    {
                        "osm_vdu_id": vdud_id,
                        "member-vnf-index": member_vnf_index,
                        "type": "create",
                        "vdu_index": count_index,
                    }
                )
            scaling_info["vdu-create"][vdud_id] = count_index

        if not self.ro_config.get("ng"):
            # NOTE(review): nothing is deployed nor returned in this case, so the
            # result is None instead of the documented tuple - confirm
            return None
        self.logger.debug(
            "New Resources to be deployed: {}".format(scaling_info))
        await self._scale_ng_ro(
            logging_text, db_nsr, update_db_nslcmops, updated_db_vnfr, scaling_info, stage
        )
        return "COMPLETED", "Done"
    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        self.logger.debug("Error updating VNF {}".format(e))
        return "FAILED", "Error updating VNF {}".format(e)
async def _ns_charm_upgrade(
    self,
    ee_id,
    charm_id,
    charm_type,
    path,
    timeout: float = None,
) -> (str, str):
    """This method upgrade charms in VNF instances

    Args:
        ee_id:  Execution environment id
        path:   Local path to the charm
        charm_id: charm-id
        charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
        timeout: (Float)    Timeout for the ns update operation

    Returns:
        result: (str, str) COMPLETED/FAILED, details

    Raises:
        LcmException, asyncio.CancelledError: propagated as they are
    """
    try:
        charm_type = charm_type or "lxc_proxy_charm"
        output = await self.vca_map[charm_type].upgrade_charm(
            ee_id=ee_id,
            path=path,
            charm_id=charm_id,
            charm_type=charm_type,
            timeout=timeout or self.timeout_ns_update,
        )

        if output:
            return "COMPLETED", output

        # An empty output used to fall through and return None, which cannot be
        # unpacked by the callers. Report it as a failure, as documented
        self.logger.debug("Error upgrading charm {}".format(path))
        return "FAILED", "Error upgrading charm {}: {}".format(
            path, "no output returned"
        )

    except (LcmException, asyncio.CancelledError):
        raise

    except Exception as e:

        self.logger.debug("Error upgrading charm {}".format(path))

        return "FAILED", "Error upgrading charm {}: {}".format(path, e)
5658 async def update(self
, nsr_id
, nslcmop_id
):
5659 """Update NS according to different update types
5661 This method performs upgrade of VNF instances then updates the revision
5662 number in VNF record
5665 nsr_id: Network service will be updated
5666 nslcmop_id: ns lcm operation id
5669 It may raise DbException, LcmException, N2VCException, K8sException
5672 # Try to lock HA task here
5673 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5674 if not task_is_locked_by_me
:
5677 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5678 self
.logger
.debug(logging_text
+ "Enter")
5680 # Set the required variables to be filled up later
5682 db_nslcmop_update
= {}
5684 nslcmop_operation_state
= None
5686 error_description_nslcmop
= ""
5688 change_type
= "updated"
5689 detailed_status
= ""
5692 # wait for any previous tasks in process
5693 step
= "Waiting for previous operations to terminate"
5694 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5695 self
._write
_ns
_status
(
5698 current_operation
="UPDATING",
5699 current_operation_id
=nslcmop_id
,
5702 step
= "Getting nslcmop from database"
5703 db_nslcmop
= self
.db
.get_one(
5704 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5706 update_type
= db_nslcmop
["operationParams"]["updateType"]
5708 step
= "Getting nsr from database"
5709 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5710 old_operational_status
= db_nsr
["operational-status"]
5711 db_nsr_update
["operational-status"] = "updating"
5712 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5713 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5715 if update_type
== "CHANGE_VNFPKG":
5717 # Get the input parameters given through update request
5718 vnf_instance_id
= db_nslcmop
["operationParams"][
5719 "changeVnfPackageData"
5720 ].get("vnfInstanceId")
5722 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5725 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5727 step
= "Getting vnfr from database"
5728 db_vnfr
= self
.db
.get_one(
5729 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5732 step
= "Getting vnfds from database"
5734 latest_vnfd
= self
.db
.get_one(
5735 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5737 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5740 current_vnf_revision
= db_vnfr
.get("revision", 1)
5741 current_vnfd
= self
.db
.get_one(
5743 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5744 fail_on_empty
=False,
5746 # Charm artifact paths will be filled up later
5748 current_charm_artifact_path
,
5749 target_charm_artifact_path
,
5750 charm_artifact_paths
,
5753 step
= "Checking if revision has changed in VNFD"
5754 if current_vnf_revision
!= latest_vnfd_revision
:
5756 change_type
= "policy_updated"
5758 # There is new revision of VNFD, update operation is required
5759 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5760 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5762 step
= "Removing the VNFD packages if they exist in the local path"
5763 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5764 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5766 step
= "Get the VNFD packages from FSMongo"
5767 self
.fs
.sync(from_path
=latest_vnfd_path
)
5768 self
.fs
.sync(from_path
=current_vnfd_path
)
5771 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5773 base_folder
= latest_vnfd
["_admin"]["storage"]
5775 for charm_index
, charm_deployed
in enumerate(
5776 get_iterable(nsr_deployed
, "VCA")
5778 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5780 # Getting charm-id and charm-type
5781 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5782 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5783 charm_type
= charm_deployed
.get("type")
5786 ee_id
= charm_deployed
.get("ee_id")
5788 step
= "Getting descriptor config"
5789 descriptor_config
= get_configuration(
5790 current_vnfd
, current_vnfd
["id"]
5793 if "execution-environment-list" in descriptor_config
:
5794 ee_list
= descriptor_config
.get(
5795 "execution-environment-list", []
5800 # There could be several charm used in the same VNF
5801 for ee_item
in ee_list
:
5802 if ee_item
.get("juju"):
5804 step
= "Getting charm name"
5805 charm_name
= ee_item
["juju"].get("charm")
5807 step
= "Setting Charm artifact paths"
5808 current_charm_artifact_path
.append(
5809 get_charm_artifact_path(
5813 current_vnf_revision
,
5816 target_charm_artifact_path
.append(
5817 get_charm_artifact_path(
5821 latest_vnfd_revision
,
5825 charm_artifact_paths
= zip(
5826 current_charm_artifact_path
, target_charm_artifact_path
5829 step
= "Checking if software version has changed in VNFD"
5830 if find_software_version(current_vnfd
) != find_software_version(
5834 step
= "Checking if existing VNF has charm"
5835 for current_charm_path
, target_charm_path
in list(
5836 charm_artifact_paths
5838 if current_charm_path
:
5840 "Software version change is not supported as VNF instance {} has charm.".format(
5845 # There is no change in the charm package, then redeploy the VNF
5846 # based on new descriptor
5847 step
= "Redeploying VNF"
5848 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5852 ) = await self
._ns
_redeploy
_vnf
(
5859 if result
== "FAILED":
5860 nslcmop_operation_state
= result
5861 error_description_nslcmop
= detailed_status
5862 db_nslcmop_update
["detailed-status"] = detailed_status
5865 + " step {} Done with result {} {}".format(
5866 step
, nslcmop_operation_state
, detailed_status
5871 step
= "Checking if any charm package has changed or not"
5872 for current_charm_path
, target_charm_path
in list(
5873 charm_artifact_paths
5877 and target_charm_path
5878 and self
.check_charm_hash_changed(
5879 current_charm_path
, target_charm_path
5883 step
= "Checking whether VNF uses juju bundle"
5884 if check_juju_bundle_existence(current_vnfd
):
5887 "Charm upgrade is not supported for the instance which"
5888 " uses juju-bundle: {}".format(
5889 check_juju_bundle_existence(current_vnfd
)
5893 step
= "Upgrading Charm"
5897 ) = await self
._ns
_charm
_upgrade
(
5900 charm_type
=charm_type
,
5901 path
=self
.fs
.path
+ target_charm_path
,
5902 timeout
=timeout_seconds
,
5905 if result
== "FAILED":
5906 nslcmop_operation_state
= result
5907 error_description_nslcmop
= detailed_status
5909 db_nslcmop_update
["detailed-status"] = detailed_status
5912 + " step {} Done with result {} {}".format(
5913 step
, nslcmop_operation_state
, detailed_status
5917 step
= "Updating policies"
5918 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5919 result
= "COMPLETED"
5920 detailed_status
= "Done"
5921 db_nslcmop_update
["detailed-status"] = "Done"
5923 # If nslcmop_operation_state is None, so any operation is not failed.
5924 if not nslcmop_operation_state
:
5925 nslcmop_operation_state
= "COMPLETED"
5927 # If update CHANGE_VNFPKG nslcmop_operation is successful
5928 # vnf revision need to be updated
5929 vnfr_update
["revision"] = latest_vnfd_revision
5930 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5934 + " task Done with result {} {}".format(
5935 nslcmop_operation_state
, detailed_status
5938 elif update_type
== "REMOVE_VNF":
5939 # This part is included in https://osm.etsi.org/gerrit/11876
5940 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5941 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5942 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5943 step
= "Removing VNF"
5944 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5945 if result
== "FAILED":
5946 nslcmop_operation_state
= result
5947 error_description_nslcmop
= detailed_status
5948 db_nslcmop_update
["detailed-status"] = detailed_status
5949 change_type
= "vnf_terminated"
5950 if not nslcmop_operation_state
:
5951 nslcmop_operation_state
= "COMPLETED"
5954 + " task Done with result {} {}".format(
5955 nslcmop_operation_state
, detailed_status
5959 elif update_type
== "OPERATE_VNF":
5960 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5961 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5962 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5963 (result
, detailed_status
) = await self
.rebuild_start_stop(
5964 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5966 if result
== "FAILED":
5967 nslcmop_operation_state
= result
5968 error_description_nslcmop
= detailed_status
5969 db_nslcmop_update
["detailed-status"] = detailed_status
5970 if not nslcmop_operation_state
:
5971 nslcmop_operation_state
= "COMPLETED"
5974 + " task Done with result {} {}".format(
5975 nslcmop_operation_state
, detailed_status
5979 # If nslcmop_operation_state is None, so any operation is not failed.
5980 # All operations are executed in overall.
5981 if not nslcmop_operation_state
:
5982 nslcmop_operation_state
= "COMPLETED"
5983 db_nsr_update
["operational-status"] = old_operational_status
5985 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5986 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5988 except asyncio
.CancelledError
:
5990 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5992 exc
= "Operation was cancelled"
5993 except asyncio
.TimeoutError
:
5994 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5996 except Exception as e
:
5997 exc
= traceback
.format_exc()
5998 self
.logger
.critical(
5999 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6008 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6009 nslcmop_operation_state
= "FAILED"
6010 db_nsr_update
["operational-status"] = old_operational_status
6012 self
._write
_ns
_status
(
6014 ns_state
=db_nsr
["nsState"],
6015 current_operation
="IDLE",
6016 current_operation_id
=None,
6017 other_update
=db_nsr_update
,
6020 self
._write
_op
_status
(
6023 error_message
=error_description_nslcmop
,
6024 operation_state
=nslcmop_operation_state
,
6025 other_update
=db_nslcmop_update
,
6028 if nslcmop_operation_state
:
6032 "nslcmop_id": nslcmop_id
,
6033 "operationState": nslcmop_operation_state
,
6035 if change_type
in ("vnf_terminated", "policy_updated"):
6036 msg
.update({"vnf_member_index": member_vnf_index
})
6037 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6038 except Exception as e
:
6040 logging_text
+ "kafka_write notification Exception {}".format(e
)
6042 self
.logger
.debug(logging_text
+ "Exit")
6043 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6044 return nslcmop_operation_state
, detailed_status
6046 async def scale(self
, nsr_id
, nslcmop_id
):
# Coroutine that handles the scale operation `nslcmop_id` of the NS `nsr_id`.
# Flow, as far as the statements below show:
#   1. try to lock the HA task for this nslcmop and wait for related operations;
#   2. read nslcmop, nsr, vnfr and vnfd from the database;
#   3. build `scaling_info` (VDU/KDU amounts) and `vca_scaling_info` (one entry
#      per instance) from the deltas of the selected scaling descriptor;
#   4. execute the pre-scale config primitives;
#   5. terminate the execution environments of the instances being removed;
#   6. scale the VDUs through RO and the KDUs through `_scale_kdu`;
#   7. create execution environments for the new instances;
#   8. execute the post-scale config primitives;
#   9. write the operation and NS status, send the "scaled" message and remove
#      the task from `lcm_tasks`.
6047 # Try to lock HA task here
6048 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6049 if not task_is_locked_by_me
:
6052 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6053 stage
= ["", "", ""]
6054 tasks_dict_info
= {}
6055 # ^ stage, step, VIM progress
6056 self
.logger
.debug(logging_text
+ "Enter")
6057 # get all needed from database
6059 db_nslcmop_update
= {}
6062 # in case of error, indicates what part of scale was failed to put nsr at error status
6063 scale_process
= None
6064 old_operational_status
= ""
6065 old_config_status
= ""
6068 # wait for any previous tasks in process
6069 step
= "Waiting for previous operations to terminate"
6070 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6071 self
._write
_ns
_status
(
6074 current_operation
="SCALING",
6075 current_operation_id
=nslcmop_id
,
6078 step
= "Getting nslcmop from database"
6080 step
+ " after having waited for previous tasks to be completed"
6082 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6084 step
= "Getting nsr from database"
6085 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6086 old_operational_status
= db_nsr
["operational-status"]
6087 old_config_status
= db_nsr
["config-status"]
6089 step
= "Parsing scaling parameters"
6090 db_nsr_update
["operational-status"] = "scaling"
6091 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6092 nsr_deployed
= db_nsr
["_admin"].get("deployed")
# The scaling request (vnf index, scaling group, scale type) is read from
# operationParams.scaleVnfData of the nslcmop.
6094 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6096 ]["member-vnf-index"]
6097 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6099 ]["scaling-group-descriptor"]
6100 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6101 # for backward compatibility
6102 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6103 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6104 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6105 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6107 step
= "Getting vnfr from database"
6108 db_vnfr
= self
.db
.get_one(
6109 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6112 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6114 step
= "Getting vnfd from database"
6115 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6117 base_folder
= db_vnfd
["_admin"]["storage"]
6119 step
= "Getting scaling-group-descriptor"
6120 scaling_descriptor
= find_in_list(
6121 get_scaling_aspect(db_vnfd
),
6122 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6124 if not scaling_descriptor
:
6126 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6127 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6130 step
= "Sending scale order to VIM"
6131 # TODO check if ns is in a proper status
6133 if not db_nsr
["_admin"].get("scaling-group"):
6138 "_admin.scaling-group": [
6139 {"name": scaling_group
, "nb-scale-op": 0}
6143 admin_scale_index
= 0
6145 for admin_scale_index
, admin_scale_info
in enumerate(
6146 db_nsr
["_admin"]["scaling-group"]
6148 if admin_scale_info
["name"] == scaling_group
:
6149 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6151 else: # not found, set index one plus last element and add new entry with the name
6152 admin_scale_index
+= 1
6154 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6157 vca_scaling_info
= []
6158 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
# SCALE_OUT: for every delta of the descriptor, compute the number of VDU / KDU
# instances to create; one entry per new instance is appended to
# vca_scaling_info.
6159 if scaling_type
== "SCALE_OUT":
6160 if "aspect-delta-details" not in scaling_descriptor
:
6162 "Aspect delta details not fount in scaling descriptor {}".format(
6163 scaling_descriptor
["name"]
6166 # count if max-instance-count is reached
6167 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6169 scaling_info
["scaling_direction"] = "OUT"
6170 scaling_info
["vdu-create"] = {}
6171 scaling_info
["kdu-create"] = {}
6172 for delta
in deltas
:
6173 for vdu_delta
in delta
.get("vdu-delta", {}):
6174 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6175 # vdu_index also provides the number of instance of the targeted vdu
6176 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6177 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6181 additional_params
= (
6182 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6185 cloud_init_list
= []
6187 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
# The upper bound defaults to 10 unless the vdu profile defines
# max-number-of-instances.
6188 max_instance_count
= 10
6189 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6190 max_instance_count
= vdu_profile
.get(
6191 "max-number-of-instances", 10
6194 default_instance_num
= get_number_of_instances(
6197 instances_number
= vdu_delta
.get("number-of-instances", 1)
6198 nb_scale_op
+= instances_number
6200 new_instance_count
= nb_scale_op
+ default_instance_num
6201 # Control if new count is over max and vdu count is less than max.
6202 # Then assign new instance count
6203 if new_instance_count
> max_instance_count
> vdu_count
:
6204 instances_number
= new_instance_count
- max_instance_count
6206 instances_number
= instances_number
6208 if new_instance_count
> max_instance_count
:
6210 "reached the limit of {} (max-instance-count) "
6211 "scaling-out operations for the "
6212 "scaling-group-descriptor '{}'".format(
6213 nb_scale_op
, scaling_group
6216 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6218 # TODO Information of its own ip is not available because db_vnfr is not updated.
6219 additional_params
["OSM"] = get_osm_params(
6220 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6222 cloud_init_list
.append(
6223 self
._parse
_cloud
_init
(
6230 vca_scaling_info
.append(
6232 "osm_vdu_id": vdu_delta
["id"],
6233 "member-vnf-index": vnf_index
,
6235 "vdu_index": vdu_index
+ x
,
6238 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6239 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6240 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6241 kdu_name
= kdu_profile
["kdu-name"]
6242 resource_name
= kdu_profile
.get("resource-name", "")
6244 # Might have different kdus in the same delta
6245 # Should have list for each kdu
6246 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6247 scaling_info
["kdu-create"][kdu_name
] = []
6249 kdur
= get_kdur(db_vnfr
, kdu_name
)
6250 if kdur
.get("helm-chart"):
6251 k8s_cluster_type
= "helm-chart-v3"
6252 self
.logger
.debug("kdur: {}".format(kdur
))
6254 kdur
.get("helm-version")
6255 and kdur
.get("helm-version") == "v2"
6257 k8s_cluster_type
= "helm-chart"
6258 elif kdur
.get("juju-bundle"):
6259 k8s_cluster_type
= "juju-bundle"
6262 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6263 "juju-bundle. Maybe an old NBI version is running".format(
6264 db_vnfr
["member-vnf-index-ref"], kdu_name
6268 max_instance_count
= 10
6269 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6270 max_instance_count
= kdu_profile
.get(
6271 "max-number-of-instances", 10
6274 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6275 deployed_kdu
, _
= get_deployed_kdu(
6276 nsr_deployed
, kdu_name
, vnf_index
6278 if deployed_kdu
is None:
6280 "KDU '{}' for vnf '{}' not deployed".format(
6284 kdu_instance
= deployed_kdu
.get("kdu-instance")
6285 instance_num
= await self
.k8scluster_map
[
6291 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6292 kdu_model
=deployed_kdu
.get("kdu-model"),
6294 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6295 "number-of-instances", 1
6298 # Control if new count is over max and instance_num is less than max.
6299 # Then assign max instance number to kdu replica count
6300 if kdu_replica_count
> max_instance_count
> instance_num
:
6301 kdu_replica_count
= max_instance_count
6302 if kdu_replica_count
> max_instance_count
:
6304 "reached the limit of {} (max-instance-count) "
6305 "scaling-out operations for the "
6306 "scaling-group-descriptor '{}'".format(
6307 instance_num
, scaling_group
6311 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6312 vca_scaling_info
.append(
6314 "osm_kdu_id": kdu_name
,
6315 "member-vnf-index": vnf_index
,
6317 "kdu_index": instance_num
+ x
- 1,
6320 scaling_info
["kdu-create"][kdu_name
].append(
6322 "member-vnf-index": vnf_index
,
6324 "k8s-cluster-type": k8s_cluster_type
,
6325 "resource-name": resource_name
,
6326 "scale": kdu_replica_count
,
# SCALE_IN: counterpart of the scale-out computation; the lower bound is
# min-number-of-instances from the vdu / kdu profile and defaults to 0.
6329 elif scaling_type
== "SCALE_IN":
6330 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6332 scaling_info
["scaling_direction"] = "IN"
6333 scaling_info
["vdu-delete"] = {}
6334 scaling_info
["kdu-delete"] = {}
6336 for delta
in deltas
:
6337 for vdu_delta
in delta
.get("vdu-delta", {}):
6338 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6339 min_instance_count
= 0
6340 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6341 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6342 min_instance_count
= vdu_profile
["min-number-of-instances"]
6344 default_instance_num
= get_number_of_instances(
6345 db_vnfd
, vdu_delta
["id"]
6347 instance_num
= vdu_delta
.get("number-of-instances", 1)
6348 nb_scale_op
-= instance_num
6350 new_instance_count
= nb_scale_op
+ default_instance_num
6352 if new_instance_count
< min_instance_count
< vdu_count
:
6353 instances_number
= min_instance_count
- new_instance_count
6355 instances_number
= instance_num
6357 if new_instance_count
< min_instance_count
:
6359 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6360 "scaling-group-descriptor '{}'".format(
6361 nb_scale_op
, scaling_group
6364 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6365 vca_scaling_info
.append(
6367 "osm_vdu_id": vdu_delta
["id"],
6368 "member-vnf-index": vnf_index
,
6370 "vdu_index": vdu_index
- 1 - x
,
6373 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6374 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6375 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6376 kdu_name
= kdu_profile
["kdu-name"]
6377 resource_name
= kdu_profile
.get("resource-name", "")
6379 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6380 scaling_info
["kdu-delete"][kdu_name
] = []
6382 kdur
= get_kdur(db_vnfr
, kdu_name
)
6383 if kdur
.get("helm-chart"):
6384 k8s_cluster_type
= "helm-chart-v3"
6385 self
.logger
.debug("kdur: {}".format(kdur
))
6387 kdur
.get("helm-version")
6388 and kdur
.get("helm-version") == "v2"
6390 k8s_cluster_type
= "helm-chart"
6391 elif kdur
.get("juju-bundle"):
6392 k8s_cluster_type
= "juju-bundle"
6395 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6396 "juju-bundle. Maybe an old NBI version is running".format(
6397 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6401 min_instance_count
= 0
6402 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6403 min_instance_count
= kdu_profile
["min-number-of-instances"]
6405 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6406 deployed_kdu
, _
= get_deployed_kdu(
6407 nsr_deployed
, kdu_name
, vnf_index
6409 if deployed_kdu
is None:
6411 "KDU '{}' for vnf '{}' not deployed".format(
6415 kdu_instance
= deployed_kdu
.get("kdu-instance")
6416 instance_num
= await self
.k8scluster_map
[
6422 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6423 kdu_model
=deployed_kdu
.get("kdu-model"),
6425 kdu_replica_count
= instance_num
- kdu_delta
.get(
6426 "number-of-instances", 1
6429 if kdu_replica_count
< min_instance_count
< instance_num
:
6430 kdu_replica_count
= min_instance_count
6431 if kdu_replica_count
< min_instance_count
:
6433 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6434 "scaling-group-descriptor '{}'".format(
6435 instance_num
, scaling_group
6439 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6440 vca_scaling_info
.append(
6442 "osm_kdu_id": kdu_name
,
6443 "member-vnf-index": vnf_index
,
6445 "kdu_index": instance_num
- x
- 1,
6448 scaling_info
["kdu-delete"][kdu_name
].append(
6450 "member-vnf-index": vnf_index
,
6452 "k8s-cluster-type": k8s_cluster_type
,
6453 "resource-name": resource_name
,
6454 "scale": kdu_replica_count
,
6458 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
# Only when scaling in: db_vnfr["vdur"] is walked in reverse order and, while
# the per-vdu counter copied from "vdu-delete" is non-zero, the name, vdu_id and
# interface data of each matching vdur are appended to scaling_info["vdu"].
6459 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6460 if scaling_info
["scaling_direction"] == "IN":
6461 for vdur
in reversed(db_vnfr
["vdur"]):
6462 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6463 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6464 scaling_info
["vdu"].append(
6466 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6467 "vdu_id": vdur
["vdu-id-ref"],
6471 for interface
in vdur
["interfaces"]:
6472 scaling_info
["vdu"][-1]["interface"].append(
6474 "name": interface
["name"],
6475 "ip_address": interface
["ip-address"],
6476 "mac_address": interface
.get("mac-address"),
6479 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
# PRE-SCALE: for each scaling-config-action whose trigger matches the scaling
# type (pre-scale-in with SCALE_IN, pre-scale-out with SCALE_OUT), the
# referenced vnf config primitive is looked up and executed.
6482 step
= "Executing pre-scale vnf-config-primitive"
6483 if scaling_descriptor
.get("scaling-config-action"):
6484 for scaling_config_action
in scaling_descriptor
[
6485 "scaling-config-action"
6488 scaling_config_action
.get("trigger") == "pre-scale-in"
6489 and scaling_type
== "SCALE_IN"
6491 scaling_config_action
.get("trigger") == "pre-scale-out"
6492 and scaling_type
== "SCALE_OUT"
6494 vnf_config_primitive
= scaling_config_action
[
6495 "vnf-config-primitive-name-ref"
6497 step
= db_nslcmop_update
[
6499 ] = "executing pre-scale scaling-config-action '{}'".format(
6500 vnf_config_primitive
6503 # look for primitive
6504 for config_primitive
in (
6505 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6506 ).get("config-primitive", ()):
6507 if config_primitive
["name"] == vnf_config_primitive
:
6511 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6512 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6513 "primitive".format(scaling_group
, vnf_config_primitive
)
6516 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6517 if db_vnfr
.get("additionalParamsForVnf"):
6518 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6520 scale_process
= "VCA"
6521 db_nsr_update
["config-status"] = "configuring pre-scaling"
6522 primitive_params
= self
._map
_primitive
_params
(
6523 config_primitive
, {}, vnfr_params
6526 # Pre-scale retry check: Check if this sub-operation has been executed before
6527 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6530 vnf_config_primitive
,
6534 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6535 # Skip sub-operation
6536 result
= "COMPLETED"
6537 result_detail
= "Done"
6540 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6541 vnf_config_primitive
, result
, result_detail
6545 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6546 # New sub-operation: Get index of this sub-operation
6548 len(db_nslcmop
.get("_admin", {}).get("operations"))
6553 + "vnf_config_primitive={} New sub-operation".format(
6554 vnf_config_primitive
6558 # retry: Get registered params for this existing sub-operation
6559 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6562 vnf_index
= op
.get("member_vnf_index")
6563 vnf_config_primitive
= op
.get("primitive")
6564 primitive_params
= op
.get("primitive_params")
6567 + "vnf_config_primitive={} Sub-operation retry".format(
6568 vnf_config_primitive
6571 # Execute the primitive, either with new (first-time) or registered (reintent) args
6572 ee_descriptor_id
= config_primitive
.get(
6573 "execution-environment-ref"
6575 primitive_name
= config_primitive
.get(
6576 "execution-environment-primitive", vnf_config_primitive
6578 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6579 nsr_deployed
["VCA"],
6580 member_vnf_index
=vnf_index
,
6582 vdu_count_index
=None,
6583 ee_descriptor_id
=ee_descriptor_id
,
6585 result
, result_detail
= await self
._ns
_execute
_primitive
(
6594 + "vnf_config_primitive={} Done with result {} {}".format(
6595 vnf_config_primitive
, result
, result_detail
6598 # Update operationState = COMPLETED | FAILED
6599 self
._update
_suboperation
_status
(
6600 db_nslcmop
, op_index
, result
, result_detail
6603 if result
== "FAILED":
6604 raise LcmException(result_detail
)
6605 db_nsr_update
["config-status"] = old_config_status
6606 scale_process
= None
6610 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6613 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6616 # SCALE-IN VCA - BEGIN
6617 if vca_scaling_info
:
6618 step
= db_nslcmop_update
[
6620 ] = "Deleting the execution environments"
6621 scale_process
= "VCA"
6622 for vca_info
in vca_scaling_info
:
6623 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6624 member_vnf_index
= str(vca_info
["member-vnf-index"])
6626 logging_text
+ "vdu info: {}".format(vca_info
)
6628 if vca_info
.get("osm_vdu_id"):
6629 vdu_id
= vca_info
["osm_vdu_id"]
6630 vdu_index
= int(vca_info
["vdu_index"])
6633 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6634 member_vnf_index
, vdu_id
, vdu_index
6636 stage
[2] = step
= "Scaling in VCA"
6637 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6638 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6639 config_update
= db_nsr
["configurationStatus"]
6640 for vca_index
, vca
in enumerate(vca_update
):
6642 (vca
or vca
.get("ee_id"))
6643 and vca
["member-vnf-index"] == member_vnf_index
6644 and vca
["vdu_count_index"] == vdu_index
6646 if vca
.get("vdu_id"):
6647 config_descriptor
= get_configuration(
6648 db_vnfd
, vca
.get("vdu_id")
6650 elif vca
.get("kdu_name"):
6651 config_descriptor
= get_configuration(
6652 db_vnfd
, vca
.get("kdu_name")
6655 config_descriptor
= get_configuration(
6656 db_vnfd
, db_vnfd
["id"]
6658 operation_params
= (
6659 db_nslcmop
.get("operationParams") or {}
6661 exec_terminate_primitives
= not operation_params
.get(
6662 "skip_terminate_primitives"
6663 ) and vca
.get("needed_terminate")
6664 task
= asyncio
.ensure_future(
6673 exec_primitives
=exec_terminate_primitives
,
6677 timeout
=self
.timeout_charm_delete
,
6680 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6683 del vca_update
[vca_index
]
6684 del config_update
[vca_index
]
6685 # wait for pending tasks of terminate primitives
6689 + "Waiting for tasks {}".format(
6690 list(tasks_dict_info
.keys())
6693 error_list
= await self
._wait
_for
_tasks
(
6697 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6702 tasks_dict_info
.clear()
6704 raise LcmException("; ".join(error_list
))
6706 db_vca_and_config_update
= {
6707 "_admin.deployed.VCA": vca_update
,
6708 "configurationStatus": config_update
,
6711 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6713 scale_process
= None
6714 # SCALE-IN VCA - END
# VDU scaling goes through RO when there is something to create or delete; the
# "vdu-create" / "vdu-delete" entries are popped from scaling_info afterwards.
6717 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6718 scale_process
= "RO"
6719 if self
.ro_config
.get("ng"):
6720 await self
._scale
_ng
_ro
(
6721 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6723 scaling_info
.pop("vdu-create", None)
6724 scaling_info
.pop("vdu-delete", None)
6726 scale_process
= None
# KDU scaling is delegated to _scale_kdu; the "kdu-create" / "kdu-delete"
# entries are popped from scaling_info afterwards.
6730 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6731 scale_process
= "KDU"
6732 await self
._scale
_kdu
(
6733 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6735 scaling_info
.pop("kdu-create", None)
6736 scaling_info
.pop("kdu-delete", None)
6738 scale_process
= None
6742 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6744 # SCALE-UP VCA - BEGIN
6745 if vca_scaling_info
:
6746 step
= db_nslcmop_update
[
6748 ] = "Creating new execution environments"
6749 scale_process
= "VCA"
6750 for vca_info
in vca_scaling_info
:
6751 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6752 member_vnf_index
= str(vca_info
["member-vnf-index"])
6754 logging_text
+ "vdu info: {}".format(vca_info
)
6756 vnfd_id
= db_vnfr
["vnfd-ref"]
6757 if vca_info
.get("osm_vdu_id"):
6758 vdu_index
= int(vca_info
["vdu_index"])
6759 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6760 if db_vnfr
.get("additionalParamsForVnf"):
6761 deploy_params
.update(
6763 db_vnfr
["additionalParamsForVnf"].copy()
6766 descriptor_config
= get_configuration(
6767 db_vnfd
, db_vnfd
["id"]
6769 if descriptor_config
:
6774 logging_text
=logging_text
6775 + "member_vnf_index={} ".format(member_vnf_index
),
6778 nslcmop_id
=nslcmop_id
,
6784 member_vnf_index
=member_vnf_index
,
6785 vdu_index
=vdu_index
,
6787 deploy_params
=deploy_params
,
6788 descriptor_config
=descriptor_config
,
6789 base_folder
=base_folder
,
6790 task_instantiation_info
=tasks_dict_info
,
6793 vdu_id
= vca_info
["osm_vdu_id"]
6794 vdur
= find_in_list(
6795 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6797 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6798 if vdur
.get("additionalParams"):
6799 deploy_params_vdu
= parse_yaml_strings(
6800 vdur
["additionalParams"]
6803 deploy_params_vdu
= deploy_params
6804 deploy_params_vdu
["OSM"] = get_osm_params(
6805 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6807 if descriptor_config
:
6812 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6813 member_vnf_index
, vdu_id
, vdu_index
6815 stage
[2] = step
= "Scaling out VCA"
6816 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6818 logging_text
=logging_text
6819 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6820 member_vnf_index
, vdu_id
, vdu_index
6824 nslcmop_id
=nslcmop_id
,
6830 member_vnf_index
=member_vnf_index
,
6831 vdu_index
=vdu_index
,
6833 deploy_params
=deploy_params_vdu
,
6834 descriptor_config
=descriptor_config
,
6835 base_folder
=base_folder
,
6836 task_instantiation_info
=tasks_dict_info
,
6839 # SCALE-UP VCA - END
6840 scale_process
= None
6843 # execute primitive service POST-SCALING
6844 step
= "Executing post-scale vnf-config-primitive"
6845 if scaling_descriptor
.get("scaling-config-action"):
6846 for scaling_config_action
in scaling_descriptor
[
6847 "scaling-config-action"
6850 scaling_config_action
.get("trigger") == "post-scale-in"
6851 and scaling_type
== "SCALE_IN"
6853 scaling_config_action
.get("trigger") == "post-scale-out"
6854 and scaling_type
== "SCALE_OUT"
6856 vnf_config_primitive
= scaling_config_action
[
6857 "vnf-config-primitive-name-ref"
6859 step
= db_nslcmop_update
[
6861 ] = "executing post-scale scaling-config-action '{}'".format(
6862 vnf_config_primitive
6865 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6866 if db_vnfr
.get("additionalParamsForVnf"):
6867 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6869 # look for primitive
6870 for config_primitive
in (
6871 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6872 ).get("config-primitive", ()):
6873 if config_primitive
["name"] == vnf_config_primitive
:
6877 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6878 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6879 "config-primitive".format(
6880 scaling_group
, vnf_config_primitive
6883 scale_process
= "VCA"
6884 db_nsr_update
["config-status"] = "configuring post-scaling"
6885 primitive_params
= self
._map
_primitive
_params
(
6886 config_primitive
, {}, vnfr_params
6889 # Post-scale retry check: Check if this sub-operation has been executed before
6890 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6893 vnf_config_primitive
,
6897 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6898 # Skip sub-operation
6899 result
= "COMPLETED"
6900 result_detail
= "Done"
6903 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6904 vnf_config_primitive
, result
, result_detail
6908 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6909 # New sub-operation: Get index of this sub-operation
6911 len(db_nslcmop
.get("_admin", {}).get("operations"))
6916 + "vnf_config_primitive={} New sub-operation".format(
6917 vnf_config_primitive
6921 # retry: Get registered params for this existing sub-operation
6922 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6925 vnf_index
= op
.get("member_vnf_index")
6926 vnf_config_primitive
= op
.get("primitive")
6927 primitive_params
= op
.get("primitive_params")
6930 + "vnf_config_primitive={} Sub-operation retry".format(
6931 vnf_config_primitive
6934 # Execute the primitive, either with new (first-time) or registered (reintent) args
6935 ee_descriptor_id
= config_primitive
.get(
6936 "execution-environment-ref"
6938 primitive_name
= config_primitive
.get(
6939 "execution-environment-primitive", vnf_config_primitive
6941 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6942 nsr_deployed
["VCA"],
6943 member_vnf_index
=vnf_index
,
6945 vdu_count_index
=None,
6946 ee_descriptor_id
=ee_descriptor_id
,
6948 result
, result_detail
= await self
._ns
_execute
_primitive
(
6957 + "vnf_config_primitive={} Done with result {} {}".format(
6958 vnf_config_primitive
, result
, result_detail
6961 # Update operationState = COMPLETED | FAILED
6962 self
._update
_suboperation
_status
(
6963 db_nslcmop
, op_index
, result
, result_detail
6966 if result
== "FAILED":
6967 raise LcmException(result_detail
)
6968 db_nsr_update
["config-status"] = old_config_status
6969 scale_process
= None
6974 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6975 db_nsr_update
["operational-status"] = (
6977 if old_operational_status
== "failed"
6978 else old_operational_status
6980 db_nsr_update
["config-status"] = old_config_status
6983 ROclient
.ROClientException
,
6988 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6990 except asyncio
.CancelledError
:
6992 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6994 exc
= "Operation was cancelled"
6995 except Exception as e
:
6996 exc
= traceback
.format_exc()
6997 self
.logger
.critical(
6998 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7002 self
._write
_ns
_status
(
7005 current_operation
="IDLE",
7006 current_operation_id
=None,
7009 stage
[1] = "Waiting for instantiate pending tasks."
7010 self
.logger
.debug(logging_text
+ stage
[1])
7011 exc
= await self
._wait
_for
_tasks
(
7014 self
.timeout_ns_deploy
,
7022 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7023 nslcmop_operation_state
= "FAILED"
7025 db_nsr_update
["operational-status"] = old_operational_status
7026 db_nsr_update
["config-status"] = old_config_status
7027 db_nsr_update
["detailed-status"] = ""
# scale_process names the part that was in progress when the failure happened:
# "VCA" marks config-status as failed, "RO" marks operational-status as failed.
7029 if "VCA" in scale_process
:
7030 db_nsr_update
["config-status"] = "failed"
7031 if "RO" in scale_process
:
7032 db_nsr_update
["operational-status"] = "failed"
7035 ] = "FAILED scaling nslcmop={} {}: {}".format(
7036 nslcmop_id
, step
, exc
7039 error_description_nslcmop
= None
7040 nslcmop_operation_state
= "COMPLETED"
7041 db_nslcmop_update
["detailed-status"] = "Done"
7043 self
._write
_op
_status
(
7046 error_message
=error_description_nslcmop
,
7047 operation_state
=nslcmop_operation_state
,
7048 other_update
=db_nslcmop_update
,
7051 self
._write
_ns
_status
(
7054 current_operation
="IDLE",
7055 current_operation_id
=None,
7056 other_update
=db_nsr_update
,
# The result is announced with a "scaled" message on the "ns" topic, carrying
# the nslcmop id and its operationState.
7059 if nslcmop_operation_state
:
7063 "nslcmop_id": nslcmop_id
,
7064 "operationState": nslcmop_operation_state
,
7066 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7067 except Exception as e
:
7069 logging_text
+ "kafka_write notification Exception {}".format(e
)
7071 self
.logger
.debug(logging_text
+ "Exit")
7072 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7074 async def _scale_kdu(
7075 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7077 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7078 for kdu_name
in _scaling_info
:
7079 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7080 deployed_kdu
, index
= get_deployed_kdu(
7081 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7083 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7084 kdu_instance
= deployed_kdu
["kdu-instance"]
7085 kdu_model
= deployed_kdu
.get("kdu-model")
7086 scale
= int(kdu_scaling_info
["scale"])
7087 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7090 "collection": "nsrs",
7091 "filter": {"_id": nsr_id
},
7092 "path": "_admin.deployed.K8s.{}".format(index
),
7095 step
= "scaling application {}".format(
7096 kdu_scaling_info
["resource-name"]
7098 self
.logger
.debug(logging_text
+ step
)
7100 if kdu_scaling_info
["type"] == "delete":
7101 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7104 and kdu_config
.get("terminate-config-primitive")
7105 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7107 terminate_config_primitive_list
= kdu_config
.get(
7108 "terminate-config-primitive"
7110 terminate_config_primitive_list
.sort(
7111 key
=lambda val
: int(val
["seq"])
7115 terminate_config_primitive
7116 ) in terminate_config_primitive_list
:
7117 primitive_params_
= self
._map
_primitive
_params
(
7118 terminate_config_primitive
, {}, {}
7120 step
= "execute terminate config primitive"
7121 self
.logger
.debug(logging_text
+ step
)
7122 await asyncio
.wait_for(
7123 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7124 cluster_uuid
=cluster_uuid
,
7125 kdu_instance
=kdu_instance
,
7126 primitive_name
=terminate_config_primitive
["name"],
7127 params
=primitive_params_
,
7134 await asyncio
.wait_for(
7135 self
.k8scluster_map
[k8s_cluster_type
].scale(
7138 kdu_scaling_info
["resource-name"],
7140 cluster_uuid
=cluster_uuid
,
7141 kdu_model
=kdu_model
,
7145 timeout
=self
.timeout_vca_on_error
,
7148 if kdu_scaling_info
["type"] == "create":
7149 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7152 and kdu_config
.get("initial-config-primitive")
7153 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7155 initial_config_primitive_list
= kdu_config
.get(
7156 "initial-config-primitive"
7158 initial_config_primitive_list
.sort(
7159 key
=lambda val
: int(val
["seq"])
7162 for initial_config_primitive
in initial_config_primitive_list
:
7163 primitive_params_
= self
._map
_primitive
_params
(
7164 initial_config_primitive
, {}, {}
7166 step
= "execute initial config primitive"
7167 self
.logger
.debug(logging_text
+ step
)
7168 await asyncio
.wait_for(
7169 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7170 cluster_uuid
=cluster_uuid
,
7171 kdu_instance
=kdu_instance
,
7172 primitive_name
=initial_config_primitive
["name"],
7173 params
=primitive_params_
,
7180 async def _scale_ng_ro(
7181 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7183 nsr_id
= db_nslcmop
["nsInstanceId"]
7184 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7187 # read from db: vnfd's for every vnf
7190 # for each vnf in ns, read vnfd
7191 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7192 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7193 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7194 # if we haven't this vnfd, read it from db
7195 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7197 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7198 db_vnfds
.append(vnfd
)
7199 n2vc_key
= self
.n2vc
.get_public_key()
7200 n2vc_key_list
= [n2vc_key
]
7203 vdu_scaling_info
.get("vdu-create"),
7204 vdu_scaling_info
.get("vdu-delete"),
7207 # db_vnfr has been updated, update db_vnfrs to use it
7208 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7209 await self
._instantiate
_ng
_ro
(
7219 start_deploy
=time(),
7220 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7222 if vdu_scaling_info
.get("vdu-delete"):
7224 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7227 async def extract_prometheus_scrape_jobs(
7228 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7230 # look if exist a file called 'prometheus*.j2' and
7231 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7235 for f
in artifact_content
7236 if f
.startswith("prometheus") and f
.endswith(".j2")
7242 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7246 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7247 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7249 vnfr_id
= vnfr_id
.replace("-", "")
7251 "JOB_NAME": vnfr_id
,
7252 "TARGET_IP": target_ip
,
7253 "EXPORTER_POD_IP": host_name
,
7254 "EXPORTER_POD_PORT": host_port
,
7256 job_list
= parse_job(job_data
, variables
)
7257 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7258 for job
in job_list
:
7260 not isinstance(job
.get("job_name"), str)
7261 or vnfr_id
not in job
["job_name"]
7263 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7264 job
["nsr_id"] = nsr_id
7265 job
["vnfr_id"] = vnfr_id
7268 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7269 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7270 self
.logger
.info(logging_text
+ "Enter")
7271 stage
= ["Preparing the environment", ""]
7272 # database nsrs record
7276 # in case of error, indicates what part of scale was failed to put nsr at error status
7277 start_deploy
= time()
7279 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7280 vim_account_id
= db_vnfr
.get("vim-account-id")
7281 vim_info_key
= "vim:" + vim_account_id
7282 vdu_id
= additional_param
["vdu_id"]
7283 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7284 vdur
= find_in_list(
7285 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7288 vdu_vim_name
= vdur
["name"]
7289 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7290 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7292 raise LcmException("Target vdu is not found")
7293 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7294 # wait for any previous tasks in process
7295 stage
[1] = "Waiting for previous operations to terminate"
7296 self
.logger
.info(stage
[1])
7297 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7299 stage
[1] = "Reading from database."
7300 self
.logger
.info(stage
[1])
7301 self
._write
_ns
_status
(
7304 current_operation
=operation_type
.upper(),
7305 current_operation_id
=nslcmop_id
7307 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7310 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7311 db_nsr_update
["operational-status"] = operation_type
7312 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7316 "vim_vm_id": vim_vm_id
,
7318 "vdu_index": additional_param
["count-index"],
7319 "vdu_id": vdur
["id"],
7320 "target_vim": target_vim
,
7321 "vim_account_id": vim_account_id
7324 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7325 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7326 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7327 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7328 self
.logger
.info("response from RO: {}".format(result_dict
))
7329 action_id
= result_dict
["action_id"]
7330 await self
._wait
_ng
_ro
(
7331 nsr_id
, action_id
, nslcmop_id
, start_deploy
,
7332 self
.timeout_operate
, None, "start_stop_rebuild",
7334 return "COMPLETED", "Done"
7335 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7336 self
.logger
.error("Exit Exception {}".format(e
))
7338 except asyncio
.CancelledError
:
7339 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7340 exc
= "Operation was cancelled"
7341 except Exception as e
:
7342 exc
= traceback
.format_exc()
7343 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7344 return "FAILED", "Error in operate VNF {}".format(exc
)
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA Cloud and VCA Cloud Credentials for the VIM account

    :param vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); each element is None when the
        corresponding key is not present in the VIM account "config"
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # dict.get only applies its default when the key is missing; a record
    # that stores "config": None must be treated as an empty config too,
    # otherwise the lookups below would fail with AttributeError.
    config = vim_account.get("config") or {}
    return config.get("vca_cloud"), config.get("vca_cloud_credential")
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account

    :param vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential); each element is None when the
        corresponding key is not present in the VIM account "config"
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    # dict.get only applies its default when the key is missing; a record
    # that stores "config": None must be treated as an empty config too,
    # otherwise the lookups below would fail with AttributeError.
    config = vim_account.get("config") or {}
    return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
7368 async def migrate(self
, nsr_id
, nslcmop_id
):
7370 Migrate VNFs and VDUs instances in a NS
7372 :param: nsr_id: NS Instance ID
7373 :param: nslcmop_id: nslcmop ID of migrate
7376 # Try to lock HA task here
7377 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7378 if not task_is_locked_by_me
:
7380 logging_text
= "Task ns={} migrate ".format(nsr_id
)
7381 self
.logger
.debug(logging_text
+ "Enter")
7382 # get all needed from database
7384 db_nslcmop_update
= {}
7385 nslcmop_operation_state
= None
7389 # in case of error, indicates what part of scale was failed to put nsr at error status
7390 start_deploy
= time()
7393 # wait for any previous tasks in process
7394 step
= "Waiting for previous operations to terminate"
7395 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7397 self
._write
_ns
_status
(
7400 current_operation
="MIGRATING",
7401 current_operation_id
=nslcmop_id
,
7403 step
= "Getting nslcmop from database"
7405 step
+ " after having waited for previous tasks to be completed"
7407 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7408 migrate_params
= db_nslcmop
.get("operationParams")
7411 target
.update(migrate_params
)
7412 desc
= await self
.RO
.migrate(nsr_id
, target
)
7413 self
.logger
.debug("RO return > {}".format(desc
))
7414 action_id
= desc
["action_id"]
7415 await self
._wait
_ng
_ro
(
7416 nsr_id
, action_id
, nslcmop_id
, start_deploy
, self
.timeout_migrate
,
7419 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7420 self
.logger
.error("Exit Exception {}".format(e
))
7422 except asyncio
.CancelledError
:
7423 self
.logger
.error("Cancelled Exception while '{}'".format(step
))
7424 exc
= "Operation was cancelled"
7425 except Exception as e
:
7426 exc
= traceback
.format_exc()
7427 self
.logger
.critical(
7428 "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True
7431 self
._write
_ns
_status
(
7434 current_operation
="IDLE",
7435 current_operation_id
=None,
7438 db_nslcmop_update
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
7439 nslcmop_operation_state
= "FAILED"
7441 nslcmop_operation_state
= "COMPLETED"
7442 db_nslcmop_update
["detailed-status"] = "Done"
7443 db_nsr_update
["detailed-status"] = "Done"
7445 self
._write
_op
_status
(
7449 operation_state
=nslcmop_operation_state
,
7450 other_update
=db_nslcmop_update
,
7452 if nslcmop_operation_state
:
7456 "nslcmop_id": nslcmop_id
,
7457 "operationState": nslcmop_operation_state
,
7459 await self
.msg
.aiowrite("ns", "migrated", msg
, loop
=self
.loop
)
7460 except Exception as e
:
7462 logging_text
+ "kafka_write notification Exception {}".format(e
)
7464 self
.logger
.debug(logging_text
+ "Exit")
7465 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_migrate")
7468 async def heal(self
, nsr_id
, nslcmop_id
):
7472 :param nsr_id: ns instance to heal
7473 :param nslcmop_id: operation to run
7477 # Try to lock HA task here
7478 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7479 if not task_is_locked_by_me
:
7482 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7483 stage
= ["", "", ""]
7484 tasks_dict_info
= {}
7485 # ^ stage, step, VIM progress
7486 self
.logger
.debug(logging_text
+ "Enter")
7487 # get all needed from database
7489 db_nslcmop_update
= {}
7491 db_vnfrs
= {} # vnf's info indexed by _id
7493 old_operational_status
= ""
7494 old_config_status
= ""
7497 # wait for any previous tasks in process
7498 step
= "Waiting for previous operations to terminate"
7499 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7500 self
._write
_ns
_status
(
7503 current_operation
="HEALING",
7504 current_operation_id
=nslcmop_id
,
7507 step
= "Getting nslcmop from database"
7509 step
+ " after having waited for previous tasks to be completed"
7511 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7513 step
= "Getting nsr from database"
7514 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7515 old_operational_status
= db_nsr
["operational-status"]
7516 old_config_status
= db_nsr
["config-status"]
7519 "_admin.deployed.RO.operational-status": "healing",
7521 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7523 step
= "Sending heal order to VIM"
7524 task_ro
= asyncio
.ensure_future(
7526 logging_text
=logging_text
,
7528 db_nslcmop
=db_nslcmop
,
7532 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "heal_RO", task_ro
)
7533 tasks_dict_info
[task_ro
] = "Healing at VIM"
7537 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7538 self
.logger
.debug(logging_text
+ stage
[1])
7539 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7540 self
.fs
.sync(db_nsr
["nsd-id"])
7542 # read from db: vnfr's of this ns
7543 step
= "Getting vnfrs from db"
7544 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7545 for vnfr
in db_vnfrs_list
:
7546 db_vnfrs
[vnfr
["_id"]] = vnfr
7547 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7549 # Check for each target VNF
7550 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7551 for target_vnf
in target_list
:
7552 # Find this VNF in the list from DB
7553 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7555 db_vnfr
= db_vnfrs
[vnfr_id
]
7556 vnfd_id
= db_vnfr
.get("vnfd-id")
7557 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7558 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7559 base_folder
= vnfd
["_admin"]["storage"]
7564 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7565 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7567 # Check each target VDU and deploy N2VC
7568 for target_vdu
in target_vnf
["additionalParams"].get("vdu", None):
7569 deploy_params_vdu
= target_vdu
7570 # Set run-day1 vnf level value if not vdu level value exists
7571 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7572 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7573 vdu_name
= target_vdu
.get("vdu-id", None)
7574 # TODO: Get vdu_id from vdud.
7576 # For multi instance VDU count-index is mandatory
7577 # For single session VDU count-indes is 0
7578 vdu_index
= target_vdu
.get("count-index",0)
7580 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7581 stage
[1] = "Deploying Execution Environments."
7582 self
.logger
.debug(logging_text
+ stage
[1])
7584 # VNF Level charm. Normal case when proxy charms.
7585 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7586 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7587 if descriptor_config
:
7588 # Continue if healed machine is management machine
7589 vnf_ip_address
= db_vnfr
.get("ip-address")
7590 target_instance
= None
7591 for instance
in db_vnfr
.get("vdur", None):
7592 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7593 target_instance
= instance
7595 if vnf_ip_address
== target_instance
.get("ip-address"):
7597 logging_text
=logging_text
7598 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7599 member_vnf_index
, vdu_name
, vdu_index
7603 nslcmop_id
=nslcmop_id
,
7609 member_vnf_index
=member_vnf_index
,
7612 deploy_params
=deploy_params_vdu
,
7613 descriptor_config
=descriptor_config
,
7614 base_folder
=base_folder
,
7615 task_instantiation_info
=tasks_dict_info
,
7619 # VDU Level charm. Normal case with native charms.
7620 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7621 if descriptor_config
:
7623 logging_text
=logging_text
7624 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7625 member_vnf_index
, vdu_name
, vdu_index
7629 nslcmop_id
=nslcmop_id
,
7635 member_vnf_index
=member_vnf_index
,
7636 vdu_index
=vdu_index
,
7638 deploy_params
=deploy_params_vdu
,
7639 descriptor_config
=descriptor_config
,
7640 base_folder
=base_folder
,
7641 task_instantiation_info
=tasks_dict_info
,
7646 ROclient
.ROClientException
,
7651 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7653 except asyncio
.CancelledError
:
7655 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7657 exc
= "Operation was cancelled"
7658 except Exception as e
:
7659 exc
= traceback
.format_exc()
7660 self
.logger
.critical(
7661 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7666 stage
[1] = "Waiting for healing pending tasks."
7667 self
.logger
.debug(logging_text
+ stage
[1])
7668 exc
= await self
._wait
_for
_tasks
(
7671 self
.timeout_ns_deploy
,
7679 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7680 nslcmop_operation_state
= "FAILED"
7682 db_nsr_update
["operational-status"] = old_operational_status
7683 db_nsr_update
["config-status"] = old_config_status
7686 ] = "FAILED healing nslcmop={} {}: {}".format(
7687 nslcmop_id
, step
, exc
7689 for task
, task_name
in tasks_dict_info
.items():
7690 if not task
.done() or task
.cancelled() or task
.exception():
7691 if task_name
.startswith(self
.task_name_deploy_vca
):
7692 # A N2VC task is pending
7693 db_nsr_update
["config-status"] = "failed"
7695 # RO task is pending
7696 db_nsr_update
["operational-status"] = "failed"
7698 error_description_nslcmop
= None
7699 nslcmop_operation_state
= "COMPLETED"
7700 db_nslcmop_update
["detailed-status"] = "Done"
7701 db_nsr_update
["detailed-status"] = "Done"
7702 db_nsr_update
["operational-status"] = "running"
7703 db_nsr_update
["config-status"] = "configured"
7705 self
._write
_op
_status
(
7708 error_message
=error_description_nslcmop
,
7709 operation_state
=nslcmop_operation_state
,
7710 other_update
=db_nslcmop_update
,
7713 self
._write
_ns
_status
(
7716 current_operation
="IDLE",
7717 current_operation_id
=None,
7718 other_update
=db_nsr_update
,
7721 if nslcmop_operation_state
:
7725 "nslcmop_id": nslcmop_id
,
7726 "operationState": nslcmop_operation_state
,
7728 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7729 except Exception as e
:
7731 logging_text
+ "kafka_write notification Exception {}".format(e
)
7733 self
.logger
.debug(logging_text
+ "Exit")
7734 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7745 :param logging_text: preffix text to use at logging
7746 :param nsr_id: nsr identity
7747 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7748 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7749 :return: None or exception
7751 def get_vim_account(vim_account_id
):
7753 if vim_account_id
in db_vims
:
7754 return db_vims
[vim_account_id
]
7755 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7756 db_vims
[vim_account_id
] = db_vim
7761 ns_params
= db_nslcmop
.get("operationParams")
7762 if ns_params
and ns_params
.get("timeout_ns_heal"):
7763 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7765 timeout_ns_heal
= self
.timeout
.get(
7766 "ns_heal", self
.timeout_ns_heal
7771 nslcmop_id
= db_nslcmop
["_id"]
7773 "action_id": nslcmop_id
,
7775 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7776 target
.update(db_nslcmop
.get("operationParams", {}))
7778 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7779 desc
= await self
.RO
.recreate(nsr_id
, target
)
7780 self
.logger
.debug("RO return > {}".format(desc
))
7781 action_id
= desc
["action_id"]
7782 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7783 await self
._wait
_ng
_ro
(
7784 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7790 "_admin.deployed.RO.operational-status": "running",
7791 "detailed-status": " ".join(stage
),
7793 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7794 self
._write
_op
_status
(nslcmop_id
, stage
)
7796 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7799 except Exception as e
:
7800 stage
[2] = "ERROR healing at VIM"
7801 #self.set_vnfr_at_error(db_vnfrs, str(e))
7803 "Error healing at VIM {}".format(e
),
7804 exc_info
=not isinstance(
7807 ROclient
.ROClientException
,
7833 task_instantiation_info
,
7836 # launch instantiate_N2VC in a asyncio task and register task object
7837 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7838 # if not found, create one entry and update database
7839 # fill db_nsr._admin.deployed.VCA.<index>
7842 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7846 get_charm_name
= False
7847 if "execution-environment-list" in descriptor_config
:
7848 ee_list
= descriptor_config
.get("execution-environment-list", [])
7849 elif "juju" in descriptor_config
:
7850 ee_list
= [descriptor_config
] # ns charms
7851 if "execution-environment-list" not in descriptor_config
:
7852 # charm name is only required for ns charms
7853 get_charm_name
= True
7854 else: # other types as script are not supported
7857 for ee_item
in ee_list
:
7860 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7861 ee_item
.get("juju"), ee_item
.get("helm-chart")
7864 ee_descriptor_id
= ee_item
.get("id")
7865 if ee_item
.get("juju"):
7866 vca_name
= ee_item
["juju"].get("charm")
7868 charm_name
= self
.find_charm_name(db_nsr
, str(vca_name
))
7871 if ee_item
["juju"].get("charm") is not None
7874 if ee_item
["juju"].get("cloud") == "k8s":
7875 vca_type
= "k8s_proxy_charm"
7876 elif ee_item
["juju"].get("proxy") is False:
7877 vca_type
= "native_charm"
7878 elif ee_item
.get("helm-chart"):
7879 vca_name
= ee_item
["helm-chart"]
7880 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7883 vca_type
= "helm-v3"
7886 logging_text
+ "skipping non juju neither charm configuration"
7891 for vca_index
, vca_deployed
in enumerate(
7892 db_nsr
["_admin"]["deployed"]["VCA"]
7894 if not vca_deployed
:
7897 vca_deployed
.get("member-vnf-index") == member_vnf_index
7898 and vca_deployed
.get("vdu_id") == vdu_id
7899 and vca_deployed
.get("kdu_name") == kdu_name
7900 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7901 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7905 # not found, create one.
7907 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7910 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7912 target
+= "/kdu/{}".format(kdu_name
)
7914 "target_element": target
,
7915 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7916 "member-vnf-index": member_vnf_index
,
7918 "kdu_name": kdu_name
,
7919 "vdu_count_index": vdu_index
,
7920 "operational-status": "init", # TODO revise
7921 "detailed-status": "", # TODO revise
7922 "step": "initial-deploy", # TODO revise
7924 "vdu_name": vdu_name
,
7926 "ee_descriptor_id": ee_descriptor_id
,
7927 "charm_name": charm_name
,
7931 # create VCA and configurationStatus in db
7933 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7934 "configurationStatus.{}".format(vca_index
): dict(),
7936 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7938 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7940 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7941 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7942 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7945 task_n2vc
= asyncio
.ensure_future(
7947 logging_text
=logging_text
,
7948 vca_index
=vca_index
,
7954 vdu_index
=vdu_index
,
7955 deploy_params
=deploy_params
,
7956 config_descriptor
=descriptor_config
,
7957 base_folder
=base_folder
,
7958 nslcmop_id
=nslcmop_id
,
7962 ee_config_descriptor
=ee_item
,
7965 self
.lcm_tasks
.register(
7969 "instantiate_N2VC-{}".format(vca_index
),
7972 task_instantiation_info
[
7974 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
7975 member_vnf_index
or "", vdu_id
or ""
7978 async def heal_N2VC(
7995 ee_config_descriptor
,
7997 nsr_id
= db_nsr
["_id"]
7998 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
7999 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8000 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8001 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8003 "collection": "nsrs",
8004 "filter": {"_id": nsr_id
},
8005 "path": db_update_entry
,
8011 element_under_configuration
= nsr_id
8015 vnfr_id
= db_vnfr
["_id"]
8016 osm_config
["osm"]["vnf_id"] = vnfr_id
8018 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8020 if vca_type
== "native_charm":
8023 index_number
= vdu_index
or 0
8026 element_type
= "VNF"
8027 element_under_configuration
= vnfr_id
8028 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8030 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8031 element_type
= "VDU"
8032 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8033 osm_config
["osm"]["vdu_id"] = vdu_id
8035 namespace
+= ".{}".format(kdu_name
)
8036 element_type
= "KDU"
8037 element_under_configuration
= kdu_name
8038 osm_config
["osm"]["kdu_name"] = kdu_name
8041 if base_folder
["pkg-dir"]:
8042 artifact_path
= "{}/{}/{}/{}".format(
8043 base_folder
["folder"],
8044 base_folder
["pkg-dir"],
8047 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8052 artifact_path
= "{}/Scripts/{}/{}/".format(
8053 base_folder
["folder"],
8056 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8061 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8063 # get initial_config_primitive_list that applies to this element
8064 initial_config_primitive_list
= config_descriptor
.get(
8065 "initial-config-primitive"
8069 "Initial config primitive list > {}".format(
8070 initial_config_primitive_list
8074 # add config if not present for NS charm
8075 ee_descriptor_id
= ee_config_descriptor
.get("id")
8076 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8077 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8078 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8082 "Initial config primitive list #2 > {}".format(
8083 initial_config_primitive_list
8086 # n2vc_redesign STEP 3.1
8087 # find old ee_id if exists
8088 ee_id
= vca_deployed
.get("ee_id")
8090 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8091 # create or register execution environment in VCA. Only for native charms when healing
8092 if vca_type
== "native_charm":
8093 step
= "Waiting to VM being up and getting IP address"
8094 self
.logger
.debug(logging_text
+ step
)
8095 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8104 credentials
= {"hostname": rw_mgmt_ip
}
8106 username
= deep_get(
8107 config_descriptor
, ("config-access", "ssh-access", "default-user")
8109 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8110 # merged. Meanwhile let's get username from initial-config-primitive
8111 if not username
and initial_config_primitive_list
:
8112 for config_primitive
in initial_config_primitive_list
:
8113 for param
in config_primitive
.get("parameter", ()):
8114 if param
["name"] == "ssh-username":
8115 username
= param
["value"]
8119 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8120 "'config-access.ssh-access.default-user'"
8122 credentials
["username"] = username
8124 # n2vc_redesign STEP 3.2
8125 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8126 self
._write
_configuration
_status
(
8128 vca_index
=vca_index
,
8129 status
="REGISTERING",
8130 element_under_configuration
=element_under_configuration
,
8131 element_type
=element_type
,
8134 step
= "register execution environment {}".format(credentials
)
8135 self
.logger
.debug(logging_text
+ step
)
8136 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8137 credentials
=credentials
,
8138 namespace
=namespace
,
8143 # update ee_id en db
8145 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8147 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8149 # for compatibility with MON/POL modules, the need model and application name at database
8150 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8151 # Not sure if this need to be done when healing
8153 ee_id_parts = ee_id.split(".")
8154 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8155 if len(ee_id_parts) >= 2:
8156 model_name = ee_id_parts[0]
8157 application_name = ee_id_parts[1]
8158 db_nsr_update[db_update_entry + "model"] = model_name
8159 db_nsr_update[db_update_entry + "application"] = application_name
8162 # n2vc_redesign STEP 3.3
8163 # Install configuration software. Only for native charms.
8164 step
= "Install configuration Software"
8166 self
._write
_configuration
_status
(
8168 vca_index
=vca_index
,
8169 status
="INSTALLING SW",
8170 element_under_configuration
=element_under_configuration
,
8171 element_type
=element_type
,
8172 #other_update=db_nsr_update,
8176 # TODO check if already done
8177 self
.logger
.debug(logging_text
+ step
)
8179 if vca_type
== "native_charm":
8180 config_primitive
= next(
8181 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8184 if config_primitive
:
8185 config
= self
._map
_primitive
_params
(
8186 config_primitive
, {}, deploy_params
8188 await self
.vca_map
[vca_type
].install_configuration_sw(
8190 artifact_path
=artifact_path
,
8198 # write in db flag of configuration_sw already installed
8200 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8203 # Not sure if this need to be done when healing
8205 # add relations for this VCA (wait for other peers related with this VCA)
8206 await self._add_vca_relations(
8207 logging_text=logging_text,
8210 vca_index=vca_index,
8214 # if SSH access is required, then get execution environment SSH public
8215 # if native charm we have waited already to VM be UP
8216 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8219 # self.logger.debug("get ssh key block")
8221 config_descriptor
, ("config-access", "ssh-access", "required")
8223 # self.logger.debug("ssh key needed")
8224 # Needed to inject a ssh key
8227 ("config-access", "ssh-access", "default-user"),
8229 step
= "Install configuration Software, getting public ssh key"
8230 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8231 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8234 step
= "Insert public key into VM user={} ssh_key={}".format(
8238 # self.logger.debug("no need to get ssh key")
8239 step
= "Waiting to VM being up and getting IP address"
8240 self
.logger
.debug(logging_text
+ step
)
8242 # n2vc_redesign STEP 5.1
8243 # wait for RO (ip-address) Insert pub_key into VM
8244 # IMPORTANT: We need do wait for RO to complete healing operation.
8245 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8248 rw_mgmt_ip
= await self
.wait_kdu_up(
8249 logging_text
, nsr_id
, vnfr_id
, kdu_name
8252 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8262 rw_mgmt_ip
= None # This is for a NS configuration
8264 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8266 # store rw_mgmt_ip in deploy params for later replacement
8267 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8270 # get run-day1 operation parameter
8271 runDay1
= deploy_params
.get("run-day1",False)
8272 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8274 # n2vc_redesign STEP 6 Execute initial config primitive
8275 step
= "execute initial config primitive"
8277 # wait for dependent primitives execution (NS -> VNF -> VDU)
8278 if initial_config_primitive_list
:
8279 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8281 # stage, in function of element type: vdu, kdu, vnf or ns
8282 my_vca
= vca_deployed_list
[vca_index
]
8283 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8285 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8286 elif my_vca
.get("member-vnf-index"):
8288 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8291 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8293 self
._write
_configuration
_status
(
8294 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8297 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8299 check_if_terminated_needed
= True
8300 for initial_config_primitive
in initial_config_primitive_list
:
8301 # adding information on the vca_deployed if it is a NS execution environment
8302 if not vca_deployed
["member-vnf-index"]:
8303 deploy_params
["ns_config_info"] = json
.dumps(
8304 self
._get
_ns
_config
_info
(nsr_id
)
8306 # TODO check if already done
8307 primitive_params_
= self
._map
_primitive
_params
(
8308 initial_config_primitive
, {}, deploy_params
8311 step
= "execute primitive '{}' params '{}'".format(
8312 initial_config_primitive
["name"], primitive_params_
8314 self
.logger
.debug(logging_text
+ step
)
8315 await self
.vca_map
[vca_type
].exec_primitive(
8317 primitive_name
=initial_config_primitive
["name"],
8318 params_dict
=primitive_params_
,
8323 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8324 if check_if_terminated_needed
:
8325 if config_descriptor
.get("terminate-config-primitive"):
8327 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8329 check_if_terminated_needed
= False
8331 # TODO register in database that primitive is done
8333 # STEP 7 Configure metrics
8334 # Not sure if this need to be done when healing
8336 if vca_type == "helm" or vca_type == "helm-v3":
8337 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8339 artifact_path=artifact_path,
8340 ee_config_descriptor=ee_config_descriptor,
8343 target_ip=rw_mgmt_ip,
8349 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8352 for job in prometheus_jobs:
8355 {"job_name": job["job_name"]},
8358 fail_on_empty=False,
8362 step
= "instantiated at VCA"
8363 self
.logger
.debug(logging_text
+ step
)
8365 self
._write
_configuration
_status
(
8366 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8369 except Exception as e
: # TODO not use Exception but N2VC exception
8370 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8372 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8375 "Exception while {} : {}".format(step
, e
), exc_info
=True
8377 self
._write
_configuration
_status
(
8378 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8380 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
):
    """
    Wait until RO no longer reports the NS as "healing".

    The "nsrs" record is read every 15 seconds; the method returns as soon as
    _admin.deployed.RO.operational-status is anything other than "healing".

    :param nsr_id: NS instance ID, used as "_id" when reading the nsrs record
    :param timeout: maximum time, in seconds, to keep polling
    :return: None
    :raises NgRoException: if the status is still "healing" once the timeout
        has elapsed
    """
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # asyncio.sleep() no longer accepts a "loop" argument (removed in
        # Python 3.10); inside a coroutine it always uses the running loop.
        await asyncio.sleep(15)
    else:  # the while condition became false without a break: timed out
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Reads the operation parameters from the nslcmop record, hands them to RO
    and waits for the resulting RO action to finish. The outcome is stored in
    the nslcmop (operation state plus "detailed-status") and a
    "verticalscaled" message is written to the "ns" topic. Exceptions raised
    inside the main sequence are caught and reported as a FAILED operation
    instead of being propagated.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    # set by the except clauses; decides between FAILED and COMPLETED below
    exc = None
    # timestamp passed to _wait_ng_ro together with the timeout
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        # work on a copy so the record read from the database is not shared
        target = {}
        target.update(operationParams)
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # The notification is best effort: a failure here is only logged.
        try:
            msg = {
                "nsr_id": nsr_id,
                "nslcmop_id": nslcmop_id,
                "operationState": nslcmop_operation_state,
            }
            await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
        except Exception as e:
            self.logger.error(
                logging_text + "kafka_write notification Exception {}".format(e)
            )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")