LCM Helm connector integration
[osm/LCM.git] / osm_lcm / vim_sdn.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 from osm_lcm import ROclient
24 from osm_lcm.lcm_utils import LcmException, LcmBase
25 from n2vc.k8s_helm_conn import K8sHelmConnector
26 from osm_common.dbbase import DbException
27 from copy import deepcopy
28
29 __author__ = "Alfonso Tierno"
30
31
32 class VimLcm(LcmBase):
33 # values that are encrypted at vim config because they are passwords
34 vim_config_encrypted = {"1.1": ("admin_password", "nsx_password", "vcenter_password"),
35 "default": ("admin_password", "nsx_password", "vcenter_password", "vrops_password")}
36
37 def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
38 """
39 Init, Connect to database, filesystem storage, and messaging
40 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
41 :return: None
42 """
43
44 self.logger = logging.getLogger('lcm.vim')
45 self.loop = loop
46 self.lcm_tasks = lcm_tasks
47 self.ro_config = ro_config
48
49 super().__init__(db, msg, fs, self.logger)
50
51 async def create(self, vim_content, order_id):
52
53 # HA tasks and backward compatibility:
54 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
55 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
56 # Register 'create' task here for related future HA operations
57 op_id = vim_content.pop('op_id', None)
58 if not self.lcm_tasks.lock_HA('vim', 'create', op_id):
59 return
60
61 vim_id = vim_content["_id"]
62 vim_content.pop("op_id", None)
63 logging_text = "Task vim_create={} ".format(vim_id)
64 self.logger.debug(logging_text + "Enter")
65
66 db_vim = None
67 db_vim_update = {}
68 exc = None
69 RO_sdn_id = None
70 operationState_HA = ''
71 detailed_status_HA = ''
72 try:
73 step = "Getting vim-id='{}' from db".format(vim_id)
74 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
75 if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
76 step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
77 db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
78
79 # If the VIM account has an associated SDN account, also
80 # wait for any previous tasks in process for the SDN
81 await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])
82
83 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
84 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
85 else:
86 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
87 vim_content["config"]["sdn-controller"]))
88
89 step = "Creating vim at RO"
90 db_vim_update["_admin.deployed.RO"] = None
91 db_vim_update["_admin.detailed-status"] = step
92 self.update_db_2("vim_accounts", vim_id, db_vim_update)
93 RO = ROclient.ROClient(self.loop, **self.ro_config)
94 vim_RO = deepcopy(vim_content)
95 vim_RO.pop("_id", None)
96 vim_RO.pop("_admin", None)
97 schema_version = vim_RO.pop("schema_version", None)
98 vim_RO.pop("schema_type", None)
99 vim_RO.pop("vim_tenant_name", None)
100 vim_RO["type"] = vim_RO.pop("vim_type")
101 vim_RO.pop("vim_user", None)
102 vim_RO.pop("vim_password", None)
103 if RO_sdn_id:
104 vim_RO["config"]["sdn-controller"] = RO_sdn_id
105 desc = await RO.create("vim", descriptor=vim_RO)
106 RO_vim_id = desc["uuid"]
107 db_vim_update["_admin.deployed.RO"] = RO_vim_id
108 self.logger.debug(logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id))
109
110 step = "Creating vim_account at RO"
111 db_vim_update["_admin.detailed-status"] = step
112 self.update_db_2("vim_accounts", vim_id, db_vim_update)
113
114 if vim_content.get("vim_password"):
115 vim_content["vim_password"] = self.db.decrypt(vim_content["vim_password"],
116 schema_version=schema_version,
117 salt=vim_id)
118 vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
119 "vim_username": vim_content["vim_user"],
120 "vim_password": vim_content["vim_password"]
121 }
122 if vim_RO.get("config"):
123 vim_account_RO["config"] = vim_RO["config"]
124 if "sdn-controller" in vim_account_RO["config"]:
125 del vim_account_RO["config"]["sdn-controller"]
126 if "sdn-port-mapping" in vim_account_RO["config"]:
127 del vim_account_RO["config"]["sdn-port-mapping"]
128 vim_config_encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
129 self.vim_config_encrypted.get("default")
130 for p in vim_config_encrypted_keys:
131 if vim_account_RO["config"].get(p):
132 vim_account_RO["config"][p] = self.db.decrypt(vim_account_RO["config"][p],
133 schema_version=schema_version,
134 salt=vim_id)
135
136 desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
137 db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
138 db_vim_update["_admin.operationalState"] = "ENABLED"
139 db_vim_update["_admin.detailed-status"] = "Done"
140 # Mark the VIM 'create' HA task as successful
141 operationState_HA = 'COMPLETED'
142 detailed_status_HA = 'Done'
143
144 # await asyncio.sleep(15) # TODO remove. This is for test
145 self.logger.debug(logging_text + "Exit Ok VIM account created at RO_vim_account_id={}".format(desc["uuid"]))
146 return
147
148 except (ROclient.ROClientException, DbException) as e:
149 self.logger.error(logging_text + "Exit Exception {}".format(e))
150 exc = e
151 except Exception as e:
152 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
153 exc = e
154 finally:
155 if exc and db_vim:
156 db_vim_update["_admin.operationalState"] = "ERROR"
157 db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
158 # Mark the VIM 'create' HA task as erroneous
159 operationState_HA = 'FAILED'
160 detailed_status_HA = "ERROR {}: {}".format(step, exc)
161 try:
162 if db_vim_update:
163 self.update_db_2("vim_accounts", vim_id, db_vim_update)
164 # Register the VIM 'create' HA task either
165 # succesful or erroneous, or do nothing (if legacy NBI)
166 self.lcm_tasks.register_HA('vim', 'create', op_id,
167 operationState=operationState_HA,
168 detailed_status=detailed_status_HA)
169 except DbException as e:
170 self.logger.error(logging_text + "Cannot update database: {}".format(e))
171
172 self.lcm_tasks.remove("vim_account", vim_id, order_id)
173
174 async def edit(self, vim_content, order_id):
175
176 # HA tasks and backward compatibility:
177 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
178 # In such a case, HA is not supported by NBI, and the HA check always returns True
179 op_id = vim_content.pop('op_id', None)
180 if not self.lcm_tasks.lock_HA('vim', 'edit', op_id):
181 return
182
183 vim_id = vim_content["_id"]
184 vim_content.pop("op_id", None)
185 logging_text = "Task vim_edit={} ".format(vim_id)
186 self.logger.debug(logging_text + "Enter")
187
188 db_vim = None
189 exc = None
190 RO_sdn_id = None
191 RO_vim_id = None
192 db_vim_update = {}
193 operationState_HA = ''
194 detailed_status_HA = ''
195 step = "Getting vim-id='{}' from db".format(vim_id)
196 try:
197 # wait for any previous tasks in process
198 await self.lcm_tasks.waitfor_related_HA('vim', 'edit', op_id)
199
200 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
201
202 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
203 if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
204 step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
205 db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
206
207 # If the VIM account has an associated SDN account, also
208 # wait for any previous tasks in process for the SDN
209 await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])
210
211 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get(
212 "RO"):
213 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
214 else:
215 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
216 vim_content["config"]["sdn-controller"]))
217
218 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
219 step = "Editing vim at RO"
220 RO = ROclient.ROClient(self.loop, **self.ro_config)
221 vim_RO = deepcopy(vim_content)
222 vim_RO.pop("_id", None)
223 vim_RO.pop("_admin", None)
224 schema_version = vim_RO.pop("schema_version", None)
225 vim_RO.pop("schema_type", None)
226 vim_RO.pop("vim_tenant_name", None)
227 if "vim_type" in vim_RO:
228 vim_RO["type"] = vim_RO.pop("vim_type")
229 vim_RO.pop("vim_user", None)
230 vim_RO.pop("vim_password", None)
231 if RO_sdn_id:
232 vim_RO["config"]["sdn-controller"] = RO_sdn_id
233 # TODO make a deep update of sdn-port-mapping
234 if vim_RO:
235 await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
236
237 step = "Editing vim-account at RO tenant"
238 vim_account_RO = {}
239 if "config" in vim_content:
240 if "sdn-controller" in vim_content["config"]:
241 del vim_content["config"]["sdn-controller"]
242 if "sdn-port-mapping" in vim_content["config"]:
243 del vim_content["config"]["sdn-port-mapping"]
244 if not vim_content["config"]:
245 del vim_content["config"]
246 if "vim_tenant_name" in vim_content:
247 vim_account_RO["vim_tenant_name"] = vim_content["vim_tenant_name"]
248 if "vim_password" in vim_content:
249 vim_account_RO["vim_password"] = vim_content["vim_password"]
250 if vim_content.get("vim_password"):
251 vim_account_RO["vim_password"] = self.db.decrypt(vim_content["vim_password"],
252 schema_version=schema_version,
253 salt=vim_id)
254 if "config" in vim_content:
255 vim_account_RO["config"] = vim_content["config"]
256 if vim_content.get("config"):
257 vim_config_encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
258 self.vim_config_encrypted.get("default")
259 for p in vim_config_encrypted_keys:
260 if vim_content["config"].get(p):
261 vim_account_RO["config"][p] = self.db.decrypt(vim_content["config"][p],
262 schema_version=schema_version,
263 salt=vim_id)
264
265 if "vim_user" in vim_content:
266 vim_content["vim_username"] = vim_content["vim_user"]
267 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
268 # vim_thread. RO will remove and relaunch a new thread for this vim_account
269 await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
270 db_vim_update["_admin.operationalState"] = "ENABLED"
271 # Mark the VIM 'edit' HA task as successful
272 operationState_HA = 'COMPLETED'
273 detailed_status_HA = 'Done'
274
275 self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
276 return
277
278 except (ROclient.ROClientException, DbException) as e:
279 self.logger.error(logging_text + "Exit Exception {}".format(e))
280 exc = e
281 except Exception as e:
282 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
283 exc = e
284 finally:
285 if exc and db_vim:
286 db_vim_update["_admin.operationalState"] = "ERROR"
287 db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
288 # Mark the VIM 'edit' HA task as erroneous
289 operationState_HA = 'FAILED'
290 detailed_status_HA = "ERROR {}: {}".format(step, exc)
291 try:
292 if db_vim_update:
293 self.update_db_2("vim_accounts", vim_id, db_vim_update)
294 # Register the VIM 'edit' HA task either
295 # succesful or erroneous, or do nothing (if legacy NBI)
296 self.lcm_tasks.register_HA('vim', 'edit', op_id,
297 operationState=operationState_HA,
298 detailed_status=detailed_status_HA)
299 except DbException as e:
300 self.logger.error(logging_text + "Cannot update database: {}".format(e))
301
302 self.lcm_tasks.remove("vim_account", vim_id, order_id)
303
304 async def delete(self, vim_content, order_id):
305
306 # HA tasks and backward compatibility:
307 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
308 # In such a case, HA is not supported by NBI, and the HA check always returns True
309 op_id = vim_content.pop('op_id', None)
310 if not self.lcm_tasks.lock_HA('vim', 'delete', op_id):
311 return
312
313 vim_id = vim_content["_id"]
314 logging_text = "Task vim_delete={} ".format(vim_id)
315 self.logger.debug(logging_text + "Enter")
316
317 db_vim = None
318 db_vim_update = {}
319 exc = None
320 operationState_HA = ''
321 detailed_status_HA = ''
322 step = "Getting vim from db"
323 try:
324 # wait for any previous tasks in process
325 await self.lcm_tasks.waitfor_related_HA('vim', 'delete', op_id)
326
327 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
328 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
329 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
330 RO = ROclient.ROClient(self.loop, **self.ro_config)
331 step = "Detaching vim from RO tenant"
332 try:
333 await RO.detach("vim_account", RO_vim_id)
334 except ROclient.ROClientException as e:
335 if e.http_code == 404: # not found
336 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
337 else:
338 raise
339
340 step = "Deleting vim from RO"
341 try:
342 await RO.delete("vim", RO_vim_id)
343 except ROclient.ROClientException as e:
344 if e.http_code == 404: # not found
345 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
346 else:
347 raise
348 else:
349 # nothing to delete
350 self.logger.error(logging_text + "Nothing to remove at RO")
351 self.db.del_one("vim_accounts", {"_id": vim_id})
352 db_vim = None
353 self.logger.debug(logging_text + "Exit Ok")
354 return
355
356 except (ROclient.ROClientException, DbException) as e:
357 self.logger.error(logging_text + "Exit Exception {}".format(e))
358 exc = e
359 except Exception as e:
360 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
361 exc = e
362 finally:
363 self.lcm_tasks.remove("vim_account", vim_id, order_id)
364 if exc and db_vim:
365 db_vim_update["_admin.operationalState"] = "ERROR"
366 db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
367 # Mark the VIM 'delete' HA task as erroneous
368 operationState_HA = 'FAILED'
369 detailed_status_HA = "ERROR {}: {}".format(step, exc)
370 self.lcm_tasks.register_HA('vim', 'delete', op_id,
371 operationState=operationState_HA,
372 detailed_status=detailed_status_HA)
373 try:
374 if db_vim and db_vim_update:
375 self.update_db_2("vim_accounts", vim_id, db_vim_update)
376 # If the VIM 'delete' HA task was succesful, the DB entry has been deleted,
377 # which means that there is nowhere to register this task, so do nothing here.
378 except DbException as e:
379 self.logger.error(logging_text + "Cannot update database: {}".format(e))
380 self.lcm_tasks.remove("vim_account", vim_id, order_id)
381
382
383 class WimLcm(LcmBase):
384 # values that are encrypted at wim config because they are passwords
385 wim_config_encrypted = ()
386
387 def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
388 """
389 Init, Connect to database, filesystem storage, and messaging
390 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
391 :return: None
392 """
393
394 self.logger = logging.getLogger('lcm.vim')
395 self.loop = loop
396 self.lcm_tasks = lcm_tasks
397 self.ro_config = ro_config
398
399 super().__init__(db, msg, fs, self.logger)
400
401 async def create(self, wim_content, order_id):
402
403 # HA tasks and backward compatibility:
404 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
405 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
406 # Register 'create' task here for related future HA operations
407 op_id = wim_content.pop('op_id', None)
408 self.lcm_tasks.lock_HA('wim', 'create', op_id)
409
410 wim_id = wim_content["_id"]
411 wim_content.pop("op_id", None)
412 logging_text = "Task wim_create={} ".format(wim_id)
413 self.logger.debug(logging_text + "Enter")
414
415 db_wim = None
416 db_wim_update = {}
417 exc = None
418 operationState_HA = ''
419 detailed_status_HA = ''
420 try:
421 step = "Getting wim-id='{}' from db".format(wim_id)
422 db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
423 db_wim_update["_admin.deployed.RO"] = None
424
425 step = "Creating wim at RO"
426 db_wim_update["_admin.detailed-status"] = step
427 self.update_db_2("wim_accounts", wim_id, db_wim_update)
428 RO = ROclient.ROClient(self.loop, **self.ro_config)
429 wim_RO = deepcopy(wim_content)
430 wim_RO.pop("_id", None)
431 wim_RO.pop("_admin", None)
432 schema_version = wim_RO.pop("schema_version", None)
433 wim_RO.pop("schema_type", None)
434 wim_RO.pop("wim_tenant_name", None)
435 wim_RO["type"] = wim_RO.pop("wim_type")
436 wim_RO.pop("wim_user", None)
437 wim_RO.pop("wim_password", None)
438 desc = await RO.create("wim", descriptor=wim_RO)
439 RO_wim_id = desc["uuid"]
440 db_wim_update["_admin.deployed.RO"] = RO_wim_id
441 self.logger.debug(logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id))
442
443 step = "Creating wim_account at RO"
444 db_wim_update["_admin.detailed-status"] = step
445 self.update_db_2("wim_accounts", wim_id, db_wim_update)
446
447 if wim_content.get("wim_password"):
448 wim_content["wim_password"] = self.db.decrypt(wim_content["wim_password"],
449 schema_version=schema_version,
450 salt=wim_id)
451 wim_account_RO = {"name": wim_content["name"],
452 "user": wim_content["user"],
453 "password": wim_content["password"]
454 }
455 if wim_RO.get("config"):
456 wim_account_RO["config"] = wim_RO["config"]
457 if "wim_port_mapping" in wim_account_RO["config"]:
458 del wim_account_RO["config"]["wim_port_mapping"]
459 for p in self.wim_config_encrypted:
460 if wim_account_RO["config"].get(p):
461 wim_account_RO["config"][p] = self.db.decrypt(wim_account_RO["config"][p],
462 schema_version=schema_version,
463 salt=wim_id)
464
465 desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
466 db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
467 db_wim_update["_admin.operationalState"] = "ENABLED"
468 db_wim_update["_admin.detailed-status"] = "Done"
469 # Mark the WIM 'create' HA task as successful
470 operationState_HA = 'COMPLETED'
471 detailed_status_HA = 'Done'
472
473 self.logger.debug(logging_text + "Exit Ok WIM account created at RO_wim_account_id={}".format(desc["uuid"]))
474 return
475
476 except (ROclient.ROClientException, DbException) as e:
477 self.logger.error(logging_text + "Exit Exception {}".format(e))
478 exc = e
479 except Exception as e:
480 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
481 exc = e
482 finally:
483 if exc and db_wim:
484 db_wim_update["_admin.operationalState"] = "ERROR"
485 db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
486 # Mark the WIM 'create' HA task as erroneous
487 operationState_HA = 'FAILED'
488 detailed_status_HA = "ERROR {}: {}".format(step, exc)
489 try:
490 if db_wim_update:
491 self.update_db_2("wim_accounts", wim_id, db_wim_update)
492 # Register the WIM 'create' HA task either
493 # succesful or erroneous, or do nothing (if legacy NBI)
494 self.lcm_tasks.register_HA('wim', 'create', op_id,
495 operationState=operationState_HA,
496 detailed_status=detailed_status_HA)
497 except DbException as e:
498 self.logger.error(logging_text + "Cannot update database: {}".format(e))
499 self.lcm_tasks.remove("wim_account", wim_id, order_id)
500
501 async def edit(self, wim_content, order_id):
502
503 # HA tasks and backward compatibility:
504 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
505 # In such a case, HA is not supported by NBI, and the HA check always returns True
506 op_id = wim_content.pop('op_id', None)
507 if not self.lcm_tasks.lock_HA('wim', 'edit', op_id):
508 return
509
510 wim_id = wim_content["_id"]
511 wim_content.pop("op_id", None)
512 logging_text = "Task wim_edit={} ".format(wim_id)
513 self.logger.debug(logging_text + "Enter")
514
515 db_wim = None
516 exc = None
517 RO_wim_id = None
518 db_wim_update = {}
519 step = "Getting wim-id='{}' from db".format(wim_id)
520 operationState_HA = ''
521 detailed_status_HA = ''
522 try:
523 # wait for any previous tasks in process
524 await self.lcm_tasks.waitfor_related_HA('wim', 'edit', op_id)
525
526 db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
527
528 if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
529
530 RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
531 step = "Editing wim at RO"
532 RO = ROclient.ROClient(self.loop, **self.ro_config)
533 wim_RO = deepcopy(wim_content)
534 wim_RO.pop("_id", None)
535 wim_RO.pop("_admin", None)
536 schema_version = wim_RO.pop("schema_version", None)
537 wim_RO.pop("schema_type", None)
538 wim_RO.pop("wim_tenant_name", None)
539 if "wim_type" in wim_RO:
540 wim_RO["type"] = wim_RO.pop("wim_type")
541 wim_RO.pop("wim_user", None)
542 wim_RO.pop("wim_password", None)
543 # TODO make a deep update of wim_port_mapping
544 if wim_RO:
545 await RO.edit("wim", RO_wim_id, descriptor=wim_RO)
546
547 step = "Editing wim-account at RO tenant"
548 wim_account_RO = {}
549 if "config" in wim_content:
550 if "wim_port_mapping" in wim_content["config"]:
551 del wim_content["config"]["wim_port_mapping"]
552 if not wim_content["config"]:
553 del wim_content["config"]
554 if "wim_tenant_name" in wim_content:
555 wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
556 if "wim_password" in wim_content:
557 wim_account_RO["wim_password"] = wim_content["wim_password"]
558 if wim_content.get("wim_password"):
559 wim_account_RO["wim_password"] = self.db.decrypt(wim_content["wim_password"],
560 schema_version=schema_version,
561 salt=wim_id)
562 if "config" in wim_content:
563 wim_account_RO["config"] = wim_content["config"]
564 if wim_content.get("config"):
565 for p in self.wim_config_encrypted:
566 if wim_content["config"].get(p):
567 wim_account_RO["config"][p] = self.db.decrypt(wim_content["config"][p],
568 schema_version=schema_version,
569 salt=wim_id)
570
571 if "wim_user" in wim_content:
572 wim_content["wim_username"] = wim_content["wim_user"]
573 # wim_account must be edited always even if empty in order to ensure changes are translated to RO
574 # wim_thread. RO will remove and relaunch a new thread for this wim_account
575 await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
576 db_wim_update["_admin.operationalState"] = "ENABLED"
577 # Mark the WIM 'edit' HA task as successful
578 operationState_HA = 'COMPLETED'
579 detailed_status_HA = 'Done'
580
581 self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))
582 return
583
584 except (ROclient.ROClientException, DbException) as e:
585 self.logger.error(logging_text + "Exit Exception {}".format(e))
586 exc = e
587 except Exception as e:
588 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
589 exc = e
590 finally:
591 if exc and db_wim:
592 db_wim_update["_admin.operationalState"] = "ERROR"
593 db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
594 # Mark the WIM 'edit' HA task as erroneous
595 operationState_HA = 'FAILED'
596 detailed_status_HA = "ERROR {}: {}".format(step, exc)
597 try:
598 if db_wim_update:
599 self.update_db_2("wim_accounts", wim_id, db_wim_update)
600 # Register the WIM 'edit' HA task either
601 # succesful or erroneous, or do nothing (if legacy NBI)
602 self.lcm_tasks.register_HA('wim', 'edit', op_id,
603 operationState=operationState_HA,
604 detailed_status=detailed_status_HA)
605 except DbException as e:
606 self.logger.error(logging_text + "Cannot update database: {}".format(e))
607 self.lcm_tasks.remove("wim_account", wim_id, order_id)
608
609 async def delete(self, wim_content, order_id):
610
611 # HA tasks and backward compatibility:
612 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
613 # In such a case, HA is not supported by NBI, and the HA check always returns True
614 op_id = wim_content.pop('op_id', None)
615 if not self.lcm_tasks.lock_HA('wim', 'delete', op_id):
616 return
617
618 wim_id = wim_content["_id"]
619 logging_text = "Task wim_delete={} ".format(wim_id)
620 self.logger.debug(logging_text + "Enter")
621
622 db_wim = None
623 db_wim_update = {}
624 exc = None
625 step = "Getting wim from db"
626 operationState_HA = ''
627 detailed_status_HA = ''
628 try:
629 # wait for any previous tasks in process
630 await self.lcm_tasks.waitfor_related_HA('wim', 'delete', op_id)
631
632 db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
633 if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
634 RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
635 RO = ROclient.ROClient(self.loop, **self.ro_config)
636 step = "Detaching wim from RO tenant"
637 try:
638 await RO.detach("wim_account", RO_wim_id)
639 except ROclient.ROClientException as e:
640 if e.http_code == 404: # not found
641 self.logger.debug(logging_text + "RO_wim_id={} already detached".format(RO_wim_id))
642 else:
643 raise
644
645 step = "Deleting wim from RO"
646 try:
647 await RO.delete("wim", RO_wim_id)
648 except ROclient.ROClientException as e:
649 if e.http_code == 404: # not found
650 self.logger.debug(logging_text + "RO_wim_id={} already deleted".format(RO_wim_id))
651 else:
652 raise
653 else:
654 # nothing to delete
655 self.logger.error(logging_text + "Nohing to remove at RO")
656 self.db.del_one("wim_accounts", {"_id": wim_id})
657 db_wim = None
658 self.logger.debug(logging_text + "Exit Ok")
659 return
660
661 except (ROclient.ROClientException, DbException) as e:
662 self.logger.error(logging_text + "Exit Exception {}".format(e))
663 exc = e
664 except Exception as e:
665 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
666 exc = e
667 finally:
668 self.lcm_tasks.remove("wim_account", wim_id, order_id)
669 if exc and db_wim:
670 db_wim_update["_admin.operationalState"] = "ERROR"
671 db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
672 # Mark the WIM 'delete' HA task as erroneous
673 operationState_HA = 'FAILED'
674 detailed_status_HA = "ERROR {}: {}".format(step, exc)
675 self.lcm_tasks.register_HA('wim', 'delete', op_id,
676 operationState=operationState_HA,
677 detailed_status=detailed_status_HA)
678 try:
679 if db_wim and db_wim_update:
680 self.update_db_2("wim_accounts", wim_id, db_wim_update)
681 # If the WIM 'delete' HA task was succesful, the DB entry has been deleted,
682 # which means that there is nowhere to register this task, so do nothing here.
683 except DbException as e:
684 self.logger.error(logging_text + "Cannot update database: {}".format(e))
685 self.lcm_tasks.remove("wim_account", wim_id, order_id)
686
687
688 class SdnLcm(LcmBase):
689
690 def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
691 """
692 Init, Connect to database, filesystem storage, and messaging
693 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
694 :return: None
695 """
696
697 self.logger = logging.getLogger('lcm.sdn')
698 self.loop = loop
699 self.lcm_tasks = lcm_tasks
700 self.ro_config = ro_config
701
702 super().__init__(db, msg, fs, self.logger)
703
704 async def create(self, sdn_content, order_id):
705
706 # HA tasks and backward compatibility:
707 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
708 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
709 # Register 'create' task here for related future HA operations
710 op_id = sdn_content.pop('op_id', None)
711 self.lcm_tasks.lock_HA('sdn', 'create', op_id)
712
713 sdn_id = sdn_content["_id"]
714 sdn_content.pop("op_id", None)
715 logging_text = "Task sdn_create={} ".format(sdn_id)
716 self.logger.debug(logging_text + "Enter")
717
718 db_sdn = None
719 db_sdn_update = {}
720 RO_sdn_id = None
721 exc = None
722 operationState_HA = ''
723 detailed_status_HA = ''
724 try:
725 step = "Getting sdn from db"
726 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
727 db_sdn_update["_admin.deployed.RO"] = None
728
729 step = "Creating sdn at RO"
730 db_sdn_update["_admin.detailed-status"] = step
731 self.update_db_2("sdns", sdn_id, db_sdn_update)
732
733 RO = ROclient.ROClient(self.loop, **self.ro_config)
734 sdn_RO = deepcopy(sdn_content)
735 sdn_RO.pop("_id", None)
736 sdn_RO.pop("_admin", None)
737 schema_version = sdn_RO.pop("schema_version", None)
738 sdn_RO.pop("schema_type", None)
739 sdn_RO.pop("description", None)
740 if sdn_RO.get("password"):
741 sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)
742
743 desc = await RO.create("sdn", descriptor=sdn_RO)
744 RO_sdn_id = desc["uuid"]
745 db_sdn_update["_admin.deployed.RO"] = RO_sdn_id
746 db_sdn_update["_admin.operationalState"] = "ENABLED"
747 self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
748 # Mark the SDN 'create' HA task as successful
749 operationState_HA = 'COMPLETED'
750 detailed_status_HA = 'Done'
751 return
752
753 except (ROclient.ROClientException, DbException) as e:
754 self.logger.error(logging_text + "Exit Exception {}".format(e))
755 exc = e
756 except Exception as e:
757 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
758 exc = e
759 finally:
760 if exc and db_sdn:
761 db_sdn_update["_admin.operationalState"] = "ERROR"
762 db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
763 # Mark the SDN 'create' HA task as erroneous
764 operationState_HA = 'FAILED'
765 detailed_status_HA = "ERROR {}: {}".format(step, exc)
766 try:
767 if db_sdn and db_sdn_update:
768 self.update_db_2("sdns", sdn_id, db_sdn_update)
769 # Register the SDN 'create' HA task either
770 # succesful or erroneous, or do nothing (if legacy NBI)
771 self.lcm_tasks.register_HA('sdn', 'create', op_id,
772 operationState=operationState_HA,
773 detailed_status=detailed_status_HA)
774 except DbException as e:
775 self.logger.error(logging_text + "Cannot update database: {}".format(e))
776 self.lcm_tasks.remove("sdn", sdn_id, order_id)
777
778 async def edit(self, sdn_content, order_id):
779
780 # HA tasks and backward compatibility:
781 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
782 # In such a case, HA is not supported by NBI, and the HA check always returns True
783 op_id = sdn_content.pop('op_id', None)
784 if not self.lcm_tasks.lock_HA('sdn', 'edit', op_id):
785 return
786
787 sdn_id = sdn_content["_id"]
788 sdn_content.pop("op_id", None)
789 logging_text = "Task sdn_edit={} ".format(sdn_id)
790 self.logger.debug(logging_text + "Enter")
791
792 db_sdn = None
793 db_sdn_update = {}
794 exc = None
795 operationState_HA = ''
796 detailed_status_HA = ''
797 step = "Getting sdn from db"
798 try:
799 # wait for any previous tasks in process
800 await self.lcm_tasks.waitfor_related_HA('sdn', 'edit', op_id)
801
802 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
803 RO_sdn_id = None
804 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
805 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
806 RO = ROclient.ROClient(self.loop, **self.ro_config)
807 step = "Editing sdn at RO"
808 sdn_RO = deepcopy(sdn_content)
809 sdn_RO.pop("_id", None)
810 sdn_RO.pop("_admin", None)
811 schema_version = sdn_RO.pop("schema_version", None)
812 sdn_RO.pop("schema_type", None)
813 sdn_RO.pop("description", None)
814 if sdn_RO.get("password"):
815 sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)
816 if sdn_RO:
817 await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
818 db_sdn_update["_admin.operationalState"] = "ENABLED"
819 # Mark the SDN 'edit' HA task as successful
820 operationState_HA = 'COMPLETED'
821 detailed_status_HA = 'Done'
822
823 self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
824 return
825
826 except (ROclient.ROClientException, DbException) as e:
827 self.logger.error(logging_text + "Exit Exception {}".format(e))
828 exc = e
829 except Exception as e:
830 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
831 exc = e
832 finally:
833 if exc and db_sdn:
834 db_sdn["_admin.operationalState"] = "ERROR"
835 db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
836 # Mark the SDN 'edit' HA task as erroneous
837 operationState_HA = 'FAILED'
838 detailed_status_HA = "ERROR {}: {}".format(step, exc)
839 try:
840 if db_sdn_update:
841 self.update_db_2("sdns", sdn_id, db_sdn_update)
842 # Register the SDN 'edit' HA task either
843 # succesful or erroneous, or do nothing (if legacy NBI)
844 self.lcm_tasks.register_HA('sdn', 'edit', op_id,
845 operationState=operationState_HA,
846 detailed_status=detailed_status_HA)
847 except DbException as e:
848 self.logger.error(logging_text + "Cannot update database: {}".format(e))
849 self.lcm_tasks.remove("sdn", sdn_id, order_id)
850
851 async def delete(self, sdn_content, order_id):
852
853 # HA tasks and backward compatibility:
854 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
855 # In such a case, HA is not supported by NBI, and the HA check always returns True
856 op_id = sdn_content.pop('op_id', None)
857 if not self.lcm_tasks.lock_HA('sdn', 'delete', op_id):
858 return
859
860 sdn_id = sdn_content["_id"]
861 logging_text = "Task sdn_delete={} ".format(sdn_id)
862 self.logger.debug(logging_text + "Enter")
863
864 db_sdn = None
865 db_sdn_update = {}
866 exc = None
867 operationState_HA = ''
868 detailed_status_HA = ''
869 step = "Getting sdn from db"
870 try:
871 # wait for any previous tasks in process
872 await self.lcm_tasks.waitfor_related_HA('sdn', 'delete', op_id)
873
874 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
875 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
876 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
877 RO = ROclient.ROClient(self.loop, **self.ro_config)
878 step = "Deleting sdn from RO"
879 try:
880 await RO.delete("sdn", RO_sdn_id)
881 except ROclient.ROClientException as e:
882 if e.http_code == 404: # not found
883 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
884 else:
885 raise
886 else:
887 # nothing to delete
888 self.logger.error(logging_text + "Skipping. There is not RO information at database")
889 self.db.del_one("sdns", {"_id": sdn_id})
890 db_sdn = None
891 self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
892 return
893
894 except (ROclient.ROClientException, DbException) as e:
895 self.logger.error(logging_text + "Exit Exception {}".format(e))
896 exc = e
897 except Exception as e:
898 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
899 exc = e
900 finally:
901 if exc and db_sdn:
902 db_sdn["_admin.operationalState"] = "ERROR"
903 db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
904 # Mark the SDN 'delete' HA task as erroneous
905 operationState_HA = 'FAILED'
906 detailed_status_HA = "ERROR {}: {}".format(step, exc)
907 self.lcm_tasks.register_HA('sdn', 'delete', op_id,
908 operationState=operationState_HA,
909 detailed_status=detailed_status_HA)
910 try:
911 if db_sdn and db_sdn_update:
912 self.update_db_2("sdns", sdn_id, db_sdn_update)
913 # If the SDN 'delete' HA task was succesful, the DB entry has been deleted,
914 # which means that there is nowhere to register this task, so do nothing here.
915 except DbException as e:
916 self.logger.error(logging_text + "Cannot update database: {}".format(e))
917 self.lcm_tasks.remove("sdn", sdn_id, order_id)
918
919
920 class K8sClusterLcm(LcmBase):
921
922 def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
923 """
924 Init, Connect to database, filesystem storage, and messaging
925 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
926 :return: None
927 """
928
929 self.logger = logging.getLogger('lcm.k8scluster')
930 self.loop = loop
931 self.lcm_tasks = lcm_tasks
932 self.vca_config = vca_config
933 self.fs = fs
934 self.db = db
935
936 self.k8scluster = K8sHelmConnector(
937 kubectl_command=self.vca_config.get("kubectlpath"),
938 helm_command=self.vca_config.get("helmpath"),
939 fs=self.fs,
940 log=self.logger,
941 db=self.db,
942 on_update_db=None
943 )
944
945 super().__init__(db, msg, fs, self.logger)
946
947 async def create(self, k8scluster_content, order_id):
948
949 # HA tasks and backward compatibility:
950 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
951 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
952 # Register 'create' task here for related future HA operations
953 op_id = k8scluster_content.pop('op_id', None)
954 if not self.lcm_tasks.lock_HA('k8scluster', 'create', op_id):
955 return
956
957 k8scluster_id = k8scluster_content["_id"]
958 k8scluster_content.pop("op_id", None)
959 logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
960 self.logger.debug(logging_text + "Enter")
961
962 db_k8scluster = None
963 db_k8scluster_update = {}
964
965 exc = None
966 operationState_HA = ''
967 detailed_status_HA = ''
968 try:
969 step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
970 self.logger.debug(logging_text + step)
971 db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
972 self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'],
973 schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"])
974 print(db_k8scluster.get("credentials"))
975 print("\n\n\n FIN CREDENTIALS")
976 print(yaml.safe_dump(db_k8scluster.get("credentials")))
977 print("\n\n\n FIN OUTPUT")
978 cluster_uuid, uninstall_sw = await self.k8scluster.init_env(yaml.safe_dump(db_k8scluster.
979 get("credentials")))
980 db_k8scluster_update["cluster-uuid"] = cluster_uuid
981 if uninstall_sw:
982 db_k8scluster_update["uninstall-sw"] = uninstall_sw
983 step = "Getting the list of repos"
984 self.logger.debug(logging_text + step)
985 task_list = []
986 db_k8srepo_list = self.db.get_list("k8srepos", {})
987 for repo in db_k8srepo_list:
988 step = "Adding repo {} to cluster: {}".format(repo["name"], cluster_uuid)
989 self.logger.debug(logging_text + step)
990 task = asyncio.ensure_future(self.k8scluster.repo_add(cluster_uuid=cluster_uuid,
991 name=repo["name"], url=repo["url"],
992 repo_type="chart"))
993 task_list.append(task)
994 if not repo["_admin"].get("cluster-inserted"):
995 repo["_admin"]["cluster-inserted"] = []
996 repo["_admin"]["cluster-inserted"].append(cluster_uuid)
997 self.update_db_2("k8srepos", repo["_id"], repo)
998
999 done = None
1000 pending = None
1001 if len(task_list) > 0:
1002 self.logger.debug('Waiting for terminate pending tasks...')
1003 done, pending = await asyncio.wait(task_list, timeout=3600)
1004 if not pending:
1005 self.logger.debug('All tasks finished...')
1006 else:
1007 self.logger.info('There are pending tasks: {}'.format(pending))
1008 db_k8scluster_update["_admin.operationalState"] = "ENABLED"
1009
1010 except Exception as e:
1011 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1012 exc = e
1013 finally:
1014 if exc and db_k8scluster:
1015 db_k8scluster_update["_admin.operationalState"] = "ERROR"
1016 db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
1017 # Mark the k8scluster 'create' HA task as erroneous
1018 operationState_HA = 'FAILED'
1019 detailed_status_HA = "ERROR {}: {}".format(step, exc)
1020 try:
1021 if db_k8scluster_update:
1022 self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
1023 # Register the K8scluster 'create' HA task either
1024 # succesful or erroneous, or do nothing (if legacy NBI)
1025 self.lcm_tasks.register_HA('k8scluster', 'create', op_id,
1026 operationState=operationState_HA,
1027 detailed_status=detailed_status_HA)
1028 except DbException as e:
1029 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1030 self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id)
1031
1032 async def delete(self, k8scluster_content, order_id):
1033
1034 # HA tasks and backward compatibility:
1035 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1036 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1037 # Register 'delete' task here for related future HA operations
1038 op_id = k8scluster_content.pop('op_id', None)
1039 if not self.lcm_tasks.lock_HA('k8scluster', 'delete', op_id):
1040 return
1041
1042 k8scluster_id = k8scluster_content["_id"]
1043 k8scluster_content.pop("op_id", None)
1044 logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
1045 self.logger.debug(logging_text + "Enter")
1046
1047 db_k8scluster = None
1048 db_k8scluster_update = {}
1049 exc = None
1050 operationState_HA = ''
1051 detailed_status_HA = ''
1052 try:
1053 step = "Getting k8scluster='{}' from db".format(k8scluster_id)
1054 self.logger.debug(logging_text + step)
1055 db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
1056 uninstall_sw = db_k8scluster.get("uninstall-sw")
1057 if uninstall_sw is False or uninstall_sw is None:
1058 uninstall_sw = False
1059 cluster_removed = await self.k8scluster.reset(cluster_uuid=db_k8scluster.get("cluster-uuid"),
1060 uninstall_sw=uninstall_sw)
1061
1062 if cluster_removed:
1063 step = "Removing k8scluster='{}' from db".format(k8scluster_id)
1064 self.logger.debug(logging_text + step)
1065 db_k8srepo_list = self.db.get_list("k8srepos", {})
1066 for k8srepo in db_k8srepo_list:
1067 index = 0
1068 for cluster in k8srepo["_admin"]["cluster-inserted"]:
1069 if db_k8scluster.get("cluster-uuid") == cluster:
1070 del(k8srepo["_admin"]["cluster-inserted"][index])
1071 break
1072 index += 1
1073 self.update_db_2("k8srepos", k8srepo["_id"], k8srepo)
1074 self.db.del_one("k8sclusters", {"_id": k8scluster_id})
1075 else:
1076 raise LcmException("An error happened during the reset of the k8s cluster '{}'".format(k8scluster_id))
1077 # if not cluster_removed:
1078 # raise Exception("K8scluster was not properly removed")
1079
1080 except Exception as e:
1081 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1082 exc = e
1083 finally:
1084 if exc and db_k8scluster:
1085 db_k8scluster_update["_admin.operationalState"] = "ERROR"
1086 db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
1087 # Mark the WIM 'create' HA task as erroneous
1088 operationState_HA = 'FAILED'
1089 detailed_status_HA = "ERROR {}: {}".format(step, exc)
1090 try:
1091 if db_k8scluster_update:
1092 self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
1093 # Register the K8scluster 'delete' HA task either
1094 # succesful or erroneous, or do nothing (if legacy NBI)
1095 self.lcm_tasks.register_HA('k8scluster', 'delete', op_id,
1096 operationState=operationState_HA,
1097 detailed_status=detailed_status_HA)
1098 except DbException as e:
1099 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1100 self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id)
1101
1102
1103 class K8sRepoLcm(LcmBase):
1104
1105 def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
1106 """
1107 Init, Connect to database, filesystem storage, and messaging
1108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1109 :return: None
1110 """
1111
1112 self.logger = logging.getLogger('lcm.k8srepo')
1113 self.loop = loop
1114 self.lcm_tasks = lcm_tasks
1115 self.vca_config = vca_config
1116 self.fs = fs
1117 self.db = db
1118
1119 self.k8srepo = K8sHelmConnector(
1120 kubectl_command=self.vca_config.get("kubectlpath"),
1121 helm_command=self.vca_config.get("helmpath"),
1122 fs=self.fs,
1123 log=self.logger,
1124 db=self.db,
1125 on_update_db=None
1126 )
1127
1128 super().__init__(db, msg, fs, self.logger)
1129
1130 async def create(self, k8srepo_content, order_id):
1131
1132 # HA tasks and backward compatibility:
1133 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1134 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1135 # Register 'create' task here for related future HA operations
1136
1137 op_id = k8srepo_content.pop('op_id', None)
1138 if not self.lcm_tasks.lock_HA('k8srepo', 'create', op_id):
1139 return
1140
1141 k8srepo_id = k8srepo_content.get("_id")
1142 logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
1143 self.logger.debug(logging_text + "Enter")
1144
1145 db_k8srepo = None
1146 db_k8srepo_update = {}
1147 exc = None
1148 operationState_HA = ''
1149 detailed_status_HA = ''
1150 try:
1151 step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
1152 self.logger.debug(logging_text + step)
1153 db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
1154 step = "Getting k8scluster_list from db"
1155 self.logger.debug(logging_text + step)
1156 db_k8scluster_list = self.db.get_list("k8sclusters", {})
1157 db_k8srepo_update["_admin.cluster-inserted"] = []
1158 task_list = []
1159 for k8scluster in db_k8scluster_list:
1160 step = "Adding repo to cluster: {}".format(k8scluster["cluster-uuid"])
1161 self.logger.debug(logging_text + step)
1162 task = asyncio.ensure_future(self.k8srepo.repo_add(cluster_uuid=k8scluster["cluster-uuid"],
1163 name=db_k8srepo["name"], url=db_k8srepo["url"],
1164 repo_type="chart"))
1165 task_list.append(task)
1166 db_k8srepo_update["_admin.cluster-inserted"].append(k8scluster["cluster-uuid"])
1167
1168 done = None
1169 pending = None
1170 if len(task_list) > 0:
1171 self.logger.debug('Waiting for terminate pending tasks...')
1172 done, pending = await asyncio.wait(task_list, timeout=3600)
1173 if not pending:
1174 self.logger.debug('All tasks finished...')
1175 else:
1176 self.logger.info('There are pending tasks: {}'.format(pending))
1177 db_k8srepo_update["_admin.operationalState"] = "ENABLED"
1178 except Exception as e:
1179 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1180 exc = e
1181 finally:
1182 if exc and db_k8srepo:
1183 db_k8srepo_update["_admin.operationalState"] = "ERROR"
1184 db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
1185 # Mark the WIM 'create' HA task as erroneous
1186 operationState_HA = 'FAILED'
1187 detailed_status_HA = "ERROR {}: {}".format(step, exc)
1188 try:
1189 if db_k8srepo_update:
1190 self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
1191 # Register the K8srepo 'create' HA task either
1192 # succesful or erroneous, or do nothing (if legacy NBI)
1193 self.lcm_tasks.register_HA('k8srepo', 'create', op_id,
1194 operationState=operationState_HA,
1195 detailed_status=detailed_status_HA)
1196 except DbException as e:
1197 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1198 self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)
1199
1200 async def delete(self, k8srepo_content, order_id):
1201
1202 # HA tasks and backward compatibility:
1203 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1204 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1205 # Register 'delete' task here for related future HA operations
1206 op_id = k8srepo_content.pop('op_id', None)
1207 if not self.lcm_tasks.lock_HA('k8srepo', 'delete', op_id):
1208 return
1209
1210 k8srepo_id = k8srepo_content.get("_id")
1211 logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
1212 self.logger.debug(logging_text + "Enter")
1213
1214 db_k8srepo = None
1215 db_k8srepo_update = {}
1216
1217 operationState_HA = ''
1218 detailed_status_HA = ''
1219 try:
1220 step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
1221 self.logger.debug(logging_text + step)
1222 db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
1223 step = "Getting k8scluster_list from db"
1224 self.logger.debug(logging_text + step)
1225 db_k8scluster_list = self.db.get_list("k8sclusters", {})
1226
1227 task_list = []
1228 for k8scluster in db_k8scluster_list:
1229 task = asyncio.ensure_future(self.k8srepo.repo_remove(cluster_uuid=k8scluster["cluster-uuid"],
1230 name=db_k8srepo["name"]))
1231 task_list.append(task)
1232 done = None
1233 pending = None
1234 if len(task_list) > 0:
1235 self.logger.debug('Waiting for terminate pending tasks...')
1236 done, pending = await asyncio.wait(task_list, timeout=3600)
1237 if not pending:
1238 self.logger.debug('All tasks finished...')
1239 else:
1240 self.logger.info('There are pending tasks: {}'.format(pending))
1241 self.db.del_one("k8srepos", {"_id": k8srepo_id})
1242
1243 except Exception as e:
1244 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1245 exc = e
1246 finally:
1247 if exc and db_k8srepo:
1248 db_k8srepo_update["_admin.operationalState"] = "ERROR"
1249 db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
1250 # Mark the WIM 'create' HA task as erroneous
1251 operationState_HA = 'FAILED'
1252 detailed_status_HA = "ERROR {}: {}".format(step, exc)
1253 try:
1254 if db_k8srepo_update:
1255 self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
1256 # Register the K8srepo 'delete' HA task either
1257 # succesful or erroneous, or do nothing (if legacy NBI)
1258 self.lcm_tasks.register_HA('k8srepo', 'delete', op_id,
1259 operationState=operationState_HA,
1260 detailed_status=detailed_status_HA)
1261 except DbException as e:
1262 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1263 self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)