Coverage for osm_nbi/engine.py: 27%

158 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-30 10:14 +0000

1# -*- coding: utf-8 -*- 

2 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16import logging 

17 

18# import yaml 

19from osm_common import ( 

20 dbmongo, 

21 dbmemory, 

22 fslocal, 

23 fsmongo, 

24 msglocal, 

25 msgkafka, 

26 version as common_version, 

27) 

28from osm_common.dbbase import DbException 

29from osm_common.fsbase import FsException 

30from osm_common.msgbase import MsgException 

31from http import HTTPStatus 

32 

33from osm_nbi.authconn_keystone import AuthconnKeystone 

34from osm_nbi.authconn_internal import AuthconnInternal 

35from osm_nbi.authconn_tacacs import AuthconnTacacs 

36from osm_nbi.base_topic import EngineException, versiontuple 

37from osm_nbi.admin_topics import VimAccountTopic, WimAccountTopic, SdnTopic 

38from osm_nbi.admin_topics import K8sClusterTopic, K8sRepoTopic, OsmRepoTopic 

39from osm_nbi.admin_topics import VcaTopic 

40from osm_nbi.admin_topics import UserTopicAuth, ProjectTopicAuth, RoleTopicAuth 

41from osm_nbi.descriptor_topics import ( 

42 VnfdTopic, 

43 NsdTopic, 

44 PduTopic, 

45 NstTopic, 

46 VnfPkgOpTopic, 

47) 

48from osm_nbi.instance_topics import ( 

49 NsrTopic, 

50 VnfrTopic, 

51 NsLcmOpTopic, 

52 NsiTopic, 

53 NsiLcmOpTopic, 

54) 

55from osm_nbi.vnf_instance_topics import VnfInstances, VnfLcmOpTopic 

56from osm_nbi.pmjobs_topics import PmJobsTopic 

57from osm_nbi.subscription_topics import NslcmSubscriptionsTopic 

58from osm_nbi.osm_vnfm.vnf_subscription import VnflcmSubscriptionsTopic 

59from base64 import b64encode 

60from os import urandom # , path 

61from threading import Lock 

62 

# Module maintainer contact.
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
# Oldest osm_common release accepted by this module. Engine.start() compares it
# with the imported osm_common version and raises EngineException on older ones.
min_common_version = "0.1.16"

65 

66 

67class Engine(object): 

# Topic name (the value callers pass as `topic` to the Engine methods) -> handler class.
# Engine.start() instantiates every class listed here, in this order, as
# topic_class(db, fs, msg, authconn) and stores the instance in self.map_topic.
map_from_topic_to_class = {
    "vnfds": VnfdTopic,
    "nsds": NsdTopic,
    "nsts": NstTopic,
    "pdus": PduTopic,
    "nsrs": NsrTopic,
    "vnfrs": VnfrTopic,
    "nslcmops": NsLcmOpTopic,
    "vim_accounts": VimAccountTopic,
    "wim_accounts": WimAccountTopic,
    "sdns": SdnTopic,
    "k8sclusters": K8sClusterTopic,
    "vca": VcaTopic,
    "k8srepos": K8sRepoTopic,
    "osmrepos": OsmRepoTopic,
    "users": UserTopicAuth,  # Valid for both internal and keystone authentication backends
    "projects": ProjectTopicAuth,  # Valid for both internal and keystone authentication backends
    "roles": RoleTopicAuth,  # Valid for both internal and keystone authentication backends
    "nsis": NsiTopic,
    "nsilcmops": NsiLcmOpTopic,
    "vnfpkgops": VnfPkgOpTopic,
    "nslcm_subscriptions": NslcmSubscriptionsTopic,
    "vnf_instances": VnfInstances,
    "vnflcmops": VnfLcmOpTopic,
    "vnflcm_subscriptions": VnflcmSubscriptionsTopic,
    # [NEW_TOPIC]: add an entry here
    # "pm_jobs": PmJobsTopic will be added manually because it needs other parameters
}

96 

# Database version label -> integer used to order versions.
# upgrade_db() rejects any target version that is not a key of this mapping and
# compares these integers to decide whether the 1.2 migration step must run.
map_target_version_to_int = {
    "1.0": 1000,
    "1.1": 1001,
    "1.2": 1002,
    # Add new versions here
}

103 

def __init__(self, authenticator):
    """
    Create an engine that is not yet connected to any backend.

    :param authenticator: object kept as self.authenticator; start() reads its
        'role_permissions' attribute when it builds the authentication connector
    """
    self.authenticator = authenticator
    self.logger = logging.getLogger("nbi.engine")
    # Backends, configuration and write lock stay unset until start() provides them
    self.db = self.fs = self.msg = self.authconn = None
    self.config = self.write_lock = None
    # topic name -> topic handler instance, filled in by start()
    self.map_topic = dict()

116 

def start(self, config):
    """
    Connect to database, filesystem storage and messaging, build the authentication
    connector and create one handler per topic.

    Backends already present on this engine (self.db, self.fs, self.msg, self.authconn)
    are kept as they are and not connected again.

    :param config: two level dictionary with configuration. Top level keys read here:
        'database', 'storage' and 'message' (each needs a 'driver' entry and is passed whole to
        the driver connect call), 'authentication' (needs a 'backend' entry) and 'prometheus'
        (its 'host' and 'port' entries are read with .get)
    :return: None
    :raises EngineException: if the imported osm_common version is lower than min_common_version,
        if a 'driver' value is not supported, or if DbException, FsException or MsgException is
        raised while starting (its http_code is propagated)
    """
    self.config = config
    # check right version of common
    if versiontuple(common_version) < versiontuple(min_common_version):
        raise EngineException(
            "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
                common_version, min_common_version
            )
        )

    try:
        # Each driver is stored on self only after its connect call succeeds. Otherwise a
        # failed connect would leave an unconnected driver behind and a later start() would
        # skip it because of the "if not self.xxx" guards.
        if not self.db:
            db_driver = config["database"]["driver"]
            if db_driver == "mongo":
                db = dbmongo.DbMongo()
            elif db_driver == "memory":
                db = dbmemory.DbMemory()
            else:
                raise EngineException(
                    "Invalid configuration param '{}' at '[database]':'driver'".format(
                        db_driver
                    )
                )
            db.db_connect(config["database"])
            self.db = db
        if not self.fs:
            fs_driver = config["storage"]["driver"]
            if fs_driver == "local":
                fs = fslocal.FsLocal()
            elif fs_driver == "mongo":
                fs = fsmongo.FsMongo()
            else:
                raise EngineException(
                    "Invalid configuration param '{}' at '[storage]':'driver'".format(
                        fs_driver
                    )
                )
            fs.fs_connect(config["storage"])
            self.fs = fs
        if not self.msg:
            msg_driver = config["message"]["driver"]
            if msg_driver == "local":
                msg = msglocal.MsgLocal()
            elif msg_driver == "kafka":
                msg = msgkafka.MsgKafka()
            else:
                raise EngineException(
                    "Invalid configuration param '{}' at '[message]':'driver'".format(
                        msg_driver
                    )
                )
            msg.connect(config["message"])
            self.msg = msg
        if not self.authconn:
            backend = config["authentication"]["backend"]
            if backend == "keystone":
                authconn_class = AuthconnKeystone
            elif backend == "tacacs":
                authconn_class = AuthconnTacacs
            else:
                # any other backend value selects the internal connector
                authconn_class = AuthconnInternal
            self.authconn = authconn_class(
                config["authentication"],
                self.db,
                self.authenticator.role_permissions,
            )

        self.write_lock = Lock()
        # create one class per topic
        for topic, topic_class in self.map_from_topic_to_class.items():
            self.map_topic[topic] = topic_class(
                self.db, self.fs, self.msg, self.authconn
            )

        # pm_jobs is not in map_from_topic_to_class because it takes other parameters
        self.map_topic["pm_jobs"] = PmJobsTopic(
            self.db,
            config["prometheus"].get("host"),
            config["prometheus"].get("port"),
        )
    except (DbException, FsException, MsgException) as e:
        raise EngineException(str(e), http_code=e.http_code) from e

232 

def stop(self):
    """
    Disconnect from database, filesystem storage and messaging, and drop the write lock.

    The three disconnections are attempted in that order and each one runs even if a
    previous one raised; the write lock is always reset to None. If several steps raise,
    the exception of the last failing step is the one that propagates.

    :return: None
    :raises EngineException: if the propagated exception is a DbException, FsException or
        MsgException (its http_code is kept). Any other exception propagates unchanged.
    """
    try:
        # try/finally nesting guarantees the later steps run; a failure at the database
        # must not leave storage and messaging connected nor the lock in place
        try:
            if self.db:
                self.db.db_disconnect()
        finally:
            try:
                if self.fs:
                    self.fs.fs_disconnect()
            finally:
                try:
                    if self.msg:
                        self.msg.disconnect()
                finally:
                    self.write_lock = None
    except (DbException, FsException, MsgException) as e:
        raise EngineException(str(e), http_code=e.http_code) from e

244 

def new_item(
    self, rollback, session, topic, indata=None, kwargs=None, headers=None
):
    """
    Create a new entry through the handler of the given topic, holding the write lock.
    As noted for this method, nsds and vnfds get an almost empty DISABLED entry that
    must be completed afterwards with upload_content.
    :param rollback: list where created database items are appended in case a rollback is needed
    :param session: contains the used login username and working project, force to avoid
        checkins, public
    :param topic: name of a started topic, e.g. users, projects, vim_accounts, sdns, nsrs, nsds,
        vnfds
    :param indata: data to be inserted
    :param kwargs: used to override the indata descriptor
    :param headers: http request headers
    :return: what the topic 'new' method returns (_id: identity of the inserted data)
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic in self.map_topic:
        # write operations of this engine are serialized by the lock
        with self.write_lock:
            handler = self.map_topic[topic]
            return handler.new(rollback, session, indata, kwargs, headers)
    raise EngineException(
        "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
    )

265 

def upload_content(self, session, topic, _id, indata, kwargs, headers):
    """
    Upload content for an already created entry (_id) through the handler of the given
    topic, holding the write lock.
    :param session: contains the used login username and working project
    :param topic: name of a started topic, e.g. users, projects, vnfds, nsds
    :param _id: server id of the item
    :param indata: data to be inserted
    :param kwargs: used to override the indata descriptor
    :param headers: http request headers
    :return: what the topic 'upload_content' method returns
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic in self.map_topic:
        with self.write_lock:
            handler = self.map_topic[topic]
            return handler.upload_content(session, _id, indata, kwargs, headers)
    raise EngineException(
        "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
    )

285 

def get_item_list(self, session, topic, filter_q=None, api_req=False):
    """
    Get a list of items from the handler of the given topic. The write lock is not taken.
    :param session: contains the used login username and working project
    :param topic: name of a started topic, e.g. users, projects, vnfds, nsds, ...
    :param filter_q: filter of data to be applied
    :param api_req: True if this call is serving an external API request. False if serving
        internal request.
    :return: what the topic 'list' method returns (the list, possibly empty if nothing matches
        filter_q)
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic in self.map_topic:
        handler = self.map_topic[topic]
        return handler.list(session, filter_q, api_req)
    raise EngineException(
        "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
    )

300 

def get_item(self, session, topic, _id, filter_q=None, api_req=False):
    """
    Get complete information on an item from the handler of the given topic. The write
    lock is not taken.
    :param session: contains the used login username and working project
    :param topic: name of a started topic, e.g. users, projects, vnfds, nsds
    :param _id: server id of the item
    :param filter_q: other arguments
    :param api_req: True if this call is serving an external API request. False if serving
        internal request.
    :return: what the topic 'show' method returns (dictionary; exception if not found)
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic in self.map_topic:
        handler = self.map_topic[topic]
        return handler.show(session, _id, filter_q, api_req)
    raise EngineException(
        "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
    )

316 

def get_file(self, session, topic, _id, path=None, accept_header=None):
    """
    Get descriptor package or artifact file content from the handler of the given topic.
    The write lock is not taken.
    :param session: contains the used login username and working project
    :param topic: name of a started topic, e.g. users, projects, vnfds, nsds
    :param _id: server id of the item
    :param path: artifact path or "$DESCRIPTOR" or None
    :param accept_header: Content of Accept header. Must contain application/zip or/and
        text/plain
    :return: what the topic 'get_file' method returns (opened file plus Accept format, or an
        exception)
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic in self.map_topic:
        handler = self.map_topic[topic]
        return handler.get_file(session, _id, path, accept_header)
    raise EngineException(
        "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
    )

332 

def del_item_list(self, session, topic, _filter=None):
    """
    Delete a list of items through the handler of the given topic, holding the write lock.
    :param session: contains the used login username and working project
    :param topic: name of a started topic, e.g. users, projects, vnfds, nsds, ...
    :param _filter: filter of data to be applied
    :return: what the topic 'delete_list' method returns (the deleted list, possibly empty if
        nothing matches _filter)
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic in self.map_topic:
        with self.write_lock:
            handler = self.map_topic[topic]
            return handler.delete_list(session, _filter)
    raise EngineException(
        "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
    )

347 

def del_item(self, session, topic, _id, not_send_msg=None):
    """
    Delete item by its internal id through the handler of the given topic, holding the
    write lock.
    :param session: contains the used login username and working project
    :param topic: name of a started topic, e.g. users, projects, vnfds, nsds, ...
    :param _id: server id of the item
    :param not_send_msg: forwarded as keyword argument to the topic 'delete' method.
        If False, message will not be sent to kafka.
        If a list, message is not sent, but content is stored in this variable so that the
        caller can send this message using its own loop. If None, message is sent
    :return: what the topic 'delete' method returns (dictionary with deleted item _id;
        exception if not found)
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic in self.map_topic:
        with self.write_lock:
            handler = self.map_topic[topic]
            return handler.delete(session, _id, not_send_msg=not_send_msg)
    raise EngineException(
        "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
    )

365 

def edit_item(self, session, topic, _id, indata=None, kwargs=None):
    """
    Update an existing entry through the handler of the given topic, holding the write
    lock.
    :param session: contains the used login username and working project
    :param topic: name of a started topic, e.g. users, projects, vnfds, nsds, ...
    :param _id: identifier to be updated
    :param indata: data to be inserted
    :param kwargs: used to override the indata descriptor
    :return: what the topic 'edit' method returns (dictionary with edited item _id; exception
        if not found)
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic in self.map_topic:
        with self.write_lock:
            handler = self.map_topic[topic]
            return handler.edit(session, _id, indata, kwargs)
    raise EngineException(
        "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
    )

382 

def cancel_item(
    self, rollback, session, topic, indata=None, kwargs=None, headers=None
):
    """
    Cancels an item through the handler of the given topic, holding the write lock.
    :param rollback: list where database items are appended in case a rollback is needed
    :param session: contains the used login username and working project, force to avoid
        checkins, public
    :param topic: name of a started topic whose handler provides a 'cancel' method
    :param indata: request data forwarded to the topic 'cancel' method
    :param kwargs: used to override the indata descriptor
    :param headers: http request headers
    :return: what the topic 'cancel' method returns. It used to be dropped, so callers always
        got None; it is now handed back as new_item does with the result of 'new'
    :raises EngineException: with INTERNAL_SERVER_ERROR if the topic is unknown
    """
    if topic not in self.map_topic:
        raise EngineException(
            "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
        )
    with self.write_lock:
        return self.map_topic[topic].cancel(
            rollback, session, indata, kwargs, headers
        )

402 

def upgrade_db(self, current_version, target_version):
    """
    Bring the database version record from current_version towards target_version.

    Steps applied, in order:
    - no current version: a "version" entry for "1.0" with a new random serial is created
      at "admin" and the serial is set as the database secret key
    - current version "1.0" or "1.1" and target "1.2" or higher: the "roles" content is
      deleted when the authentication backend is "internal", and the "version" entry is
      updated to "1.2"
    Nothing is done when both versions are equal or when no step applies.

    :param current_version: version text stored at database, or empty/None if there is none
    :param target_version: wanted version text; must be a key of map_target_version_to_int
    :return: None
    :raises EngineException: with INTERNAL_SERVER_ERROR if target_version is not known
    """
    version_to_int = self.map_target_version_to_int
    if target_version not in version_to_int:
        raise EngineException(
            "Cannot upgrade to version '{}' with this version of code".format(
                target_version
            ),
            http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
        )

    if current_version == target_version:
        return

    wanted_int = version_to_int[target_version]

    if not current_version:
        # create database version
        secret = urandom(32)
        initial_record = {
            "_id": "version",  # Always "version"
            "version_int": 1000,  # version number
            "version": "1.0",  # version text
            "date": "2018-10-25",  # version date
            "description": "added serial",  # changes in this version
            "status": "ENABLED",  # ENABLED, DISABLED (migration in process), ERROR,
            "serial": b64encode(secret),
        }
        self.db.create("admin", initial_record)
        self.db.set_secret_key(secret)
        current_version = "1.0"

    if current_version in ("1.0", "1.1") and wanted_int >= version_to_int["1.2"]:
        internal_backend = self.config["authentication"]["backend"] == "internal"
        if internal_backend:
            self.db.del_list("roles")

        self.db.set_one(
            "admin",
            {"_id": "version"},
            {
                "_id": "version",
                "version_int": 1002,
                "version": "1.2",
                "date": "2019-06-11",
                "description": "set new format for roles_operations",
            },
        )
        # kept up to date so that a following migration step can test it
        current_version = "1.2"
    # TODO add future migrations here

451 

def init_db(self, target_version="1.0"):
    """
    Check the database version entry and migrate the database if needed.

    Reads the "version" entry of "admin". When there is no entry, or its version differs
    from target_version, upgrade_db is called with the stored version (None if there is no
    entry) and target_version.
    :param target_version: check desired database version. Migrate to it if possible or raises
        exception
    :return: None if ok
    :raises EngineException: with INTERNAL_SERVER_ERROR if the entry exists and its status is not
        "ENABLED" (a missing status is reported as 'None'); also whatever upgrade_db raises
    """
    version_data = self.db.get_one(
        "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
    )

    db_version = None
    if version_data:
        # check database status is ok. The value is read once with .get so that an entry
        # without "status" ends in EngineException and not in KeyError
        status = version_data.get("status")
        if status != "ENABLED":
            raise EngineException(
                "Wrong database status '{}'".format(status),
                HTTPStatus.INTERNAL_SERVER_ERROR,
            )
        db_version = version_data.get("version")

    # check version
    if db_version != target_version:
        self.upgrade_db(db_version, target_version)