1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from typing
import Any
, Dict
, List
24 import logging
.handlers
35 from osm_lcm
import ROclient
36 from osm_lcm
.data_utils
.nsr
import (
39 get_deployed_vca_list
,
42 from osm_lcm
.data_utils
.vca
import (
51 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
52 from osm_lcm
.lcm_utils
import (
59 check_juju_bundle_existence
,
60 get_charm_artifact_path
,
62 from osm_lcm
.data_utils
.nsd
import (
63 get_ns_configuration_relation_list
,
67 from osm_lcm
.data_utils
.vnfd
import (
73 get_ee_sorted_initial_config_primitive_list
,
74 get_ee_sorted_terminate_config_primitive_list
,
76 get_virtual_link_profiles
,
81 get_number_of_instances
,
83 get_kdu_resource_profile
,
84 find_software_version
,
86 from osm_lcm
.data_utils
.list_utils
import find_in_list
87 from osm_lcm
.data_utils
.vnfr
import (
91 get_volumes_from_instantiation_params
,
93 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
94 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
95 from n2vc
.definitions
import RelationEndpoint
96 from n2vc
.k8s_helm_conn
import K8sHelmConnector
97 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
98 from n2vc
.k8s_juju_conn
import K8sJujuConnector
100 from osm_common
.dbbase
import DbException
101 from osm_common
.fsbase
import FsException
103 from osm_lcm
.data_utils
.database
.database
import Database
104 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
105 from osm_lcm
.data_utils
.wim
import (
107 get_target_wim_attrs
,
108 select_feasible_wim_account
,
111 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
112 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
114 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
115 from osm_lcm
.osm_config
import OsmConfigBuilder
116 from osm_lcm
.prometheus
import parse_job
118 from copy
import copy
, deepcopy
119 from time
import time
120 from uuid
import uuid4
122 from random
import randint
124 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
127 class NsLcm(LcmBase
):
128 timeout_vca_on_error
= (
130 ) # Time for charm from first time at blocked,error status to mark as failed
131 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
132 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
133 timeout_ns_heal
= 1800 # default global timeout for healing a ns
134 timeout_charm_delete
= 10 * 60
135 timeout_primitive
= 30 * 60 # timeout for primitive execution
136 timeout_ns_update
= 30 * 60 # timeout for ns update
137 timeout_progress_primitive
= (
139 ) # timeout for some progress in a primitive execution
140 timeout_migrate
= 1800 # default global timeout for migrating vnfs
141 timeout_operate
= 1800 # default global timeout for operate actions on vnfs
142 timeout_verticalscale
= 1800 # default global timeout for Vertical Scaling
143 SUBOPERATION_STATUS_NOT_FOUND
= -1
144 SUBOPERATION_STATUS_NEW
= -2
145 SUBOPERATION_STATUS_SKIP
= -3
146 task_name_deploy_vca
= "Deploying VCA"
148 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
150 Init, Connect to database, filesystem storage, and messaging
151 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
154 super().__init
__(msg
=msg
, logger
=logging
.getLogger("lcm.ns"))
156 self
.db
= Database().instance
.db
157 self
.fs
= Filesystem().instance
.fs
159 self
.lcm_tasks
= lcm_tasks
160 self
.timeout
= config
["timeout"]
161 self
.ro_config
= config
["ro_config"]
162 self
.ng_ro
= config
["ro_config"].get("ng")
163 self
.vca_config
= config
["VCA"].copy()
165 # create N2VC connector
166 self
.n2vc
= N2VCJujuConnector(
169 on_update_db
=self
._on
_update
_n
2vc
_db
,
174 self
.conn_helm_ee
= LCMHelmConn(
177 vca_config
=self
.vca_config
,
178 on_update_db
=self
._on
_update
_n
2vc
_db
,
181 self
.k8sclusterhelm2
= K8sHelmConnector(
182 kubectl_command
=self
.vca_config
.get("kubectlpath"),
183 helm_command
=self
.vca_config
.get("helmpath"),
190 self
.k8sclusterhelm3
= K8sHelm3Connector(
191 kubectl_command
=self
.vca_config
.get("kubectlpath"),
192 helm_command
=self
.vca_config
.get("helm3path"),
199 self
.k8sclusterjuju
= K8sJujuConnector(
200 kubectl_command
=self
.vca_config
.get("kubectlpath"),
201 juju_command
=self
.vca_config
.get("jujupath"),
204 on_update_db
=self
._on
_update
_k
8s
_db
,
209 self
.k8scluster_map
= {
210 "helm-chart": self
.k8sclusterhelm2
,
211 "helm-chart-v3": self
.k8sclusterhelm3
,
212 "chart": self
.k8sclusterhelm3
,
213 "juju-bundle": self
.k8sclusterjuju
,
214 "juju": self
.k8sclusterjuju
,
218 "lxc_proxy_charm": self
.n2vc
,
219 "native_charm": self
.n2vc
,
220 "k8s_proxy_charm": self
.n2vc
,
221 "helm": self
.conn_helm_ee
,
222 "helm-v3": self
.conn_helm_ee
,
226 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
228 self
.op_status_map
= {
229 "instantiation": self
.RO
.status
,
230 "termination": self
.RO
.status
,
231 "migrate": self
.RO
.status
,
232 "healing": self
.RO
.recreate_status
,
233 "verticalscale": self
.RO
.status
,
234 "start_stop_rebuild": self
.RO
.status
,
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Return ip_mac with its last field increased by vm_index.

    Takes no self: callers in this file invoke it as
    self.increment_ip_mac(value, n), so it is expected to be bound as a
    static method.

    :param ip_mac: IPv4 address (last field after the last dot, decimal) or
        IPv6/MAC address (last field after the last colon, hexadecimal)
    :param vm_index: amount added to the last field
    :return: the new address as text; ip_mac unchanged when it is not a str;
        None when no separator is found or the last field cannot be parsed
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except (ValueError, TypeError):
        # best effort: a last field (or vm_index) that is not a number yields None
        pass
    return None
259 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
261 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
264 # TODO filter RO descriptor fields...
268 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
269 db_dict
["deploymentStatus"] = ro_descriptor
270 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
272 except Exception as e
:
274 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id
, e
)
277 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
, vca_id
=None):
279 # remove last dot from path (if exists)
280 if path
.endswith("."):
283 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
284 # .format(table, filter, path, updated_data))
287 nsr_id
= filter.get("_id")
289 # read ns record from database
290 nsr
= self
.db
.get_one(table
="nsrs", q_filter
=filter)
291 current_ns_status
= nsr
.get("nsState")
293 # get vca status for NS
294 status_dict
= await self
.n2vc
.get_status(
295 namespace
="." + nsr_id
, yaml_format
=False, vca_id
=vca_id
300 db_dict
["vcaStatus"] = status_dict
301 await self
.n2vc
.update_vca_status(db_dict
["vcaStatus"], vca_id
=vca_id
)
303 # update configurationStatus for this VCA
305 vca_index
= int(path
[path
.rfind(".") + 1 :])
308 target_dict
=nsr
, key_list
=("_admin", "deployed", "VCA")
310 vca_status
= vca_list
[vca_index
].get("status")
312 configuration_status_list
= nsr
.get("configurationStatus")
313 config_status
= configuration_status_list
[vca_index
].get("status")
315 if config_status
== "BROKEN" and vca_status
!= "failed":
316 db_dict
["configurationStatus"][vca_index
] = "READY"
317 elif config_status
!= "BROKEN" and vca_status
== "failed":
318 db_dict
["configurationStatus"][vca_index
] = "BROKEN"
319 except Exception as e
:
320 # not update configurationStatus
321 self
.logger
.debug("Error updating vca_index (ignore): {}".format(e
))
323 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
324 # if nsState = 'DEGRADED' check if all is OK
326 if current_ns_status
in ("READY", "DEGRADED"):
327 error_description
= ""
329 if status_dict
.get("machines"):
330 for machine_id
in status_dict
.get("machines"):
331 machine
= status_dict
.get("machines").get(machine_id
)
332 # check machine agent-status
333 if machine
.get("agent-status"):
334 s
= machine
.get("agent-status").get("status")
337 error_description
+= (
338 "machine {} agent-status={} ; ".format(
342 # check machine instance status
343 if machine
.get("instance-status"):
344 s
= machine
.get("instance-status").get("status")
347 error_description
+= (
348 "machine {} instance-status={} ; ".format(
353 if status_dict
.get("applications"):
354 for app_id
in status_dict
.get("applications"):
355 app
= status_dict
.get("applications").get(app_id
)
356 # check application status
357 if app
.get("status"):
358 s
= app
.get("status").get("status")
361 error_description
+= (
362 "application {} status={} ; ".format(app_id
, s
)
365 if error_description
:
366 db_dict
["errorDescription"] = error_description
367 if current_ns_status
== "READY" and is_degraded
:
368 db_dict
["nsState"] = "DEGRADED"
369 if current_ns_status
== "DEGRADED" and not is_degraded
:
370 db_dict
["nsState"] = "READY"
373 self
.update_db_2("nsrs", nsr_id
, db_dict
)
375 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
377 except Exception as e
:
378 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
380 async def _on_update_k8s_db(
381 self
, cluster_uuid
, kdu_instance
, filter=None, vca_id
=None, cluster_type
="juju"
384 Updating vca status in NSR record
385 :param cluster_uuid: UUID of a k8s cluster
386 :param kdu_instance: The unique name of the KDU instance
387 :param filter: To get nsr_id
388 :cluster_type: The cluster type (juju, k8s)
392 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
393 # .format(cluster_uuid, kdu_instance, filter))
395 nsr_id
= filter.get("_id")
397 vca_status
= await self
.k8scluster_map
[cluster_type
].status_kdu(
398 cluster_uuid
=cluster_uuid
,
399 kdu_instance
=kdu_instance
,
401 complete_status
=True,
407 db_dict
["vcaStatus"] = {nsr_id
: vca_status
}
409 if cluster_type
in ("juju-bundle", "juju"):
410 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
411 # status in a similar way between Juju Bundles and Helm Charts on this side
412 await self
.k8sclusterjuju
.update_vca_status(
413 db_dict
["vcaStatus"],
419 f
"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
423 self
.update_db_2("nsrs", nsr_id
, db_dict
)
424 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
426 except Exception as e
:
427 self
.logger
.warn("Error updating NS state for ns={}: {}".format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template.

    :param cloud_init_text: template source
    :param additional_params: mapping with the template variables; a falsy
        value is treated as an empty mapping
    :param vnfd_id: vnfd id, used only in error messages
    :param vdu_id: vdu id, used only in error messages
    :return: the rendered text
    :raises LcmException: when a variable used by the template is not provided
        or the template cannot be parsed
    """
    try:
        # StrictUndefined makes a missing variable an error instead of an empty string
        # NOTE(review): autoescape=True HTML-escapes substituted values — confirm
        # this is intended for cloud-init content
        env = Environment(undefined=StrictUndefined, autoescape=True)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        ) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        ) from e
448 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
449 cloud_init_content
= cloud_init_file
= None
451 if vdu
.get("cloud-init-file"):
452 base_folder
= vnfd
["_admin"]["storage"]
453 if base_folder
["pkg-dir"]:
454 cloud_init_file
= "{}/{}/cloud_init/{}".format(
455 base_folder
["folder"],
456 base_folder
["pkg-dir"],
457 vdu
["cloud-init-file"],
460 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
461 base_folder
["folder"],
462 vdu
["cloud-init-file"],
464 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
465 cloud_init_content
= ci_file
.read()
466 elif vdu
.get("cloud-init"):
467 cloud_init_content
= vdu
["cloud-init"]
469 return cloud_init_content
470 except FsException
as e
:
472 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
473 vnfd
["id"], vdu
["id"], cloud_init_file
, e
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Return the additionalParams of the first vdur that references vdu_id.

    :param db_vnfr: vnfr record; its "vdur" list is searched
    :param vdu_id: value compared against each vdur "vdu-id-ref"
    :return: the result of parse_yaml_strings over the vdur "additionalParams"
        (None is passed when no vdur matches or it has no such entry)
    """
    vdur = next(
        (
            vdur
            # a vnfr without vdur entries is treated as an empty list
            for vdur in (db_vnfr.get("vdur") or ())
            if vdu_id == vdur["vdu-id-ref"]
        ),
        {},
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd, it is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided (not used here)
    :param nsrId: Id of the NSR (not used here)
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are removed from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """
    Return a copy of an ip-profile with its keys renamed to the RO format.

    - "dns-server" becomes "dns-address"; a list of {"address": ...} items
      becomes a plain list of addresses, any other value is moved unchanged
    - "ip-version" values "ipv4"/"ipv6" become "IPv4"/"IPv6"
    - "dhcp-params" becomes "dhcp"

    :param ip_profile: input ip-profile, it is not modified
    :return: the converted deep copy
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    ip_version = RO_ip_profile.get("ip-version")
    if ip_version == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    elif ip_version == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
528 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
529 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
530 if db_vim
["_admin"]["operationalState"] != "ENABLED":
532 "VIM={} is not available. operationalState={}".format(
533 vim_account
, db_vim
["_admin"]["operationalState"]
536 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Return the RO identifier stored for a WIM account.

    :param wim_account: _id of the record at "wim_accounts"; a value that is
        not a str is returned unchanged without any database access
    :return: content of _admin.deployed.RO-account of that record
    :raises LcmException: when the account operationalState is not "ENABLED"
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    return db_wim["_admin"]["deployed"]["RO-account"]
553 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
555 db_vdu_push_list
= []
557 db_update
= {"_admin.modified": time()}
559 for vdu_id
, vdu_count
in vdu_create
.items():
563 for vdur
in reversed(db_vnfr
["vdur"])
564 if vdur
["vdu-id-ref"] == vdu_id
569 # Read the template saved in the db:
571 "No vdur in the database. Using the vdur-template to scale"
573 vdur_template
= db_vnfr
.get("vdur-template")
574 if not vdur_template
:
576 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
580 vdur
= vdur_template
[0]
581 # Delete a template from the database after using it
584 {"_id": db_vnfr
["_id"]},
586 pull
={"vdur-template": {"_id": vdur
["_id"]}},
588 for count
in range(vdu_count
):
589 vdur_copy
= deepcopy(vdur
)
590 vdur_copy
["status"] = "BUILD"
591 vdur_copy
["status-detailed"] = None
592 vdur_copy
["ip-address"] = None
593 vdur_copy
["_id"] = str(uuid4())
594 vdur_copy
["count-index"] += count
+ 1
595 vdur_copy
["id"] = "{}-{}".format(
596 vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"]
598 vdur_copy
.pop("vim_info", None)
599 for iface
in vdur_copy
["interfaces"]:
600 if iface
.get("fixed-ip"):
601 iface
["ip-address"] = self
.increment_ip_mac(
602 iface
["ip-address"], count
+ 1
605 iface
.pop("ip-address", None)
606 if iface
.get("fixed-mac"):
607 iface
["mac-address"] = self
.increment_ip_mac(
608 iface
["mac-address"], count
+ 1
611 iface
.pop("mac-address", None)
615 ) # only first vdu can be management of vnf
616 db_vdu_push_list
.append(vdur_copy
)
617 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
619 if len(db_vnfr
["vdur"]) == 1:
620 # The scale will move to 0 instances
622 "Scaling to 0 !, creating the template with the last vdur"
624 template_vdur
= [db_vnfr
["vdur"][0]]
625 for vdu_id
, vdu_count
in vdu_delete
.items():
627 indexes_to_delete
= [
629 for iv
in enumerate(db_vnfr
["vdur"])
630 if iv
[1]["vdu-id-ref"] == vdu_id
634 "vdur.{}.status".format(i
): "DELETING"
635 for i
in indexes_to_delete
[-vdu_count
:]
639 # it must be deleted one by one because common.db does not allow otherwise
642 for v
in reversed(db_vnfr
["vdur"])
643 if v
["vdu-id-ref"] == vdu_id
645 for vdu
in vdus_to_delete
[:vdu_count
]:
648 {"_id": db_vnfr
["_id"]},
650 pull
={"vdur": {"_id": vdu
["_id"]}},
654 db_push
["vdur"] = db_vdu_push_list
656 db_push
["vdur-template"] = template_vdur
659 db_vnfr
["vdur-template"] = template_vdur
660 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
661 # modify passed dictionary db_vnfr
662 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
663 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # the loop ended without break: RO reported no net for this vld
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Mark every vnfr as "ERROR", both in the passed records and in the database.

    Only vdur entries that have no "status" yet are marked; when error_text
    is provided it is stored as their "status-detailed".

    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content. The
        vdur entries are also modified
    :param error_text: detail of the error; may be empty
    :return: None. A DbException is logged and not propagated
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # same text in memory and in the database
                        detail = str(error_text)
                        vdur["status-detailed"] = detail
                        vnfr_update[
                            "vdur.{}.status-detailed".format(vdu_index)
                        ] = detail
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
706 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
708 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
709 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
710 :param nsr_desc_RO: nsr descriptor from RO
711 :return: Nothing, LcmException is raised on errors
713 for vnf_index
, db_vnfr
in db_vnfrs
.items():
714 for vnf_RO
in nsr_desc_RO
["vnfs"]:
715 if vnf_RO
["member_vnf_index"] != vnf_index
:
718 if vnf_RO
.get("ip_address"):
719 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
[
722 elif not db_vnfr
.get("ip-address"):
723 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
724 raise LcmExceptionNoMgmtIP(
725 "ns member_vnf_index '{}' has no IP address".format(
730 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
731 vdur_RO_count_index
= 0
732 if vdur
.get("pdu-type"):
734 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
735 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
737 if vdur
["count-index"] != vdur_RO_count_index
:
738 vdur_RO_count_index
+= 1
740 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
741 if vdur_RO
.get("ip_address"):
742 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
744 vdur
["ip-address"] = None
745 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
746 vdur
["name"] = vdur_RO
.get("vim_name")
747 vdur
["status"] = vdur_RO
.get("status")
748 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
749 for ifacer
in get_iterable(vdur
, "interfaces"):
750 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
751 if ifacer
["name"] == interface_RO
.get("internal_name"):
752 ifacer
["ip-address"] = interface_RO
.get(
755 ifacer
["mac-address"] = interface_RO
.get(
761 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
762 "from VIM info".format(
763 vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]
766 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
770 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
772 vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]
776 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
777 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
778 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
780 vld
["vim-id"] = net_RO
.get("vim_net_id")
781 vld
["name"] = net_RO
.get("vim_name")
782 vld
["status"] = net_RO
.get("status")
783 vld
["status-detailed"] = net_RO
.get("error_msg")
784 vnfr_update
["vld.{}".format(vld_index
)] = vld
788 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
793 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
798 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
803 def _get_ns_config_info(self
, nsr_id
):
805 Generates a mapping between vnf,vdu elements and the N2VC id
806 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
807 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
808 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
809 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
811 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
812 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
814 ns_config_info
= {"osm-config-mapping": mapping
}
815 for vca
in vca_deployed_list
:
816 if not vca
["member-vnf-index"]:
818 if not vca
["vdu_id"]:
819 mapping
[vca
["member-vnf-index"]] = vca
["application"]
823 vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"]
825 ] = vca
["application"]
826 return ns_config_info
828 async def _instantiate_ng_ro(
845 def get_vim_account(vim_account_id
):
847 if vim_account_id
in db_vims
:
848 return db_vims
[vim_account_id
]
849 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
850 db_vims
[vim_account_id
] = db_vim
853 # modify target_vld info with instantiation parameters
854 def parse_vld_instantiation_params(
855 target_vim
, target_vld
, vld_params
, target_sdn
857 if vld_params
.get("ip-profile"):
858 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
[
861 if vld_params
.get("provider-network"):
862 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
[
865 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
866 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
[
870 # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
871 # if wim_account_id is specified in vld_params, validate if it is feasible.
872 wim_account_id
, db_wim
= select_feasible_wim_account(
873 db_nsr
, db_vnfrs
, target_vld
, vld_params
, self
.logger
877 # WIM is needed and a feasible one was found, populate WIM target and SDN ports
878 self
.logger
.info("WIM selected: {:s}".format(str(wim_account_id
)))
879 # update vld_params with correct WIM account Id
880 vld_params
["wimAccountId"] = wim_account_id
882 target_wim
= "wim:{}".format(wim_account_id
)
883 target_wim_attrs
= get_target_wim_attrs(nsr_id
, target_vld
, vld_params
)
884 sdn_ports
= get_sdn_ports(vld_params
, db_wim
)
885 if len(sdn_ports
) > 0:
886 target_vld
["vim_info"][target_wim
] = target_wim_attrs
887 target_vld
["vim_info"][target_wim
]["sdn-ports"] = sdn_ports
890 "Target VLD with WIM data: {:s}".format(str(target_vld
))
893 for param
in ("vim-network-name", "vim-network-id"):
894 if vld_params
.get(param
):
895 if isinstance(vld_params
[param
], dict):
896 for vim
, vim_net
in vld_params
[param
].items():
897 other_target_vim
= "vim:" + vim
899 target_vld
["vim_info"],
900 (other_target_vim
, param
.replace("-", "_")),
903 else: # isinstance str
904 target_vld
["vim_info"][target_vim
][
905 param
.replace("-", "_")
906 ] = vld_params
[param
]
907 if vld_params
.get("common_id"):
908 target_vld
["common_id"] = vld_params
.get("common_id")
910 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
911 def update_ns_vld_target(target
, ns_params
):
912 for vnf_params
in ns_params
.get("vnf", ()):
913 if vnf_params
.get("vimAccountId"):
917 for vnfr
in db_vnfrs
.values()
918 if vnf_params
["member-vnf-index"]
919 == vnfr
["member-vnf-index-ref"]
923 vdur
= next((vdur
for vdur
in target_vnf
.get("vdur", ())), None)
924 for a_index
, a_vld
in enumerate(target
["ns"]["vld"]):
925 target_vld
= find_in_list(
926 get_iterable(vdur
, "interfaces"),
927 lambda iface
: iface
.get("ns-vld-id") == a_vld
["name"],
930 vld_params
= find_in_list(
931 get_iterable(ns_params
, "vld"),
932 lambda v_vld
: v_vld
["name"] in (a_vld
["name"], a_vld
["id"]),
936 if vnf_params
.get("vimAccountId") not in a_vld
.get(
939 target_vim_network_list
= [
940 v
for _
, v
in a_vld
.get("vim_info").items()
942 target_vim_network_name
= next(
944 item
.get("vim_network_name", "")
945 for item
in target_vim_network_list
950 target
["ns"]["vld"][a_index
].get("vim_info").update(
952 "vim:{}".format(vnf_params
["vimAccountId"]): {
953 "vim_network_name": target_vim_network_name
,
959 for param
in ("vim-network-name", "vim-network-id"):
960 if vld_params
.get(param
) and isinstance(
961 vld_params
[param
], dict
963 for vim
, vim_net
in vld_params
[
966 other_target_vim
= "vim:" + vim
968 target
["ns"]["vld"][a_index
].get(
973 param
.replace("-", "_"),
978 nslcmop_id
= db_nslcmop
["_id"]
980 "name": db_nsr
["name"],
983 "image": deepcopy(db_nsr
["image"]),
984 "flavor": deepcopy(db_nsr
["flavor"]),
985 "action_id": nslcmop_id
,
986 "cloud_init_content": {},
988 for image
in target
["image"]:
989 image
["vim_info"] = {}
990 for flavor
in target
["flavor"]:
991 flavor
["vim_info"] = {}
992 if db_nsr
.get("affinity-or-anti-affinity-group"):
993 target
["affinity-or-anti-affinity-group"] = deepcopy(
994 db_nsr
["affinity-or-anti-affinity-group"]
996 for affinity_or_anti_affinity_group
in target
[
997 "affinity-or-anti-affinity-group"
999 affinity_or_anti_affinity_group
["vim_info"] = {}
1001 if db_nslcmop
.get("lcmOperationType") != "instantiate":
1002 # get parameters of instantiation:
1003 db_nslcmop_instantiate
= self
.db
.get_list(
1006 "nsInstanceId": db_nslcmop
["nsInstanceId"],
1007 "lcmOperationType": "instantiate",
1010 ns_params
= db_nslcmop_instantiate
.get("operationParams")
1012 ns_params
= db_nslcmop
.get("operationParams")
1013 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
1014 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
1017 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
1018 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
1021 "name": vld
["name"],
1022 "mgmt-network": vld
.get("mgmt-network", False),
1023 "type": vld
.get("type"),
1026 "vim_network_name": vld
.get("vim-network-name"),
1027 "vim_account_id": ns_params
["vimAccountId"],
1031 # check if this network needs SDN assist
1032 if vld
.get("pci-interfaces"):
1033 db_vim
= get_vim_account(ns_params
["vimAccountId"])
1034 if vim_config
:= db_vim
.get("config"):
1035 if sdnc_id
:= vim_config
.get("sdn-controller"):
1036 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
1037 target_sdn
= "sdn:{}".format(sdnc_id
)
1038 target_vld
["vim_info"][target_sdn
] = {
1040 "target_vim": target_vim
,
1042 "type": vld
.get("type"),
1045 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
1046 for nsd_vnf_profile
in nsd_vnf_profiles
:
1047 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
1048 if cp
["virtual-link-profile-id"] == vld
["id"]:
1050 "member_vnf:{}.{}".format(
1051 cp
["constituent-cpd-id"][0][
1052 "constituent-base-element-id"
1054 cp
["constituent-cpd-id"][0]["constituent-cpd-id"],
1056 ] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
1058 # check at nsd descriptor, if there is an ip-profile
1060 nsd_vlp
= find_in_list(
1061 get_virtual_link_profiles(nsd
),
1062 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"]
1067 and nsd_vlp
.get("virtual-link-protocol-data")
1068 and nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1070 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"][
1073 ip_profile_dest_data
= {}
1074 if "ip-version" in ip_profile_source_data
:
1075 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1078 if "cidr" in ip_profile_source_data
:
1079 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1082 if "gateway-ip" in ip_profile_source_data
:
1083 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
[
1086 if "dhcp-enabled" in ip_profile_source_data
:
1087 ip_profile_dest_data
["dhcp-params"] = {
1088 "enabled": ip_profile_source_data
["dhcp-enabled"]
1090 vld_params
["ip-profile"] = ip_profile_dest_data
1092 # update vld_params with instantiation params
1093 vld_instantiation_params
= find_in_list(
1094 get_iterable(ns_params
, "vld"),
1095 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]),
1097 if vld_instantiation_params
:
1098 vld_params
.update(vld_instantiation_params
)
1099 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
1100 target
["ns"]["vld"].append(target_vld
)
1101 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1102 update_ns_vld_target(target
, ns_params
)
1104 for vnfr
in db_vnfrs
.values():
1105 vnfd
= find_in_list(
1106 db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"]
1108 vnf_params
= find_in_list(
1109 get_iterable(ns_params
, "vnf"),
1110 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"],
1112 target_vnf
= deepcopy(vnfr
)
1113 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
1114 for vld
in target_vnf
.get("vld", ()):
1115 # check if connected to a ns.vld, to fill target'
1116 vnf_cp
= find_in_list(
1117 vnfd
.get("int-virtual-link-desc", ()),
1118 lambda cpd
: cpd
.get("id") == vld
["id"],
1121 ns_cp
= "member_vnf:{}.{}".format(
1122 vnfr
["member-vnf-index-ref"], vnf_cp
["id"]
1124 if cp2target
.get(ns_cp
):
1125 vld
["target"] = cp2target
[ns_cp
]
1128 target_vim
: {"vim_network_name": vld
.get("vim-network-name")}
1130 # check if this network needs SDN assist
1132 if vld
.get("pci-interfaces"):
1133 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1134 sdnc_id
= db_vim
["config"].get("sdn-controller")
1136 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
1137 target_sdn
= "sdn:{}".format(sdnc_id
)
1138 vld
["vim_info"][target_sdn
] = {
1140 "target_vim": target_vim
,
1142 "type": vld
.get("type"),
1145 # check at vnfd descriptor, if there is an ip-profile
1147 vnfd_vlp
= find_in_list(
1148 get_virtual_link_profiles(vnfd
),
1149 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"],
1153 and vnfd_vlp
.get("virtual-link-protocol-data")
1154 and vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data")
1156 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"][
1159 ip_profile_dest_data
= {}
1160 if "ip-version" in ip_profile_source_data
:
1161 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
[
1164 if "cidr" in ip_profile_source_data
:
1165 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
[
1168 if "gateway-ip" in ip_profile_source_data
:
1169 ip_profile_dest_data
[
1171 ] = ip_profile_source_data
["gateway-ip"]
1172 if "dhcp-enabled" in ip_profile_source_data
:
1173 ip_profile_dest_data
["dhcp-params"] = {
1174 "enabled": ip_profile_source_data
["dhcp-enabled"]
1177 vld_params
["ip-profile"] = ip_profile_dest_data
1178 # update vld_params with instantiation params
1180 vld_instantiation_params
= find_in_list(
1181 get_iterable(vnf_params
, "internal-vld"),
1182 lambda i_vld
: i_vld
["name"] == vld
["id"],
1184 if vld_instantiation_params
:
1185 vld_params
.update(vld_instantiation_params
)
1186 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
1189 for vdur
in target_vnf
.get("vdur", ()):
1190 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
1191 continue # This vdu must not be created
1192 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
1194 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
1197 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
1198 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
1201 and vdu_configuration
.get("config-access")
1202 and vdu_configuration
.get("config-access").get("ssh-access")
1204 vdur
["ssh-keys"] = ssh_keys_all
1205 vdur
["ssh-access-required"] = vdu_configuration
[
1207 ]["ssh-access"]["required"]
1210 and vnf_configuration
.get("config-access")
1211 and vnf_configuration
.get("config-access").get("ssh-access")
1212 and any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"])
1214 vdur
["ssh-keys"] = ssh_keys_all
1215 vdur
["ssh-access-required"] = vnf_configuration
[
1217 ]["ssh-access"]["required"]
1218 elif ssh_keys_instantiation
and find_in_list(
1219 vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")
1221 vdur
["ssh-keys"] = ssh_keys_instantiation
1223 self
.logger
.debug("NS > vdur > {}".format(vdur
))
1225 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
1227 if vdud
.get("cloud-init-file"):
1228 vdur
["cloud-init"] = "{}:file:{}".format(
1229 vnfd
["_id"], vdud
.get("cloud-init-file")
1231 # read the file and put its content at target.cloud_init_content, so ng_ro does not need to use the shared package system
1232 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
1233 base_folder
= vnfd
["_admin"]["storage"]
1234 if base_folder
["pkg-dir"]:
1235 cloud_init_file
= "{}/{}/cloud_init/{}".format(
1236 base_folder
["folder"],
1237 base_folder
["pkg-dir"],
1238 vdud
.get("cloud-init-file"),
1241 cloud_init_file
= "{}/Scripts/cloud_init/{}".format(
1242 base_folder
["folder"],
1243 vdud
.get("cloud-init-file"),
1245 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
1246 target
["cloud_init_content"][
1249 elif vdud
.get("cloud-init"):
1250 vdur
["cloud-init"] = "{}:vdu:{}".format(
1251 vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"])
1253 # put the content at target.cloud_init_content, so ng_ro does not need to read the vnfd descriptor
1254 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
[
1257 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
1258 deploy_params_vdu
= self
._format
_additional
_params
(
1259 vdur
.get("additionalParams") or {}
1261 deploy_params_vdu
["OSM"] = get_osm_params(
1262 vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"]
1264 vdur
["additionalParams"] = deploy_params_vdu
1267 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
1268 if target_vim
not in ns_flavor
["vim_info"]:
1269 ns_flavor
["vim_info"][target_vim
] = {}
1272 # in case alternative images are provided we must check if they should be applied
1273 # for the vim_type, modify the vim_type taking into account
1274 ns_image_id
= int(vdur
["ns-image-id"])
1275 if vdur
.get("alt-image-ids"):
1276 db_vim
= get_vim_account(vnfr
["vim-account-id"])
1277 vim_type
= db_vim
["vim_type"]
1278 for alt_image_id
in vdur
.get("alt-image-ids"):
1279 ns_alt_image
= target
["image"][int(alt_image_id
)]
1280 if vim_type
== ns_alt_image
.get("vim-type"):
1281 # must use alternative image
1283 "use alternative image id: {}".format(alt_image_id
)
1285 ns_image_id
= alt_image_id
1286 vdur
["ns-image-id"] = ns_image_id
1288 ns_image
= target
["image"][int(ns_image_id
)]
1289 if target_vim
not in ns_image
["vim_info"]:
1290 ns_image
["vim_info"][target_vim
] = {}
1293 if vdur
.get("affinity-or-anti-affinity-group-id"):
1294 for ags_id
in vdur
["affinity-or-anti-affinity-group-id"]:
1295 ns_ags
= target
["affinity-or-anti-affinity-group"][int(ags_id
)]
1296 if target_vim
not in ns_ags
["vim_info"]:
1297 ns_ags
["vim_info"][target_vim
] = {}
1299 vdur
["vim_info"] = {target_vim
: {}}
1300 # instantiation parameters
1302 vdu_instantiation_params
= find_in_list(
1303 get_iterable(vnf_params
, "vdu"),
1304 lambda i_vdu
: i_vdu
["id"] == vdud
["id"],
1306 if vdu_instantiation_params
:
1307 # Parse the vdu_volumes from the instantiation params
1308 vdu_volumes
= get_volumes_from_instantiation_params(
1309 vdu_instantiation_params
, vdud
1311 vdur
["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1312 vdur_list
.append(vdur
)
1313 target_vnf
["vdur"] = vdur_list
1314 target
["vnf"].append(target_vnf
)
1316 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
1317 desc
= await self
.RO
.deploy(nsr_id
, target
)
1318 self
.logger
.debug("RO return > {}".format(desc
))
1319 action_id
= desc
["action_id"]
1320 await self
._wait
_ng
_ro
(
1321 nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
,
1322 operation
="instantiation"
1327 "_admin.deployed.RO.operational-status": "running",
1328 "detailed-status": " ".join(stage
),
1330 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1331 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1332 self
._write
_op
_status
(nslcmop_id
, stage
)
1334 logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
)
1338 async def _wait_ng_ro(
1348 detailed_status_old
= None
1350 start_time
= start_time
or time()
1351 while time() <= start_time
+ timeout
:
1352 desc_status
= await self
.op_status_map
[operation
](nsr_id
, action_id
)
1353 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
1354 if desc_status
["status"] == "FAILED":
1355 raise NgRoException(desc_status
["details"])
1356 elif desc_status
["status"] == "BUILD":
1358 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1359 elif desc_status
["status"] == "DONE":
1361 stage
[2] = "Deployed at VIM"
1364 assert False, "ROclient.check_ns_status returns unknown {}".format(
1365 desc_status
["status"]
1367 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
1368 detailed_status_old
= stage
[2]
1369 db_nsr_update
["detailed-status"] = " ".join(stage
)
1370 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1371 self
._write
_op
_status
(nslcmop_id
, stage
)
1372 await asyncio
.sleep(15, loop
=self
.loop
)
1373 else: # timeout_ns_deploy
1374 raise NgRoException("Timeout waiting ns to deploy")
1376 async def _terminate_ng_ro(
1377 self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
1382 start_deploy
= time()
1389 "action_id": nslcmop_id
,
1391 desc
= await self
.RO
.deploy(nsr_id
, target
)
1392 action_id
= desc
["action_id"]
1393 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1394 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1397 + "ns terminate action at RO. action_id={}".format(action_id
)
1401 delete_timeout
= 20 * 60 # 20 minutes
1402 await self
._wait
_ng
_ro
(
1403 nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
,
1404 operation
="termination"
1407 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1408 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1410 await self
.RO
.delete(nsr_id
)
1411 except Exception as e
:
1412 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1413 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1414 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1415 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1417 logging_text
+ "RO_action_id={} already deleted".format(action_id
)
1419 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1420 failed_detail
.append("delete conflict: {}".format(e
))
1423 + "RO_action_id={} delete conflict: {}".format(action_id
, e
)
1426 failed_detail
.append("delete error: {}".format(e
))
1429 + "RO_action_id={} delete error: {}".format(action_id
, e
)
1433 stage
[2] = "Error deleting from VIM"
1435 stage
[2] = "Deleted from VIM"
1436 db_nsr_update
["detailed-status"] = " ".join(stage
)
1437 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1438 self
._write
_op
_status
(nslcmop_id
, stage
)
1441 raise LcmException("; ".join(failed_detail
))
1444 async def instantiate_RO(
1458 :param logging_text: prefix text to use at logging
1459 :param nsr_id: nsr identity
1460 :param nsd: database content of ns descriptor
1461 :param db_nsr: database content of ns record
1462 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1464 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1465 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1466 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1467 :return: None or exception
1470 start_deploy
= time()
1471 ns_params
= db_nslcmop
.get("operationParams")
1472 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1473 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1475 timeout_ns_deploy
= self
.timeout
.get(
1476 "ns_deploy", self
.timeout_ns_deploy
1479 # Check for and optionally request placement optimization. Database will be updated if placement activated
1480 stage
[2] = "Waiting for Placement."
1481 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1482 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfrs
1483 for vnfr
in db_vnfrs
.values():
1484 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1487 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1489 return await self
._instantiate
_ng
_ro
(
1502 except Exception as e
:
1503 stage
[2] = "ERROR deploying at VIM"
1504 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1506 "Error deploying at VIM {}".format(e
),
1507 exc_info
=not isinstance(
1510 ROclient
.ROClientException
,
1519 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1521 Wait for kdu to be up, get ip address
1522 :param logging_text: prefix used for logging
1526 :return: IP address, K8s services
1529 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1532 while nb_tries
< 360:
1533 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1537 for x
in get_iterable(db_vnfr
, "kdur")
1538 if x
.get("kdu-name") == kdu_name
1544 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
)
1546 if kdur
.get("status"):
1547 if kdur
["status"] in ("READY", "ENABLED"):
1548 return kdur
.get("ip-address"), kdur
.get("services")
1551 "target KDU={} is in error state".format(kdu_name
)
1554 await asyncio
.sleep(10, loop
=self
.loop
)
1556 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1558 async def wait_vm_up_insert_key_ro(
1559 self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None
1562 Wait for the IP address at RO and, optionally, insert a public key in the virtual machine
1563 :param logging_text: prefix used for logging
1568 :param pub_key: public ssh key to inject, None to skip
1569 :param user: user to apply the public ssh key
1573 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1577 target_vdu_id
= None
1583 if ro_retries
>= 360: # 1 hour
1585 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
)
1588 await asyncio
.sleep(10, loop
=self
.loop
)
1591 if not target_vdu_id
:
1592 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1594 if not vdu_id
: # for the VNF case
1595 if db_vnfr
.get("status") == "ERROR":
1597 "Cannot inject ssh-key because target VNF is in error state"
1599 ip_address
= db_vnfr
.get("ip-address")
1605 for x
in get_iterable(db_vnfr
, "vdur")
1606 if x
.get("ip-address") == ip_address
1614 for x
in get_iterable(db_vnfr
, "vdur")
1615 if x
.get("vdu-id-ref") == vdu_id
1616 and x
.get("count-index") == vdu_index
1622 not vdur
and len(db_vnfr
.get("vdur", ())) == 1
1623 ): # If only one, this should be the target vdu
1624 vdur
= db_vnfr
["vdur"][0]
1627 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1628 vnfr_id
, vdu_id
, vdu_index
1631 # New generation RO stores information at "vim_info"
1634 if vdur
.get("vim_info"):
1636 t
for t
in vdur
["vim_info"]
1637 ) # there should be only one key
1638 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1640 vdur
.get("pdu-type")
1641 or vdur
.get("status") == "ACTIVE"
1642 or ng_ro_status
== "ACTIVE"
1644 ip_address
= vdur
.get("ip-address")
1647 target_vdu_id
= vdur
["vdu-id-ref"]
1648 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1650 "Cannot inject ssh-key because target VM is in error state"
1653 if not target_vdu_id
:
1656 # inject public key into machine
1657 if pub_key
and user
:
1658 self
.logger
.debug(logging_text
+ "Inserting RO key")
1659 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1660 if vdur
.get("pdu-type"):
1661 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1664 ro_vm_id
= "{}-{}".format(
1665 db_vnfr
["member-vnf-index-ref"], target_vdu_id
1666 ) # TODO add vdu_index
1670 "action": "inject_ssh_key",
1674 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1676 desc
= await self
.RO
.deploy(nsr_id
, target
)
1677 action_id
= desc
["action_id"]
1678 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600, operation
="instantiation")
1681 # wait until NS is deployed at RO
1683 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1684 ro_nsr_id
= deep_get(
1685 db_nsrs
, ("_admin", "deployed", "RO", "nsr_id")
1689 result_dict
= await self
.RO
.create_action(
1691 item_id_name
=ro_nsr_id
,
1693 "add_public_key": pub_key
,
1698 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1699 if not result_dict
or not isinstance(result_dict
, dict):
1701 "Unknown response from RO when injecting key"
1703 for result
in result_dict
.values():
1704 if result
.get("vim_result") == 200:
1707 raise ROclient
.ROClientException(
1708 "error injecting key: {}".format(
1709 result
.get("description")
1713 except NgRoException
as e
:
1715 "Reaching max tries injecting key. Error: {}".format(e
)
1717 except ROclient
.ROClientException
as e
:
1721 + "error injecting key: {}. Retrying until {} seconds".format(
1728 "Reaching max tries injecting key. Error: {}".format(e
)
1735 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1737 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1739 my_vca
= vca_deployed_list
[vca_index
]
1740 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1741 # vdu or kdu: no dependencies
1745 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1746 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1747 configuration_status_list
= db_nsr
["configurationStatus"]
1748 for index
, vca_deployed
in enumerate(configuration_status_list
):
1749 if index
== vca_index
:
1752 if not my_vca
.get("member-vnf-index") or (
1753 vca_deployed
.get("member-vnf-index")
1754 == my_vca
.get("member-vnf-index")
1756 internal_status
= configuration_status_list
[index
].get("status")
1757 if internal_status
== "READY":
1759 elif internal_status
== "BROKEN":
1761 "Configuration aborted because dependent charm/s has failed"
1766 # no dependencies, return
1768 await asyncio
.sleep(10)
1771 raise LcmException("Configuration aborted because dependent charm/s timeout")
1773 def get_vca_id(self
, db_vnfr
: dict, db_nsr
: dict):
1776 vca_id
= deep_get(db_vnfr
, ("vca-id",))
1778 vim_account_id
= deep_get(db_nsr
, ("instantiate_params", "vimAccountId"))
1779 vca_id
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("vca")
1782 async def instantiate_N2VC(
1799 ee_config_descriptor
,
1801 nsr_id
= db_nsr
["_id"]
1802 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1803 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1804 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1805 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1807 "collection": "nsrs",
1808 "filter": {"_id": nsr_id
},
1809 "path": db_update_entry
,
1815 element_under_configuration
= nsr_id
1819 vnfr_id
= db_vnfr
["_id"]
1820 osm_config
["osm"]["vnf_id"] = vnfr_id
1822 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
1824 if vca_type
== "native_charm":
1827 index_number
= vdu_index
or 0
1830 element_type
= "VNF"
1831 element_under_configuration
= vnfr_id
1832 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
1834 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
1835 element_type
= "VDU"
1836 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
1837 osm_config
["osm"]["vdu_id"] = vdu_id
1839 namespace
+= ".{}".format(kdu_name
)
1840 element_type
= "KDU"
1841 element_under_configuration
= kdu_name
1842 osm_config
["osm"]["kdu_name"] = kdu_name
1845 if base_folder
["pkg-dir"]:
1846 artifact_path
= "{}/{}/{}/{}".format(
1847 base_folder
["folder"],
1848 base_folder
["pkg-dir"],
1851 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1856 artifact_path
= "{}/Scripts/{}/{}/".format(
1857 base_folder
["folder"],
1860 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1865 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1867 # get initial_config_primitive_list that applies to this element
1868 initial_config_primitive_list
= config_descriptor
.get(
1869 "initial-config-primitive"
1873 "Initial config primitive list > {}".format(
1874 initial_config_primitive_list
1878 # add config if not present for NS charm
1879 ee_descriptor_id
= ee_config_descriptor
.get("id")
1880 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1881 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
1882 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
1886 "Initial config primitive list #2 > {}".format(
1887 initial_config_primitive_list
1890 # n2vc_redesign STEP 3.1
1891 # find old ee_id if exists
1892 ee_id
= vca_deployed
.get("ee_id")
1894 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
1895 # create or register execution environment in VCA
1896 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1898 self
._write
_configuration
_status
(
1900 vca_index
=vca_index
,
1902 element_under_configuration
=element_under_configuration
,
1903 element_type
=element_type
,
1906 step
= "create execution environment"
1907 self
.logger
.debug(logging_text
+ step
)
1911 if vca_type
== "k8s_proxy_charm":
1912 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1913 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1 :],
1914 namespace
=namespace
,
1915 artifact_path
=artifact_path
,
1919 elif vca_type
== "helm" or vca_type
== "helm-v3":
1920 ee_id
, credentials
= await self
.vca_map
[
1922 ].create_execution_environment(
1923 namespace
=namespace
,
1927 artifact_path
=artifact_path
,
1931 ee_id
, credentials
= await self
.vca_map
[
1933 ].create_execution_environment(
1934 namespace
=namespace
,
1940 elif vca_type
== "native_charm":
1941 step
= "Waiting to VM being up and getting IP address"
1942 self
.logger
.debug(logging_text
+ step
)
1943 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
1952 credentials
= {"hostname": rw_mgmt_ip
}
1954 username
= deep_get(
1955 config_descriptor
, ("config-access", "ssh-access", "default-user")
1957 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1958 # merged. Meanwhile let's get username from initial-config-primitive
1959 if not username
and initial_config_primitive_list
:
1960 for config_primitive
in initial_config_primitive_list
:
1961 for param
in config_primitive
.get("parameter", ()):
1962 if param
["name"] == "ssh-username":
1963 username
= param
["value"]
1967 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1968 "'config-access.ssh-access.default-user'"
1970 credentials
["username"] = username
1971 # n2vc_redesign STEP 3.2
1973 self
._write
_configuration
_status
(
1975 vca_index
=vca_index
,
1976 status
="REGISTERING",
1977 element_under_configuration
=element_under_configuration
,
1978 element_type
=element_type
,
1981 step
= "register execution environment {}".format(credentials
)
1982 self
.logger
.debug(logging_text
+ step
)
1983 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1984 credentials
=credentials
,
1985 namespace
=namespace
,
1990 # for compatibility with MON/POL modules, they need the model and application name at database
1991 # TODO ask MON/POL whether it is possible to stop assuming the format "model_name.application_name"
1992 ee_id_parts
= ee_id
.split(".")
1993 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1994 if len(ee_id_parts
) >= 2:
1995 model_name
= ee_id_parts
[0]
1996 application_name
= ee_id_parts
[1]
1997 db_nsr_update
[db_update_entry
+ "model"] = model_name
1998 db_nsr_update
[db_update_entry
+ "application"] = application_name
2000 # n2vc_redesign STEP 3.3
2001 step
= "Install configuration Software"
2003 self
._write
_configuration
_status
(
2005 vca_index
=vca_index
,
2006 status
="INSTALLING SW",
2007 element_under_configuration
=element_under_configuration
,
2008 element_type
=element_type
,
2009 other_update
=db_nsr_update
,
2012 # TODO check if already done
2013 self
.logger
.debug(logging_text
+ step
)
2015 if vca_type
== "native_charm":
2016 config_primitive
= next(
2017 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
2020 if config_primitive
:
2021 config
= self
._map
_primitive
_params
(
2022 config_primitive
, {}, deploy_params
2025 if vca_type
== "lxc_proxy_charm":
2026 if element_type
== "NS":
2027 num_units
= db_nsr
.get("config-units") or 1
2028 elif element_type
== "VNF":
2029 num_units
= db_vnfr
.get("config-units") or 1
2030 elif element_type
== "VDU":
2031 for v
in db_vnfr
["vdur"]:
2032 if vdu_id
== v
["vdu-id-ref"]:
2033 num_units
= v
.get("config-units") or 1
2035 if vca_type
!= "k8s_proxy_charm":
2036 await self
.vca_map
[vca_type
].install_configuration_sw(
2038 artifact_path
=artifact_path
,
2041 num_units
=num_units
,
2046 # write in db flag of configuration_sw already installed
2048 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
2051 # add relations for this VCA (wait for other peers related with this VCA)
2052 await self
._add
_vca
_relations
(
2053 logging_text
=logging_text
,
2056 vca_index
=vca_index
,
2059 # if SSH access is required, then get the execution environment SSH public key
2060 # for a native charm we have already waited for the VM to be up
2061 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
2064 # self.logger.debug("get ssh key block")
2066 config_descriptor
, ("config-access", "ssh-access", "required")
2068 # self.logger.debug("ssh key needed")
2069 # Needed to inject a ssh key
2072 ("config-access", "ssh-access", "default-user"),
2074 step
= "Install configuration Software, getting public ssh key"
2075 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
2076 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
2079 step
= "Insert public key into VM user={} ssh_key={}".format(
2083 # self.logger.debug("no need to get ssh key")
2084 step
= "Waiting to VM being up and getting IP address"
2085 self
.logger
.debug(logging_text
+ step
)
2087 # default rw_mgmt_ip to None, avoiding the non definition of the variable
2090 # n2vc_redesign STEP 5.1
2091 # wait for RO (ip-address) Insert pub_key into VM
2094 rw_mgmt_ip
, services
= await self
.wait_kdu_up(
2095 logging_text
, nsr_id
, vnfr_id
, kdu_name
2097 vnfd
= self
.db
.get_one(
2099 {"_id": f
'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2101 kdu
= get_kdu(vnfd
, kdu_name
)
2103 service
["name"] for service
in get_kdu_services(kdu
)
2105 exposed_services
= []
2106 for service
in services
:
2107 if any(s
in service
["name"] for s
in kdu_services
):
2108 exposed_services
.append(service
)
2109 await self
.vca_map
[vca_type
].exec_primitive(
2111 primitive_name
="config",
2113 "osm-config": json
.dumps(
2115 k8s
={"services": exposed_services
}
2122 # This verification is needed in order to avoid trying to add a public key
2123 # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
2124 # for a KNF and not for its KDUs, the previous verification gives False, and the code
2125 # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
2127 elif db_vnfr
.get('vdur'):
2128 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
2138 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
2140 # store rw_mgmt_ip in deploy params for later replacement
2141 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
2143 # n2vc_redesign STEP 6 Execute initial config primitive
2144 step
= "execute initial config primitive"
2146 # wait for dependent primitives execution (NS -> VNF -> VDU)
2147 if initial_config_primitive_list
:
2148 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
2150 # stage, in function of element type: vdu, kdu, vnf or ns
2151 my_vca
= vca_deployed_list
[vca_index
]
2152 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
2154 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
2155 elif my_vca
.get("member-vnf-index"):
2157 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
2160 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
2162 self
._write
_configuration
_status
(
2163 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
2166 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2168 check_if_terminated_needed
= True
2169 for initial_config_primitive
in initial_config_primitive_list
:
2170 # adding information on the vca_deployed if it is a NS execution environment
2171 if not vca_deployed
["member-vnf-index"]:
2172 deploy_params
["ns_config_info"] = json
.dumps(
2173 self
._get
_ns
_config
_info
(nsr_id
)
2175 # TODO check if already done
2176 primitive_params_
= self
._map
_primitive
_params
(
2177 initial_config_primitive
, {}, deploy_params
2180 step
= "execute primitive '{}' params '{}'".format(
2181 initial_config_primitive
["name"], primitive_params_
2183 self
.logger
.debug(logging_text
+ step
)
2184 await self
.vca_map
[vca_type
].exec_primitive(
2186 primitive_name
=initial_config_primitive
["name"],
2187 params_dict
=primitive_params_
,
2192 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2193 if check_if_terminated_needed
:
2194 if config_descriptor
.get("terminate-config-primitive"):
2196 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
2198 check_if_terminated_needed
= False
2200 # TODO register in database that primitive is done
2202 # STEP 7 Configure metrics
2203 if vca_type
== "helm" or vca_type
== "helm-v3":
2204 prometheus_jobs
= await self
.extract_prometheus_scrape_jobs(
2206 artifact_path
=artifact_path
,
2207 ee_config_descriptor
=ee_config_descriptor
,
2210 target_ip
=rw_mgmt_ip
,
2216 {db_update_entry
+ "prometheus_jobs": prometheus_jobs
},
2219 for job
in prometheus_jobs
:
2222 {"job_name": job
["job_name"]},
2225 fail_on_empty
=False,
2228 step
= "instantiated at VCA"
2229 self
.logger
.debug(logging_text
+ step
)
2231 self
._write
_configuration
_status
(
2232 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
2235 except Exception as e
: # TODO not use Exception but N2VC exception
2236 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2238 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
2241 "Exception while {} : {}".format(step
, e
), exc_info
=True
2243 self
._write
_configuration
_status
(
2244 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
2246 raise LcmException("{} {}".format(step
, e
)) from e
2248 def _write_ns_status(
2252 current_operation
: str,
2253 current_operation_id
: str,
2254 error_description
: str = None,
2255 error_detail
: str = None,
2256 other_update
: dict = None,
2259 Update db_nsr fields.
2262 :param current_operation:
2263 :param current_operation_id:
2264 :param error_description:
2265 :param error_detail:
2266 :param other_update: Other required changes at database if provided, will be cleared
2270 db_dict
= other_update
or {}
2273 ] = current_operation_id
# for backward compatibility
2274 db_dict
["_admin.current-operation"] = current_operation_id
2275 db_dict
["_admin.operation-type"] = (
2276 current_operation
if current_operation
!= "IDLE" else None
2278 db_dict
["currentOperation"] = current_operation
2279 db_dict
["currentOperationID"] = current_operation_id
2280 db_dict
["errorDescription"] = error_description
2281 db_dict
["errorDetail"] = error_detail
2284 db_dict
["nsState"] = ns_state
2285 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2286 except DbException
as e
:
2287 self
.logger
.warn("Error writing NS status, ns={}: {}".format(nsr_id
, e
))
2289 def _write_op_status(
2293 error_message
: str = None,
2294 queuePosition
: int = 0,
2295 operation_state
: str = None,
2296 other_update
: dict = None,
2299 db_dict
= other_update
or {}
2300 db_dict
["queuePosition"] = queuePosition
2301 if isinstance(stage
, list):
2302 db_dict
["stage"] = stage
[0]
2303 db_dict
["detailed-status"] = " ".join(stage
)
2304 elif stage
is not None:
2305 db_dict
["stage"] = str(stage
)
2307 if error_message
is not None:
2308 db_dict
["errorMessage"] = error_message
2309 if operation_state
is not None:
2310 db_dict
["operationState"] = operation_state
2311 db_dict
["statusEnteredTime"] = time()
2312 self
.update_db_2("nslcmops", op_id
, db_dict
)
2313 except DbException
as e
:
2315 "Error writing OPERATION status for op_id: {} -> {}".format(op_id
, e
)
2318 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
2320 nsr_id
= db_nsr
["_id"]
2321 # configurationStatus
2322 config_status
= db_nsr
.get("configurationStatus")
2325 "configurationStatus.{}.status".format(index
): status
2326 for index
, v
in enumerate(config_status
)
2330 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2332 except DbException
as e
:
2334 "Error writing all configuration status, ns={}: {}".format(nsr_id
, e
)
2337 def _write_configuration_status(
2342 element_under_configuration
: str = None,
2343 element_type
: str = None,
2344 other_update
: dict = None,
2347 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2348 # .format(vca_index, status))
2351 db_path
= "configurationStatus.{}.".format(vca_index
)
2352 db_dict
= other_update
or {}
2354 db_dict
[db_path
+ "status"] = status
2355 if element_under_configuration
:
2357 db_path
+ "elementUnderConfiguration"
2358 ] = element_under_configuration
2360 db_dict
[db_path
+ "elementType"] = element_type
2361 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2362 except DbException
as e
:
2364 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2365 status
, nsr_id
, vca_index
, e
2369 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
2371 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2372 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2373 Database is used because the result can be obtained from a different LCM worker in case of HA.
2374 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2375 :param db_nslcmop: database content of nslcmop
2376 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2377 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2378 computed 'vim-account-id'
2381 nslcmop_id
= db_nslcmop
["_id"]
2382 placement_engine
= deep_get(db_nslcmop
, ("operationParams", "placement-engine"))
2383 if placement_engine
== "PLA":
2385 logging_text
+ "Invoke and wait for placement optimization"
2387 await self
.msg
.aiowrite(
2388 "pla", "get_placement", {"nslcmopId": nslcmop_id
}, loop
=self
.loop
2390 db_poll_interval
= 5
2391 wait
= db_poll_interval
* 10
2393 while not pla_result
and wait
>= 0:
2394 await asyncio
.sleep(db_poll_interval
)
2395 wait
-= db_poll_interval
2396 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2397 pla_result
= deep_get(db_nslcmop
, ("_admin", "pla"))
2401 "Placement timeout for nslcmopId={}".format(nslcmop_id
)
2404 for pla_vnf
in pla_result
["vnf"]:
2405 vnfr
= db_vnfrs
.get(pla_vnf
["member-vnf-index"])
2406 if not pla_vnf
.get("vimAccountId") or not vnfr
:
2411 {"_id": vnfr
["_id"]},
2412 {"vim-account-id": pla_vnf
["vimAccountId"]},
2415 vnfr
["vim-account-id"] = pla_vnf
["vimAccountId"]
2418 def update_nsrs_with_pla_result(self
, params
):
2420 nslcmop_id
= deep_get(params
, ("placement", "nslcmopId"))
2422 "nslcmops", nslcmop_id
, {"_admin.pla": params
.get("placement")}
2424 except Exception as e
:
2425 self
.logger
.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id
, e
))
2427 async def instantiate(self
, nsr_id
, nslcmop_id
):
2430 :param nsr_id: ns instance to deploy
2431 :param nslcmop_id: operation to run
2435 # Try to lock HA task here
2436 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
2437 if not task_is_locked_by_me
:
2439 "instantiate() task is not locked by me, ns={}".format(nsr_id
)
2443 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
2444 self
.logger
.debug(logging_text
+ "Enter")
2446 # get all needed from database
2448 # database nsrs record
2451 # database nslcmops record
2454 # update operation on nsrs
2456 # update operation on nslcmops
2457 db_nslcmop_update
= {}
2459 nslcmop_operation_state
= None
2460 db_vnfrs
= {} # vnf's info indexed by member-index
2462 tasks_dict_info
= {} # from task to info text
2466 "Stage 1/5: preparation of the environment.",
2467 "Waiting for previous operations to terminate.",
2470 # ^ stage, step, VIM progress
2472 # wait for any previous tasks in process
2473 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
2475 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2476 stage
[1] = "Reading from database."
2477 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2478 db_nsr_update
["detailed-status"] = "creating"
2479 db_nsr_update
["operational-status"] = "init"
2480 self
._write
_ns
_status
(
2482 ns_state
="BUILDING",
2483 current_operation
="INSTANTIATING",
2484 current_operation_id
=nslcmop_id
,
2485 other_update
=db_nsr_update
,
2487 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
2489 # read from db: operation
2490 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2491 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2492 if db_nslcmop
["operationParams"].get("additionalParamsForVnf"):
2493 db_nslcmop
["operationParams"]["additionalParamsForVnf"] = json
.loads(
2494 db_nslcmop
["operationParams"]["additionalParamsForVnf"]
2496 ns_params
= db_nslcmop
.get("operationParams")
2497 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2498 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2500 timeout_ns_deploy
= self
.timeout
.get(
2501 "ns_deploy", self
.timeout_ns_deploy
2505 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2506 self
.logger
.debug(logging_text
+ stage
[1])
2507 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2508 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2509 self
.logger
.debug(logging_text
+ stage
[1])
2510 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2511 self
.fs
.sync(db_nsr
["nsd-id"])
2513 # nsr_name = db_nsr["name"] # TODO short-name??
2515 # read from db: vnf's of this ns
2516 stage
[1] = "Getting vnfrs from db."
2517 self
.logger
.debug(logging_text
+ stage
[1])
2518 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2520 # read from db: vnfd's for every vnf
2521 db_vnfds
= [] # every vnfd data
2523 # for each vnf in ns, read vnfd
2524 for vnfr
in db_vnfrs_list
:
2525 if vnfr
.get("kdur"):
2527 for kdur
in vnfr
["kdur"]:
2528 if kdur
.get("additionalParams"):
2529 kdur
["additionalParams"] = json
.loads(
2530 kdur
["additionalParams"]
2532 kdur_list
.append(kdur
)
2533 vnfr
["kdur"] = kdur_list
2535 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
2536 vnfd_id
= vnfr
["vnfd-id"]
2537 vnfd_ref
= vnfr
["vnfd-ref"]
2538 self
.fs
.sync(vnfd_id
)
2540 # if we haven't this vnfd, read it from db
2541 if vnfd_id
not in db_vnfds
:
2543 stage
[1] = "Getting vnfd={} id='{}' from db.".format(
2546 self
.logger
.debug(logging_text
+ stage
[1])
2547 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2550 db_vnfds
.append(vnfd
)
2552 # Get or generate the _admin.deployed.VCA list
2553 vca_deployed_list
= None
2554 if db_nsr
["_admin"].get("deployed"):
2555 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2556 if vca_deployed_list
is None:
2557 vca_deployed_list
= []
2558 configuration_status_list
= []
2559 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2560 db_nsr_update
["configurationStatus"] = configuration_status_list
2561 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2562 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2563 elif isinstance(vca_deployed_list
, dict):
2564 # maintain backward compatibility. Change a dict to list at database
2565 vca_deployed_list
= list(vca_deployed_list
.values())
2566 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2567 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2570 deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list
2572 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2573 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2575 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2576 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2577 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2579 "vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"}
2582 # n2vc_redesign STEP 2 Deploy Network Scenario
2583 stage
[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2584 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
2586 stage
[1] = "Deploying KDUs."
2587 # self.logger.debug(logging_text + "Before deploy_kdus")
2588 # Call to deploy_kdus in case exists the "vdu:kdu" param
2589 await self
.deploy_kdus(
2590 logging_text
=logging_text
,
2592 nslcmop_id
=nslcmop_id
,
2595 task_instantiation_info
=tasks_dict_info
,
2598 stage
[1] = "Getting VCA public key."
2599 # n2vc_redesign STEP 1 Get VCA public ssh-key
2600 # feature 1429. Add n2vc public key to needed VMs
2601 n2vc_key
= self
.n2vc
.get_public_key()
2602 n2vc_key_list
= [n2vc_key
]
2603 if self
.vca_config
.get("public_key"):
2604 n2vc_key_list
.append(self
.vca_config
["public_key"])
2606 stage
[1] = "Deploying NS at VIM."
2607 task_ro
= asyncio
.ensure_future(
2608 self
.instantiate_RO(
2609 logging_text
=logging_text
,
2613 db_nslcmop
=db_nslcmop
,
2616 n2vc_key_list
=n2vc_key_list
,
2620 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2621 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2623 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2624 stage
[1] = "Deploying Execution Environments."
2625 self
.logger
.debug(logging_text
+ stage
[1])
2627 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2628 for vnf_profile
in get_vnf_profiles(nsd
):
2629 vnfd_id
= vnf_profile
["vnfd-id"]
2630 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
2631 member_vnf_index
= str(vnf_profile
["id"])
2632 db_vnfr
= db_vnfrs
[member_vnf_index
]
2633 base_folder
= vnfd
["_admin"]["storage"]
2639 # Get additional parameters
2640 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
2641 if db_vnfr
.get("additionalParamsForVnf"):
2642 deploy_params
.update(
2643 parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy())
2646 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
2647 if descriptor_config
:
2649 logging_text
=logging_text
2650 + "member_vnf_index={} ".format(member_vnf_index
),
2653 nslcmop_id
=nslcmop_id
,
2659 member_vnf_index
=member_vnf_index
,
2660 vdu_index
=vdu_index
,
2662 deploy_params
=deploy_params
,
2663 descriptor_config
=descriptor_config
,
2664 base_folder
=base_folder
,
2665 task_instantiation_info
=tasks_dict_info
,
2669 # Deploy charms for each VDU that supports one.
2670 for vdud
in get_vdu_list(vnfd
):
2672 descriptor_config
= get_configuration(vnfd
, vdu_id
)
2673 vdur
= find_in_list(
2674 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
2677 if vdur
.get("additionalParams"):
2678 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
2680 deploy_params_vdu
= deploy_params
2681 deploy_params_vdu
["OSM"] = get_osm_params(
2682 db_vnfr
, vdu_id
, vdu_count_index
=0
2684 vdud_count
= get_number_of_instances(vnfd
, vdu_id
)
2686 self
.logger
.debug("VDUD > {}".format(vdud
))
2688 "Descriptor config > {}".format(descriptor_config
)
2690 if descriptor_config
:
2693 for vdu_index
in range(vdud_count
):
2694 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2696 logging_text
=logging_text
2697 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2698 member_vnf_index
, vdu_id
, vdu_index
2702 nslcmop_id
=nslcmop_id
,
2708 member_vnf_index
=member_vnf_index
,
2709 vdu_index
=vdu_index
,
2711 deploy_params
=deploy_params_vdu
,
2712 descriptor_config
=descriptor_config
,
2713 base_folder
=base_folder
,
2714 task_instantiation_info
=tasks_dict_info
,
2717 for kdud
in get_kdu_list(vnfd
):
2718 kdu_name
= kdud
["name"]
2719 descriptor_config
= get_configuration(vnfd
, kdu_name
)
2720 if descriptor_config
:
2725 x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
2727 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
2728 if kdur
.get("additionalParams"):
2729 deploy_params_kdu
.update(
2730 parse_yaml_strings(kdur
["additionalParams"].copy())
2734 logging_text
=logging_text
,
2737 nslcmop_id
=nslcmop_id
,
2743 member_vnf_index
=member_vnf_index
,
2744 vdu_index
=vdu_index
,
2746 deploy_params
=deploy_params_kdu
,
2747 descriptor_config
=descriptor_config
,
2748 base_folder
=base_folder
,
2749 task_instantiation_info
=tasks_dict_info
,
2753 # Check if this NS has a charm configuration
2754 descriptor_config
= nsd
.get("ns-configuration")
2755 if descriptor_config
and descriptor_config
.get("juju"):
2758 member_vnf_index
= None
2764 # Get additional parameters
2765 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
2766 if db_nsr
.get("additionalParamsForNs"):
2767 deploy_params
.update(
2768 parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy())
2770 base_folder
= nsd
["_admin"]["storage"]
2772 logging_text
=logging_text
,
2775 nslcmop_id
=nslcmop_id
,
2781 member_vnf_index
=member_vnf_index
,
2782 vdu_index
=vdu_index
,
2784 deploy_params
=deploy_params
,
2785 descriptor_config
=descriptor_config
,
2786 base_folder
=base_folder
,
2787 task_instantiation_info
=tasks_dict_info
,
2791 # rest of stuff will be done at finally
2794 ROclient
.ROClientException
,
2800 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
)
2803 except asyncio
.CancelledError
:
2805 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
2807 exc
= "Operation was cancelled"
2808 except Exception as e
:
2809 exc
= traceback
.format_exc()
2810 self
.logger
.critical(
2811 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
2816 error_list
.append(str(exc
))
2818 # wait for pending tasks
2820 stage
[1] = "Waiting for instantiate pending tasks."
2821 self
.logger
.debug(logging_text
+ stage
[1])
2822 error_list
+= await self
._wait
_for
_tasks
(
2830 stage
[1] = stage
[2] = ""
2831 except asyncio
.CancelledError
:
2832 error_list
.append("Cancelled")
2833 # TODO cancel all tasks
2834 except Exception as exc
:
2835 error_list
.append(str(exc
))
2837 # update operation-status
2838 db_nsr_update
["operational-status"] = "running"
2839 # let's begin with VCA 'configured' status (later we can change it)
2840 db_nsr_update
["config-status"] = "configured"
2841 for task
, task_name
in tasks_dict_info
.items():
2842 if not task
.done() or task
.cancelled() or task
.exception():
2843 if task_name
.startswith(self
.task_name_deploy_vca
):
2844 # A N2VC task is pending
2845 db_nsr_update
["config-status"] = "failed"
2847 # RO or KDU task is pending
2848 db_nsr_update
["operational-status"] = "failed"
2850 # update status at database
2852 error_detail
= ". ".join(error_list
)
2853 self
.logger
.error(logging_text
+ error_detail
)
2854 error_description_nslcmop
= "{} Detail: {}".format(
2855 stage
[0], error_detail
2857 error_description_nsr
= "Operation: INSTANTIATING.{}, {}".format(
2858 nslcmop_id
, stage
[0]
2861 db_nsr_update
["detailed-status"] = (
2862 error_description_nsr
+ " Detail: " + error_detail
2864 db_nslcmop_update
["detailed-status"] = error_detail
2865 nslcmop_operation_state
= "FAILED"
2869 error_description_nsr
= error_description_nslcmop
= None
2871 db_nsr_update
["detailed-status"] = "Done"
2872 db_nslcmop_update
["detailed-status"] = "Done"
2873 nslcmop_operation_state
= "COMPLETED"
2876 self
._write
_ns
_status
(
2879 current_operation
="IDLE",
2880 current_operation_id
=None,
2881 error_description
=error_description_nsr
,
2882 error_detail
=error_detail
,
2883 other_update
=db_nsr_update
,
2885 self
._write
_op
_status
(
2888 error_message
=error_description_nslcmop
,
2889 operation_state
=nslcmop_operation_state
,
2890 other_update
=db_nslcmop_update
,
2893 if nslcmop_operation_state
:
2895 await self
.msg
.aiowrite(
2900 "nslcmop_id": nslcmop_id
,
2901 "operationState": nslcmop_operation_state
,
2905 except Exception as e
:
2907 logging_text
+ "kafka_write notification Exception {}".format(e
)
2910 self
.logger
.debug(logging_text
+ "Exit")
2911 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2913 def _get_vnfd(self
, vnfd_id
: str, projects_read
: str, cached_vnfds
: Dict
[str, Any
]):
2914 if vnfd_id
not in cached_vnfds
:
2915 cached_vnfds
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
, "_admin.projects_read": projects_read
})
2916 return cached_vnfds
[vnfd_id
]
2918 def _get_vnfr(self
, nsr_id
: str, vnf_profile_id
: str, cached_vnfrs
: Dict
[str, Any
]):
2919 if vnf_profile_id
not in cached_vnfrs
:
2920 cached_vnfrs
[vnf_profile_id
] = self
.db
.get_one(
2923 "member-vnf-index-ref": vnf_profile_id
,
2924 "nsr-id-ref": nsr_id
,
2927 return cached_vnfrs
[vnf_profile_id
]
2929 def _is_deployed_vca_in_relation(
2930 self
, vca
: DeployedVCA
, relation
: Relation
2933 for endpoint
in (relation
.provider
, relation
.requirer
):
2934 if endpoint
["kdu-resource-profile-id"]:
2937 vca
.vnf_profile_id
== endpoint
.vnf_profile_id
2938 and vca
.vdu_profile_id
== endpoint
.vdu_profile_id
2939 and vca
.execution_environment_ref
== endpoint
.execution_environment_ref
2945 def _update_ee_relation_data_with_implicit_data(
2946 self
, nsr_id
, nsd
, ee_relation_data
, cached_vnfds
, vnf_profile_id
: str = None
2948 ee_relation_data
= safe_get_ee_relation(
2949 nsr_id
, ee_relation_data
, vnf_profile_id
=vnf_profile_id
2951 ee_relation_level
= EELevel
.get_level(ee_relation_data
)
2952 if (ee_relation_level
in (EELevel
.VNF
, EELevel
.VDU
)) and not ee_relation_data
[
2953 "execution-environment-ref"
2955 vnf_profile
= get_vnf_profile(nsd
, ee_relation_data
["vnf-profile-id"])
2956 vnfd_id
= vnf_profile
["vnfd-id"]
2957 project
= nsd
["_admin"]["projects_read"][0]
2958 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
2961 if ee_relation_level
== EELevel
.VNF
2962 else ee_relation_data
["vdu-profile-id"]
2964 ee
= get_juju_ee_ref(db_vnfd
, entity_id
)
2967 f
"not execution environments found for ee_relation {ee_relation_data}"
2969 ee_relation_data
["execution-environment-ref"] = ee
["id"]
2970 return ee_relation_data
2972 def _get_ns_relations(
2975 nsd
: Dict
[str, Any
],
2977 cached_vnfds
: Dict
[str, Any
],
2978 ) -> List
[Relation
]:
2980 db_ns_relations
= get_ns_configuration_relation_list(nsd
)
2981 for r
in db_ns_relations
:
2982 provider_dict
= None
2983 requirer_dict
= None
2984 if all(key
in r
for key
in ("provider", "requirer")):
2985 provider_dict
= r
["provider"]
2986 requirer_dict
= r
["requirer"]
2987 elif "entities" in r
:
2988 provider_id
= r
["entities"][0]["id"]
2991 "endpoint": r
["entities"][0]["endpoint"],
2993 if provider_id
!= nsd
["id"]:
2994 provider_dict
["vnf-profile-id"] = provider_id
2995 requirer_id
= r
["entities"][1]["id"]
2998 "endpoint": r
["entities"][1]["endpoint"],
3000 if requirer_id
!= nsd
["id"]:
3001 requirer_dict
["vnf-profile-id"] = requirer_id
3004 "provider/requirer or entities must be included in the relation."
3006 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3007 nsr_id
, nsd
, provider_dict
, cached_vnfds
3009 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3010 nsr_id
, nsd
, requirer_dict
, cached_vnfds
3012 provider
= EERelation(relation_provider
)
3013 requirer
= EERelation(relation_requirer
)
3014 relation
= Relation(r
["name"], provider
, requirer
)
3015 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3017 relations
.append(relation
)
3020 def _get_vnf_relations(
3023 nsd
: Dict
[str, Any
],
3025 cached_vnfds
: Dict
[str, Any
],
3026 ) -> List
[Relation
]:
3028 vnf_profile
= get_vnf_profile(nsd
, vca
.vnf_profile_id
)
3029 vnf_profile_id
= vnf_profile
["id"]
3030 vnfd_id
= vnf_profile
["vnfd-id"]
3031 project
= nsd
["_admin"]["projects_read"][0]
3032 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3033 db_vnf_relations
= get_relation_list(db_vnfd
, vnfd_id
)
3034 for r
in db_vnf_relations
:
3035 provider_dict
= None
3036 requirer_dict
= None
3037 if all(key
in r
for key
in ("provider", "requirer")):
3038 provider_dict
= r
["provider"]
3039 requirer_dict
= r
["requirer"]
3040 elif "entities" in r
:
3041 provider_id
= r
["entities"][0]["id"]
3044 "vnf-profile-id": vnf_profile_id
,
3045 "endpoint": r
["entities"][0]["endpoint"],
3047 if provider_id
!= vnfd_id
:
3048 provider_dict
["vdu-profile-id"] = provider_id
3049 requirer_id
= r
["entities"][1]["id"]
3052 "vnf-profile-id": vnf_profile_id
,
3053 "endpoint": r
["entities"][1]["endpoint"],
3055 if requirer_id
!= vnfd_id
:
3056 requirer_dict
["vdu-profile-id"] = requirer_id
3059 "provider/requirer or entities must be included in the relation."
3061 relation_provider
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3062 nsr_id
, nsd
, provider_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3064 relation_requirer
= self
._update
_ee
_relation
_data
_with
_implicit
_data
(
3065 nsr_id
, nsd
, requirer_dict
, cached_vnfds
, vnf_profile_id
=vnf_profile_id
3067 provider
= EERelation(relation_provider
)
3068 requirer
= EERelation(relation_requirer
)
3069 relation
= Relation(r
["name"], provider
, requirer
)
3070 vca_in_relation
= self
._is
_deployed
_vca
_in
_relation
(vca
, relation
)
3072 relations
.append(relation
)
3075 def _get_kdu_resource_data(
3077 ee_relation
: EERelation
,
3078 db_nsr
: Dict
[str, Any
],
3079 cached_vnfds
: Dict
[str, Any
],
3080 ) -> DeployedK8sResource
:
3081 nsd
= get_nsd(db_nsr
)
3082 vnf_profiles
= get_vnf_profiles(nsd
)
3083 vnfd_id
= find_in_list(
3085 lambda vnf_profile
: vnf_profile
["id"] == ee_relation
.vnf_profile_id
,
3087 project
= nsd
["_admin"]["projects_read"][0]
3088 db_vnfd
= self
._get
_vnfd
(vnfd_id
, project
, cached_vnfds
)
3089 kdu_resource_profile
= get_kdu_resource_profile(
3090 db_vnfd
, ee_relation
.kdu_resource_profile_id
3092 kdu_name
= kdu_resource_profile
["kdu-name"]
3093 deployed_kdu
, _
= get_deployed_kdu(
3094 db_nsr
.get("_admin", ()).get("deployed", ()),
3096 ee_relation
.vnf_profile_id
,
3098 deployed_kdu
.update({"resource-name": kdu_resource_profile
["resource-name"]})
3101 def _get_deployed_component(
3103 ee_relation
: EERelation
,
3104 db_nsr
: Dict
[str, Any
],
3105 cached_vnfds
: Dict
[str, Any
],
3106 ) -> DeployedComponent
:
3107 nsr_id
= db_nsr
["_id"]
3108 deployed_component
= None
3109 ee_level
= EELevel
.get_level(ee_relation
)
3110 if ee_level
== EELevel
.NS
:
3111 vca
= get_deployed_vca(db_nsr
, {"vdu_id": None, "member-vnf-index": None})
3113 deployed_component
= DeployedVCA(nsr_id
, vca
)
3114 elif ee_level
== EELevel
.VNF
:
3115 vca
= get_deployed_vca(
3119 "member-vnf-index": ee_relation
.vnf_profile_id
,
3120 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3124 deployed_component
= DeployedVCA(nsr_id
, vca
)
3125 elif ee_level
== EELevel
.VDU
:
3126 vca
= get_deployed_vca(
3129 "vdu_id": ee_relation
.vdu_profile_id
,
3130 "member-vnf-index": ee_relation
.vnf_profile_id
,
3131 "ee_descriptor_id": ee_relation
.execution_environment_ref
,
3135 deployed_component
= DeployedVCA(nsr_id
, vca
)
3136 elif ee_level
== EELevel
.KDU
:
3137 kdu_resource_data
= self
._get
_kdu
_resource
_data
(
3138 ee_relation
, db_nsr
, cached_vnfds
3140 if kdu_resource_data
:
3141 deployed_component
= DeployedK8sResource(kdu_resource_data
)
3142 return deployed_component
3144 async def _add_relation(
3148 db_nsr
: Dict
[str, Any
],
3149 cached_vnfds
: Dict
[str, Any
],
3150 cached_vnfrs
: Dict
[str, Any
],
3152 deployed_provider
= self
._get
_deployed
_component
(
3153 relation
.provider
, db_nsr
, cached_vnfds
3155 deployed_requirer
= self
._get
_deployed
_component
(
3156 relation
.requirer
, db_nsr
, cached_vnfds
3160 and deployed_requirer
3161 and deployed_provider
.config_sw_installed
3162 and deployed_requirer
.config_sw_installed
3164 provider_db_vnfr
= (
3166 relation
.provider
.nsr_id
,
3167 relation
.provider
.vnf_profile_id
,
3170 if relation
.provider
.vnf_profile_id
3173 requirer_db_vnfr
= (
3175 relation
.requirer
.nsr_id
,
3176 relation
.requirer
.vnf_profile_id
,
3179 if relation
.requirer
.vnf_profile_id
3182 provider_vca_id
= self
.get_vca_id(provider_db_vnfr
, db_nsr
)
3183 requirer_vca_id
= self
.get_vca_id(requirer_db_vnfr
, db_nsr
)
3184 provider_relation_endpoint
= RelationEndpoint(
3185 deployed_provider
.ee_id
,
3187 relation
.provider
.endpoint
,
3189 requirer_relation_endpoint
= RelationEndpoint(
3190 deployed_requirer
.ee_id
,
3192 relation
.requirer
.endpoint
,
3194 await self
.vca_map
[vca_type
].add_relation(
3195 provider
=provider_relation_endpoint
,
3196 requirer
=requirer_relation_endpoint
,
3198 # remove entry from relations list
3202 async def _add_vca_relations(
3208 timeout
: int = 3600,
3212 # 1. find all relations for this VCA
3213 # 2. wait for other peers related
3217 # STEP 1: find all relations for this VCA
3220 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3221 nsd
= get_nsd(db_nsr
)
3224 deployed_vca_dict
= get_deployed_vca_list(db_nsr
)[vca_index
]
3225 my_vca
= DeployedVCA(nsr_id
, deployed_vca_dict
)
3230 relations
.extend(self
._get
_ns
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3231 relations
.extend(self
._get
_vnf
_relations
(nsr_id
, nsd
, my_vca
, cached_vnfds
))
3233 # if no relations, terminate
3235 self
.logger
.debug(logging_text
+ " No relations")
3238 self
.logger
.debug(logging_text
+ " adding relations {}".format(relations
))
3245 if now
- start
>= timeout
:
3246 self
.logger
.error(logging_text
+ " : timeout adding relations")
3249 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3250 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3252 # for each relation, find the VCA's related
3253 for relation
in relations
.copy():
3254 added
= await self
._add
_relation
(
3262 relations
.remove(relation
)
3265 self
.logger
.debug("Relations added")
3267 await asyncio
.sleep(5.0)
3271 except Exception as e
:
3272 self
.logger
.warn(logging_text
+ " ERROR adding relations: {}".format(e
))
3275 async def _install_kdu(
3283 k8s_instance_info
: dict,
3284 k8params
: dict = None,
3290 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
3293 "collection": "nsrs",
3294 "filter": {"_id": nsr_id
},
3295 "path": nsr_db_path
,
3298 if k8s_instance_info
.get("kdu-deployment-name"):
3299 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
3301 kdu_instance
= self
.k8scluster_map
[
3303 ].generate_kdu_instance_name(
3304 db_dict
=db_dict_install
,
3305 kdu_model
=k8s_instance_info
["kdu-model"],
3306 kdu_name
=k8s_instance_info
["kdu-name"],
3309 # Update the nsrs table with the kdu-instance value
3313 _desc
={nsr_db_path
+ ".kdu-instance": kdu_instance
},
3316 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3317 # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
3318 # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
3319 # namespace, this first verification could be removed, and the next step would be done for any kind
3321 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3322 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3323 if k8sclustertype
in ("juju", "juju-bundle"):
3324 # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
2325 # that the user passed a namespace in which they want the KDU to be deployed)
3331 "_admin.projects_write": k8s_instance_info
["namespace"],
3332 "_admin.projects_read": k8s_instance_info
["namespace"],
3338 f
"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3343 _desc
={f
"{nsr_db_path}.namespace": kdu_instance
},
3345 k8s_instance_info
["namespace"] = kdu_instance
3347 await self
.k8scluster_map
[k8sclustertype
].install(
3348 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3349 kdu_model
=k8s_instance_info
["kdu-model"],
3352 db_dict
=db_dict_install
,
3354 kdu_name
=k8s_instance_info
["kdu-name"],
3355 namespace
=k8s_instance_info
["namespace"],
3356 kdu_instance
=kdu_instance
,
3360 # Obtain services to obtain management service ip
3361 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
3362 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3363 kdu_instance
=kdu_instance
,
3364 namespace
=k8s_instance_info
["namespace"],
3367 # Obtain management service info (if exists)
3368 vnfr_update_dict
= {}
3369 kdu_config
= get_configuration(vnfd
, kdud
["name"])
3371 target_ee_list
= kdu_config
.get("execution-environment-list", [])
3376 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
3379 for service
in kdud
.get("service", [])
3380 if service
.get("mgmt-service")
3382 for mgmt_service
in mgmt_services
:
3383 for service
in services
:
3384 if service
["name"].startswith(mgmt_service
["name"]):
3385 # Mgmt service found, Obtain service ip
3386 ip
= service
.get("external_ip", service
.get("cluster_ip"))
3387 if isinstance(ip
, list) and len(ip
) == 1:
3391 "kdur.{}.ip-address".format(kdu_index
)
3394 # Check if must update also mgmt ip at the vnf
3395 service_external_cp
= mgmt_service
.get(
3396 "external-connection-point-ref"
3398 if service_external_cp
:
3400 deep_get(vnfd
, ("mgmt-interface", "cp"))
3401 == service_external_cp
3403 vnfr_update_dict
["ip-address"] = ip
3408 "external-connection-point-ref", ""
3410 == service_external_cp
,
3413 "kdur.{}.ip-address".format(kdu_index
)
3418 "Mgmt service name: {} not found".format(
3419 mgmt_service
["name"]
3423 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
3424 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
3426 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
3429 and kdu_config
.get("initial-config-primitive")
3430 and get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None
3432 initial_config_primitive_list
= kdu_config
.get(
3433 "initial-config-primitive"
3435 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
3437 for initial_config_primitive
in initial_config_primitive_list
:
3438 primitive_params_
= self
._map
_primitive
_params
(
3439 initial_config_primitive
, {}, {}
3442 await asyncio
.wait_for(
3443 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
3444 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
3445 kdu_instance
=kdu_instance
,
3446 primitive_name
=initial_config_primitive
["name"],
3447 params
=primitive_params_
,
3448 db_dict
=db_dict_install
,
3454 except Exception as e
:
3455 # Prepare update db with error and raise exception
3458 "nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)}
3462 vnfr_data
.get("_id"),
3463 {"kdur.{}.status".format(kdu_index
): "ERROR"},
3466 # ignore to keep original exception
3468 # reraise original error
3473 async def deploy_kdus(
3480 task_instantiation_info
,
3482 # Launch kdus if present in the descriptor
3484 k8scluster_id_2_uuic
= {
3485 "helm-chart-v3": {},
3490 async def _get_cluster_id(cluster_id
, cluster_type
):
3491 nonlocal k8scluster_id_2_uuic
3492 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
3493 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
3495 # check if K8scluster is creating and wait look if previous tasks in process
3496 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related(
3497 "k8scluster", cluster_id
3500 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3501 task_name
, cluster_id
3503 self
.logger
.debug(logging_text
+ text
)
3504 await asyncio
.wait(task_dependency
, timeout
=3600)
3506 db_k8scluster
= self
.db
.get_one(
3507 "k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False
3509 if not db_k8scluster
:
3510 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
3512 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
3514 if cluster_type
== "helm-chart-v3":
3516 # backward compatibility for existing clusters that have not been initialized for helm v3
3517 k8s_credentials
= yaml
.safe_dump(
3518 db_k8scluster
.get("credentials")
3520 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(
3521 k8s_credentials
, reuse_cluster_uuid
=cluster_id
3523 db_k8scluster_update
= {}
3524 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
3525 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
3526 db_k8scluster_update
[
3527 "_admin.helm-chart-v3.created"
3529 db_k8scluster_update
[
3530 "_admin.helm-chart-v3.operationalState"
3533 "k8sclusters", cluster_id
, db_k8scluster_update
3535 except Exception as e
:
3538 + "error initializing helm-v3 cluster: {}".format(str(e
))
3541 "K8s cluster '{}' has not been initialized for '{}'".format(
3542 cluster_id
, cluster_type
3547 "K8s cluster '{}' has not been initialized for '{}'".format(
3548 cluster_id
, cluster_type
3551 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
3554 logging_text
+= "Deploy kdus: "
3557 db_nsr_update
= {"_admin.deployed.K8s": []}
3558 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3561 updated_cluster_list
= []
3562 updated_v3_cluster_list
= []
3564 for vnfr_data
in db_vnfrs
.values():
3565 vca_id
= self
.get_vca_id(vnfr_data
, {})
3566 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
3567 # Step 0: Prepare and set parameters
3568 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3569 vnfd_id
= vnfr_data
.get("vnfd-id")
3570 vnfd_with_id
= find_in_list(
3571 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3575 for kdud
in vnfd_with_id
["kdu"]
3576 if kdud
["name"] == kdur
["kdu-name"]
3578 namespace
= kdur
.get("k8s-namespace")
3579 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
3580 if kdur
.get("helm-chart"):
3581 kdumodel
= kdur
["helm-chart"]
3582 # Default version: helm3, if helm-version is v2 assign v2
3583 k8sclustertype
= "helm-chart-v3"
3584 self
.logger
.debug("kdur: {}".format(kdur
))
3586 kdur
.get("helm-version")
3587 and kdur
.get("helm-version") == "v2"
3589 k8sclustertype
= "helm-chart"
3590 elif kdur
.get("juju-bundle"):
3591 kdumodel
= kdur
["juju-bundle"]
3592 k8sclustertype
= "juju-bundle"
3595 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3596 "juju-bundle. Maybe an old NBI version is running".format(
3597 vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]
3600 # check if kdumodel is a file and exists
3602 vnfd_with_id
= find_in_list(
3603 db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
3605 storage
= deep_get(vnfd_with_id
, ("_admin", "storage"))
3606 if storage
: # may be not present if vnfd has not artifacts
3607 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3608 if storage
["pkg-dir"]:
3609 filename
= "{}/{}/{}s/{}".format(
3616 filename
= "{}/Scripts/{}s/{}".format(
3621 if self
.fs
.file_exists(
3622 filename
, mode
="file"
3623 ) or self
.fs
.file_exists(filename
, mode
="dir"):
3624 kdumodel
= self
.fs
.path
+ filename
3625 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
3627 except Exception: # it is not a file
3630 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
3631 step
= "Synchronize repos for k8s cluster '{}'".format(
3634 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
3638 k8sclustertype
== "helm-chart"
3639 and cluster_uuid
not in updated_cluster_list
3641 k8sclustertype
== "helm-chart-v3"
3642 and cluster_uuid
not in updated_v3_cluster_list
3644 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
3645 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(
3646 cluster_uuid
=cluster_uuid
3649 if del_repo_list
or added_repo_dict
:
3650 if k8sclustertype
== "helm-chart":
3652 "_admin.helm_charts_added." + item
: None
3653 for item
in del_repo_list
3656 "_admin.helm_charts_added." + item
: name
3657 for item
, name
in added_repo_dict
.items()
3659 updated_cluster_list
.append(cluster_uuid
)
3660 elif k8sclustertype
== "helm-chart-v3":
3662 "_admin.helm_charts_v3_added." + item
: None
3663 for item
in del_repo_list
3666 "_admin.helm_charts_v3_added." + item
: name
3667 for item
, name
in added_repo_dict
.items()
3669 updated_v3_cluster_list
.append(cluster_uuid
)
3671 logging_text
+ "repos synchronized on k8s cluster "
3672 "'{}' to_delete: {}, to_add: {}".format(
3673 k8s_cluster_id
, del_repo_list
, added_repo_dict
3678 {"_id": k8s_cluster_id
},
3684 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(
3685 vnfr_data
["member-vnf-index-ref"],
3689 k8s_instance_info
= {
3690 "kdu-instance": None,
3691 "k8scluster-uuid": cluster_uuid
,
3692 "k8scluster-type": k8sclustertype
,
3693 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
3694 "kdu-name": kdur
["kdu-name"],
3695 "kdu-model": kdumodel
,
3696 "namespace": namespace
,
3697 "kdu-deployment-name": kdu_deployment_name
,
3699 db_path
= "_admin.deployed.K8s.{}".format(index
)
3700 db_nsr_update
[db_path
] = k8s_instance_info
3701 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3702 vnfd_with_id
= find_in_list(
3703 db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
3705 task
= asyncio
.ensure_future(
3714 k8params
=desc_params
,
3719 self
.lcm_tasks
.register(
3723 "instantiate_KDU-{}".format(index
),
3726 task_instantiation_info
[task
] = "Deploying KDU {}".format(
3732 except (LcmException
, asyncio
.CancelledError
):
3734 except Exception as e
:
3735 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
3736 if isinstance(e
, (N2VCException
, DbException
)):
3737 self
.logger
.error(logging_text
+ msg
)
3739 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
3740 raise LcmException(msg
)
3743 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3762 task_instantiation_info
,
3765 # launch instantiate_N2VC in a asyncio task and register task object
3766 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3767 # if not found, create one entry and update database
3768 # fill db_nsr._admin.deployed.VCA.<index>
3771 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
3773 if "execution-environment-list" in descriptor_config
:
3774 ee_list
= descriptor_config
.get("execution-environment-list", [])
3775 elif "juju" in descriptor_config
:
3776 ee_list
= [descriptor_config
] # ns charms
3777 else: # other types as script are not supported
3780 for ee_item
in ee_list
:
3783 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3784 ee_item
.get("juju"), ee_item
.get("helm-chart")
3787 ee_descriptor_id
= ee_item
.get("id")
3788 if ee_item
.get("juju"):
3789 vca_name
= ee_item
["juju"].get("charm")
3792 if ee_item
["juju"].get("charm") is not None
3795 if ee_item
["juju"].get("cloud") == "k8s":
3796 vca_type
= "k8s_proxy_charm"
3797 elif ee_item
["juju"].get("proxy") is False:
3798 vca_type
= "native_charm"
3799 elif ee_item
.get("helm-chart"):
3800 vca_name
= ee_item
["helm-chart"]
3801 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
3804 vca_type
= "helm-v3"
3807 logging_text
+ "skipping non juju neither charm configuration"
3812 for vca_index
, vca_deployed
in enumerate(
3813 db_nsr
["_admin"]["deployed"]["VCA"]
3815 if not vca_deployed
:
3818 vca_deployed
.get("member-vnf-index") == member_vnf_index
3819 and vca_deployed
.get("vdu_id") == vdu_id
3820 and vca_deployed
.get("kdu_name") == kdu_name
3821 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
3822 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
3826 # not found, create one.
3828 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
3831 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
3833 target
+= "/kdu/{}".format(kdu_name
)
3835 "target_element": target
,
3836 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3837 "member-vnf-index": member_vnf_index
,
3839 "kdu_name": kdu_name
,
3840 "vdu_count_index": vdu_index
,
3841 "operational-status": "init", # TODO revise
3842 "detailed-status": "", # TODO revise
3843 "step": "initial-deploy", # TODO revise
3845 "vdu_name": vdu_name
,
3847 "ee_descriptor_id": ee_descriptor_id
,
3851 # create VCA and configurationStatus in db
3853 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
3854 "configurationStatus.{}".format(vca_index
): dict(),
3856 self
.update_db_2("nsrs", nsr_id
, db_dict
)
3858 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
3860 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
3861 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
3862 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
3865 task_n2vc
= asyncio
.ensure_future(
3866 self
.instantiate_N2VC(
3867 logging_text
=logging_text
,
3868 vca_index
=vca_index
,
3874 vdu_index
=vdu_index
,
3875 deploy_params
=deploy_params
,
3876 config_descriptor
=descriptor_config
,
3877 base_folder
=base_folder
,
3878 nslcmop_id
=nslcmop_id
,
3882 ee_config_descriptor
=ee_item
,
3885 self
.lcm_tasks
.register(
3889 "instantiate_N2VC-{}".format(vca_index
),
3892 task_instantiation_info
[
3894 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
3895 member_vnf_index
or "", vdu_id
or ""
3899 def _create_nslcmop(nsr_id
, operation
, params
):
3901 Creates a ns-lcm-opp content to be stored at database.
3902 :param nsr_id: internal id of the instance
3903 :param operation: instantiate, terminate, scale, action, ...
3904 :param params: user parameters for the operation
3905 :return: dictionary following SOL005 format
3907 # Raise exception if invalid arguments
3908 if not (nsr_id
and operation
and params
):
3910 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3917 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3918 "operationState": "PROCESSING",
3919 "statusEnteredTime": now
,
3920 "nsInstanceId": nsr_id
,
3921 "lcmOperationType": operation
,
3923 "isAutomaticInvocation": False,
3924 "operationParams": params
,
3925 "isCancelPending": False,
3927 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
3928 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
3933 def _format_additional_params(self
, params
):
3934 params
= params
or {}
3935 for key
, value
in params
.items():
3936 if str(value
).startswith("!!yaml "):
3937 params
[key
] = yaml
.safe_load(value
[7:])
3940 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
3941 primitive
= seq
.get("name")
3942 primitive_params
= {}
3944 "member_vnf_index": vnf_index
,
3945 "primitive": primitive
,
3946 "primitive_params": primitive_params
,
3949 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation is skipped or retried.

    :param db_nslcmop: nslcmop database content holding "_admin.operations"
    :param op_index: index of the sub-operation inside "_admin.operations"
    :return: SUBOPERATION_STATUS_SKIP when the sub-operation is "COMPLETED";
        otherwise op_index, after marking it again as "PROCESSING"
    """
    sub_operations = deep_get(db_nslcmop, ("_admin", "operations"), [])
    if sub_operations[op_index].get("operationState") != "COMPLETED":
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        self._update_suboperation_status(
            db_nslcmop, op_index, "PROCESSING", "In progress"
        )
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
3973 # Find a sub-operation where all keys in a matching dictionary must match
3974 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3975 def _find_suboperation(self
, db_nslcmop
, match
):
3976 if db_nslcmop
and match
:
3977 op_list
= db_nslcmop
.get("_admin", {}).get("operations", [])
3978 for i
, op
in enumerate(op_list
):
3979 if all(op
.get(k
) == match
[k
] for k
in match
):
3981 return self
.SUBOPERATION_STATUS_NOT_FOUND
3983 # Update status for a sub-operation given its index
3984 def _update_suboperation_status(
3985 self
, db_nslcmop
, op_index
, operationState
, detailed_status
3987 # Update DB for HA tasks
3988 q_filter
= {"_id": db_nslcmop
["_id"]}
3990 "_admin.operations.{}.operationState".format(op_index
): operationState
,
3991 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
3994 "nslcmops", q_filter
=q_filter
, update_dict
=update_dict
, fail_on_empty
=False
3997 # Add sub-operation, return the index of the added sub-operation
3998 # Optionally, set operationState, detailed-status, and operationType
3999 # Status and type are currently set for 'scale' sub-operations:
4000 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
4001 # 'detailed-status' : status message
4002 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
4003 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
4004 def _add_suboperation(
4012 mapped_primitive_params
,
4013 operationState
=None,
4014 detailed_status
=None,
4017 RO_scaling_info
=None,
4020 return self
.SUBOPERATION_STATUS_NOT_FOUND
4021 # Get the "_admin.operations" list, if it exists
4022 db_nslcmop_admin
= db_nslcmop
.get("_admin", {})
4023 op_list
= db_nslcmop_admin
.get("operations")
4024 # Create or append to the "_admin.operations" list
4026 "member_vnf_index": vnf_index
,
4028 "vdu_count_index": vdu_count_index
,
4029 "primitive": primitive
,
4030 "primitive_params": mapped_primitive_params
,
4033 new_op
["operationState"] = operationState
4035 new_op
["detailed-status"] = detailed_status
4037 new_op
["lcmOperationType"] = operationType
4039 new_op
["RO_nsr_id"] = RO_nsr_id
4041 new_op
["RO_scaling_info"] = RO_scaling_info
4043 # No existing operations, create key 'operations' with current operation as first list element
4044 db_nslcmop_admin
.update({"operations": [new_op
]})
4045 op_list
= db_nslcmop_admin
.get("operations")
4047 # Existing operations, append operation to list
4048 op_list
.append(new_op
)
4050 db_nslcmop_update
= {"_admin.operations": op_list
}
4051 self
.update_db_2("nslcmops", db_nslcmop
["_id"], db_nslcmop_update
)
4052 op_index
= len(op_list
) - 1
4055 # Helper methods for scale() sub-operations
4057 # pre-scale/post-scale:
4058 # Check for 3 different cases:
4059 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
4060 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
4061 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
4062 def _check_or_add_scale_suboperation(
4066 vnf_config_primitive
,
4070 RO_scaling_info
=None,
4072 # Find this sub-operation
4073 if RO_nsr_id
and RO_scaling_info
:
4074 operationType
= "SCALE-RO"
4076 "member_vnf_index": vnf_index
,
4077 "RO_nsr_id": RO_nsr_id
,
4078 "RO_scaling_info": RO_scaling_info
,
4082 "member_vnf_index": vnf_index
,
4083 "primitive": vnf_config_primitive
,
4084 "primitive_params": primitive_params
,
4085 "lcmOperationType": operationType
,
4087 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
4088 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
4089 # a. New sub-operation
4090 # The sub-operation does not exist, add it.
4091 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4092 # The following parameters are set to None for all kind of scaling:
4094 vdu_count_index
= None
4096 if RO_nsr_id
and RO_scaling_info
:
4097 vnf_config_primitive
= None
4098 primitive_params
= None
4101 RO_scaling_info
= None
4102 # Initial status for sub-operation
4103 operationState
= "PROCESSING"
4104 detailed_status
= "In progress"
4105 # Add sub-operation for pre/post-scaling (zero or more operations)
4106 self
._add
_suboperation
(
4112 vnf_config_primitive
,
4120 return self
.SUBOPERATION_STATUS_NEW
4122 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4123 # or op_index (operationState != 'COMPLETED')
4124 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
4126 # Function to return execution_environment id
4128 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
4129 # TODO vdu_index_count
4130 for vca
in vca_deployed_list
:
4131 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to every log message written by this method
    :param db_nslcmop: nslcmop database content; sub-operations are registered on it and its
                        "nsInstanceId" selects the nsr (and the prometheus jobs) to update
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: passed through to the primitive execution and to the execution environment deletion
    :return: None or exception (LcmException when a terminate primitive does not end in
             COMPLETED or PARTIALLY_COMPLETED)
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # entries without an explicit "type" are handled as lxc proxy charms
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        # primitives run only while the VCA is still flagged as needing terminate
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            # (reached only when no primitive raised, so a failed one can be retried)
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if destroy_ee:
        # the connector is selected by the VCA type of this entry
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """
    Delete at once the whole N2VC namespace of a ns instance.

    The configuration status of the ns is written as "TERMINATING" before the
    deletion and as "DELETED" after it. A namespace that N2VC does not find is
    considered already deleted.

    :param db_nsr: nsr database content; its "_id" names the namespace ("." + _id)
    :param vca_id: passed through to the namespace deletion
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    ns_namespace = "." + db_nsr["_id"]
    deletion_timeout = self.timeout_charm_delete
    try:
        await self.n2vc.delete_namespace(
            namespace=ns_namespace,
            total_timeout=deletion_timeout,
            vca_id=vca_id,
        )
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO: deletes the ns, waits until it is removed from VIM and
    then deletes the nsd and the vnfds. Each step is skipped if a previous one has failed.
    :param logging_text: prefix added to every log message written by this method
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: id of the nsr whose status is updated at database
    :param nslcmop_id: id of the nslcmop whose status is updated at database
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException with all the collected failures (not found errors,
        http code 404, are considered as already deleted and are not failures)
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # An explicit raise instead of "assert False": asserts are
                    # removed with "python -O", which would leave this loop
                    # polling an unknown status until the timeout expires
                    raise ROclient.ROClientException(
                        "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    )
                # write the status only when it has changed since last iteration
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # No "loop" argument: it was removed from asyncio.sleep in
                # python 3.10; the running loop is used
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        # store what has been done so far before classifying the error
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[
                    2
                ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
4453 async def terminate(self
, nsr_id
, nslcmop_id
):
4454 # Try to lock HA task here
4455 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
4456 if not task_is_locked_by_me
:
4459 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
4460 self
.logger
.debug(logging_text
+ "Enter")
4461 timeout_ns_terminate
= self
.timeout_ns_terminate
4464 operation_params
= None
4466 error_list
= [] # annotates all failed error messages
4467 db_nslcmop_update
= {}
4468 autoremove
= False # autoremove after terminated
4469 tasks_dict_info
= {}
4472 "Stage 1/3: Preparing task.",
4473 "Waiting for previous operations to terminate.",
4476 # ^ contains [stage, step, VIM-status]
4478 # wait for any previous tasks in process
4479 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
4481 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
4482 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
4483 operation_params
= db_nslcmop
.get("operationParams") or {}
4484 if operation_params
.get("timeout_ns_terminate"):
4485 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
4486 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
4487 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
4489 db_nsr_update
["operational-status"] = "terminating"
4490 db_nsr_update
["config-status"] = "terminating"
4491 self
._write
_ns
_status
(
4493 ns_state
="TERMINATING",
4494 current_operation
="TERMINATING",
4495 current_operation_id
=nslcmop_id
,
4496 other_update
=db_nsr_update
,
4498 self
._write
_op
_status
(op_id
=nslcmop_id
, queuePosition
=0, stage
=stage
)
4499 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
4500 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
4503 stage
[1] = "Getting vnf descriptors from db."
4504 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
4506 db_vnfr
["member-vnf-index-ref"]: db_vnfr
for db_vnfr
in db_vnfrs_list
4508 db_vnfds_from_id
= {}
4509 db_vnfds_from_member_index
= {}
4511 for vnfr
in db_vnfrs_list
:
4512 vnfd_id
= vnfr
["vnfd-id"]
4513 if vnfd_id
not in db_vnfds_from_id
:
4514 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4515 db_vnfds_from_id
[vnfd_id
] = vnfd
4516 db_vnfds_from_member_index
[
4517 vnfr
["member-vnf-index-ref"]
4518 ] = db_vnfds_from_id
[vnfd_id
]
4520 # Destroy individual execution environments when there are terminating primitives.
4521 # Rest of EE will be deleted at once
4522 # TODO - check before calling _destroy_N2VC
4523 # if not operation_params.get("skip_terminate_primitives"):#
4524 # or not vca.get("needed_terminate"):
4525 stage
[0] = "Stage 2/3 execute terminating primitives."
4526 self
.logger
.debug(logging_text
+ stage
[0])
4527 stage
[1] = "Looking execution environment that needs terminate."
4528 self
.logger
.debug(logging_text
+ stage
[1])
4530 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
4531 config_descriptor
= None
4532 vca_member_vnf_index
= vca
.get("member-vnf-index")
4533 vca_id
= self
.get_vca_id(
4534 db_vnfrs_dict
.get(vca_member_vnf_index
)
4535 if vca_member_vnf_index
4539 if not vca
or not vca
.get("ee_id"):
4541 if not vca
.get("member-vnf-index"):
4543 config_descriptor
= db_nsr
.get("ns-configuration")
4544 elif vca
.get("vdu_id"):
4545 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4546 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
4547 elif vca
.get("kdu_name"):
4548 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4549 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
4551 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
4552 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
4553 vca_type
= vca
.get("type")
4554 exec_terminate_primitives
= not operation_params
.get(
4555 "skip_terminate_primitives"
4556 ) and vca
.get("needed_terminate")
4557 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4558 # pending native charms
4560 True if vca_type
in ("helm", "helm-v3", "native_charm") else False
4562 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4563 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4564 task
= asyncio
.ensure_future(
4572 exec_terminate_primitives
,
4576 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
4578 # wait for pending tasks of terminate primitives
4582 + "Waiting for tasks {}".format(list(tasks_dict_info
.keys()))
4584 error_list
= await self
._wait
_for
_tasks
(
4587 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
4591 tasks_dict_info
.clear()
4593 return # raise LcmException("; ".join(error_list))
4595 # remove All execution environments at once
4596 stage
[0] = "Stage 3/3 delete all."
4598 if nsr_deployed
.get("VCA"):
4599 stage
[1] = "Deleting all execution environments."
4600 self
.logger
.debug(logging_text
+ stage
[1])
4601 vca_id
= self
.get_vca_id({}, db_nsr
)
4602 task_delete_ee
= asyncio
.ensure_future(
4604 self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
, vca_id
=vca_id
),
4605 timeout
=self
.timeout_charm_delete
,
4608 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4609 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
4611 # Delete from k8scluster
4612 stage
[1] = "Deleting KDUs."
4613 self
.logger
.debug(logging_text
+ stage
[1])
4614 # print(nsr_deployed)
4615 for kdu
in get_iterable(nsr_deployed
, "K8s"):
4616 if not kdu
or not kdu
.get("kdu-instance"):
4618 kdu_instance
= kdu
.get("kdu-instance")
4619 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
4620 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4621 vca_id
= self
.get_vca_id({}, db_nsr
)
4622 task_delete_kdu_instance
= asyncio
.ensure_future(
4623 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
4624 cluster_uuid
=kdu
.get("k8scluster-uuid"),
4625 kdu_instance
=kdu_instance
,
4627 namespace
=kdu
.get("namespace"),
4633 + "Unknown k8s deployment type {}".format(
4634 kdu
.get("k8scluster-type")
4639 task_delete_kdu_instance
4640 ] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
4643 stage
[1] = "Deleting ns from VIM."
4645 task_delete_ro
= asyncio
.ensure_future(
4646 self
._terminate
_ng
_ro
(
4647 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4651 task_delete_ro
= asyncio
.ensure_future(
4653 logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
4656 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
4658 # rest of staff will be done at finally
4661 ROclient
.ROClientException
,
4666 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4668 except asyncio
.CancelledError
:
4670 logging_text
+ "Cancelled Exception while '{}'".format(stage
[1])
4672 exc
= "Operation was cancelled"
4673 except Exception as e
:
4674 exc
= traceback
.format_exc()
4675 self
.logger
.critical(
4676 logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
),
4681 error_list
.append(str(exc
))
4683 # wait for pending tasks
4685 stage
[1] = "Waiting for terminate pending tasks."
4686 self
.logger
.debug(logging_text
+ stage
[1])
4687 error_list
+= await self
._wait
_for
_tasks
(
4690 timeout_ns_terminate
,
4694 stage
[1] = stage
[2] = ""
4695 except asyncio
.CancelledError
:
4696 error_list
.append("Cancelled")
4697 # TODO cancell all tasks
4698 except Exception as exc
:
4699 error_list
.append(str(exc
))
4700 # update status at database
4702 error_detail
= "; ".join(error_list
)
4703 # self.logger.error(logging_text + error_detail)
4704 error_description_nslcmop
= "{} Detail: {}".format(
4705 stage
[0], error_detail
4707 error_description_nsr
= "Operation: TERMINATING.{}, {}.".format(
4708 nslcmop_id
, stage
[0]
4711 db_nsr_update
["operational-status"] = "failed"
4712 db_nsr_update
["detailed-status"] = (
4713 error_description_nsr
+ " Detail: " + error_detail
4715 db_nslcmop_update
["detailed-status"] = error_detail
4716 nslcmop_operation_state
= "FAILED"
4720 error_description_nsr
= error_description_nslcmop
= None
4721 ns_state
= "NOT_INSTANTIATED"
4722 db_nsr_update
["operational-status"] = "terminated"
4723 db_nsr_update
["detailed-status"] = "Done"
4724 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
4725 db_nslcmop_update
["detailed-status"] = "Done"
4726 nslcmop_operation_state
= "COMPLETED"
4729 self
._write
_ns
_status
(
4732 current_operation
="IDLE",
4733 current_operation_id
=None,
4734 error_description
=error_description_nsr
,
4735 error_detail
=error_detail
,
4736 other_update
=db_nsr_update
,
4738 self
._write
_op
_status
(
4741 error_message
=error_description_nslcmop
,
4742 operation_state
=nslcmop_operation_state
,
4743 other_update
=db_nslcmop_update
,
4745 if ns_state
== "NOT_INSTANTIATED":
4749 {"nsr-id-ref": nsr_id
},
4750 {"_admin.nsState": "NOT_INSTANTIATED"},
4752 except DbException
as e
:
4755 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4759 if operation_params
:
4760 autoremove
= operation_params
.get("autoremove", False)
4761 if nslcmop_operation_state
:
4763 await self
.msg
.aiowrite(
4768 "nslcmop_id": nslcmop_id
,
4769 "operationState": nslcmop_operation_state
,
4770 "autoremove": autoremove
,
4774 except Exception as e
:
4776 logging_text
+ "kafka_write notification Exception {}".format(e
)
4779 self
.logger
.debug(logging_text
+ "Exit")
4780 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """Wait for a group of asyncio tasks, reporting progress and collecting errors.

    :param logging_text: prefix added to every log line written here
    :param created_tasks_info: dict mapping each task to a human readable description
    :param timeout: overall number of seconds to wait, counted from the call
    :param stage: list whose item 1 is overwritten with the "done/total" progress
    :param nslcmop_id: operation id handed to self._write_op_status
    :param nsr_id: when provided, errors are also written to this "nsrs" record
    :return: list with one "<description>: <error>" text per failed task
    """
    started_at = time()
    error_detail_list = []
    error_list = []
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)
    while pending_tasks:
        new_error = None
        # remaining budget: the deadline is fixed when the method is entered
        remaining = timeout + started_at - time()
        done, pending_tasks = await asyncio.wait(
            pending_tasks, timeout=remaining, return_when=asyncio.FIRST_COMPLETED
        )
        num_done += len(done)
        if not done:  # Timeout
            timed_out = [
                created_tasks_info[task] + ": Timeout" for task in pending_tasks
            ]
            error_detail_list.extend(timed_out)
            error_list.extend(timed_out)
            # stop waiting; stage/status are not refreshed on this path
            break
        for task in done:
            description = created_tasks_info[task]
            exc = "Cancelled" if task.cancelled() else task.exception()
            if not exc:
                self.logger.debug(logging_text + description + ": Done")
                continue
            if isinstance(exc, asyncio.TimeoutError):
                exc = "Timeout"
            new_error = description + ": {}".format(exc)
            error_list.append(description)
            error_detail_list.append(new_error)
            # these error kinds are logged by message only, anything else
            # gets its full traceback
            if isinstance(
                exc,
                (
                    str,
                    DbException,
                    N2VCException,
                    ROclient.ROClientException,
                    LcmException,
                    K8sException,
                    NgRoException,
                ),
            ):
                self.logger.error(logging_text + new_error)
            else:
                exc_traceback = "".join(
                    traceback.format_exception(None, exc, exc.__traceback__)
                )
                self.logger.error(
                    logging_text + description + " " + exc_traceback
                )
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if new_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the default-value is used. If it is between < > it looks for a value at instantiation_params
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as "no params"
    :param instantiation_params: Instantiation params provided by user. None is treated as empty
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter without default is not provided, a "<placeholder>" default is not
        present at instantiation_params, or an INTEGER parameter cannot be converted
    """
    # Membership tests below would fail with TypeError on None, so normalize both inputs
    if params is None:
        params = {}
    if instantiation_params is None:
        instantiation_params = {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            calculated_params[param_name] = params[param_name]
        elif "default-value" in parameter or "value" in parameter:
            # "value" takes precedence over "default-value"
            if "value" in parameter:
                calculated_params[param_name] = parameter["value"]
            else:
                calculated_params[param_name] = parameter["default-value"]
            if (
                isinstance(calculated_params[param_name], str)
                and calculated_params[param_name].startswith("<")
                and calculated_params[param_name].endswith(">")
            ):
                # "<name>" is a placeholder resolved against instantiation_params
                if calculated_params[param_name][1:-1] in instantiation_params:
                    calculated_params[param_name] = instantiation_params[
                        calculated_params[param_name][1:-1]
                    ]
                else:
                    raise LcmException(
                        "Parameter {} needed to execute primitive {} not provided".format(
                            calculated_params[param_name], primitive_desc["name"]
                        )
                    )
        else:
            raise LcmException(
                "Parameter {} needed to execute primitive {} not provided".format(
                    param_name, primitive_desc["name"]
                )
            )

        if isinstance(calculated_params[param_name], (dict, list, tuple)):
            # structured values are serialized as one-line YAML text
            calculated_params[param_name] = yaml.safe_dump(
                calculated_params[param_name], default_flow_style=True, width=256
            )
        elif isinstance(calculated_params[param_name], str) and calculated_params[
            param_name
        ].startswith("!!yaml "):
            # strip the "!!yaml " marker (7 characters) and keep the rest as is
            calculated_params[param_name] = calculated_params[param_name][7:]
        if parameter.get("data-type") == "INTEGER":
            try:
                calculated_params[param_name] = int(calculated_params[param_name])
            except ValueError as err:  # error converting string to int
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(
                        param_name, primitive_desc["name"]
                    )
                ) from err
        elif parameter.get("data-type") == "BOOLEAN":
            # anything whose text is not "false" (case insensitive) counts as True
            calculated_params[param_name] = not (
                (str(calculated_params[param_name])).lower() == "false"
            )

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config":
        if "ns_config_info" in instantiation_params:
            calculated_params["ns_config_info"] = instantiation_params[
                "ns_config_info"
            ]
    return calculated_params
def _look_for_deployed_vca(
    self,
    deployed_vca,
    member_vnf_index,
    vdu_id,
    vdu_count_index,
    kdu_name=None,
    ee_descriptor_id=None,
):
    """Find the deployed VCA record targeted by an action.

    :param deployed_vca: iterable of deployed VCA records; empty/None items are skipped
    :param member_vnf_index: value that must equal the record "member-vnf-index"
    :param vdu_id: value that must equal the record "vdu_id"
    :param vdu_count_index: compared with "vdu_count_index" unless it is None
    :param kdu_name: compared with "kdu_name" only when it is not empty
    :param ee_descriptor_id: compared with "ee_descriptor_id" only when it is not empty
    :return: tuple (ee_id, vca_type) of the first matching record
    :raises LcmException: if no record matches or the matching one has no ee_id
    """

    def _is_target(candidate):
        # checks are done in the same order as the filters are listed above
        if not candidate:
            return False
        if member_vnf_index != candidate["member-vnf-index"]:
            return False
        if vdu_id != candidate["vdu_id"]:
            return False
        if (
            vdu_count_index is not None
            and vdu_count_index != candidate["vdu_count_index"]
        ):
            return False
        if kdu_name and kdu_name != candidate["kdu_name"]:
            return False
        if ee_descriptor_id and ee_descriptor_id != candidate["ee_descriptor_id"]:
            return False
        return True

    # matching records are always truthy, so None can safely mean "not found"
    vca = next((item for item in deployed_vca if _is_target(item)), None)
    if vca is None:
        # vca_deployed not found
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
            " is not deployed".format(
                member_vnf_index,
                vdu_id,
                vdu_count_index,
                kdu_name,
                ee_descriptor_id,
            )
        )

    ee_id = vca.get("ee_id")
    # default value for backward compatibility - proxy charm
    vca_type = vca.get("type", "lxc_proxy_charm")
    if not ee_id:
        raise LcmException(
            "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
            "execution environment".format(
                member_vnf_index, vdu_id, kdu_name, vdu_count_index
            )
        )
    return ee_id, vca_type
async def _ns_execute_primitive(
    self,
    ee_id,
    primitive,
    primitive_params,
    retries=0,
    retries_interval=30,
    timeout=None,
    vca_type=None,
    db_dict=None,
    vca_id: str = None,
) -> (str, str):
    """Execute a primitive on an execution environment, retrying on failure.

    :param ee_id: execution environment id passed to the VCA connector
    :param primitive: primitive name. For "config" the params are wrapped as {"params": ...}
    :param primitive_params: parameters handed to the primitive
    :param retries: number of extra attempts after a failed execution
    :param retries_interval: seconds to wait between attempts
    :param timeout: seconds allowed per attempt; self.timeout_primitive when empty
    :param vca_type: key of self.vca_map; "lxc_proxy_charm" when empty
    :param db_dict: passed through to the connector
    :param vca_id: passed through to the connector
    :return: ("COMPLETED", output) on success, ("FAILED", error text) when all attempts
        fail, ("FAIL", error text) on any other unexpected error
    :raises LcmException, asyncio.CancelledError: re-raised untouched
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    ),
                    timeout=timeout or self.timeout_primitive,
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                if isinstance(e, asyncio.TimeoutError):
                    e = "Timeout"
                retries -= 1
                if retries >= 0:
                    self.logger.debug(
                        "Error executing action {} on {} -> {}".format(
                            primitive, ee_id, e
                        )
                    )
                    # wait and retry. The "loop" argument of asyncio.sleep was
                    # removed in Python 3.10; passing it raised TypeError here and
                    # turned every retry into the generic failure below
                    await asyncio.sleep(retries_interval)
                else:
                    return "FAILED", str(e)

        return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # NOTE(review): this state is "FAIL" while the retry path reports "FAILED";
        # kept as is because callers may depend on it - confirm which one is intended
        return "FAIL", "Error executing action {}: {}".format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Refresh the VCA status stored in the nsrs record.

    When the record has K8s deployments each of them is refreshed through
    self._on_update_k8s_db; otherwise every deployed VCA entry is refreshed
    through self._on_update_n2vc_db. The task is finally removed from lcm_tasks.
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_id = self.get_vca_id({}, db_nsr)
    k8s_deployments = db_nsr["_admin"]["deployed"]["K8s"]
    if k8s_deployments:
        for k8s in k8s_deployments:
            cluster_uuid = k8s["k8scluster-uuid"]
            kdu_instance = k8s["kdu-instance"]
            cluster_type = k8s["k8scluster-type"]
            await self._on_update_k8s_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter={"_id": nsr_id},
                vca_id=vca_id,
                cluster_type=cluster_type,
            )
    else:
        # only the position of each VCA entry is needed to build its db path
        for vca_index, _entry in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            db_filter = {"_id": nsr_id}
            path = "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db("nsrs", db_filter, path, {})

    self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
    self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
5075 async def action(self
, nsr_id
, nslcmop_id
):
5076 # Try to lock HA task here
5077 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5078 if not task_is_locked_by_me
:
5081 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
5082 self
.logger
.debug(logging_text
+ "Enter")
5083 # get all needed from database
5087 db_nslcmop_update
= {}
5088 nslcmop_operation_state
= None
5089 error_description_nslcmop
= None
5092 # wait for any previous tasks in process
5093 step
= "Waiting for previous operations to terminate"
5094 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5096 self
._write
_ns
_status
(
5099 current_operation
="RUNNING ACTION",
5100 current_operation_id
=nslcmop_id
,
5103 step
= "Getting information from database"
5104 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5105 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5106 if db_nslcmop
["operationParams"].get("primitive_params"):
5107 db_nslcmop
["operationParams"]["primitive_params"] = json
.loads(
5108 db_nslcmop
["operationParams"]["primitive_params"]
5111 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5112 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
5113 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
5114 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
5115 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
5116 primitive
= db_nslcmop
["operationParams"]["primitive"]
5117 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
5118 timeout_ns_action
= db_nslcmop
["operationParams"].get(
5119 "timeout_ns_action", self
.timeout_primitive
5123 step
= "Getting vnfr from database"
5124 db_vnfr
= self
.db
.get_one(
5125 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
5127 if db_vnfr
.get("kdur"):
5129 for kdur
in db_vnfr
["kdur"]:
5130 if kdur
.get("additionalParams"):
5131 kdur
["additionalParams"] = json
.loads(
5132 kdur
["additionalParams"]
5134 kdur_list
.append(kdur
)
5135 db_vnfr
["kdur"] = kdur_list
5136 step
= "Getting vnfd from database"
5137 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
5139 # Sync filesystem before running a primitive
5140 self
.fs
.sync(db_vnfr
["vnfd-id"])
5142 step
= "Getting nsd from database"
5143 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
5145 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5146 # for backward compatibility
5147 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
5148 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
5149 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
5150 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5152 # look for primitive
5153 config_primitive_desc
= descriptor_configuration
= None
5155 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
5157 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
5159 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
5161 descriptor_configuration
= db_nsd
.get("ns-configuration")
5163 if descriptor_configuration
and descriptor_configuration
.get(
5166 for config_primitive
in descriptor_configuration
["config-primitive"]:
5167 if config_primitive
["name"] == primitive
:
5168 config_primitive_desc
= config_primitive
5171 if not config_primitive_desc
:
5172 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
5174 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5178 primitive_name
= primitive
5179 ee_descriptor_id
= None
5181 primitive_name
= config_primitive_desc
.get(
5182 "execution-environment-primitive", primitive
5184 ee_descriptor_id
= config_primitive_desc
.get(
5185 "execution-environment-ref"
5191 (x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None
5193 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
5196 (x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None
5198 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
5200 desc_params
= parse_yaml_strings(
5201 db_vnfr
.get("additionalParamsForVnf")
5204 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
5205 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
5206 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
5208 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
5209 actions
.add(primitive
["name"])
5210 for primitive
in kdu_configuration
.get("config-primitive", []):
5211 actions
.add(primitive
["name"])
5213 nsr_deployed
["K8s"],
5214 lambda kdu
: kdu_name
== kdu
["kdu-name"]
5215 and kdu
["member-vnf-index"] == vnf_index
,
5219 if primitive_name
in actions
5220 and kdu
["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5224 # TODO check if ns is in a proper status
5226 primitive_name
in ("upgrade", "rollback", "status") or kdu_action
5228 # kdur and desc_params already set from before
5229 if primitive_params
:
5230 desc_params
.update(primitive_params
)
5231 # TODO Check if we will need something at vnf level
5232 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
5234 kdu_name
== kdu
["kdu-name"]
5235 and kdu
["member-vnf-index"] == vnf_index
5240 "KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
)
5243 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
5244 msg
= "unknown k8scluster-type '{}'".format(
5245 kdu
.get("k8scluster-type")
5247 raise LcmException(msg
)
5250 "collection": "nsrs",
5251 "filter": {"_id": nsr_id
},
5252 "path": "_admin.deployed.K8s.{}".format(index
),
5256 + "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
)
5258 step
= "Executing kdu {}".format(primitive_name
)
5259 if primitive_name
== "upgrade":
5260 if desc_params
.get("kdu_model"):
5261 kdu_model
= desc_params
.get("kdu_model")
5262 del desc_params
["kdu_model"]
5264 kdu_model
= kdu
.get("kdu-model")
5265 parts
= kdu_model
.split(sep
=":")
5267 kdu_model
= parts
[0]
5268 if desc_params
.get("kdu_atomic_upgrade"):
5269 atomic_upgrade
= desc_params
.get("kdu_atomic_upgrade").lower() in ("yes", "true", "1")
5270 del desc_params
["kdu_atomic_upgrade"]
5272 atomic_upgrade
= True
5274 detailed_status
= await asyncio
.wait_for(
5275 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
5276 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5277 kdu_instance
=kdu
.get("kdu-instance"),
5278 atomic
=atomic_upgrade
,
5279 kdu_model
=kdu_model
,
5282 timeout
=timeout_ns_action
,
5284 timeout
=timeout_ns_action
+ 10,
5287 logging_text
+ " Upgrade of kdu {} done".format(detailed_status
)
5289 elif primitive_name
== "rollback":
5290 detailed_status
= await asyncio
.wait_for(
5291 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
5292 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5293 kdu_instance
=kdu
.get("kdu-instance"),
5296 timeout
=timeout_ns_action
,
5298 elif primitive_name
== "status":
5299 detailed_status
= await asyncio
.wait_for(
5300 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
5301 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5302 kdu_instance
=kdu
.get("kdu-instance"),
5305 timeout
=timeout_ns_action
,
5308 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(
5309 kdu
["kdu-name"], nsr_id
5311 params
= self
._map
_primitive
_params
(
5312 config_primitive_desc
, primitive_params
, desc_params
5315 detailed_status
= await asyncio
.wait_for(
5316 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
5317 cluster_uuid
=kdu
.get("k8scluster-uuid"),
5318 kdu_instance
=kdu_instance
,
5319 primitive_name
=primitive_name
,
5322 timeout
=timeout_ns_action
,
5325 timeout
=timeout_ns_action
,
5329 nslcmop_operation_state
= "COMPLETED"
5331 detailed_status
= ""
5332 nslcmop_operation_state
= "FAILED"
5334 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
5335 nsr_deployed
["VCA"],
5336 member_vnf_index
=vnf_index
,
5338 vdu_count_index
=vdu_count_index
,
5339 ee_descriptor_id
=ee_descriptor_id
,
5341 for vca_index
, vca_deployed
in enumerate(
5342 db_nsr
["_admin"]["deployed"]["VCA"]
5344 if vca_deployed
.get("member-vnf-index") == vnf_index
:
5346 "collection": "nsrs",
5347 "filter": {"_id": nsr_id
},
5348 "path": "_admin.deployed.VCA.{}.".format(vca_index
),
5352 nslcmop_operation_state
,
5354 ) = await self
._ns
_execute
_primitive
(
5356 primitive
=primitive_name
,
5357 primitive_params
=self
._map
_primitive
_params
(
5358 config_primitive_desc
, primitive_params
, desc_params
5360 timeout
=timeout_ns_action
,
5366 db_nslcmop_update
["detailed-status"] = detailed_status
5367 error_description_nslcmop
= (
5368 detailed_status
if nslcmop_operation_state
== "FAILED" else ""
5372 + " task Done with result {} {}".format(
5373 nslcmop_operation_state
, detailed_status
5376 return # database update is called inside finally
5378 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
5379 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
5381 except asyncio
.CancelledError
:
5383 logging_text
+ "Cancelled Exception while '{}'".format(step
)
5385 exc
= "Operation was cancelled"
5386 except asyncio
.TimeoutError
:
5387 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
5389 except Exception as e
:
5390 exc
= traceback
.format_exc()
5391 self
.logger
.critical(
5392 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
5401 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
5402 nslcmop_operation_state
= "FAILED"
5404 self
._write
_ns
_status
(
5408 ], # TODO check if degraded. For the moment use previous status
5409 current_operation
="IDLE",
5410 current_operation_id
=None,
5411 # error_description=error_description_nsr,
5412 # error_detail=error_detail,
5413 other_update
=db_nsr_update
,
5416 self
._write
_op
_status
(
5419 error_message
=error_description_nslcmop
,
5420 operation_state
=nslcmop_operation_state
,
5421 other_update
=db_nslcmop_update
,
5424 if nslcmop_operation_state
:
5426 await self
.msg
.aiowrite(
5431 "nslcmop_id": nslcmop_id
,
5432 "operationState": nslcmop_operation_state
,
5436 except Exception as e
:
5438 logging_text
+ "kafka_write notification Exception {}".format(e
)
5440 self
.logger
.debug(logging_text
+ "Exit")
5441 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
5442 return nslcmop_operation_state
, detailed_status
5444 async def terminate_vdus(
5445 self
, db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
5447 """This method terminates VDUs
5450 db_vnfr: VNF instance record
5451 member_vnf_index: VNF index to identify the VDUs to be removed
5452 db_nsr: NS instance record
5453 update_db_nslcmops: Nslcmop update record
5455 vca_scaling_info
= []
5456 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5457 scaling_info
["scaling_direction"] = "IN"
5458 scaling_info
["vdu-delete"] = {}
5459 scaling_info
["kdu-delete"] = {}
5460 db_vdur
= db_vnfr
.get("vdur")
5461 vdur_list
= copy(db_vdur
)
5463 for index
, vdu
in enumerate(vdur_list
):
5464 vca_scaling_info
.append(
5466 "osm_vdu_id": vdu
["vdu-id-ref"],
5467 "member-vnf-index": member_vnf_index
,
5469 "vdu_index": count_index
,
5471 scaling_info
["vdu-delete"][vdu
["vdu-id-ref"]] = count_index
5472 scaling_info
["vdu"].append(
5474 "name": vdu
.get("name") or vdu
.get("vdu-name"),
5475 "vdu_id": vdu
["vdu-id-ref"],
5478 for interface
in vdu
["interfaces"]:
5479 scaling_info
["vdu"][index
]["interface"].append(
5481 "name": interface
["name"],
5482 "ip_address": interface
["ip-address"],
5483 "mac_address": interface
.get("mac-address"),
5485 self
.logger
.info("NS update scaling info{}".format(scaling_info
))
5486 stage
[2] = "Terminating VDUs"
5487 if scaling_info
.get("vdu-delete"):
5488 # scale_process = "RO"
5489 if self
.ro_config
.get("ng"):
5490 await self
._scale
_ng
_ro
(
5491 logging_text
, db_nsr
, update_db_nslcmops
, db_vnfr
, scaling_info
, stage
5494 async def remove_vnf(
5495 self
, nsr_id
, nslcmop_id
, vnf_instance_id
5497 """This method is to Remove VNF instances from NS.
5500 nsr_id: NS instance id
5501 nslcmop_id: nslcmop id of update
5502 vnf_instance_id: id of the VNF instance to be removed
5505 result: (str, str) COMPLETED/FAILED, details
5509 logging_text
= "Task ns={} update ".format(nsr_id
)
5510 check_vnfr_count
= len(self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}))
5511 self
.logger
.info("check_vnfr_count {}".format(check_vnfr_count
))
5512 if check_vnfr_count
> 1:
5513 stage
= ["", "", ""]
5514 step
= "Getting nslcmop from database"
5515 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
5516 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5517 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5518 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5519 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5520 """ db_vnfr = self.db.get_one(
5521 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5523 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5524 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5526 constituent_vnfr
= db_nsr
.get("constituent-vnfr-ref")
5527 constituent_vnfr
.remove(db_vnfr
.get("_id"))
5528 db_nsr_update
["constituent-vnfr-ref"] = db_nsr
.get("constituent-vnfr-ref")
5529 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5530 self
.db
.del_one("vnfrs", {"_id": db_vnfr
.get("_id")})
5531 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5532 return "COMPLETED", "Done"
5534 step
= "Terminate VNF Failed with"
5535 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5537 except (LcmException
, asyncio
.CancelledError
):
5539 except Exception as e
:
5540 self
.logger
.debug("Error removing VNF {}".format(e
))
5541 return "FAILED", "Error removing VNF {}".format(e
)
5543 async def _ns_redeploy_vnf(
5544 self
, nsr_id
, nslcmop_id
, db_vnfd
, db_vnfr
, db_nsr
,
5546 """This method updates and redeploys VNF instances
5549 nsr_id: NS instance id
5550 nslcmop_id: nslcmop id
5551 db_vnfd: VNF descriptor
5552 db_vnfr: VNF instance record
5553 db_nsr: NS instance record
5556 result: (str, str) COMPLETED/FAILED, details
5560 stage
= ["", "", ""]
5561 logging_text
= "Task ns={} update ".format(nsr_id
)
5562 latest_vnfd_revision
= db_vnfd
["_admin"].get("revision")
5563 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5565 # Terminate old VNF resources
5566 update_db_nslcmops
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
5567 await self
.terminate_vdus(db_vnfr
, member_vnf_index
, db_nsr
, update_db_nslcmops
, stage
, logging_text
)
5569 # old_vnfd_id = db_vnfr["vnfd-id"]
5570 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5571 new_db_vnfd
= db_vnfd
5572 # new_vnfd_ref = new_db_vnfd["id"]
5573 # new_vnfd_id = vnfd_id
5577 for cp
in new_db_vnfd
.get("ext-cpd", ()):
5579 "name": cp
.get("id"),
5580 "connection-point-id": cp
.get("int-cpd", {}).get("cpd"),
5581 "connection-point-vdu-id": cp
.get("int-cpd", {}).get("vdu-id"),
5584 new_vnfr_cp
.append(vnf_cp
)
5585 new_vdur
= update_db_nslcmops
["operationParams"]["newVdur"]
5586 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5587 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5588 new_vnfr_update
= {"revision": latest_vnfd_revision
, "connection-point": new_vnfr_cp
, "vdur": new_vdur
, "ip-address": ""}
5589 self
.update_db_2("vnfrs", db_vnfr
["_id"], new_vnfr_update
)
5590 updated_db_vnfr
= self
.db
.get_one(
5591 "vnfrs", {"member-vnf-index-ref": member_vnf_index
, "nsr-id-ref": nsr_id
}
5594 # Instantiate new VNF resources
5595 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5596 vca_scaling_info
= []
5597 scaling_info
= {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5598 scaling_info
["scaling_direction"] = "OUT"
5599 scaling_info
["vdu-create"] = {}
5600 scaling_info
["kdu-create"] = {}
5601 vdud_instantiate_list
= db_vnfd
["vdu"]
5602 for index
, vdud
in enumerate(vdud_instantiate_list
):
5603 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
5607 additional_params
= (
5608 self
._get
_vdu
_additional
_params
(updated_db_vnfr
, vdud
["id"])
5611 cloud_init_list
= []
5613 # TODO Information of its own ip is not available because db_vnfr is not updated.
5614 additional_params
["OSM"] = get_osm_params(
5615 updated_db_vnfr
, vdud
["id"], 1
5617 cloud_init_list
.append(
5618 self
._parse
_cloud
_init
(
5625 vca_scaling_info
.append(
5627 "osm_vdu_id": vdud
["id"],
5628 "member-vnf-index": member_vnf_index
,
5630 "vdu_index": count_index
,
5633 scaling_info
["vdu-create"][vdud
["id"]] = count_index
5634 if self
.ro_config
.get("ng"):
5636 "New Resources to be deployed: {}".format(scaling_info
))
5637 await self
._scale
_ng
_ro
(
5638 logging_text
, db_nsr
, update_db_nslcmops
, updated_db_vnfr
, scaling_info
, stage
5640 return "COMPLETED", "Done"
5641 except (LcmException
, asyncio
.CancelledError
):
5643 except Exception as e
:
5644 self
.logger
.debug("Error updating VNF {}".format(e
))
5645 return "FAILED", "Error updating VNF {}".format(e
)
5647 async def _ns_charm_upgrade(
5653 timeout
: float = None,
5655 """This method upgrade charms in VNF instances
5658 ee_id: Execution environment id
5659 path: Local path to the charm
5661 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5662 timeout: (Float) Timeout for the ns update operation
5665 result: (str, str) COMPLETED/FAILED, details
5668 charm_type
= charm_type
or "lxc_proxy_charm"
5669 output
= await self
.vca_map
[charm_type
].upgrade_charm(
5673 charm_type
=charm_type
,
5674 timeout
=timeout
or self
.timeout_ns_update
,
5678 return "COMPLETED", output
5680 except (LcmException
, asyncio
.CancelledError
):
5683 except Exception as e
:
5685 self
.logger
.debug("Error upgrading charm {}".format(path
))
5687 return "FAILED", "Error upgrading charm {}: {}".format(path
, e
)
5689 async def update(self
, nsr_id
, nslcmop_id
):
5690 """Update NS according to different update types
5692 This method performs upgrade of VNF instances then updates the revision
5693 number in VNF record
5696 nsr_id: Network service will be updated
5697 nslcmop_id: ns lcm operation id
5700 It may raise DbException, LcmException, N2VCException, K8sException
5703 # Try to lock HA task here
5704 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
5705 if not task_is_locked_by_me
:
5708 logging_text
= "Task ns={} update={} ".format(nsr_id
, nslcmop_id
)
5709 self
.logger
.debug(logging_text
+ "Enter")
5711 # Set the required variables to be filled up later
5713 db_nslcmop_update
= {}
5715 nslcmop_operation_state
= None
5717 error_description_nslcmop
= ""
5719 change_type
= "updated"
5720 detailed_status
= ""
5723 # wait for any previous tasks in process
5724 step
= "Waiting for previous operations to terminate"
5725 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
5726 self
._write
_ns
_status
(
5729 current_operation
="UPDATING",
5730 current_operation_id
=nslcmop_id
,
5733 step
= "Getting nslcmop from database"
5734 db_nslcmop
= self
.db
.get_one(
5735 "nslcmops", {"_id": nslcmop_id
}, fail_on_empty
=False
5737 update_type
= db_nslcmop
["operationParams"]["updateType"]
5739 step
= "Getting nsr from database"
5740 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
5741 old_operational_status
= db_nsr
["operational-status"]
5742 db_nsr_update
["operational-status"] = "updating"
5743 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
5744 nsr_deployed
= db_nsr
["_admin"].get("deployed")
5746 if update_type
== "CHANGE_VNFPKG":
5748 # Get the input parameters given through update request
5749 vnf_instance_id
= db_nslcmop
["operationParams"][
5750 "changeVnfPackageData"
5751 ].get("vnfInstanceId")
5753 vnfd_id
= db_nslcmop
["operationParams"]["changeVnfPackageData"].get(
5756 timeout_seconds
= db_nslcmop
["operationParams"].get("timeout_ns_update")
5758 step
= "Getting vnfr from database"
5759 db_vnfr
= self
.db
.get_one(
5760 "vnfrs", {"_id": vnf_instance_id
}, fail_on_empty
=False
5763 step
= "Getting vnfds from database"
5765 latest_vnfd
= self
.db
.get_one(
5766 "vnfds", {"_id": vnfd_id
}, fail_on_empty
=False
5768 latest_vnfd_revision
= latest_vnfd
["_admin"].get("revision")
5771 current_vnf_revision
= db_vnfr
.get("revision", 1)
5772 current_vnfd
= self
.db
.get_one(
5774 {"_id": vnfd_id
+ ":" + str(current_vnf_revision
)},
5775 fail_on_empty
=False,
5777 # Charm artifact paths will be filled up later
5779 current_charm_artifact_path
,
5780 target_charm_artifact_path
,
5781 charm_artifact_paths
,
5784 step
= "Checking if revision has changed in VNFD"
5785 if current_vnf_revision
!= latest_vnfd_revision
:
5787 change_type
= "policy_updated"
5789 # There is new revision of VNFD, update operation is required
5790 current_vnfd_path
= vnfd_id
+ ":" + str(current_vnf_revision
)
5791 latest_vnfd_path
= vnfd_id
+ ":" + str(latest_vnfd_revision
)
5793 step
= "Removing the VNFD packages if they exist in the local path"
5794 shutil
.rmtree(self
.fs
.path
+ current_vnfd_path
, ignore_errors
=True)
5795 shutil
.rmtree(self
.fs
.path
+ latest_vnfd_path
, ignore_errors
=True)
5797 step
= "Get the VNFD packages from FSMongo"
5798 self
.fs
.sync(from_path
=latest_vnfd_path
)
5799 self
.fs
.sync(from_path
=current_vnfd_path
)
5802 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5804 base_folder
= latest_vnfd
["_admin"]["storage"]
5806 for charm_index
, charm_deployed
in enumerate(
5807 get_iterable(nsr_deployed
, "VCA")
5809 vnf_index
= db_vnfr
.get("member-vnf-index-ref")
5811 # Getting charm-id and charm-type
5812 if charm_deployed
.get("member-vnf-index") == vnf_index
:
5813 charm_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
5814 charm_type
= charm_deployed
.get("type")
5817 ee_id
= charm_deployed
.get("ee_id")
5819 step
= "Getting descriptor config"
5820 descriptor_config
= get_configuration(
5821 current_vnfd
, current_vnfd
["id"]
5824 if "execution-environment-list" in descriptor_config
:
5825 ee_list
= descriptor_config
.get(
5826 "execution-environment-list", []
5831 # There could be several charm used in the same VNF
5832 for ee_item
in ee_list
:
5833 if ee_item
.get("juju"):
5835 step
= "Getting charm name"
5836 charm_name
= ee_item
["juju"].get("charm")
5838 step
= "Setting Charm artifact paths"
5839 current_charm_artifact_path
.append(
5840 get_charm_artifact_path(
5844 current_vnf_revision
,
5847 target_charm_artifact_path
.append(
5848 get_charm_artifact_path(
5852 latest_vnfd_revision
,
5856 charm_artifact_paths
= zip(
5857 current_charm_artifact_path
, target_charm_artifact_path
5860 step
= "Checking if software version has changed in VNFD"
5861 if find_software_version(current_vnfd
) != find_software_version(
5865 step
= "Checking if existing VNF has charm"
5866 for current_charm_path
, target_charm_path
in list(
5867 charm_artifact_paths
5869 if current_charm_path
:
5871 "Software version change is not supported as VNF instance {} has charm.".format(
5876 # There is no change in the charm package, then redeploy the VNF
5877 # based on new descriptor
5878 step
= "Redeploying VNF"
5879 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5883 ) = await self
._ns
_redeploy
_vnf
(
5890 if result
== "FAILED":
5891 nslcmop_operation_state
= result
5892 error_description_nslcmop
= detailed_status
5893 db_nslcmop_update
["detailed-status"] = detailed_status
5896 + " step {} Done with result {} {}".format(
5897 step
, nslcmop_operation_state
, detailed_status
5902 step
= "Checking if any charm package has changed or not"
5903 for current_charm_path
, target_charm_path
in list(
5904 charm_artifact_paths
5908 and target_charm_path
5909 and self
.check_charm_hash_changed(
5910 current_charm_path
, target_charm_path
5914 step
= "Checking whether VNF uses juju bundle"
5915 if check_juju_bundle_existence(current_vnfd
):
5918 "Charm upgrade is not supported for the instance which"
5919 " uses juju-bundle: {}".format(
5920 check_juju_bundle_existence(current_vnfd
)
5924 step
= "Upgrading Charm"
5928 ) = await self
._ns
_charm
_upgrade
(
5931 charm_type
=charm_type
,
5932 path
=self
.fs
.path
+ target_charm_path
,
5933 timeout
=timeout_seconds
,
5936 if result
== "FAILED":
5937 nslcmop_operation_state
= result
5938 error_description_nslcmop
= detailed_status
5940 db_nslcmop_update
["detailed-status"] = detailed_status
5943 + " step {} Done with result {} {}".format(
5944 step
, nslcmop_operation_state
, detailed_status
5948 step
= "Updating policies"
5949 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5950 result
= "COMPLETED"
5951 detailed_status
= "Done"
5952 db_nslcmop_update
["detailed-status"] = "Done"
5954 # If nslcmop_operation_state is still None, no operation has failed.
5955 if not nslcmop_operation_state
:
5956 nslcmop_operation_state
= "COMPLETED"
5958 # If the CHANGE_VNFPKG update operation is successful,
5959 # the vnf revision needs to be updated
5960 vnfr_update
["revision"] = latest_vnfd_revision
5961 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
5965 + " task Done with result {} {}".format(
5966 nslcmop_operation_state
, detailed_status
5969 elif update_type
== "REMOVE_VNF":
5970 # This part is included in https://osm.etsi.org/gerrit/11876
5971 vnf_instance_id
= db_nslcmop
["operationParams"]["removeVnfInstanceId"]
5972 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_instance_id
})
5973 member_vnf_index
= db_vnfr
["member-vnf-index-ref"]
5974 step
= "Removing VNF"
5975 (result
, detailed_status
) = await self
.remove_vnf(nsr_id
, nslcmop_id
, vnf_instance_id
)
5976 if result
== "FAILED":
5977 nslcmop_operation_state
= result
5978 error_description_nslcmop
= detailed_status
5979 db_nslcmop_update
["detailed-status"] = detailed_status
5980 change_type
= "vnf_terminated"
5981 if not nslcmop_operation_state
:
5982 nslcmop_operation_state
= "COMPLETED"
5985 + " task Done with result {} {}".format(
5986 nslcmop_operation_state
, detailed_status
5990 elif update_type
== "OPERATE_VNF":
5991 vnf_id
= db_nslcmop
["operationParams"]["operateVnfData"]["vnfInstanceId"]
5992 operation_type
= db_nslcmop
["operationParams"]["operateVnfData"]["changeStateTo"]
5993 additional_param
= db_nslcmop
["operationParams"]["operateVnfData"]["additionalParam"]
5994 (result
, detailed_status
) = await self
.rebuild_start_stop(
5995 nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
5997 if result
== "FAILED":
5998 nslcmop_operation_state
= result
5999 error_description_nslcmop
= detailed_status
6000 db_nslcmop_update
["detailed-status"] = detailed_status
6001 if not nslcmop_operation_state
:
6002 nslcmop_operation_state
= "COMPLETED"
6005 + " task Done with result {} {}".format(
6006 nslcmop_operation_state
, detailed_status
6010 # If nslcmop_operation_state is still None, no operation has failed.
6011 # All operations have been executed.
6012 if not nslcmop_operation_state
:
6013 nslcmop_operation_state
= "COMPLETED"
6014 db_nsr_update
["operational-status"] = old_operational_status
6016 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
6017 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
6019 except asyncio
.CancelledError
:
6021 logging_text
+ "Cancelled Exception while '{}'".format(step
)
6023 exc
= "Operation was cancelled"
6024 except asyncio
.TimeoutError
:
6025 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
6027 except Exception as e
:
6028 exc
= traceback
.format_exc()
6029 self
.logger
.critical(
6030 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
6039 ) = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
6040 nslcmop_operation_state
= "FAILED"
6041 db_nsr_update
["operational-status"] = old_operational_status
6043 self
._write
_ns
_status
(
6045 ns_state
=db_nsr
["nsState"],
6046 current_operation
="IDLE",
6047 current_operation_id
=None,
6048 other_update
=db_nsr_update
,
6051 self
._write
_op
_status
(
6054 error_message
=error_description_nslcmop
,
6055 operation_state
=nslcmop_operation_state
,
6056 other_update
=db_nslcmop_update
,
6059 if nslcmop_operation_state
:
6063 "nslcmop_id": nslcmop_id
,
6064 "operationState": nslcmop_operation_state
,
6066 if change_type
in ("vnf_terminated", "policy_updated"):
6067 msg
.update({"vnf_member_index": member_vnf_index
})
6068 await self
.msg
.aiowrite("ns", change_type
, msg
, loop
=self
.loop
)
6069 except Exception as e
:
6071 logging_text
+ "kafka_write notification Exception {}".format(e
)
6073 self
.logger
.debug(logging_text
+ "Exit")
6074 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_update")
6075 return nslcmop_operation_state
, detailed_status
6077 async def scale(self
, nsr_id
, nslcmop_id
):
6078 # Try to lock HA task here
6079 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
6080 if not task_is_locked_by_me
:
6083 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
6084 stage
= ["", "", ""]
6085 tasks_dict_info
= {}
6086 # ^ stage, step, VIM progress
6087 self
.logger
.debug(logging_text
+ "Enter")
6088 # get all needed from database
6090 db_nslcmop_update
= {}
6093 # in case of error, indicates which part of the scale operation failed, so that the nsr can be put in error status
6094 scale_process
= None
6095 old_operational_status
= ""
6096 old_config_status
= ""
6099 # wait for any previous tasks in process
6100 step
= "Waiting for previous operations to terminate"
6101 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
6102 self
._write
_ns
_status
(
6105 current_operation
="SCALING",
6106 current_operation_id
=nslcmop_id
,
6109 step
= "Getting nslcmop from database"
6111 step
+ " after having waited for previous tasks to be completed"
6113 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
6115 step
= "Getting nsr from database"
6116 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
6117 old_operational_status
= db_nsr
["operational-status"]
6118 old_config_status
= db_nsr
["config-status"]
6120 step
= "Parsing scaling parameters"
6121 db_nsr_update
["operational-status"] = "scaling"
6122 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6123 nsr_deployed
= db_nsr
["_admin"].get("deployed")
6125 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"][
6127 ]["member-vnf-index"]
6128 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"][
6130 ]["scaling-group-descriptor"]
6131 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
6132 # for backward compatibility
6133 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
6134 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
6135 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
6136 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6138 step
= "Getting vnfr from database"
6139 db_vnfr
= self
.db
.get_one(
6140 "vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
}
6143 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
6145 step
= "Getting vnfd from database"
6146 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
6148 base_folder
= db_vnfd
["_admin"]["storage"]
6150 step
= "Getting scaling-group-descriptor"
6151 scaling_descriptor
= find_in_list(
6152 get_scaling_aspect(db_vnfd
),
6153 lambda scale_desc
: scale_desc
["name"] == scaling_group
,
6155 if not scaling_descriptor
:
6157 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6158 "at vnfd:scaling-group-descriptor".format(scaling_group
)
6161 step
= "Sending scale order to VIM"
6162 # TODO check if ns is in a proper status
6164 if not db_nsr
["_admin"].get("scaling-group"):
6169 "_admin.scaling-group": [
6170 {"name": scaling_group
, "nb-scale-op": 0}
6174 admin_scale_index
= 0
6176 for admin_scale_index
, admin_scale_info
in enumerate(
6177 db_nsr
["_admin"]["scaling-group"]
6179 if admin_scale_info
["name"] == scaling_group
:
6180 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
6182 else: # not found, set index one plus last element and add new entry with the name
6183 admin_scale_index
+= 1
6185 "_admin.scaling-group.{}.name".format(admin_scale_index
)
6188 vca_scaling_info
= []
6189 scaling_info
= {"scaling_group_name": scaling_group
, "vdu": [], "kdu": []}
6190 if scaling_type
== "SCALE_OUT":
6191 if "aspect-delta-details" not in scaling_descriptor
:
6193 "Aspect delta details not fount in scaling descriptor {}".format(
6194 scaling_descriptor
["name"]
6197 # count if max-instance-count is reached
6198 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6200 scaling_info
["scaling_direction"] = "OUT"
6201 scaling_info
["vdu-create"] = {}
6202 scaling_info
["kdu-create"] = {}
6203 for delta
in deltas
:
6204 for vdu_delta
in delta
.get("vdu-delta", {}):
6205 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
6206 # vdu_index also provides the number of instance of the targeted vdu
6207 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6208 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(
6212 additional_params
= (
6213 self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"])
6216 cloud_init_list
= []
6218 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6219 max_instance_count
= 10
6220 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
6221 max_instance_count
= vdu_profile
.get(
6222 "max-number-of-instances", 10
6225 default_instance_num
= get_number_of_instances(
6228 instances_number
= vdu_delta
.get("number-of-instances", 1)
6229 nb_scale_op
+= instances_number
6231 new_instance_count
= nb_scale_op
+ default_instance_num
6232 # Control if new count is over max and vdu count is less than max.
6233 # Then assign new instance count
6234 if new_instance_count
> max_instance_count
> vdu_count
:
6235 instances_number
= new_instance_count
- max_instance_count
6237 instances_number
= instances_number
6239 if new_instance_count
> max_instance_count
:
6241 "reached the limit of {} (max-instance-count) "
6242 "scaling-out operations for the "
6243 "scaling-group-descriptor '{}'".format(
6244 nb_scale_op
, scaling_group
6247 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6249 # TODO Information of its own ip is not available because db_vnfr is not updated.
6250 additional_params
["OSM"] = get_osm_params(
6251 db_vnfr
, vdu_delta
["id"], vdu_index
+ x
6253 cloud_init_list
.append(
6254 self
._parse
_cloud
_init
(
6261 vca_scaling_info
.append(
6263 "osm_vdu_id": vdu_delta
["id"],
6264 "member-vnf-index": vnf_index
,
6266 "vdu_index": vdu_index
+ x
,
6269 scaling_info
["vdu-create"][vdu_delta
["id"]] = instances_number
6270 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6271 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6272 kdu_name
= kdu_profile
["kdu-name"]
6273 resource_name
= kdu_profile
.get("resource-name", "")
6275 # Might have different kdus in the same delta
6276 # Should have list for each kdu
6277 if not scaling_info
["kdu-create"].get(kdu_name
, None):
6278 scaling_info
["kdu-create"][kdu_name
] = []
6280 kdur
= get_kdur(db_vnfr
, kdu_name
)
6281 if kdur
.get("helm-chart"):
6282 k8s_cluster_type
= "helm-chart-v3"
6283 self
.logger
.debug("kdur: {}".format(kdur
))
6285 kdur
.get("helm-version")
6286 and kdur
.get("helm-version") == "v2"
6288 k8s_cluster_type
= "helm-chart"
6289 elif kdur
.get("juju-bundle"):
6290 k8s_cluster_type
= "juju-bundle"
6293 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6294 "juju-bundle. Maybe an old NBI version is running".format(
6295 db_vnfr
["member-vnf-index-ref"], kdu_name
6299 max_instance_count
= 10
6300 if kdu_profile
and "max-number-of-instances" in kdu_profile
:
6301 max_instance_count
= kdu_profile
.get(
6302 "max-number-of-instances", 10
6305 nb_scale_op
+= kdu_delta
.get("number-of-instances", 1)
6306 deployed_kdu
, _
= get_deployed_kdu(
6307 nsr_deployed
, kdu_name
, vnf_index
6309 if deployed_kdu
is None:
6311 "KDU '{}' for vnf '{}' not deployed".format(
6315 kdu_instance
= deployed_kdu
.get("kdu-instance")
6316 instance_num
= await self
.k8scluster_map
[
6322 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6323 kdu_model
=deployed_kdu
.get("kdu-model"),
6325 kdu_replica_count
= instance_num
+ kdu_delta
.get(
6326 "number-of-instances", 1
6329 # Control if new count is over max and instance_num is less than max.
6330 # Then assign max instance number to kdu replica count
6331 if kdu_replica_count
> max_instance_count
> instance_num
:
6332 kdu_replica_count
= max_instance_count
6333 if kdu_replica_count
> max_instance_count
:
6335 "reached the limit of {} (max-instance-count) "
6336 "scaling-out operations for the "
6337 "scaling-group-descriptor '{}'".format(
6338 instance_num
, scaling_group
6342 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6343 vca_scaling_info
.append(
6345 "osm_kdu_id": kdu_name
,
6346 "member-vnf-index": vnf_index
,
6348 "kdu_index": instance_num
+ x
- 1,
6351 scaling_info
["kdu-create"][kdu_name
].append(
6353 "member-vnf-index": vnf_index
,
6355 "k8s-cluster-type": k8s_cluster_type
,
6356 "resource-name": resource_name
,
6357 "scale": kdu_replica_count
,
6360 elif scaling_type
== "SCALE_IN":
6361 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
6363 scaling_info
["scaling_direction"] = "IN"
6364 scaling_info
["vdu-delete"] = {}
6365 scaling_info
["kdu-delete"] = {}
6367 for delta
in deltas
:
6368 for vdu_delta
in delta
.get("vdu-delta", {}):
6369 vdu_count
= vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
6370 min_instance_count
= 0
6371 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
6372 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
6373 min_instance_count
= vdu_profile
["min-number-of-instances"]
6375 default_instance_num
= get_number_of_instances(
6376 db_vnfd
, vdu_delta
["id"]
6378 instance_num
= vdu_delta
.get("number-of-instances", 1)
6379 nb_scale_op
-= instance_num
6381 new_instance_count
= nb_scale_op
+ default_instance_num
6383 if new_instance_count
< min_instance_count
< vdu_count
:
6384 instances_number
= min_instance_count
- new_instance_count
6386 instances_number
= instance_num
6388 if new_instance_count
< min_instance_count
:
6390 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6391 "scaling-group-descriptor '{}'".format(
6392 nb_scale_op
, scaling_group
6395 for x
in range(vdu_delta
.get("number-of-instances", 1)):
6396 vca_scaling_info
.append(
6398 "osm_vdu_id": vdu_delta
["id"],
6399 "member-vnf-index": vnf_index
,
6401 "vdu_index": vdu_index
- 1 - x
,
6404 scaling_info
["vdu-delete"][vdu_delta
["id"]] = instances_number
6405 for kdu_delta
in delta
.get("kdu-resource-delta", {}):
6406 kdu_profile
= get_kdu_resource_profile(db_vnfd
, kdu_delta
["id"])
6407 kdu_name
= kdu_profile
["kdu-name"]
6408 resource_name
= kdu_profile
.get("resource-name", "")
6410 if not scaling_info
["kdu-delete"].get(kdu_name
, None):
6411 scaling_info
["kdu-delete"][kdu_name
] = []
6413 kdur
= get_kdur(db_vnfr
, kdu_name
)
6414 if kdur
.get("helm-chart"):
6415 k8s_cluster_type
= "helm-chart-v3"
6416 self
.logger
.debug("kdur: {}".format(kdur
))
6418 kdur
.get("helm-version")
6419 and kdur
.get("helm-version") == "v2"
6421 k8s_cluster_type
= "helm-chart"
6422 elif kdur
.get("juju-bundle"):
6423 k8s_cluster_type
= "juju-bundle"
6426 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6427 "juju-bundle. Maybe an old NBI version is running".format(
6428 db_vnfr
["member-vnf-index-ref"], kdur
["kdu-name"]
6432 min_instance_count
= 0
6433 if kdu_profile
and "min-number-of-instances" in kdu_profile
:
6434 min_instance_count
= kdu_profile
["min-number-of-instances"]
6436 nb_scale_op
-= kdu_delta
.get("number-of-instances", 1)
6437 deployed_kdu
, _
= get_deployed_kdu(
6438 nsr_deployed
, kdu_name
, vnf_index
6440 if deployed_kdu
is None:
6442 "KDU '{}' for vnf '{}' not deployed".format(
6446 kdu_instance
= deployed_kdu
.get("kdu-instance")
6447 instance_num
= await self
.k8scluster_map
[
6453 cluster_uuid
=deployed_kdu
.get("k8scluster-uuid"),
6454 kdu_model
=deployed_kdu
.get("kdu-model"),
6456 kdu_replica_count
= instance_num
- kdu_delta
.get(
6457 "number-of-instances", 1
6460 if kdu_replica_count
< min_instance_count
< instance_num
:
6461 kdu_replica_count
= min_instance_count
6462 if kdu_replica_count
< min_instance_count
:
6464 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6465 "scaling-group-descriptor '{}'".format(
6466 instance_num
, scaling_group
6470 for x
in range(kdu_delta
.get("number-of-instances", 1)):
6471 vca_scaling_info
.append(
6473 "osm_kdu_id": kdu_name
,
6474 "member-vnf-index": vnf_index
,
6476 "kdu_index": instance_num
- x
- 1,
6479 scaling_info
["kdu-delete"][kdu_name
].append(
6481 "member-vnf-index": vnf_index
,
6483 "k8s-cluster-type": k8s_cluster_type
,
6484 "resource-name": resource_name
,
6485 "scale": kdu_replica_count
,
6489 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6490 vdu_delete
= copy(scaling_info
.get("vdu-delete"))
6491 if scaling_info
["scaling_direction"] == "IN":
6492 for vdur
in reversed(db_vnfr
["vdur"]):
6493 if vdu_delete
.get(vdur
["vdu-id-ref"]):
6494 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
6495 scaling_info
["vdu"].append(
6497 "name": vdur
.get("name") or vdur
.get("vdu-name"),
6498 "vdu_id": vdur
["vdu-id-ref"],
6502 for interface
in vdur
["interfaces"]:
6503 scaling_info
["vdu"][-1]["interface"].append(
6505 "name": interface
["name"],
6506 "ip_address": interface
["ip-address"],
6507 "mac_address": interface
.get("mac-address"),
6510 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6513 step
= "Executing pre-scale vnf-config-primitive"
6514 if scaling_descriptor
.get("scaling-config-action"):
6515 for scaling_config_action
in scaling_descriptor
[
6516 "scaling-config-action"
6519 scaling_config_action
.get("trigger") == "pre-scale-in"
6520 and scaling_type
== "SCALE_IN"
6522 scaling_config_action
.get("trigger") == "pre-scale-out"
6523 and scaling_type
== "SCALE_OUT"
6525 vnf_config_primitive
= scaling_config_action
[
6526 "vnf-config-primitive-name-ref"
6528 step
= db_nslcmop_update
[
6530 ] = "executing pre-scale scaling-config-action '{}'".format(
6531 vnf_config_primitive
6534 # look for primitive
6535 for config_primitive
in (
6536 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6537 ).get("config-primitive", ()):
6538 if config_primitive
["name"] == vnf_config_primitive
:
6542 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6543 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6544 "primitive".format(scaling_group
, vnf_config_primitive
)
6547 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6548 if db_vnfr
.get("additionalParamsForVnf"):
6549 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6551 scale_process
= "VCA"
6552 db_nsr_update
["config-status"] = "configuring pre-scaling"
6553 primitive_params
= self
._map
_primitive
_params
(
6554 config_primitive
, {}, vnfr_params
6557 # Pre-scale retry check: Check if this sub-operation has been executed before
6558 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6561 vnf_config_primitive
,
6565 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6566 # Skip sub-operation
6567 result
= "COMPLETED"
6568 result_detail
= "Done"
6571 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6572 vnf_config_primitive
, result
, result_detail
6576 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6577 # New sub-operation: Get index of this sub-operation
6579 len(db_nslcmop
.get("_admin", {}).get("operations"))
6584 + "vnf_config_primitive={} New sub-operation".format(
6585 vnf_config_primitive
6589 # retry: Get registered params for this existing sub-operation
6590 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6593 vnf_index
= op
.get("member_vnf_index")
6594 vnf_config_primitive
= op
.get("primitive")
6595 primitive_params
= op
.get("primitive_params")
6598 + "vnf_config_primitive={} Sub-operation retry".format(
6599 vnf_config_primitive
6602 # Execute the primitive, either with new (first-time) or registered (retry) args
6603 ee_descriptor_id
= config_primitive
.get(
6604 "execution-environment-ref"
6606 primitive_name
= config_primitive
.get(
6607 "execution-environment-primitive", vnf_config_primitive
6609 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6610 nsr_deployed
["VCA"],
6611 member_vnf_index
=vnf_index
,
6613 vdu_count_index
=None,
6614 ee_descriptor_id
=ee_descriptor_id
,
6616 result
, result_detail
= await self
._ns
_execute
_primitive
(
6625 + "vnf_config_primitive={} Done with result {} {}".format(
6626 vnf_config_primitive
, result
, result_detail
6629 # Update operationState = COMPLETED | FAILED
6630 self
._update
_suboperation
_status
(
6631 db_nslcmop
, op_index
, result
, result_detail
6634 if result
== "FAILED":
6635 raise LcmException(result_detail
)
6636 db_nsr_update
["config-status"] = old_config_status
6637 scale_process
= None
6641 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)
6644 "_admin.scaling-group.{}.time".format(admin_scale_index
)
6647 # SCALE-IN VCA - BEGIN
6648 if vca_scaling_info
:
6649 step
= db_nslcmop_update
[
6651 ] = "Deleting the execution environments"
6652 scale_process
= "VCA"
6653 for vca_info
in vca_scaling_info
:
6654 if vca_info
["type"] == "delete" and not vca_info
.get("osm_kdu_id"):
6655 member_vnf_index
= str(vca_info
["member-vnf-index"])
6657 logging_text
+ "vdu info: {}".format(vca_info
)
6659 if vca_info
.get("osm_vdu_id"):
6660 vdu_id
= vca_info
["osm_vdu_id"]
6661 vdu_index
= int(vca_info
["vdu_index"])
6664 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6665 member_vnf_index
, vdu_id
, vdu_index
6667 stage
[2] = step
= "Scaling in VCA"
6668 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6669 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
6670 config_update
= db_nsr
["configurationStatus"]
6671 for vca_index
, vca
in enumerate(vca_update
):
6673 (vca
or vca
.get("ee_id"))
6674 and vca
["member-vnf-index"] == member_vnf_index
6675 and vca
["vdu_count_index"] == vdu_index
6677 if vca
.get("vdu_id"):
6678 config_descriptor
= get_configuration(
6679 db_vnfd
, vca
.get("vdu_id")
6681 elif vca
.get("kdu_name"):
6682 config_descriptor
= get_configuration(
6683 db_vnfd
, vca
.get("kdu_name")
6686 config_descriptor
= get_configuration(
6687 db_vnfd
, db_vnfd
["id"]
6689 operation_params
= (
6690 db_nslcmop
.get("operationParams") or {}
6692 exec_terminate_primitives
= not operation_params
.get(
6693 "skip_terminate_primitives"
6694 ) and vca
.get("needed_terminate")
6695 task
= asyncio
.ensure_future(
6704 exec_primitives
=exec_terminate_primitives
,
6708 timeout
=self
.timeout_charm_delete
,
6711 tasks_dict_info
[task
] = "Terminating VCA {}".format(
6714 del vca_update
[vca_index
]
6715 del config_update
[vca_index
]
6716 # wait for pending tasks of terminate primitives
6720 + "Waiting for tasks {}".format(
6721 list(tasks_dict_info
.keys())
6724 error_list
= await self
._wait
_for
_tasks
(
6728 self
.timeout_charm_delete
, self
.timeout_ns_terminate
6733 tasks_dict_info
.clear()
6735 raise LcmException("; ".join(error_list
))
6737 db_vca_and_config_update
= {
6738 "_admin.deployed.VCA": vca_update
,
6739 "configurationStatus": config_update
,
6742 "nsrs", db_nsr
["_id"], db_vca_and_config_update
6744 scale_process
= None
6745 # SCALE-IN VCA - END
6748 if scaling_info
.get("vdu-create") or scaling_info
.get("vdu-delete"):
6749 scale_process
= "RO"
6750 if self
.ro_config
.get("ng"):
6751 await self
._scale
_ng
_ro
(
6752 logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, scaling_info
, stage
6754 scaling_info
.pop("vdu-create", None)
6755 scaling_info
.pop("vdu-delete", None)
6757 scale_process
= None
6761 if scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete"):
6762 scale_process
= "KDU"
6763 await self
._scale
_kdu
(
6764 logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
6766 scaling_info
.pop("kdu-create", None)
6767 scaling_info
.pop("kdu-delete", None)
6769 scale_process
= None
6773 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
6775 # SCALE-UP VCA - BEGIN
6776 if vca_scaling_info
:
6777 step
= db_nslcmop_update
[
6779 ] = "Creating new execution environments"
6780 scale_process
= "VCA"
6781 for vca_info
in vca_scaling_info
:
6782 if vca_info
["type"] == "create" and not vca_info
.get("osm_kdu_id"):
6783 member_vnf_index
= str(vca_info
["member-vnf-index"])
6785 logging_text
+ "vdu info: {}".format(vca_info
)
6787 vnfd_id
= db_vnfr
["vnfd-ref"]
6788 if vca_info
.get("osm_vdu_id"):
6789 vdu_index
= int(vca_info
["vdu_index"])
6790 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
6791 if db_vnfr
.get("additionalParamsForVnf"):
6792 deploy_params
.update(
6794 db_vnfr
["additionalParamsForVnf"].copy()
6797 descriptor_config
= get_configuration(
6798 db_vnfd
, db_vnfd
["id"]
6800 if descriptor_config
:
6805 logging_text
=logging_text
6806 + "member_vnf_index={} ".format(member_vnf_index
),
6809 nslcmop_id
=nslcmop_id
,
6815 member_vnf_index
=member_vnf_index
,
6816 vdu_index
=vdu_index
,
6818 deploy_params
=deploy_params
,
6819 descriptor_config
=descriptor_config
,
6820 base_folder
=base_folder
,
6821 task_instantiation_info
=tasks_dict_info
,
6824 vdu_id
= vca_info
["osm_vdu_id"]
6825 vdur
= find_in_list(
6826 db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
6828 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
6829 if vdur
.get("additionalParams"):
6830 deploy_params_vdu
= parse_yaml_strings(
6831 vdur
["additionalParams"]
6834 deploy_params_vdu
= deploy_params
6835 deploy_params_vdu
["OSM"] = get_osm_params(
6836 db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
6838 if descriptor_config
:
6843 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6844 member_vnf_index
, vdu_id
, vdu_index
6846 stage
[2] = step
= "Scaling out VCA"
6847 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
6849 logging_text
=logging_text
6850 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6851 member_vnf_index
, vdu_id
, vdu_index
6855 nslcmop_id
=nslcmop_id
,
6861 member_vnf_index
=member_vnf_index
,
6862 vdu_index
=vdu_index
,
6864 deploy_params
=deploy_params_vdu
,
6865 descriptor_config
=descriptor_config
,
6866 base_folder
=base_folder
,
6867 task_instantiation_info
=tasks_dict_info
,
6870 # SCALE-UP VCA - END
6871 scale_process
= None
6874 # execute primitive service POST-SCALING
6875 step
= "Executing post-scale vnf-config-primitive"
6876 if scaling_descriptor
.get("scaling-config-action"):
6877 for scaling_config_action
in scaling_descriptor
[
6878 "scaling-config-action"
6881 scaling_config_action
.get("trigger") == "post-scale-in"
6882 and scaling_type
== "SCALE_IN"
6884 scaling_config_action
.get("trigger") == "post-scale-out"
6885 and scaling_type
== "SCALE_OUT"
6887 vnf_config_primitive
= scaling_config_action
[
6888 "vnf-config-primitive-name-ref"
6890 step
= db_nslcmop_update
[
6892 ] = "executing post-scale scaling-config-action '{}'".format(
6893 vnf_config_primitive
6896 vnfr_params
= {"VDU_SCALE_INFO": scaling_info
}
6897 if db_vnfr
.get("additionalParamsForVnf"):
6898 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
6900 # look for primitive
6901 for config_primitive
in (
6902 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
6903 ).get("config-primitive", ()):
6904 if config_primitive
["name"] == vnf_config_primitive
:
6908 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6909 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6910 "config-primitive".format(
6911 scaling_group
, vnf_config_primitive
6914 scale_process
= "VCA"
6915 db_nsr_update
["config-status"] = "configuring post-scaling"
6916 primitive_params
= self
._map
_primitive
_params
(
6917 config_primitive
, {}, vnfr_params
6920 # Post-scale retry check: Check if this sub-operation has been executed before
6921 op_index
= self
._check
_or
_add
_scale
_suboperation
(
6924 vnf_config_primitive
,
6928 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
6929 # Skip sub-operation
6930 result
= "COMPLETED"
6931 result_detail
= "Done"
6934 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6935 vnf_config_primitive
, result
, result_detail
6939 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
6940 # New sub-operation: Get index of this sub-operation
6942 len(db_nslcmop
.get("_admin", {}).get("operations"))
6947 + "vnf_config_primitive={} New sub-operation".format(
6948 vnf_config_primitive
6952 # retry: Get registered params for this existing sub-operation
6953 op
= db_nslcmop
.get("_admin", {}).get("operations", [])[
6956 vnf_index
= op
.get("member_vnf_index")
6957 vnf_config_primitive
= op
.get("primitive")
6958 primitive_params
= op
.get("primitive_params")
6961 + "vnf_config_primitive={} Sub-operation retry".format(
6962 vnf_config_primitive
6965 # Execute the primitive, either with new (first-time) or registered (retry) args
6966 ee_descriptor_id
= config_primitive
.get(
6967 "execution-environment-ref"
6969 primitive_name
= config_primitive
.get(
6970 "execution-environment-primitive", vnf_config_primitive
6972 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(
6973 nsr_deployed
["VCA"],
6974 member_vnf_index
=vnf_index
,
6976 vdu_count_index
=None,
6977 ee_descriptor_id
=ee_descriptor_id
,
6979 result
, result_detail
= await self
._ns
_execute
_primitive
(
6988 + "vnf_config_primitive={} Done with result {} {}".format(
6989 vnf_config_primitive
, result
, result_detail
6992 # Update operationState = COMPLETED | FAILED
6993 self
._update
_suboperation
_status
(
6994 db_nslcmop
, op_index
, result
, result_detail
6997 if result
== "FAILED":
6998 raise LcmException(result_detail
)
6999 db_nsr_update
["config-status"] = old_config_status
7000 scale_process
= None
7005 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
7006 db_nsr_update
["operational-status"] = (
7008 if old_operational_status
== "failed"
7009 else old_operational_status
7011 db_nsr_update
["config-status"] = old_config_status
7014 ROclient
.ROClientException
,
7019 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7021 except asyncio
.CancelledError
:
7023 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7025 exc
= "Operation was cancelled"
7026 except Exception as e
:
7027 exc
= traceback
.format_exc()
7028 self
.logger
.critical(
7029 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7033 self
._write
_ns
_status
(
7036 current_operation
="IDLE",
7037 current_operation_id
=None,
7040 stage
[1] = "Waiting for instantiate pending tasks."
7041 self
.logger
.debug(logging_text
+ stage
[1])
7042 exc
= await self
._wait
_for
_tasks
(
7045 self
.timeout_ns_deploy
,
7053 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7054 nslcmop_operation_state
= "FAILED"
7056 db_nsr_update
["operational-status"] = old_operational_status
7057 db_nsr_update
["config-status"] = old_config_status
7058 db_nsr_update
["detailed-status"] = ""
7060 if "VCA" in scale_process
:
7061 db_nsr_update
["config-status"] = "failed"
7062 if "RO" in scale_process
:
7063 db_nsr_update
["operational-status"] = "failed"
7066 ] = "FAILED scaling nslcmop={} {}: {}".format(
7067 nslcmop_id
, step
, exc
7070 error_description_nslcmop
= None
7071 nslcmop_operation_state
= "COMPLETED"
7072 db_nslcmop_update
["detailed-status"] = "Done"
7074 self
._write
_op
_status
(
7077 error_message
=error_description_nslcmop
,
7078 operation_state
=nslcmop_operation_state
,
7079 other_update
=db_nslcmop_update
,
7082 self
._write
_ns
_status
(
7085 current_operation
="IDLE",
7086 current_operation_id
=None,
7087 other_update
=db_nsr_update
,
7090 if nslcmop_operation_state
:
7094 "nslcmop_id": nslcmop_id
,
7095 "operationState": nslcmop_operation_state
,
7097 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
7098 except Exception as e
:
7100 logging_text
+ "kafka_write notification Exception {}".format(e
)
7102 self
.logger
.debug(logging_text
+ "Exit")
7103 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
7105 async def _scale_kdu(
7106 self
, logging_text
, nsr_id
, nsr_deployed
, db_vnfd
, vca_id
, scaling_info
7108 _scaling_info
= scaling_info
.get("kdu-create") or scaling_info
.get("kdu-delete")
7109 for kdu_name
in _scaling_info
:
7110 for kdu_scaling_info
in _scaling_info
[kdu_name
]:
7111 deployed_kdu
, index
= get_deployed_kdu(
7112 nsr_deployed
, kdu_name
, kdu_scaling_info
["member-vnf-index"]
7114 cluster_uuid
= deployed_kdu
["k8scluster-uuid"]
7115 kdu_instance
= deployed_kdu
["kdu-instance"]
7116 kdu_model
= deployed_kdu
.get("kdu-model")
7117 scale
= int(kdu_scaling_info
["scale"])
7118 k8s_cluster_type
= kdu_scaling_info
["k8s-cluster-type"]
7121 "collection": "nsrs",
7122 "filter": {"_id": nsr_id
},
7123 "path": "_admin.deployed.K8s.{}".format(index
),
7126 step
= "scaling application {}".format(
7127 kdu_scaling_info
["resource-name"]
7129 self
.logger
.debug(logging_text
+ step
)
7131 if kdu_scaling_info
["type"] == "delete":
7132 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7135 and kdu_config
.get("terminate-config-primitive")
7136 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7138 terminate_config_primitive_list
= kdu_config
.get(
7139 "terminate-config-primitive"
7141 terminate_config_primitive_list
.sort(
7142 key
=lambda val
: int(val
["seq"])
7146 terminate_config_primitive
7147 ) in terminate_config_primitive_list
:
7148 primitive_params_
= self
._map
_primitive
_params
(
7149 terminate_config_primitive
, {}, {}
7151 step
= "execute terminate config primitive"
7152 self
.logger
.debug(logging_text
+ step
)
7153 await asyncio
.wait_for(
7154 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7155 cluster_uuid
=cluster_uuid
,
7156 kdu_instance
=kdu_instance
,
7157 primitive_name
=terminate_config_primitive
["name"],
7158 params
=primitive_params_
,
7165 await asyncio
.wait_for(
7166 self
.k8scluster_map
[k8s_cluster_type
].scale(
7169 kdu_scaling_info
["resource-name"],
7171 cluster_uuid
=cluster_uuid
,
7172 kdu_model
=kdu_model
,
7176 timeout
=self
.timeout_vca_on_error
,
7179 if kdu_scaling_info
["type"] == "create":
7180 kdu_config
= get_configuration(db_vnfd
, kdu_name
)
7183 and kdu_config
.get("initial-config-primitive")
7184 and get_juju_ee_ref(db_vnfd
, kdu_name
) is None
7186 initial_config_primitive_list
= kdu_config
.get(
7187 "initial-config-primitive"
7189 initial_config_primitive_list
.sort(
7190 key
=lambda val
: int(val
["seq"])
7193 for initial_config_primitive
in initial_config_primitive_list
:
7194 primitive_params_
= self
._map
_primitive
_params
(
7195 initial_config_primitive
, {}, {}
7197 step
= "execute initial config primitive"
7198 self
.logger
.debug(logging_text
+ step
)
7199 await asyncio
.wait_for(
7200 self
.k8scluster_map
[k8s_cluster_type
].exec_primitive(
7201 cluster_uuid
=cluster_uuid
,
7202 kdu_instance
=kdu_instance
,
7203 primitive_name
=initial_config_primitive
["name"],
7204 params
=primitive_params_
,
7211 async def _scale_ng_ro(
7212 self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
7214 nsr_id
= db_nslcmop
["nsInstanceId"]
7215 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7218 # read from db: vnfd's for every vnf
7221 # for each vnf in ns, read vnfd
7222 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
7223 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
7224 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
7225 # if we haven't this vnfd, read it from db
7226 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
7228 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7229 db_vnfds
.append(vnfd
)
7230 n2vc_key
= self
.n2vc
.get_public_key()
7231 n2vc_key_list
= [n2vc_key
]
7234 vdu_scaling_info
.get("vdu-create"),
7235 vdu_scaling_info
.get("vdu-delete"),
7238 # db_vnfr has been updated, update db_vnfrs to use it
7239 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
7240 await self
._instantiate
_ng
_ro
(
7250 start_deploy
=time(),
7251 timeout_ns_deploy
=self
.timeout_ns_deploy
,
7253 if vdu_scaling_info
.get("vdu-delete"):
7255 db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False
7258 async def extract_prometheus_scrape_jobs(
7259 self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
7261 # look if exist a file called 'prometheus*.j2' and
7262 artifact_content
= self
.fs
.dir_ls(artifact_path
)
7266 for f
in artifact_content
7267 if f
.startswith("prometheus") and f
.endswith(".j2")
7273 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
7277 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
7278 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
7280 vnfr_id
= vnfr_id
.replace("-", "")
7282 "JOB_NAME": vnfr_id
,
7283 "TARGET_IP": target_ip
,
7284 "EXPORTER_POD_IP": host_name
,
7285 "EXPORTER_POD_PORT": host_port
,
7287 job_list
= parse_job(job_data
, variables
)
7288 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
7289 for job
in job_list
:
7291 not isinstance(job
.get("job_name"), str)
7292 or vnfr_id
not in job
["job_name"]
7294 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
7295 job
["nsr_id"] = nsr_id
7296 job
["vnfr_id"] = vnfr_id
7299 async def rebuild_start_stop(self
, nsr_id
, nslcmop_id
, vnf_id
, additional_param
, operation_type
):
7300 logging_text
= "Task ns={} {}={} ".format(nsr_id
, operation_type
, nslcmop_id
)
7301 self
.logger
.info(logging_text
+ "Enter")
7302 stage
= ["Preparing the environment", ""]
7303 # database nsrs record
7307 # in case of error, indicates what part of scale was failed to put nsr at error status
7308 start_deploy
= time()
7310 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnf_id
})
7311 vim_account_id
= db_vnfr
.get("vim-account-id")
7312 vim_info_key
= "vim:" + vim_account_id
7313 vdu_id
= additional_param
["vdu_id"]
7314 vdurs
= [item
for item
in db_vnfr
["vdur"] if item
["vdu-id-ref"] == vdu_id
]
7315 vdur
= find_in_list(
7316 vdurs
, lambda vdu
: vdu
["count-index"] == additional_param
["count-index"]
7319 vdu_vim_name
= vdur
["name"]
7320 vim_vm_id
= vdur
["vim_info"][vim_info_key
]["vim_id"]
7321 target_vim
, _
= next(k_v
for k_v
in vdur
["vim_info"].items())
7323 raise LcmException("Target vdu is not found")
7324 self
.logger
.info("vdu_vim_name >> {} ".format(vdu_vim_name
))
7325 # wait for any previous tasks in process
7326 stage
[1] = "Waiting for previous operations to terminate"
7327 self
.logger
.info(stage
[1])
7328 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
7330 stage
[1] = "Reading from database."
7331 self
.logger
.info(stage
[1])
7332 self
._write
_ns
_status
(
7335 current_operation
=operation_type
.upper(),
7336 current_operation_id
=nslcmop_id
7338 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7341 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
7342 db_nsr_update
["operational-status"] = operation_type
7343 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7347 "vim_vm_id": vim_vm_id
,
7349 "vdu_index": additional_param
["count-index"],
7350 "vdu_id": vdur
["id"],
7351 "target_vim": target_vim
,
7352 "vim_account_id": vim_account_id
7355 stage
[1] = "Sending rebuild request to RO... {}".format(desc
)
7356 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
, queuePosition
=0)
7357 self
.logger
.info("ro nsr id: {}".format(nsr_id
))
7358 result_dict
= await self
.RO
.operate(nsr_id
, desc
, operation_type
)
7359 self
.logger
.info("response from RO: {}".format(result_dict
))
7360 action_id
= result_dict
["action_id"]
7361 await self
._wait
_ng
_ro
(
7362 nsr_id
, action_id
, nslcmop_id
, start_deploy
,
7363 self
.timeout_operate
, None, "start_stop_rebuild",
7365 return "COMPLETED", "Done"
7366 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
7367 self
.logger
.error("Exit Exception {}".format(e
))
7369 except asyncio
.CancelledError
:
7370 self
.logger
.error("Cancelled Exception while '{}'".format(stage
))
7371 exc
= "Operation was cancelled"
7372 except Exception as e
:
7373 exc
= traceback
.format_exc()
7374 self
.logger
.critical("Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
7375 return "FAILED", "Error in operate VNF {}".format(exc
)
7377 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7379 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7381 :param: vim_account_id: VIM Account ID
7383 :return: (cloud_name, cloud_credential)
7385 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7386 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
7388 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
7390 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7392 :param: vim_account_id: VIM Account ID
7394 :return: (cloud_name, cloud_credential)
7396 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
7397 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")
    async def migrate(self, nsr_id, nslcmop_id):
        """
        Migrate VNFs and VDUs instances in a NS

        The "operationParams" of the nslcmop are sent to RO as the migration target and the
        resulting RO action is waited for. The final state ("COMPLETED" or "FAILED") is written
        at the nslcmop record and notified through kafka ("ns" topic, "migrated" key).

        :param: nsr_id: NS Instance ID
        :param: nslcmop_id: nslcmop ID of migrate

        :return: None. Nothing is done if the HA lock of the operation cannot be acquired
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return
        logging_text = "Task ns={} migrate ".format(nsr_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nslcmop = None
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        # NOTE(review): db_nsr_update is filled below but it does not look to be written to
        # the database in this method - confirm
        db_nsr_update = {}
        target = {}
        # 'exc' stays None on success; any handler below stores the failure cause here
        exc = None
        # reference time for the RO wait timeout
        start_deploy = time()

        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="MIGRATING",
                current_operation_id=nslcmop_id,
            )
            step = "Getting nslcmop from database"
            self.logger.debug(
                step + " after having waited for previous tasks to be completed"
            )
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            migrate_params = db_nslcmop.get("operationParams")

            # the RO target is a copy of the operation parameters
            target = {}
            target.update(migrate_params)
            desc = await self.RO.migrate(nsr_id, target)
            self.logger.debug("RO return > {}".format(desc))
            action_id = desc["action_id"]
            await self._wait_ng_ro(
                nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate,
                operation="migrate"
            )
        except (ROclient.ROClientException, DbException, LcmException) as e:
            self.logger.error("Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error("Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except Exception as e:
            # unexpected error: keep the whole traceback as the failure description
            exc = traceback.format_exc()
            self.logger.critical(
                "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
            )
        finally:
            # the NS is no longer under operation, whatever the result has been
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
            )
            if exc:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            else:
                nslcmop_operation_state = "COMPLETED"
                db_nslcmop_update["detailed-status"] = "Done"
                db_nsr_update["detailed-status"] = "Done"

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message="",
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            # nslcmop_operation_state has been set by both branches above, so the
            # notification is always attempted; a failure writing it is only logged
            if nslcmop_operation_state:
                try:
                    msg = {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    }
                    await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
7499 async def heal(self
, nsr_id
, nslcmop_id
):
7503 :param nsr_id: ns instance to heal
7504 :param nslcmop_id: operation to run
7508 # Try to lock HA task here
7509 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA("ns", "nslcmops", nslcmop_id
)
7510 if not task_is_locked_by_me
:
7513 logging_text
= "Task ns={} heal={} ".format(nsr_id
, nslcmop_id
)
7514 stage
= ["", "", ""]
7515 tasks_dict_info
= {}
7516 # ^ stage, step, VIM progress
7517 self
.logger
.debug(logging_text
+ "Enter")
7518 # get all needed from database
7520 db_nslcmop_update
= {}
7522 db_vnfrs
= {} # vnf's info indexed by _id
7524 old_operational_status
= ""
7525 old_config_status
= ""
7528 # wait for any previous tasks in process
7529 step
= "Waiting for previous operations to terminate"
7530 await self
.lcm_tasks
.waitfor_related_HA("ns", "nslcmops", nslcmop_id
)
7531 self
._write
_ns
_status
(
7534 current_operation
="HEALING",
7535 current_operation_id
=nslcmop_id
,
7538 step
= "Getting nslcmop from database"
7540 step
+ " after having waited for previous tasks to be completed"
7542 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
7544 step
= "Getting nsr from database"
7545 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
7546 old_operational_status
= db_nsr
["operational-status"]
7547 old_config_status
= db_nsr
["config-status"]
7550 "_admin.deployed.RO.operational-status": "healing",
7552 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7554 step
= "Sending heal order to VIM"
7555 #task_ro = asyncio.ensure_future(
7557 # logging_text=logging_text,
7559 # db_nslcmop=db_nslcmop,
7563 #self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro)
7564 #tasks_dict_info[task_ro] = "Healing at VIM"
7566 logging_text
=logging_text
,
7568 db_nslcmop
=db_nslcmop
,
7573 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
7574 self
.logger
.debug(logging_text
+ stage
[1])
7575 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
7576 self
.fs
.sync(db_nsr
["nsd-id"])
7578 # read from db: vnfr's of this ns
7579 step
= "Getting vnfrs from db"
7580 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
7581 for vnfr
in db_vnfrs_list
:
7582 db_vnfrs
[vnfr
["_id"]] = vnfr
7583 self
.logger
.debug("ns.heal db_vnfrs={}".format(db_vnfrs
))
7585 # Check for each target VNF
7586 target_list
= db_nslcmop
.get("operationParams", {}).get("healVnfData", {})
7587 for target_vnf
in target_list
:
7588 # Find this VNF in the list from DB
7589 vnfr_id
= target_vnf
.get("vnfInstanceId", None)
7591 db_vnfr
= db_vnfrs
[vnfr_id
]
7592 vnfd_id
= db_vnfr
.get("vnfd-id")
7593 vnfd_ref
= db_vnfr
.get("vnfd-ref")
7594 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
7595 base_folder
= vnfd
["_admin"]["storage"]
7600 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
7601 member_vnf_index
= db_vnfr
.get("member-vnf-index-ref")
7603 # Check each target VDU and deploy N2VC
7604 target_vdu_list
= target_vnf
.get("additionalParams", {}).get("vdu", [])
7605 if not target_vdu_list
:
7606 # Codigo nuevo para crear diccionario
7607 target_vdu_list
= []
7608 for existing_vdu
in db_vnfr
.get("vdur"):
7609 vdu_name
= existing_vdu
.get("vdu-name", None)
7610 vdu_index
= existing_vdu
.get("count-index", 0)
7611 vdu_run_day1
= target_vnf
.get("additionalParams", {}).get("run-day1", False)
7612 vdu_to_be_healed
= {"vdu-id": vdu_name
, "count-index": vdu_index
, "run-day1": vdu_run_day1
}
7613 target_vdu_list
.append(vdu_to_be_healed
)
7614 for target_vdu
in target_vdu_list
:
7615 deploy_params_vdu
= target_vdu
7616 # Set run-day1 vnf level value if not vdu level value exists
7617 if not deploy_params_vdu
.get("run-day1") and target_vnf
["additionalParams"].get("run-day1"):
7618 deploy_params_vdu
["run-day1"] = target_vnf
["additionalParams"].get("run-day1")
7619 vdu_name
= target_vdu
.get("vdu-id", None)
7620 # TODO: Get vdu_id from vdud.
7622 # For multi instance VDU count-index is mandatory
7623 # For single session VDU count-indes is 0
7624 vdu_index
= target_vdu
.get("count-index",0)
7626 # n2vc_redesign STEP 3 to 6 Deploy N2VC
7627 stage
[1] = "Deploying Execution Environments."
7628 self
.logger
.debug(logging_text
+ stage
[1])
7630 # VNF Level charm. Normal case when proxy charms.
7631 # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
7632 descriptor_config
= get_configuration(vnfd
, vnfd_ref
)
7633 if descriptor_config
:
7634 # Continue if healed machine is management machine
7635 vnf_ip_address
= db_vnfr
.get("ip-address")
7636 target_instance
= None
7637 for instance
in db_vnfr
.get("vdur", None):
7638 if ( instance
["vdu-name"] == vdu_name
and instance
["count-index"] == vdu_index
):
7639 target_instance
= instance
7641 if vnf_ip_address
== target_instance
.get("ip-address"):
7643 logging_text
=logging_text
7644 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7645 member_vnf_index
, vdu_name
, vdu_index
7649 nslcmop_id
=nslcmop_id
,
7655 member_vnf_index
=member_vnf_index
,
7658 deploy_params
=deploy_params_vdu
,
7659 descriptor_config
=descriptor_config
,
7660 base_folder
=base_folder
,
7661 task_instantiation_info
=tasks_dict_info
,
7665 # VDU Level charm. Normal case with native charms.
7666 descriptor_config
= get_configuration(vnfd
, vdu_name
)
7667 if descriptor_config
:
7669 logging_text
=logging_text
7670 + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
7671 member_vnf_index
, vdu_name
, vdu_index
7675 nslcmop_id
=nslcmop_id
,
7681 member_vnf_index
=member_vnf_index
,
7682 vdu_index
=vdu_index
,
7684 deploy_params
=deploy_params_vdu
,
7685 descriptor_config
=descriptor_config
,
7686 base_folder
=base_folder
,
7687 task_instantiation_info
=tasks_dict_info
,
7692 ROclient
.ROClientException
,
7697 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
7699 except asyncio
.CancelledError
:
7701 logging_text
+ "Cancelled Exception while '{}'".format(step
)
7703 exc
= "Operation was cancelled"
7704 except Exception as e
:
7705 exc
= traceback
.format_exc()
7706 self
.logger
.critical(
7707 logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
),
7712 stage
[1] = "Waiting for healing pending tasks."
7713 self
.logger
.debug(logging_text
+ stage
[1])
7714 exc
= await self
._wait
_for
_tasks
(
7717 self
.timeout_ns_deploy
,
7725 ] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
7726 nslcmop_operation_state
= "FAILED"
7728 db_nsr_update
["operational-status"] = old_operational_status
7729 db_nsr_update
["config-status"] = old_config_status
7732 ] = "FAILED healing nslcmop={} {}: {}".format(
7733 nslcmop_id
, step
, exc
7735 for task
, task_name
in tasks_dict_info
.items():
7736 if not task
.done() or task
.cancelled() or task
.exception():
7737 if task_name
.startswith(self
.task_name_deploy_vca
):
7738 # A N2VC task is pending
7739 db_nsr_update
["config-status"] = "failed"
7741 # RO task is pending
7742 db_nsr_update
["operational-status"] = "failed"
7744 error_description_nslcmop
= None
7745 nslcmop_operation_state
= "COMPLETED"
7746 db_nslcmop_update
["detailed-status"] = "Done"
7747 db_nsr_update
["detailed-status"] = "Done"
7748 db_nsr_update
["operational-status"] = "running"
7749 db_nsr_update
["config-status"] = "configured"
7751 self
._write
_op
_status
(
7754 error_message
=error_description_nslcmop
,
7755 operation_state
=nslcmop_operation_state
,
7756 other_update
=db_nslcmop_update
,
7759 self
._write
_ns
_status
(
7762 current_operation
="IDLE",
7763 current_operation_id
=None,
7764 other_update
=db_nsr_update
,
7767 if nslcmop_operation_state
:
7771 "nslcmop_id": nslcmop_id
,
7772 "operationState": nslcmop_operation_state
,
7774 await self
.msg
.aiowrite("ns", "healed", msg
, loop
=self
.loop
)
7775 except Exception as e
:
7777 logging_text
+ "kafka_write notification Exception {}".format(e
)
7779 self
.logger
.debug(logging_text
+ "Exit")
7780 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_heal")
7791 :param logging_text: preffix text to use at logging
7792 :param nsr_id: nsr identity
7793 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
7794 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
7795 :return: None or exception
7797 def get_vim_account(vim_account_id
):
7799 if vim_account_id
in db_vims
:
7800 return db_vims
[vim_account_id
]
7801 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
7802 db_vims
[vim_account_id
] = db_vim
7807 ns_params
= db_nslcmop
.get("operationParams")
7808 if ns_params
and ns_params
.get("timeout_ns_heal"):
7809 timeout_ns_heal
= ns_params
["timeout_ns_heal"]
7811 timeout_ns_heal
= self
.timeout
.get(
7812 "ns_heal", self
.timeout_ns_heal
7817 nslcmop_id
= db_nslcmop
["_id"]
7819 "action_id": nslcmop_id
,
7821 self
.logger
.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop
,timeout_ns_heal
))
7822 target
.update(db_nslcmop
.get("operationParams", {}))
7824 self
.logger
.debug("Send to RO > nsr_id={} target={}".format(nsr_id
, target
))
7825 desc
= await self
.RO
.recreate(nsr_id
, target
)
7826 self
.logger
.debug("RO return > {}".format(desc
))
7827 action_id
= desc
["action_id"]
7828 # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
7829 await self
._wait
_ng
_ro
(
7830 nsr_id
, action_id
, nslcmop_id
, start_heal
, timeout_ns_heal
, stage
,
7836 "_admin.deployed.RO.operational-status": "running",
7837 "detailed-status": " ".join(stage
),
7839 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
7840 self
._write
_op
_status
(nslcmop_id
, stage
)
7842 logging_text
+ "ns healed at RO. RO_id={}".format(action_id
)
7845 except Exception as e
:
7846 stage
[2] = "ERROR healing at VIM"
7847 #self.set_vnfr_at_error(db_vnfrs, str(e))
7849 "Error healing at VIM {}".format(e
),
7850 exc_info
=not isinstance(
7853 ROclient
.ROClientException
,
7879 task_instantiation_info
,
7882 # launch instantiate_N2VC in a asyncio task and register task object
7883 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
7884 # if not found, create one entry and update database
7885 # fill db_nsr._admin.deployed.VCA.<index>
7888 logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
)
7890 if "execution-environment-list" in descriptor_config
:
7891 ee_list
= descriptor_config
.get("execution-environment-list", [])
7892 elif "juju" in descriptor_config
:
7893 ee_list
= [descriptor_config
] # ns charms
7894 else: # other types as script are not supported
7897 for ee_item
in ee_list
:
7900 + "_deploy_n2vc ee_item juju={}, helm={}".format(
7901 ee_item
.get("juju"), ee_item
.get("helm-chart")
7904 ee_descriptor_id
= ee_item
.get("id")
7905 if ee_item
.get("juju"):
7906 vca_name
= ee_item
["juju"].get("charm")
7909 if ee_item
["juju"].get("charm") is not None
7912 if ee_item
["juju"].get("cloud") == "k8s":
7913 vca_type
= "k8s_proxy_charm"
7914 elif ee_item
["juju"].get("proxy") is False:
7915 vca_type
= "native_charm"
7916 elif ee_item
.get("helm-chart"):
7917 vca_name
= ee_item
["helm-chart"]
7918 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
7921 vca_type
= "helm-v3"
7924 logging_text
+ "skipping non juju neither charm configuration"
7929 for vca_index
, vca_deployed
in enumerate(
7930 db_nsr
["_admin"]["deployed"]["VCA"]
7932 if not vca_deployed
:
7935 vca_deployed
.get("member-vnf-index") == member_vnf_index
7936 and vca_deployed
.get("vdu_id") == vdu_id
7937 and vca_deployed
.get("kdu_name") == kdu_name
7938 and vca_deployed
.get("vdu_count_index", 0) == vdu_index
7939 and vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
7943 # not found, create one.
7945 "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
7948 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
7950 target
+= "/kdu/{}".format(kdu_name
)
7952 "target_element": target
,
7953 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
7954 "member-vnf-index": member_vnf_index
,
7956 "kdu_name": kdu_name
,
7957 "vdu_count_index": vdu_index
,
7958 "operational-status": "init", # TODO revise
7959 "detailed-status": "", # TODO revise
7960 "step": "initial-deploy", # TODO revise
7962 "vdu_name": vdu_name
,
7964 "ee_descriptor_id": ee_descriptor_id
,
7968 # create VCA and configurationStatus in db
7970 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
7971 "configurationStatus.{}".format(vca_index
): dict(),
7973 self
.update_db_2("nsrs", nsr_id
, db_dict
)
7975 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
7977 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
7978 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
7979 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
7982 task_n2vc
= asyncio
.ensure_future(
7984 logging_text
=logging_text
,
7985 vca_index
=vca_index
,
7991 vdu_index
=vdu_index
,
7992 deploy_params
=deploy_params
,
7993 config_descriptor
=descriptor_config
,
7994 base_folder
=base_folder
,
7995 nslcmop_id
=nslcmop_id
,
7999 ee_config_descriptor
=ee_item
,
8002 self
.lcm_tasks
.register(
8006 "instantiate_N2VC-{}".format(vca_index
),
8009 task_instantiation_info
[
8011 ] = self
.task_name_deploy_vca
+ " {}.{}".format(
8012 member_vnf_index
or "", vdu_id
or ""
8015 async def heal_N2VC(
8032 ee_config_descriptor
,
8034 nsr_id
= db_nsr
["_id"]
8035 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
8036 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
8037 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
8038 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
8040 "collection": "nsrs",
8041 "filter": {"_id": nsr_id
},
8042 "path": db_update_entry
,
8048 element_under_configuration
= nsr_id
8052 vnfr_id
= db_vnfr
["_id"]
8053 osm_config
["osm"]["vnf_id"] = vnfr_id
8055 namespace
= "{nsi}.{ns}".format(nsi
=nsi_id
if nsi_id
else "", ns
=nsr_id
)
8057 if vca_type
== "native_charm":
8060 index_number
= vdu_index
or 0
8063 element_type
= "VNF"
8064 element_under_configuration
= vnfr_id
8065 namespace
+= ".{}-{}".format(vnfr_id
, index_number
)
8067 namespace
+= ".{}-{}".format(vdu_id
, index_number
)
8068 element_type
= "VDU"
8069 element_under_configuration
= "{}-{}".format(vdu_id
, index_number
)
8070 osm_config
["osm"]["vdu_id"] = vdu_id
8072 namespace
+= ".{}".format(kdu_name
)
8073 element_type
= "KDU"
8074 element_under_configuration
= kdu_name
8075 osm_config
["osm"]["kdu_name"] = kdu_name
8078 if base_folder
["pkg-dir"]:
8079 artifact_path
= "{}/{}/{}/{}".format(
8080 base_folder
["folder"],
8081 base_folder
["pkg-dir"],
8084 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8089 artifact_path
= "{}/Scripts/{}/{}/".format(
8090 base_folder
["folder"],
8093 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
8098 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
8100 # get initial_config_primitive_list that applies to this element
8101 initial_config_primitive_list
= config_descriptor
.get(
8102 "initial-config-primitive"
8106 "Initial config primitive list > {}".format(
8107 initial_config_primitive_list
8111 # add config if not present for NS charm
8112 ee_descriptor_id
= ee_config_descriptor
.get("id")
8113 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
8114 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(
8115 initial_config_primitive_list
, vca_deployed
, ee_descriptor_id
8119 "Initial config primitive list #2 > {}".format(
8120 initial_config_primitive_list
8123 # n2vc_redesign STEP 3.1
8124 # find old ee_id if exists
8125 ee_id
= vca_deployed
.get("ee_id")
8127 vca_id
= self
.get_vca_id(db_vnfr
, db_nsr
)
8128 # create or register execution environment in VCA. Only for native charms when healing
8129 if vca_type
== "native_charm":
8130 step
= "Waiting to VM being up and getting IP address"
8131 self
.logger
.debug(logging_text
+ step
)
8132 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8141 credentials
= {"hostname": rw_mgmt_ip
}
8143 username
= deep_get(
8144 config_descriptor
, ("config-access", "ssh-access", "default-user")
8146 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
8147 # merged. Meanwhile let's get username from initial-config-primitive
8148 if not username
and initial_config_primitive_list
:
8149 for config_primitive
in initial_config_primitive_list
:
8150 for param
in config_primitive
.get("parameter", ()):
8151 if param
["name"] == "ssh-username":
8152 username
= param
["value"]
8156 "Cannot determine the username neither with 'initial-config-primitive' nor with "
8157 "'config-access.ssh-access.default-user'"
8159 credentials
["username"] = username
8161 # n2vc_redesign STEP 3.2
8162 # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
8163 self
._write
_configuration
_status
(
8165 vca_index
=vca_index
,
8166 status
="REGISTERING",
8167 element_under_configuration
=element_under_configuration
,
8168 element_type
=element_type
,
8171 step
= "register execution environment {}".format(credentials
)
8172 self
.logger
.debug(logging_text
+ step
)
8173 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
8174 credentials
=credentials
,
8175 namespace
=namespace
,
8180 # update ee_id en db
8182 "_admin.deployed.VCA.{}.ee_id".format(vca_index
): ee_id
,
8184 self
.update_db_2("nsrs", nsr_id
, db_dict_ee_id
)
8186 # for compatibility with MON/POL modules, the need model and application name at database
8187 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
8188 # Not sure if this need to be done when healing
8190 ee_id_parts = ee_id.split(".")
8191 db_nsr_update = {db_update_entry + "ee_id": ee_id}
8192 if len(ee_id_parts) >= 2:
8193 model_name = ee_id_parts[0]
8194 application_name = ee_id_parts[1]
8195 db_nsr_update[db_update_entry + "model"] = model_name
8196 db_nsr_update[db_update_entry + "application"] = application_name
8199 # n2vc_redesign STEP 3.3
8200 # Install configuration software. Only for native charms.
8201 step
= "Install configuration Software"
8203 self
._write
_configuration
_status
(
8205 vca_index
=vca_index
,
8206 status
="INSTALLING SW",
8207 element_under_configuration
=element_under_configuration
,
8208 element_type
=element_type
,
8209 #other_update=db_nsr_update,
8213 # TODO check if already done
8214 self
.logger
.debug(logging_text
+ step
)
8216 if vca_type
== "native_charm":
8217 config_primitive
= next(
8218 (p
for p
in initial_config_primitive_list
if p
["name"] == "config"),
8221 if config_primitive
:
8222 config
= self
._map
_primitive
_params
(
8223 config_primitive
, {}, deploy_params
8225 await self
.vca_map
[vca_type
].install_configuration_sw(
8227 artifact_path
=artifact_path
,
8235 # write in db flag of configuration_sw already installed
8237 "nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True}
8240 # Not sure if this need to be done when healing
8242 # add relations for this VCA (wait for other peers related with this VCA)
8243 await self._add_vca_relations(
8244 logging_text=logging_text,
8247 vca_index=vca_index,
8251 # if SSH access is required, then get execution environment SSH public
8252 # if native charm we have waited already to VM be UP
8253 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
8256 # self.logger.debug("get ssh key block")
8258 config_descriptor
, ("config-access", "ssh-access", "required")
8260 # self.logger.debug("ssh key needed")
8261 # Needed to inject a ssh key
8264 ("config-access", "ssh-access", "default-user"),
8266 step
= "Install configuration Software, getting public ssh key"
8267 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(
8268 ee_id
=ee_id
, db_dict
=db_dict
, vca_id
=vca_id
8271 step
= "Insert public key into VM user={} ssh_key={}".format(
8275 # self.logger.debug("no need to get ssh key")
8276 step
= "Waiting to VM being up and getting IP address"
8277 self
.logger
.debug(logging_text
+ step
)
8279 # n2vc_redesign STEP 5.1
8280 # wait for RO (ip-address) Insert pub_key into VM
8281 # IMPORTANT: We need to wait for RO to complete the healing operation.
8282 await self
._wait
_heal
_ro
(nsr_id
,self
.timeout_ns_heal
)
8285 rw_mgmt_ip
= await self
.wait_kdu_up(
8286 logging_text
, nsr_id
, vnfr_id
, kdu_name
8289 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(
8299 rw_mgmt_ip
= None # This is for a NS configuration
8301 self
.logger
.debug(logging_text
+ " VM_ip_address={}".format(rw_mgmt_ip
))
8303 # store rw_mgmt_ip in deploy params for later replacement
8304 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
8307 # get run-day1 operation parameter
8308 runDay1
= deploy_params
.get("run-day1",False)
8309 self
.logger
.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id
,vdu_id
,runDay1
))
8311 # n2vc_redesign STEP 6 Execute initial config primitive
8312 step
= "execute initial config primitive"
8314 # wait for dependent primitives execution (NS -> VNF -> VDU)
8315 if initial_config_primitive_list
:
8316 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
8318 # stage, in function of element type: vdu, kdu, vnf or ns
8319 my_vca
= vca_deployed_list
[vca_index
]
8320 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
8322 stage
[0] = "Stage 3/5: running Day-1 primitives for VDU."
8323 elif my_vca
.get("member-vnf-index"):
8325 stage
[0] = "Stage 4/5: running Day-1 primitives for VNF."
8328 stage
[0] = "Stage 5/5: running Day-1 primitives for NS."
8330 self
._write
_configuration
_status
(
8331 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="EXECUTING PRIMITIVE"
8334 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
=stage
)
8336 check_if_terminated_needed
= True
8337 for initial_config_primitive
in initial_config_primitive_list
:
8338 # adding information on the vca_deployed if it is a NS execution environment
8339 if not vca_deployed
["member-vnf-index"]:
8340 deploy_params
["ns_config_info"] = json
.dumps(
8341 self
._get
_ns
_config
_info
(nsr_id
)
8343 # TODO check if already done
8344 primitive_params_
= self
._map
_primitive
_params
(
8345 initial_config_primitive
, {}, deploy_params
8348 step
= "execute primitive '{}' params '{}'".format(
8349 initial_config_primitive
["name"], primitive_params_
8351 self
.logger
.debug(logging_text
+ step
)
8352 await self
.vca_map
[vca_type
].exec_primitive(
8354 primitive_name
=initial_config_primitive
["name"],
8355 params_dict
=primitive_params_
,
8360 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
8361 if check_if_terminated_needed
:
8362 if config_descriptor
.get("terminate-config-primitive"):
8364 "nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True}
8366 check_if_terminated_needed
= False
8368 # TODO register in database that primitive is done
8370 # STEP 7 Configure metrics
8371 # Not sure if this need to be done when healing
8373 if vca_type == "helm" or vca_type == "helm-v3":
8374 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
8376 artifact_path=artifact_path,
8377 ee_config_descriptor=ee_config_descriptor,
8380 target_ip=rw_mgmt_ip,
8386 {db_update_entry + "prometheus_jobs": prometheus_jobs},
8389 for job in prometheus_jobs:
8392 {"job_name": job["job_name"]},
8395 fail_on_empty=False,
8399 step
= "instantiated at VCA"
8400 self
.logger
.debug(logging_text
+ step
)
8402 self
._write
_configuration
_status
(
8403 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="READY"
8406 except Exception as e
: # TODO not use Exception but N2VC exception
8407 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
8409 e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)
8412 "Exception while {} : {}".format(step
, e
), exc_info
=True
8414 self
._write
_configuration
_status
(
8415 nsr_id
=nsr_id
, vca_index
=vca_index
, status
="BROKEN"
8417 raise LcmException("{} {}".format(step
, e
)) from e
async def _wait_heal_ro(
    self,
    nsr_id,
    timeout=600,
):
    """Poll the NS record until RO is no longer healing it.

    The "nsrs" record is read again after every pause and the coroutine
    returns as soon as ``_admin.deployed.RO.operational-status`` holds
    anything other than ``"healing"``.

    :param nsr_id: NS instance id used to look up the "nsrs" record
    :param timeout: maximum number of seconds to keep polling
    :raises NgRoException: if RO is still healing once ``timeout`` elapsed
    """
    poll_interval = 15  # seconds between two reads of the NS record
    start_time = time()
    while time() <= start_time + timeout:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
            "operational-status"
        ]
        self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
        if operational_status_ro != "healing":
            break
        # asyncio.sleep() lost its ``loop`` argument in Python 3.10; the
        # coroutine already runs on the current event loop, so none is needed.
        await asyncio.sleep(poll_interval)
    else:  # the deadline passed while RO was still healing
        raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS

    Takes the HA lock for the operation, forwards the operation parameters
    to RO, waits for the resulting RO action and finally records the outcome
    in the nslcmop and publishes a "verticalscaled" message on the "ns" topic.
    Errors are not propagated: they are logged and reflected as a "FAILED"
    operation state.

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of the vertical scale operation
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    # reference time handed to the RO wait helper together with the timeout
    start_deploy = time()

    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VerticalScale",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        target = {}
        target.update(operationParams)

        # Keep ``step`` in sync with what is running so that a failure is
        # reported against the RO interaction and not against the db read.
        step = "Sending vertical scale request to RO"
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        step = "Waiting for RO to complete the vertical scale"
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout_verticalscale,
            operation="verticalscale",
        )
    except (
        NgRoException,
        ROclient.ROClientException,
        DbException,
        LcmException,
    ) as e:
        # Expected failures: report the message only, without a traceback.
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
            except Exception as e:
                # Best effort: a failed notification must not fail the operation.
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")