1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from n2vc
.k8s_helm_conn
import K8sHelmConnector
31 from n2vc
.k8s_juju_conn
import K8sJujuConnector
33 from osm_common
.dbbase
import DbException
34 from osm_common
.fsbase
import FsException
36 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
37 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
39 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
41 from copy
import copy
, deepcopy
42 from http
import HTTPStatus
44 from uuid
import uuid4
46 from random
import randint
48 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
51 class N2VCJujuConnectorLCM(N2VCJujuConnector
):
53 async def create_execution_environment(self
, namespace
: str, db_dict
: dict, reuse_ee_id
: str = None,
54 progress_timeout
: float = None, total_timeout
: float = None,
55 config
: dict = None, artifact_path
: str = None,
56 vca_type
: str = None) -> (str, dict):
57 # admit two new parameters, artifact_path and vca_type
58 if vca_type
== "k8s_proxy_charm":
59 ee_id
= await self
.install_k8s_proxy_charm(
60 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
62 artifact_path
=artifact_path
,
66 return await super().create_execution_environment(
67 namespace
=namespace
, db_dict
=db_dict
, reuse_ee_id
=reuse_ee_id
,
68 progress_timeout
=progress_timeout
, total_timeout
=total_timeout
)
70 async def install_configuration_sw(self
, ee_id
: str, artifact_path
: str, db_dict
: dict,
71 progress_timeout
: float = None, total_timeout
: float = None,
72 config
: dict = None, num_units
: int = 1, vca_type
: str = "lxc_proxy_charm"):
73 if vca_type
== "k8s_proxy_charm":
75 return await super().install_configuration_sw(
76 ee_id
=ee_id
, artifact_path
=artifact_path
, db_dict
=db_dict
, progress_timeout
=progress_timeout
,
77 total_timeout
=total_timeout
, config
=config
, num_units
=num_units
)
81 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
82 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
83 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
84 timeout_charm_delete
= 10 * 60
85 timeout_primitive
= 30 * 60 # timeout for primitive execution
86 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
88 SUBOPERATION_STATUS_NOT_FOUND
= -1
89 SUBOPERATION_STATUS_NEW
= -2
90 SUBOPERATION_STATUS_SKIP
= -3
91 task_name_deploy_vca
= "Deploying VCA"
93 def __init__(self
, db
, msg
, fs
, lcm_tasks
, config
, loop
, prometheus
=None):
95 Init, Connect to database, filesystem storage, and messaging
96 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
103 logger
=logging
.getLogger('lcm.ns')
107 self
.lcm_tasks
= lcm_tasks
108 self
.timeout
= config
["timeout"]
109 self
.ro_config
= config
["ro_config"]
110 self
.ng_ro
= config
["ro_config"].get("ng")
111 self
.vca_config
= config
["VCA"].copy()
113 # create N2VC connector
114 self
.n2vc
= N2VCJujuConnectorLCM(
119 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
120 username
=self
.vca_config
.get('user', None),
121 vca_config
=self
.vca_config
,
122 on_update_db
=self
._on
_update
_n
2vc
_db
125 self
.conn_helm_ee
= LCMHelmConn(
132 vca_config
=self
.vca_config
,
133 on_update_db
=self
._on
_update
_n
2vc
_db
136 self
.k8sclusterhelm
= K8sHelmConnector(
137 kubectl_command
=self
.vca_config
.get("kubectlpath"),
138 helm_command
=self
.vca_config
.get("helmpath"),
145 self
.k8sclusterjuju
= K8sJujuConnector(
146 kubectl_command
=self
.vca_config
.get("kubectlpath"),
147 juju_command
=self
.vca_config
.get("jujupath"),
151 on_update_db
=self
._on
_update
_k
8s
_db
,
154 self
.k8scluster_map
= {
155 "helm-chart": self
.k8sclusterhelm
,
156 "chart": self
.k8sclusterhelm
,
157 "juju-bundle": self
.k8sclusterjuju
,
158 "juju": self
.k8sclusterjuju
,
162 "lxc_proxy_charm": self
.n2vc
,
163 "native_charm": self
.n2vc
,
164 "k8s_proxy_charm": self
.n2vc
,
165 "helm": self
.conn_helm_ee
168 self
.prometheus
= prometheus
172 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
174 self
.RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
176 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
178 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
181 # TODO filter RO descriptor fields...
185 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
186 db_dict
['deploymentStatus'] = ro_descriptor
187 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
189 except Exception as e
:
190 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
192 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
194 # remove last dot from path (if exists)
195 if path
.endswith('.'):
198 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
199 # .format(table, filter, path, updated_data))
202 nsr_id
= filter.get('_id')
204 # read ns record from database
205 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
206 current_ns_status
= nsr
.get('nsState')
208 # get vca status for NS
209 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
213 db_dict
['vcaStatus'] = status_dict
214 await self
.n2vc
.update_vca_status(db_dict
['vcaStatus'])
216 # update configurationStatus for this VCA
218 vca_index
= int(path
[path
.rfind(".")+1:])
220 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
221 vca_status
= vca_list
[vca_index
].get('status')
223 configuration_status_list
= nsr
.get('configurationStatus')
224 config_status
= configuration_status_list
[vca_index
].get('status')
226 if config_status
== 'BROKEN' and vca_status
!= 'failed':
227 db_dict
['configurationStatus'][vca_index
] = 'READY'
228 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
229 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
230 except Exception as e
:
231 # not update configurationStatus
232 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
234 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
235 # if nsState = 'DEGRADED' check if all is OK
237 if current_ns_status
in ('READY', 'DEGRADED'):
238 error_description
= ''
240 if status_dict
.get('machines'):
241 for machine_id
in status_dict
.get('machines'):
242 machine
= status_dict
.get('machines').get(machine_id
)
243 # check machine agent-status
244 if machine
.get('agent-status'):
245 s
= machine
.get('agent-status').get('status')
248 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
249 # check machine instance status
250 if machine
.get('instance-status'):
251 s
= machine
.get('instance-status').get('status')
254 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
256 if status_dict
.get('applications'):
257 for app_id
in status_dict
.get('applications'):
258 app
= status_dict
.get('applications').get(app_id
)
259 # check application status
260 if app
.get('status'):
261 s
= app
.get('status').get('status')
264 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
266 if error_description
:
267 db_dict
['errorDescription'] = error_description
268 if current_ns_status
== 'READY' and is_degraded
:
269 db_dict
['nsState'] = 'DEGRADED'
270 if current_ns_status
== 'DEGRADED' and not is_degraded
:
271 db_dict
['nsState'] = 'READY'
274 self
.update_db_2("nsrs", nsr_id
, db_dict
)
276 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
278 except Exception as e
:
279 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
281 async def _on_update_k8s_db(self
, cluster_uuid
, kdu_instance
, filter=None):
283 Updating vca status in NSR record
284 :param cluster_uuid: UUID of a k8s cluster
285 :param kdu_instance: The unique name of the KDU instance
286 :param filter: To get nsr_id
290 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
291 # .format(cluster_uuid, kdu_instance, filter))
294 nsr_id
= filter.get('_id')
296 # get vca status for NS
297 vca_status
= await self
.k8sclusterjuju
.status_kdu(cluster_uuid
,
299 complete_status
=True,
303 db_dict
['vcaStatus'] = {nsr_id
: vca_status
}
305 await self
.k8sclusterjuju
.update_vcaStatus(db_dict
['vcaStatus'], cluster_uuid
,
309 self
.update_db_2("nsrs", nsr_id
, db_dict
)
311 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
313 except Exception as e
:
314 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
317 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
319 env
= Environment(undefined
=StrictUndefined
)
320 template
= env
.from_string(cloud_init_text
)
321 return template
.render(additional_params
or {})
322 except UndefinedError
as e
:
323 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
324 "file, must be provided in the instantiation parameters inside the "
325 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
))
326 except (TemplateError
, TemplateNotFound
) as e
:
327 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
328 format(vnfd_id
, vdu_id
, e
))
330 def _get_cloud_init(self
, vdu
, vnfd
):
332 cloud_init_content
= cloud_init_file
= None
333 if vdu
.get("cloud-init-file"):
334 base_folder
= vnfd
["_admin"]["storage"]
335 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
336 vdu
["cloud-init-file"])
337 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
338 cloud_init_content
= ci_file
.read()
339 elif vdu
.get("cloud-init"):
340 cloud_init_content
= vdu
["cloud-init"]
342 return cloud_init_content
343 except FsException
as e
:
344 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
345 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
347 def _get_osm_params(self
, db_vnfr
, vdu_id
=None, vdu_count_index
=0):
348 osm_params
= {x
.replace("-", "_"): db_vnfr
[x
] for x
in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
349 if db_vnfr
.get(x
) is not None}
350 osm_params
["ns_id"] = db_vnfr
["nsr-id-ref"]
351 osm_params
["vnf_id"] = db_vnfr
["_id"]
352 osm_params
["member_vnf_index"] = db_vnfr
["member-vnf-index-ref"]
353 if db_vnfr
.get("vdur"):
354 osm_params
["vdu"] = {}
355 for vdur
in db_vnfr
["vdur"]:
357 "count_index": vdur
["count-index"],
358 "vdu_id": vdur
["vdu-id-ref"],
361 if vdur
.get("ip-address"):
362 vdu
["ip_address"] = vdur
["ip-address"]
363 for iface
in vdur
["interfaces"]:
364 vdu
["interfaces"][iface
["name"]] = \
365 {x
.replace("-", "_"): iface
[x
] for x
in ("mac-address", "ip-address", "vnf-vld-id", "name")
366 if iface
.get(x
) is not None}
367 vdu_id_index
= "{}-{}".format(vdur
["vdu-id-ref"], vdur
["count-index"])
368 osm_params
["vdu"][vdu_id_index
] = vdu
370 osm_params
["vdu_id"] = vdu_id
371 osm_params
["count_index"] = vdu_count_index
374 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
375 vdur
= next(vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"])
376 additional_params
= vdur
.get("additionalParams")
377 return self
._format
_additional
_params
(additional_params
)
379 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
381 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
382 :param vnfd: input vnfd
383 :param new_id: overrides vnf id if provided
384 :param additionalParams: Instantiation params for VNFs provided
385 :param nsrId: Id of the NSR
386 :return: copy of vnfd
388 vnfd_RO
= deepcopy(vnfd
)
389 # remove unused by RO configuration, monitoring, scaling and internal keys
390 vnfd_RO
.pop("_id", None)
391 vnfd_RO
.pop("_admin", None)
392 vnfd_RO
.pop("vnf-configuration", None)
393 vnfd_RO
.pop("monitoring-param", None)
394 vnfd_RO
.pop("scaling-group-descriptor", None)
395 vnfd_RO
.pop("kdu", None)
396 vnfd_RO
.pop("k8s-cluster", None)
398 vnfd_RO
["id"] = new_id
400 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
401 for vdu
in get_iterable(vnfd_RO
, "vdu"):
402 vdu
.pop("cloud-init-file", None)
403 vdu
.pop("cloud-init", None)
406 def _ns_params_2_RO(self
, ns_params
, nsd
, vnfd_dict
, db_vnfrs
, n2vc_key_list
):
408 Creates a RO ns descriptor from OSM ns_instantiate params
409 :param ns_params: OSM instantiate params
410 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
411 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
412 :return: The RO ns descriptor
416 # TODO feature 1417: Check that no instantiation is set over PDU
417 # check if PDU forces a concrete vim-network-id and add it
418 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
420 def vim_account_2_RO(vim_account
):
421 if vim_account
in vim_2_RO
:
422 return vim_2_RO
[vim_account
]
424 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
425 if db_vim
["_admin"]["operationalState"] != "ENABLED":
426 raise LcmException("VIM={} is not available. operationalState={}".format(
427 vim_account
, db_vim
["_admin"]["operationalState"]))
428 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
429 vim_2_RO
[vim_account
] = RO_vim_id
432 def wim_account_2_RO(wim_account
):
433 if isinstance(wim_account
, str):
434 if wim_account
in wim_2_RO
:
435 return wim_2_RO
[wim_account
]
437 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
438 if db_wim
["_admin"]["operationalState"] != "ENABLED":
439 raise LcmException("WIM={} is not available. operationalState={}".format(
440 wim_account
, db_wim
["_admin"]["operationalState"]))
441 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
442 wim_2_RO
[wim_account
] = RO_wim_id
447 def ip_profile_2_RO(ip_profile
):
448 RO_ip_profile
= deepcopy((ip_profile
))
449 if "dns-server" in RO_ip_profile
:
450 if isinstance(RO_ip_profile
["dns-server"], list):
451 RO_ip_profile
["dns-address"] = []
452 for ds
in RO_ip_profile
.pop("dns-server"):
453 RO_ip_profile
["dns-address"].append(ds
['address'])
455 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
456 if RO_ip_profile
.get("ip-version") == "ipv4":
457 RO_ip_profile
["ip-version"] = "IPv4"
458 if RO_ip_profile
.get("ip-version") == "ipv6":
459 RO_ip_profile
["ip-version"] = "IPv6"
460 if "dhcp-params" in RO_ip_profile
:
461 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
467 # "name": ns_params["nsName"],
468 # "description": ns_params.get("nsDescription"),
469 "datacenter": vim_account_2_RO(ns_params
["vimAccountId"]),
470 "wim_account": wim_account_2_RO(ns_params
.get("wimAccountId")),
471 # "scenario": ns_params["nsdId"],
473 # set vim_account of each vnf if different from general vim_account.
474 # Get this information from <vnfr> database content, key vim-account-id
475 # Vim account can be set by placement_engine and it may be different from
476 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
477 for vnf_index
, vnfr
in db_vnfrs
.items():
478 if vnfr
.get("vim-account-id") and vnfr
["vim-account-id"] != ns_params
["vimAccountId"]:
479 populate_dict(RO_ns_params
, ("vnfs", vnf_index
, "datacenter"), vim_account_2_RO(vnfr
["vim-account-id"]))
481 n2vc_key_list
= n2vc_key_list
or []
482 for vnfd_ref
, vnfd
in vnfd_dict
.items():
483 vdu_needed_access
= []
485 if vnfd
.get("vnf-configuration"):
486 ssh_required
= deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required"))
487 if ssh_required
and vnfd
.get("mgmt-interface"):
488 if vnfd
["mgmt-interface"].get("vdu-id"):
489 vdu_needed_access
.append(vnfd
["mgmt-interface"]["vdu-id"])
490 elif vnfd
["mgmt-interface"].get("cp"):
491 mgmt_cp
= vnfd
["mgmt-interface"]["cp"]
493 for vdu
in vnfd
.get("vdu", ()):
494 if vdu
.get("vdu-configuration"):
495 ssh_required
= deep_get(vdu
, ("vdu-configuration", "config-access", "ssh-access", "required"))
497 vdu_needed_access
.append(vdu
["id"])
499 for vdu_interface
in vdu
.get("interface"):
500 if vdu_interface
.get("external-connection-point-ref") and \
501 vdu_interface
["external-connection-point-ref"] == mgmt_cp
:
502 vdu_needed_access
.append(vdu
["id"])
506 if vdu_needed_access
:
507 for vnf_member
in nsd
.get("constituent-vnfd"):
508 if vnf_member
["vnfd-id-ref"] != vnfd_ref
:
510 for vdu
in vdu_needed_access
:
511 populate_dict(RO_ns_params
,
512 ("vnfs", vnf_member
["member-vnf-index"], "vdus", vdu
, "mgmt_keys"),
515 for vdu
in get_iterable(vnfd
, "vdu"):
516 cloud_init_text
= self
._get
_cloud
_init
(vdu
, vnfd
)
517 if not cloud_init_text
:
519 for vnf_member
in nsd
.get("constituent-vnfd"):
520 if vnf_member
["vnfd-id-ref"] != vnfd_ref
:
522 db_vnfr
= db_vnfrs
[vnf_member
["member-vnf-index"]]
523 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdu
["id"]) or {}
526 for vdu_index
in range(0, int(vdu
.get("count", 1))):
527 additional_params
["OSM"] = self
._get
_osm
_params
(db_vnfr
, vdu
["id"], vdu_index
)
528 cloud_init_list
.append(self
._parse
_cloud
_init
(cloud_init_text
, additional_params
, vnfd
["id"],
530 populate_dict(RO_ns_params
,
531 ("vnfs", vnf_member
["member-vnf-index"], "vdus", vdu
["id"], "cloud_init"),
534 if ns_params
.get("vduImage"):
535 RO_ns_params
["vduImage"] = ns_params
["vduImage"]
537 if ns_params
.get("ssh_keys"):
538 RO_ns_params
["cloud-config"] = {"key-pairs": ns_params
["ssh_keys"]}
539 for vnf_params
in get_iterable(ns_params
, "vnf"):
540 for constituent_vnfd
in nsd
["constituent-vnfd"]:
541 if constituent_vnfd
["member-vnf-index"] == vnf_params
["member-vnf-index"]:
542 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
545 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
546 "constituent-vnfd".format(vnf_params
["member-vnf-index"]))
548 for vdu_params
in get_iterable(vnf_params
, "vdu"):
549 # TODO feature 1417: check that this VDU exist and it is not a PDU
550 if vdu_params
.get("volume"):
551 for volume_params
in vdu_params
["volume"]:
552 if volume_params
.get("vim-volume-id"):
553 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
554 vdu_params
["id"], "devices", volume_params
["name"], "vim_id"),
555 volume_params
["vim-volume-id"])
556 if vdu_params
.get("interface"):
557 for interface_params
in vdu_params
["interface"]:
558 if interface_params
.get("ip-address"):
559 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
560 vdu_params
["id"], "interfaces", interface_params
["name"],
562 interface_params
["ip-address"])
563 if interface_params
.get("mac-address"):
564 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
565 vdu_params
["id"], "interfaces", interface_params
["name"],
567 interface_params
["mac-address"])
568 if interface_params
.get("floating-ip-required"):
569 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
570 vdu_params
["id"], "interfaces", interface_params
["name"],
572 interface_params
["floating-ip-required"])
574 for internal_vld_params
in get_iterable(vnf_params
, "internal-vld"):
575 if internal_vld_params
.get("vim-network-name"):
576 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
577 internal_vld_params
["name"], "vim-network-name"),
578 internal_vld_params
["vim-network-name"])
579 if internal_vld_params
.get("vim-network-id"):
580 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
581 internal_vld_params
["name"], "vim-network-id"),
582 internal_vld_params
["vim-network-id"])
583 if internal_vld_params
.get("ip-profile"):
584 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
585 internal_vld_params
["name"], "ip-profile"),
586 ip_profile_2_RO(internal_vld_params
["ip-profile"]))
587 if internal_vld_params
.get("provider-network"):
589 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
590 internal_vld_params
["name"], "provider-network"),
591 internal_vld_params
["provider-network"].copy())
593 for icp_params
in get_iterable(internal_vld_params
, "internal-connection-point"):
596 for vdu_descriptor
in vnf_descriptor
["vdu"]:
597 for vdu_interface
in vdu_descriptor
["interface"]:
598 if vdu_interface
.get("internal-connection-point-ref") == icp_params
["id-ref"]:
599 if icp_params
.get("ip-address"):
600 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
601 vdu_descriptor
["id"], "interfaces",
602 vdu_interface
["name"], "ip_address"),
603 icp_params
["ip-address"])
605 if icp_params
.get("mac-address"):
606 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
607 vdu_descriptor
["id"], "interfaces",
608 vdu_interface
["name"], "mac_address"),
609 icp_params
["mac-address"])
615 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
616 "internal-vld:id-ref={} is not present at vnfd:internal-"
617 "connection-point".format(vnf_params
["member-vnf-index"],
618 icp_params
["id-ref"]))
620 for vld_params
in get_iterable(ns_params
, "vld"):
621 if "ip-profile" in vld_params
:
622 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "ip-profile"),
623 ip_profile_2_RO(vld_params
["ip-profile"]))
625 if vld_params
.get("provider-network"):
627 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "provider-network"),
628 vld_params
["provider-network"].copy())
630 if "wimAccountId" in vld_params
and vld_params
["wimAccountId"] is not None:
631 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "wim_account"),
632 wim_account_2_RO(vld_params
["wimAccountId"])),
633 if vld_params
.get("vim-network-name"):
635 if isinstance(vld_params
["vim-network-name"], dict):
636 for vim_account
, vim_net
in vld_params
["vim-network-name"].items():
637 RO_vld_sites
.append({
638 "netmap-use": vim_net
,
639 "datacenter": vim_account_2_RO(vim_account
)
641 else: # isinstance str
642 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-name"]})
644 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
646 if vld_params
.get("vim-network-id"):
648 if isinstance(vld_params
["vim-network-id"], dict):
649 for vim_account
, vim_net
in vld_params
["vim-network-id"].items():
650 RO_vld_sites
.append({
651 "netmap-use": vim_net
,
652 "datacenter": vim_account_2_RO(vim_account
)
654 else: # isinstance str
655 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-id"]})
657 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
658 if vld_params
.get("ns-net"):
659 if isinstance(vld_params
["ns-net"], dict):
660 for vld_id
, instance_scenario_id
in vld_params
["ns-net"].items():
661 RO_vld_ns_net
= {"instance_scenario_id": instance_scenario_id
, "osm_id": vld_id
}
662 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "use-network"), RO_vld_ns_net
)
663 if "vnfd-connection-point-ref" in vld_params
:
664 for cp_params
in vld_params
["vnfd-connection-point-ref"]:
666 for constituent_vnfd
in nsd
["constituent-vnfd"]:
667 if constituent_vnfd
["member-vnf-index"] == cp_params
["member-vnf-index-ref"]:
668 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
672 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
673 "is not present at nsd:constituent-vnfd".format(cp_params
["member-vnf-index-ref"]))
675 for vdu_descriptor
in vnf_descriptor
["vdu"]:
676 for interface_descriptor
in vdu_descriptor
["interface"]:
677 if interface_descriptor
.get("external-connection-point-ref") == \
678 cp_params
["vnfd-connection-point-ref"]:
685 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
686 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
687 cp_params
["member-vnf-index-ref"],
688 cp_params
["vnfd-connection-point-ref"],
689 vnf_descriptor
["id"]))
690 if cp_params
.get("ip-address"):
691 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
692 vdu_descriptor
["id"], "interfaces",
693 interface_descriptor
["name"], "ip_address"),
694 cp_params
["ip-address"])
695 if cp_params
.get("mac-address"):
696 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
697 vdu_descriptor
["id"], "interfaces",
698 interface_descriptor
["name"], "mac_address"),
699 cp_params
["mac-address"])
702 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None):
703 # make a copy to do not change
704 vdu_create
= copy(vdu_create
)
705 vdu_delete
= copy(vdu_delete
)
707 vdurs
= db_vnfr
.get("vdur")
710 vdu_index
= len(vdurs
)
713 vdur
= vdurs
[vdu_index
]
714 if vdur
.get("pdu-type"):
716 vdu_id_ref
= vdur
["vdu-id-ref"]
717 if vdu_create
and vdu_create
.get(vdu_id_ref
):
718 vdur_copy
= deepcopy(vdur
)
719 vdur_copy
["status"] = "BUILD"
720 vdur_copy
["status-detailed"] = None
721 vdur_copy
["ip_address"]: None
722 for iface
in vdur_copy
["interfaces"]:
723 iface
["ip-address"] = None
724 iface
["mac-address"] = None
725 iface
.pop("mgmt_vnf", None) # only first vdu can be managment of vnf # TODO ALF
726 for index
in range(0, vdu_create
[vdu_id_ref
]):
727 vdur_copy
["_id"] = str(uuid4())
728 vdur_copy
["count-index"] += 1
729 vdurs
.insert(vdu_index
+1+index
, vdur_copy
)
730 self
.logger
.debug("scale out, adding vdu={}".format(vdur_copy
))
731 vdur_copy
= deepcopy(vdur_copy
)
733 del vdu_create
[vdu_id_ref
]
734 if vdu_delete
and vdu_delete
.get(vdu_id_ref
):
736 vdu_delete
[vdu_id_ref
] -= 1
737 if not vdu_delete
[vdu_id_ref
]:
738 del vdu_delete
[vdu_id_ref
]
739 # check all operations are done
740 if vdu_create
or vdu_delete
:
741 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
744 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
747 vnfr_update
= {"vdur": vdurs
}
748 db_vnfr
["vdur"] = vdurs
749 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
751 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
753 Updates database nsr with the RO info for the created vld
754 :param ns_update_nsr: dictionary to be filled with the updated info
755 :param db_nsr: content of db_nsr. This is also modified
756 :param nsr_desc_RO: nsr descriptor from RO
757 :return: Nothing, LcmException is raised on errors
760 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
761 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
762 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
764 vld
["vim-id"] = net_RO
.get("vim_net_id")
765 vld
["name"] = net_RO
.get("vim_name")
766 vld
["status"] = net_RO
.get("status")
767 vld
["status-detailed"] = net_RO
.get("error_msg")
768 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
771 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
773 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
775 for db_vnfr
in db_vnfrs
.values():
776 vnfr_update
= {"status": "ERROR"}
777 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
778 if "status" not in vdur
:
779 vdur
["status"] = "ERROR"
780 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
782 vdur
["status-detailed"] = str(error_text
)
783 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
784 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
785 except DbException
as e
:
786 self
.logger
.error("Cannot update vnf. {}".format(e
))
788 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
790 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
791 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
792 :param nsr_desc_RO: nsr descriptor from RO
793 :return: Nothing, LcmException is raised on errors
795 for vnf_index
, db_vnfr
in db_vnfrs
.items():
796 for vnf_RO
in nsr_desc_RO
["vnfs"]:
797 if vnf_RO
["member_vnf_index"] != vnf_index
:
800 if vnf_RO
.get("ip_address"):
801 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
802 elif not db_vnfr
.get("ip-address"):
803 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
804 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
806 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
807 vdur_RO_count_index
= 0
808 if vdur
.get("pdu-type"):
810 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
811 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
813 if vdur
["count-index"] != vdur_RO_count_index
:
814 vdur_RO_count_index
+= 1
816 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
817 if vdur_RO
.get("ip_address"):
818 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
820 vdur
["ip-address"] = None
821 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
822 vdur
["name"] = vdur_RO
.get("vim_name")
823 vdur
["status"] = vdur_RO
.get("status")
824 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
825 for ifacer
in get_iterable(vdur
, "interfaces"):
826 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
827 if ifacer
["name"] == interface_RO
.get("internal_name"):
828 ifacer
["ip-address"] = interface_RO
.get("ip_address")
829 ifacer
["mac-address"] = interface_RO
.get("mac_address")
832 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
834 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
835 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
838 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
839 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
841 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
842 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
843 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
845 vld
["vim-id"] = net_RO
.get("vim_net_id")
846 vld
["name"] = net_RO
.get("vim_name")
847 vld
["status"] = net_RO
.get("status")
848 vld
["status-detailed"] = net_RO
.get("error_msg")
849 vnfr_update
["vld.{}".format(vld_index
)] = vld
852 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
853 vnf_index
, vld
["id"]))
855 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
859 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
861 def _get_ns_config_info(self
, nsr_id
):
863 Generates a mapping between vnf,vdu elements and the N2VC id
864 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
865 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
866 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
867 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
869 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
870 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
872 ns_config_info
= {"osm-config-mapping": mapping
}
873 for vca
in vca_deployed_list
:
874 if not vca
["member-vnf-index"]:
876 if not vca
["vdu_id"]:
877 mapping
[vca
["member-vnf-index"]] = vca
["application"]
879 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
881 return ns_config_info
884 def _get_initial_config_primitive_list(desc_primitive_list
, vca_deployed
, ee_descriptor_id
):
886 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
887 primitives as verify-ssh-credentials, or config when needed
888 :param desc_primitive_list: information of the descriptor
889 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
890 this element contains a ssh public key
891 :param ee_descriptor_id: execution environment descriptor id. It is the value of
892 XXX_configuration.execution-environment-list.INDEX.id; it can be None
893 :return: The modified list. Can ba an empty list, but always a list
896 primitive_list
= desc_primitive_list
or []
898 # filter primitives by ee_id
899 primitive_list
= [p
for p
in primitive_list
if p
.get("execution-environment-ref") == ee_descriptor_id
]
903 primitive_list
.sort(key
=lambda val
: int(val
['seq']))
905 # look for primitive config, and get the position. None if not present
906 config_position
= None
907 for index
, primitive
in enumerate(primitive_list
):
908 if primitive
["name"] == "config":
909 config_position
= index
912 # for NS, add always a config primitive if not present (bug 874)
913 if not vca_deployed
["member-vnf-index"] and config_position
is None:
914 primitive_list
.insert(0, {"name": "config", "parameter": []})
916 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
917 if vca_deployed
["member-vnf-index"] and config_position
is not None and vca_deployed
.get("ssh-public-key"):
918 primitive_list
.insert(config_position
+ 1, {"name": "verify-ssh-credentials", "parameter": []})
919 return primitive_list
921 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds_ref
,
922 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
923 nslcmop_id
= db_nslcmop
["_id"]
925 "name": db_nsr
["name"],
928 "image": deepcopy(db_nsr
["image"]),
929 "flavor": deepcopy(db_nsr
["flavor"]),
930 "action_id": nslcmop_id
,
932 for image
in target
["image"]:
933 image
["vim_info"] = []
934 for flavor
in target
["flavor"]:
935 flavor
["vim_info"] = []
937 ns_params
= db_nslcmop
.get("operationParams")
939 if ns_params
.get("ssh_keys"):
940 ssh_keys
+= ns_params
.get("ssh_keys")
942 ssh_keys
+= n2vc_key_list
945 for vld_index
, vld
in enumerate(nsd
.get("vld")):
946 target_vld
= {"id": vld
["id"],
948 "mgmt-network": vld
.get("mgmt-network", False),
949 "type": vld
.get("type"),
950 "vim_info": [{"vim-network-name": vld
.get("vim-network-name"),
951 "vim_account_id": ns_params
["vimAccountId"]}],
953 for cp
in vld
["vnfd-connection-point-ref"]:
954 cp2target
["member_vnf:{}.{}".format(cp
["member-vnf-index-ref"], cp
["vnfd-connection-point-ref"])] = \
955 "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
956 target
["ns"]["vld"].append(target_vld
)
957 for vnfr
in db_vnfrs
.values():
958 vnfd
= db_vnfds_ref
[vnfr
["vnfd-ref"]]
959 target_vnf
= deepcopy(vnfr
)
960 for vld
in target_vnf
.get("vld", ()):
961 # check if connected to a ns.vld
962 vnf_cp
= next((cp
for cp
in vnfd
.get("connection-point", ()) if
963 cp
.get("internal-vld-ref") == vld
["id"]), None)
965 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
966 if cp2target
.get(ns_cp
):
967 vld
["target"] = cp2target
[ns_cp
]
968 vld
["vim_info"] = [{"vim-network-name": vld
.get("vim-network-name"),
969 "vim_account_id": vnfr
["vim-account-id"]}]
971 for vdur
in target_vnf
.get("vdur", ()):
972 vdur
["vim_info"] = [{"vim_account_id": vnfr
["vim-account-id"]}]
973 vdud_index
, vdud
= next(k
for k
in enumerate(vnfd
["vdu"]) if k
[1]["id"] == vdur
["vdu-id-ref"])
974 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
977 if deep_get(vdud
, ("vdu-configuration", "config-access", "ssh-access", "required")):
978 vdur
["ssh-keys"] = ssh_keys
979 vdur
["ssh-access-required"] = True
980 elif deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
981 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
982 vdur
["ssh-keys"] = ssh_keys
983 vdur
["ssh-access-required"] = True
986 if vdud
.get("cloud-init-file"):
987 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
988 elif vdud
.get("cloud-init"):
989 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], vdud_index
)
992 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
993 if not next((vi
for vi
in ns_flavor
["vim_info"] if
994 vi
and vi
.get("vim_account_id") == vnfr
["vim-account-id"]), None):
995 ns_flavor
["vim_info"].append({"vim_account_id": vnfr
["vim-account-id"]})
997 ns_image
= target
["image"][int(vdur
["ns-image-id"])]
998 if not next((vi
for vi
in ns_image
["vim_info"] if
999 vi
and vi
.get("vim_account_id") == vnfr
["vim-account-id"]), None):
1000 ns_image
["vim_info"].append({"vim_account_id": vnfr
["vim-account-id"]})
1002 vdur
["vim_info"] = [{"vim_account_id": vnfr
["vim-account-id"]}]
1003 target
["vnf"].append(target_vnf
)
1005 desc
= await self
.RO
.deploy(nsr_id
, target
)
1006 action_id
= desc
["action_id"]
1007 await self
._wait
_ng
_ro
(self
, nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
1011 "_admin.deployed.RO.operational-status": "running",
1012 "detailed-status": " ".join(stage
)
1014 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1015 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1016 self
._write
_op
_status
(nslcmop_id
, stage
)
1017 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
1020 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
, start_time
, timeout
, stage
):
1021 detailed_status_old
= None
1023 while time() <= start_time
+ timeout
:
1024 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
1025 if desc_status
["status"] == "FAILED":
1026 raise NgRoException(desc_status
["details"])
1027 elif desc_status
["status"] == "BUILD":
1028 stage
[2] = "VIM: ({})".format(desc_status
["details"])
1029 elif desc_status
["status"] == "DONE":
1030 stage
[2] = "Deployed at VIM"
1033 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
1034 if stage
[2] != detailed_status_old
:
1035 detailed_status_old
= stage
[2]
1036 db_nsr_update
["detailed-status"] = " ".join(stage
)
1037 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1038 self
._write
_op
_status
(nslcmop_id
, stage
)
1039 await asyncio
.sleep(5, loop
=self
.loop
)
1040 else: # timeout_ns_deploy
1041 raise NgRoException("Timeout waiting ns to deploy")
1043 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
1047 start_deploy
= time()
1055 desc
= await self
.RO
.deploy(nsr_id
, target
)
1056 action_id
= desc
["action_id"]
1057 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1058 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1059 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
1062 delete_timeout
= 20 * 60 # 20 minutes
1063 await self
._wait
_ng
_ro
(self
, nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
1065 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1066 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1068 await self
.RO
.delete(nsr_id
)
1069 except Exception as e
:
1070 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1071 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1072 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1073 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1074 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
1075 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1076 failed_detail
.append("delete conflict: {}".format(e
))
1077 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
1079 failed_detail
.append("delete error: {}".format(e
))
1080 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
1083 stage
[2] = "Error deleting from VIM"
1085 stage
[2] = "Deleted from VIM"
1086 db_nsr_update
["detailed-status"] = " ".join(stage
)
1087 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1088 self
._write
_op
_status
(nslcmop_id
, stage
)
1091 raise LcmException("; ".join(failed_detail
))
1094 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds_ref
,
1095 n2vc_key_list
, stage
):
1098 :param logging_text: preffix text to use at logging
1099 :param nsr_id: nsr identity
1100 :param nsd: database content of ns descriptor
1101 :param db_nsr: database content of ns record
1102 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1104 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1105 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1106 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1107 :return: None or exception
1111 RO_descriptor_number
= 0 # number of descriptors created at RO
1112 vnf_index_2_RO_id
= {} # map between vnfd/nsd id to the id used at RO
1113 nslcmop_id
= db_nslcmop
["_id"]
1114 start_deploy
= time()
1115 ns_params
= db_nslcmop
.get("operationParams")
1116 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1117 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1119 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1121 # Check for and optionally request placement optimization. Database will be updated if placement activated
1122 stage
[2] = "Waiting for Placement."
1123 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1124 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1125 for vnfr
in db_vnfrs
.values():
1126 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1129 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1132 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
1133 db_vnfds_ref
, n2vc_key_list
, stage
, start_deploy
,
1136 # get vnfds, instantiate at RO
1137 for c_vnf
in nsd
.get("constituent-vnfd", ()):
1138 member_vnf_index
= c_vnf
["member-vnf-index"]
1139 vnfd
= db_vnfds_ref
[c_vnf
['vnfd-id-ref']]
1140 vnfd_ref
= vnfd
["id"]
1142 stage
[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref
, member_vnf_index
)
1143 db_nsr_update
["detailed-status"] = " ".join(stage
)
1144 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1145 self
._write
_op
_status
(nslcmop_id
, stage
)
1147 # self.logger.debug(logging_text + stage[2])
1148 vnfd_id_RO
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, member_vnf_index
[:23])
1149 vnf_index_2_RO_id
[member_vnf_index
] = vnfd_id_RO
1150 RO_descriptor_number
+= 1
1152 # look position at deployed.RO.vnfd if not present it will be appended at the end
1153 for index
, vnf_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["RO"]["vnfd"]):
1154 if vnf_deployed
["member-vnf-index"] == member_vnf_index
:
1157 index
= len(db_nsr
["_admin"]["deployed"]["RO"]["vnfd"])
1158 db_nsr
["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1161 RO_update
= {"member-vnf-index": member_vnf_index
}
1162 vnfd_list
= await self
.RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
1164 RO_update
["id"] = vnfd_list
[0]["uuid"]
1165 self
.logger
.debug(logging_text
+ "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1166 format(vnfd_ref
, member_vnf_index
, vnfd_list
[0]["uuid"]))
1168 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
, db_vnfrs
[c_vnf
["member-vnf-index"]].
1169 get("additionalParamsForVnf"), nsr_id
)
1170 desc
= await self
.RO
.create("vnfd", descriptor
=vnfd_RO
)
1171 RO_update
["id"] = desc
["uuid"]
1172 self
.logger
.debug(logging_text
+ "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1173 vnfd_ref
, member_vnf_index
, desc
["uuid"]))
1174 db_nsr_update
["_admin.deployed.RO.vnfd.{}".format(index
)] = RO_update
1175 db_nsr
["_admin"]["deployed"]["RO"]["vnfd"][index
] = RO_update
1180 stage
[2] = "Creating nsd={} at RO".format(nsd_ref
)
1181 db_nsr_update
["detailed-status"] = " ".join(stage
)
1182 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1183 self
._write
_op
_status
(nslcmop_id
, stage
)
1185 # self.logger.debug(logging_text + stage[2])
1186 RO_osm_nsd_id
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, nsd_ref
[:23])
1187 RO_descriptor_number
+= 1
1188 nsd_list
= await self
.RO
.get_list("nsd", filter_by
={"osm_id": RO_osm_nsd_id
})
1190 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= nsd_list
[0]["uuid"]
1191 self
.logger
.debug(logging_text
+ "nsd={} exists at RO. Using RO_id={}".format(
1192 nsd_ref
, RO_nsd_uuid
))
1194 nsd_RO
= deepcopy(nsd
)
1195 nsd_RO
["id"] = RO_osm_nsd_id
1196 nsd_RO
.pop("_id", None)
1197 nsd_RO
.pop("_admin", None)
1198 for c_vnf
in nsd_RO
.get("constituent-vnfd", ()):
1199 member_vnf_index
= c_vnf
["member-vnf-index"]
1200 c_vnf
["vnfd-id-ref"] = vnf_index_2_RO_id
[member_vnf_index
]
1201 for c_vld
in nsd_RO
.get("vld", ()):
1202 for cp
in c_vld
.get("vnfd-connection-point-ref", ()):
1203 member_vnf_index
= cp
["member-vnf-index-ref"]
1204 cp
["vnfd-id-ref"] = vnf_index_2_RO_id
[member_vnf_index
]
1206 desc
= await self
.RO
.create("nsd", descriptor
=nsd_RO
)
1207 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1208 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= desc
["uuid"]
1209 self
.logger
.debug(logging_text
+ "nsd={} created at RO. RO_id={}".format(nsd_ref
, RO_nsd_uuid
))
1210 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1213 stage
[2] = "Creating nsd={} at RO".format(nsd_ref
)
1214 db_nsr_update
["detailed-status"] = " ".join(stage
)
1215 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1216 self
._write
_op
_status
(nslcmop_id
, stage
)
1218 # if present use it unless in error status
1219 RO_nsr_id
= deep_get(db_nsr
, ("_admin", "deployed", "RO", "nsr_id"))
1222 stage
[2] = "Looking for existing ns at RO"
1223 db_nsr_update
["detailed-status"] = " ".join(stage
)
1224 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1225 self
._write
_op
_status
(nslcmop_id
, stage
)
1226 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1227 desc
= await self
.RO
.show("ns", RO_nsr_id
)
1229 except ROclient
.ROClientException
as e
:
1230 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
1232 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1234 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
1235 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
1236 if ns_status
== "ERROR":
1237 stage
[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id
)
1238 self
.logger
.debug(logging_text
+ stage
[2])
1239 await self
.RO
.delete("ns", RO_nsr_id
)
1240 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1242 stage
[2] = "Checking dependencies"
1243 db_nsr_update
["detailed-status"] = " ".join(stage
)
1244 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1245 self
._write
_op
_status
(nslcmop_id
, stage
)
1246 # self.logger.debug(logging_text + stage[2])
1248 # check if VIM is creating and wait look if previous tasks in process
1249 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("vim_account", ns_params
["vimAccountId"])
1251 stage
[2] = "Waiting for related tasks '{}' to be completed".format(task_name
)
1252 self
.logger
.debug(logging_text
+ stage
[2])
1253 await asyncio
.wait(task_dependency
, timeout
=3600)
1254 if ns_params
.get("vnf"):
1255 for vnf
in ns_params
["vnf"]:
1256 if "vimAccountId" in vnf
:
1257 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("vim_account",
1258 vnf
["vimAccountId"])
1260 stage
[2] = "Waiting for related tasks '{}' to be completed.".format(task_name
)
1261 self
.logger
.debug(logging_text
+ stage
[2])
1262 await asyncio
.wait(task_dependency
, timeout
=3600)
1264 stage
[2] = "Checking instantiation parameters."
1265 RO_ns_params
= self
._ns
_params
_2_RO
(ns_params
, nsd
, db_vnfds_ref
, db_vnfrs
, n2vc_key_list
)
1266 stage
[2] = "Deploying ns at VIM."
1267 db_nsr_update
["detailed-status"] = " ".join(stage
)
1268 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1269 self
._write
_op
_status
(nslcmop_id
, stage
)
1271 desc
= await self
.RO
.create("ns", descriptor
=RO_ns_params
, name
=db_nsr
["name"], scenario
=RO_nsd_uuid
)
1272 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = desc
["uuid"]
1273 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1274 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "BUILD"
1275 self
.logger
.debug(logging_text
+ "ns created at RO. RO_id={}".format(desc
["uuid"]))
1277 # wait until NS is ready
1278 stage
[2] = "Waiting VIM to deploy ns."
1279 db_nsr_update
["detailed-status"] = " ".join(stage
)
1280 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1281 self
._write
_op
_status
(nslcmop_id
, stage
)
1282 detailed_status_old
= None
1283 self
.logger
.debug(logging_text
+ stage
[2] + " RO_ns_id={}".format(RO_nsr_id
))
1286 while time() <= start_deploy
+ timeout_ns_deploy
:
1287 desc
= await self
.RO
.show("ns", RO_nsr_id
)
1290 if desc
!= old_desc
:
1291 # desc has changed => update db
1292 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
1295 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
1296 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
1297 if ns_status
== "ERROR":
1298 raise ROclient
.ROClientException(ns_status_info
)
1299 elif ns_status
== "BUILD":
1300 stage
[2] = "VIM: ({})".format(ns_status_info
)
1301 elif ns_status
== "ACTIVE":
1302 stage
[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1304 self
.ns_update_vnfr(db_vnfrs
, desc
)
1306 except LcmExceptionNoMgmtIP
:
1309 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
1310 if stage
[2] != detailed_status_old
:
1311 detailed_status_old
= stage
[2]
1312 db_nsr_update
["detailed-status"] = " ".join(stage
)
1313 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1314 self
._write
_op
_status
(nslcmop_id
, stage
)
1315 await asyncio
.sleep(5, loop
=self
.loop
)
1316 else: # timeout_ns_deploy
1317 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
1320 self
.ns_update_nsr(db_nsr_update
, db_nsr
, desc
)
1322 db_nsr_update
["_admin.deployed.RO.operational-status"] = "running"
1323 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1324 stage
[2] = "Deployed at VIM"
1325 db_nsr_update
["detailed-status"] = " ".join(stage
)
1326 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1327 self
._write
_op
_status
(nslcmop_id
, stage
)
1328 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1329 # self.logger.debug(logging_text + "Deployed at VIM")
1330 except (ROclient
.ROClientException
, LcmException
, DbException
, NgRoException
) as e
:
1331 stage
[2] = "ERROR deploying at VIM"
1332 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1335 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
1337 Wait for kdu to be up, get ip address
1338 :param logging_text: prefix use for logging
1345 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1348 while nb_tries
< 360:
1349 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1350 kdur
= next((x
for x
in get_iterable(db_vnfr
, "kdur") if x
.get("kdu-name") == kdu_name
), None)
1352 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
))
1353 if kdur
.get("status"):
1354 if kdur
["status"] in ("READY", "ENABLED"):
1355 return kdur
.get("ip-address")
1357 raise LcmException("target KDU={} is in error state".format(kdu_name
))
1359 await asyncio
.sleep(10, loop
=self
.loop
)
1361 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
1363 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
1365 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1366 :param logging_text: prefix use for logging
1371 :param pub_key: public ssh key to inject, None to skip
1372 :param user: user to apply the public ssh key
1376 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1380 target_vdu_id
= None
1386 if ro_retries
>= 360: # 1 hour
1387 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1389 await asyncio
.sleep(10, loop
=self
.loop
)
1392 if not target_vdu_id
:
1393 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1395 if not vdu_id
: # for the VNF case
1396 if db_vnfr
.get("status") == "ERROR":
1397 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1398 ip_address
= db_vnfr
.get("ip-address")
1401 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1403 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1404 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1406 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1407 vdur
= db_vnfr
["vdur"][0]
1409 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1412 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE":
1413 ip_address
= vdur
.get("ip-address")
1416 target_vdu_id
= vdur
["vdu-id-ref"]
1417 elif vdur
.get("status") == "ERROR":
1418 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1420 if not target_vdu_id
:
1423 # inject public key into machine
1424 if pub_key
and user
:
1425 # wait until NS is deployed at RO
1427 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1428 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1432 # self.logger.debug(logging_text + "Inserting RO key")
1433 if vdur
.get("pdu-type"):
1434 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1437 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1439 target
= {"action": "inject_ssh_key", "key": pub_key
, "user": user
,
1440 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdu_id
}]}],
1442 await self
.RO
.deploy(nsr_id
, target
)
1444 result_dict
= await self
.RO
.create_action(
1446 item_id_name
=ro_nsr_id
,
1447 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1449 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1450 if not result_dict
or not isinstance(result_dict
, dict):
1451 raise LcmException("Unknown response from RO when injecting key")
1452 for result
in result_dict
.values():
1453 if result
.get("vim_result") == 200:
1456 raise ROclient
.ROClientException("error injecting key: {}".format(
1457 result
.get("description")))
1459 except NgRoException
as e
:
1460 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1461 except ROclient
.ROClientException
as e
:
1463 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1467 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1473 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1475 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1477 my_vca
= vca_deployed_list
[vca_index
]
1478 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1479 # vdu or kdu: no dependencies
1483 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1484 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1485 configuration_status_list
= db_nsr
["configurationStatus"]
1486 for index
, vca_deployed
in enumerate(configuration_status_list
):
1487 if index
== vca_index
:
1490 if not my_vca
.get("member-vnf-index") or \
1491 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1492 internal_status
= configuration_status_list
[index
].get("status")
1493 if internal_status
== 'READY':
1495 elif internal_status
== 'BROKEN':
1496 raise LcmException("Configuration aborted because dependent charm/s has failed")
1500 # no dependencies, return
1502 await asyncio
.sleep(10)
1505 raise LcmException("Configuration aborted because dependent charm/s timeout")
1507 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1508 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1509 ee_config_descriptor
):
1510 nsr_id
= db_nsr
["_id"]
1511 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1512 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1513 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1514 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1516 'collection': 'nsrs',
1517 'filter': {'_id': nsr_id
},
1518 'path': db_update_entry
1524 element_under_configuration
= nsr_id
1528 vnfr_id
= db_vnfr
["_id"]
1529 osm_config
["osm"]["vnf_id"] = vnfr_id
1531 namespace
= "{nsi}.{ns}".format(
1532 nsi
=nsi_id
if nsi_id
else "",
1536 element_type
= 'VNF'
1537 element_under_configuration
= vnfr_id
1538 namespace
+= ".{}".format(vnfr_id
)
1540 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1541 element_type
= 'VDU'
1542 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1543 osm_config
["osm"]["vdu_id"] = vdu_id
1545 namespace
+= ".{}".format(kdu_name
)
1546 element_type
= 'KDU'
1547 element_under_configuration
= kdu_name
1548 osm_config
["osm"]["kdu_name"] = kdu_name
1551 artifact_path
= "{}/{}/{}/{}".format(
1552 base_folder
["folder"],
1553 base_folder
["pkg-dir"],
1554 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1557 # get initial_config_primitive_list that applies to this element
1558 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1560 # add config if not present for NS charm
1561 ee_descriptor_id
= ee_config_descriptor
.get("id")
1562 initial_config_primitive_list
= self
._get
_initial
_config
_primitive
_list
(initial_config_primitive_list
,
1563 vca_deployed
, ee_descriptor_id
)
1565 # n2vc_redesign STEP 3.1
1566 # find old ee_id if exists
1567 ee_id
= vca_deployed
.get("ee_id")
1569 # create or register execution environment in VCA
1570 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1572 self
._write
_configuration
_status
(
1574 vca_index
=vca_index
,
1576 element_under_configuration
=element_under_configuration
,
1577 element_type
=element_type
1580 step
= "create execution environment"
1581 self
.logger
.debug(logging_text
+ step
)
1582 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1583 namespace
=namespace
,
1587 artifact_path
=artifact_path
,
1590 elif vca_type
== "native_charm":
1591 step
= "Waiting to VM being up and getting IP address"
1592 self
.logger
.debug(logging_text
+ step
)
1593 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1594 user
=None, pub_key
=None)
1595 credentials
= {"hostname": rw_mgmt_ip
}
1597 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1598 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1599 # merged. Meanwhile let's get username from initial-config-primitive
1600 if not username
and initial_config_primitive_list
:
1601 for config_primitive
in initial_config_primitive_list
:
1602 for param
in config_primitive
.get("parameter", ()):
1603 if param
["name"] == "ssh-username":
1604 username
= param
["value"]
1607 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1608 "'config-access.ssh-access.default-user'")
1609 credentials
["username"] = username
1610 # n2vc_redesign STEP 3.2
1612 self
._write
_configuration
_status
(
1614 vca_index
=vca_index
,
1615 status
='REGISTERING',
1616 element_under_configuration
=element_under_configuration
,
1617 element_type
=element_type
1620 step
= "register execution environment {}".format(credentials
)
1621 self
.logger
.debug(logging_text
+ step
)
1622 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1623 credentials
=credentials
, namespace
=namespace
, db_dict
=db_dict
)
1625 # for compatibility with MON/POL modules, the need model and application name at database
1626 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1627 ee_id_parts
= ee_id
.split('.')
1628 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1629 if len(ee_id_parts
) >= 2:
1630 model_name
= ee_id_parts
[0]
1631 application_name
= ee_id_parts
[1]
1632 db_nsr_update
[db_update_entry
+ "model"] = model_name
1633 db_nsr_update
[db_update_entry
+ "application"] = application_name
1635 # n2vc_redesign STEP 3.3
1636 step
= "Install configuration Software"
1638 self
._write
_configuration
_status
(
1640 vca_index
=vca_index
,
1641 status
='INSTALLING SW',
1642 element_under_configuration
=element_under_configuration
,
1643 element_type
=element_type
,
1644 other_update
=db_nsr_update
1647 # TODO check if already done
1648 self
.logger
.debug(logging_text
+ step
)
1650 if vca_type
== "native_charm":
1651 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1652 if config_primitive
:
1653 config
= self
._map
_primitive
_params
(
1659 if vca_type
== "lxc_proxy_charm":
1660 if element_type
== "NS":
1661 num_units
= db_nsr
.get("config-units") or 1
1662 elif element_type
== "VNF":
1663 num_units
= db_vnfr
.get("config-units") or 1
1664 elif element_type
== "VDU":
1665 for v
in db_vnfr
["vdur"]:
1666 if vdu_id
== v
["vdu-id-ref"]:
1667 num_units
= v
.get("config-units") or 1
1670 await self
.vca_map
[vca_type
].install_configuration_sw(
1672 artifact_path
=artifact_path
,
1675 num_units
=num_units
,
1679 # write in db flag of configuration_sw already installed
1680 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1682 # add relations for this VCA (wait for other peers related with this VCA)
1683 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1684 vca_index
=vca_index
, vca_type
=vca_type
)
1686 # if SSH access is required, then get execution environment SSH public
1687 # if native charm we have waited already to VM be UP
1688 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
1691 # self.logger.debug("get ssh key block")
1692 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1693 # self.logger.debug("ssh key needed")
1694 # Needed to inject a ssh key
1695 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1696 step
= "Install configuration Software, getting public ssh key"
1697 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1699 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1701 # self.logger.debug("no need to get ssh key")
1702 step
= "Waiting to VM being up and getting IP address"
1703 self
.logger
.debug(logging_text
+ step
)
1705 # n2vc_redesign STEP 5.1
1706 # wait for RO (ip-address) Insert pub_key into VM
1709 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1711 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1712 vdu_index
, user
=user
, pub_key
=pub_key
)
1714 rw_mgmt_ip
= None # This is for a NS configuration
1716 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1718 # store rw_mgmt_ip in deploy params for later replacement
1719 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1721 # n2vc_redesign STEP 6 Execute initial config primitive
1722 step
= 'execute initial config primitive'
1724 # wait for dependent primitives execution (NS -> VNF -> VDU)
1725 if initial_config_primitive_list
:
1726 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1728 # stage, in function of element type: vdu, kdu, vnf or ns
1729 my_vca
= vca_deployed_list
[vca_index
]
1730 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1732 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1733 elif my_vca
.get("member-vnf-index"):
1735 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1738 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1740 self
._write
_configuration
_status
(
1742 vca_index
=vca_index
,
1743 status
='EXECUTING PRIMITIVE'
1746 self
._write
_op
_status
(
1751 check_if_terminated_needed
= True
1752 for initial_config_primitive
in initial_config_primitive_list
:
1753 # adding information on the vca_deployed if it is a NS execution environment
1754 if not vca_deployed
["member-vnf-index"]:
1755 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1756 # TODO check if already done
1757 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1759 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1760 self
.logger
.debug(logging_text
+ step
)
1761 await self
.vca_map
[vca_type
].exec_primitive(
1763 primitive_name
=initial_config_primitive
["name"],
1764 params_dict
=primitive_params_
,
1767 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1768 if check_if_terminated_needed
:
1769 if config_descriptor
.get('terminate-config-primitive'):
1770 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1771 check_if_terminated_needed
= False
1773 # TODO register in database that primitive is done
1775 # STEP 7 Configure metrics
1776 if vca_type
== "helm":
1777 prometheus_jobs
= await self
.add_prometheus_metrics(
1779 artifact_path
=artifact_path
,
1780 ee_config_descriptor
=ee_config_descriptor
,
1783 target_ip
=rw_mgmt_ip
,
1786 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1788 step
= "instantiated at VCA"
1789 self
.logger
.debug(logging_text
+ step
)
1791 self
._write
_configuration
_status
(
1793 vca_index
=vca_index
,
1797 except Exception as e
: # TODO not use Exception but N2VC exception
1798 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1799 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1800 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1801 self
._write
_configuration
_status
(
1803 vca_index
=vca_index
,
1806 raise LcmException("{} {}".format(step
, e
)) from e
1808 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1809 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1811 Update db_nsr fields.
1814 :param current_operation:
1815 :param current_operation_id:
1816 :param error_description:
1817 :param error_detail:
1818 :param other_update: Other required changes at database if provided, will be cleared
1822 db_dict
= other_update
or {}
1823 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1824 db_dict
["_admin.current-operation"] = current_operation_id
1825 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1826 db_dict
["currentOperation"] = current_operation
1827 db_dict
["currentOperationID"] = current_operation_id
1828 db_dict
["errorDescription"] = error_description
1829 db_dict
["errorDetail"] = error_detail
1832 db_dict
["nsState"] = ns_state
1833 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1834 except DbException
as e
:
1835 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1837 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1838 operation_state
: str = None, other_update
: dict = None):
1840 db_dict
= other_update
or {}
1841 db_dict
['queuePosition'] = queuePosition
1842 if isinstance(stage
, list):
1843 db_dict
['stage'] = stage
[0]
1844 db_dict
['detailed-status'] = " ".join(stage
)
1845 elif stage
is not None:
1846 db_dict
['stage'] = str(stage
)
1848 if error_message
is not None:
1849 db_dict
['errorMessage'] = error_message
1850 if operation_state
is not None:
1851 db_dict
['operationState'] = operation_state
1852 db_dict
["statusEnteredTime"] = time()
1853 self
.update_db_2("nslcmops", op_id
, db_dict
)
1854 except DbException
as e
:
1855 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1857 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1859 nsr_id
= db_nsr
["_id"]
1860 # configurationStatus
1861 config_status
= db_nsr
.get('configurationStatus')
1863 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1864 enumerate(config_status
) if v
}
1866 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1868 except DbException
as e
:
1869 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1871 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1872 element_under_configuration
: str = None, element_type
: str = None,
1873 other_update
: dict = None):
1875 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1876 # .format(vca_index, status))
1879 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1880 db_dict
= other_update
or {}
1882 db_dict
[db_path
+ 'status'] = status
1883 if element_under_configuration
:
1884 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1886 db_dict
[db_path
+ 'elementType'] = element_type
1887 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1888 except DbException
as e
:
1889 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1890 .format(status
, nsr_id
, vca_index
, e
))
1892 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
1894 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1895 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1896 Database is used because the result can be obtained from a different LCM worker in case of HA.
1897 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1898 :param db_nslcmop: database content of nslcmop
1899 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1900 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1901 computed 'vim-account-id'
1904 nslcmop_id
= db_nslcmop
['_id']
1905 placement_engine
= deep_get(db_nslcmop
, ('operationParams', 'placement-engine'))
1906 if placement_engine
== "PLA":
1907 self
.logger
.debug(logging_text
+ "Invoke and wait for placement optimization")
1908 await self
.msg
.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id
}, loop
=self
.loop
)
1909 db_poll_interval
= 5
1910 wait
= db_poll_interval
* 10
1912 while not pla_result
and wait
>= 0:
1913 await asyncio
.sleep(db_poll_interval
)
1914 wait
-= db_poll_interval
1915 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1916 pla_result
= deep_get(db_nslcmop
, ('_admin', 'pla'))
1919 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id
))
1921 for pla_vnf
in pla_result
['vnf']:
1922 vnfr
= db_vnfrs
.get(pla_vnf
['member-vnf-index'])
1923 if not pla_vnf
.get('vimAccountId') or not vnfr
:
1926 self
.db
.set_one("vnfrs", {"_id": vnfr
["_id"]}, {"vim-account-id": pla_vnf
['vimAccountId']})
1928 vnfr
["vim-account-id"] = pla_vnf
['vimAccountId']
1931 def update_nsrs_with_pla_result(self
, params
):
1933 nslcmop_id
= deep_get(params
, ('placement', 'nslcmopId'))
1934 self
.update_db_2("nslcmops", nslcmop_id
, {"_admin.pla": params
.get('placement')})
1935 except Exception as e
:
1936 self
.logger
.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id
, e
))
1938 async def instantiate(self
, nsr_id
, nslcmop_id
):
1941 :param nsr_id: ns instance to deploy
1942 :param nslcmop_id: operation to run
1946 # Try to lock HA task here
1947 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1948 if not task_is_locked_by_me
:
1949 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1952 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1953 self
.logger
.debug(logging_text
+ "Enter")
1955 # get all needed from database
1957 # database nsrs record
1960 # database nslcmops record
1963 # update operation on nsrs
1965 # update operation on nslcmops
1966 db_nslcmop_update
= {}
1968 nslcmop_operation_state
= None
1969 db_vnfrs
= {} # vnf's info indexed by member-index
1971 tasks_dict_info
= {} # from task to info text
1974 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1975 # ^ stage, step, VIM progress
1977 # wait for any previous tasks in process
1978 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1980 stage
[1] = "Sync filesystem from database."
1981 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1983 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1984 stage
[1] = "Reading from database."
1985 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1986 db_nsr_update
["detailed-status"] = "creating"
1987 db_nsr_update
["operational-status"] = "init"
1988 self
._write
_ns
_status
(
1990 ns_state
="BUILDING",
1991 current_operation
="INSTANTIATING",
1992 current_operation_id
=nslcmop_id
,
1993 other_update
=db_nsr_update
1995 self
._write
_op
_status
(
2001 # read from db: operation
2002 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2003 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2004 ns_params
= db_nslcmop
.get("operationParams")
2005 if ns_params
and ns_params
.get("timeout_ns_deploy"):
2006 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
2008 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
2011 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2012 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2013 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
2014 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2016 # nsr_name = db_nsr["name"] # TODO short-name??
2018 # read from db: vnf's of this ns
2019 stage
[1] = "Getting vnfrs from db."
2020 self
.logger
.debug(logging_text
+ stage
[1])
2021 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2023 # read from db: vnfd's for every vnf
2024 db_vnfds_ref
= {} # every vnfd data indexed by vnf name
2025 db_vnfds
= {} # every vnfd data indexed by vnf id
2026 db_vnfds_index
= {} # every vnfd data indexed by vnf member-index
2028 # for each vnf in ns, read vnfd
2029 for vnfr
in db_vnfrs_list
:
2030 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
# vnf's dict indexed by member-index: '1', '2', etc
2031 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
2032 vnfd_ref
= vnfr
["vnfd-ref"] # vnfd name for this vnf
2034 # if we haven't this vnfd, read it from db
2035 if vnfd_id
not in db_vnfds
:
2037 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
2038 self
.logger
.debug(logging_text
+ stage
[1])
2039 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2042 db_vnfds_ref
[vnfd_ref
] = vnfd
# vnfd's indexed by name
2043 db_vnfds
[vnfd_id
] = vnfd
# vnfd's indexed by id
2044 db_vnfds_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds
[vnfd_id
] # vnfd's indexed by member-index
2046 # Get or generates the _admin.deployed.VCA list
2047 vca_deployed_list
= None
2048 if db_nsr
["_admin"].get("deployed"):
2049 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
2050 if vca_deployed_list
is None:
2051 vca_deployed_list
= []
2052 configuration_status_list
= []
2053 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2054 db_nsr_update
["configurationStatus"] = configuration_status_list
2055 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2056 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2057 elif isinstance(vca_deployed_list
, dict):
2058 # maintain backward compatibility. Change a dict to list at database
2059 vca_deployed_list
= list(vca_deployed_list
.values())
2060 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
2061 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
2063 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
2064 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
2065 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
2067 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2068 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
2069 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2070 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
2072 # n2vc_redesign STEP 2 Deploy Network Scenario
2073 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
2074 self
._write
_op
_status
(
2079 stage
[1] = "Deploying KDUs."
2080 # self.logger.debug(logging_text + "Before deploy_kdus")
2081 # Call to deploy_kdus in case exists the "vdu:kdu" param
2082 await self
.deploy_kdus(
2083 logging_text
=logging_text
,
2085 nslcmop_id
=nslcmop_id
,
2088 task_instantiation_info
=tasks_dict_info
,
2091 stage
[1] = "Getting VCA public key."
2092 # n2vc_redesign STEP 1 Get VCA public ssh-key
2093 # feature 1429. Add n2vc public key to needed VMs
2094 n2vc_key
= self
.n2vc
.get_public_key()
2095 n2vc_key_list
= [n2vc_key
]
2096 if self
.vca_config
.get("public_key"):
2097 n2vc_key_list
.append(self
.vca_config
["public_key"])
2099 stage
[1] = "Deploying NS at VIM."
2100 task_ro
= asyncio
.ensure_future(
2101 self
.instantiate_RO(
2102 logging_text
=logging_text
,
2106 db_nslcmop
=db_nslcmop
,
2108 db_vnfds_ref
=db_vnfds_ref
,
2109 n2vc_key_list
=n2vc_key_list
,
2113 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
2114 tasks_dict_info
[task_ro
] = "Deploying at VIM"
2116 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2117 stage
[1] = "Deploying Execution Environments."
2118 self
.logger
.debug(logging_text
+ stage
[1])
2120 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
2121 # get_iterable() returns a value from a dict or empty tuple if key does not exist
2122 for c_vnf
in get_iterable(nsd
, "constituent-vnfd"):
2123 vnfd_id
= c_vnf
["vnfd-id-ref"]
2124 vnfd
= db_vnfds_ref
[vnfd_id
]
2125 member_vnf_index
= str(c_vnf
["member-vnf-index"])
2126 db_vnfr
= db_vnfrs
[member_vnf_index
]
2127 base_folder
= vnfd
["_admin"]["storage"]
2133 # Get additional parameters
2134 deploy_params
= {"OSM": self
._get
_osm
_params
(db_vnfr
)}
2135 if db_vnfr
.get("additionalParamsForVnf"):
2136 deploy_params
.update(self
._format
_additional
_params
(db_vnfr
["additionalParamsForVnf"].copy()))
2138 descriptor_config
= vnfd
.get("vnf-configuration")
2139 if descriptor_config
:
2141 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
2144 nslcmop_id
=nslcmop_id
,
2150 member_vnf_index
=member_vnf_index
,
2151 vdu_index
=vdu_index
,
2153 deploy_params
=deploy_params
,
2154 descriptor_config
=descriptor_config
,
2155 base_folder
=base_folder
,
2156 task_instantiation_info
=tasks_dict_info
,
2160 # Deploy charms for each VDU that supports one.
2161 for vdud
in get_iterable(vnfd
, 'vdu'):
2163 descriptor_config
= vdud
.get('vdu-configuration')
2164 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
2165 if vdur
.get("additionalParams"):
2166 deploy_params_vdu
= self
._format
_additional
_params
(vdur
["additionalParams"])
2168 deploy_params_vdu
= deploy_params
2169 deploy_params_vdu
["OSM"] = self
._get
_osm
_params
(db_vnfr
, vdu_id
, vdu_count_index
=0)
2170 if descriptor_config
:
2173 for vdu_index
in range(int(vdud
.get("count", 1))):
2174 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2176 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2177 member_vnf_index
, vdu_id
, vdu_index
),
2180 nslcmop_id
=nslcmop_id
,
2186 member_vnf_index
=member_vnf_index
,
2187 vdu_index
=vdu_index
,
2189 deploy_params
=deploy_params_vdu
,
2190 descriptor_config
=descriptor_config
,
2191 base_folder
=base_folder
,
2192 task_instantiation_info
=tasks_dict_info
,
2195 for kdud
in get_iterable(vnfd
, 'kdu'):
2196 kdu_name
= kdud
["name"]
2197 descriptor_config
= kdud
.get('kdu-configuration')
2198 if descriptor_config
:
2202 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
2203 deploy_params_kdu
= {"OSM": self
._get
_osm
_params
(db_vnfr
)}
2204 if kdur
.get("additionalParams"):
2205 deploy_params_kdu
= self
._format
_additional
_params
(kdur
["additionalParams"])
2208 logging_text
=logging_text
,
2211 nslcmop_id
=nslcmop_id
,
2217 member_vnf_index
=member_vnf_index
,
2218 vdu_index
=vdu_index
,
2220 deploy_params
=deploy_params_kdu
,
2221 descriptor_config
=descriptor_config
,
2222 base_folder
=base_folder
,
2223 task_instantiation_info
=tasks_dict_info
,
2227 # Check if this NS has a charm configuration
2228 descriptor_config
= nsd
.get("ns-configuration")
2229 if descriptor_config
and descriptor_config
.get("juju"):
2232 member_vnf_index
= None
2238 # Get additional parameters
2239 deploy_params
= {"OSM": self
._get
_osm
_params
(db_vnfr
)}
2240 if db_nsr
.get("additionalParamsForNs"):
2241 deploy_params
.update(self
._format
_additional
_params
(db_nsr
["additionalParamsForNs"].copy()))
2242 base_folder
= nsd
["_admin"]["storage"]
2244 logging_text
=logging_text
,
2247 nslcmop_id
=nslcmop_id
,
2253 member_vnf_index
=member_vnf_index
,
2254 vdu_index
=vdu_index
,
2256 deploy_params
=deploy_params
,
2257 descriptor_config
=descriptor_config
,
2258 base_folder
=base_folder
,
2259 task_instantiation_info
=tasks_dict_info
,
2263 # rest of staff will be done at finally
2265 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
2266 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
2268 except asyncio
.CancelledError
:
2269 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
2270 exc
= "Operation was cancelled"
2271 except Exception as e
:
2272 exc
= traceback
.format_exc()
2273 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
2276 error_list
.append(str(exc
))
2278 # wait for pending tasks
2280 stage
[1] = "Waiting for instantiate pending tasks."
2281 self
.logger
.debug(logging_text
+ stage
[1])
2282 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
2283 stage
, nslcmop_id
, nsr_id
=nsr_id
)
2284 stage
[1] = stage
[2] = ""
2285 except asyncio
.CancelledError
:
2286 error_list
.append("Cancelled")
2287 # TODO cancel all tasks
2288 except Exception as exc
:
2289 error_list
.append(str(exc
))
2291 # update operation-status
2292 db_nsr_update
["operational-status"] = "running"
2293 # let's begin with VCA 'configured' status (later we can change it)
2294 db_nsr_update
["config-status"] = "configured"
2295 for task
, task_name
in tasks_dict_info
.items():
2296 if not task
.done() or task
.cancelled() or task
.exception():
2297 if task_name
.startswith(self
.task_name_deploy_vca
):
2298 # A N2VC task is pending
2299 db_nsr_update
["config-status"] = "failed"
2301 # RO or KDU task is pending
2302 db_nsr_update
["operational-status"] = "failed"
2304 # update status at database
2306 error_detail
= ". ".join(error_list
)
2307 self
.logger
.error(logging_text
+ error_detail
)
2308 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
2309 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
2311 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
2312 db_nslcmop_update
["detailed-status"] = error_detail
2313 nslcmop_operation_state
= "FAILED"
2317 error_description_nsr
= error_description_nslcmop
= None
2319 db_nsr_update
["detailed-status"] = "Done"
2320 db_nslcmop_update
["detailed-status"] = "Done"
2321 nslcmop_operation_state
= "COMPLETED"
2324 self
._write
_ns
_status
(
2327 current_operation
="IDLE",
2328 current_operation_id
=None,
2329 error_description
=error_description_nsr
,
2330 error_detail
=error_detail
,
2331 other_update
=db_nsr_update
2333 self
._write
_op
_status
(
2336 error_message
=error_description_nslcmop
,
2337 operation_state
=nslcmop_operation_state
,
2338 other_update
=db_nslcmop_update
,
2341 if nslcmop_operation_state
:
2343 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2344 "operationState": nslcmop_operation_state
},
2346 except Exception as e
:
2347 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2349 self
.logger
.debug(logging_text
+ "Exit")
2350 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2352 async def _add_vca_relations(self
, logging_text
, nsr_id
, vca_index
: int,
2353 timeout
: int = 3600, vca_type
: str = None) -> bool:
2356 # 1. find all relations for this VCA
2357 # 2. wait for other peers related
2361 vca_type
= vca_type
or "lxc_proxy_charm"
2363 # STEP 1: find all relations for this VCA
2366 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2367 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2370 my_vca
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))[vca_index
]
2372 # read all ns-configuration relations
2373 ns_relations
= list()
2374 db_ns_relations
= deep_get(nsd
, ('ns-configuration', 'relation'))
2376 for r
in db_ns_relations
:
2377 # check if this VCA is in the relation
2378 if my_vca
.get('member-vnf-index') in\
2379 (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2380 ns_relations
.append(r
)
2382 # read all vnf-configuration relations
2383 vnf_relations
= list()
2384 db_vnfd_list
= db_nsr
.get('vnfd-id')
2386 for vnfd
in db_vnfd_list
:
2387 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2388 db_vnf_relations
= deep_get(db_vnfd
, ('vnf-configuration', 'relation'))
2389 if db_vnf_relations
:
2390 for r
in db_vnf_relations
:
2391 # check if this VCA is in the relation
2392 if my_vca
.get('vdu_id') in (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2393 vnf_relations
.append(r
)
2395 # if no relations, terminate
2396 if not ns_relations
and not vnf_relations
:
2397 self
.logger
.debug(logging_text
+ ' No relations')
2400 self
.logger
.debug(logging_text
+ ' adding relations\n {}\n {}'.format(ns_relations
, vnf_relations
))
2407 if now
- start
>= timeout
:
2408 self
.logger
.error(logging_text
+ ' : timeout adding relations')
2411 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2412 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2414 # for each defined NS relation, find the VCA's related
2415 for r
in ns_relations
.copy():
2416 from_vca_ee_id
= None
2418 from_vca_endpoint
= None
2419 to_vca_endpoint
= None
2420 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2421 for vca
in vca_list
:
2422 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id') \
2423 and vca
.get('config_sw_installed'):
2424 from_vca_ee_id
= vca
.get('ee_id')
2425 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2426 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id') \
2427 and vca
.get('config_sw_installed'):
2428 to_vca_ee_id
= vca
.get('ee_id')
2429 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2430 if from_vca_ee_id
and to_vca_ee_id
:
2432 await self
.vca_map
[vca_type
].add_relation(
2433 ee_id_1
=from_vca_ee_id
,
2434 ee_id_2
=to_vca_ee_id
,
2435 endpoint_1
=from_vca_endpoint
,
2436 endpoint_2
=to_vca_endpoint
)
2437 # remove entry from relations list
2438 ns_relations
.remove(r
)
2440 # check failed peers
2442 vca_status_list
= db_nsr
.get('configurationStatus')
2444 for i
in range(len(vca_list
)):
2446 vca_status
= vca_status_list
[i
]
2447 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id'):
2448 if vca_status
.get('status') == 'BROKEN':
2449 # peer broken: remove relation from list
2450 ns_relations
.remove(r
)
2451 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id'):
2452 if vca_status
.get('status') == 'BROKEN':
2453 # peer broken: remove relation from list
2454 ns_relations
.remove(r
)
2459 # for each defined VNF relation, find the VCA's related
2460 for r
in vnf_relations
.copy():
2461 from_vca_ee_id
= None
2463 from_vca_endpoint
= None
2464 to_vca_endpoint
= None
2465 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2466 for vca
in vca_list
:
2467 key_to_check
= "vdu_id"
2468 if vca
.get("vdu_id") is None:
2469 key_to_check
= "vnfd_id"
2470 if vca
.get(key_to_check
) == r
.get('entities')[0].get('id') and vca
.get('config_sw_installed'):
2471 from_vca_ee_id
= vca
.get('ee_id')
2472 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2473 if vca
.get(key_to_check
) == r
.get('entities')[1].get('id') and vca
.get('config_sw_installed'):
2474 to_vca_ee_id
= vca
.get('ee_id')
2475 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2476 if from_vca_ee_id
and to_vca_ee_id
:
2478 await self
.vca_map
[vca_type
].add_relation(
2479 ee_id_1
=from_vca_ee_id
,
2480 ee_id_2
=to_vca_ee_id
,
2481 endpoint_1
=from_vca_endpoint
,
2482 endpoint_2
=to_vca_endpoint
)
2483 # remove entry from relations list
2484 vnf_relations
.remove(r
)
2486 # check failed peers
2488 vca_status_list
= db_nsr
.get('configurationStatus')
2490 for i
in range(len(vca_list
)):
2492 vca_status
= vca_status_list
[i
]
2493 if vca
.get('vdu_id') == r
.get('entities')[0].get('id'):
2494 if vca_status
.get('status') == 'BROKEN':
2495 # peer broken: remove relation from list
2496 vnf_relations
.remove(r
)
2497 if vca
.get('vdu_id') == r
.get('entities')[1].get('id'):
2498 if vca_status
.get('status') == 'BROKEN':
2499 # peer broken: remove relation from list
2500 vnf_relations
.remove(r
)
2506 await asyncio
.sleep(5.0)
2508 if not ns_relations
and not vnf_relations
:
2509 self
.logger
.debug('Relations added')
2514 except Exception as e
:
2515 self
.logger
.warn(logging_text
+ ' ERROR adding relations: {}'.format(e
))
2518 async def _install_kdu(self
, nsr_id
: str, nsr_db_path
: str, vnfr_data
: dict, kdu_index
: int, kdud
: dict,
2519 vnfd
: dict, k8s_instance_info
: dict, k8params
: dict = None, timeout
: int = 600):
2522 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2524 db_dict_install
= {"collection": "nsrs",
2525 "filter": {"_id": nsr_id
},
2526 "path": nsr_db_path
}
2528 kdu_instance
= await self
.k8scluster_map
[k8sclustertype
].install(
2529 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2530 kdu_model
=k8s_instance_info
["kdu-model"],
2533 db_dict
=db_dict_install
,
2535 kdu_name
=k8s_instance_info
["kdu-name"],
2536 namespace
=k8s_instance_info
["namespace"])
2537 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2539 # Obtain services to obtain management service ip
2540 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2541 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2542 kdu_instance
=kdu_instance
,
2543 namespace
=k8s_instance_info
["namespace"])
2545 # Obtain management service info (if exists)
2546 vnfr_update_dict
= {}
2548 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2549 mgmt_services
= [service
for service
in kdud
.get("service", []) if service
.get("mgmt-service")]
2550 for mgmt_service
in mgmt_services
:
2551 for service
in services
:
2552 if service
["name"].startswith(mgmt_service
["name"]):
2553 # Mgmt service found, Obtain service ip
2554 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2555 if isinstance(ip
, list) and len(ip
) == 1:
2558 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2560 # Check if must update also mgmt ip at the vnf
2561 service_external_cp
= mgmt_service
.get("external-connection-point-ref")
2562 if service_external_cp
:
2563 if deep_get(vnfd
, ("mgmt-interface", "cp")) == service_external_cp
:
2564 vnfr_update_dict
["ip-address"] = ip
2568 self
.logger
.warn("Mgmt service name: {} not found".format(mgmt_service
["name"]))
2570 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
2571 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
2573 kdu_config
= kdud
.get("kdu-configuration")
2574 if kdu_config
and kdu_config
.get("initial-config-primitive") and kdu_config
.get("juju") is None:
2575 initial_config_primitive_list
= kdu_config
.get("initial-config-primitive")
2576 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
2578 for initial_config_primitive
in initial_config_primitive_list
:
2579 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, {})
2581 await asyncio
.wait_for(
2582 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
2583 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2584 kdu_instance
=kdu_instance
,
2585 primitive_name
=initial_config_primitive
["name"],
2586 params
=primitive_params_
, db_dict
=db_dict_install
),
2589 except Exception as e
:
2590 # Prepare update db with error and raise exception
2592 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)})
2593 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), {"kdur.{}.status".format(kdu_index
): "ERROR"})
2595 # ignore to keep original exception
2597 # reraise original error
2602 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2603 # Launch kdus if present in the descriptor
2605 k8scluster_id_2_uuic
= {"helm-chart": {}, "juju-bundle": {}}
2607 async def _get_cluster_id(cluster_id
, cluster_type
):
2608 nonlocal k8scluster_id_2_uuic
2609 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2610 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2612 # check if K8scluster is creating and wait look if previous tasks in process
2613 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("k8scluster", cluster_id
)
2615 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name
, cluster_id
)
2616 self
.logger
.debug(logging_text
+ text
)
2617 await asyncio
.wait(task_dependency
, timeout
=3600)
2619 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2620 if not db_k8scluster
:
2621 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2623 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2625 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id
,
2627 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2630 logging_text
+= "Deploy kdus: "
2633 db_nsr_update
= {"_admin.deployed.K8s": []}
2634 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2637 updated_cluster_list
= []
2639 for vnfr_data
in db_vnfrs
.values():
2640 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
2641 # Step 0: Prepare and set parameters
2642 desc_params
= self
._format
_additional
_params
(kdur
.get("additionalParams"))
2643 vnfd_id
= vnfr_data
.get('vnfd-id')
2644 kdud
= next(kdud
for kdud
in db_vnfds
[vnfd_id
]["kdu"] if kdud
["name"] == kdur
["kdu-name"])
2645 namespace
= kdur
.get("k8s-namespace")
2646 if kdur
.get("helm-chart"):
2647 kdumodel
= kdur
["helm-chart"]
2648 k8sclustertype
= "helm-chart"
2649 elif kdur
.get("juju-bundle"):
2650 kdumodel
= kdur
["juju-bundle"]
2651 k8sclustertype
= "juju-bundle"
2653 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2654 "juju-bundle. Maybe an old NBI version is running".
2655 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2656 # check if kdumodel is a file and exists
2658 storage
= deep_get(db_vnfds
.get(vnfd_id
), ('_admin', 'storage'))
2659 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2660 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2661 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2663 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2664 kdumodel
= self
.fs
.path
+ filename
2665 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2667 except Exception: # it is not a file
2670 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2671 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2672 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2675 if k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
:
2676 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2677 self
.k8sclusterhelm
.synchronize_repos(cluster_uuid
=cluster_uuid
))
2678 if del_repo_list
or added_repo_dict
:
2679 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2680 updated
= {'_admin.helm_charts_added.' +
2681 item
: name
for item
, name
in added_repo_dict
.items()}
2682 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster '{}' to_delete: {}, "
2683 "to_add: {}".format(k8s_cluster_id
, del_repo_list
,
2685 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2686 updated_cluster_list
.append(cluster_uuid
)
2689 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2690 kdur
["kdu-name"], k8s_cluster_id
)
2691 k8s_instance_info
= {"kdu-instance": None,
2692 "k8scluster-uuid": cluster_uuid
,
2693 "k8scluster-type": k8sclustertype
,
2694 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2695 "kdu-name": kdur
["kdu-name"],
2696 "kdu-model": kdumodel
,
2697 "namespace": namespace
}
2698 db_path
= "_admin.deployed.K8s.{}".format(index
)
2699 db_nsr_update
[db_path
] = k8s_instance_info
2700 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2702 task
= asyncio
.ensure_future(
2703 self
._install
_kdu
(nsr_id
, db_path
, vnfr_data
, kdu_index
, kdud
, db_vnfds
[vnfd_id
],
2704 k8s_instance_info
, k8params
=desc_params
, timeout
=600))
2705 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2706 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2710 except (LcmException
, asyncio
.CancelledError
):
2712 except Exception as e
:
2713 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2714 if isinstance(e
, (N2VCException
, DbException
)):
2715 self
.logger
.error(logging_text
+ msg
)
2717 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2718 raise LcmException(msg
)
2721 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2723 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2724 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2725 base_folder
, task_instantiation_info
, stage
):
2726 # launch instantiate_N2VC in a asyncio task and register task object
2727 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2728 # if not found, create one entry and update database
2729 # fill db_nsr._admin.deployed.VCA.<index>
2731 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2732 if descriptor_config
.get("juju"): # There is one execution envioronment of type juju
2733 ee_list
= [descriptor_config
]
2734 elif descriptor_config
.get("execution-environment-list"):
2735 ee_list
= descriptor_config
.get("execution-environment-list")
2736 else: # other types as script are not supported
2739 for ee_item
in ee_list
:
2740 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2741 ee_item
.get("helm-chart")))
2742 ee_descriptor_id
= ee_item
.get("id")
2743 if ee_item
.get("juju"):
2744 vca_name
= ee_item
['juju'].get('charm')
2745 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2746 if ee_item
['juju'].get('cloud') == "k8s":
2747 vca_type
= "k8s_proxy_charm"
2748 elif ee_item
['juju'].get('proxy') is False:
2749 vca_type
= "native_charm"
2750 elif ee_item
.get("helm-chart"):
2751 vca_name
= ee_item
['helm-chart']
2754 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2758 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2759 if not vca_deployed
:
2761 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2762 vca_deployed
.get("vdu_id") == vdu_id
and \
2763 vca_deployed
.get("kdu_name") == kdu_name
and \
2764 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2765 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2768 # not found, create one.
2769 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2771 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2773 target
+= "/kdu/{}".format(kdu_name
)
2775 "target_element": target
,
2776 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2777 "member-vnf-index": member_vnf_index
,
2779 "kdu_name": kdu_name
,
2780 "vdu_count_index": vdu_index
,
2781 "operational-status": "init", # TODO revise
2782 "detailed-status": "", # TODO revise
2783 "step": "initial-deploy", # TODO revise
2785 "vdu_name": vdu_name
,
2787 "ee_descriptor_id": ee_descriptor_id
2791 # create VCA and configurationStatus in db
2793 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2794 "configurationStatus.{}".format(vca_index
): dict()
2796 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2798 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2801 task_n2vc
= asyncio
.ensure_future(
2802 self
.instantiate_N2VC(
2803 logging_text
=logging_text
,
2804 vca_index
=vca_index
,
2810 vdu_index
=vdu_index
,
2811 deploy_params
=deploy_params
,
2812 config_descriptor
=descriptor_config
,
2813 base_folder
=base_folder
,
2814 nslcmop_id
=nslcmop_id
,
2818 ee_config_descriptor
=ee_item
2821 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2822 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2823 member_vnf_index
or "", vdu_id
or "")
2826 def _get_terminate_config_primitive(primitive_list
, vca_deployed
):
2827 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2828 it get only those primitives for this execution envirom"""
2830 primitive_list
= primitive_list
or []
2831 # filter primitives by ee_descriptor_id
2832 ee_descriptor_id
= vca_deployed
.get("ee_descriptor_id")
2833 primitive_list
= [p
for p
in primitive_list
if p
.get("execution-environment-ref") == ee_descriptor_id
]
2836 primitive_list
.sort(key
=lambda val
: int(val
['seq']))
2838 return primitive_list
2841 def _create_nslcmop(nsr_id
, operation
, params
):
2843 Creates a ns-lcm-opp content to be stored at database.
2844 :param nsr_id: internal id of the instance
2845 :param operation: instantiate, terminate, scale, action, ...
2846 :param params: user parameters for the operation
2847 :return: dictionary following SOL005 format
2849 # Raise exception if invalid arguments
2850 if not (nsr_id
and operation
and params
):
2852 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2858 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2859 "operationState": "PROCESSING",
2860 "statusEnteredTime": now
,
2861 "nsInstanceId": nsr_id
,
2862 "lcmOperationType": operation
,
2864 "isAutomaticInvocation": False,
2865 "operationParams": params
,
2866 "isCancelPending": False,
2868 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2869 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2874 def _format_additional_params(self
, params
):
2875 params
= params
or {}
2876 for key
, value
in params
.items():
2877 if str(value
).startswith("!!yaml "):
2878 params
[key
] = yaml
.safe_load(value
[7:])
2881 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2882 primitive
= seq
.get('name')
2883 primitive_params
= {}
2885 "member_vnf_index": vnf_index
,
2886 "primitive": primitive
,
2887 "primitive_params": primitive_params
,
2890 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
2894 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
2895 op
= deep_get(db_nslcmop
, ('_admin', 'operations'), [])[op_index
]
2896 if op
.get('operationState') == 'COMPLETED':
2897 # b. Skip sub-operation
2898 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2899 return self
.SUBOPERATION_STATUS_SKIP
2901 # c. retry executing sub-operation
2902 # The sub-operation exists, and operationState != 'COMPLETED'
2903 # Update operationState = 'PROCESSING' to indicate a retry.
2904 operationState
= 'PROCESSING'
2905 detailed_status
= 'In progress'
2906 self
._update
_suboperation
_status
(
2907 db_nslcmop
, op_index
, operationState
, detailed_status
)
2908 # Return the sub-operation index
2909 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2910 # with arguments extracted from the sub-operation
2913 # Find a sub-operation where all keys in a matching dictionary must match
2914 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2915 def _find_suboperation(self
, db_nslcmop
, match
):
2916 if db_nslcmop
and match
:
2917 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2918 for i
, op
in enumerate(op_list
):
2919 if all(op
.get(k
) == match
[k
] for k
in match
):
2921 return self
.SUBOPERATION_STATUS_NOT_FOUND
2923 # Update status for a sub-operation given its index
2924 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2925 # Update DB for HA tasks
2926 q_filter
= {'_id': db_nslcmop
['_id']}
2927 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2928 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2929 self
.db
.set_one("nslcmops",
2931 update_dict
=update_dict
,
2932 fail_on_empty
=False)
2934 # Add sub-operation, return the index of the added sub-operation
2935 # Optionally, set operationState, detailed-status, and operationType
2936 # Status and type are currently set for 'scale' sub-operations:
2937 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2938 # 'detailed-status' : status message
2939 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2940 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2941 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2942 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2943 RO_nsr_id
=None, RO_scaling_info
=None):
2945 return self
.SUBOPERATION_STATUS_NOT_FOUND
2946 # Get the "_admin.operations" list, if it exists
2947 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2948 op_list
= db_nslcmop_admin
.get('operations')
2949 # Create or append to the "_admin.operations" list
2950 new_op
= {'member_vnf_index': vnf_index
,
2952 'vdu_count_index': vdu_count_index
,
2953 'primitive': primitive
,
2954 'primitive_params': mapped_primitive_params
}
2956 new_op
['operationState'] = operationState
2958 new_op
['detailed-status'] = detailed_status
2960 new_op
['lcmOperationType'] = operationType
2962 new_op
['RO_nsr_id'] = RO_nsr_id
2964 new_op
['RO_scaling_info'] = RO_scaling_info
2966 # No existing operations, create key 'operations' with current operation as first list element
2967 db_nslcmop_admin
.update({'operations': [new_op
]})
2968 op_list
= db_nslcmop_admin
.get('operations')
2970 # Existing operations, append operation to list
2971 op_list
.append(new_op
)
2973 db_nslcmop_update
= {'_admin.operations': op_list
}
2974 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2975 op_index
= len(op_list
) - 1
2978 # Helper methods for scale() sub-operations
2980 # pre-scale/post-scale:
2981 # Check for 3 different cases:
2982 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2983 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2984 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2985 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2986 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2987 # Find this sub-operation
2988 if RO_nsr_id
and RO_scaling_info
:
2989 operationType
= 'SCALE-RO'
2991 'member_vnf_index': vnf_index
,
2992 'RO_nsr_id': RO_nsr_id
,
2993 'RO_scaling_info': RO_scaling_info
,
2997 'member_vnf_index': vnf_index
,
2998 'primitive': vnf_config_primitive
,
2999 'primitive_params': primitive_params
,
3000 'lcmOperationType': operationType
3002 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
3003 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
3004 # a. New sub-operation
3005 # The sub-operation does not exist, add it.
3006 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3007 # The following parameters are set to None for all kind of scaling:
3009 vdu_count_index
= None
3011 if RO_nsr_id
and RO_scaling_info
:
3012 vnf_config_primitive
= None
3013 primitive_params
= None
3016 RO_scaling_info
= None
3017 # Initial status for sub-operation
3018 operationState
= 'PROCESSING'
3019 detailed_status
= 'In progress'
3020 # Add sub-operation for pre/post-scaling (zero or more operations)
3021 self
._add
_suboperation
(db_nslcmop
,
3026 vnf_config_primitive
,
3033 return self
.SUBOPERATION_STATUS_NEW
3035 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3036 # or op_index (operationState != 'COMPLETED')
3037 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
3039 # Function to return execution_environment id
3041 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
3042 # TODO vdu_index_count
3043 for vca
in vca_deployed_list
:
3044 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
3047 async def destroy_N2VC(self
, logging_text
, db_nslcmop
, vca_deployed
, config_descriptor
,
3048 vca_index
, destroy_ee
=True, exec_primitives
=True):
3050 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3051 :param logging_text:
3053 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3054 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3055 :param vca_index: index in the database _admin.deployed.VCA
3056 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3057 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3058 not executed properly
3059 :return: None or exception
3063 logging_text
+ " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3064 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
3068 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
3070 # execute terminate_primitives
3072 terminate_primitives
= self
._get
_terminate
_config
_primitive
(
3073 config_descriptor
.get("terminate-config-primitive"), vca_deployed
)
3074 vdu_id
= vca_deployed
.get("vdu_id")
3075 vdu_count_index
= vca_deployed
.get("vdu_count_index")
3076 vdu_name
= vca_deployed
.get("vdu_name")
3077 vnf_index
= vca_deployed
.get("member-vnf-index")
3078 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
3079 for seq
in terminate_primitives
:
3080 # For each sequence in list, get primitive and call _ns_execute_primitive()
3081 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
3082 vnf_index
, seq
.get("name"))
3083 self
.logger
.debug(logging_text
+ step
)
3084 # Create the primitive for each sequence, i.e. "primitive": "touch"
3085 primitive
= seq
.get('name')
3086 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(seq
, vnf_index
)
3089 self
._add
_suboperation
(db_nslcmop
,
3095 mapped_primitive_params
)
3096 # Sub-operations: Call _ns_execute_primitive() instead of action()
3098 result
, result_detail
= await self
._ns
_execute
_primitive
(vca_deployed
["ee_id"], primitive
,
3099 mapped_primitive_params
,
3101 except LcmException
:
3102 # this happens when VCA is not deployed. In this case it is not needed to terminate
3104 result_ok
= ['COMPLETED', 'PARTIALLY_COMPLETED']
3105 if result
not in result_ok
:
3106 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
3107 "error {}".format(seq
.get("name"), vnf_index
, result_detail
))
3108 # set that this VCA do not need terminated
3109 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(vca_index
)
3110 self
.update_db_2("nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False})
3112 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
3113 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
3116 await self
.vca_map
[vca_type
].delete_execution_environment(vca_deployed
["ee_id"])
3118 async def _delete_all_N2VC(self
, db_nsr
: dict):
3119 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
3120 namespace
= "." + db_nsr
["_id"]
3122 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
3123 except N2VCNotFound
: # already deleted. Skip
3125 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
3127 async def _terminate_RO(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
3129 Terminates a deployment from RO
3130 :param logging_text:
3131 :param nsr_deployed: db_nsr._admin.deployed
3134 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3135 this method will update only the index 2, but it will write on database the concatenated content of the list
3140 ro_nsr_id
= ro_delete_action
= None
3141 if nsr_deployed
and nsr_deployed
.get("RO"):
3142 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
3143 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
3146 stage
[2] = "Deleting ns from VIM."
3147 db_nsr_update
["detailed-status"] = " ".join(stage
)
3148 self
._write
_op
_status
(nslcmop_id
, stage
)
3149 self
.logger
.debug(logging_text
+ stage
[2])
3150 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3151 self
._write
_op
_status
(nslcmop_id
, stage
)
3152 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
3153 ro_delete_action
= desc
["action_id"]
3154 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
3155 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3156 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3157 if ro_delete_action
:
3158 # wait until NS is deleted from VIM
3159 stage
[2] = "Waiting ns deleted from VIM."
3160 detailed_status_old
= None
3161 self
.logger
.debug(logging_text
+ stage
[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id
,
3163 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3164 self
._write
_op
_status
(nslcmop_id
, stage
)
3166 delete_timeout
= 20 * 60 # 20 minutes
3167 while delete_timeout
> 0:
3168 desc
= await self
.RO
.show(
3170 item_id_name
=ro_nsr_id
,
3171 extra_item
="action",
3172 extra_item_id
=ro_delete_action
)
3175 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
3177 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
3178 if ns_status
== "ERROR":
3179 raise ROclient
.ROClientException(ns_status_info
)
3180 elif ns_status
== "BUILD":
3181 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
3182 elif ns_status
== "ACTIVE":
3183 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3184 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3187 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
3188 if stage
[2] != detailed_status_old
:
3189 detailed_status_old
= stage
[2]
3190 db_nsr_update
["detailed-status"] = " ".join(stage
)
3191 self
._write
_op
_status
(nslcmop_id
, stage
)
3192 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3193 await asyncio
.sleep(5, loop
=self
.loop
)
3195 else: # delete_timeout <= 0:
3196 raise ROclient
.ROClientException("Timeout waiting ns deleted from VIM")
3198 except Exception as e
:
3199 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3200 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
3201 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
3202 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
3203 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
3204 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
))
3205 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
3206 failed_detail
.append("delete conflict: {}".format(e
))
3207 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
))
3209 failed_detail
.append("delete error: {}".format(e
))
3210 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
))
3213 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
3214 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
3216 stage
[2] = "Deleting nsd from RO."
3217 db_nsr_update
["detailed-status"] = " ".join(stage
)
3218 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3219 self
._write
_op
_status
(nslcmop_id
, stage
)
3220 await self
.RO
.delete("nsd", ro_nsd_id
)
3221 self
.logger
.debug(logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
))
3222 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
3223 except Exception as e
:
3224 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
3225 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
3226 self
.logger
.debug(logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
))
3227 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
3228 failed_detail
.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
))
3229 self
.logger
.debug(logging_text
+ failed_detail
[-1])
3231 failed_detail
.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
))
3232 self
.logger
.error(logging_text
+ failed_detail
[-1])
3234 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
3235 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
3236 if not vnf_deployed
or not vnf_deployed
["id"]:
3239 ro_vnfd_id
= vnf_deployed
["id"]
3240 stage
[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3241 vnf_deployed
["member-vnf-index"], ro_vnfd_id
)
3242 db_nsr_update
["detailed-status"] = " ".join(stage
)
3243 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3244 self
._write
_op
_status
(nslcmop_id
, stage
)
3245 await self
.RO
.delete("vnfd", ro_vnfd_id
)
3246 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
))
3247 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
3248 except Exception as e
:
3249 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
3250 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
3251 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
))
3252 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
3253 failed_detail
.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
))
3254 self
.logger
.debug(logging_text
+ failed_detail
[-1])
3256 failed_detail
.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
))
3257 self
.logger
.error(logging_text
+ failed_detail
[-1])
3260 stage
[2] = "Error deleting from VIM"
3262 stage
[2] = "Deleted from VIM"
3263 db_nsr_update
["detailed-status"] = " ".join(stage
)
3264 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3265 self
._write
_op
_status
(nslcmop_id
, stage
)
3268 raise LcmException("; ".join(failed_detail
))
3270 async def terminate(self
, nsr_id
, nslcmop_id
):
3271 # Try to lock HA task here
3272 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3273 if not task_is_locked_by_me
:
3276 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
3277 self
.logger
.debug(logging_text
+ "Enter")
3278 timeout_ns_terminate
= self
.timeout_ns_terminate
3281 operation_params
= None
3283 error_list
= [] # annotates all failed error messages
3284 db_nslcmop_update
= {}
3285 autoremove
= False # autoremove after terminated
3286 tasks_dict_info
= {}
3288 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3289 # ^ contains [stage, step, VIM-status]
3291 # wait for any previous tasks in process
3292 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
3294 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
3295 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3296 operation_params
= db_nslcmop
.get("operationParams") or {}
3297 if operation_params
.get("timeout_ns_terminate"):
3298 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
3299 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
3300 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3302 db_nsr_update
["operational-status"] = "terminating"
3303 db_nsr_update
["config-status"] = "terminating"
3304 self
._write
_ns
_status
(
3306 ns_state
="TERMINATING",
3307 current_operation
="TERMINATING",
3308 current_operation_id
=nslcmop_id
,
3309 other_update
=db_nsr_update
3311 self
._write
_op
_status
(
3316 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
3317 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
3320 stage
[1] = "Getting vnf descriptors from db."
3321 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
3322 db_vnfds_from_id
= {}
3323 db_vnfds_from_member_index
= {}
3325 for vnfr
in db_vnfrs_list
:
3326 vnfd_id
= vnfr
["vnfd-id"]
3327 if vnfd_id
not in db_vnfds_from_id
:
3328 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3329 db_vnfds_from_id
[vnfd_id
] = vnfd
3330 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3332 # Destroy individual execution environments when there are terminating primitives.
3333 # Rest of EE will be deleted at once
3334 # TODO - check before calling _destroy_N2VC
3335 # if not operation_params.get("skip_terminate_primitives"):#
3336 # or not vca.get("needed_terminate"):
3337 stage
[0] = "Stage 2/3 execute terminating primitives."
3338 self
.logger
.debug(logging_text
+ stage
[0])
3339 stage
[1] = "Looking execution environment that needs terminate."
3340 self
.logger
.debug(logging_text
+ stage
[1])
3341 # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3342 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3343 config_descriptor
= None
3344 if not vca
or not vca
.get("ee_id"):
3346 if not vca
.get("member-vnf-index"):
3348 config_descriptor
= db_nsr
.get("ns-configuration")
3349 elif vca
.get("vdu_id"):
3350 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3351 vdud
= next((vdu
for vdu
in db_vnfd
.get("vdu", ()) if vdu
["id"] == vca
.get("vdu_id")), None)
3353 config_descriptor
= vdud
.get("vdu-configuration")
3354 elif vca
.get("kdu_name"):
3355 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3356 kdud
= next((kdu
for kdu
in db_vnfd
.get("kdu", ()) if kdu
["name"] == vca
.get("kdu_name")), None)
3358 config_descriptor
= kdud
.get("kdu-configuration")
3360 config_descriptor
= db_vnfds_from_member_index
[vca
["member-vnf-index"]].get("vnf-configuration")
3361 vca_type
= vca
.get("type")
3362 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3363 vca
.get("needed_terminate"))
3364 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3365 # pending native charms
3366 destroy_ee
= True if vca_type
in ("helm", "native_charm") else False
3367 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3368 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3369 task
= asyncio
.ensure_future(
3370 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
, vca_index
,
3371 destroy_ee
, exec_terminate_primitives
))
3372 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3374 # wait for pending tasks of terminate primitives
3376 self
.logger
.debug(logging_text
+ 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3377 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3378 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3380 tasks_dict_info
.clear()
3382 return # raise LcmException("; ".join(error_list))
3384 # remove All execution environments at once
3385 stage
[0] = "Stage 3/3 delete all."
3387 if nsr_deployed
.get("VCA"):
3388 stage
[1] = "Deleting all execution environments."
3389 self
.logger
.debug(logging_text
+ stage
[1])
3390 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3391 timeout
=self
.timeout_charm_delete
))
3392 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3393 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3395 # Delete from k8scluster
3396 stage
[1] = "Deleting KDUs."
3397 self
.logger
.debug(logging_text
+ stage
[1])
3398 # print(nsr_deployed)
3399 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3400 if not kdu
or not kdu
.get("kdu-instance"):
3402 kdu_instance
= kdu
.get("kdu-instance")
3403 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3404 task_delete_kdu_instance
= asyncio
.ensure_future(
3405 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3406 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3407 kdu_instance
=kdu_instance
))
3409 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3410 format(kdu
.get("k8scluster-type")))
3412 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3415 stage
[1] = "Deleting ns from VIM."
3417 task_delete_ro
= asyncio
.ensure_future(
3418 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3420 task_delete_ro
= asyncio
.ensure_future(
3421 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3422 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3424 # rest of staff will be done at finally
3426 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3427 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3429 except asyncio
.CancelledError
:
3430 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3431 exc
= "Operation was cancelled"
3432 except Exception as e
:
3433 exc
= traceback
.format_exc()
3434 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3437 error_list
.append(str(exc
))
3439 # wait for pending tasks
3441 stage
[1] = "Waiting for terminate pending tasks."
3442 self
.logger
.debug(logging_text
+ stage
[1])
3443 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3445 stage
[1] = stage
[2] = ""
3446 except asyncio
.CancelledError
:
3447 error_list
.append("Cancelled")
3448 # TODO cancell all tasks
3449 except Exception as exc
:
3450 error_list
.append(str(exc
))
3451 # update status at database
3453 error_detail
= "; ".join(error_list
)
3454 # self.logger.error(logging_text + error_detail)
3455 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
3456 error_description_nsr
= 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id
, stage
[0])
3458 db_nsr_update
["operational-status"] = "failed"
3459 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3460 db_nslcmop_update
["detailed-status"] = error_detail
3461 nslcmop_operation_state
= "FAILED"
3465 error_description_nsr
= error_description_nslcmop
= None
3466 ns_state
= "NOT_INSTANTIATED"
3467 db_nsr_update
["operational-status"] = "terminated"
3468 db_nsr_update
["detailed-status"] = "Done"
3469 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3470 db_nslcmop_update
["detailed-status"] = "Done"
3471 nslcmop_operation_state
= "COMPLETED"
3474 self
._write
_ns
_status
(
3477 current_operation
="IDLE",
3478 current_operation_id
=None,
3479 error_description
=error_description_nsr
,
3480 error_detail
=error_detail
,
3481 other_update
=db_nsr_update
3483 self
._write
_op
_status
(
3486 error_message
=error_description_nslcmop
,
3487 operation_state
=nslcmop_operation_state
,
3488 other_update
=db_nslcmop_update
,
3490 if ns_state
== "NOT_INSTANTIATED":
3492 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "NOT_INSTANTIATED"})
3493 except DbException
as e
:
3494 self
.logger
.warn(logging_text
+ 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3496 if operation_params
:
3497 autoremove
= operation_params
.get("autoremove", False)
3498 if nslcmop_operation_state
:
3500 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3501 "operationState": nslcmop_operation_state
,
3502 "autoremove": autoremove
},
3504 except Exception as e
:
3505 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3507 self
.logger
.debug(logging_text
+ "Exit")
3508 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3510 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3512 error_detail_list
= []
3514 pending_tasks
= list(created_tasks_info
.keys())
3515 num_tasks
= len(pending_tasks
)
3517 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3518 self
._write
_op
_status
(nslcmop_id
, stage
)
3519 while pending_tasks
:
3521 _timeout
= timeout
+ time_start
- time()
3522 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3523 return_when
=asyncio
.FIRST_COMPLETED
)
3524 num_done
+= len(done
)
3525 if not done
: # Timeout
3526 for task
in pending_tasks
:
3527 new_error
= created_tasks_info
[task
] + ": Timeout"
3528 error_detail_list
.append(new_error
)
3529 error_list
.append(new_error
)
3532 if task
.cancelled():
3535 exc
= task
.exception()
3537 if isinstance(exc
, asyncio
.TimeoutError
):
3539 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3540 error_list
.append(created_tasks_info
[task
])
3541 error_detail_list
.append(new_error
)
3542 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3544 self
.logger
.error(logging_text
+ new_error
)
3546 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3547 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + exc_traceback
)
3549 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3550 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3552 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3553 if nsr_id
: # update also nsr
3554 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3555 "errorDetail": ". ".join(error_detail_list
)})
3556 self
._write
_op
_status
(nslcmop_id
, stage
)
3557 return error_detail_list
3560 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3562 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3563 The default-value is used. If it is between < > it look for a value at instantiation_params
3564 :param primitive_desc: portion of VNFD/NSD that describes primitive
3565 :param params: Params provided by user
3566 :param instantiation_params: Instantiation params provided by user
3567 :return: a dictionary with the calculated params
3569 calculated_params
= {}
3570 for parameter
in primitive_desc
.get("parameter", ()):
3571 param_name
= parameter
["name"]
3572 if param_name
in params
:
3573 calculated_params
[param_name
] = params
[param_name
]
3574 elif "default-value" in parameter
or "value" in parameter
:
3575 if "value" in parameter
:
3576 calculated_params
[param_name
] = parameter
["value"]
3578 calculated_params
[param_name
] = parameter
["default-value"]
3579 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3580 and calculated_params
[param_name
].endswith(">"):
3581 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3582 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3584 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3585 format(calculated_params
[param_name
], primitive_desc
["name"]))
3587 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3588 format(param_name
, primitive_desc
["name"]))
3590 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3591 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
], default_flow_style
=True,
3593 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3594 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3595 if parameter
.get("data-type") == "INTEGER":
3597 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3598 except ValueError: # error converting string to int
3600 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3601 elif parameter
.get("data-type") == "BOOLEAN":
3602 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3604 # add always ns_config_info if primitive name is config
3605 if primitive_desc
["name"] == "config":
3606 if "ns_config_info" in instantiation_params
:
3607 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3608 return calculated_params
3610 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3611 ee_descriptor_id
=None):
3612 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3613 for vca
in deployed_vca
:
3616 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3618 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3620 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3622 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3626 # vca_deployed not found
3627 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3628 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3632 ee_id
= vca
.get("ee_id")
3633 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3635 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3636 "execution environment"
3637 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3638 return ee_id
, vca_type
3640 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0,
3641 retries_interval
=30, timeout
=None,
3642 vca_type
=None, db_dict
=None) -> (str, str):
3644 if primitive
== "config":
3645 primitive_params
= {"params": primitive_params
}
3647 vca_type
= vca_type
or "lxc_proxy_charm"
3651 output
= await asyncio
.wait_for(
3652 self
.vca_map
[vca_type
].exec_primitive(
3654 primitive_name
=primitive
,
3655 params_dict
=primitive_params
,
3656 progress_timeout
=self
.timeout_progress_primitive
,
3657 total_timeout
=self
.timeout_primitive
,
3659 timeout
=timeout
or self
.timeout_primitive
)
3662 except asyncio
.CancelledError
:
3664 except Exception as e
: # asyncio.TimeoutError
3665 if isinstance(e
, asyncio
.TimeoutError
):
3669 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3671 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3673 return 'FAILED', str(e
)
3675 return 'COMPLETED', output
3677 except (LcmException
, asyncio
.CancelledError
):
3679 except Exception as e
:
3680 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
3682 async def vca_status_refresh(self
, nsr_id
, nslcmop_id
):
3684 Updating the vca_status with latest juju information in nsrs record
3685 :param: nsr_id: Id of the nsr
3686 :param: nslcmop_id: Id of the nslcmop
3690 self
.logger
.debug("Task ns={} action={} Enter".format(nsr_id
, nslcmop_id
))
3691 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3692 if db_nsr
['_admin']['deployed']['K8s']:
3693 for k8s_index
, k8s
in enumerate(db_nsr
['_admin']['deployed']['K8s']):
3694 cluster_uuid
, kdu_instance
= k8s
["k8scluster-uuid"], k8s
["kdu-instance"]
3695 await self
._on
_update
_k
8s
_db
(cluster_uuid
, kdu_instance
, filter={'_id': nsr_id
})
3697 for vca_index
, _
in enumerate(db_nsr
['_admin']['deployed']['VCA']):
3698 table
, filter = "nsrs", {"_id": nsr_id
}
3699 path
= "_admin.deployed.VCA.{}.".format(vca_index
)
3700 await self
._on
_update
_n
2vc
_db
(table
, filter, path
, {})
3702 self
.logger
.debug("Task ns={} action={} Exit".format(nsr_id
, nslcmop_id
))
3703 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_vca_status_refresh")
3705 async def action(self
, nsr_id
, nslcmop_id
):
3707 # Try to lock HA task here
3708 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3709 if not task_is_locked_by_me
:
3712 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3713 self
.logger
.debug(logging_text
+ "Enter")
3714 # get all needed from database
3718 db_nslcmop_update
= {}
3719 nslcmop_operation_state
= None
3720 error_description_nslcmop
= None
3723 # wait for any previous tasks in process
3724 step
= "Waiting for previous operations to terminate"
3725 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3727 self
._write
_ns
_status
(
3730 current_operation
="RUNNING ACTION",
3731 current_operation_id
=nslcmop_id
3734 step
= "Getting information from database"
3735 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3736 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3738 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3739 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3740 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3741 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3742 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3743 primitive
= db_nslcmop
["operationParams"]["primitive"]
3744 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3745 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3748 step
= "Getting vnfr from database"
3749 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3750 step
= "Getting vnfd from database"
3751 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3753 step
= "Getting nsd from database"
3754 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3756 # for backward compatibility
3757 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3758 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3759 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3760 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3762 # look for primitive
3763 config_primitive_desc
= descriptor_configuration
= None
3765 for vdu
in get_iterable(db_vnfd
, "vdu"):
3766 if vdu_id
== vdu
["id"]:
3767 descriptor_configuration
= vdu
.get("vdu-configuration")
3770 for kdu
in get_iterable(db_vnfd
, "kdu"):
3771 if kdu_name
== kdu
["name"]:
3772 descriptor_configuration
= kdu
.get("kdu-configuration")
3775 descriptor_configuration
= db_vnfd
.get("vnf-configuration")
3777 descriptor_configuration
= db_nsd
.get("ns-configuration")
3779 if descriptor_configuration
and descriptor_configuration
.get("config-primitive"):
3780 for config_primitive
in descriptor_configuration
["config-primitive"]:
3781 if config_primitive
["name"] == primitive
:
3782 config_primitive_desc
= config_primitive
3785 if not config_primitive_desc
:
3786 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3787 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3789 primitive_name
= primitive
3790 ee_descriptor_id
= None
3792 primitive_name
= config_primitive_desc
.get("execution-environment-primitive", primitive
)
3793 ee_descriptor_id
= config_primitive_desc
.get("execution-environment-ref")
3797 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3798 desc_params
= self
._format
_additional
_params
(vdur
.get("additionalParams"))
3800 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3801 desc_params
= self
._format
_additional
_params
(kdur
.get("additionalParams"))
3803 desc_params
= self
._format
_additional
_params
(db_vnfr
.get("additionalParamsForVnf"))
3805 desc_params
= self
._format
_additional
_params
(db_nsr
.get("additionalParamsForNs"))
3808 kdu_action
= True if not deep_get(kdu
, ("kdu-configuration", "juju")) else False
3810 # TODO check if ns is in a proper status
3811 if kdu_name
and (primitive_name
in ("upgrade", "rollback", "status") or kdu_action
):
3812 # kdur and desc_params already set from before
3813 if primitive_params
:
3814 desc_params
.update(primitive_params
)
3815 # TODO Check if we will need something at vnf level
3816 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3817 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3820 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3822 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3823 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3824 raise LcmException(msg
)
3826 db_dict
= {"collection": "nsrs",
3827 "filter": {"_id": nsr_id
},
3828 "path": "_admin.deployed.K8s.{}".format(index
)}
3829 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
))
3830 step
= "Executing kdu {}".format(primitive_name
)
3831 if primitive_name
== "upgrade":
3832 if desc_params
.get("kdu_model"):
3833 kdu_model
= desc_params
.get("kdu_model")
3834 del desc_params
["kdu_model"]
3836 kdu_model
= kdu
.get("kdu-model")
3837 parts
= kdu_model
.split(sep
=":")
3839 kdu_model
= parts
[0]
3841 detailed_status
= await asyncio
.wait_for(
3842 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3843 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3844 kdu_instance
=kdu
.get("kdu-instance"),
3845 atomic
=True, kdu_model
=kdu_model
,
3846 params
=desc_params
, db_dict
=db_dict
,
3847 timeout
=timeout_ns_action
),
3848 timeout
=timeout_ns_action
+ 10)
3849 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3850 elif primitive_name
== "rollback":
3851 detailed_status
= await asyncio
.wait_for(
3852 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3853 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3854 kdu_instance
=kdu
.get("kdu-instance"),
3856 timeout
=timeout_ns_action
)
3857 elif primitive_name
== "status":
3858 detailed_status
= await asyncio
.wait_for(
3859 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3860 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3861 kdu_instance
=kdu
.get("kdu-instance")),
3862 timeout
=timeout_ns_action
)
3864 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3865 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3867 detailed_status
= await asyncio
.wait_for(
3868 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3869 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3870 kdu_instance
=kdu_instance
,
3871 primitive_name
=primitive_name
,
3872 params
=params
, db_dict
=db_dict
,
3873 timeout
=timeout_ns_action
),
3874 timeout
=timeout_ns_action
)
3877 nslcmop_operation_state
= 'COMPLETED'
3879 detailed_status
= ''
3880 nslcmop_operation_state
= 'FAILED'
3882 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3883 member_vnf_index
=vnf_index
,
3885 vdu_count_index
=vdu_count_index
,
3886 ee_descriptor_id
=ee_descriptor_id
)
3887 db_nslcmop_notif
= {"collection": "nslcmops",
3888 "filter": {"_id": nslcmop_id
},
3889 "path": "admin.VCA"}
3890 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3892 primitive
=primitive_name
,
3893 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3894 timeout
=timeout_ns_action
,
3896 db_dict
=db_nslcmop_notif
)
3898 db_nslcmop_update
["detailed-status"] = detailed_status
3899 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3900 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3902 return # database update is called inside finally
3904 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3905 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3907 except asyncio
.CancelledError
:
3908 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3909 exc
= "Operation was cancelled"
3910 except asyncio
.TimeoutError
:
3911 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3913 except Exception as e
:
3914 exc
= traceback
.format_exc()
3915 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3918 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3919 "FAILED {}: {}".format(step
, exc
)
3920 nslcmop_operation_state
= "FAILED"
3922 self
._write
_ns
_status
(
3924 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3925 current_operation
="IDLE",
3926 current_operation_id
=None,
3927 # error_description=error_description_nsr,
3928 # error_detail=error_detail,
3929 other_update
=db_nsr_update
3932 self
._write
_op
_status
(
3935 error_message
=error_description_nslcmop
,
3936 operation_state
=nslcmop_operation_state
,
3937 other_update
=db_nslcmop_update
,
3940 if nslcmop_operation_state
:
3942 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3943 "operationState": nslcmop_operation_state
},
3945 except Exception as e
:
3946 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3947 self
.logger
.debug(logging_text
+ "Exit")
3948 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3949 return nslcmop_operation_state
, detailed_status
3951 async def scale(self
, nsr_id
, nslcmop_id
):
3953 # Try to lock HA task here
3954 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3955 if not task_is_locked_by_me
:
3958 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3959 self
.logger
.debug(logging_text
+ "Enter")
3960 # get all needed from database
3963 db_nslcmop_update
= {}
3964 nslcmop_operation_state
= None
3967 # in case of error, indicates what part of scale was failed to put nsr at error status
3968 scale_process
= None
3969 old_operational_status
= ""
3970 old_config_status
= ""
3972 # wait for any previous tasks in process
3973 step
= "Waiting for previous operations to terminate"
3974 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3976 self
._write
_ns
_status
(
3979 current_operation
="SCALING",
3980 current_operation_id
=nslcmop_id
3983 step
= "Getting nslcmop from database"
3984 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3985 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3986 step
= "Getting nsr from database"
3987 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3989 old_operational_status
= db_nsr
["operational-status"]
3990 old_config_status
= db_nsr
["config-status"]
3991 step
= "Parsing scaling parameters"
3992 # self.logger.debug(step)
3993 db_nsr_update
["operational-status"] = "scaling"
3994 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3995 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3998 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3999 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
4000 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4001 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4002 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
4005 RO_nsr_id
= nsr_deployed
["RO"]["nsr_id"]
4006 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
4007 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
4008 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
4009 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
4011 # for backward compatibility
4012 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
4013 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
4014 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
4015 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4017 step
= "Getting vnfr from database"
4018 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
4019 step
= "Getting vnfd from database"
4020 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4022 step
= "Getting scaling-group-descriptor"
4023 for scaling_descriptor
in db_vnfd
["scaling-group-descriptor"]:
4024 if scaling_descriptor
["name"] == scaling_group
:
4027 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
4028 "at vnfd:scaling-group-descriptor".format(scaling_group
))
4031 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
4032 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
4033 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
4036 # TODO check if ns is in a proper status
4037 step
= "Sending scale order to VIM"
4039 if not db_nsr
["_admin"].get("scaling-group"):
4040 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
4041 admin_scale_index
= 0
4043 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
4044 if admin_scale_info
["name"] == scaling_group
:
4045 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
4047 else: # not found, set index one plus last element and add new entry with the name
4048 admin_scale_index
+= 1
4049 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
4050 RO_scaling_info
= []
4051 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
4052 if scaling_type
== "SCALE_OUT":
4053 # count if max-instance-count is reached
4054 max_instance_count
= scaling_descriptor
.get("max-instance-count", 10)
4055 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
4056 if nb_scale_op
>= max_instance_count
:
4057 raise LcmException("reached the limit of {} (max-instance-count) "
4058 "scaling-out operations for the "
4059 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
4062 vdu_scaling_info
["scaling_direction"] = "OUT"
4063 vdu_scaling_info
["vdu-create"] = {}
4064 for vdu_scale_info
in scaling_descriptor
["vdu"]:
4065 vdud
= next(vdu
for vdu
in db_vnfd
.get("vdu") if vdu
["id"] == vdu_scale_info
["vdu-id-ref"])
4066 vdu_index
= len([x
for x
in db_vnfr
.get("vdur", ())
4067 if x
.get("vdu-id-ref") == vdu_scale_info
["vdu-id-ref"] and
4068 x
.get("member-vnf-index-ref") == vnf_index
])
4069 cloud_init_text
= self
._get
_cloud
_init
(vdud
, db_vnfd
)
4071 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
4072 cloud_init_list
= []
4073 for x
in range(vdu_scale_info
.get("count", 1)):
4075 # TODO Information of its own ip is not available because db_vnfr is not updated.
4076 additional_params
["OSM"] = self
._get
_osm
_params
(db_vnfr
, vdu_scale_info
["vdu-id-ref"],
4078 cloud_init_list
.append(self
._parse
_cloud
_init
(cloud_init_text
, additional_params
,
4079 db_vnfd
["id"], vdud
["id"]))
4080 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
4081 "type": "create", "count": vdu_scale_info
.get("count", 1)})
4083 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
4084 vdu_scaling_info
["vdu-create"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
4086 elif scaling_type
== "SCALE_IN":
4087 # count if min-instance-count is reached
4088 min_instance_count
= 0
4089 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
4090 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
4091 if nb_scale_op
<= min_instance_count
:
4092 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
4093 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
4095 vdu_scaling_info
["scaling_direction"] = "IN"
4096 vdu_scaling_info
["vdu-delete"] = {}
4097 for vdu_scale_info
in scaling_descriptor
["vdu"]:
4098 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
4099 "type": "delete", "count": vdu_scale_info
.get("count", 1)})
4100 vdu_scaling_info
["vdu-delete"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
4102 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
4103 vdu_create
= vdu_scaling_info
.get("vdu-create")
4104 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
4105 if vdu_scaling_info
["scaling_direction"] == "IN":
4106 for vdur
in reversed(db_vnfr
["vdur"]):
4107 if vdu_delete
.get(vdur
["vdu-id-ref"]):
4108 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
4109 vdu_scaling_info
["vdu"].append({
4110 "name": vdur
["name"],
4111 "vdu_id": vdur
["vdu-id-ref"],
4114 for interface
in vdur
["interfaces"]:
4115 vdu_scaling_info
["vdu"][-1]["interface"].append({
4116 "name": interface
["name"],
4117 "ip_address": interface
["ip-address"],
4118 "mac_address": interface
.get("mac-address"),
4120 vdu_delete
= vdu_scaling_info
.pop("vdu-delete")
4123 step
= "Executing pre-scale vnf-config-primitive"
4124 if scaling_descriptor
.get("scaling-config-action"):
4125 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4126 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
4127 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
4128 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4129 step
= db_nslcmop_update
["detailed-status"] = \
4130 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4132 # look for primitive
4133 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
4134 if config_primitive
["name"] == vnf_config_primitive
:
4138 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4139 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4140 "primitive".format(scaling_group
, vnf_config_primitive
))
4142 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4143 if db_vnfr
.get("additionalParamsForVnf"):
4144 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4146 scale_process
= "VCA"
4147 db_nsr_update
["config-status"] = "configuring pre-scaling"
4148 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4150 # Pre-scale retry check: Check if this sub-operation has been executed before
4151 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4152 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
4153 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4154 # Skip sub-operation
4155 result
= 'COMPLETED'
4156 result_detail
= 'Done'
4157 self
.logger
.debug(logging_text
+
4158 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4159 vnf_config_primitive
, result
, result_detail
))
4161 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4162 # New sub-operation: Get index of this sub-operation
4163 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4164 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4165 format(vnf_config_primitive
))
4167 # retry: Get registered params for this existing sub-operation
4168 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4169 vnf_index
= op
.get('member_vnf_index')
4170 vnf_config_primitive
= op
.get('primitive')
4171 primitive_params
= op
.get('primitive_params')
4172 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4173 format(vnf_config_primitive
))
4174 # Execute the primitive, either with new (first-time) or registered (reintent) args
4175 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4176 primitive_name
= config_primitive
.get("execution-environment-primitive",
4177 vnf_config_primitive
)
4178 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4179 member_vnf_index
=vnf_index
,
4181 vdu_count_index
=None,
4182 ee_descriptor_id
=ee_descriptor_id
)
4183 result
, result_detail
= await self
._ns
_execute
_primitive
(
4184 ee_id
, primitive_name
, primitive_params
, vca_type
)
4185 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4186 vnf_config_primitive
, result
, result_detail
))
4187 # Update operationState = COMPLETED | FAILED
4188 self
._update
_suboperation
_status
(
4189 db_nslcmop
, op_index
, result
, result_detail
)
4191 if result
== "FAILED":
4192 raise LcmException(result_detail
)
4193 db_nsr_update
["config-status"] = old_config_status
4194 scale_process
= None
4198 # Should this block be skipped if 'RO_nsr_id' == None ?
4199 # if (RO_nsr_id and RO_scaling_info):
4201 scale_process
= "RO"
4202 # Scale RO retry check: Check if this sub-operation has been executed before
4203 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4204 db_nslcmop
, vnf_index
, None, None, 'SCALE-RO', RO_nsr_id
, RO_scaling_info
)
4205 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4206 # Skip sub-operation
4207 result
= 'COMPLETED'
4208 result_detail
= 'Done'
4209 self
.logger
.debug(logging_text
+ "Skipped sub-operation RO, result {} {}".format(
4210 result
, result_detail
))
4212 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4213 # New sub-operation: Get index of this sub-operation
4214 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4215 self
.logger
.debug(logging_text
+ "New sub-operation RO")
4217 # retry: Get registered params for this existing sub-operation
4218 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4219 RO_nsr_id
= op
.get('RO_nsr_id')
4220 RO_scaling_info
= op
.get('RO_scaling_info')
4221 self
.logger
.debug(logging_text
+ "Sub-operation RO retry for primitive {}".format(
4222 vnf_config_primitive
))
4224 RO_desc
= await self
.RO
.create_action("ns", RO_nsr_id
, {"vdu-scaling": RO_scaling_info
})
4225 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
4226 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
4228 RO_nslcmop_id
= RO_desc
["instance_action_id"]
4229 db_nslcmop_update
["_admin.deploy.RO"] = RO_nslcmop_id
4231 RO_task_done
= False
4232 step
= detailed_status
= "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id
)
4233 detailed_status_old
= None
4234 self
.logger
.debug(logging_text
+ step
)
4236 deployment_timeout
= 1 * 3600 # One hour
4237 while deployment_timeout
> 0:
4238 if not RO_task_done
:
4239 desc
= await self
.RO
.show("ns", item_id_name
=RO_nsr_id
, extra_item
="action",
4240 extra_item_id
=RO_nslcmop_id
)
4243 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4245 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4246 if ns_status
== "ERROR":
4247 raise ROclient
.ROClientException(ns_status_info
)
4248 elif ns_status
== "BUILD":
4249 detailed_status
= step
+ "; {}".format(ns_status_info
)
4250 elif ns_status
== "ACTIVE":
4252 self
.scale_vnfr(db_vnfr
, vdu_create
=vdu_create
, vdu_delete
=vdu_delete
)
4253 step
= detailed_status
= "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id
)
4254 self
.logger
.debug(logging_text
+ step
)
4256 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
4258 desc
= await self
.RO
.show("ns", RO_nsr_id
)
4259 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
4261 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4263 if ns_status
== "ERROR":
4264 raise ROclient
.ROClientException(ns_status_info
)
4265 elif ns_status
== "BUILD":
4266 detailed_status
= step
+ "; {}".format(ns_status_info
)
4267 elif ns_status
== "ACTIVE":
4268 step
= detailed_status
= \
4269 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4271 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4272 self
.ns_update_vnfr({db_vnfr
["member-vnf-index-ref"]: db_vnfr
}, desc
)
4274 except LcmExceptionNoMgmtIP
:
4277 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
4278 if detailed_status
!= detailed_status_old
:
4279 self
._update
_suboperation
_status
(
4280 db_nslcmop
, op_index
, 'COMPLETED', detailed_status
)
4281 detailed_status_old
= db_nslcmop_update
["detailed-status"] = detailed_status
4282 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
4284 await asyncio
.sleep(5, loop
=self
.loop
)
4285 deployment_timeout
-= 5
4286 if deployment_timeout
<= 0:
4287 self
._update
_suboperation
_status
(
4288 db_nslcmop
, nslcmop_id
, op_index
, 'FAILED', "Timeout when waiting for ns to get ready")
4289 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
4291 # update VDU_SCALING_INFO with the obtained ip_addresses
4292 if vdu_scaling_info
["scaling_direction"] == "OUT":
4293 for vdur
in reversed(db_vnfr
["vdur"]):
4294 if vdu_scaling_info
["vdu-create"].get(vdur
["vdu-id-ref"]):
4295 vdu_scaling_info
["vdu-create"][vdur
["vdu-id-ref"]] -= 1
4296 vdu_scaling_info
["vdu"].append({
4297 "name": vdur
["name"],
4298 "vdu_id": vdur
["vdu-id-ref"],
4301 for interface
in vdur
["interfaces"]:
4302 vdu_scaling_info
["vdu"][-1]["interface"].append({
4303 "name": interface
["name"],
4304 "ip_address": interface
["ip-address"],
4305 "mac_address": interface
.get("mac-address"),
4307 del vdu_scaling_info
["vdu-create"]
4309 self
._update
_suboperation
_status
(db_nslcmop
, op_index
, 'COMPLETED', 'Done')
4312 scale_process
= None
4314 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4317 # execute primitive service POST-SCALING
4318 step
= "Executing post-scale vnf-config-primitive"
4319 if scaling_descriptor
.get("scaling-config-action"):
4320 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4321 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
4322 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
4323 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4324 step
= db_nslcmop_update
["detailed-status"] = \
4325 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4327 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4328 if db_vnfr
.get("additionalParamsForVnf"):
4329 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4331 # look for primitive
4332 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
4333 if config_primitive
["name"] == vnf_config_primitive
:
4337 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4338 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4339 "config-primitive".format(scaling_group
, vnf_config_primitive
))
4340 scale_process
= "VCA"
4341 db_nsr_update
["config-status"] = "configuring post-scaling"
4342 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4344 # Post-scale retry check: Check if this sub-operation has been executed before
4345 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4346 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
4347 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4348 # Skip sub-operation
4349 result
= 'COMPLETED'
4350 result_detail
= 'Done'
4351 self
.logger
.debug(logging_text
+
4352 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4353 format(vnf_config_primitive
, result
, result_detail
))
4355 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4356 # New sub-operation: Get index of this sub-operation
4357 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4358 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4359 format(vnf_config_primitive
))
4361 # retry: Get registered params for this existing sub-operation
4362 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4363 vnf_index
= op
.get('member_vnf_index')
4364 vnf_config_primitive
= op
.get('primitive')
4365 primitive_params
= op
.get('primitive_params')
4366 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4367 format(vnf_config_primitive
))
4368 # Execute the primitive, either with new (first-time) or registered (reintent) args
4369 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4370 primitive_name
= config_primitive
.get("execution-environment-primitive",
4371 vnf_config_primitive
)
4372 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4373 member_vnf_index
=vnf_index
,
4375 vdu_count_index
=None,
4376 ee_descriptor_id
=ee_descriptor_id
)
4377 result
, result_detail
= await self
._ns
_execute
_primitive
(
4378 ee_id
, primitive_name
, primitive_params
, vca_type
)
4379 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4380 vnf_config_primitive
, result
, result_detail
))
4381 # Update operationState = COMPLETED | FAILED
4382 self
._update
_suboperation
_status
(
4383 db_nslcmop
, op_index
, result
, result_detail
)
4385 if result
== "FAILED":
4386 raise LcmException(result_detail
)
4387 db_nsr_update
["config-status"] = old_config_status
4388 scale_process
= None
4391 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4392 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
4393 else old_operational_status
4394 db_nsr_update
["config-status"] = old_config_status
4396 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
4397 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4399 except asyncio
.CancelledError
:
4400 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
4401 exc
= "Operation was cancelled"
4402 except Exception as e
:
4403 exc
= traceback
.format_exc()
4404 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
4406 self
._write
_ns
_status
(
4409 current_operation
="IDLE",
4410 current_operation_id
=None
4413 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4414 nslcmop_operation_state
= "FAILED"
4416 db_nsr_update
["operational-status"] = old_operational_status
4417 db_nsr_update
["config-status"] = old_config_status
4418 db_nsr_update
["detailed-status"] = ""
4420 if "VCA" in scale_process
:
4421 db_nsr_update
["config-status"] = "failed"
4422 if "RO" in scale_process
:
4423 db_nsr_update
["operational-status"] = "failed"
4424 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4427 error_description_nslcmop
= None
4428 nslcmop_operation_state
= "COMPLETED"
4429 db_nslcmop_update
["detailed-status"] = "Done"
4431 self
._write
_op
_status
(
4434 error_message
=error_description_nslcmop
,
4435 operation_state
=nslcmop_operation_state
,
4436 other_update
=db_nslcmop_update
,
4439 self
._write
_ns
_status
(
4442 current_operation
="IDLE",
4443 current_operation_id
=None,
4444 other_update
=db_nsr_update
4447 if nslcmop_operation_state
:
4449 await self
.msg
.aiowrite("ns", "scaled", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
4450 "operationState": nslcmop_operation_state
},
4453 # await asyncio.sleep(cooldown_time, loop=self.loop)
4454 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4455 except Exception as e
:
4456 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4457 self
.logger
.debug(logging_text
+ "Exit")
4458 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
4460 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4461 if not self
.prometheus
:
4463 # look if exist a file called 'prometheus*.j2' and
4464 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4465 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4468 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4472 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4473 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4475 vnfr_id
= vnfr_id
.replace("-", "")
4477 "JOB_NAME": vnfr_id
,
4478 "TARGET_IP": target_ip
,
4479 "EXPORTER_POD_IP": host_name
,
4480 "EXPORTER_POD_PORT": host_port
,
4482 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4483 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4484 for job
in job_list
:
4485 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4486 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4487 job
["nsr_id"] = nsr_id
4488 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4489 if await self
.prometheus
.update(job_dict
):
4490 return list(job_dict
.keys())