Coverage for n2vc/n2vc_conn.py: 57%

134 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-29 09:03 +0000

1## 

2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. 

3# This file is part of OSM 

4# All Rights Reserved. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# 

19# For those usages not covered by the Apache License, Version 2.0 please 

20# contact with: nfvlabs@tid.es 

21## 

22 

23 

24import abc 

25import asyncio 

26from http import HTTPStatus 

27from shlex import quote 

28import os 

29import shlex 

30import subprocess 

31import time 

32 

33from n2vc.exceptions import N2VCBadArgumentsException 

34from osm_common.dbmongo import DbException 

35import yaml 

36 

37from n2vc.loggable import Loggable 

38from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus 

39 

40 

41class N2VCConnector(abc.ABC, Loggable): 

42 """Generic N2VC connector 

43 

44 Abstract class 

45 """ 

46 

47 """ 

48 #################################################################################### 

49 ################################### P U B L I C #################################### 

50 #################################################################################### 

51 """ 

52 

53 def __init__( 

54 self, 

55 db: object, 

56 fs: object, 

57 log: object, 

58 on_update_db=None, 

59 **kwargs, 

60 ): 

61 """Initialize N2VC abstract connector. It defines de API for VCA connectors 

62 

63 :param object db: Mongo object managing the MongoDB (repo common DbBase) 

64 :param object fs: FileSystem object managing the package artifacts (repo common 

65 FsBase) 

66 :param object log: the logging object to log to 

67 :param on_update_db: callback called when n2vc connector updates database. 

68 Received arguments: 

69 table: e.g. "nsrs" 

70 filter: e.g. {_id: <nsd-id> } 

71 path: e.g. "_admin.deployed.VCA.3." 

72 updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }" 

73 """ 

74 

75 # parent class 

76 Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC") 

77 

78 # check arguments 

79 if db is None: 

80 raise N2VCBadArgumentsException("Argument db is mandatory", ["db"]) 

81 if fs is None: 

82 raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"]) 

83 

84 # store arguments into self 

85 self.db = db 

86 self.fs = fs 

87 self.on_update_db = on_update_db 

88 

89 # generate private/public key-pair 

90 self.private_key_path = None 

91 self.public_key_path = None 

92 

93 @abc.abstractmethod 

94 async def get_status(self, namespace: str, yaml_format: bool = True): 

95 """Get namespace status 

96 

97 :param namespace: we obtain ns from namespace 

98 :param yaml_format: returns a yaml string 

99 """ 

100 

101 # TODO: review which public key 

102 def get_public_key(self) -> str: 

103 """Get the VCA ssh-public-key 

104 

105 Returns the SSH public key from local mahine, to be injected into virtual 

106 machines to be managed by the VCA. 

107 First run, a ssh keypair will be created. 

108 The public key is injected into a VM so that we can provision the 

109 machine with Juju, after which Juju will communicate with the VM 

110 directly via the juju agent. 

111 """ 

112 

113 # Find the path where we expect our key lives (~/.ssh) 

114 homedir = os.environ.get("HOME") 

115 if not homedir: 

116 self.log.warning("No HOME environment variable, using /tmp") 

117 homedir = "/tmp" 

118 sshdir = "{}/.ssh".format(homedir) 

119 sshdir = os.path.realpath(os.path.normpath(os.path.abspath(sshdir))) 

120 if not os.path.exists(sshdir): 

121 os.mkdir(sshdir) 

122 

123 self.private_key_path = "{}/id_n2vc_rsa".format(sshdir) 

124 self.private_key_path = os.path.realpath( 

125 os.path.normpath(os.path.abspath(self.private_key_path)) 

126 ) 

127 self.public_key_path = "{}.pub".format(self.private_key_path) 

128 self.public_key_path = os.path.realpath( 

129 os.path.normpath(os.path.abspath(self.public_key_path)) 

130 ) 

131 

132 # If we don't have a key generated, then we have to generate it using ssh-keygen 

133 if not os.path.exists(self.private_key_path): 

134 command = "ssh-keygen -t {} -b {} -N '' -f {}".format( 

135 "rsa", "4096", quote(self.private_key_path) 

136 ) 

137 # run command with arguments 

138 args = shlex.split(command) 

139 subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

140 

141 # Read the public key. Only one public key (one line) in the file 

142 with open(self.public_key_path, "r") as file: 

143 public_key = file.readline() 

144 

145 return public_key 

146 

147 @abc.abstractmethod 

148 async def create_execution_environment( 

149 self, 

150 namespace: str, 

151 db_dict: dict, 

152 reuse_ee_id: str = None, 

153 progress_timeout: float = None, 

154 total_timeout: float = None, 

155 ) -> tuple[str, dict]: 

156 """Create an Execution Environment. Returns when it is created or raises an 

157 exception on failing 

158 

159 :param str namespace: Contains a dot separate string. 

160 LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>] 

161 :param dict db_dict: where to write to database when the status changes. 

162 It contains a dictionary with {collection: str, filter: {}, path: str}, 

163 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: 

164 "_admin.deployed.VCA.3"} 

165 :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an 

166 older environment 

167 :param float progress_timeout: 

168 :param float total_timeout: 

169 :returns str, dict: id of the new execution environment and credentials for it 

170 (credentials can contains hostname, username, etc depending on 

171 underlying cloud) 

172 """ 

173 

174 @abc.abstractmethod 

175 async def register_execution_environment( 

176 self, 

177 namespace: str, 

178 credentials: dict, 

179 db_dict: dict, 

180 progress_timeout: float = None, 

181 total_timeout: float = None, 

182 ) -> str: 

183 """ 

184 Register an existing execution environment at the VCA 

185 

186 :param str namespace: same as create_execution_environment method 

187 :param dict credentials: credentials to access the existing execution 

188 environment 

189 (it can contains hostname, username, path to private key, etc depending on 

190 underlying cloud) 

191 :param dict db_dict: where to write to database when the status changes. 

192 It contains a dictionary with {collection: str, filter: {}, path: str}, 

193 e.g. {collection: "nsrs", filter: 

194 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} 

195 :param float progress_timeout: 

196 :param float total_timeout: 

197 :returns str: id of the execution environment 

198 """ 

199 

200 @abc.abstractmethod 

201 async def install_configuration_sw( 

202 self, 

203 ee_id: str, 

204 artifact_path: str, 

205 db_dict: dict, 

206 progress_timeout: float = None, 

207 total_timeout: float = None, 

208 ): 

209 """ 

210 Install the software inside the execution environment identified by ee_id 

211 

212 :param str ee_id: the id of the execution environment returned by 

213 create_execution_environment or register_execution_environment 

214 :param str artifact_path: where to locate the artifacts (parent folder) using 

215 the self.fs 

216 the final artifact path will be a combination of this artifact_path and 

217 additional string from the config_dict (e.g. charm name) 

218 :param dict db_dict: where to write into database when the status changes. 

219 It contains a dict with 

220 {collection: <str>, filter: {}, path: <str>}, 

221 e.g. {collection: "nsrs", filter: 

222 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} 

223 :param float progress_timeout: 

224 :param float total_timeout: 

225 """ 

226 

227 @abc.abstractmethod 

228 async def install_k8s_proxy_charm( 

229 self, 

230 charm_name: str, 

231 namespace: str, 

232 artifact_path: str, 

233 db_dict: dict, 

234 progress_timeout: float = None, 

235 total_timeout: float = None, 

236 config: dict = None, 

237 ) -> str: 

238 """ 

239 Install a k8s proxy charm 

240 

241 :param charm_name: Name of the charm being deployed 

242 :param namespace: collection of all the uuids related to the charm. 

243 :param str artifact_path: where to locate the artifacts (parent folder) using 

244 the self.fs 

245 the final artifact path will be a combination of this artifact_path and 

246 additional string from the config_dict (e.g. charm name) 

247 :param dict db_dict: where to write into database when the status changes. 

248 It contains a dict with 

249 {collection: <str>, filter: {}, path: <str>}, 

250 e.g. {collection: "nsrs", filter: 

251 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} 

252 :param float progress_timeout: 

253 :param float total_timeout: 

254 :param config: Dictionary with additional configuration 

255 

256 :returns ee_id: execution environment id. 

257 """ 

258 

259 @abc.abstractmethod 

260 async def get_ee_ssh_public__key( 

261 self, 

262 ee_id: str, 

263 db_dict: dict, 

264 progress_timeout: float = None, 

265 total_timeout: float = None, 

266 ) -> str: 

267 """ 

268 Generate a priv/pub key pair in the execution environment and return the public 

269 key 

270 

271 :param str ee_id: the id of the execution environment returned by 

272 create_execution_environment or register_execution_environment 

273 :param dict db_dict: where to write into database when the status changes. 

274 It contains a dict with 

275 {collection: <str>, filter: {}, path: <str>}, 

276 e.g. {collection: "nsrs", filter: 

277 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} 

278 :param float progress_timeout: 

279 :param float total_timeout: 

280 :returns: public key of the execution environment 

281 For the case of juju proxy charm ssh-layered, it is the one 

282 returned by 'get-ssh-public-key' primitive. 

283 It raises a N2VC exception if fails 

284 """ 

285 

286 @abc.abstractmethod 

287 async def add_relation( 

288 self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str 

289 ): 

290 """ 

291 Add a relation between two Execution Environments (using their associated 

292 endpoints). 

293 

294 :param str ee_id_1: The id of the first execution environment 

295 :param str ee_id_2: The id of the second execution environment 

296 :param str endpoint_1: The endpoint in the first execution environment 

297 :param str endpoint_2: The endpoint in the second execution environment 

298 """ 

299 

300 # TODO 

301 @abc.abstractmethod 

302 async def remove_relation(self): 

303 """ """ 

304 

305 # TODO 

306 @abc.abstractmethod 

307 async def deregister_execution_environments(self): 

308 """ """ 

309 

310 @abc.abstractmethod 

311 async def delete_namespace( 

312 self, namespace: str, db_dict: dict = None, total_timeout: float = None 

313 ): 

314 """ 

315 Remove a network scenario and its execution environments 

316 :param namespace: [<nsi-id>].<ns-id> 

317 :param dict db_dict: where to write into database when the status changes. 

318 It contains a dict with 

319 {collection: <str>, filter: {}, path: <str>}, 

320 e.g. {collection: "nsrs", filter: 

321 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} 

322 :param float total_timeout: 

323 """ 

324 

325 @abc.abstractmethod 

326 async def delete_execution_environment( 

327 self, ee_id: str, db_dict: dict = None, total_timeout: float = None 

328 ): 

329 """ 

330 Delete an execution environment 

331 :param str ee_id: id of the execution environment to delete 

332 :param dict db_dict: where to write into database when the status changes. 

333 It contains a dict with 

334 {collection: <str>, filter: {}, path: <str>}, 

335 e.g. {collection: "nsrs", filter: 

336 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} 

337 :param float total_timeout: 

338 """ 

339 

340 @abc.abstractmethod 

341 async def upgrade_charm( 

342 self, 

343 ee_id: str = None, 

344 path: str = None, 

345 charm_id: str = None, 

346 charm_type: str = None, 

347 timeout: float = None, 

348 ) -> str: 

349 """This method upgrade charms in VNFs 

350 

351 Args: 

352 ee_id: Execution environment id 

353 path: Local path to the charm 

354 charm_id: charm-id 

355 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm 

356 timeout: (Float) Timeout for the ns update operation 

357 

358 Returns: 

359 The output of the update operation if status equals to "completed" 

360 """ 

361 

362 @abc.abstractmethod 

363 async def exec_primitive( 

364 self, 

365 ee_id: str, 

366 primitive_name: str, 

367 params_dict: dict, 

368 db_dict: dict = None, 

369 progress_timeout: float = None, 

370 total_timeout: float = None, 

371 ) -> str: 

372 """ 

373 Execute a primitive in the execution environment 

374 

375 :param str ee_id: the one returned by create_execution_environment or 

376 register_execution_environment 

377 :param str primitive_name: must be one defined in the software. There is one 

378 called 'config', where, for the proxy case, the 'credentials' of VM are 

379 provided 

380 :param dict params_dict: parameters of the action 

381 :param dict db_dict: where to write into database when the status changes. 

382 It contains a dict with 

383 {collection: <str>, filter: {}, path: <str>}, 

384 e.g. {collection: "nsrs", filter: 

385 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} 

386 :param float progress_timeout: 

387 :param float total_timeout: 

388 :returns str: primitive result, if ok. It raises exceptions in case of fail 

389 """ 

390 

391 async def disconnect(self): 

392 """ 

393 Disconnect from VCA 

394 """ 

395 

396 """ 

397 #################################################################################### 

398 ################################### P R I V A T E ################################## 

399 #################################################################################### 

400 """ 

401 

402 def _get_namespace_components( 

403 self, namespace: str 

404 ) -> tuple[str, str, str, str, str]: 

405 """ 

406 Split namespace components 

407 

408 :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>] 

409 :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count 

410 """ 

411 

412 # check parameters 

413 if namespace is None or len(namespace) == 0: 

414 raise N2VCBadArgumentsException( 

415 "Argument namespace is mandatory", ["namespace"] 

416 ) 

417 

418 # split namespace components 

419 parts = namespace.split(".") 

420 nsi_id = None 

421 ns_id = None 

422 vnf_id = None 

423 vdu_id = None 

424 vdu_count = None 

425 if len(parts) > 0 and len(parts[0]) > 0: 

426 nsi_id = parts[0] 

427 if len(parts) > 1 and len(parts[1]) > 0: 

428 ns_id = parts[1] 

429 if len(parts) > 2 and len(parts[2]) > 0: 

430 vnf_id = parts[2] 

431 if len(parts) > 3 and len(parts[3]) > 0: 

432 vdu_id = parts[3] 

433 vdu_parts = parts[3].split("-") 

434 if len(vdu_parts) > 1: 

435 vdu_id = vdu_parts[0] 

436 vdu_count = vdu_parts[1] 

437 

438 return nsi_id, ns_id, vnf_id, vdu_id, vdu_count 

439 

440 async def write_app_status_to_db( 

441 self, 

442 db_dict: dict, 

443 status: N2VCDeploymentStatus, 

444 detailed_status: str, 

445 vca_status: str, 

446 entity_type: str, 

447 vca_id: str = None, 

448 ): 

449 """ 

450 Write application status to database 

451 

452 :param: db_dict: DB dictionary 

453 :param: status: Status of the application 

454 :param: detailed_status: Detailed status 

455 :param: vca_status: VCA status 

456 :param: entity_type: Entity type ("application", "machine, and "action") 

457 :param: vca_id: Id of the VCA. If None, the default VCA will be used. 

458 """ 

459 if not db_dict: 

460 self.log.debug("No db_dict => No database write") 

461 return 

462 

463 # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}' 

464 # .format(str(status.value), detailed_status, vca_status, entity_type)) 

465 

466 try: 

467 the_table = db_dict["collection"] 

468 the_filter = db_dict["filter"] 

469 the_path = db_dict["path"] 

470 if not the_path[-1] == ".": 

471 the_path = the_path + "." 

472 update_dict = { 

473 the_path + "status": str(status.value), 

474 the_path + "detailed-status": detailed_status, 

475 the_path + "VCA-status": vca_status, 

476 the_path + "entity-type": entity_type, 

477 the_path + "status-time": str(time.time()), 

478 } 

479 

480 self.db.set_one( 

481 table=the_table, 

482 q_filter=the_filter, 

483 update_dict=update_dict, 

484 fail_on_empty=True, 

485 ) 

486 

487 # database callback 

488 if self.on_update_db: 

489 if asyncio.iscoroutinefunction(self.on_update_db): 

490 await self.on_update_db( 

491 the_table, the_filter, the_path, update_dict, vca_id=vca_id 

492 ) 

493 else: 

494 self.on_update_db( 

495 the_table, the_filter, the_path, update_dict, vca_id=vca_id 

496 ) 

497 

498 except DbException as e: 

499 if e.http_code == HTTPStatus.NOT_FOUND: 

500 self.log.error( 

501 "NOT_FOUND error: Exception writing status to database: {}".format( 

502 e 

503 ) 

504 ) 

505 else: 

506 self.log.info("Exception writing status to database: {}".format(e)) 

507 

508 def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus: 

509 if status not in JujuStatusToOSM[entity_type]: 

510 self.log.warning("Status {} not found in JujuStatusToOSM.".format(status)) 

511 return N2VCDeploymentStatus.UNKNOWN 

512 return JujuStatusToOSM[entity_type][status] 

513 

514 

515def obj_to_yaml(obj: object) -> str: 

516 # dump to yaml 

517 dump_text = yaml.dump(obj, default_flow_style=False, indent=2) 

518 # split lines 

519 lines = dump_text.splitlines() 

520 # remove !!python/object tags 

521 yaml_text = "" 

522 for line in lines: 

523 index = line.find("!!python/object") 

524 if index >= 0: 

525 line = line[:index] 

526 yaml_text += line + "\n" 

527 return yaml_text 

528 

529 

530def obj_to_dict(obj: object) -> dict: 

531 # convert obj to yaml 

532 yaml_text = obj_to_yaml(obj) 

533 # parse to dict 

534 return yaml.load(yaml_text, Loader=yaml.SafeLoader)