Coverage for osm_policy_module/core/agent.py: 57%

115 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2024-06-30 08:29 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright 2018 Whitestack, LLC 

4# ************************************************************* 

5 

6# This file is part of OSM Monitoring module 

7# All Rights Reserved to Whitestack, LLC 

8 

9# Licensed under the Apache License, Version 2.0 (the "License"); you may 

10# not use this file except in compliance with the License. You may obtain 

11# a copy of the License at 

12 

13# http://www.apache.org/licenses/LICENSE-2.0 

14 

15# Unless required by applicable law or agreed to in writing, software 

16# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

17# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

18# License for the specific language governing permissions and limitations 

19# under the License. 

20 

21# For those usages not covered by the Apache License, Version 2.0 please 

22# contact: bdiaz@whitestack.com or glavado@whitestack.com 

23## 

24import asyncio 

25import logging 

26from pathlib import Path 

27import os 

28 

29import peewee 

30 

31from osm_policy_module.alarming.service import AlarmingService 

32from osm_policy_module.autoscaling.service import AutoscalingService 

33from osm_policy_module.healing.service import HealingService 

34from osm_policy_module.common.common_db_client import CommonDbClient 

35from osm_policy_module.common.message_bus_client import MessageBusClient 

36from osm_policy_module.core.config import Config 

37 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Message keys the agent acts on. A message whose key is not listed here is
# only reported at debug level by PolicyModuleAgent._process_msg.
ALLOWED_KAFKA_KEYS = [
    "instantiated",
    "scaled",
    "terminated",
    "notify_alarm",
    "policy_updated",
    "vnf_terminated",
]

48 

49 

class PolicyModuleAgent:
    """Route message-bus events to the autoscaling, alarming and healing services.

    The agent reads the ``ns`` and ``alarm_response`` topics and hands every
    message, according to its key, to the matching ``_handle_*`` coroutine.
    While running it keeps a marker file fresh (touched on start-up and on
    every received message) and deletes that file when the read loop ends.
    """

    # Marker file touched on start-up and on every received message.
    # NOTE(review): presumably polled by an external health check -- confirm.
    _HEALTH_FLAG = Path("/tmp/osm_pol_agent_health_flag")

    # Operation states for which the handlers act; any other state is only
    # logged and the message is skipped.
    _ACTIONABLE_STATES = ("COMPLETED", "PARTIALLY_COMPLETED")

    def __init__(self, config: Config, loop=None):
        """Create the bus/database clients and the three policy services.

        :param config: module configuration passed on to every client/service.
        :param loop: event loop to use; when falsy, ``asyncio.get_event_loop()``
            supplies one.
        """
        self.conf = config
        loop = loop or asyncio.get_event_loop()
        self.loop = loop
        self.msg_bus = MessageBusClient(config)
        self.db_client = CommonDbClient(config)
        self.autoscaling_service = AutoscalingService(config, loop)
        self.alarming_service = AlarmingService(config, loop)
        self.healing_service = HealingService(config, loop)

    def run(self):
        """Block on the agent's event loop until :meth:`start` finishes."""
        self.loop.run_until_complete(self.start())

    async def start(self):
        """Consume messages until the bus reader returns, then clean up.

        The health flag is removed in a ``finally`` clause so that it does not
        outlive the read loop when ``aioread`` raises or the task is
        cancelled; the exception itself still propagates to the caller.
        """
        self._HEALTH_FLAG.touch()
        topics = ["ns", "alarm_response"]
        try:
            await self.msg_bus.aioread(topics, self._process_msg)
            log.critical("Exiting...")
        finally:
            # EAFP: the flag may already be gone; that is not an error.
            try:
                self._HEALTH_FLAG.unlink()
            except FileNotFoundError:
                pass

    async def _process_msg(self, topic, key, msg):
        """Dispatch one bus message to the handler registered for ``key``.

        :param topic: topic the message arrived on (only logged).
        :param key: message key; must be listed in ``ALLOWED_KAFKA_KEYS``.
        :param msg: message payload, passed unchanged to the handler.
        :raises peewee.PeeweeException: database errors are logged and
            re-raised; every other exception is logged and swallowed.
        """
        self._HEALTH_FLAG.touch()
        log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
        try:
            if key not in ALLOWED_KAFKA_KEYS:
                log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
                return

            # Built per call so handlers replaced on the instance are honoured.
            handlers = {
                "instantiated": self._handle_instantiated,
                "scaled": self._handle_scaled,
                "terminated": self._handle_terminated,
                "notify_alarm": self._handle_alarm_notification,
                "policy_updated": self._handle_policy_update,
                "vnf_terminated": self._handle_vnf_terminated,
            }
            handler = handlers.get(key)
            if handler is not None:
                await handler(msg)
        except peewee.PeeweeException:
            log.exception("Database error consuming message: ")
            raise
        except Exception:
            log.exception("Error consuming message: ")

    async def _handle_alarm_notification(self, content):
        """Forward an alarm notification to all three services, in order.

        :param content: message payload; ``content["notify_details"]`` must
            provide ``alarm_uuid`` and ``status``.
        """
        log.debug("_handle_alarm_notification: %s", content)
        alarm_uuid = content["notify_details"]["alarm_uuid"]
        status = content["notify_details"]["status"]
        await self.autoscaling_service.handle_alarm(alarm_uuid, status)
        await self.alarming_service.handle_alarm(alarm_uuid, status, content)
        await self.healing_service.handle_alarm(alarm_uuid, status)

    async def _handle_instantiated(self, content):
        """Configure scaling groups and alarms for a newly instantiated NS.

        :param content: message payload; ``content["nslcmop_id"]`` identifies
            the operation record fetched through the database client.
        """
        log.debug("_handle_instantiated: %s", content)
        nslcmop = self.db_client.get_nslcmop(content["nslcmop_id"])
        state = nslcmop["operationState"]
        if state not in self._ACTIONABLE_STATES:
            log.info(
                "Network_service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                state,
            )
            return

        nsr_id = nslcmop["nsInstanceId"]
        log.info("Configuring nsr_id: %s", nsr_id)
        await self.autoscaling_service.configure_scaling_groups(nsr_id)
        await self.alarming_service.configure_vnf_alarms(nsr_id)
        await self.healing_service.configure_healing_alarms(nsr_id)

    async def _handle_scaled(self, content):
        """Reconfigure policies after a scale operation and drop orphaned alarms.

        :param content: message payload; ``content["nslcmop_id"]`` identifies
            the operation record fetched through the database client.
        """
        log.debug("_handle_scaled: %s", content)
        nslcmop = self.db_client.get_nslcmop(content["nslcmop_id"])
        state = nslcmop["operationState"]
        if state not in self._ACTIONABLE_STATES:
            log.debug(
                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                state,
            )
            return

        nsr_id = nslcmop["nsInstanceId"]
        log.info("Configuring scaled service with nsr_id: %s", nsr_id)
        await self.autoscaling_service.configure_scaling_groups(nsr_id)
        await self.autoscaling_service.delete_orphaned_alarms(nsr_id)
        await self.alarming_service.configure_vnf_alarms(nsr_id)
        await self.healing_service.configure_healing_alarms(nsr_id)
        await self.healing_service.delete_orphaned_healing_alarms(nsr_id)

    async def _handle_terminated(self, content):
        """Delete scaling groups and alarms of a terminated network service.

        :param content: message payload providing ``nsr_id`` and
            ``operationState``.
        """
        log.debug("_handle_terminated: %s", content)
        nsr_id = content["nsr_id"]
        state = content["operationState"]
        if state not in self._ACTIONABLE_STATES:
            log.info(
                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                state,
            )
            return

        log.info(
            "Deleting scaling groups and alarms for network autoscaling_service with nsr_id: %s",
            nsr_id,
        )
        await self.autoscaling_service.delete_scaling_groups(nsr_id)
        await self.alarming_service.delete_vnf_alarms(nsr_id)
        await self.healing_service.delete_healing_alarms(nsr_id)

    async def _handle_policy_update(self, content):
        """Recreate the scaling groups and VNF alarms of a single VNF.

        Existing scaling groups and alarms are deleted first, then configured
        again for the same ``nsr_id`` / ``vnf_member_index`` pair.

        :param content: message payload providing ``nsr_id``,
            ``vnf_member_index`` and ``operationState``.
        """
        log.info("_handle_policy_update: %s", content)
        nsr_id = content["nsr_id"]
        vnf_member_index = content["vnf_member_index"]
        state = content["operationState"]
        if state not in self._ACTIONABLE_STATES:
            log.info(
                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                state,
            )
            return

        log.info(
            "Updating policies of VNF with nsr_id: %s and vnf-member-index: %s",
            nsr_id,
            vnf_member_index,
        )
        await self.autoscaling_service.delete_scaling_groups(nsr_id, vnf_member_index)
        await self.alarming_service.delete_vnf_alarms(nsr_id, vnf_member_index)
        await self.autoscaling_service.configure_scaling_groups(
            nsr_id, vnf_member_index
        )
        await self.alarming_service.configure_vnf_alarms(nsr_id, vnf_member_index)

    async def _handle_vnf_terminated(self, content):
        """Delete the scaling groups and VNF alarms of a terminated VNF.

        :param content: message payload providing ``nsr_id``,
            ``vnf_member_index`` and ``operationState``.
        """
        nsr_id = content["nsr_id"]
        vnf_member_index = content["vnf_member_index"]
        state = content["operationState"]
        if state not in self._ACTIONABLE_STATES:
            log.info(
                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                state,
            )
            return

        log.info(
            "Deleting policies of VNF with nsr_id: %s and vnf-member-index: %s",
            nsr_id,
            vnf_member_index,
        )
        await self.autoscaling_service.delete_scaling_groups(nsr_id, vnf_member_index)
        await self.alarming_service.delete_vnf_alarms(nsr_id, vnf_member_index)