From 85a9185db3248f1e3f20c8edad95ab77b8ee989c Mon Sep 17 00:00:00 2001 From: agarwalat Date: Thu, 8 Oct 2020 12:24:47 +0000 Subject: [PATCH] MON changes for Grafana Multitenancy feature. Change-Id: Iff425db1236683d8bf8bcf4ec4dac0c23484cdfa Signed-off-by: agarwalat --- osm_mon/cmd/mon_dashboarder.py | 4 + osm_mon/core/common_db.py | 9 + osm_mon/dashboarder/backends/grafana.py | 258 +++++++++++++++++------- osm_mon/dashboarder/dashboarder.py | 49 ++++- osm_mon/dashboarder/service.py | 35 +++- 5 files changed, 279 insertions(+), 76 deletions(-) diff --git a/osm_mon/cmd/mon_dashboarder.py b/osm_mon/cmd/mon_dashboarder.py index 211038d..b49b570 100644 --- a/osm_mon/cmd/mon_dashboarder.py +++ b/osm_mon/cmd/mon_dashboarder.py @@ -27,6 +27,7 @@ import sys from osm_mon.core.config import Config from osm_mon.dashboarder.dashboarder import Dashboarder +import threading def main(): @@ -47,6 +48,9 @@ def main(): log.info("Starting MON Dashboarder...") log.debug("Config: %s", cfg.conf) dashboarder = Dashboarder(cfg) + log.info("Starting MON kafka Consumer...") + threading.Thread(target=dashboarder.run, args=()).start() + log.info("Starting MON Dashboarder...") dashboarder.dashboard_forever() diff --git a/osm_mon/core/common_db.py b/osm_mon/core/common_db.py index 983d84d..e43c6cd 100644 --- a/osm_mon/core/common_db.py +++ b/osm_mon/core/common_db.py @@ -153,3 +153,12 @@ class CommonDbClient: for alarm_dict in alarm_dicts: alarms.append(Alarm.from_dict(alarm_dict)) return alarms + + def get_user(self, username: str): + return self.common_db.get_one('users', {'username': username}) + + def get_user_by_id(self, userid: str): + return self.common_db.get_one('users', {'_id': userid}) + + def get_role_by_name(self, name: str): + return self.common_db.get_one('roles', {'name': name}) diff --git a/osm_mon/dashboarder/backends/grafana.py b/osm_mon/dashboarder/backends/grafana.py index acc602e..6fb6b94 100644 --- a/osm_mon/dashboarder/backends/grafana.py +++ b/osm_mon/dashboarder/backends/grafana.py @@ -1,72 +1,186 @@ -# -*- coding: utf-8 -*- - -# Copyright 2018 Whitestack, LLC -# ************************************************************* - -# This file is part of OSM Monitoring module -# All Rights Reserved to Whitestack, LLC - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# For those usages not covered by the Apache License, Version 2.0 please -# contact: glavado@whitestack.com or fbravo@whitestack.com -## -import logging -import requests -import base64 -from osm_mon.core.config import Config - -log = logging.getLogger(__name__) - - -class GrafanaBackend: - def __init__(self, config: Config): - self.conf = config - self.url = config.get('grafana', 'url') - grafana_user = config.get("grafana", "user") - grafana_password = config.get("grafana", "password") - self.headers = { - 'content-type': "application/json", - 'authorization': "Basic %s" % base64.b64encode( - (grafana_user + ":" + grafana_password).encode("utf-8")).decode() - } - - def get_all_dashboard_uids(self): - # Gets only dashboards that were automated by OSM (with tag 'osm_automated') - response = requests.request("GET", self.url + "/api/search?tag=osm_automated", headers=self.headers) - dashboards = response.json() - dashboard_uids = [] - for dashboard in dashboards: - dashboard_uids.append(dashboard['uid']) - log.debug("Searching for all dashboard uids: %s", dashboard_uids) - return dashboard_uids - - def get_dashboard_status(self, uid): - response = requests.request("GET", self.url + "/api/dashboards/uid/" + uid, headers=self.headers) - log.debug("Searching for dashboard result: %s", response.text) - return response - - def create_dashboard(self, uid, name, json_file): - with open(json_file) as f: - dashboard_data = f.read() - - dashboard_data = dashboard_data.replace('OSM_ID', uid).replace('OSM_NAME', name) - - response = requests.request( - "POST", self.url + "/api/dashboards/db/", data=dashboard_data, headers=self.headers) - log.debug("Creating dashboard result: %s", response.text) - return response - - def delete_dashboard(self, uid): - response = requests.request("DELETE", self.url + "/api/dashboards/uid/" + uid, headers=self.headers) - log.debug("Delete dashboard result: %s", response.text) - return response +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# For those usages not covered by the Apache License, Version 2.0 please +# contact: glavado@whitestack.com or fbravo@whitestack.com +## +import logging +import requests +import base64 +import json +from osm_mon.core.config import Config + +log = logging.getLogger(__name__) + + +class GrafanaBackend: + def __init__(self, config: Config): + self.conf = config + self.url = config.get('grafana', 'url') + grafana_user = config.get("grafana", "user") + grafana_password = config.get("grafana", "password") + self.headers = { + 'content-type': "application/json", + 'authorization': "Basic %s" % base64.b64encode( + (grafana_user + ":" + grafana_password).encode("utf-8")).decode() + } + + def get_all_dashboard_uids(self): + # Gets only dashboards that were automated by OSM (with tag 'osm_automated') + response = requests.request("GET", self.url + "/api/search?tag=osm_automated", headers=self.headers) + dashboards = response.json() + dashboard_uids = [] + for dashboard in dashboards: + dashboard_uids.append(dashboard['uid']) + log.debug("Searching for all dashboard uids: %s", dashboard_uids) + return dashboard_uids + + def get_dashboard_status(self, uid): + response = requests.request("GET", self.url + "/api/dashboards/uid/" + uid, headers=self.headers) + log.debug("Searching for dashboard result: %s", response.text) + return response + + def create_dashboard(self, uid, name, json_file, project_name=None): + try: + with open(json_file) as f: + dashboard_data = f.read() + + dashboard_data = dashboard_data.replace('OSM_ID', uid).replace('OSM_NAME', name) + + response = requests.request( + "POST", self.url + "/api/dashboards/db/", data=dashboard_data, headers=self.headers) + # get team id + if project_name is not None: + name = project_name + response_team = requests.request( + "GET", self.url + "/api/teams/search?name={}".format(name), headers=self.headers) + if len(json.loads(response_team.text)["teams"]) > 0: + team_id = json.loads(response_team.text)["teams"][0]["id"] + permission_data = {"items": [{"teamId": team_id, "permission": 2}, ]} + # provide permission to dashboard + dahboard_id = json.loads(response.text)["id"] + response = requests.request( + "POST", self.url + "/api/dashboards/id/{}/permissions".format(dahboard_id), json=permission_data, + headers=self.headers) + return response + except Exception: + log.exception("Exception processing message: ") + + def delete_dashboard(self, uid): + response = requests.request("DELETE", self.url + "/api/dashboards/uid/" + uid, headers=self.headers) + log.debug("Delete dashboard result: %s", response.text) + return response + + def create_grafana_users(self, user): + email = "{}@osm.etsi.org".format(user) + user_payload = { + "name": user, + "email": email, + "login": user, + "password": user, + } + response_users = requests.request("POST", self.url + "/api/admin/users/", json=user_payload, + headers=self.headers) + json_data = json.loads(response_users.text) + url = "/api/org/users/{}/".format(json_data["id"]) + permission_payload = {"role": "Editor", } + requests.request("PATCH", self.url + url, json=permission_payload, headers=self.headers) + log.info("Grafana user created: %s", response_users.text) + return response_users + + # create grafana team with member + def create_grafana_teams_members(self, project_name, user_name, is_admin, proj_list): + # check if user exist in grafana or not + user_response = requests.request("GET", self.url + "/api/users/lookup?loginOrEmail={}".format(user_name), + headers=self.headers) + user_obj = json.loads(user_response.text) + if user_response.status_code != 200: + user_response = self.create_grafana_users(user_name) + user_obj = json.loads(user_response.text) + + user_id = user_obj["id"] + + # Get Teams for user + team_objs = requests.request("GET", self.url + "/api/users/{}/teams".format(user_id), headers=self.headers) + team_obj = json.loads(team_objs.text) + team_list = [] + if len(team_obj): + for team in team_obj: + team_list.append(team["name"]) + + proj_unlink = set(team_list) - set(proj_list) + for prj in proj_unlink: + response_team = requests.request("GET", self.url + "/api/teams/search?name={}".format(prj), + headers=self.headers) + team_id = json.loads(response_team.text)["teams"][0]["id"] + requests.request("DELETE", self.url + "/api/teams/{}/members/{}".format(team_id, user_id), + headers=self.headers) + # add member to team + response_team = requests.request("GET", self.url + "/api/teams/search?name={}".format(project_name), + headers=self.headers) + team_id = json.loads(response_team.text)["teams"][0]["id"] + if project_name not in team_list: + member_payload = { + "userId": user_id + } + requests.request("POST", self.url + "/api/teams/{}/members".format(team_id), json=member_payload, + headers=self.headers) + # if role is admin change permission to admin + if is_admin: + url = "/api/org/users/{}/".format(user_id) + permission_payload = {"role": "Admin", } + requests.request("PATCH", self.url + url, json=permission_payload, headers=self.headers) + log.info("Member added to grafana team") + return response_team + + # create grafana team + def create_grafana_teams(self, team_name): + team_payload = {"name": team_name, } + response = requests.request("POST", self.url + "/api/teams", json=team_payload, headers=self.headers) + log.info("Grafana Team is created: %s", response.text) + + def delete_grafana_users(self, user_name): + # find user id + response_id = requests.request("GET", self.url + "/api/users/lookup?loginOrEmail={}".format(user_name), + headers=self.headers) + try: + user_id = json.loads(response_id.text)["id"] + except Exception: + log.exception("Exception processing message: ") + # delete user + response = requests.request("DELETE", self.url + "/api/admin/users/{}".format(user_id), headers=self.headers) + log.info("Grafana user deleted: %s", response.text) + return response + + def delete_grafana_team(self, project_name): + team_obj = requests.request("GET", self.url + "/api/teams/search?name={}".format(project_name), + headers=self.headers) + team_id = json.loads(team_obj.text)["teams"][0]["id"] + response = requests.request("DELETE", self.url + "/api/teams/{}".format(team_id), headers=self.headers) + log.info("Grafana team deleated: %s", response.text) + return response + + def update_grafana_teams(self, project_new_name, project_old_name): + team_obj = requests.request("GET", self.url + "/api/teams/search?name={}".format(project_old_name), + headers=self.headers) + team_id = json.loads(team_obj.text)["teams"][0]["id"] + data = {"name": project_new_name, } + response = requests.request("PUT", self.url + "/api/teams/{}".format(team_id), json=data, headers=self.headers) + log.info("Grafana team updated %s", response.text) + return response diff --git a/osm_mon/dashboarder/dashboarder.py b/osm_mon/dashboarder/dashboarder.py index bb2935a..30f27d1 100644 --- a/osm_mon/dashboarder/dashboarder.py +++ b/osm_mon/dashboarder/dashboarder.py @@ -24,17 +24,64 @@ import logging import time import socket import peewee +import asyncio from osm_mon.dashboarder.service import DashboarderService from osm_mon.core.config import Config +from osm_mon.core.message_bus_client import MessageBusClient log = logging.getLogger(__name__) class Dashboarder: - def __init__(self, config: Config): + def __init__(self, config: Config, loop=None): self.conf = config self.service = DashboarderService(config) + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + self.msg_bus = MessageBusClient(config) + + # run consumer for grafana user management + def run(self): + self.loop.run_until_complete(self.start()) + + async def start(self): + topics = ["users", "project"] + await self.msg_bus.aioread(topics, self._user_msg) + + async def _user_msg(self, topic, key, values): + log.debug("Message from kafka bus received: topic: %s and values: %s and key: %s", topic, values, key) + try: + if topic == "users" and key == "created": + log.debug("Received message from kafka for creating user") + user = values['username'] + self.service.create_grafana_user(user) + elif topic == "users" and key == "deleted": + log.debug("Received message from kafka for deleting user") + user = values['username'] + self.service.delete_grafana_user(user) + log.info("Grafana user deleted: %s", user) + elif topic == "users" and key == "edited": + log.debug("Received message from kafka for associating user to team") + user_id = values["_id"] + project_data = values["changes"]["project_role_mappings"] + self.service.create_grafana_team_member(project_data, user_id) + elif topic == "project" and key == "created": + log.debug("Received message from kafka for creating team") + team_name = values["name"] + self.service.create_grafana_team(team_name) + elif topic == "project" and key == "deleted": + log.debug("Received message from kafka for deleting team") + project_name = values["name"] + self.service.delete_grafana_team(project_name) + elif topic == "project" and key == "edited": + log.debug("Received message from kafka for team name update") + project_old_name = values["original"]["name"] + project_new_name = values["changes"]["name"] + self.service.update_grafana_team(project_new_name, project_old_name) + except Exception: + log.exception("Exception processing message: ") def dashboard_forever(self): log.debug('dashboard_forever') diff --git a/osm_mon/dashboarder/service.py b/osm_mon/dashboarder/service.py index 14ea0b9..2abd6be 100644 --- a/osm_mon/dashboarder/service.py +++ b/osm_mon/dashboarder/service.py @@ -69,13 +69,16 @@ class DashboarderService: constituent_vnfds = nsr['nsd']['constituent-vnfd'] for constituent_vnfd in constituent_vnfds: try: - vnfd = self.common_db.get_vnfd_by_id(constituent_vnfd['vnfd-id-ref']) + vnfd = self.common_db.get_vnfd_by_name(constituent_vnfd['vnfd-id-ref']) # If there are metrics, create dashboard (if exists) - if 'monitoring-param' in vnfd: + if vnfd and 'monitoring-param' in vnfd: if nsr_id not in dashboard_uids: nsr_name = nsr['name'] + project_id = nsr["_admin"]["projects_read"][0] + project_details = self.common_db.get_project(project_id) + project_name = project_details["name"] self.grafana.create_dashboard(nsr_id, nsr_name, - dashboard_path) + dashboard_path, project_name) log.debug('Created dashboard for NS: %s', nsr_id) else: log.debug('Dashboard already exists') @@ -93,3 +96,29 @@ class DashboarderService: log.debug('Deleted obsolete dashboard: %s', dashboard_uid) else: log.debug('All dashboards in use') + + def create_grafana_user(self, user): + self.grafana.create_grafana_users(user) + + def create_grafana_team_member(self, project_data, userid): + user = self.common_db.get_user_by_id(userid) + user_name = user["username"] + proj_list = [] + for project in project_data: + proj_list.append(project["project"]) + for proj in project_data: + role_obj = self.common_db.get_role_by_name(proj["role"]) + is_admin = role_obj["permissions"].get("admin") + self.grafana.create_grafana_teams_members(proj["project"], user_name, is_admin, proj_list) + + def create_grafana_team(self, team_name): + self.grafana.create_grafana_teams(team_name) + + def delete_grafana_user(self, user_name): + self.grafana.delete_grafana_users(user_name) + + def delete_grafana_team(self, project_name): + self.grafana.delete_grafana_team(project_name) + + def update_grafana_team(self, project_new_name, project_old_name): + self.grafana.update_grafana_teams(project_new_name, project_old_name) -- 2.17.1