1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, Template
, meta
, TemplateError
, TemplateNotFound
, TemplateSyntaxError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from n2vc
.k8s_helm_conn
import K8sHelmConnector
31 from n2vc
.k8s_juju_conn
import K8sJujuConnector
33 from osm_common
.dbbase
import DbException
34 from osm_common
.fsbase
import FsException
36 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
37 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
39 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
41 from copy
import copy
, deepcopy
42 from http
import HTTPStatus
44 from uuid
import uuid4
45 from functools
import partial
47 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
50 class N2VCJujuConnectorLCM(N2VCJujuConnector
):
52 async def create_execution_environment(self
, namespace
: str, db_dict
: dict, reuse_ee_id
: str = None,
53 progress_timeout
: float = None, total_timeout
: float = None,
54 artifact_path
: str = None, vca_type
: str = None) -> (str, dict):
55 # admit two new parameters, artifact_path and vca_type
56 if vca_type
== "k8s_proxy_charm":
57 ee_id
= await self
.n2vc
.install_k8s_proxy_charm(
58 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
60 artifact_path
=artifact_path
,
64 return await super().create_execution_environment(
65 namespace
=namespace
, db_dict
=db_dict
, reuse_ee_id
=reuse_ee_id
,
66 progress_timeout
=progress_timeout
, total_timeout
=total_timeout
)
68 async def install_configuration_sw(self
, ee_id
: str, artifact_path
: str, db_dict
: dict,
69 progress_timeout
: float = None, total_timeout
: float = None,
70 config
: dict = None, num_units
: int = 1, vca_type
: str = "lxc_proxy_charm"):
71 if vca_type
== "k8s_proxy_charm":
73 return await super().install_configuration_sw(
74 ee_id
=ee_id
, artifact_path
=artifact_path
, db_dict
=db_dict
, progress_timeout
=progress_timeout
,
75 total_timeout
=total_timeout
, config
=config
, num_units
=num_units
)
79 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
80 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
81 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
82 timeout_charm_delete
= 10 * 60
83 timeout_primitive
= 30 * 60 # timeout for primitive execution
84 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
86 SUBOPERATION_STATUS_NOT_FOUND
= -1
87 SUBOPERATION_STATUS_NEW
= -2
88 SUBOPERATION_STATUS_SKIP
= -3
89 task_name_deploy_vca
= "Deploying VCA"
91 def __init__(self
, db
, msg
, fs
, lcm_tasks
, config
, loop
):
93 Init, Connect to database, filesystem storage, and messaging
94 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
101 logger
=logging
.getLogger('lcm.ns')
105 self
.lcm_tasks
= lcm_tasks
106 self
.timeout
= config
["timeout"]
107 self
.ro_config
= config
["ro_config"]
108 self
.ng_ro
= config
["ro_config"].get("ng")
109 self
.vca_config
= config
["VCA"].copy()
111 # create N2VC connector
112 self
.n2vc
= N2VCJujuConnectorLCM(
117 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
118 username
=self
.vca_config
.get('user', None),
119 vca_config
=self
.vca_config
,
120 on_update_db
=self
._on
_update
_n
2vc
_db
123 self
.conn_helm_ee
= LCMHelmConn(
130 vca_config
=self
.vca_config
,
131 on_update_db
=self
._on
_update
_n
2vc
_db
134 self
.k8sclusterhelm
= K8sHelmConnector(
135 kubectl_command
=self
.vca_config
.get("kubectlpath"),
136 helm_command
=self
.vca_config
.get("helmpath"),
143 self
.k8sclusterjuju
= K8sJujuConnector(
144 kubectl_command
=self
.vca_config
.get("kubectlpath"),
145 juju_command
=self
.vca_config
.get("jujupath"),
152 self
.k8scluster_map
= {
153 "helm-chart": self
.k8sclusterhelm
,
154 "chart": self
.k8sclusterhelm
,
155 "juju-bundle": self
.k8sclusterjuju
,
156 "juju": self
.k8sclusterjuju
,
160 "lxc_proxy_charm": self
.n2vc
,
161 "native_charm": self
.n2vc
,
162 "k8s_proxy_charm": self
.n2vc
,
163 "helm": self
.conn_helm_ee
168 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
170 self
.RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
172 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
174 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
177 # TODO filter RO descriptor fields...
181 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
182 db_dict
['deploymentStatus'] = ro_descriptor
183 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
185 except Exception as e
:
186 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
188 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
190 # remove last dot from path (if exists)
191 if path
.endswith('.'):
194 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
195 # .format(table, filter, path, updated_data))
199 nsr_id
= filter.get('_id')
201 # read ns record from database
202 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
203 current_ns_status
= nsr
.get('nsState')
205 # get vca status for NS
206 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
210 db_dict
['vcaStatus'] = status_dict
212 # update configurationStatus for this VCA
214 vca_index
= int(path
[path
.rfind(".")+1:])
216 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
217 vca_status
= vca_list
[vca_index
].get('status')
219 configuration_status_list
= nsr
.get('configurationStatus')
220 config_status
= configuration_status_list
[vca_index
].get('status')
222 if config_status
== 'BROKEN' and vca_status
!= 'failed':
223 db_dict
['configurationStatus'][vca_index
] = 'READY'
224 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
225 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
226 except Exception as e
:
227 # not update configurationStatus
228 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
230 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
231 # if nsState = 'DEGRADED' check if all is OK
233 if current_ns_status
in ('READY', 'DEGRADED'):
234 error_description
= ''
236 if status_dict
.get('machines'):
237 for machine_id
in status_dict
.get('machines'):
238 machine
= status_dict
.get('machines').get(machine_id
)
239 # check machine agent-status
240 if machine
.get('agent-status'):
241 s
= machine
.get('agent-status').get('status')
244 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
245 # check machine instance status
246 if machine
.get('instance-status'):
247 s
= machine
.get('instance-status').get('status')
250 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
252 if status_dict
.get('applications'):
253 for app_id
in status_dict
.get('applications'):
254 app
= status_dict
.get('applications').get(app_id
)
255 # check application status
256 if app
.get('status'):
257 s
= app
.get('status').get('status')
260 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
262 if error_description
:
263 db_dict
['errorDescription'] = error_description
264 if current_ns_status
== 'READY' and is_degraded
:
265 db_dict
['nsState'] = 'DEGRADED'
266 if current_ns_status
== 'DEGRADED' and not is_degraded
:
267 db_dict
['nsState'] = 'READY'
270 self
.update_db_2("nsrs", nsr_id
, db_dict
)
272 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
274 except Exception as e
:
275 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
277 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
279 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
280 :param vnfd: input vnfd
281 :param new_id: overrides vnf id if provided
282 :param additionalParams: Instantiation params for VNFs provided
283 :param nsrId: Id of the NSR
284 :return: copy of vnfd
287 vnfd_RO
= deepcopy(vnfd
)
288 # remove unused by RO configuration, monitoring, scaling and internal keys
289 vnfd_RO
.pop("_id", None)
290 vnfd_RO
.pop("_admin", None)
291 vnfd_RO
.pop("vnf-configuration", None)
292 vnfd_RO
.pop("monitoring-param", None)
293 vnfd_RO
.pop("scaling-group-descriptor", None)
294 vnfd_RO
.pop("kdu", None)
295 vnfd_RO
.pop("k8s-cluster", None)
297 vnfd_RO
["id"] = new_id
299 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
300 for vdu
in get_iterable(vnfd_RO
, "vdu"):
301 cloud_init_file
= None
302 if vdu
.get("cloud-init-file"):
303 base_folder
= vnfd
["_admin"]["storage"]
304 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
305 vdu
["cloud-init-file"])
306 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
307 cloud_init_content
= ci_file
.read()
308 vdu
.pop("cloud-init-file", None)
309 elif vdu
.get("cloud-init"):
310 cloud_init_content
= vdu
["cloud-init"]
315 ast
= env
.parse(cloud_init_content
)
316 mandatory_vars
= meta
.find_undeclared_variables(ast
)
318 for var
in mandatory_vars
:
319 if not additionalParams
or var
not in additionalParams
.keys():
320 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
321 "file, must be provided in the instantiation parameters inside the "
322 "'additionalParamsForVnf' block".format(var
, vnfd
["id"], vdu
["id"]))
323 template
= Template(cloud_init_content
)
324 cloud_init_content
= template
.render(additionalParams
or {})
325 vdu
["cloud-init"] = cloud_init_content
328 except FsException
as e
:
329 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
330 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
331 except (TemplateError
, TemplateNotFound
, TemplateSyntaxError
) as e
:
332 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
333 format(vnfd
["id"], vdu
["id"], e
))
335 def _ns_params_2_RO(self
, ns_params
, nsd
, vnfd_dict
, db_vnfrs
, n2vc_key_list
):
337 Creates a RO ns descriptor from OSM ns_instantiate params
338 :param ns_params: OSM instantiate params
339 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
340 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
341 :return: The RO ns descriptor
345 # TODO feature 1417: Check that no instantiation is set over PDU
346 # check if PDU forces a concrete vim-network-id and add it
347 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
349 def vim_account_2_RO(vim_account
):
350 if vim_account
in vim_2_RO
:
351 return vim_2_RO
[vim_account
]
353 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
354 if db_vim
["_admin"]["operationalState"] != "ENABLED":
355 raise LcmException("VIM={} is not available. operationalState={}".format(
356 vim_account
, db_vim
["_admin"]["operationalState"]))
357 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
358 vim_2_RO
[vim_account
] = RO_vim_id
361 def wim_account_2_RO(wim_account
):
362 if isinstance(wim_account
, str):
363 if wim_account
in wim_2_RO
:
364 return wim_2_RO
[wim_account
]
366 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
367 if db_wim
["_admin"]["operationalState"] != "ENABLED":
368 raise LcmException("WIM={} is not available. operationalState={}".format(
369 wim_account
, db_wim
["_admin"]["operationalState"]))
370 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
371 wim_2_RO
[wim_account
] = RO_wim_id
376 def ip_profile_2_RO(ip_profile
):
377 RO_ip_profile
= deepcopy((ip_profile
))
378 if "dns-server" in RO_ip_profile
:
379 if isinstance(RO_ip_profile
["dns-server"], list):
380 RO_ip_profile
["dns-address"] = []
381 for ds
in RO_ip_profile
.pop("dns-server"):
382 RO_ip_profile
["dns-address"].append(ds
['address'])
384 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
385 if RO_ip_profile
.get("ip-version") == "ipv4":
386 RO_ip_profile
["ip-version"] = "IPv4"
387 if RO_ip_profile
.get("ip-version") == "ipv6":
388 RO_ip_profile
["ip-version"] = "IPv6"
389 if "dhcp-params" in RO_ip_profile
:
390 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
396 # "name": ns_params["nsName"],
397 # "description": ns_params.get("nsDescription"),
398 "datacenter": vim_account_2_RO(ns_params
["vimAccountId"]),
399 "wim_account": wim_account_2_RO(ns_params
.get("wimAccountId")),
400 # "scenario": ns_params["nsdId"],
402 # set vim_account of each vnf if different from general vim_account.
403 # Get this information from <vnfr> database content, key vim-account-id
404 # Vim account can be set by placement_engine and it may be different from
405 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
406 for vnf_index
, vnfr
in db_vnfrs
.items():
407 if vnfr
.get("vim-account-id") and vnfr
["vim-account-id"] != ns_params
["vimAccountId"]:
408 populate_dict(RO_ns_params
, ("vnfs", vnf_index
, "datacenter"), vim_account_2_RO(vnfr
["vim-account-id"]))
410 n2vc_key_list
= n2vc_key_list
or []
411 for vnfd_ref
, vnfd
in vnfd_dict
.items():
412 vdu_needed_access
= []
414 if vnfd
.get("vnf-configuration"):
415 ssh_required
= deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required"))
416 if ssh_required
and vnfd
.get("mgmt-interface"):
417 if vnfd
["mgmt-interface"].get("vdu-id"):
418 vdu_needed_access
.append(vnfd
["mgmt-interface"]["vdu-id"])
419 elif vnfd
["mgmt-interface"].get("cp"):
420 mgmt_cp
= vnfd
["mgmt-interface"]["cp"]
422 for vdu
in vnfd
.get("vdu", ()):
423 if vdu
.get("vdu-configuration"):
424 ssh_required
= deep_get(vdu
, ("vdu-configuration", "config-access", "ssh-access", "required"))
426 vdu_needed_access
.append(vdu
["id"])
428 for vdu_interface
in vdu
.get("interface"):
429 if vdu_interface
.get("external-connection-point-ref") and \
430 vdu_interface
["external-connection-point-ref"] == mgmt_cp
:
431 vdu_needed_access
.append(vdu
["id"])
435 if vdu_needed_access
:
436 for vnf_member
in nsd
.get("constituent-vnfd"):
437 if vnf_member
["vnfd-id-ref"] != vnfd_ref
:
439 for vdu
in vdu_needed_access
:
440 populate_dict(RO_ns_params
,
441 ("vnfs", vnf_member
["member-vnf-index"], "vdus", vdu
, "mgmt_keys"),
444 if ns_params
.get("vduImage"):
445 RO_ns_params
["vduImage"] = ns_params
["vduImage"]
447 if ns_params
.get("ssh_keys"):
448 RO_ns_params
["cloud-config"] = {"key-pairs": ns_params
["ssh_keys"]}
449 for vnf_params
in get_iterable(ns_params
, "vnf"):
450 for constituent_vnfd
in nsd
["constituent-vnfd"]:
451 if constituent_vnfd
["member-vnf-index"] == vnf_params
["member-vnf-index"]:
452 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
455 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
456 "constituent-vnfd".format(vnf_params
["member-vnf-index"]))
458 for vdu_params
in get_iterable(vnf_params
, "vdu"):
459 # TODO feature 1417: check that this VDU exist and it is not a PDU
460 if vdu_params
.get("volume"):
461 for volume_params
in vdu_params
["volume"]:
462 if volume_params
.get("vim-volume-id"):
463 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
464 vdu_params
["id"], "devices", volume_params
["name"], "vim_id"),
465 volume_params
["vim-volume-id"])
466 if vdu_params
.get("interface"):
467 for interface_params
in vdu_params
["interface"]:
468 if interface_params
.get("ip-address"):
469 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
470 vdu_params
["id"], "interfaces", interface_params
["name"],
472 interface_params
["ip-address"])
473 if interface_params
.get("mac-address"):
474 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
475 vdu_params
["id"], "interfaces", interface_params
["name"],
477 interface_params
["mac-address"])
478 if interface_params
.get("floating-ip-required"):
479 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
480 vdu_params
["id"], "interfaces", interface_params
["name"],
482 interface_params
["floating-ip-required"])
484 for internal_vld_params
in get_iterable(vnf_params
, "internal-vld"):
485 if internal_vld_params
.get("vim-network-name"):
486 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
487 internal_vld_params
["name"], "vim-network-name"),
488 internal_vld_params
["vim-network-name"])
489 if internal_vld_params
.get("vim-network-id"):
490 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
491 internal_vld_params
["name"], "vim-network-id"),
492 internal_vld_params
["vim-network-id"])
493 if internal_vld_params
.get("ip-profile"):
494 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
495 internal_vld_params
["name"], "ip-profile"),
496 ip_profile_2_RO(internal_vld_params
["ip-profile"]))
497 if internal_vld_params
.get("provider-network"):
499 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
500 internal_vld_params
["name"], "provider-network"),
501 internal_vld_params
["provider-network"].copy())
503 for icp_params
in get_iterable(internal_vld_params
, "internal-connection-point"):
506 for vdu_descriptor
in vnf_descriptor
["vdu"]:
507 for vdu_interface
in vdu_descriptor
["interface"]:
508 if vdu_interface
.get("internal-connection-point-ref") == icp_params
["id-ref"]:
509 if icp_params
.get("ip-address"):
510 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
511 vdu_descriptor
["id"], "interfaces",
512 vdu_interface
["name"], "ip_address"),
513 icp_params
["ip-address"])
515 if icp_params
.get("mac-address"):
516 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
517 vdu_descriptor
["id"], "interfaces",
518 vdu_interface
["name"], "mac_address"),
519 icp_params
["mac-address"])
525 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
526 "internal-vld:id-ref={} is not present at vnfd:internal-"
527 "connection-point".format(vnf_params
["member-vnf-index"],
528 icp_params
["id-ref"]))
530 for vld_params
in get_iterable(ns_params
, "vld"):
531 if "ip-profile" in vld_params
:
532 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "ip-profile"),
533 ip_profile_2_RO(vld_params
["ip-profile"]))
535 if vld_params
.get("provider-network"):
537 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "provider-network"),
538 vld_params
["provider-network"].copy())
540 if "wimAccountId" in vld_params
and vld_params
["wimAccountId"] is not None:
541 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "wim_account"),
542 wim_account_2_RO(vld_params
["wimAccountId"])),
543 if vld_params
.get("vim-network-name"):
545 if isinstance(vld_params
["vim-network-name"], dict):
546 for vim_account
, vim_net
in vld_params
["vim-network-name"].items():
547 RO_vld_sites
.append({
548 "netmap-use": vim_net
,
549 "datacenter": vim_account_2_RO(vim_account
)
551 else: # isinstance str
552 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-name"]})
554 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
556 if vld_params
.get("vim-network-id"):
558 if isinstance(vld_params
["vim-network-id"], dict):
559 for vim_account
, vim_net
in vld_params
["vim-network-id"].items():
560 RO_vld_sites
.append({
561 "netmap-use": vim_net
,
562 "datacenter": vim_account_2_RO(vim_account
)
564 else: # isinstance str
565 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-id"]})
567 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
568 if vld_params
.get("ns-net"):
569 if isinstance(vld_params
["ns-net"], dict):
570 for vld_id
, instance_scenario_id
in vld_params
["ns-net"].items():
571 RO_vld_ns_net
= {"instance_scenario_id": instance_scenario_id
, "osm_id": vld_id
}
572 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "use-network"), RO_vld_ns_net
)
573 if "vnfd-connection-point-ref" in vld_params
:
574 for cp_params
in vld_params
["vnfd-connection-point-ref"]:
576 for constituent_vnfd
in nsd
["constituent-vnfd"]:
577 if constituent_vnfd
["member-vnf-index"] == cp_params
["member-vnf-index-ref"]:
578 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
582 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
583 "is not present at nsd:constituent-vnfd".format(cp_params
["member-vnf-index-ref"]))
585 for vdu_descriptor
in vnf_descriptor
["vdu"]:
586 for interface_descriptor
in vdu_descriptor
["interface"]:
587 if interface_descriptor
.get("external-connection-point-ref") == \
588 cp_params
["vnfd-connection-point-ref"]:
595 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
596 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
597 cp_params
["member-vnf-index-ref"],
598 cp_params
["vnfd-connection-point-ref"],
599 vnf_descriptor
["id"]))
600 if cp_params
.get("ip-address"):
601 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
602 vdu_descriptor
["id"], "interfaces",
603 interface_descriptor
["name"], "ip_address"),
604 cp_params
["ip-address"])
605 if cp_params
.get("mac-address"):
606 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
607 vdu_descriptor
["id"], "interfaces",
608 interface_descriptor
["name"], "mac_address"),
609 cp_params
["mac-address"])
612 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None):
613 # make a copy to do not change
614 vdu_create
= copy(vdu_create
)
615 vdu_delete
= copy(vdu_delete
)
617 vdurs
= db_vnfr
.get("vdur")
620 vdu_index
= len(vdurs
)
623 vdur
= vdurs
[vdu_index
]
624 if vdur
.get("pdu-type"):
626 vdu_id_ref
= vdur
["vdu-id-ref"]
627 if vdu_create
and vdu_create
.get(vdu_id_ref
):
628 for index
in range(0, vdu_create
[vdu_id_ref
]):
629 vdur
= deepcopy(vdur
)
630 vdur
["_id"] = str(uuid4())
631 vdur
["count-index"] += 1
632 vdurs
.insert(vdu_index
+1+index
, vdur
)
633 del vdu_create
[vdu_id_ref
]
634 if vdu_delete
and vdu_delete
.get(vdu_id_ref
):
636 vdu_delete
[vdu_id_ref
] -= 1
637 if not vdu_delete
[vdu_id_ref
]:
638 del vdu_delete
[vdu_id_ref
]
639 # check all operations are done
640 if vdu_create
or vdu_delete
:
641 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
644 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
647 vnfr_update
= {"vdur": vdurs
}
648 db_vnfr
["vdur"] = vdurs
649 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
651 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
653 Updates database nsr with the RO info for the created vld
654 :param ns_update_nsr: dictionary to be filled with the updated info
655 :param db_nsr: content of db_nsr. This is also modified
656 :param nsr_desc_RO: nsr descriptor from RO
657 :return: Nothing, LcmException is raised on errors
660 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
661 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
662 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
664 vld
["vim-id"] = net_RO
.get("vim_net_id")
665 vld
["name"] = net_RO
.get("vim_name")
666 vld
["status"] = net_RO
.get("status")
667 vld
["status-detailed"] = net_RO
.get("error_msg")
668 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
671 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
673 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
675 for db_vnfr
in db_vnfrs
.values():
676 vnfr_update
= {"status": "ERROR"}
677 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
678 if "status" not in vdur
:
679 vdur
["status"] = "ERROR"
680 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
682 vdur
["status-detailed"] = str(error_text
)
683 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
684 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
685 except DbException
as e
:
686 self
.logger
.error("Cannot update vnf. {}".format(e
))
688 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
690 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
691 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
692 :param nsr_desc_RO: nsr descriptor from RO
693 :return: Nothing, LcmException is raised on errors
695 for vnf_index
, db_vnfr
in db_vnfrs
.items():
696 for vnf_RO
in nsr_desc_RO
["vnfs"]:
697 if vnf_RO
["member_vnf_index"] != vnf_index
:
700 if vnf_RO
.get("ip_address"):
701 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
702 elif not db_vnfr
.get("ip-address"):
703 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
704 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
706 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
707 vdur_RO_count_index
= 0
708 if vdur
.get("pdu-type"):
710 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
711 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
713 if vdur
["count-index"] != vdur_RO_count_index
:
714 vdur_RO_count_index
+= 1
716 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
717 if vdur_RO
.get("ip_address"):
718 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
720 vdur
["ip-address"] = None
721 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
722 vdur
["name"] = vdur_RO
.get("vim_name")
723 vdur
["status"] = vdur_RO
.get("status")
724 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
725 for ifacer
in get_iterable(vdur
, "interfaces"):
726 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
727 if ifacer
["name"] == interface_RO
.get("internal_name"):
728 ifacer
["ip-address"] = interface_RO
.get("ip_address")
729 ifacer
["mac-address"] = interface_RO
.get("mac_address")
732 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
734 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
735 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
738 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
739 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
741 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
742 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
743 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
745 vld
["vim-id"] = net_RO
.get("vim_net_id")
746 vld
["name"] = net_RO
.get("vim_name")
747 vld
["status"] = net_RO
.get("status")
748 vld
["status-detailed"] = net_RO
.get("error_msg")
749 vnfr_update
["vld.{}".format(vld_index
)] = vld
752 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
753 vnf_index
, vld
["id"]))
755 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
759 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
761 def _get_ns_config_info(self
, nsr_id
):
763 Generates a mapping between vnf,vdu elements and the N2VC id
764 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
765 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
766 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
767 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
769 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
770 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
772 ns_config_info
= {"osm-config-mapping": mapping
}
773 for vca
in vca_deployed_list
:
774 if not vca
["member-vnf-index"]:
776 if not vca
["vdu_id"]:
777 mapping
[vca
["member-vnf-index"]] = vca
["application"]
779 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
781 return ns_config_info
784 def _get_initial_config_primitive_list(desc_primitive_list
, vca_deployed
):
786 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
787 primitives as verify-ssh-credentials, or config when needed
788 :param desc_primitive_list: information of the descriptor
789 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
790 this element contains a ssh public key
791 :return: The modified list. Can ba an empty list, but always a list
793 if desc_primitive_list
:
794 primitive_list
= desc_primitive_list
.copy()
797 # look for primitive config, and get the position. None if not present
798 config_position
= None
799 for index
, primitive
in enumerate(primitive_list
):
800 if primitive
["name"] == "config":
801 config_position
= index
804 # for NS, add always a config primitive if not present (bug 874)
805 if not vca_deployed
["member-vnf-index"] and config_position
is None:
806 primitive_list
.insert(0, {"name": "config", "parameter": []})
808 # for VNF/VDU add verify-ssh-credentials after config
809 if vca_deployed
["member-vnf-index"] and config_position
is not None and vca_deployed
.get("ssh-public-key"):
810 primitive_list
.insert(config_position
+ 1, {"name": "verify-ssh-credentials", "parameter": []})
811 return primitive_list
813 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds_ref
,
814 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
815 nslcmop_id
= db_nslcmop
["_id"]
817 "name": db_nsr
["name"],
820 "image": deepcopy(db_nsr
["image"]),
821 "flavor": deepcopy(db_nsr
["flavor"]),
822 "action_id": nslcmop_id
,
824 for image
in target
["image"]:
825 image
["vim_info"] = []
826 for flavor
in target
["flavor"]:
827 flavor
["vim_info"] = []
829 ns_params
= db_nslcmop
.get("operationParams")
831 if ns_params
.get("ssh_keys"):
832 ssh_keys
+= ns_params
.get("ssh_keys")
834 ssh_keys
+= n2vc_key_list
837 for vld_index
, vld
in enumerate(nsd
.get("vld")):
838 target_vld
= {"id": vld
["id"],
840 "mgmt-network": vld
.get("mgmt-network", False),
841 "type": vld
.get("type"),
842 "vim_info": [{"vim-network-name": vld
.get("vim-network-name"),
843 "vim_account_id": ns_params
["vimAccountId"]}],
845 for cp
in vld
["vnfd-connection-point-ref"]:
846 cp2target
["member_vnf:{}.{}".format(cp
["member-vnf-index-ref"], cp
["vnfd-connection-point-ref"])] = \
847 "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
848 target
["ns"]["vld"].append(target_vld
)
849 for vnfr
in db_vnfrs
.values():
850 vnfd
= db_vnfds_ref
[vnfr
["vnfd-ref"]]
851 target_vnf
= deepcopy(vnfr
)
852 for vld
in target_vnf
.get("vld", ()):
853 # check if connected to a ns.vld
854 vnf_cp
= next((cp
for cp
in vnfd
.get("connection-point", ()) if
855 cp
.get("internal-vld-ref") == vld
["id"]), None)
857 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
858 if cp2target
.get(ns_cp
):
859 vld
["target"] = cp2target
[ns_cp
]
860 vld
["vim_info"] = [{"vim-network-name": vld
.get("vim-network-name"),
861 "vim_account_id": vnfr
["vim-account-id"]}]
863 for vdur
in target_vnf
.get("vdur", ()):
864 vdur
["vim_info"] = [{"vim_account_id": vnfr
["vim-account-id"]}]
865 vdud_index
, vdud
= next(k
for k
in enumerate(vnfd
["vdu"]) if k
[1]["id"] == vdur
["vdu-id-ref"])
866 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
869 if deep_get(vdud
, ("vdu-configuration", "config-access", "ssh-access", "required")):
870 vdur
["ssh-keys"] = ssh_keys
871 vdur
["ssh-access-required"] = True
872 elif deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
873 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
874 vdur
["ssh-keys"] = ssh_keys
875 vdur
["ssh-access-required"] = True
878 if vdud
.get("cloud-init-file"):
879 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
880 elif vdud
.get("cloud-init"):
881 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], vdud_index
)
884 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
885 if not next((vi
for vi
in ns_flavor
["vim_info"] if
886 vi
and vi
.get("vim_account_id") == vnfr
["vim-account-id"]), None):
887 ns_flavor
["vim_info"].append({"vim_account_id": vnfr
["vim-account-id"]})
889 ns_image
= target
["image"][int(vdur
["ns-image-id"])]
890 if not next((vi
for vi
in ns_image
["vim_info"] if
891 vi
and vi
.get("vim_account_id") == vnfr
["vim-account-id"]), None):
892 ns_image
["vim_info"].append({"vim_account_id": vnfr
["vim-account-id"]})
894 vdur
["vim_info"] = [{"vim_account_id": vnfr
["vim-account-id"]}]
895 target
["vnf"].append(target_vnf
)
897 desc
= await self
.RO
.deploy(nsr_id
, target
)
898 action_id
= desc
["action_id"]
899 await self
._wait
_ng
_ro
(self
, nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
903 "_admin.deployed.RO.operational-status": "running",
904 "detailed-status": " ".join(stage
)
906 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
907 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
908 self
._write
_op
_status
(nslcmop_id
, stage
)
909 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
912 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
, start_time
, timeout
, stage
):
913 detailed_status_old
= None
915 while time() <= start_time
+ timeout
:
916 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
917 if desc_status
["status"] == "FAILED":
918 raise NgRoException(desc_status
["details"])
919 elif desc_status
["status"] == "BUILD":
920 stage
[2] = "VIM: ({})".format(desc_status
["details"])
921 elif desc_status
["status"] == "DONE":
922 stage
[2] = "Deployed at VIM"
925 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
926 if stage
[2] != detailed_status_old
:
927 detailed_status_old
= stage
[2]
928 db_nsr_update
["detailed-status"] = " ".join(stage
)
929 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
930 self
._write
_op
_status
(nslcmop_id
, stage
)
931 await asyncio
.sleep(5, loop
=self
.loop
)
932 else: # timeout_ns_deploy
933 raise NgRoException("Timeout waiting ns to deploy")
935 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
939 start_deploy
= time()
947 desc
= await self
.RO
.deploy(nsr_id
, target
)
948 action_id
= desc
["action_id"]
949 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
950 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
951 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
954 delete_timeout
= 20 * 60 # 20 minutes
955 await self
._wait
_ng
_ro
(self
, nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
957 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
958 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
960 await self
.RO
.delete(nsr_id
)
961 except Exception as e
:
962 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
963 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
964 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
965 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
966 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
967 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
968 failed_detail
.append("delete conflict: {}".format(e
))
969 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
971 failed_detail
.append("delete error: {}".format(e
))
972 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
975 stage
[2] = "Error deleting from VIM"
977 stage
[2] = "Deleted from VIM"
978 db_nsr_update
["detailed-status"] = " ".join(stage
)
979 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
980 self
._write
_op
_status
(nslcmop_id
, stage
)
983 raise LcmException("; ".join(failed_detail
))
986 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds_ref
,
987 n2vc_key_list
, stage
):
990 :param logging_text: preffix text to use at logging
991 :param nsr_id: nsr identity
992 :param nsd: database content of ns descriptor
993 :param db_nsr: database content of ns record
994 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
996 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
997 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
998 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
999 :return: None or exception
1003 RO_descriptor_number
= 0 # number of descriptors created at RO
1004 vnf_index_2_RO_id
= {} # map between vnfd/nsd id to the id used at RO
1005 nslcmop_id
= db_nslcmop
["_id"]
1006 start_deploy
= time()
1007 ns_params
= db_nslcmop
.get("operationParams")
1008 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1009 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1011 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1013 # Check for and optionally request placement optimization. Database will be updated if placement activated
1014 stage
[2] = "Waiting for Placement."
1015 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1016 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1017 for vnfr
in db_vnfrs
.values():
1018 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1021 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1024 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
1025 db_vnfds_ref
, n2vc_key_list
, stage
, start_deploy
,
1028 # get vnfds, instantiate at RO
1029 for c_vnf
in nsd
.get("constituent-vnfd", ()):
1030 member_vnf_index
= c_vnf
["member-vnf-index"]
1031 vnfd
= db_vnfds_ref
[c_vnf
['vnfd-id-ref']]
1032 vnfd_ref
= vnfd
["id"]
1034 stage
[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref
, member_vnf_index
)
1035 db_nsr_update
["detailed-status"] = " ".join(stage
)
1036 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1037 self
._write
_op
_status
(nslcmop_id
, stage
)
1039 # self.logger.debug(logging_text + stage[2])
1040 vnfd_id_RO
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, member_vnf_index
[:23])
1041 vnf_index_2_RO_id
[member_vnf_index
] = vnfd_id_RO
1042 RO_descriptor_number
+= 1
1044 # look position at deployed.RO.vnfd if not present it will be appended at the end
1045 for index
, vnf_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["RO"]["vnfd"]):
1046 if vnf_deployed
["member-vnf-index"] == member_vnf_index
:
1049 index
= len(db_nsr
["_admin"]["deployed"]["RO"]["vnfd"])
1050 db_nsr
["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1053 RO_update
= {"member-vnf-index": member_vnf_index
}
1054 vnfd_list
= await self
.RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
1056 RO_update
["id"] = vnfd_list
[0]["uuid"]
1057 self
.logger
.debug(logging_text
+ "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1058 format(vnfd_ref
, member_vnf_index
, vnfd_list
[0]["uuid"]))
1060 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
, db_vnfrs
[c_vnf
["member-vnf-index"]].
1061 get("additionalParamsForVnf"), nsr_id
)
1062 desc
= await self
.RO
.create("vnfd", descriptor
=vnfd_RO
)
1063 RO_update
["id"] = desc
["uuid"]
1064 self
.logger
.debug(logging_text
+ "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1065 vnfd_ref
, member_vnf_index
, desc
["uuid"]))
1066 db_nsr_update
["_admin.deployed.RO.vnfd.{}".format(index
)] = RO_update
1067 db_nsr
["_admin"]["deployed"]["RO"]["vnfd"][index
] = RO_update
1072 stage
[2] = "Creating nsd={} at RO".format(nsd_ref
)
1073 db_nsr_update
["detailed-status"] = " ".join(stage
)
1074 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1075 self
._write
_op
_status
(nslcmop_id
, stage
)
1077 # self.logger.debug(logging_text + stage[2])
1078 RO_osm_nsd_id
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, nsd_ref
[:23])
1079 RO_descriptor_number
+= 1
1080 nsd_list
= await self
.RO
.get_list("nsd", filter_by
={"osm_id": RO_osm_nsd_id
})
1082 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= nsd_list
[0]["uuid"]
1083 self
.logger
.debug(logging_text
+ "nsd={} exists at RO. Using RO_id={}".format(
1084 nsd_ref
, RO_nsd_uuid
))
1086 nsd_RO
= deepcopy(nsd
)
1087 nsd_RO
["id"] = RO_osm_nsd_id
1088 nsd_RO
.pop("_id", None)
1089 nsd_RO
.pop("_admin", None)
1090 for c_vnf
in nsd_RO
.get("constituent-vnfd", ()):
1091 member_vnf_index
= c_vnf
["member-vnf-index"]
1092 c_vnf
["vnfd-id-ref"] = vnf_index_2_RO_id
[member_vnf_index
]
1093 for c_vld
in nsd_RO
.get("vld", ()):
1094 for cp
in c_vld
.get("vnfd-connection-point-ref", ()):
1095 member_vnf_index
= cp
["member-vnf-index-ref"]
1096 cp
["vnfd-id-ref"] = vnf_index_2_RO_id
[member_vnf_index
]
1098 desc
= await self
.RO
.create("nsd", descriptor
=nsd_RO
)
1099 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1100 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= desc
["uuid"]
1101 self
.logger
.debug(logging_text
+ "nsd={} created at RO. RO_id={}".format(nsd_ref
, RO_nsd_uuid
))
1102 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1105 stage
[2] = "Creating nsd={} at RO".format(nsd_ref
)
1106 db_nsr_update
["detailed-status"] = " ".join(stage
)
1107 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1108 self
._write
_op
_status
(nslcmop_id
, stage
)
1110 # if present use it unless in error status
1111 RO_nsr_id
= deep_get(db_nsr
, ("_admin", "deployed", "RO", "nsr_id"))
1114 stage
[2] = "Looking for existing ns at RO"
1115 db_nsr_update
["detailed-status"] = " ".join(stage
)
1116 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1117 self
._write
_op
_status
(nslcmop_id
, stage
)
1118 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1119 desc
= await self
.RO
.show("ns", RO_nsr_id
)
1121 except ROclient
.ROClientException
as e
:
1122 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
1124 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1126 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
1127 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
1128 if ns_status
== "ERROR":
1129 stage
[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id
)
1130 self
.logger
.debug(logging_text
+ stage
[2])
1131 await self
.RO
.delete("ns", RO_nsr_id
)
1132 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1134 stage
[2] = "Checking dependencies"
1135 db_nsr_update
["detailed-status"] = " ".join(stage
)
1136 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1137 self
._write
_op
_status
(nslcmop_id
, stage
)
1138 # self.logger.debug(logging_text + stage[2])
1140 # check if VIM is creating and wait look if previous tasks in process
1141 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("vim_account", ns_params
["vimAccountId"])
1143 stage
[2] = "Waiting for related tasks '{}' to be completed".format(task_name
)
1144 self
.logger
.debug(logging_text
+ stage
[2])
1145 await asyncio
.wait(task_dependency
, timeout
=3600)
1146 if ns_params
.get("vnf"):
1147 for vnf
in ns_params
["vnf"]:
1148 if "vimAccountId" in vnf
:
1149 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("vim_account",
1150 vnf
["vimAccountId"])
1152 stage
[2] = "Waiting for related tasks '{}' to be completed.".format(task_name
)
1153 self
.logger
.debug(logging_text
+ stage
[2])
1154 await asyncio
.wait(task_dependency
, timeout
=3600)
1156 stage
[2] = "Checking instantiation parameters."
1157 RO_ns_params
= self
._ns
_params
_2_RO
(ns_params
, nsd
, db_vnfds_ref
, db_vnfrs
, n2vc_key_list
)
1158 stage
[2] = "Deploying ns at VIM."
1159 db_nsr_update
["detailed-status"] = " ".join(stage
)
1160 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1161 self
._write
_op
_status
(nslcmop_id
, stage
)
1163 desc
= await self
.RO
.create("ns", descriptor
=RO_ns_params
, name
=db_nsr
["name"], scenario
=RO_nsd_uuid
)
1164 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = desc
["uuid"]
1165 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1166 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "BUILD"
1167 self
.logger
.debug(logging_text
+ "ns created at RO. RO_id={}".format(desc
["uuid"]))
1169 # wait until NS is ready
1170 stage
[2] = "Waiting VIM to deploy ns."
1171 db_nsr_update
["detailed-status"] = " ".join(stage
)
1172 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1173 self
._write
_op
_status
(nslcmop_id
, stage
)
1174 detailed_status_old
= None
1175 self
.logger
.debug(logging_text
+ stage
[2] + " RO_ns_id={}".format(RO_nsr_id
))
1178 while time() <= start_deploy
+ timeout_ns_deploy
:
1179 desc
= await self
.RO
.show("ns", RO_nsr_id
)
1182 if desc
!= old_desc
:
1183 # desc has changed => update db
1184 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
1187 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
1188 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
1189 if ns_status
== "ERROR":
1190 raise ROclient
.ROClientException(ns_status_info
)
1191 elif ns_status
== "BUILD":
1192 stage
[2] = "VIM: ({})".format(ns_status_info
)
1193 elif ns_status
== "ACTIVE":
1194 stage
[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1196 self
.ns_update_vnfr(db_vnfrs
, desc
)
1198 except LcmExceptionNoMgmtIP
:
1201 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
1202 if stage
[2] != detailed_status_old
:
1203 detailed_status_old
= stage
[2]
1204 db_nsr_update
["detailed-status"] = " ".join(stage
)
1205 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1206 self
._write
_op
_status
(nslcmop_id
, stage
)
1207 await asyncio
.sleep(5, loop
=self
.loop
)
1208 else: # timeout_ns_deploy
1209 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
1212 self
.ns_update_nsr(db_nsr_update
, db_nsr
, desc
)
1214 db_nsr_update
["_admin.deployed.RO.operational-status"] = "running"
1215 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1216 stage
[2] = "Deployed at VIM"
1217 db_nsr_update
["detailed-status"] = " ".join(stage
)
1218 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1219 self
._write
_op
_status
(nslcmop_id
, stage
)
1220 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1221 # self.logger.debug(logging_text + "Deployed at VIM")
1222 except (ROclient
.ROClientException
, LcmException
, DbException
, NgRoException
) as e
:
1223 stage
[2] = "ERROR deploying at VIM"
1224 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1227 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
1229 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1230 :param logging_text: prefix use for logging
1235 :param pub_key: public ssh key to inject, None to skip
1236 :param user: user to apply the public ssh key
1240 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1244 target_vdu_id
= None
1250 if ro_retries
>= 360: # 1 hour
1251 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1253 await asyncio
.sleep(10, loop
=self
.loop
)
1256 if not target_vdu_id
:
1257 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1259 if not vdu_id
: # for the VNF case
1260 if db_vnfr
.get("status") == "ERROR":
1261 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1262 ip_address
= db_vnfr
.get("ip-address")
1265 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1267 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1268 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1270 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1271 vdur
= db_vnfr
["vdur"][0]
1273 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1276 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE":
1277 ip_address
= vdur
.get("ip-address")
1280 target_vdu_id
= vdur
["vdu-id-ref"]
1281 elif vdur
.get("status") == "ERROR":
1282 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1284 if not target_vdu_id
:
1287 # inject public key into machine
1288 if pub_key
and user
:
1289 # wait until NS is deployed at RO
1291 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1292 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1296 # self.logger.debug(logging_text + "Inserting RO key")
1297 if vdur
.get("pdu-type"):
1298 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1301 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1303 target
= {"action": "inject_ssh_key", "key": pub_key
, "user": user
,
1304 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdu_id
}]}],
1306 await self
.RO
.deploy(nsr_id
, target
)
1308 result_dict
= await self
.RO
.create_action(
1310 item_id_name
=ro_nsr_id
,
1311 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1313 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1314 if not result_dict
or not isinstance(result_dict
, dict):
1315 raise LcmException("Unknown response from RO when injecting key")
1316 for result
in result_dict
.values():
1317 if result
.get("vim_result") == 200:
1320 raise ROclient
.ROClientException("error injecting key: {}".format(
1321 result
.get("description")))
1323 except NgRoException
as e
:
1324 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1325 except ROclient
.ROClientException
as e
:
1327 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1331 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1337 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1339 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1341 my_vca
= vca_deployed_list
[vca_index
]
1342 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1343 # vdu or kdu: no dependencies
1347 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1348 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1349 configuration_status_list
= db_nsr
["configurationStatus"]
1350 for index
, vca_deployed
in enumerate(configuration_status_list
):
1351 if index
== vca_index
:
1354 if not my_vca
.get("member-vnf-index") or \
1355 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1356 internal_status
= configuration_status_list
[index
].get("status")
1357 if internal_status
== 'READY':
1359 elif internal_status
== 'BROKEN':
1360 raise LcmException("Configuration aborted because dependent charm/s has failed")
1364 # no dependencies, return
1366 await asyncio
.sleep(10)
1369 raise LcmException("Configuration aborted because dependent charm/s timeout")
1371 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1372 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
):
1373 nsr_id
= db_nsr
["_id"]
1374 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1375 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1376 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1378 'collection': 'nsrs',
1379 'filter': {'_id': nsr_id
},
1380 'path': db_update_entry
1386 element_under_configuration
= nsr_id
1390 vnfr_id
= db_vnfr
["_id"]
1392 namespace
= "{nsi}.{ns}".format(
1393 nsi
=nsi_id
if nsi_id
else "",
1397 element_type
= 'VNF'
1398 element_under_configuration
= vnfr_id
1399 namespace
+= ".{}".format(vnfr_id
)
1401 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1402 element_type
= 'VDU'
1403 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1405 namespace
+= ".{}".format(kdu_name
)
1406 element_type
= 'KDU'
1407 element_under_configuration
= kdu_name
1410 artifact_path
= "{}/{}/{}/{}".format(
1411 base_folder
["folder"],
1412 base_folder
["pkg-dir"],
1413 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1417 # n2vc_redesign STEP 3.1
1419 # find old ee_id if exists
1420 ee_id
= vca_deployed
.get("ee_id")
1422 # create or register execution environment in VCA
1423 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1425 self
._write
_configuration
_status
(
1427 vca_index
=vca_index
,
1429 element_under_configuration
=element_under_configuration
,
1430 element_type
=element_type
1433 step
= "create execution environment"
1434 self
.logger
.debug(logging_text
+ step
)
1435 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1436 namespace
=namespace
,
1439 artifact_path
=artifact_path
,
1442 elif vca_type
== "native_charm":
1443 step
= "Waiting to VM being up and getting IP address"
1444 self
.logger
.debug(logging_text
+ step
)
1445 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1446 user
=None, pub_key
=None)
1447 credentials
= {"hostname": rw_mgmt_ip
}
1449 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1450 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1451 # merged. Meanwhile let's get username from initial-config-primitive
1452 if not username
and config_descriptor
.get("initial-config-primitive"):
1453 for config_primitive
in config_descriptor
["initial-config-primitive"]:
1454 for param
in config_primitive
.get("parameter", ()):
1455 if param
["name"] == "ssh-username":
1456 username
= param
["value"]
1459 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1460 "'config-access.ssh-access.default-user'")
1461 credentials
["username"] = username
1462 # n2vc_redesign STEP 3.2
1464 self
._write
_configuration
_status
(
1466 vca_index
=vca_index
,
1467 status
='REGISTERING',
1468 element_under_configuration
=element_under_configuration
,
1469 element_type
=element_type
1472 step
= "register execution environment {}".format(credentials
)
1473 self
.logger
.debug(logging_text
+ step
)
1474 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1475 credentials
=credentials
, namespace
=namespace
, db_dict
=db_dict
)
1477 # for compatibility with MON/POL modules, the need model and application name at database
1478 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1479 ee_id_parts
= ee_id
.split('.')
1480 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1481 if len(ee_id_parts
) >= 2:
1482 model_name
= ee_id_parts
[0]
1483 application_name
= ee_id_parts
[1]
1484 db_nsr_update
[db_update_entry
+ "model"] = model_name
1485 db_nsr_update
[db_update_entry
+ "application"] = application_name
1487 # n2vc_redesign STEP 3.3
1488 step
= "Install configuration Software"
1490 self
._write
_configuration
_status
(
1492 vca_index
=vca_index
,
1493 status
='INSTALLING SW',
1494 element_under_configuration
=element_under_configuration
,
1495 element_type
=element_type
,
1496 other_update
=db_nsr_update
1499 # TODO check if already done
1500 self
.logger
.debug(logging_text
+ step
)
1502 if vca_type
== "native_charm":
1503 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1504 if initial_config_primitive_list
:
1505 for primitive
in initial_config_primitive_list
:
1506 if primitive
["name"] == "config":
1507 config
= self
._map
_primitive
_params
(
1514 if vca_type
== "lxc_proxy_charm":
1515 if element_type
== "NS":
1516 num_units
= db_nsr
.get("config-units") or 1
1517 elif element_type
== "VNF":
1518 num_units
= db_vnfr
.get("config-units") or 1
1519 elif element_type
== "VDU":
1520 for v
in db_vnfr
["vdur"]:
1521 if vdu_id
== v
["vdu-id-ref"]:
1522 num_units
= v
.get("config-units") or 1
1525 await self
.vca_map
[vca_type
].install_configuration_sw(
1527 artifact_path
=artifact_path
,
1530 num_units
=num_units
,
1534 # write in db flag of configuration_sw already installed
1535 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1537 # add relations for this VCA (wait for other peers related with this VCA)
1538 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1539 vca_index
=vca_index
, vca_type
=vca_type
)
1541 # if SSH access is required, then get execution environment SSH public
1542 if vca_type
in ("lxc_proxy_charm", "helm"): # if native charm we have waited already to VM be UP
1545 # self.logger.debug("get ssh key block")
1546 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1547 # self.logger.debug("ssh key needed")
1548 # Needed to inject a ssh key
1549 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1550 step
= "Install configuration Software, getting public ssh key"
1551 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1553 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1555 # self.logger.debug("no need to get ssh key")
1556 step
= "Waiting to VM being up and getting IP address"
1557 self
.logger
.debug(logging_text
+ step
)
1559 # n2vc_redesign STEP 5.1
1560 # wait for RO (ip-address) Insert pub_key into VM
1562 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1563 user
=user
, pub_key
=pub_key
)
1565 rw_mgmt_ip
= None # This is for a NS configuration
1567 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1569 # store rw_mgmt_ip in deploy params for later replacement
1570 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1572 # n2vc_redesign STEP 6 Execute initial config primitive
1573 step
= 'execute initial config primitive'
1574 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1576 # sort initial config primitives by 'seq'
1577 if initial_config_primitive_list
:
1579 initial_config_primitive_list
.sort(key
=lambda val
: int(val
['seq']))
1580 except Exception as e
:
1581 self
.logger
.error(logging_text
+ step
+ ": " + str(e
))
1583 self
.logger
.debug(logging_text
+ step
+ ": No initial-config-primitive")
1585 # add config if not present for NS charm
1586 initial_config_primitive_list
= self
._get
_initial
_config
_primitive
_list
(initial_config_primitive_list
,
1589 # wait for dependent primitives execution (NS -> VNF -> VDU)
1590 if initial_config_primitive_list
:
1591 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1593 # stage, in function of element type: vdu, kdu, vnf or ns
1594 my_vca
= vca_deployed_list
[vca_index
]
1595 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1597 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1598 elif my_vca
.get("member-vnf-index"):
1600 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1603 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1605 self
._write
_configuration
_status
(
1607 vca_index
=vca_index
,
1608 status
='EXECUTING PRIMITIVE'
1611 self
._write
_op
_status
(
1616 check_if_terminated_needed
= True
1617 for initial_config_primitive
in initial_config_primitive_list
:
1618 # adding information on the vca_deployed if it is a NS execution environment
1619 if not vca_deployed
["member-vnf-index"]:
1620 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1621 # TODO check if already done
1622 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1624 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1625 self
.logger
.debug(logging_text
+ step
)
1626 await self
.vca_map
[vca_type
].exec_primitive(
1628 primitive_name
=initial_config_primitive
["name"],
1629 params_dict
=primitive_params_
,
1632 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1633 if check_if_terminated_needed
:
1634 if config_descriptor
.get('terminate-config-primitive'):
1635 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1636 check_if_terminated_needed
= False
1638 # TODO register in database that primitive is done
1640 step
= "instantiated at VCA"
1641 self
.logger
.debug(logging_text
+ step
)
1643 self
._write
_configuration
_status
(
1645 vca_index
=vca_index
,
1649 except Exception as e
: # TODO not use Exception but N2VC exception
1650 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1651 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1652 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1653 self
._write
_configuration
_status
(
1655 vca_index
=vca_index
,
1658 raise LcmException("{} {}".format(step
, e
)) from e
1660 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1661 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1663 Update db_nsr fields.
1666 :param current_operation:
1667 :param current_operation_id:
1668 :param error_description:
1669 :param error_detail:
1670 :param other_update: Other required changes at database if provided, will be cleared
1674 db_dict
= other_update
or {}
1675 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1676 db_dict
["_admin.current-operation"] = current_operation_id
1677 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1678 db_dict
["currentOperation"] = current_operation
1679 db_dict
["currentOperationID"] = current_operation_id
1680 db_dict
["errorDescription"] = error_description
1681 db_dict
["errorDetail"] = error_detail
1684 db_dict
["nsState"] = ns_state
1685 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1686 except DbException
as e
:
1687 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1689 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1690 operation_state
: str = None, other_update
: dict = None):
1692 db_dict
= other_update
or {}
1693 db_dict
['queuePosition'] = queuePosition
1694 if isinstance(stage
, list):
1695 db_dict
['stage'] = stage
[0]
1696 db_dict
['detailed-status'] = " ".join(stage
)
1697 elif stage
is not None:
1698 db_dict
['stage'] = str(stage
)
1700 if error_message
is not None:
1701 db_dict
['errorMessage'] = error_message
1702 if operation_state
is not None:
1703 db_dict
['operationState'] = operation_state
1704 db_dict
["statusEnteredTime"] = time()
1705 self
.update_db_2("nslcmops", op_id
, db_dict
)
1706 except DbException
as e
:
1707 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1709 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1711 nsr_id
= db_nsr
["_id"]
1712 # configurationStatus
1713 config_status
= db_nsr
.get('configurationStatus')
1715 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1716 enumerate(config_status
) if v
}
1718 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1720 except DbException
as e
:
1721 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1723 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1724 element_under_configuration
: str = None, element_type
: str = None,
1725 other_update
: dict = None):
1727 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1728 # .format(vca_index, status))
1731 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1732 db_dict
= other_update
or {}
1734 db_dict
[db_path
+ 'status'] = status
1735 if element_under_configuration
:
1736 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1738 db_dict
[db_path
+ 'elementType'] = element_type
1739 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1740 except DbException
as e
:
1741 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1742 .format(status
, nsr_id
, vca_index
, e
))
1744 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
1746 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1747 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1748 Database is used because the result can be obtained from a different LCM worker in case of HA.
1749 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1750 :param db_nslcmop: database content of nslcmop
1751 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1752 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1753 computed 'vim-account-id'
1756 nslcmop_id
= db_nslcmop
['_id']
1757 placement_engine
= deep_get(db_nslcmop
, ('operationParams', 'placement-engine'))
1758 if placement_engine
== "PLA":
1759 self
.logger
.debug(logging_text
+ "Invoke and wait for placement optimization")
1760 await self
.msg
.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id
}, loop
=self
.loop
)
1761 db_poll_interval
= 5
1762 wait
= db_poll_interval
* 10
1764 while not pla_result
and wait
>= 0:
1765 await asyncio
.sleep(db_poll_interval
)
1766 wait
-= db_poll_interval
1767 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1768 pla_result
= deep_get(db_nslcmop
, ('_admin', 'pla'))
1771 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id
))
1773 for pla_vnf
in pla_result
['vnf']:
1774 vnfr
= db_vnfrs
.get(pla_vnf
['member-vnf-index'])
1775 if not pla_vnf
.get('vimAccountId') or not vnfr
:
1778 self
.db
.set_one("vnfrs", {"_id": vnfr
["_id"]}, {"vim-account-id": pla_vnf
['vimAccountId']})
1780 vnfr
["vim-account-id"] = pla_vnf
['vimAccountId']
1783 def update_nsrs_with_pla_result(self
, params
):
1785 nslcmop_id
= deep_get(params
, ('placement', 'nslcmopId'))
1786 self
.update_db_2("nslcmops", nslcmop_id
, {"_admin.pla": params
.get('placement')})
1787 except Exception as e
:
1788 self
.logger
.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id
, e
))
1790 async def instantiate(self
, nsr_id
, nslcmop_id
):
1793 :param nsr_id: ns instance to deploy
1794 :param nslcmop_id: operation to run
1798 # Try to lock HA task here
1799 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1800 if not task_is_locked_by_me
:
1801 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1804 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1805 self
.logger
.debug(logging_text
+ "Enter")
1810 # get all needed from database
1812 # database nsrs record
1815 # database nslcmops record
1818 # update operation on nsrs
1820 # update operation on nslcmops
1821 db_nslcmop_update
= {}
1823 nslcmop_operation_state
= None
1824 db_vnfrs
= {} # vnf's info indexed by member-index
1826 tasks_dict_info
= {} # from task to info text
1829 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1830 # ^ stage, step, VIM progress
1832 # wait for any previous tasks in process
1833 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1835 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1836 stage
[1] = "Reading from database,"
1837 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1838 db_nsr_update
["detailed-status"] = "creating"
1839 db_nsr_update
["operational-status"] = "init"
1840 self
._write
_ns
_status
(
1842 ns_state
="BUILDING",
1843 current_operation
="INSTANTIATING",
1844 current_operation_id
=nslcmop_id
,
1845 other_update
=db_nsr_update
1847 self
._write
_op
_status
(
1853 # read from db: operation
1854 stage
[1] = "Getting nslcmop={} from db".format(nslcmop_id
)
1855 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1856 ns_params
= db_nslcmop
.get("operationParams")
1857 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1858 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1860 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1863 stage
[1] = "Getting nsr={} from db".format(nsr_id
)
1864 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1865 stage
[1] = "Getting nsd={} from db".format(db_nsr
["nsd-id"])
1866 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1868 # nsr_name = db_nsr["name"] # TODO short-name??
1870 # read from db: vnf's of this ns
1871 stage
[1] = "Getting vnfrs from db"
1872 self
.logger
.debug(logging_text
+ stage
[1])
1873 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1875 # read from db: vnfd's for every vnf
1876 db_vnfds_ref
= {} # every vnfd data indexed by vnf name
1877 db_vnfds
= {} # every vnfd data indexed by vnf id
1878 db_vnfds_index
= {} # every vnfd data indexed by vnf member-index
1880 # for each vnf in ns, read vnfd
1881 for vnfr
in db_vnfrs_list
:
1882 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
# vnf's dict indexed by member-index: '1', '2', etc
1883 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
1884 vnfd_ref
= vnfr
["vnfd-ref"] # vnfd name for this vnf
1885 # if we haven't this vnfd, read it from db
1886 if vnfd_id
not in db_vnfds
:
1888 stage
[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id
, vnfd_ref
)
1889 self
.logger
.debug(logging_text
+ stage
[1])
1890 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1893 db_vnfds_ref
[vnfd_ref
] = vnfd
# vnfd's indexed by name
1894 db_vnfds
[vnfd_id
] = vnfd
# vnfd's indexed by id
1895 db_vnfds_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds
[vnfd_id
] # vnfd's indexed by member-index
1897 # Get or generates the _admin.deployed.VCA list
1898 vca_deployed_list
= None
1899 if db_nsr
["_admin"].get("deployed"):
1900 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1901 if vca_deployed_list
is None:
1902 vca_deployed_list
= []
1903 configuration_status_list
= []
1904 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1905 db_nsr_update
["configurationStatus"] = configuration_status_list
1906 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1907 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1908 elif isinstance(vca_deployed_list
, dict):
1909 # maintain backward compatibility. Change a dict to list at database
1910 vca_deployed_list
= list(vca_deployed_list
.values())
1911 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1912 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1914 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1915 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1916 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1918 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1919 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1920 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1922 # n2vc_redesign STEP 2 Deploy Network Scenario
1923 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1924 self
._write
_op
_status
(
1929 stage
[1] = "Deploying KDUs,"
1930 # self.logger.debug(logging_text + "Before deploy_kdus")
1931 # Call to deploy_kdus in case exists the "vdu:kdu" param
1932 await self
.deploy_kdus(
1933 logging_text
=logging_text
,
1935 nslcmop_id
=nslcmop_id
,
1938 task_instantiation_info
=tasks_dict_info
,
1941 stage
[1] = "Getting VCA public key."
1942 # n2vc_redesign STEP 1 Get VCA public ssh-key
1943 # feature 1429. Add n2vc public key to needed VMs
1944 n2vc_key
= self
.n2vc
.get_public_key()
1945 n2vc_key_list
= [n2vc_key
]
1946 if self
.vca_config
.get("public_key"):
1947 n2vc_key_list
.append(self
.vca_config
["public_key"])
1949 stage
[1] = "Deploying NS at VIM."
1950 task_ro
= asyncio
.ensure_future(
1951 self
.instantiate_RO(
1952 logging_text
=logging_text
,
1956 db_nslcmop
=db_nslcmop
,
1958 db_vnfds_ref
=db_vnfds_ref
,
1959 n2vc_key_list
=n2vc_key_list
,
1963 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1964 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1966 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1967 stage
[1] = "Deploying Execution Environments."
1968 self
.logger
.debug(logging_text
+ stage
[1])
1970 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1971 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1972 for c_vnf
in get_iterable(nsd
, "constituent-vnfd"):
1973 vnfd_id
= c_vnf
["vnfd-id-ref"]
1974 vnfd
= db_vnfds_ref
[vnfd_id
]
1975 member_vnf_index
= str(c_vnf
["member-vnf-index"])
1976 db_vnfr
= db_vnfrs
[member_vnf_index
]
1977 base_folder
= vnfd
["_admin"]["storage"]
1983 # Get additional parameters
1985 if db_vnfr
.get("additionalParamsForVnf"):
1986 deploy_params
= self
._format
_additional
_params
(db_vnfr
["additionalParamsForVnf"].copy())
1988 descriptor_config
= vnfd
.get("vnf-configuration")
1989 if descriptor_config
:
1991 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1994 nslcmop_id
=nslcmop_id
,
2000 member_vnf_index
=member_vnf_index
,
2001 vdu_index
=vdu_index
,
2003 deploy_params
=deploy_params
,
2004 descriptor_config
=descriptor_config
,
2005 base_folder
=base_folder
,
2006 task_instantiation_info
=tasks_dict_info
,
2010 # Deploy charms for each VDU that supports one.
2011 for vdud
in get_iterable(vnfd
, 'vdu'):
2013 descriptor_config
= vdud
.get('vdu-configuration')
2014 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
2015 if vdur
.get("additionalParams"):
2016 deploy_params_vdu
= self
._format
_additional
_params
(vdur
["additionalParams"])
2018 deploy_params_vdu
= deploy_params
2019 if descriptor_config
:
2020 # look for vdu index in the db_vnfr["vdu"] section
2021 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
2022 # if vdur["vdu-id-ref"] == vdu_id:
2025 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
2026 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
2027 # vdu_name = vdur.get("name")
2030 for vdu_index
in range(int(vdud
.get("count", 1))):
2031 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2033 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2034 member_vnf_index
, vdu_id
, vdu_index
),
2037 nslcmop_id
=nslcmop_id
,
2043 member_vnf_index
=member_vnf_index
,
2044 vdu_index
=vdu_index
,
2046 deploy_params
=deploy_params_vdu
,
2047 descriptor_config
=descriptor_config
,
2048 base_folder
=base_folder
,
2049 task_instantiation_info
=tasks_dict_info
,
2052 for kdud
in get_iterable(vnfd
, 'kdu'):
2053 kdu_name
= kdud
["name"]
2054 descriptor_config
= kdud
.get('kdu-configuration')
2055 if descriptor_config
:
2059 # look for vdu index in the db_vnfr["vdu"] section
2060 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
2061 # if vdur["vdu-id-ref"] == vdu_id:
2064 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
2065 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
2066 # vdu_name = vdur.get("name")
2070 logging_text
=logging_text
,
2073 nslcmop_id
=nslcmop_id
,
2079 member_vnf_index
=member_vnf_index
,
2080 vdu_index
=vdu_index
,
2082 deploy_params
=deploy_params
,
2083 descriptor_config
=descriptor_config
,
2084 base_folder
=base_folder
,
2085 task_instantiation_info
=tasks_dict_info
,
2089 # Check if this NS has a charm configuration
2090 descriptor_config
= nsd
.get("ns-configuration")
2091 if descriptor_config
and descriptor_config
.get("juju"):
2094 member_vnf_index
= None
2100 # Get additional parameters
2102 if db_nsr
.get("additionalParamsForNs"):
2103 deploy_params
= self
._format
_additional
_params
(db_nsr
["additionalParamsForNs"].copy())
2104 base_folder
= nsd
["_admin"]["storage"]
2106 logging_text
=logging_text
,
2109 nslcmop_id
=nslcmop_id
,
2115 member_vnf_index
=member_vnf_index
,
2116 vdu_index
=vdu_index
,
2118 deploy_params
=deploy_params
,
2119 descriptor_config
=descriptor_config
,
2120 base_folder
=base_folder
,
2121 task_instantiation_info
=tasks_dict_info
,
2125 # rest of staff will be done at finally
2127 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
2128 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
2130 except asyncio
.CancelledError
:
2131 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
2132 exc
= "Operation was cancelled"
2133 except Exception as e
:
2134 exc
= traceback
.format_exc()
2135 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
2138 error_list
.append(str(exc
))
2140 # wait for pending tasks
2142 stage
[1] = "Waiting for instantiate pending tasks."
2143 self
.logger
.debug(logging_text
+ stage
[1])
2144 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
2145 stage
, nslcmop_id
, nsr_id
=nsr_id
)
2146 stage
[1] = stage
[2] = ""
2147 except asyncio
.CancelledError
:
2148 error_list
.append("Cancelled")
2149 # TODO cancel all tasks
2150 except Exception as exc
:
2151 error_list
.append(str(exc
))
2153 # update operation-status
2154 db_nsr_update
["operational-status"] = "running"
2155 # let's begin with VCA 'configured' status (later we can change it)
2156 db_nsr_update
["config-status"] = "configured"
2157 for task
, task_name
in tasks_dict_info
.items():
2158 if not task
.done() or task
.cancelled() or task
.exception():
2159 if task_name
.startswith(self
.task_name_deploy_vca
):
2160 # A N2VC task is pending
2161 db_nsr_update
["config-status"] = "failed"
2163 # RO or KDU task is pending
2164 db_nsr_update
["operational-status"] = "failed"
2166 # update status at database
2168 error_detail
= ". ".join(error_list
)
2169 self
.logger
.error(logging_text
+ error_detail
)
2170 error_description_nslcmop
= 'Stage: {}. Detail: {}'.format(stage
[0], error_detail
)
2171 error_description_nsr
= 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id
, stage
[0])
2173 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
2174 db_nslcmop_update
["detailed-status"] = error_detail
2175 nslcmop_operation_state
= "FAILED"
2179 error_description_nsr
= error_description_nslcmop
= None
2181 db_nsr_update
["detailed-status"] = "Done"
2182 db_nslcmop_update
["detailed-status"] = "Done"
2183 nslcmop_operation_state
= "COMPLETED"
2186 self
._write
_ns
_status
(
2189 current_operation
="IDLE",
2190 current_operation_id
=None,
2191 error_description
=error_description_nsr
,
2192 error_detail
=error_detail
,
2193 other_update
=db_nsr_update
2195 self
._write
_op
_status
(
2198 error_message
=error_description_nslcmop
,
2199 operation_state
=nslcmop_operation_state
,
2200 other_update
=db_nslcmop_update
,
2203 if nslcmop_operation_state
:
2205 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2206 "operationState": nslcmop_operation_state
},
2208 except Exception as e
:
2209 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2211 self
.logger
.debug(logging_text
+ "Exit")
2212 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2214 async def _add_vca_relations(self
, logging_text
, nsr_id
, vca_index
: int,
2215 timeout
: int = 3600, vca_type
: str = None) -> bool:
2218 # 1. find all relations for this VCA
2219 # 2. wait for other peers related
2223 vca_type
= vca_type
or "lxc_proxy_charm"
2225 # STEP 1: find all relations for this VCA
2228 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2229 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2232 my_vca
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))[vca_index
]
2234 # read all ns-configuration relations
2235 ns_relations
= list()
2236 db_ns_relations
= deep_get(nsd
, ('ns-configuration', 'relation'))
2238 for r
in db_ns_relations
:
2239 # check if this VCA is in the relation
2240 if my_vca
.get('member-vnf-index') in\
2241 (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2242 ns_relations
.append(r
)
2244 # read all vnf-configuration relations
2245 vnf_relations
= list()
2246 db_vnfd_list
= db_nsr
.get('vnfd-id')
2248 for vnfd
in db_vnfd_list
:
2249 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2250 db_vnf_relations
= deep_get(db_vnfd
, ('vnf-configuration', 'relation'))
2251 if db_vnf_relations
:
2252 for r
in db_vnf_relations
:
2253 # check if this VCA is in the relation
2254 if my_vca
.get('vdu_id') in (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2255 vnf_relations
.append(r
)
2257 # if no relations, terminate
2258 if not ns_relations
and not vnf_relations
:
2259 self
.logger
.debug(logging_text
+ ' No relations')
2262 self
.logger
.debug(logging_text
+ ' adding relations\n {}\n {}'.format(ns_relations
, vnf_relations
))
2269 if now
- start
>= timeout
:
2270 self
.logger
.error(logging_text
+ ' : timeout adding relations')
2273 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2274 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2276 # for each defined NS relation, find the VCA's related
2277 for r
in ns_relations
:
2278 from_vca_ee_id
= None
2280 from_vca_endpoint
= None
2281 to_vca_endpoint
= None
2282 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2283 for vca
in vca_list
:
2284 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id') \
2285 and vca
.get('config_sw_installed'):
2286 from_vca_ee_id
= vca
.get('ee_id')
2287 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2288 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id') \
2289 and vca
.get('config_sw_installed'):
2290 to_vca_ee_id
= vca
.get('ee_id')
2291 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2292 if from_vca_ee_id
and to_vca_ee_id
:
2294 await self
.vca_map
[vca_type
].add_relation(
2295 ee_id_1
=from_vca_ee_id
,
2296 ee_id_2
=to_vca_ee_id
,
2297 endpoint_1
=from_vca_endpoint
,
2298 endpoint_2
=to_vca_endpoint
)
2299 # remove entry from relations list
2300 ns_relations
.remove(r
)
2302 # check failed peers
2304 vca_status_list
= db_nsr
.get('configurationStatus')
2306 for i
in range(len(vca_list
)):
2308 vca_status
= vca_status_list
[i
]
2309 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id'):
2310 if vca_status
.get('status') == 'BROKEN':
2311 # peer broken: remove relation from list
2312 ns_relations
.remove(r
)
2313 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id'):
2314 if vca_status
.get('status') == 'BROKEN':
2315 # peer broken: remove relation from list
2316 ns_relations
.remove(r
)
2321 # for each defined VNF relation, find the VCA's related
2322 for r
in vnf_relations
:
2323 from_vca_ee_id
= None
2325 from_vca_endpoint
= None
2326 to_vca_endpoint
= None
2327 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2328 for vca
in vca_list
:
2329 if vca
.get('vdu_id') == r
.get('entities')[0].get('id') and vca
.get('config_sw_installed'):
2330 from_vca_ee_id
= vca
.get('ee_id')
2331 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2332 if vca
.get('vdu_id') == r
.get('entities')[1].get('id') and vca
.get('config_sw_installed'):
2333 to_vca_ee_id
= vca
.get('ee_id')
2334 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2335 if from_vca_ee_id
and to_vca_ee_id
:
2337 await self
.vca_map
[vca_type
].add_relation(
2338 ee_id_1
=from_vca_ee_id
,
2339 ee_id_2
=to_vca_ee_id
,
2340 endpoint_1
=from_vca_endpoint
,
2341 endpoint_2
=to_vca_endpoint
)
2342 # remove entry from relations list
2343 vnf_relations
.remove(r
)
2345 # check failed peers
2347 vca_status_list
= db_nsr
.get('configurationStatus')
2349 for i
in range(len(vca_list
)):
2351 vca_status
= vca_status_list
[i
]
2352 if vca
.get('vdu_id') == r
.get('entities')[0].get('id'):
2353 if vca_status
.get('status') == 'BROKEN':
2354 # peer broken: remove relation from list
2355 ns_relations
.remove(r
)
2356 if vca
.get('vdu_id') == r
.get('entities')[1].get('id'):
2357 if vca_status
.get('status') == 'BROKEN':
2358 # peer broken: remove relation from list
2359 ns_relations
.remove(r
)
2365 await asyncio
.sleep(5.0)
2367 if not ns_relations
and not vnf_relations
:
2368 self
.logger
.debug('Relations added')
2373 except Exception as e
:
2374 self
.logger
.warn(logging_text
+ ' ERROR adding relations: {}'.format(e
))
2377 def _write_db_callback(self
, task
, item
, _id
, on_done
=None, on_exc
=None):
2379 callback for kdu install intended to store the returned kdu_instance at database
2384 result
= task
.result()
2386 db_update
[on_done
] = str(result
)
2387 except Exception as e
:
2389 db_update
[on_exc
] = str(e
)
2392 self
.update_db_2(item
, _id
, db_update
)
2396 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2397 # Launch kdus if present in the descriptor
2399 k8scluster_id_2_uuic
= {"helm-chart": {}, "juju-bundle": {}}
2401 def _get_cluster_id(cluster_id
, cluster_type
):
2402 nonlocal k8scluster_id_2_uuic
2403 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2404 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2406 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2407 if not db_k8scluster
:
2408 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2409 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2411 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id
, cluster_type
))
2412 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2415 logging_text
+= "Deploy kdus: "
2418 db_nsr_update
= {"_admin.deployed.K8s": []}
2419 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2422 updated_cluster_list
= []
2424 for vnfr_data
in db_vnfrs
.values():
2425 for kdur
in get_iterable(vnfr_data
, "kdur"):
2426 desc_params
= self
._format
_additional
_params
(kdur
.get("additionalParams"))
2427 vnfd_id
= vnfr_data
.get('vnfd-id')
2428 namespace
= kdur
.get("k8s-namespace")
2429 if kdur
.get("helm-chart"):
2430 kdumodel
= kdur
["helm-chart"]
2431 k8sclustertype
= "helm-chart"
2432 elif kdur
.get("juju-bundle"):
2433 kdumodel
= kdur
["juju-bundle"]
2434 k8sclustertype
= "juju-bundle"
2436 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2437 "juju-bundle. Maybe an old NBI version is running".
2438 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2439 # check if kdumodel is a file and exists
2441 storage
= deep_get(db_vnfds
.get(vnfd_id
), ('_admin', 'storage'))
2442 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2443 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2444 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2446 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2447 kdumodel
= self
.fs
.path
+ filename
2448 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2450 except Exception: # it is not a file
2453 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2454 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2455 cluster_uuid
= _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2457 if k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
:
2458 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2459 self
.k8sclusterhelm
.synchronize_repos(cluster_uuid
=cluster_uuid
))
2460 if del_repo_list
or added_repo_dict
:
2461 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2462 updated
= {'_admin.helm_charts_added.' +
2463 item
: name
for item
, name
in added_repo_dict
.items()}
2464 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster '{}' to_delete: {}, "
2465 "to_add: {}".format(k8s_cluster_id
, del_repo_list
,
2467 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2468 updated_cluster_list
.append(cluster_uuid
)
2470 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2471 kdur
["kdu-name"], k8s_cluster_id
)
2473 k8s_instace_info
= {"kdu-instance": None,
2474 "k8scluster-uuid": cluster_uuid
,
2475 "k8scluster-type": k8sclustertype
,
2476 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2477 "kdu-name": kdur
["kdu-name"],
2478 "kdu-model": kdumodel
,
2479 "namespace": namespace
}
2480 db_path
= "_admin.deployed.K8s.{}".format(index
)
2481 db_nsr_update
[db_path
] = k8s_instace_info
2482 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2484 db_dict
= {"collection": "nsrs",
2485 "filter": {"_id": nsr_id
},
2488 task
= asyncio
.ensure_future(
2489 self
.k8scluster_map
[k8sclustertype
].install(cluster_uuid
=cluster_uuid
, kdu_model
=kdumodel
,
2490 atomic
=True, params
=desc_params
,
2491 db_dict
=db_dict
, timeout
=600,
2492 kdu_name
=kdur
["kdu-name"], namespace
=namespace
))
2494 task
.add_done_callback(partial(self
._write
_db
_callback
, item
="nsrs", _id
=nsr_id
,
2495 on_done
=db_path
+ ".kdu-instance",
2496 on_exc
=db_path
+ ".detailed-status"))
2497 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2498 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2502 except (LcmException
, asyncio
.CancelledError
):
2504 except Exception as e
:
2505 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2506 if isinstance(e
, (N2VCException
, DbException
)):
2507 self
.logger
.error(logging_text
+ msg
)
2509 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2510 raise LcmException(msg
)
2513 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2515 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2516 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2517 base_folder
, task_instantiation_info
, stage
):
2518 # launch instantiate_N2VC in a asyncio task and register task object
2519 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2520 # if not found, create one entry and update database
2521 # fill db_nsr._admin.deployed.VCA.<index>
2523 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2524 if descriptor_config
.get("juju"): # There is one execution envioronment of type juju
2525 ee_list
= [descriptor_config
]
2526 elif descriptor_config
.get("execution-environment-list"):
2527 ee_list
= descriptor_config
.get("execution-environment-list")
2528 else: # other types as script are not supported
2531 for ee_item
in ee_list
:
2532 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2533 ee_item
.get("helm-chart")))
2534 if ee_item
.get("juju"):
2535 vca_name
= ee_item
['juju'].get('charm')
2536 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2537 if ee_item
['juju'].get('cloud') == "k8s":
2538 vca_type
= "k8s_proxy_charm"
2539 elif ee_item
['juju'].get('proxy') is False:
2540 vca_type
= "native_charm"
2541 elif ee_item
.get("helm-chart"):
2542 vca_name
= ee_item
['helm-chart']
2545 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2549 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2550 if not vca_deployed
:
2552 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2553 vca_deployed
.get("vdu_id") == vdu_id
and \
2554 vca_deployed
.get("kdu_name") == kdu_name
and \
2555 vca_deployed
.get("vdu_count_index", 0) == vdu_index
:
2558 # not found, create one.
2560 "member-vnf-index": member_vnf_index
,
2562 "kdu_name": kdu_name
,
2563 "vdu_count_index": vdu_index
,
2564 "operational-status": "init", # TODO revise
2565 "detailed-status": "", # TODO revise
2566 "step": "initial-deploy", # TODO revise
2568 "vdu_name": vdu_name
,
2573 # create VCA and configurationStatus in db
2575 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2576 "configurationStatus.{}".format(vca_index
): dict()
2578 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2580 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2583 task_n2vc
= asyncio
.ensure_future(
2584 self
.instantiate_N2VC(
2585 logging_text
=logging_text
,
2586 vca_index
=vca_index
,
2592 vdu_index
=vdu_index
,
2593 deploy_params
=deploy_params
,
2594 config_descriptor
=descriptor_config
,
2595 base_folder
=base_folder
,
2596 nslcmop_id
=nslcmop_id
,
2602 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2603 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2604 member_vnf_index
or "", vdu_id
or "")
2606 # Check if this VNFD has a configured terminate action
2607 def _has_terminate_config_primitive(self
, vnfd
):
2608 vnf_config
= vnfd
.get("vnf-configuration")
2609 if vnf_config
and vnf_config
.get("terminate-config-primitive"):
2615 def _get_terminate_config_primitive_seq_list(vnfd
):
2616 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2617 # No need to check for existing primitive twice, already done before
2618 vnf_config
= vnfd
.get("vnf-configuration")
2619 seq_list
= vnf_config
.get("terminate-config-primitive")
2620 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2621 seq_list_sorted
= sorted(seq_list
, key
=lambda x
: int(x
['seq']))
2622 return seq_list_sorted
2625 def _create_nslcmop(nsr_id
, operation
, params
):
2627 Creates a ns-lcm-opp content to be stored at database.
2628 :param nsr_id: internal id of the instance
2629 :param operation: instantiate, terminate, scale, action, ...
2630 :param params: user parameters for the operation
2631 :return: dictionary following SOL005 format
2633 # Raise exception if invalid arguments
2634 if not (nsr_id
and operation
and params
):
2636 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2642 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2643 "operationState": "PROCESSING",
2644 "statusEnteredTime": now
,
2645 "nsInstanceId": nsr_id
,
2646 "lcmOperationType": operation
,
2648 "isAutomaticInvocation": False,
2649 "operationParams": params
,
2650 "isCancelPending": False,
2652 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2653 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2658 def _format_additional_params(self
, params
):
2659 params
= params
or {}
2660 for key
, value
in params
.items():
2661 if str(value
).startswith("!!yaml "):
2662 params
[key
] = yaml
.safe_load(value
[7:])
2665 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2666 primitive
= seq
.get('name')
2667 primitive_params
= {}
2669 "member_vnf_index": vnf_index
,
2670 "primitive": primitive
,
2671 "primitive_params": primitive_params
,
2674 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
2678 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
2679 op
= deep_get(db_nslcmop
, ('_admin', 'operations'), [])[op_index
]
2680 if op
.get('operationState') == 'COMPLETED':
2681 # b. Skip sub-operation
2682 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2683 return self
.SUBOPERATION_STATUS_SKIP
2685 # c. retry executing sub-operation
2686 # The sub-operation exists, and operationState != 'COMPLETED'
2687 # Update operationState = 'PROCESSING' to indicate a retry.
2688 operationState
= 'PROCESSING'
2689 detailed_status
= 'In progress'
2690 self
._update
_suboperation
_status
(
2691 db_nslcmop
, op_index
, operationState
, detailed_status
)
2692 # Return the sub-operation index
2693 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2694 # with arguments extracted from the sub-operation
2697 # Find a sub-operation where all keys in a matching dictionary must match
2698 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2699 def _find_suboperation(self
, db_nslcmop
, match
):
2700 if db_nslcmop
and match
:
2701 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2702 for i
, op
in enumerate(op_list
):
2703 if all(op
.get(k
) == match
[k
] for k
in match
):
2705 return self
.SUBOPERATION_STATUS_NOT_FOUND
2707 # Update status for a sub-operation given its index
2708 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2709 # Update DB for HA tasks
2710 q_filter
= {'_id': db_nslcmop
['_id']}
2711 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2712 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2713 self
.db
.set_one("nslcmops",
2715 update_dict
=update_dict
,
2716 fail_on_empty
=False)
2718 # Add sub-operation, return the index of the added sub-operation
2719 # Optionally, set operationState, detailed-status, and operationType
2720 # Status and type are currently set for 'scale' sub-operations:
2721 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2722 # 'detailed-status' : status message
2723 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2724 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2725 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2726 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2727 RO_nsr_id
=None, RO_scaling_info
=None):
2729 return self
.SUBOPERATION_STATUS_NOT_FOUND
2730 # Get the "_admin.operations" list, if it exists
2731 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2732 op_list
= db_nslcmop_admin
.get('operations')
2733 # Create or append to the "_admin.operations" list
2734 new_op
= {'member_vnf_index': vnf_index
,
2736 'vdu_count_index': vdu_count_index
,
2737 'primitive': primitive
,
2738 'primitive_params': mapped_primitive_params
}
2740 new_op
['operationState'] = operationState
2742 new_op
['detailed-status'] = detailed_status
2744 new_op
['lcmOperationType'] = operationType
2746 new_op
['RO_nsr_id'] = RO_nsr_id
2748 new_op
['RO_scaling_info'] = RO_scaling_info
2750 # No existing operations, create key 'operations' with current operation as first list element
2751 db_nslcmop_admin
.update({'operations': [new_op
]})
2752 op_list
= db_nslcmop_admin
.get('operations')
2754 # Existing operations, append operation to list
2755 op_list
.append(new_op
)
2757 db_nslcmop_update
= {'_admin.operations': op_list
}
2758 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2759 op_index
= len(op_list
) - 1
2762 # Helper methods for scale() sub-operations
2764 # pre-scale/post-scale:
2765 # Check for 3 different cases:
2766 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2767 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2768 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2769 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2770 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2771 # Find this sub-operation
2772 if RO_nsr_id
and RO_scaling_info
:
2773 operationType
= 'SCALE-RO'
2775 'member_vnf_index': vnf_index
,
2776 'RO_nsr_id': RO_nsr_id
,
2777 'RO_scaling_info': RO_scaling_info
,
2781 'member_vnf_index': vnf_index
,
2782 'primitive': vnf_config_primitive
,
2783 'primitive_params': primitive_params
,
2784 'lcmOperationType': operationType
2786 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2787 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2788 # a. New sub-operation
2789 # The sub-operation does not exist, add it.
2790 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2791 # The following parameters are set to None for all kind of scaling:
2793 vdu_count_index
= None
2795 if RO_nsr_id
and RO_scaling_info
:
2796 vnf_config_primitive
= None
2797 primitive_params
= None
2800 RO_scaling_info
= None
2801 # Initial status for sub-operation
2802 operationState
= 'PROCESSING'
2803 detailed_status
= 'In progress'
2804 # Add sub-operation for pre/post-scaling (zero or more operations)
2805 self
._add
_suboperation
(db_nslcmop
,
2810 vnf_config_primitive
,
2817 return self
.SUBOPERATION_STATUS_NEW
2819 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2820 # or op_index (operationState != 'COMPLETED')
2821 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2823 # Function to return execution_environment id
2825 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2826 # TODO vdu_index_count
2827 for vca
in vca_deployed_list
:
2828 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
2831 async def destroy_N2VC(self
, logging_text
, db_nslcmop
, vca_deployed
, config_descriptor
,
2832 vca_index
, destroy_ee
=True, exec_primitives
=True):
2834 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2835 :param logging_text:
2837 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2838 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2839 :param vca_index: index in the database _admin.deployed.VCA
2840 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2841 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
2842 not executed properly
2843 :return: None or exception
2847 logging_text
+ " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2848 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
2852 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
2854 # execute terminate_primitives
2856 terminate_primitives
= config_descriptor
.get("terminate-config-primitive")
2857 vdu_id
= vca_deployed
.get("vdu_id")
2858 vdu_count_index
= vca_deployed
.get("vdu_count_index")
2859 vdu_name
= vca_deployed
.get("vdu_name")
2860 vnf_index
= vca_deployed
.get("member-vnf-index")
2861 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
2862 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2863 terminate_primitives
= sorted(terminate_primitives
, key
=lambda x
: int(x
['seq']))
2864 for seq
in terminate_primitives
:
2865 # For each sequence in list, get primitive and call _ns_execute_primitive()
2866 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
2867 vnf_index
, seq
.get("name"))
2868 self
.logger
.debug(logging_text
+ step
)
2869 # Create the primitive for each sequence, i.e. "primitive": "touch"
2870 primitive
= seq
.get('name')
2871 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(seq
, vnf_index
)
2872 # The following 3 parameters are currently set to None for 'terminate':
2873 # vdu_id, vdu_count_index, vdu_name
2876 self
._add
_suboperation
(db_nslcmop
,
2882 mapped_primitive_params
)
2883 # Sub-operations: Call _ns_execute_primitive() instead of action()
2885 result
, result_detail
= await self
._ns
_execute
_primitive
(vca_deployed
["ee_id"], primitive
,
2886 mapped_primitive_params
,
2888 except LcmException
:
2889 # this happens when VCA is not deployed. In this case it is not needed to terminate
2891 result_ok
= ['COMPLETED', 'PARTIALLY_COMPLETED']
2892 if result
not in result_ok
:
2893 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2894 "error {}".format(seq
.get("name"), vnf_index
, result_detail
))
2895 # set that this VCA do not need terminated
2896 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(vca_index
)
2897 self
.update_db_2("nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False})
2900 await self
.vca_map
[vca_type
].delete_execution_environment(vca_deployed
["ee_id"])
2902 async def _delete_all_N2VC(self
, db_nsr
: dict):
2903 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
2904 namespace
= "." + db_nsr
["_id"]
2906 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
2907 except N2VCNotFound
: # already deleted. Skip
2909 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
2911 async def _terminate_RO(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
2913 Terminates a deployment from RO
2914 :param logging_text:
2915 :param nsr_deployed: db_nsr._admin.deployed
2918 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2919 this method will update only the index 2, but it will write on database the concatenated content of the list
2924 ro_nsr_id
= ro_delete_action
= None
2925 if nsr_deployed
and nsr_deployed
.get("RO"):
2926 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
2927 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
2930 stage
[2] = "Deleting ns from VIM."
2931 db_nsr_update
["detailed-status"] = " ".join(stage
)
2932 self
._write
_op
_status
(nslcmop_id
, stage
)
2933 self
.logger
.debug(logging_text
+ stage
[2])
2934 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2935 self
._write
_op
_status
(nslcmop_id
, stage
)
2936 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
2937 ro_delete_action
= desc
["action_id"]
2938 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2939 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2940 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2941 if ro_delete_action
:
2942 # wait until NS is deleted from VIM
2943 stage
[2] = "Waiting ns deleted from VIM."
2944 detailed_status_old
= None
2945 self
.logger
.debug(logging_text
+ stage
[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id
,
2947 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2948 self
._write
_op
_status
(nslcmop_id
, stage
)
2950 delete_timeout
= 20 * 60 # 20 minutes
2951 while delete_timeout
> 0:
2952 desc
= await self
.RO
.show(
2954 item_id_name
=ro_nsr_id
,
2955 extra_item
="action",
2956 extra_item_id
=ro_delete_action
)
2959 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
2961 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
2962 if ns_status
== "ERROR":
2963 raise ROclient
.ROClientException(ns_status_info
)
2964 elif ns_status
== "BUILD":
2965 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
2966 elif ns_status
== "ACTIVE":
2967 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2968 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2971 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
2972 if stage
[2] != detailed_status_old
:
2973 detailed_status_old
= stage
[2]
2974 db_nsr_update
["detailed-status"] = " ".join(stage
)
2975 self
._write
_op
_status
(nslcmop_id
, stage
)
2976 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2977 await asyncio
.sleep(5, loop
=self
.loop
)
2979 else: # delete_timeout <= 0:
2980 raise ROclient
.ROClientException("Timeout waiting ns deleted from VIM")
2982 except Exception as e
:
2983 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2984 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2985 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2986 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2987 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2988 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
))
2989 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2990 failed_detail
.append("delete conflict: {}".format(e
))
2991 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
))
2993 failed_detail
.append("delete error: {}".format(e
))
2994 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
))
2997 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
2998 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
3000 stage
[2] = "Deleting nsd from RO."
3001 db_nsr_update
["detailed-status"] = " ".join(stage
)
3002 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3003 self
._write
_op
_status
(nslcmop_id
, stage
)
3004 await self
.RO
.delete("nsd", ro_nsd_id
)
3005 self
.logger
.debug(logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
))
3006 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
3007 except Exception as e
:
3008 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
3009 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
3010 self
.logger
.debug(logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
))
3011 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
3012 failed_detail
.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
))
3013 self
.logger
.debug(logging_text
+ failed_detail
[-1])
3015 failed_detail
.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
))
3016 self
.logger
.error(logging_text
+ failed_detail
[-1])
3018 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
3019 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
3020 if not vnf_deployed
or not vnf_deployed
["id"]:
3023 ro_vnfd_id
= vnf_deployed
["id"]
3024 stage
[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3025 vnf_deployed
["member-vnf-index"], ro_vnfd_id
)
3026 db_nsr_update
["detailed-status"] = " ".join(stage
)
3027 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3028 self
._write
_op
_status
(nslcmop_id
, stage
)
3029 await self
.RO
.delete("vnfd", ro_vnfd_id
)
3030 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
))
3031 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
3032 except Exception as e
:
3033 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
3034 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
3035 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
))
3036 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
3037 failed_detail
.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
))
3038 self
.logger
.debug(logging_text
+ failed_detail
[-1])
3040 failed_detail
.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
))
3041 self
.logger
.error(logging_text
+ failed_detail
[-1])
3044 stage
[2] = "Error deleting from VIM"
3046 stage
[2] = "Deleted from VIM"
3047 db_nsr_update
["detailed-status"] = " ".join(stage
)
3048 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3049 self
._write
_op
_status
(nslcmop_id
, stage
)
3052 raise LcmException("; ".join(failed_detail
))
3054 async def terminate(self
, nsr_id
, nslcmop_id
):
3055 # Try to lock HA task here
3056 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3057 if not task_is_locked_by_me
:
3060 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
3061 self
.logger
.debug(logging_text
+ "Enter")
3062 timeout_ns_terminate
= self
.timeout_ns_terminate
3065 operation_params
= None
3067 error_list
= [] # annotates all failed error messages
3068 db_nslcmop_update
= {}
3069 autoremove
= False # autoremove after terminated
3070 tasks_dict_info
= {}
3072 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3073 # ^ contains [stage, step, VIM-status]
3075 # wait for any previous tasks in process
3076 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
3078 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
3079 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3080 operation_params
= db_nslcmop
.get("operationParams") or {}
3081 if operation_params
.get("timeout_ns_terminate"):
3082 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
3083 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
3084 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3086 db_nsr_update
["operational-status"] = "terminating"
3087 db_nsr_update
["config-status"] = "terminating"
3088 self
._write
_ns
_status
(
3090 ns_state
="TERMINATING",
3091 current_operation
="TERMINATING",
3092 current_operation_id
=nslcmop_id
,
3093 other_update
=db_nsr_update
3095 self
._write
_op
_status
(
3100 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
3101 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
3104 stage
[1] = "Getting vnf descriptors from db."
3105 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
3106 db_vnfds_from_id
= {}
3107 db_vnfds_from_member_index
= {}
3109 for vnfr
in db_vnfrs_list
:
3110 vnfd_id
= vnfr
["vnfd-id"]
3111 if vnfd_id
not in db_vnfds_from_id
:
3112 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3113 db_vnfds_from_id
[vnfd_id
] = vnfd
3114 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3116 # Destroy individual execution environments when there are terminating primitives.
3117 # Rest of EE will be deleted at once
3118 # TODO - check before calling _destroy_N2VC
3119 # if not operation_params.get("skip_terminate_primitives"):#
3120 # or not vca.get("needed_terminate"):
3121 stage
[0] = "Stage 2/3 execute terminating primitives."
3122 self
.logger
.debug(logging_text
+ stage
[0])
3123 stage
[1] = "Looking execution environment that needs terminate."
3124 self
.logger
.debug(logging_text
+ stage
[1])
3125 self
.logger
.debug("nsr_deployed: {}".format(nsr_deployed
))
3126 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3127 self
.logger
.debug("vca_index: {}, vca: {}".format(vca_index
, vca
))
3128 config_descriptor
= None
3129 if not vca
or not vca
.get("ee_id"):
3131 if not vca
.get("member-vnf-index"):
3133 config_descriptor
= db_nsr
.get("ns-configuration")
3134 elif vca
.get("vdu_id"):
3135 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3136 vdud
= next((vdu
for vdu
in db_vnfd
.get("vdu", ()) if vdu
["id"] == vca
.get("vdu_id")), None)
3138 config_descriptor
= vdud
.get("vdu-configuration")
3139 elif vca
.get("kdu_name"):
3140 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3141 kdud
= next((kdu
for kdu
in db_vnfd
.get("kdu", ()) if kdu
["name"] == vca
.get("kdu_name")), None)
3143 config_descriptor
= kdud
.get("kdu-configuration")
3145 config_descriptor
= db_vnfds_from_member_index
[vca
["member-vnf-index"]].get("vnf-configuration")
3146 # For helm we must destroy_ee
3147 vca_type
= vca
.get("type")
3148 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3149 vca
.get("needed_terminate"))
3150 self
.logger
.debug("vca type: {}".format(vca_type
))
3151 if not vca_type
== "helm":
3152 task
= asyncio
.ensure_future(self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
,
3153 vca_index
, False, exec_terminate_primitives
))
3155 task
= asyncio
.ensure_future(self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
,
3156 vca_index
, True, exec_terminate_primitives
))
3157 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3159 # wait for pending tasks of terminate primitives
3161 self
.logger
.debug(logging_text
+ 'Waiting for terminate primitive pending tasks...')
3162 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3163 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3166 return # raise LcmException("; ".join(error_list))
3167 tasks_dict_info
.clear()
3169 # remove All execution environments at once
3170 stage
[0] = "Stage 3/3 delete all."
3172 if nsr_deployed
.get("VCA"):
3173 stage
[1] = "Deleting all execution environments."
3174 self
.logger
.debug(logging_text
+ stage
[1])
3175 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3176 timeout
=self
.timeout_charm_delete
))
3177 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3178 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3180 # Delete from k8scluster
3181 stage
[1] = "Deleting KDUs."
3182 self
.logger
.debug(logging_text
+ stage
[1])
3183 # print(nsr_deployed)
3184 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3185 if not kdu
or not kdu
.get("kdu-instance"):
3187 kdu_instance
= kdu
.get("kdu-instance")
3188 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3189 task_delete_kdu_instance
= asyncio
.ensure_future(
3190 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3191 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3192 kdu_instance
=kdu_instance
))
3194 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3195 format(kdu
.get("k8scluster-type")))
3197 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3200 stage
[1] = "Deleting ns from VIM."
3202 task_delete_ro
= asyncio
.ensure_future(
3203 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3205 task_delete_ro
= asyncio
.ensure_future(
3206 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3207 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3209 # rest of staff will be done at finally
3211 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3212 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3214 except asyncio
.CancelledError
:
3215 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3216 exc
= "Operation was cancelled"
3217 except Exception as e
:
3218 exc
= traceback
.format_exc()
3219 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3222 error_list
.append(str(exc
))
3224 # wait for pending tasks
3226 stage
[1] = "Waiting for terminate pending tasks."
3227 self
.logger
.debug(logging_text
+ stage
[1])
3228 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3230 stage
[1] = stage
[2] = ""
3231 except asyncio
.CancelledError
:
3232 error_list
.append("Cancelled")
3233 # TODO cancell all tasks
3234 except Exception as exc
:
3235 error_list
.append(str(exc
))
3236 # update status at database
3238 error_detail
= "; ".join(error_list
)
3239 # self.logger.error(logging_text + error_detail)
3240 error_description_nslcmop
= 'Stage: {}. Detail: {}'.format(stage
[0], error_detail
)
3241 error_description_nsr
= 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id
, stage
[0])
3243 db_nsr_update
["operational-status"] = "failed"
3244 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3245 db_nslcmop_update
["detailed-status"] = error_detail
3246 nslcmop_operation_state
= "FAILED"
3250 error_description_nsr
= error_description_nslcmop
= None
3251 ns_state
= "NOT_INSTANTIATED"
3252 db_nsr_update
["operational-status"] = "terminated"
3253 db_nsr_update
["detailed-status"] = "Done"
3254 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3255 db_nslcmop_update
["detailed-status"] = "Done"
3256 nslcmop_operation_state
= "COMPLETED"
3259 self
._write
_ns
_status
(
3262 current_operation
="IDLE",
3263 current_operation_id
=None,
3264 error_description
=error_description_nsr
,
3265 error_detail
=error_detail
,
3266 other_update
=db_nsr_update
3268 self
._write
_op
_status
(
3271 error_message
=error_description_nslcmop
,
3272 operation_state
=nslcmop_operation_state
,
3273 other_update
=db_nslcmop_update
,
3275 if operation_params
:
3276 autoremove
= operation_params
.get("autoremove", False)
3277 if nslcmop_operation_state
:
3279 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3280 "operationState": nslcmop_operation_state
,
3281 "autoremove": autoremove
},
3283 except Exception as e
:
3284 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3286 self
.logger
.debug(logging_text
+ "Exit")
3287 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3289 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3291 error_detail_list
= []
3293 pending_tasks
= list(created_tasks_info
.keys())
3294 num_tasks
= len(pending_tasks
)
3296 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3297 self
._write
_op
_status
(nslcmop_id
, stage
)
3298 while pending_tasks
:
3300 _timeout
= timeout
+ time_start
- time()
3301 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3302 return_when
=asyncio
.FIRST_COMPLETED
)
3303 num_done
+= len(done
)
3304 if not done
: # Timeout
3305 for task
in pending_tasks
:
3306 new_error
= created_tasks_info
[task
] + ": Timeout"
3307 error_detail_list
.append(new_error
)
3308 error_list
.append(new_error
)
3311 if task
.cancelled():
3314 exc
= task
.exception()
3316 if isinstance(exc
, asyncio
.TimeoutError
):
3318 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3319 error_list
.append(created_tasks_info
[task
])
3320 error_detail_list
.append(new_error
)
3321 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3323 self
.logger
.error(logging_text
+ new_error
)
3325 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3326 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + exc_traceback
)
3328 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3329 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3331 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3332 if nsr_id
: # update also nsr
3333 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3334 "errorDetail": ". ".join(error_detail_list
)})
3335 self
._write
_op
_status
(nslcmop_id
, stage
)
3336 return error_detail_list
3339 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3341 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3342 The default-value is used. If it is between < > it look for a value at instantiation_params
3343 :param primitive_desc: portion of VNFD/NSD that describes primitive
3344 :param params: Params provided by user
3345 :param instantiation_params: Instantiation params provided by user
3346 :return: a dictionary with the calculated params
3348 calculated_params
= {}
3349 for parameter
in primitive_desc
.get("parameter", ()):
3350 param_name
= parameter
["name"]
3351 if param_name
in params
:
3352 calculated_params
[param_name
] = params
[param_name
]
3353 elif "default-value" in parameter
or "value" in parameter
:
3354 if "value" in parameter
:
3355 calculated_params
[param_name
] = parameter
["value"]
3357 calculated_params
[param_name
] = parameter
["default-value"]
3358 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3359 and calculated_params
[param_name
].endswith(">"):
3360 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3361 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3363 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3364 format(calculated_params
[param_name
], primitive_desc
["name"]))
3366 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3367 format(param_name
, primitive_desc
["name"]))
3369 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3370 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
], default_flow_style
=True,
3372 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3373 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3375 # add always ns_config_info if primitive name is config
3376 if primitive_desc
["name"] == "config":
3377 if "ns_config_info" in instantiation_params
:
3378 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3379 return calculated_params
3381 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None):
3382 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3383 for vca
in deployed_vca
:
3386 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3388 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3390 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3394 # vca_deployed not found
3395 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
3396 "deployed".format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3399 ee_id
= vca
.get("ee_id")
3400 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3402 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3403 "execution environment"
3404 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3405 return ee_id
, vca_type
3407 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0,
3408 retries_interval
=30, timeout
=None,
3409 vca_type
=None, db_dict
=None) -> (str, str):
3411 if primitive
== "config":
3412 primitive_params
= {"params": primitive_params
}
3414 vca_type
= vca_type
or "lxc_proxy_charm"
3418 output
= await asyncio
.wait_for(
3419 self
.vca_map
[vca_type
].exec_primitive(
3421 primitive_name
=primitive
,
3422 params_dict
=primitive_params
,
3423 progress_timeout
=self
.timeout_progress_primitive
,
3424 total_timeout
=self
.timeout_primitive
,
3426 timeout
=timeout
or self
.timeout_primitive
)
3429 except asyncio
.CancelledError
:
3431 except Exception as e
: # asyncio.TimeoutError
3432 if isinstance(e
, asyncio
.TimeoutError
):
3436 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3438 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3440 return 'FAILED', str(e
)
3442 return 'COMPLETED', output
3444 except (LcmException
, asyncio
.CancelledError
):
3446 except Exception as e
:
3447 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
3449 async def action(self
, nsr_id
, nslcmop_id
):
3451 # Try to lock HA task here
3452 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3453 if not task_is_locked_by_me
:
3456 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3457 self
.logger
.debug(logging_text
+ "Enter")
3458 # get all needed from database
3462 db_nslcmop_update
= {}
3463 nslcmop_operation_state
= None
3464 error_description_nslcmop
= None
3467 # wait for any previous tasks in process
3468 step
= "Waiting for previous operations to terminate"
3469 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3471 self
._write
_ns
_status
(
3474 current_operation
="RUNNING ACTION",
3475 current_operation_id
=nslcmop_id
3478 step
= "Getting information from database"
3479 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3480 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3482 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3483 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3484 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3485 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3486 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3487 primitive
= db_nslcmop
["operationParams"]["primitive"]
3488 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3489 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3492 step
= "Getting vnfr from database"
3493 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3494 step
= "Getting vnfd from database"
3495 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3497 step
= "Getting nsd from database"
3498 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3500 # for backward compatibility
3501 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3502 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3503 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3504 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3506 # look for primitive
3507 config_primitive_desc
= None
3509 for vdu
in get_iterable(db_vnfd
, "vdu"):
3510 if vdu_id
== vdu
["id"]:
3511 for config_primitive
in deep_get(vdu
, ("vdu-configuration", "config-primitive"), ()):
3512 if config_primitive
["name"] == primitive
:
3513 config_primitive_desc
= config_primitive
3517 for kdu
in get_iterable(db_vnfd
, "kdu"):
3518 if kdu_name
== kdu
["name"]:
3519 for config_primitive
in deep_get(kdu
, ("kdu-configuration", "config-primitive"), ()):
3520 if config_primitive
["name"] == primitive
:
3521 config_primitive_desc
= config_primitive
3525 for config_primitive
in deep_get(db_vnfd
, ("vnf-configuration", "config-primitive"), ()):
3526 if config_primitive
["name"] == primitive
:
3527 config_primitive_desc
= config_primitive
3530 for config_primitive
in deep_get(db_nsd
, ("ns-configuration", "config-primitive"), ()):
3531 if config_primitive
["name"] == primitive
:
3532 config_primitive_desc
= config_primitive
3535 if not config_primitive_desc
and not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3536 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3541 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3542 desc_params
= self
._format
_additional
_params
(vdur
.get("additionalParams"))
3544 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3545 desc_params
= self
._format
_additional
_params
(kdur
.get("additionalParams"))
3547 desc_params
= self
._format
_additional
_params
(db_vnfr
.get("additionalParamsForVnf"))
3549 desc_params
= self
._format
_additional
_params
(db_nsr
.get("additionalParamsForNs"))
3552 kdu_action
= True if not deep_get(kdu
, ("kdu-configuration", "juju")) else False
3554 # TODO check if ns is in a proper status
3555 if kdu_name
and (primitive
in ("upgrade", "rollback", "status") or kdu_action
):
3556 # kdur and desc_params already set from before
3557 if primitive_params
:
3558 desc_params
.update(primitive_params
)
3559 # TODO Check if we will need something at vnf level
3560 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3561 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3564 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3566 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3567 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3568 raise LcmException(msg
)
3570 db_dict
= {"collection": "nsrs",
3571 "filter": {"_id": nsr_id
},
3572 "path": "_admin.deployed.K8s.{}".format(index
)}
3573 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive
, vnf_index
, kdu_name
))
3574 step
= "Executing kdu {}".format(primitive
)
3575 if primitive
== "upgrade":
3576 if desc_params
.get("kdu_model"):
3577 kdu_model
= desc_params
.get("kdu_model")
3578 del desc_params
["kdu_model"]
3580 kdu_model
= kdu
.get("kdu-model")
3581 parts
= kdu_model
.split(sep
=":")
3583 kdu_model
= parts
[0]
3585 detailed_status
= await asyncio
.wait_for(
3586 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3587 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3588 kdu_instance
=kdu
.get("kdu-instance"),
3589 atomic
=True, kdu_model
=kdu_model
,
3590 params
=desc_params
, db_dict
=db_dict
,
3591 timeout
=timeout_ns_action
),
3592 timeout
=timeout_ns_action
+ 10)
3593 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3594 elif primitive
== "rollback":
3595 detailed_status
= await asyncio
.wait_for(
3596 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3597 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3598 kdu_instance
=kdu
.get("kdu-instance"),
3600 timeout
=timeout_ns_action
)
3601 elif primitive
== "status":
3602 detailed_status
= await asyncio
.wait_for(
3603 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3604 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3605 kdu_instance
=kdu
.get("kdu-instance")),
3606 timeout
=timeout_ns_action
)
3608 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3609 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3611 detailed_status
= await asyncio
.wait_for(
3612 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3613 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3614 kdu_instance
=kdu_instance
,
3615 primitive_name
=primitive
,
3616 params
=params
, db_dict
=db_dict
,
3617 timeout
=timeout_ns_action
),
3618 timeout
=timeout_ns_action
)
3621 nslcmop_operation_state
= 'COMPLETED'
3623 detailed_status
= ''
3624 nslcmop_operation_state
= 'FAILED'
3626 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3627 member_vnf_index
=vnf_index
,
3629 vdu_count_index
=vdu_count_index
)
3630 db_nslcmop_notif
= {"collection": "nslcmops",
3631 "filter": {"_id": nslcmop_id
},
3632 "path": "admin.VCA"}
3633 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3635 primitive
=primitive
,
3636 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3637 timeout
=timeout_ns_action
,
3639 db_dict
=db_nslcmop_notif
)
3641 db_nslcmop_update
["detailed-status"] = detailed_status
3642 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3643 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3645 return # database update is called inside finally
3647 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3648 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3650 except asyncio
.CancelledError
:
3651 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3652 exc
= "Operation was cancelled"
3653 except asyncio
.TimeoutError
:
3654 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3656 except Exception as e
:
3657 exc
= traceback
.format_exc()
3658 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3661 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3662 "FAILED {}: {}".format(step
, exc
)
3663 nslcmop_operation_state
= "FAILED"
3665 self
._write
_ns
_status
(
3667 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3668 current_operation
="IDLE",
3669 current_operation_id
=None,
3670 # error_description=error_description_nsr,
3671 # error_detail=error_detail,
3672 other_update
=db_nsr_update
3675 self
._write
_op
_status
(
3678 error_message
=error_description_nslcmop
,
3679 operation_state
=nslcmop_operation_state
,
3680 other_update
=db_nslcmop_update
,
3683 if nslcmop_operation_state
:
3685 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3686 "operationState": nslcmop_operation_state
},
3688 except Exception as e
:
3689 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3690 self
.logger
.debug(logging_text
+ "Exit")
3691 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3692 return nslcmop_operation_state
, detailed_status
3694 async def scale(self
, nsr_id
, nslcmop_id
):
3696 # Try to lock HA task here
3697 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3698 if not task_is_locked_by_me
:
3701 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3702 self
.logger
.debug(logging_text
+ "Enter")
3703 # get all needed from database
3706 db_nslcmop_update
= {}
3707 nslcmop_operation_state
= None
3710 # in case of error, indicates what part of scale was failed to put nsr at error status
3711 scale_process
= None
3712 old_operational_status
= ""
3713 old_config_status
= ""
3716 # wait for any previous tasks in process
3717 step
= "Waiting for previous operations to terminate"
3718 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3720 self
._write
_ns
_status
(
3723 current_operation
="SCALING",
3724 current_operation_id
=nslcmop_id
3727 step
= "Getting nslcmop from database"
3728 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3729 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3730 step
= "Getting nsr from database"
3731 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3733 old_operational_status
= db_nsr
["operational-status"]
3734 old_config_status
= db_nsr
["config-status"]
3735 step
= "Parsing scaling parameters"
3736 # self.logger.debug(step)
3737 db_nsr_update
["operational-status"] = "scaling"
3738 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3739 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3742 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3743 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3744 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3745 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3746 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3749 RO_nsr_id
= nsr_deployed
["RO"]["nsr_id"]
3750 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3751 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3752 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3753 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3755 # for backward compatibility
3756 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3757 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3758 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3759 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3761 step
= "Getting vnfr from database"
3762 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3763 step
= "Getting vnfd from database"
3764 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3766 step
= "Getting scaling-group-descriptor"
3767 for scaling_descriptor
in db_vnfd
["scaling-group-descriptor"]:
3768 if scaling_descriptor
["name"] == scaling_group
:
3771 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3772 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3775 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3776 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3777 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3780 # TODO check if ns is in a proper status
3781 step
= "Sending scale order to VIM"
3783 if not db_nsr
["_admin"].get("scaling-group"):
3784 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
3785 admin_scale_index
= 0
3787 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
3788 if admin_scale_info
["name"] == scaling_group
:
3789 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
3791 else: # not found, set index one plus last element and add new entry with the name
3792 admin_scale_index
+= 1
3793 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
3794 RO_scaling_info
= []
3795 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
3796 if scaling_type
== "SCALE_OUT":
3797 # count if max-instance-count is reached
3798 max_instance_count
= scaling_descriptor
.get("max-instance-count", 10)
3799 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3800 if nb_scale_op
>= max_instance_count
:
3801 raise LcmException("reached the limit of {} (max-instance-count) "
3802 "scaling-out operations for the "
3803 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
3806 vdu_scaling_info
["scaling_direction"] = "OUT"
3807 vdu_scaling_info
["vdu-create"] = {}
3808 for vdu_scale_info
in scaling_descriptor
["vdu"]:
3809 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
3810 "type": "create", "count": vdu_scale_info
.get("count", 1)})
3811 vdu_scaling_info
["vdu-create"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
3813 elif scaling_type
== "SCALE_IN":
3814 # count if min-instance-count is reached
3815 min_instance_count
= 0
3816 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
3817 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
3818 if nb_scale_op
<= min_instance_count
:
3819 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3820 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
3822 vdu_scaling_info
["scaling_direction"] = "IN"
3823 vdu_scaling_info
["vdu-delete"] = {}
3824 for vdu_scale_info
in scaling_descriptor
["vdu"]:
3825 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
3826 "type": "delete", "count": vdu_scale_info
.get("count", 1)})
3827 vdu_scaling_info
["vdu-delete"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
3829 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3830 vdu_create
= vdu_scaling_info
.get("vdu-create")
3831 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
3832 if vdu_scaling_info
["scaling_direction"] == "IN":
3833 for vdur
in reversed(db_vnfr
["vdur"]):
3834 if vdu_delete
.get(vdur
["vdu-id-ref"]):
3835 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
3836 vdu_scaling_info
["vdu"].append({
3837 "name": vdur
["name"],
3838 "vdu_id": vdur
["vdu-id-ref"],
3841 for interface
in vdur
["interfaces"]:
3842 vdu_scaling_info
["vdu"][-1]["interface"].append({
3843 "name": interface
["name"],
3844 "ip_address": interface
["ip-address"],
3845 "mac_address": interface
.get("mac-address"),
3847 vdu_delete
= vdu_scaling_info
.pop("vdu-delete")
3850 step
= "Executing pre-scale vnf-config-primitive"
3851 if scaling_descriptor
.get("scaling-config-action"):
3852 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3853 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
3854 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
3855 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3856 step
= db_nslcmop_update
["detailed-status"] = \
3857 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3859 # look for primitive
3860 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
3861 if config_primitive
["name"] == vnf_config_primitive
:
3865 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3866 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3867 "primitive".format(scaling_group
, config_primitive
))
3869 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3870 if db_vnfr
.get("additionalParamsForVnf"):
3871 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3873 scale_process
= "VCA"
3874 db_nsr_update
["config-status"] = "configuring pre-scaling"
3875 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3877 # Pre-scale retry check: Check if this sub-operation has been executed before
3878 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3879 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
3880 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3881 # Skip sub-operation
3882 result
= 'COMPLETED'
3883 result_detail
= 'Done'
3884 self
.logger
.debug(logging_text
+
3885 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3886 vnf_config_primitive
, result
, result_detail
))
3888 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3889 # New sub-operation: Get index of this sub-operation
3890 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3891 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3892 format(vnf_config_primitive
))
3894 # retry: Get registered params for this existing sub-operation
3895 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3896 vnf_index
= op
.get('member_vnf_index')
3897 vnf_config_primitive
= op
.get('primitive')
3898 primitive_params
= op
.get('primitive_params')
3899 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3900 format(vnf_config_primitive
))
3901 # Execute the primitive, either with new (first-time) or registered (reintent) args
3902 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3903 member_vnf_index
=vnf_index
,
3905 vdu_count_index
=None)
3906 result
, result_detail
= await self
._ns
_execute
_primitive
(
3907 ee_id
, vnf_config_primitive
, primitive_params
, vca_type
)
3908 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3909 vnf_config_primitive
, result
, result_detail
))
3910 # Update operationState = COMPLETED | FAILED
3911 self
._update
_suboperation
_status
(
3912 db_nslcmop
, op_index
, result
, result_detail
)
3914 if result
== "FAILED":
3915 raise LcmException(result_detail
)
3916 db_nsr_update
["config-status"] = old_config_status
3917 scale_process
= None
3921 # Should this block be skipped if 'RO_nsr_id' == None ?
3922 # if (RO_nsr_id and RO_scaling_info):
3924 scale_process
= "RO"
3925 # Scale RO retry check: Check if this sub-operation has been executed before
3926 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3927 db_nslcmop
, vnf_index
, None, None, 'SCALE-RO', RO_nsr_id
, RO_scaling_info
)
3928 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3929 # Skip sub-operation
3930 result
= 'COMPLETED'
3931 result_detail
= 'Done'
3932 self
.logger
.debug(logging_text
+ "Skipped sub-operation RO, result {} {}".format(
3933 result
, result_detail
))
3935 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3936 # New sub-operation: Get index of this sub-operation
3937 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3938 self
.logger
.debug(logging_text
+ "New sub-operation RO")
3940 # retry: Get registered params for this existing sub-operation
3941 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3942 RO_nsr_id
= op
.get('RO_nsr_id')
3943 RO_scaling_info
= op
.get('RO_scaling_info')
3944 self
.logger
.debug(logging_text
+ "Sub-operation RO retry for primitive {}".format(
3945 vnf_config_primitive
))
3947 RO_desc
= await self
.RO
.create_action("ns", RO_nsr_id
, {"vdu-scaling": RO_scaling_info
})
3948 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
3949 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
3951 RO_nslcmop_id
= RO_desc
["instance_action_id"]
3952 db_nslcmop_update
["_admin.deploy.RO"] = RO_nslcmop_id
3954 RO_task_done
= False
3955 step
= detailed_status
= "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id
)
3956 detailed_status_old
= None
3957 self
.logger
.debug(logging_text
+ step
)
3959 deployment_timeout
= 1 * 3600 # One hour
3960 while deployment_timeout
> 0:
3961 if not RO_task_done
:
3962 desc
= await self
.RO
.show("ns", item_id_name
=RO_nsr_id
, extra_item
="action",
3963 extra_item_id
=RO_nslcmop_id
)
3966 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
3968 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
3969 if ns_status
== "ERROR":
3970 raise ROclient
.ROClientException(ns_status_info
)
3971 elif ns_status
== "BUILD":
3972 detailed_status
= step
+ "; {}".format(ns_status_info
)
3973 elif ns_status
== "ACTIVE":
3975 step
= detailed_status
= "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id
)
3976 self
.logger
.debug(logging_text
+ step
)
3978 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
3981 if ns_status
== "ERROR":
3982 raise ROclient
.ROClientException(ns_status_info
)
3983 elif ns_status
== "BUILD":
3984 detailed_status
= step
+ "; {}".format(ns_status_info
)
3985 elif ns_status
== "ACTIVE":
3986 step
= detailed_status
= \
3987 "Waiting for management IP address reported by the VIM. Updating VNFRs"
3989 self
.scale_vnfr(db_vnfr
, vdu_create
=vdu_create
, vdu_delete
=vdu_delete
)
3992 desc
= await self
.RO
.show("ns", RO_nsr_id
)
3995 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
3997 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
3998 self
.ns_update_vnfr({db_vnfr
["member-vnf-index-ref"]: db_vnfr
}, desc
)
4000 except LcmExceptionNoMgmtIP
:
4003 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
4004 if detailed_status
!= detailed_status_old
:
4005 self
._update
_suboperation
_status
(
4006 db_nslcmop
, op_index
, 'COMPLETED', detailed_status
)
4007 detailed_status_old
= db_nslcmop_update
["detailed-status"] = detailed_status
4008 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
4010 await asyncio
.sleep(5, loop
=self
.loop
)
4011 deployment_timeout
-= 5
4012 if deployment_timeout
<= 0:
4013 self
._update
_suboperation
_status
(
4014 db_nslcmop
, nslcmop_id
, op_index
, 'FAILED', "Timeout when waiting for ns to get ready")
4015 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
4017 # update VDU_SCALING_INFO with the obtained ip_addresses
4018 if vdu_scaling_info
["scaling_direction"] == "OUT":
4019 for vdur
in reversed(db_vnfr
["vdur"]):
4020 if vdu_scaling_info
["vdu-create"].get(vdur
["vdu-id-ref"]):
4021 vdu_scaling_info
["vdu-create"][vdur
["vdu-id-ref"]] -= 1
4022 vdu_scaling_info
["vdu"].append({
4023 "name": vdur
["name"],
4024 "vdu_id": vdur
["vdu-id-ref"],
4027 for interface
in vdur
["interfaces"]:
4028 vdu_scaling_info
["vdu"][-1]["interface"].append({
4029 "name": interface
["name"],
4030 "ip_address": interface
["ip-address"],
4031 "mac_address": interface
.get("mac-address"),
4033 del vdu_scaling_info
["vdu-create"]
4035 self
._update
_suboperation
_status
(db_nslcmop
, op_index
, 'COMPLETED', 'Done')
4038 scale_process
= None
4040 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4043 # execute primitive service POST-SCALING
4044 step
= "Executing post-scale vnf-config-primitive"
4045 if scaling_descriptor
.get("scaling-config-action"):
4046 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4047 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
4048 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
4049 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4050 step
= db_nslcmop_update
["detailed-status"] = \
4051 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4053 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4054 if db_vnfr
.get("additionalParamsForVnf"):
4055 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4057 # look for primitive
4058 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
4059 if config_primitive
["name"] == vnf_config_primitive
:
4062 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
4063 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
4064 "match any vnf-configuration:config-primitive".format(scaling_group
,
4066 scale_process
= "VCA"
4067 db_nsr_update
["config-status"] = "configuring post-scaling"
4068 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4070 # Post-scale retry check: Check if this sub-operation has been executed before
4071 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4072 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
4073 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4074 # Skip sub-operation
4075 result
= 'COMPLETED'
4076 result_detail
= 'Done'
4077 self
.logger
.debug(logging_text
+
4078 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4079 format(vnf_config_primitive
, result
, result_detail
))
4081 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4082 # New sub-operation: Get index of this sub-operation
4083 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4084 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4085 format(vnf_config_primitive
))
4087 # retry: Get registered params for this existing sub-operation
4088 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4089 vnf_index
= op
.get('member_vnf_index')
4090 vnf_config_primitive
= op
.get('primitive')
4091 primitive_params
= op
.get('primitive_params')
4092 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4093 format(vnf_config_primitive
))
4094 # Execute the primitive, either with new (first-time) or registered (reintent) args
4095 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4096 member_vnf_index
=vnf_index
,
4098 vdu_count_index
=None)
4099 result
, result_detail
= await self
._ns
_execute
_primitive
(
4100 ee_id
, vnf_config_primitive
, primitive_params
, vca_type
)
4101 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4102 vnf_config_primitive
, result
, result_detail
))
4103 # Update operationState = COMPLETED | FAILED
4104 self
._update
_suboperation
_status
(
4105 db_nslcmop
, op_index
, result
, result_detail
)
4107 if result
== "FAILED":
4108 raise LcmException(result_detail
)
4109 db_nsr_update
["config-status"] = old_config_status
4110 scale_process
= None
4113 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4114 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
4115 else old_operational_status
4116 db_nsr_update
["config-status"] = old_config_status
4118 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
4119 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4121 except asyncio
.CancelledError
:
4122 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
4123 exc
= "Operation was cancelled"
4124 except Exception as e
:
4125 exc
= traceback
.format_exc()
4126 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
4128 self
._write
_ns
_status
(
4131 current_operation
="IDLE",
4132 current_operation_id
=None
4135 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4136 nslcmop_operation_state
= "FAILED"
4138 db_nsr_update
["operational-status"] = old_operational_status
4139 db_nsr_update
["config-status"] = old_config_status
4140 db_nsr_update
["detailed-status"] = ""
4142 if "VCA" in scale_process
:
4143 db_nsr_update
["config-status"] = "failed"
4144 if "RO" in scale_process
:
4145 db_nsr_update
["operational-status"] = "failed"
4146 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4149 error_description_nslcmop
= None
4150 nslcmop_operation_state
= "COMPLETED"
4151 db_nslcmop_update
["detailed-status"] = "Done"
4153 self
._write
_op
_status
(
4156 error_message
=error_description_nslcmop
,
4157 operation_state
=nslcmop_operation_state
,
4158 other_update
=db_nslcmop_update
,
4161 self
._write
_ns
_status
(
4164 current_operation
="IDLE",
4165 current_operation_id
=None,
4166 other_update
=db_nsr_update
4169 if nslcmop_operation_state
:
4171 await self
.msg
.aiowrite("ns", "scaled", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
4172 "operationState": nslcmop_operation_state
},
4175 # await asyncio.sleep(cooldown_time, loop=self.loop)
4176 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4177 except Exception as e
:
4178 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4179 self
.logger
.debug(logging_text
+ "Exit")
4180 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")