Revert "Remove unused methods"
[osm/LCM.git] / osm_lcm / data_utils / vnfd.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2020 Whitestack, LLC
4 # *************************************************************
5 #
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com
23 ##
24
25 from osm_lcm.data_utils import list_utils
26
27
28 def get_lcm_operations_configuration(vnfd):
29 return vnfd.get("df", ())[0].get("lcm-operations-configuration", ())
30
31
32 def get_vdu_list(vnfd):
33 return vnfd.get("vdu", ())
34
35
36 def get_kdu_list(vnfd):
37 return vnfd.get("kdu", ())
38
39
40 def get_kdu(vnfd, kdu_name):
41 return list_utils.find_in_list(
42 get_kdu_list(vnfd), lambda kdu: kdu["name"] == kdu_name
43 )
44
45
46 def get_kdu_services(kdu):
47 return kdu.get("service", [])
48
49
50 def get_ee_sorted_initial_config_primitive_list(
51 primitive_list, vca_deployed, ee_descriptor_id
52 ):
53 """
54 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
55 primitives as verify-ssh-credentials, or config when needed
56 :param primitive_list: information of the descriptor
57 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
58 this element contains a ssh public key
59 :param ee_descriptor_id: execution environment descriptor id. It is the value of
60 XXX_configuration.execution-environment-list.INDEX.id; it can be None
61 :return: The modified list. Can ba an empty list, but always a list
62 """
63 primitive_list = primitive_list or []
64 primitive_list = [
65 p
66 for p in primitive_list
67 if p.get("execution-environment-ref", ee_descriptor_id) == ee_descriptor_id
68 ]
69 if primitive_list:
70 primitive_list.sort(key=lambda val: int(val["seq"]))
71
72 # look for primitive config, and get the position. None if not present
73 config_position = None
74 for index, primitive in enumerate(primitive_list):
75 if primitive["name"] == "config":
76 config_position = index
77 break
78
79 # for NS, add always a config primitive if not present (bug 874)
80 if not vca_deployed["member-vnf-index"] and config_position is None:
81 primitive_list.insert(0, {"name": "config", "parameter": []})
82 config_position = 0
83 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
84 if (
85 vca_deployed["member-vnf-index"]
86 and config_position is not None
87 and vca_deployed.get("ssh-public-key")
88 ):
89 primitive_list.insert(
90 config_position + 1, {"name": "verify-ssh-credentials", "parameter": []}
91 )
92 return primitive_list
93
94
95 def get_ee_sorted_terminate_config_primitive_list(primitive_list, ee_descriptor_id):
96 primitive_list = primitive_list or []
97 primitive_list = [
98 p
99 for p in primitive_list
100 if p.get("execution-environment-ref", ee_descriptor_id) == ee_descriptor_id
101 ]
102 if primitive_list:
103 primitive_list.sort(key=lambda val: int(val["seq"]))
104 return primitive_list
105
106
107 def get_vdu_profile(vnfd, vdu_profile_id):
108 return list_utils.find_in_list(
109 vnfd.get("df", ())[0]["vdu-profile"],
110 lambda vdu_profile: vdu_profile["id"] == vdu_profile_id,
111 )
112
113
114 def get_kdu_resource_profile(vnfd, kdu_profile_id):
115 return list_utils.find_in_list(
116 vnfd.get("df", ())[0]["kdu-resource-profile"],
117 lambda kdu_profile: kdu_profile["id"] == kdu_profile_id,
118 )
119
120
121 def get_configuration(vnfd, entity_id):
122 lcm_ops_config = vnfd.get("df")[0].get("lcm-operations-configuration")
123 if not lcm_ops_config:
124 return None
125 ops_vnf = lcm_ops_config.get("operate-vnf-op-config")
126 if not ops_vnf:
127 return None
128 day12ops = ops_vnf.get("day1-2", [])
129 return list_utils.find_in_list(
130 day12ops, lambda configuration: configuration["id"] == entity_id
131 )
132
133
134 def get_relation_list(vnfd, entity_id):
135 return (get_configuration(vnfd, entity_id) or {}).get("relation", [])
136
137
138 def get_virtual_link_profiles(vnfd):
139 return vnfd.get("df")[0].get("virtual-link-profile", ())
140
141
142 def get_vdu(vnfd, vdu_id):
143 return list_utils.find_in_list(vnfd.get("vdu", ()), lambda vdu: vdu["id"] == vdu_id)
144
145
146 def get_vdu_index(vnfd, vdu_id):
147 target_vdu = list_utils.find_in_list(
148 vnfd.get("vdu", ()), lambda vdu: vdu["id"] == vdu_id
149 )
150 if target_vdu:
151 return vnfd.get("vdu", ()).index(target_vdu)
152 else:
153 return -1
154
155
156 def get_scaling_aspect(vnfd):
157 return vnfd.get("df", ())[0].get("scaling-aspect", ())
158
159
160 def get_number_of_instances(vnfd, vdu_id):
161 return list_utils.find_in_list(
162 vnfd.get("df", ())[0].get("instantiation-level", ())[0].get("vdu-level", ()),
163 lambda a_vdu: a_vdu["vdu-id"] == vdu_id,
164 ).get("number-of-instances", 1)
165
166
167 def get_juju_ee_ref(vnfd, entity_id):
168 return list_utils.find_in_list(
169 get_configuration(vnfd, entity_id).get("execution-environment-list", []),
170 lambda ee: "juju" in ee,
171 )
172
173
174 def find_software_version(vnfd: dict) -> str:
175 """Find the sotware version in the VNFD descriptors
176
177 Args:
178 vnfd (dict): Descriptor as a dictionary
179
180 Returns:
181 software-version (str)
182 """
183
184 default_sw_version = "1.0"
185
186 if vnfd.get("vnfd"):
187 vnfd = vnfd["vnfd"]
188
189 if vnfd.get("software-version"):
190 return vnfd["software-version"]
191
192 else:
193 return default_sw_version
194
195
196 def check_helm_ee_in_ns(db_vnfds: list) -> bool:
197 for vnfd in db_vnfds:
198 descriptor_config = get_configuration(vnfd, vnfd["id"])
199 if not (
200 descriptor_config and "execution-environment-list" in descriptor_config
201 ):
202 continue
203 ee_list = descriptor_config.get("execution-environment-list", [])
204 if list_utils.find_in_list(ee_list, lambda ee_item: "helm-chart" in ee_item):
205 return True