- log.debug("vim_type is misconfigured or unsupported; %s",
- vim_type)
-
- elif message.topic == "vim_account":
- if message.key == "create" or message.key == "edit":
- auth_manager.store_auth_credentials(values)
- if message.key == "delete":
- auth_manager.delete_auth_credentials(values)
-
- # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly.
- elif message.topic == "access_credentials":
- # Check the vim desired by the message
- vim_type = get_vim_type(values['vim_uuid'])
-
- if vim_type == "aws":
- log.info("This message is for the CloudWatch plugin.")
- aws_access_credentials.access_credential_calls(message)
-
- elif vim_type == "vmware":
- log.info("This access_credentials message is for the vROPs plugin.")
- vrops_rcvr.consume(message)
-
- else:
- log.debug("vim_type is misconfigured or unsupported; %s",
- vim_type)
-
- else:
- log.info("This topic is not relevant to any of the MON plugins.")
-
-
- except Exception as exc:
- log.exception("Exception: %s")
+ # Get ns_id from message
+ # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
+ contains_list = False
+ list_index = None
+ for k, v in six.iteritems(values):
+ if isinstance(v, dict):
+ if 'ns_id' in v:
+ contains_list = True
+ list_index = k
+ break
+ if not contains_list and 'ns_id' in values:
+ ns_id = values['ns_id']
+ else:
+ ns_id = values[list_index]['ns_id']
+
+ vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
+
+ # Check the vim desired by the message
+ vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
+ vim_uuid = vnfr['vim-account-id']
+
+ if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
+ vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
+ vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
+ if contains_list:
+ values[list_index]['resource_uuid'] = vdur['vim-id']
+ else:
+ values['resource_uuid'] = vdur['vim-id']
+ message = message._replace(value=json.dumps(values))
+
+ vim_type = self.get_vim_type(vim_uuid)
+
+ if vim_type == "openstack":
+ log.info("This message is for the OpenStack plugin.")
+ if message.topic == "metric_request":
+ response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
+ if message.topic == "alarm_request":
+ response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
+
+ elif vim_type == "aws":
+ log.info("This message is for the CloudWatch plugin.")
+ aws_conn = self.aws_connection.setEnvironment()
+ if message.topic == "metric_request":
+ response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
+ if message.topic == "alarm_request":
+ response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
+
+ elif vim_type == "vmware":
+ log.info("This metric_request message is for the vROPs plugin.")
+ if message.topic == "metric_request":
+ response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
+ if message.topic == "alarm_request":
+ response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
+
+ else:
+ log.debug("vim_type is misconfigured or unsupported; %s",
+ vim_type)
+ if response:
+ self._publish_response(message.topic, message.key, response)
+
+ except Exception:
+ log.exception("Exception processing message: ")
+
+ def _publish_response(self, topic: str, key: str, msg: dict):
+ topic = topic.replace('request', 'response')
+ key = key.replace('request', 'response')
+ producer = Producer()
+ producer.send(topic=topic, key=key, value=json.dumps(msg))
+ producer.flush()
+ producer.close()
+
+
+if __name__ == '__main__':
+ CommonConsumer().run()