Coverage for osm_nbi/engine.py: 27%

158 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-27 02:46 +0000

1# -*- coding: utf-8 -*- 

2 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16import logging 

17 

18# import yaml 

19from osm_common import ( 

20 dbmongo, 

21 dbmemory, 

22 fslocal, 

23 fsmongo, 

24 msglocal, 

25 msgkafka, 

26 version as common_version, 

27) 

28from osm_common.dbbase import DbException 

29from osm_common.fsbase import FsException 

30from osm_common.msgbase import MsgException 

31from http import HTTPStatus 

32 

33from osm_nbi.authconn_keystone import AuthconnKeystone 

34from osm_nbi.authconn_internal import AuthconnInternal 

35from osm_nbi.authconn_tacacs import AuthconnTacacs 

36from osm_nbi.base_topic import EngineException, versiontuple 

37from osm_nbi.admin_topics import VimAccountTopic, WimAccountTopic, SdnTopic 

38from osm_nbi.admin_topics import K8sClusterTopic, K8sRepoTopic, OsmRepoTopic 

39from osm_nbi.admin_topics import VcaTopic 

40from osm_nbi.admin_topics import UserTopicAuth, ProjectTopicAuth, RoleTopicAuth 

41from osm_nbi.descriptor_topics import ( 

42 VnfdTopic, 

43 NsdTopic, 

44 PduTopic, 

45 NstTopic, 

46 VnfPkgOpTopic, 

47 NsConfigTemplateTopic, 

48) 

49from osm_nbi.instance_topics import ( 

50 NsrTopic, 

51 VnfrTopic, 

52 NsLcmOpTopic, 

53 NsiTopic, 

54 NsiLcmOpTopic, 

55) 

56from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic 

57from osm_nbi.pmjobs_topics import PmJobsTopic 

58from osm_nbi.subscription_topics import NslcmSubscriptionsTopic 

59from osm_nbi.osm_vnfm.vnf_subscription import VnflcmSubscriptionsTopic 

60from base64 import b64encode 

61from os import urandom # , path 

62from threading import Lock 

63 

__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# Minimum osm_common version accepted; Engine.start() compares the installed
# version against this value and raises EngineException when it is older.
min_common_version = "0.1.16"

66 

67 

class Engine(object):
    """
    Entry point that routes create/read/update/delete requests to per-topic handlers.

    The engine owns the database, file storage, messaging and authentication
    connections (created in start()) and one handler object per topic, stored in
    self.map_topic. Every modifying operation is serialized through self.write_lock.
    """

    # Topic name -> handler class. Each class is instantiated in start() with
    # (db, fs, msg, authconn).
    map_from_topic_to_class = {
        "vnfds": VnfdTopic,
        "nsds": NsdTopic,
        "nsts": NstTopic,
        "pdus": PduTopic,
        "nsrs": NsrTopic,
        "vnfrs": VnfrTopic,
        "nslcmops": NsLcmOpTopic,
        "vim_accounts": VimAccountTopic,
        "wim_accounts": WimAccountTopic,
        "sdns": SdnTopic,
        "k8sclusters": K8sClusterTopic,
        "vca": VcaTopic,
        "k8srepos": K8sRepoTopic,
        "osmrepos": OsmRepoTopic,
        # Valid for both internal and keystone authentication backends
        "users": UserTopicAuth,
        # Valid for both internal and keystone authentication backends
        "projects": ProjectTopicAuth,
        # Valid for both internal and keystone authentication backends
        "roles": RoleTopicAuth,
        "nsis": NsiTopic,
        "nsilcmops": NsiLcmOpTopic,
        "vnfpkgops": VnfPkgOpTopic,
        "nslcm_subscriptions": NslcmSubscriptionsTopic,
        "vnf_instances": VnfInstances,
        "vnflcmops": VnfLcmOpTopic,
        "vnflcm_subscriptions": VnflcmSubscriptionsTopic,
        "nsconfigtemps": NsConfigTemplateTopic,
        # [NEW_TOPIC]: add an entry here
        # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
    }

    # Database version text -> integer used to compare versions in upgrade_db()
    map_target_version_to_int = {
        "1.0": 1000,
        "1.1": 1001,
        "1.2": 1002,
        # Add new versions here
    }

    def __init__(self, authenticator):
        """
        Create an engine with no open connections; call start() before using it.
        :param authenticator: object whose 'role_permissions' attribute is handed to the
            authentication connector created in start()
        """
        self.db = None
        self.fs = None
        self.msg = None
        self.authconn = None
        self.config = None
        self.logger = logging.getLogger("nbi.engine")
        self.map_topic = {}
        self.write_lock = None
        self.authenticator = authenticator

    def start(self, config):
        """
        Connect to database, filesystem storage, and messaging, create the authentication
        connector and instantiate one handler per topic.
        :param config: two level dictionary with configuration. The top level keys read here are
            'database', 'storage', 'message', 'authentication' and 'prometheus'
        :return: None
        :raises EngineException: if osm_common is older than min_common_version, if a 'driver'
            value is not supported, or if a Db/Fs/Msg exception is raised while connecting
        """
        self.config = config
        # check right version of common
        if versiontuple(common_version) < versiontuple(min_common_version):
            raise EngineException(
                "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                    common_version, min_common_version
                )
            )

        try:
            # Each connection is stored on self only after it connected successfully, so a
            # failed start() does not leave an unconnected driver that a retry would skip.
            if not self.db:
                db_driver = config["database"]["driver"]
                if db_driver == "mongo":
                    db = dbmongo.DbMongo()
                elif db_driver == "memory":
                    db = dbmemory.DbMemory()
                else:
                    raise EngineException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            db_driver
                        )
                    )
                db.db_connect(config["database"])
                self.db = db
            if not self.fs:
                fs_driver = config["storage"]["driver"]
                if fs_driver == "local":
                    fs = fslocal.FsLocal()
                elif fs_driver == "mongo":
                    fs = fsmongo.FsMongo()
                else:
                    raise EngineException(
                        "Invalid configuration param '{}' at '[storage]':'driver'".format(
                            fs_driver
                        )
                    )
                fs.fs_connect(config["storage"])
                self.fs = fs
            if not self.msg:
                msg_driver = config["message"]["driver"]
                if msg_driver == "local":
                    msg = msglocal.MsgLocal()
                elif msg_driver == "kafka":
                    msg = msgkafka.MsgKafka()
                else:
                    raise EngineException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            msg_driver
                        )
                    )
                msg.connect(config["message"])
                self.msg = msg
            if not self.authconn:
                backend = config["authentication"]["backend"]
                if backend == "keystone":
                    authconn_class = AuthconnKeystone
                elif backend == "tacacs":
                    authconn_class = AuthconnTacacs
                else:
                    # any other backend value falls back to the internal connector
                    authconn_class = AuthconnInternal
                self.authconn = authconn_class(
                    config["authentication"],
                    self.db,
                    self.authenticator.role_permissions,
                )

            self.write_lock = Lock()
            # create one class per topic
            for topic, topic_class in self.map_from_topic_to_class.items():
                self.map_topic[topic] = topic_class(
                    self.db, self.fs, self.msg, self.authconn
                )

            # pm_jobs takes different constructor parameters, so it is added apart
            self.map_topic["pm_jobs"] = PmJobsTopic(
                self.db,
                config["prometheus"].get("host"),
                config["prometheus"].get("port"),
            )
        except (DbException, FsException, MsgException) as e:
            raise EngineException(str(e), http_code=e.http_code) from e

    def stop(self):
        """
        Disconnect from database, storage and messaging (those that are set) and drop the lock.
        :return: None
        :raises EngineException: if a Db/Fs/Msg exception is raised while disconnecting
        """
        try:
            if self.db:
                self.db.db_disconnect()
            if self.fs:
                self.fs.fs_disconnect()
            if self.msg:
                self.msg.disconnect()
            self.write_lock = None
        except (DbException, FsException, MsgException) as e:
            raise EngineException(str(e), http_code=e.http_code) from e

    def _get_topic(self, topic):
        """
        Look up the handler registered for a topic.
        :param topic: topic name, a key of self.map_topic
        :return: the handler object stored at self.map_topic[topic]
        :raises EngineException: (INTERNAL_SERVER_ERROR) if the topic is not registered
        """
        if topic not in self.map_topic:
            raise EngineException(
                "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
            )
        return self.map_topic[topic]

    def new_item(
        self, rollback, session, topic, indata=None, kwargs=None, headers=None
    ):
        """
        Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry,
        that must be completed with a call to method upload_content
        :param rollback: list to append created items at database in case a rollback must to be done
        :param session: contains the used login username and working project, force to avoid checkins, public
        :param topic: it can be: users, projects, vim_accounts, sdns, nsrs, nsds, vnfds
        :param indata: data to be inserted
        :param kwargs: used to override the indata descriptor
        :param headers: http request headers
        :return: _id: identity of the inserted data.
        """
        handler = self._get_topic(topic)
        with self.write_lock:
            return handler.new(rollback, session, indata, kwargs, headers)

    def upload_content(self, session, topic, _id, indata, kwargs, headers):
        """
        Upload content for an already created entry (_id)
        :param session: contains the used login username and working project
        :param topic: it can be: users, projects, vnfds, nsds,
        :param _id: server id of the item
        :param indata: data to be inserted
        :param kwargs: used to override the indata descriptor
        :param headers: http request headers
        :return: _id: identity of the inserted data.
        """
        handler = self._get_topic(topic)
        with self.write_lock:
            return handler.upload_content(session, _id, indata, kwargs, headers)

    def get_item_list(self, session, topic, filter_q=None, api_req=False):
        """
        Get a list of items
        :param session: contains the used login username and working project
        :param topic: it can be: users, projects, vnfds, nsds, ...
        :param filter_q: filter of data to be applied
        :param api_req: True if this call is serving an external API request. False if serving internal request.
        :return: The list, it can be empty if no one match the filter_q.
        """
        return self._get_topic(topic).list(session, filter_q, api_req)

    def get_item(self, session, topic, _id, filter_q=None, api_req=False):
        """
        Get complete information on an item
        :param session: contains the used login username and working project
        :param topic: it can be: users, projects, vnfds, nsds,
        :param _id: server id of the item
        :param filter_q: other arguments
        :param api_req: True if this call is serving an external API request. False if serving internal request.
        :return: dictionary, raise exception if not found.
        """
        return self._get_topic(topic).show(session, _id, filter_q, api_req)

    def get_file(self, session, topic, _id, path=None, accept_header=None):
        """
        Get descriptor package or artifact file content
        :param session: contains the used login username and working project
        :param topic: it can be: users, projects, vnfds, nsds,
        :param _id: server id of the item
        :param path: artifact path or "$DESCRIPTOR" or None
        :param accept_header: Content of Accept header. Must contain application/zip or/and text/plain
        :return: opened file plus Accept format or raises an exception
        """
        return self._get_topic(topic).get_file(session, _id, path, accept_header)

    def del_item_list(self, session, topic, _filter=None):
        """
        Delete a list of items
        :param session: contains the used login username and working project
        :param topic: it can be: users, projects, vnfds, nsds, ...
        :param _filter: filter of data to be applied
        :return: The deleted list, it can be empty if no one match the _filter.
        """
        handler = self._get_topic(topic)
        with self.write_lock:
            return handler.delete_list(session, _filter)

    def del_item(self, session, topic, _id, not_send_msg=None):
        """
        Delete item by its internal id
        :param session: contains the used login username and working project
        :param topic: it can be: users, projects, vnfds, nsds, ...
        :param _id: server id of the item
        :param not_send_msg: If False, message will not be sent to kafka.
            If a list, message is not sent, but content is stored in this variable so that the caller can send this
            message using its own loop. If None, message is sent
        :return: dictionary with deleted item _id. It raises exception if not found.
        """
        handler = self._get_topic(topic)
        with self.write_lock:
            return handler.delete(session, _id, not_send_msg=not_send_msg)

    def edit_item(self, session, topic, _id, indata=None, kwargs=None):
        """
        Update an existing entry at database
        :param session: contains the used login username and working project
        :param topic: it can be: users, projects, vnfds, nsds, ...
        :param _id: identifier to be updated
        :param indata: data to be inserted
        :param kwargs: used to override the indata descriptor
        :return: dictionary with edited item _id, raise exception if not found.
        """
        handler = self._get_topic(topic)
        with self.write_lock:
            return handler.edit(session, _id, indata, kwargs)

    def cancel_item(
        self, rollback, session, topic, indata=None, kwargs=None, headers=None
    ):
        """
        Cancels an item
        :param rollback: list to append created items at database in case a rollback must to be done
        :param session: contains the used login username and working project, force to avoid checkins, public
        :param topic: name of the topic whose handler implements 'cancel'
        :param indata: data passed to the handler
        :param kwargs: used to override the indata descriptor
        :param headers: http request headers
        :return: whatever the topic handler's cancel() returns
        """
        handler = self._get_topic(topic)
        with self.write_lock:
            # propagate the handler result instead of silently dropping it
            return handler.cancel(rollback, session, indata, kwargs, headers)

    def upgrade_db(self, current_version, target_version):
        """
        Migrate the 'version' entry of the 'admin' table step by step up to target_version.
        :param current_version: version text found at database; a false value (None, "") means
            there is no version entry yet and a "1.0" entry with a new random serial is created
        :param target_version: desired version text, must be a key of map_target_version_to_int
        :return: None
        :raises EngineException: (INTERNAL_SERVER_ERROR) if target_version is not known
        """
        if target_version not in self.map_target_version_to_int:
            raise EngineException(
                "Cannot upgrade to version '{}' with this version of code".format(
                    target_version
                ),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

        if current_version == target_version:
            return

        target_version_int = self.map_target_version_to_int[target_version]

        if not current_version:
            # create database version
            serial = urandom(32)
            version_data = {
                "_id": "version",  # Always "version"
                "version_int": 1000,  # version number
                "version": "1.0",  # version text
                "date": "2018-10-25",  # version date
                "description": "added serial",  # changes in this version
                "status": "ENABLED",  # ENABLED, DISABLED (migration in process), ERROR,
                "serial": b64encode(serial),
            }
            self.db.create("admin", version_data)
            self.db.set_secret_key(serial)
            current_version = "1.0"

        if (
            current_version in ("1.0", "1.1")
            and target_version_int >= self.map_target_version_to_int["1.2"]
        ):
            # version 1.2 uses a new roles format: with the internal backend the
            # existing 'roles' entries are removed
            if self.config["authentication"]["backend"] == "internal":
                self.db.del_list("roles")

            version_data = {
                "_id": "version",
                "version_int": 1002,
                "version": "1.2",
                "date": "2019-06-11",
                "description": "set new format for roles_operations",
            }

            self.db.set_one("admin", {"_id": "version"}, version_data)
            current_version = "1.2"
            # TODO add future migrations here

    def init_db(self, target_version="1.0"):
        """
        Check the database version entry and migrate it when it differs from target_version.
        If there is no 'version' entry at 'admin', upgrade_db creates it.
        :param target_version: check desired database version. Migrate to it if possible or raises exception
        :return: None if ok, exception if error or if the version is different.
        :raises EngineException: (INTERNAL_SERVER_ERROR) if the stored status is not "ENABLED"
        """
        version_data = self.db.get_one(
            "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
        )
        # check database status is ok
        if version_data and version_data.get("status") != "ENABLED":
            # .get() also here: an entry without 'status' must be reported as a wrong
            # status, not crash with KeyError while building the message
            raise EngineException(
                "Wrong database status '{}'".format(version_data.get("status")),
                HTTPStatus.INTERNAL_SERVER_ERROR,
            )

        # check version
        db_version = None if not version_data else version_data.get("version")
        if db_version != target_version:
            self.upgrade_db(db_version, target_version)