MON changes for Grafana Multitenancy feature.

Change-Id: Iff425db1236683d8bf8bcf4ec4dac0c23484cdfa
Signed-off-by: agarwalat <atul.agarwal@altran.com>
diff --git a/osm_mon/cmd/mon_dashboarder.py b/osm_mon/cmd/mon_dashboarder.py
index 211038d..b49b570 100644
--- a/osm_mon/cmd/mon_dashboarder.py
+++ b/osm_mon/cmd/mon_dashboarder.py
@@ -27,6 +27,7 @@
 
 from osm_mon.core.config import Config
 from osm_mon.dashboarder.dashboarder import Dashboarder
+import threading
 
 
 def main():
@@ -47,6 +48,9 @@
     log.info("Starting MON Dashboarder...")
     log.debug("Config: %s", cfg.conf)
     dashboarder = Dashboarder(cfg)
+    log.info("Starting MON kafka Consumer...")
+    threading.Thread(target=dashboarder.run, args=()).start()
+    log.info("Starting MON Dashboarder...")
     dashboarder.dashboard_forever()
 
 
diff --git a/osm_mon/core/common_db.py b/osm_mon/core/common_db.py
index 983d84d..e43c6cd 100644
--- a/osm_mon/core/common_db.py
+++ b/osm_mon/core/common_db.py
@@ -153,3 +153,12 @@
         for alarm_dict in alarm_dicts:
             alarms.append(Alarm.from_dict(alarm_dict))
         return alarms
+
+    def get_user(self, username: str):
+        return self.common_db.get_one('users', {'username': username})
+
+    def get_user_by_id(self, userid: str):
+        return self.common_db.get_one('users', {'_id': userid})
+
+    def get_role_by_name(self, name: str):
+        return self.common_db.get_one('roles', {'name': name})
diff --git a/osm_mon/dashboarder/backends/grafana.py b/osm_mon/dashboarder/backends/grafana.py
index acc602e..6fb6b94 100644
--- a/osm_mon/dashboarder/backends/grafana.py
+++ b/osm_mon/dashboarder/backends/grafana.py
@@ -1,72 +1,186 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: glavado@whitestack.com or fbravo@whitestack.com
-##
-import logging
-import requests
-import base64
-from osm_mon.core.config import Config
-
-log = logging.getLogger(__name__)
-
-
-class GrafanaBackend:
-    def __init__(self, config: Config):
-        self.conf = config
-        self.url = config.get('grafana', 'url')
-        grafana_user = config.get("grafana", "user")
-        grafana_password = config.get("grafana", "password")
-        self.headers = {
-            'content-type': "application/json",
-            'authorization': "Basic %s" % base64.b64encode(
-                (grafana_user + ":" + grafana_password).encode("utf-8")).decode()
-        }
-
-    def get_all_dashboard_uids(self):
-        # Gets only dashboards that were automated by OSM (with tag 'osm_automated')
-        response = requests.request("GET", self.url + "/api/search?tag=osm_automated", headers=self.headers)
-        dashboards = response.json()
-        dashboard_uids = []
-        for dashboard in dashboards:
-            dashboard_uids.append(dashboard['uid'])
-        log.debug("Searching for all dashboard uids: %s", dashboard_uids)
-        return dashboard_uids
-
-    def get_dashboard_status(self, uid):
-        response = requests.request("GET", self.url + "/api/dashboards/uid/" + uid, headers=self.headers)
-        log.debug("Searching for dashboard result: %s", response.text)
-        return response
-
-    def create_dashboard(self, uid, name, json_file):
-        with open(json_file) as f:
-            dashboard_data = f.read()
-
-        dashboard_data = dashboard_data.replace('OSM_ID', uid).replace('OSM_NAME', name)
-
-        response = requests.request(
-            "POST", self.url + "/api/dashboards/db/",  data=dashboard_data, headers=self.headers)
-        log.debug("Creating dashboard result: %s", response.text)
-        return response
-
-    def delete_dashboard(self, uid):
-        response = requests.request("DELETE", self.url + "/api/dashboards/uid/" + uid, headers=self.headers)
-        log.debug("Delete dashboard result: %s", response.text)
-        return response
+# -*- coding: utf-8 -*-

+

+# Copyright 2018 Whitestack, LLC

+# *************************************************************

+

+# This file is part of OSM Monitoring module

+# All Rights Reserved to Whitestack, LLC

+

+# Licensed under the Apache License, Version 2.0 (the "License"); you may

+# not use this file except in compliance with the License. You may obtain

+# a copy of the License at

+

+#         http://www.apache.org/licenses/LICENSE-2.0

+

+# Unless required by applicable law or agreed to in writing, software

+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

+# License for the specific language governing permissions and limitations

+# under the License.

+# For those usages not covered by the Apache License, Version 2.0 please

+# contact: glavado@whitestack.com or fbravo@whitestack.com

+##

+import logging

+import requests

+import base64

+import json

+from osm_mon.core.config import Config

+

+log = logging.getLogger(__name__)

+

+

+class GrafanaBackend:

+    def __init__(self, config: Config):

+        self.conf = config

+        self.url = config.get('grafana', 'url')

+        grafana_user = config.get("grafana", "user")

+        grafana_password = config.get("grafana", "password")

+        self.headers = {

+            'content-type': "application/json",

+            'authorization': "Basic %s" % base64.b64encode(

+                (grafana_user + ":" + grafana_password).encode("utf-8")).decode()

+        }

+

+    def get_all_dashboard_uids(self):

+        # Gets only dashboards that were automated by OSM (with tag 'osm_automated')

+        response = requests.request("GET", self.url + "/api/search?tag=osm_automated", headers=self.headers)

+        dashboards = response.json()

+        dashboard_uids = []

+        for dashboard in dashboards:

+            dashboard_uids.append(dashboard['uid'])

+        log.debug("Searching for all dashboard uids: %s", dashboard_uids)

+        return dashboard_uids

+

+    def get_dashboard_status(self, uid):

+        response = requests.request("GET", self.url + "/api/dashboards/uid/" + uid, headers=self.headers)

+        log.debug("Searching for dashboard result: %s", response.text)

+        return response

+

+    def create_dashboard(self, uid, name, json_file, project_name=None):

+        try:

+            with open(json_file) as f:

+                dashboard_data = f.read()

+

+            dashboard_data = dashboard_data.replace('OSM_ID', uid).replace('OSM_NAME', name)

+

+            response = requests.request(

+                "POST", self.url + "/api/dashboards/db/", data=dashboard_data, headers=self.headers)

+            # get team id

+            if project_name is not None:

+                name = project_name

+            response_team = requests.request(

+                "GET", self.url + "/api/teams/search?name={}".format(name), headers=self.headers)

+            if len(json.loads(response_team.text)["teams"]) > 0:

+                team_id = json.loads(response_team.text)["teams"][0]["id"]

+                permission_data = {"items": [{"teamId": team_id, "permission": 2}, ]}

+                # provide permission to dashboard

+                dahboard_id = json.loads(response.text)["id"]

+                response = requests.request(

+                    "POST", self.url + "/api/dashboards/id/{}/permissions".format(dahboard_id), json=permission_data,

+                    headers=self.headers)

+            return response

+        except Exception:

+            log.exception("Exception processing message: ")

+

+    def delete_dashboard(self, uid):

+        response = requests.request("DELETE", self.url + "/api/dashboards/uid/" + uid, headers=self.headers)

+        log.debug("Delete dashboard result: %s", response.text)

+        return response

+

+    def create_grafana_users(self, user):

+        email = "{}@osm.etsi.org".format(user)

+        user_payload = {

+            "name": user,

+            "email": email,

+            "login": user,

+            "password": user,

+        }

+        response_users = requests.request("POST", self.url + "/api/admin/users/", json=user_payload,

+                                          headers=self.headers)

+        json_data = json.loads(response_users.text)

+        url = "/api/org/users/{}/".format(json_data["id"])

+        permission_payload = {"role": "Editor", }

+        requests.request("PATCH", self.url + url, json=permission_payload, headers=self.headers)

+        log.info("Grafana user created: %s", response_users.text)

+        return response_users

+

+    # create grafana team with member

+    def create_grafana_teams_members(self, project_name, user_name, is_admin, proj_list):

+        # check if user exist in grafana or not

+        user_response = requests.request("GET", self.url + "/api/users/lookup?loginOrEmail={}".format(user_name),

+                                         headers=self.headers)

+        user_obj = json.loads(user_response.text)

+        if user_response.status_code != 200:

+            user_response = self.create_grafana_users(user_name)

+            user_obj = json.loads(user_response.text)

+

+        user_id = user_obj["id"]

+

+        # Get Teams for user

+        team_objs = requests.request("GET", self.url + "/api/users/{}/teams".format(user_id), headers=self.headers)

+        team_obj = json.loads(team_objs.text)

+        team_list = []

+        if len(team_obj):

+            for team in team_obj:

+                team_list.append(team["name"])

+

+        proj_unlink = set(team_list) - set(proj_list)

+        for prj in proj_unlink:

+            response_team = requests.request("GET", self.url + "/api/teams/search?name={}".format(prj),

+                                             headers=self.headers)

+            team_id = json.loads(response_team.text)["teams"][0]["id"]

+            requests.request("DELETE", self.url + "/api/teams/{}/members/{}".format(team_id, user_id),

+                             headers=self.headers)

+        # add member to team

+        response_team = requests.request("GET", self.url + "/api/teams/search?name={}".format(project_name),

+                                         headers=self.headers)

+        team_id = json.loads(response_team.text)["teams"][0]["id"]

+        if project_name not in team_list:

+            member_payload = {

+                "userId": user_id

+            }

+            requests.request("POST", self.url + "/api/teams/{}/members".format(team_id), json=member_payload,

+                             headers=self.headers)

+        # if role is admin change permission to admin

+        if is_admin:

+            url = "/api/org/users/{}/".format(user_id)

+            permission_payload = {"role": "Admin", }

+            requests.request("PATCH", self.url + url, json=permission_payload, headers=self.headers)

+        log.info("Member added to grafana team")

+        return response_team

+

+    # create grafana team

+    def create_grafana_teams(self, team_name):

+        team_payload = {"name": team_name, }

+        response = requests.request("POST", self.url + "/api/teams", json=team_payload, headers=self.headers)

+        log.info("Grafana Team is created: %s", response.text)

+

+    def delete_grafana_users(self, user_name):

+        # find user id

+        response_id = requests.request("GET", self.url + "/api/users/lookup?loginOrEmail={}".format(user_name),

+                                       headers=self.headers)

+        try:

+            user_id = json.loads(response_id.text)["id"]

+        except Exception:

+            log.exception("Exception processing message: ")

+        # delete user

+        response = requests.request("DELETE", self.url + "/api/admin/users/{}".format(user_id), headers=self.headers)

+        log.info("Grafana user deleted: %s", response.text)

+        return response

+

+    def delete_grafana_team(self, project_name):

+        team_obj = requests.request("GET", self.url + "/api/teams/search?name={}".format(project_name),

+                                    headers=self.headers)

+        team_id = json.loads(team_obj.text)["teams"][0]["id"]

+        response = requests.request("DELETE", self.url + "/api/teams/{}".format(team_id), headers=self.headers)

+        log.info("Grafana team deleated: %s", response.text)

+        return response

+

+    def update_grafana_teams(self, project_new_name, project_old_name):

+        team_obj = requests.request("GET", self.url + "/api/teams/search?name={}".format(project_old_name),

+                                    headers=self.headers)

+        team_id = json.loads(team_obj.text)["teams"][0]["id"]

+        data = {"name": project_new_name, }

+        response = requests.request("PUT", self.url + "/api/teams/{}".format(team_id), json=data, headers=self.headers)

+        log.info("Grafana team updated %s", response.text)

+        return response

diff --git a/osm_mon/dashboarder/dashboarder.py b/osm_mon/dashboarder/dashboarder.py
index bb2935a..30f27d1 100644
--- a/osm_mon/dashboarder/dashboarder.py
+++ b/osm_mon/dashboarder/dashboarder.py
@@ -24,17 +24,64 @@
 import time
 import socket
 import peewee
+import asyncio
 
 from osm_mon.dashboarder.service import DashboarderService
 from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
 
 log = logging.getLogger(__name__)
 
 
 class Dashboarder:
-    def __init__(self, config: Config):
+    def __init__(self, config: Config, loop=None):
         self.conf = config
         self.service = DashboarderService(config)
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+        self.msg_bus = MessageBusClient(config)
+
+    # run consumer for grafana user management
+    def run(self):
+        self.loop.run_until_complete(self.start())
+
+    async def start(self):
+        topics = ["users", "project"]
+        await self.msg_bus.aioread(topics, self._user_msg)
+
+    async def _user_msg(self, topic, key, values):
+        log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key)
+        try:
+            if topic == "users" and key == "created":
+                log.debug("Received message from kafka for creating user")
+                user = values['username']
+                self.service.create_grafana_user(user)
+            elif topic == "users" and key == "deleted":
+                log.debug("Received message from kafka for deleting user")
+                user = values['username']
+                self.service.delete_grafana_user(user)
+                log.info("Grafana user deleted: %s", user)
+            elif topic == "users" and key == "edited":
+                log.debug("Received message from kafka for associating user to team")
+                user_id = values["_id"]
+                project_data = values["changes"]["project_role_mappings"]
+                self.service.create_grafana_team_member(project_data, user_id)
+            elif topic == "project" and key == "created":
+                log.debug("Received message from kafka for creating team")
+                team_name = values["name"]
+                self.service.create_grafana_team(team_name)
+            elif topic == "project" and key == "deleted":
+                log.debug("Received message from kafka for deleting team")
+                project_name = values["name"]
+                self.service.delete_grafana_team(project_name)
+            elif topic == "project" and key == "edited":
+                log.debug("Received message from kafka for team name update")
+                project_old_name = values["original"]["name"]
+                project_new_name = values["changes"]["name"]
+                self.service.update_grafana_team(project_new_name, project_old_name)
+        except Exception:
+            log.exception("Exception processing message: ")
 
     def dashboard_forever(self):
         log.debug('dashboard_forever')
diff --git a/osm_mon/dashboarder/service.py b/osm_mon/dashboarder/service.py
index 14ea0b9..2abd6be 100644
--- a/osm_mon/dashboarder/service.py
+++ b/osm_mon/dashboarder/service.py
@@ -69,13 +69,16 @@
             constituent_vnfds = nsr['nsd']['constituent-vnfd']
             for constituent_vnfd in constituent_vnfds:
                 try:
-                    vnfd = self.common_db.get_vnfd_by_id(constituent_vnfd['vnfd-id-ref'])
+                    vnfd = self.common_db.get_vnfd_by_name(constituent_vnfd['vnfd-id-ref'])
                     # If there are metrics, create dashboard (if exists)
-                    if 'monitoring-param' in vnfd:
+                    if vnfd and 'monitoring-param' in vnfd:
                         if nsr_id not in dashboard_uids:
                             nsr_name = nsr['name']
+                            project_id = nsr["_admin"]["projects_read"][0]
+                            project_details = self.common_db.get_project(project_id)
+                            project_name = project_details["name"]
                             self.grafana.create_dashboard(nsr_id, nsr_name,
-                                                          dashboard_path)
+                                                          dashboard_path, project_name)
                             log.debug('Created dashboard for NS: %s', nsr_id)
                         else:
                             log.debug('Dashboard already exists')
@@ -93,3 +96,29 @@
                 log.debug('Deleted obsolete dashboard: %s', dashboard_uid)
             else:
                 log.debug('All dashboards in use')
+
+    def create_grafana_user(self, user):
+        self.grafana.create_grafana_users(user)
+
+    def create_grafana_team_member(self, project_data, userid):
+        user = self.common_db.get_user_by_id(userid)
+        user_name = user["username"]
+        proj_list = []
+        for project in project_data:
+            proj_list.append(project["project"])
+        for proj in project_data:
+            role_obj = self.common_db.get_role_by_name(proj["role"])
+            is_admin = role_obj["permissions"].get("admin")
+            self.grafana.create_grafana_teams_members(proj["project"], user_name, is_admin, proj_list)
+
+    def create_grafana_team(self, team_name):
+        self.grafana.create_grafana_teams(team_name)
+
+    def delete_grafana_user(self, user_name):
+        self.grafana.delete_grafana_users(user_name)
+
+    def delete_grafana_team(self, project_name):
+        self.grafana.delete_grafana_team(project_name)
+
+    def update_grafana_team(self, project_new_name, project_old_name):
+        self.grafana.update_grafana_teams(project_new_name, project_old_name)