blob: 4272cfd06a5be986239e8f671a75edf0de450771 [file] [log] [blame]
rshri932105f2024-07-05 15:11:55 +00001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16__author__ = (
17 "Shrinithi R <shrinithi.r@tataelxsi.co.in>",
18 "Shahithya Y <shahithya.y@tataelxsi.co.in>",
19)
20
21import logging
22from osm_lcm.lcm_utils import LcmBase
23from copy import deepcopy
24from osm_lcm import odu_workflows
25from osm_lcm import vim_sdn
26
27
28class ClusterLcm(LcmBase):
29 db_topic = "clusters"
30
31 def __init__(self, msg, lcm_tasks, config):
32 """
33 Init, Connect to database, filesystem storage, and messaging
34 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
35 :return: None
36 """
37
38 self.logger = logging.getLogger("lcm.clusterlcm")
39 self.lcm_tasks = lcm_tasks
40 self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
41 self.regist = vim_sdn.K8sClusterLcm(msg, self.lcm_tasks, config)
42
43 super().__init__(msg, self.logger)
44
45 async def create(self, content, order_id):
46 self.logger.info("cluster Create Enter")
47
48 workflow_name = self.odu.launch_workflow("create_cluster", content)
49 self.logger.info("workflow_name is :{}".format(workflow_name))
50
51 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
52 self.logger.info(
53 "workflow_status is :{} and workflow_msg is :{}".format(
54 workflow_status, workflow_msg
55 )
56 )
57 if workflow_status:
58 content["state"] = "CREATED"
59 content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
60 else:
61 content["state"] = "FAILED_CREATION"
62 content["resourceState"] = "ERROR"
63 # has to call update_operation_history return content
64 content = self.update_operation_history(content, workflow_status, None)
65 self.db.set_one("clusters", {"_id": content["_id"]}, content)
66
67 if workflow_status:
68 resource_status, resource_msg = self.odu.check_resource_status(
69 "create_cluster", content
70 )
71 self.logger.info(
72 "resource_status is :{} and resource_msg is :{}".format(
73 resource_status, resource_msg
74 )
75 )
76 if resource_status:
77 content["resourceState"] = "READY"
78 else:
79 content["resourceState"] = "ERROR"
80
81 content["operatingState"] = "IDLE"
82 content = self.update_operation_history(
83 content, workflow_status, resource_status
84 )
85 self.db.set_one("clusters", {"_id": content["_id"]}, content)
86 self.profile_state(content, workflow_status, resource_status)
87 return
88
89 def profile_state(self, content, workflow_status, resource_status):
90 profiles = [
91 "infra_controller_profiles",
92 "infra_config_profiles",
93 "app_profiles",
94 "resource_profiles",
95 ]
96 profiles_collection = {
97 "infra_controller_profiles": "k8sinfra_controller",
98 "infra_config_profiles": "k8sinfra_config",
99 "app_profiles": "k8sapp",
100 "resource_profiles": "k8sresource",
101 }
102 for profile_type in profiles:
103 profile_id = content[profile_type]
104 self.logger.info("profile id is : {}".format(profile_id))
105 db_collection = profiles_collection[profile_type]
106 self.logger.info("the db_collection is :{}".format(db_collection))
107 db_profile = self.db.get_one(db_collection, {"_id": profile_id})
108 self.logger.info("the db_profile is :{}".format(db_profile))
109 db_profile["state"] = content["state"]
110 db_profile["resourceState"] = content["resourceState"]
111 db_profile["operatingState"] = content["operatingState"]
112 db_profile = self.update_operation_history(
113 db_profile, workflow_status, resource_status
114 )
115 self.logger.info("the db_profile is :{}".format(db_profile))
116 self.db.set_one(db_collection, {"_id": db_profile["_id"]}, db_profile)
117
118 async def delete(self, content, order_id):
119 self.logger.info("cluster delete Enter")
120 items = self.db.get_one("clusters", {"_id": content["_id"]})
121
122 workflow_name = self.odu.launch_workflow("delete_cluster", content)
123 self.logger.info("workflow_name is :{}".format(workflow_name))
124
125 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
126 self.logger.info(
127 "workflow_status is :{} and workflow_msg is :{}".format(
128 workflow_status, workflow_msg
129 )
130 )
131 if workflow_status:
132 items["state"] = "DELETED"
133 items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
134 else:
135 items["state"] = "FAILED_DELETION"
136 items["resourceState"] = "ERROR"
137 # has to call update_operation_history return content
138 items = self.update_operation_history(items, workflow_status, None)
139 self.db.set_one("clusters", {"_id": content["_id"]}, items)
140
141 if workflow_status:
142 resource_status, resource_msg = self.odu.check_resource_status(
143 "delete_cluster", content
144 )
145 self.logger.info(
146 "resource_status is :{} and resource_msg is :{}".format(
147 resource_status, resource_msg
148 )
149 )
150 if resource_status:
151 items["resourceState"] = "READY"
152 else:
153 items["resourceState"] = "ERROR"
154
155 items["operatingState"] = "IDLE"
156 items = self.update_operation_history(items, workflow_status, resource_status)
157 self.db.set_one("clusters", {"_id": content["_id"]}, items)
158
159 # To delete it from dB
160 if items["state"] == "DELETED":
161 self.delete_cluster(content, order_id)
162 return
163
164 def delete_cluster(self, content, order_id):
165 item_content = self.db.get_one("clusters", {"_id": content["_id"]})
166 self.logger.info("1_the item_content is : {}".format(item_content))
167
168 self.logger.info("it is getting into if item_content state")
169 # detach profiles
170 update_dict = None
171 profiles_to_detach = [
172 "infra_controller_profiles",
173 "infra_config_profiles",
174 "app_profiles",
175 "resource_profiles",
176 ]
177 profiles_collection = {
178 "infra_controller_profiles": "k8sinfra_controller",
179 "infra_config_profiles": "k8sinfra_config",
180 "app_profiles": "k8sapp",
181 "resource_profiles": "k8sresource",
182 }
183 for profile_type in profiles_to_detach:
184 if item_content.get(profile_type):
185 self.logger.info("the profile_type is :{}".format(profile_type))
186 profile_ids = item_content[profile_type]
187 self.logger.info("the profile_ids is :{}".format(profile_ids))
188 profile_ids_copy = deepcopy(profile_ids)
189 self.logger.info("the profile_ids_copy is :{}".format(profile_ids_copy))
190 for profile_id in profile_ids_copy:
191 self.logger.info("the profile_id is :{}".format(profile_id))
192 db_collection = profiles_collection[profile_type]
193 self.logger.info("the db_collection is :{}".format(db_collection))
194 db_profile = self.db.get_one(db_collection, {"_id": profile_id})
195 self.logger.info("the db_profile is :{}".format(db_profile))
196 self.logger.info(
197 "the item_content name is :{}".format(item_content["name"])
198 )
199 self.logger.info(
200 "the db_profile name is :{}".format(db_profile["name"])
201 )
202 if item_content["name"] == db_profile["name"]:
203 self.logger.info("it is getting into if default")
204 self.db.del_one(db_collection, {"_id": profile_id})
205 else:
206 self.logger.info("it is getting into else non default")
207 profile_ids.remove(profile_id)
208 update_dict = {profile_type: profile_ids}
209 self.logger.info(f"the update dict is :{update_dict}")
210 self.db.set_one(
211 "clusters", {"_id": content["_id"]}, update_dict
212 )
213 self.db.del_one("clusters", {"_id": item_content["_id"]})
214 self.logger.info("the id is :{}".format(content["_id"]))
215
216 async def add(self, content, order_id):
217 self.logger.info("profile attach Enter")
218 db_cluster = self.db.get_one("clusters", {"_id": content["_id"]})
219 profile_type = content["profile_type"]
220 self.logger.info("profile type is : {}".format(profile_type))
221 profile_id = content["profile_id"]
222 self.logger.info("profile id is : {}".format(profile_id))
223
224 workflow_name = self.odu.launch_workflow("attach_profile_to_cluster", content)
225 self.logger.info("workflow_name is :{}".format(workflow_name))
226
227 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
228 self.logger.info(
229 "workflow_status is :{} and workflow_msg is :{}".format(
230 workflow_status, workflow_msg
231 )
232 )
233 if workflow_status:
234 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
235 else:
236 db_cluster["resourceState"] = "ERROR"
237 # has to call update_operation_history return content
238 db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
239 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
240
241 if workflow_status:
242 resource_status, resource_msg = self.odu.check_resource_status(
243 "attach_profile_to_cluster", content
244 )
245 self.logger.info(
246 "resource_status is :{} and resource_msg is :{}".format(
247 resource_status, resource_msg
248 )
249 )
250 if resource_status:
251 db_cluster["resourceState"] = "READY"
252 else:
253 db_cluster["resourceState"] = "ERROR"
254
255 db_cluster["operatingState"] = "IDLE"
256 db_cluster = self.update_operation_history(
257 db_cluster, workflow_status, resource_status
258 )
259 profiles_collection = {
260 "infra_controller_profiles": "k8sinfra_controller",
261 "infra_config_profiles": "k8sinfra_config",
262 "app_profiles": "k8sapp",
263 "resource_profiles": "k8sresource",
264 }
265 db_collection = profiles_collection[profile_type]
266 self.logger.info("db_collection is : {}".format(db_collection))
267 profile_list = db_cluster[profile_type]
268 self.logger.info("profile list is : {}".format(profile_list))
269 if resource_status:
270 self.logger.info("it is getting into resource status true")
271 profile_list.append(profile_id)
272 self.logger.info("profile list is : {}".format(profile_list))
273 db_cluster[profile_type] = profile_list
274 self.logger.info("db cluster is : {}".format(db_cluster))
275 # update_dict = {item: profile_list}
276 # self.logger.info("the update_dict is :{}".format(update_dict))
277 # self.db.set_one(self.topic, filter_q, update_dict)
278 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
279
280 return
281
282 async def remove(self, content, order_id):
283 self.logger.info("profile dettach Enter")
284 db_cluster = self.db.get_one("clusters", {"_id": content["_id"]})
285 profile_type = content["profile_type"]
286 self.logger.info("profile type is : {}".format(profile_type))
287 profile_id = content["profile_id"]
288 self.logger.info("profile id is : {}".format(profile_id))
289
290 workflow_name = self.odu.launch_workflow("detach_profile_from_cluster", content)
291 self.logger.info("workflow_name is :{}".format(workflow_name))
292
293 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
294 self.logger.info(
295 "workflow_status is :{} and workflow_msg is :{}".format(
296 workflow_status, workflow_msg
297 )
298 )
299 if workflow_status:
300 db_cluster["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
301 else:
302 db_cluster["resourceState"] = "ERROR"
303 # has to call update_operation_history return content
304 db_cluster = self.update_operation_history(db_cluster, workflow_status, None)
305 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
306
307 if workflow_status:
308 resource_status, resource_msg = self.odu.check_resource_status(
309 "detach_profile_from_cluster", content
310 )
311 self.logger.info(
312 "resource_status is :{} and resource_msg is :{}".format(
313 resource_status, resource_msg
314 )
315 )
316 if resource_status:
317 db_cluster["resourceState"] = "READY"
318 else:
319 db_cluster["resourceState"] = "ERROR"
320
321 db_cluster["operatingState"] = "IDLE"
322 db_cluster = self.update_operation_history(
323 db_cluster, workflow_status, resource_status
324 )
325 profiles_collection = {
326 "infra_controller_profiles": "k8sinfra_controller",
327 "infra_config_profiles": "k8sinfra_config",
328 "app_profiles": "k8sapp",
329 "resource_profiles": "k8sresource",
330 }
331 db_collection = profiles_collection[profile_type]
332 self.logger.info("db_collection is : {}".format(db_collection))
333 profile_list = db_cluster[profile_type]
334 self.logger.info("profile list is : {}".format(profile_list))
335 if resource_status:
336 self.logger.info("it is getting into resource status true")
337 profile_list.remove(profile_id)
338 self.logger.info("profile list is : {}".format(profile_list))
339 db_cluster[profile_type] = profile_list
340 self.logger.info("db cluster is : {}".format(db_cluster))
341 # update_dict = {item: profile_list}
342 # self.logger.info("the update_dict is :{}".format(update_dict))
343 # self.db.set_one(self.topic, filter_q, update_dict)
344 self.db.set_one("clusters", {"_id": db_cluster["_id"]}, db_cluster)
345
346 return
347
348 async def register(self, content, order_id):
349 self.logger.info("cluster register enter")
350
351 workflow_name = self.odu.launch_workflow("register_cluster", content)
352 self.logger.info("workflow_name is :{}".format(workflow_name))
353
354 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
355 self.logger.info(
356 "workflow_status is :{} and workflow_msg is :{}".format(
357 workflow_status, workflow_msg
358 )
359 )
360 if workflow_status:
361 content["state"] = "CREATED"
362 content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
363 else:
364 content["state"] = "FAILED_CREATION"
365 content["resourceState"] = "ERROR"
366 # has to call update_operation_history return content
367 content = self.update_operation_history(content, workflow_status, None)
368 self.db.set_one("clusters", {"_id": content["_id"]}, content)
369
370 if workflow_status:
371 resource_status, resource_msg = self.odu.check_resource_status(
372 "register_cluster", content
373 )
374 self.logger.info(
375 "resource_status is :{} and resource_msg is :{}".format(
376 resource_status, resource_msg
377 )
378 )
379 if resource_status:
380 content["resourceState"] = "READY"
381 else:
382 content["resourceState"] = "ERROR"
383
384 content["operatingState"] = "IDLE"
385 content = self.update_operation_history(
386 content, workflow_status, resource_status
387 )
388 self.db.set_one("clusters", {"_id": content["_id"]}, content)
389 self.profile_state(content, workflow_status, resource_status)
390 return
391
392 async def deregister(self, content, order_id):
393 self.logger.info("cluster deregister enter")
394
395 items = self.db.get_one("clusters", {"_id": content["_id"]})
396 self.logger.info("the items is : {}".format(items))
397
398 workflow_name = self.odu.launch_workflow("deregister_cluster", content)
399 self.logger.info("workflow_name is :{}".format(workflow_name))
400
401 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
402 self.logger.info(
403 "workflow_status is :{} and workflow_msg is :{}".format(
404 workflow_status, workflow_msg
405 )
406 )
407 if workflow_status:
408 items["state"] = "DELETED"
409 items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
410 else:
411 items["state"] = "FAILED_DELETION"
412 items["resourceState"] = "ERROR"
413 # has to call update_operation_history return content
414 items = self.update_operation_history(items, workflow_status, None)
415 self.db.set_one("clusters", {"_id": content["_id"]}, items)
416
417 if workflow_status:
418 resource_status, resource_msg = self.odu.check_resource_status(
419 "deregister_cluster", content
420 )
421 self.logger.info(
422 "resource_status is :{} and resource_msg is :{}".format(
423 resource_status, resource_msg
424 )
425 )
426 if resource_status:
427 items["resourceState"] = "READY"
428 else:
429 items["resourceState"] = "ERROR"
430
431 items["operatingState"] = "IDLE"
432 items = self.update_operation_history(items, workflow_status, resource_status)
433 self.db.set_one("clusters", {"_id": content["_id"]}, items)
434
435 # To delete it from dB
436 if items["state"] == "DELETED":
437 self.db.del_one("clusters", {"_id": items["_id"]})
438 return
439
440
441class K8sAppLcm(LcmBase):
442 def __init__(self, msg, lcm_tasks, config):
443 """
444 Init, Connect to database, filesystem storage, and messaging
445 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
446 :return: None
447 """
448
449 self.logger = logging.getLogger("lcm.clusterlcm")
450 self.lcm_tasks = lcm_tasks
451 self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
452
453 super().__init__(msg, self.logger)
454
455 async def create(self, content, order_id):
456 self.logger.info("App Create Enter")
457
458 workflow_name = self.odu.launch_workflow("create_profile", content)
459 self.logger.info("workflow_name is :{}".format(workflow_name))
460
461 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
462 self.logger.info(
463 "workflow_status is :{} and workflow_msg is :{}".format(
464 workflow_status, workflow_msg
465 )
466 )
467 if workflow_status:
468 content["state"] = "CREATED"
469 content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
470 else:
471 content["state"] = "FAILED_CREATION"
472 content["resourceState"] = "ERROR"
473 # has to call update_operation_history return content
474 content = self.update_operation_history(content, workflow_status, None)
475 self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
476
477 if workflow_status:
478 resource_status, resource_msg = self.odu.check_resource_status(
479 "create_profile", content
480 )
481 self.logger.info(
482 "resource_status is :{} and resource_msg is :{}".format(
483 resource_status, resource_msg
484 )
485 )
486 if resource_status:
487 content["resourceState"] = "READY"
488 else:
489 content["resourceState"] = "ERROR"
490
491 content["operatingState"] = "IDLE"
492 content = self.update_operation_history(
493 content, workflow_status, resource_status
494 )
495 self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
496
497 return
498
499 async def delete(self, content, order_id):
500 self.logger.info("App delete Enter")
501 items = self.db.get_one("k8sapp", {"_id": content["_id"]})
502
503 workflow_name = self.odu.launch_workflow("delete_profile", content)
504 self.logger.info("workflow_name is :{}".format(workflow_name))
505
506 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
507 self.logger.info(
508 "workflow_status is :{} and workflow_msg is :{}".format(
509 workflow_status, workflow_msg
510 )
511 )
512 if workflow_status:
513 items["state"] = "DELETED"
514 items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
515 else:
516 items["state"] = "FAILED_DELETION"
517 items["resourceState"] = "ERROR"
518 # has to call update_operation_history return content
519 items = self.update_operation_history(items, workflow_status, None)
520 self.db.set_one("k8sapp", {"_id": content["_id"]}, content)
521
522 if workflow_status:
523 resource_status, resource_msg = self.odu.check_resource_status(
524 "delete_profile", content
525 )
526 self.logger.info(
527 "resource_status is :{} and resource_msg is :{}".format(
528 resource_status, resource_msg
529 )
530 )
531 if resource_status:
532 items["resourceState"] = "READY"
533 else:
534 items["resourceState"] = "ERROR"
535
536 items["operatingState"] = "IDLE"
537 items = self.update_operation_history(items, workflow_status, resource_status)
538 self.db.set_one("k8sapp", {"_id": content["_id"]}, items)
539
540 # To delete it from dB
541 if items["state"] == "DELETED":
542 self.db.del_one("k8sapp", {"_id": content["_id"]})
543 return
544
545
546class K8sResourceLcm(LcmBase):
547 def __init__(self, msg, lcm_tasks, config):
548 """
549 Init, Connect to database, filesystem storage, and messaging
550 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
551 :return: None
552 """
553
554 self.logger = logging.getLogger("lcm.clusterlcm")
555 self.lcm_tasks = lcm_tasks
556 self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
557
558 super().__init__(msg, self.logger)
559
560 async def create(self, content, order_id):
561 self.logger.info("Resource Create Enter")
562
563 workflow_name = self.odu.launch_workflow("create_profile", content)
564 self.logger.info("workflow_name is :{}".format(workflow_name))
565
566 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
567 self.logger.info(
568 "workflow_status is :{} and workflow_msg is :{}".format(
569 workflow_status, workflow_msg
570 )
571 )
572 if workflow_status:
573 content["state"] = "CREATED"
574 content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
575 else:
576 content["state"] = "FAILED_CREATION"
577 content["resourceState"] = "ERROR"
578 # has to call update_operation_history return content
579 content = self.update_operation_history(content, workflow_status, None)
580 self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
581
582 if workflow_status:
583 resource_status, resource_msg = self.odu.check_resource_status(
584 "create_profile", content
585 )
586 self.logger.info(
587 "resource_status is :{} and resource_msg is :{}".format(
588 resource_status, resource_msg
589 )
590 )
591 if resource_status:
592 content["resourceState"] = "READY"
593 else:
594 content["resourceState"] = "ERROR"
595
596 content["operatingState"] = "IDLE"
597 content = self.update_operation_history(
598 content, workflow_status, resource_status
599 )
600 self.db.set_one("k8sresource", {"_id": content["_id"]}, content)
601
602 return
603
604 async def delete(self, content, order_id):
605 self.logger.info("Resource delete Enter")
606 items = self.db.get_one("k8sresource", {"_id": content["_id"]})
607
608 workflow_name = self.odu.launch_workflow("delete_profile", content)
609 self.logger.info("workflow_name is :{}".format(workflow_name))
610
611 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
612 self.logger.info(
613 "workflow_status is :{} and workflow_msg is :{}".format(
614 workflow_status, workflow_msg
615 )
616 )
617 if workflow_status:
618 items["state"] = "DELETED"
619 items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
620 else:
621 items["state"] = "FAILED_DELETION"
622 items["resourceState"] = "ERROR"
623 # has to call update_operation_history return content
624 items = self.update_operation_history(items, workflow_status, None)
625 self.db.set_one("k8sresource", {"_id": content["_id"]}, items)
626
627 if workflow_status:
628 resource_status, resource_msg = self.odu.check_resource_status(
629 "delete_profile", content
630 )
631 self.logger.info(
632 "resource_status is :{} and resource_msg is :{}".format(
633 resource_status, resource_msg
634 )
635 )
636 if resource_status:
637 items["resourceState"] = "READY"
638 else:
639 items["resourceState"] = "ERROR"
640
641 items["operatingState"] = "IDLE"
642 items = self.update_operation_history(items, workflow_status, resource_status)
643 self.db.set_one("k8sresource", {"_id": content["_id"]}, items)
644
645 # To delete it from dB
646 if items["state"] == "DELETED":
647 self.db.del_one("k8sresource", {"_id": content["_id"]})
648 return
649
650
651class K8sInfraControllerLcm(LcmBase):
652 def __init__(self, msg, lcm_tasks, config):
653 """
654 Init, Connect to database, filesystem storage, and messaging
655 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
656 :return: None
657 """
658
659 self.logger = logging.getLogger("lcm.clusterlcm")
660 self.lcm_tasks = lcm_tasks
661 self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
662
663 super().__init__(msg, self.logger)
664
665 async def create(self, content, order_id):
666 self.logger.info("Infra controller Create Enter")
667
668 workflow_name = self.odu.launch_workflow("create_profile", content)
669 self.logger.info("workflow_name is :{}".format(workflow_name))
670
671 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
672 self.logger.info(
673 "workflow_status is :{} and workflow_msg is :{}".format(
674 workflow_status, workflow_msg
675 )
676 )
677 if workflow_status:
678 content["state"] = "CREATED"
679 content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
680 else:
681 content["state"] = "FAILED_CREATION"
682 content["resourceState"] = "ERROR"
683 # has to call update_operation_history return content
684 content = self.update_operation_history(content, workflow_status, None)
685 self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
686
687 if workflow_status:
688 resource_status, resource_msg = self.odu.check_resource_status(
689 "create_profile", content
690 )
691 self.logger.info(
692 "resource_status is :{} and resource_msg is :{}".format(
693 resource_status, resource_msg
694 )
695 )
696 if resource_status:
697 content["resourceState"] = "READY"
698 else:
699 content["resourceState"] = "ERROR"
700
701 content["operatingState"] = "IDLE"
702 content = self.update_operation_history(
703 content, workflow_status, resource_status
704 )
705 self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, content)
706
707 return
708
709 async def delete(self, content, order_id):
710 self.logger.info("Infra controller delete Enter")
711 items = self.db.get_one("k8sinfra_controller", {"_id": content["_id"]})
712
713 workflow_name = self.odu.launch_workflow("delete_profile", content)
714 self.logger.info("workflow_name is :{}".format(workflow_name))
715
716 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
717 self.logger.info(
718 "workflow_status is :{} and workflow_msg is :{}".format(
719 workflow_status, workflow_msg
720 )
721 )
722 if workflow_status:
723 items["state"] = "DELETED"
724 items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
725 else:
726 items["state"] = "FAILED_DELETION"
727 items["resourceState"] = "ERROR"
728 # has to call update_operation_history return content
729 items = self.update_operation_history(items, workflow_status, None)
730 self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items)
731
732 if workflow_status:
733 resource_status, resource_msg = self.odu.check_resource_status(
734 "delete_profile", content
735 )
736 self.logger.info(
737 "resource_status is :{} and resource_msg is :{}".format(
738 resource_status, resource_msg
739 )
740 )
741 if resource_status:
742 items["resourceState"] = "READY"
743 else:
744 items["resourceState"] = "ERROR"
745
746 items["operatingState"] = "IDLE"
747 items = self.update_operation_history(items, workflow_status, resource_status)
748 self.db.set_one("k8sinfra_controller", {"_id": content["_id"]}, items)
749
750 # To delete it from dB
751 if items["state"] == "DELETED":
752 self.db.del_one("k8sinfra_controller", {"_id": content["_id"]})
753 return
754
755
756class K8sInfraConfigLcm(LcmBase):
757 def __init__(self, msg, lcm_tasks, config):
758 """
759 Init, Connect to database, filesystem storage, and messaging
760 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
761 :return: None
762 """
763
764 self.logger = logging.getLogger("lcm.clusterlcm")
765 self.lcm_tasks = lcm_tasks
766 self.odu = odu_workflows.OduWorkflow(msg, self.lcm_tasks, config)
767
768 super().__init__(msg, self.logger)
769
770 async def create(self, content, order_id):
771 self.logger.info("Infra config Create Enter")
772
773 workflow_name = self.odu.launch_workflow("create_profile", content)
774 self.logger.info("workflow_name is :{}".format(workflow_name))
775
776 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
777 self.logger.info(
778 "workflow_status is :{} and workflow_msg is :{}".format(
779 workflow_status, workflow_msg
780 )
781 )
782 if workflow_status:
783 content["state"] = "CREATED"
784 content["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
785 else:
786 content["state"] = "FAILED_CREATION"
787 content["resourceState"] = "ERROR"
788 # has to call update_operation_history return content
789 content = self.update_operation_history(content, workflow_status, None)
790 self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
791
792 if workflow_status:
793 resource_status, resource_msg = self.odu.check_resource_status(
794 "create_profile", content
795 )
796 self.logger.info(
797 "resource_status is :{} and resource_msg is :{}".format(
798 resource_status, resource_msg
799 )
800 )
801 if resource_status:
802 content["resourceState"] = "READY"
803 else:
804 content["resourceState"] = "ERROR"
805
806 content["operatingState"] = "IDLE"
807 content = self.update_operation_history(
808 content, workflow_status, resource_status
809 )
810 self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, content)
811
812 return
813
814 async def delete(self, content, order_id):
815 self.logger.info("Infra config delete Enter")
816
817 workflow_name = self.odu.launch_workflow("delete_profile", content)
818 self.logger.info("workflow_name is :{}".format(workflow_name))
819 items = self.db.get_one("k8sinfra_config", {"_id": content["_id"]})
820
821 workflow_status, workflow_msg = self.odu.check_workflow_status(workflow_name)
822 self.logger.info(
823 "workflow_status is :{} and workflow_msg is :{}".format(
824 workflow_status, workflow_msg
825 )
826 )
827 if workflow_status:
828 items["state"] = "DELETED"
829 items["resourceState"] = "IN_PROGRESS.GIT_SYNCED"
830 else:
831 items["state"] = "FAILED_DELETION"
832 items["resourceState"] = "ERROR"
833 # has to call update_operation_history return content
834 items = self.update_operation_history(items, workflow_status, None)
835 self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items)
836
837 resource_status, resource_msg = self.odu.check_resource_status(
838 "delete_profile", content
839 )
840 self.logger.info(
841 "resource_status is :{} and resource_msg is :{}".format(
842 resource_status, resource_msg
843 )
844 )
845 if resource_status:
846 items["resourceState"] = "READY"
847 else:
848 items["resourceState"] = "ERROR"
849
850 items["operatingState"] = "IDLE"
851 items = self.update_operation_history(items, workflow_status, resource_status)
852 self.db.set_one("k8sinfra_config", {"_id": content["_id"]}, items)
853
854 # To delete it from dB
855 if items["state"] == "DELETED":
856 self.db.del_one("k8sinfra_config", {"_id": content["_id"]})
857 return