Coverage for osm_pla/server/server.py: 80%

129 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-22 10:12 +0000

1#!/usr/bin/python3 

2# -*- coding: utf-8 -*- 

3 

4# Copyright 2020 ArctosLabs Scandinavia AB 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18 

19import asyncio 

20import logging 

21import itertools 

22from pathlib import Path 

23 

24import yaml 

25from osm_common import dbmemory, dbmongo, msglocal, msgkafka 

26 

27from osm_pla.config.config import Config 

28from osm_pla.placement.mznplacement import MznPlacementConductor 

29from osm_pla.placement.mznplacement import NsPlacementDataFactory 

30 

31 

32class Server: 

33 pil_price_list_file = Path("/placement/pil_price_list.yaml") 

34 vnf_price_list_file = Path("/placement/vnf_price_list.yaml") 

35 

36 def __init__(self, config: Config): 

37 self.log = logging.getLogger("pla.server") 

38 self.db = None 

39 self.msgBus = None 

40 self.config = config 

41 

42 try: 

43 if config.get("database", "driver") == "mongo": 

44 self.db = dbmongo.DbMongo() 

45 self.db.db_connect(config.get("database")) 

46 elif config.get("database", "driver") == "memory": 

47 self.db = dbmemory.DbMemory() 

48 self.db.db_connect(config.get("database")) 

49 else: 

50 raise Exception( 

51 "Invalid configuration param '{}' at '[database]':'driver'".format( 

52 config.get("database", "driver") 

53 ) 

54 ) 

55 

56 if config.get("message", "driver") == "local": 

57 self.msgBus = msglocal.MsgLocal() 

58 elif config.get("message", "driver") == "kafka": 

59 self.msgBus = msgkafka.MsgKafka() 

60 else: 

61 raise Exception( 

62 "Invalid message bus driver {}".format( 

63 config.get("message", "driver") 

64 ) 

65 ) 

66 self.msgBus.connect(config.get("message")) 

67 

68 except Exception as e: 

69 self.log.exception("kafka setup error. Exception: {}".format(e)) 

70 

71 def _get_nslcmop(self, nsdlcmop_id): 

72 """ 

73 :param nsdlcmop_id: 

74 :return: nslcmop from database corresponding to nslcmop_id 

75 """ 

76 db_filter = {"_id": nsdlcmop_id} 

77 nslcmop = self.db.get_one("nslcmops", db_filter) 

78 return nslcmop 

79 

80 def _get_projects(self): 

81 """ 

82 :return: project name to project id mapping 

83 """ 

84 projects = self.db.get_list("projects") 

85 return {project["_id"]: project["name"] for project in projects} 

86 

87 def _get_nsd(self, nsd_id): 

88 """ 

89 :param nsd_id: 

90 :return: nsd from database corresponding to nsd_id 

91 """ 

92 db_filter = {"_id": nsd_id} 

93 return self.db.get_one("nsds", db_filter) 

94 

95 def _get_vim_accounts(self, vim_account_ids): 

96 """ 

97 :param vim_account_ids: list of VIM account ids 

98 :return: list of vim account entries from database corresponding to list in vim_accounts_id 

99 """ 

100 db_filter = {"_id": vim_account_ids} 

101 return self.db.get_list("vim_accounts", db_filter) 

102 

103 def _read_vnf_price_list(self, price_list_file_path): 

104 """ 

105 read vnf price list configuration file 

106 :param price_list_file_path: 

107 :return: 

108 """ 

109 with open(str(price_list_file_path)) as pl_fd: 

110 price_list = yaml.safe_load_all(pl_fd) 

111 return next(price_list) 

112 

113 def _price_list_with_project(self, price_list): 

114 """ 

115 Figure out if this price list is with project or not. 

116 Note: to handle the unlikely event that a project is called 'prices' we do not simply check if 'prices' 

117 is in the dict keys for a price list sequence but rather go down one step in the nesting 

118 in which we either have 

119 1) 'prices:{vim_url:...}' if prices are also per project, or 

120 2) '{vim_url:...}' if prices are only per vim 

121 

122 :param price_list: 

123 :return: True if project part of price list, else False 

124 """ 

125 price_list_entry_keys = set(price_list[0].keys()) 

126 price_list_entry_keys.remove("vnfd") 

127 pl_key = price_list_entry_keys.pop() 

128 entry_to_check = price_list[0][pl_key][0].keys() 

129 return True if "prices" in entry_to_check else False 

130 

131 def _get_vnf_price_list(self, price_list_file_path, project_name=None): 

132 """ 

133 read vnf price list configuration file, determine its type and reformat content accordingly 

134 

135 :param price_list_file_path: 

136 :param project_name: 

137 :return: dictionary formatted as {'<vnfd>': {'<vim-url>':'<price>'}} 

138 """ 

139 price_list_data = self._read_vnf_price_list(price_list_file_path) 

140 if self._price_list_with_project(price_list_data): 

141 res = {} 

142 for i in price_list_data: 

143 price_data = ( 

144 i[project_name] 

145 if type(i[project_name]) is dict 

146 else i[project_name][0] 

147 ) 

148 res_component = { 

149 i["vim_name"]: i["price"] for i in price_data["prices"] 

150 } 

151 res.update({i["vnfd"]: res_component}) 

152 return res 

153 else: 

154 return { 

155 i["vnfd"]: {i1["vim_name"]: i1["price"] for i1 in i["prices"]} 

156 for i in price_list_data 

157 } 

158 

159 def _get_pil_info(self, pil_info_file_path): 

160 """ 

161 read and return pil information from file 

162 :param pil_info_file_path: Path to pil_info file 

163 :return pil configuration file content as Python object 

164 """ 

165 with open(str(pil_info_file_path)) as pil_fd: 

166 data = yaml.safe_load_all(pil_fd) 

167 return next(data) 

168 

169 def _create_vnf_id_maps(self, nsd): 

170 """ 

171 map identifier for 'member-vnf-index' in nsd to syntax that is safe for mzn 

172 

173 return tuples with mappings {<adjusted id>: <original id>} and {<original id>: <adjusted id>} 

174 """ 

175 # TODO: Change for multiple DF support 

176 ns_df = nsd.get("df", [{}])[0] 

177 next_idx = itertools.count() 

178 member_vnf_index2mzn = { 

179 e["id"]: "VNF" + str(next(next_idx)) for e in ns_df.get("vnf-profile", []) 

180 } 

181 

182 # reverse the name map dictionary, used when the placement result is remapped 

183 mzn_name2member_vnf_index = {v: k for k, v in member_vnf_index2mzn.items()} 

184 

185 return member_vnf_index2mzn, mzn_name2member_vnf_index 

186 

187 async def get_placement(self, nslcmop_id): 

188 """ 

189 - Collects and prepares placement information. 

190 - Request placement computation. 

191 - Formats and distribute placement result 

192 

193 Note: exceptions result in empty response message 

194 

195 :param nslcmop_id: 

196 :return: 

197 """ 

198 try: 

199 nslcmop = self._get_nslcmop(nslcmop_id) 

200 nsd = self._get_nsd(nslcmop["operationParams"]["nsdId"]) 

201 member_vnf_index2mzn, mzn2member_vnf_index = self._create_vnf_id_maps(nsd) 

202 # adjust vnf identifiers 

203 # TODO: Change for multiple DF support 

204 ns_df = nsd.get("df", [{}])[0] 

205 for vnf_profile in ns_df.get("vnf-profile", []): 

206 vnf_profile["id"] = member_vnf_index2mzn[vnf_profile["id"]] 

207 for vlc in vnf_profile.get("virtual-link-connectivity", []): 

208 for ccpd in vlc.get("constituent-cpd-id", []): 

209 ccpd["constituent-base-element-id"] = member_vnf_index2mzn[ 

210 ccpd["constituent-base-element-id"] 

211 ] 

212 self.log.info("adjusted nsd: {}".format(nsd)) 

213 projects = self._get_projects() 

214 self.log.info("projects: {}".format(projects)) 

215 nslcmop_project = nslcmop["_admin"]["projects_read"][0] 

216 self.log.info("nslcmop_project: {}".format(nslcmop_project)) 

217 valid_vim_accounts = nslcmop["operationParams"]["validVimAccounts"] 

218 vim_accounts_data = self._get_vim_accounts(valid_vim_accounts) 

219 vims_information = {_["name"]: _["_id"] for _ in vim_accounts_data} 

220 price_list = self._get_vnf_price_list( 

221 Server.vnf_price_list_file, projects[nslcmop_project] 

222 ) 

223 pil_info = self._get_pil_info(Server.pil_price_list_file) 

224 pinnings = nslcmop["operationParams"].get("vnf", []) 

225 # remap member-vnf-index values according to id map 

226 for pinning in pinnings: 

227 pinning["member-vnf-index"] = member_vnf_index2mzn[ 

228 pinning["member-vnf-index"] 

229 ] 

230 self.log.info("pinnings: {}".format(pinnings)) 

231 order_constraints = nslcmop["operationParams"].get("placement-constraints") 

232 self.log.info("order constraints: {}".format(order_constraints)) 

233 

234 nspd = NsPlacementDataFactory( 

235 vims_information, price_list, nsd, pil_info, pinnings, order_constraints 

236 ).create_ns_placement_data() 

237 

238 vnf_placement = MznPlacementConductor(self.log).do_placement_computation( 

239 nspd 

240 ) 

241 

242 except Exception as e: 

243 # Note: there is no cure for failure so we have a catch-all clause here 

244 self.log.exception("PLA fault. Exception: {}".format(e)) 

245 vnf_placement = [] 

246 finally: 

247 # remap names in vnf_placement 

248 for e in vnf_placement: 

249 e["member-vnf-index"] = mzn2member_vnf_index[e["member-vnf-index"]] 

250 await self.msgBus.aiowrite( 

251 "pla", 

252 "placement", 

253 {"placement": {"vnf": vnf_placement, "nslcmopId": nslcmop_id}}, 

254 ) 

255 

256 def handle_kafka_command(self, topic, command, params): 

257 self.log.info("Kafka msg arrived: {} {} {}".format(topic, command, params)) 

258 if topic == "pla" and command == "get_placement": 

259 nslcmop_id = params.get("nslcmopId") 

260 asyncio.create_task(self.get_placement(nslcmop_id)) 

261 

262 async def kafka_read(self): 

263 self.log.info("Task kafka_read start") 

264 while True: 

265 try: 

266 topics = "pla" 

267 await self.msgBus.aioread(topics, self.handle_kafka_command) 

268 except Exception as e: 

269 self.log.error("kafka read error. Exception: {}".format(e)) 

270 await asyncio.sleep(5) 

271 

272 def run(self): 

273 asyncio.run(self.kafka_read()) 

274 if self.msgBus: 

275 self.msgBus.disconnect()