Cleanup of grafana users
[osm/MON.git] / osm_mon / dashboarder / backends / grafana.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: glavado@whitestack.com or fbravo@whitestack.com
22 ##
23 import logging
24 import requests
25 import base64
26 import json
27 from osm_mon.core.config import Config
28
29 log = logging.getLogger(__name__)
30
31
32 class GrafanaBackend:
33 def __init__(self, config: Config):
34 self.conf = config
35 self.url = config.get("grafana", "url")
36 grafana_user = config.get("grafana", "user")
37 grafana_password = config.get("grafana", "password")
38 self.headers = {
39 "content-type": "application/json",
40 "authorization": "Basic %s"
41 % base64.b64encode(
42 (grafana_user + ":" + grafana_password).encode("utf-8")
43 ).decode(),
44 }
45
46 def get_all_dashboard_uids(self):
47 # Gets only dashboards that were automated by OSM (with tag 'osm_automated')
48 response = requests.request(
49 "GET", self.url + "/api/search?tag=osm_automated", headers=self.headers
50 )
51 dashboards = response.json()
52 dashboard_uids = []
53 for dashboard in dashboards:
54 dashboard_uids.append(dashboard["uid"])
55 log.debug("Searching for all dashboard uids: %s", dashboard_uids)
56 return dashboard_uids
57
58 def get_dashboard_status(self, uid):
59 response = requests.request(
60 "GET", self.url + "/api/dashboards/uid/" + uid, headers=self.headers
61 )
62 log.debug("Searching for dashboard result: %s", response.text)
63 return response
64
65 def create_dashboard(self, uid, name, json_file, project_name=None):
66 try:
67 with open(json_file) as f:
68 dashboard_data = f.read()
69
70 dashboard_data = dashboard_data.replace("OSM_ID", uid).replace(
71 "OSM_NAME", name
72 )
73 dashboard_json_data = json.loads(dashboard_data)
74 # Get folder id
75 if project_name:
76 folder_name = project_name
77 else:
78 folder_name = name
79 response_folder_id = requests.request(
80 "GET",
81 self.url + "/api/folders/{}".format(folder_name),
82 headers=self.headers,
83 )
84 if response_folder_id.status_code == 200:
85 folder_id = json.loads(response_folder_id.text)["id"]
86 dashboard_json_data["folderId"] = folder_id
87 dashboard_json_data["overwrite"] = False
88
89 response = self.send_request_for_creating_dashboard(dashboard_json_data)
90
91 # Admin dashboard will be created if already exists. Rest will remain same.
92 if json.loads(response.text).get("status") == "name-exists":
93 # Delete any previous project-admin dashboard if it already exist.
94 if name == "admin":
95 self.delete_admin_dashboard()
96 response = self.send_request_for_creating_dashboard(
97 dashboard_json_data
98 )
99 else:
100 return
101
102 # Get team id
103 if project_name is not None:
104 name = project_name
105 response_team = requests.request(
106 "GET",
107 self.url + "/api/teams/search?name={}".format(name),
108 headers=self.headers,
109 )
110
111 # Remove default permissions of admin user's dashboard so that it is not visible to non-admin users
112 if len(json.loads(response_team.text)["teams"]) == 0:
113 # As team information is not available so it is admin user
114 dahboard_id = json.loads(response.text)["id"]
115 requests.request(
116 "POST",
117 self.url + "/api/dashboards/id/{}/permissions".format(dahboard_id),
118 headers=self.headers,
119 )
120
121 log.info("Dashboard %s is created in Grafana", name)
122 return response
123 except Exception:
124 log.exception("Exception processing message: ")
125
126 def send_request_for_creating_dashboard(self, dashboard_data):
127 response = requests.request(
128 "POST",
129 self.url + "/api/dashboards/db/",
130 data=json.dumps(dashboard_data),
131 headers=self.headers,
132 )
133 return response
134
135 def delete_dashboard(self, uid):
136 response = requests.request(
137 "DELETE", self.url + "/api/dashboards/uid/" + uid, headers=self.headers
138 )
139 log.debug("Dashboard %s deleted from Grafana", uid)
140 return response
141
142 def delete_admin_dashboard(self):
143 requests.request(
144 "DELETE",
145 self.url + "/api/dashboards/db/osm-project-status-admin",
146 headers=self.headers,
147 )
148 log.debug("Dashboard osm-project-status-admin deleted from Grafana")
149
150 def create_grafana_users(self, user):
151 email = "{}@osm.etsi.org".format(user)
152 user_payload = {
153 "name": user,
154 "email": email,
155 "login": user,
156 "password": user,
157 }
158 response_users = requests.request(
159 "POST",
160 self.url + "/api/admin/users/",
161 json=user_payload,
162 headers=self.headers,
163 )
164 json_data = json.loads(response_users.text)
165 url = "/api/org/users/{}/".format(json_data["id"])
166 permission_payload = {
167 "role": "Editor",
168 }
169 requests.request(
170 "PATCH", self.url + url, json=permission_payload, headers=self.headers
171 )
172 log.info("New user %s created in Grafana", user)
173 return response_users
174
175 # Get Grafana users
176 def get_grafana_users(self):
177 response_users = requests.request(
178 "GET",
179 self.url + "/api/users",
180 headers=self.headers,
181 )
182 user_list = []
183 users = json.loads(response_users.text)
184 for user in users:
185 if user["name"] and user["name"] != "admin":
186 user_list.append(user["name"])
187 return user_list
188
189 # Create Grafana team with member
190 def create_grafana_teams_members(
191 self, project_name, user_name, is_admin, proj_list
192 ):
193 # Check if user exist in Grafana
194 user_response = requests.request(
195 "GET",
196 self.url + "/api/users/lookup?loginOrEmail={}".format(user_name),
197 headers=self.headers,
198 )
199 user_obj = json.loads(user_response.text)
200 if user_response.status_code != 200:
201 user_response = self.create_grafana_users(user_name)
202 user_obj = json.loads(user_response.text)
203
204 user_id = user_obj["id"]
205
206 # Get teams for user
207 team_objs = requests.request(
208 "GET",
209 self.url + "/api/users/{}/teams".format(user_id),
210 headers=self.headers,
211 )
212 team_obj = json.loads(team_objs.text)
213 team_list = []
214 if len(team_obj):
215 for team in team_obj:
216 team_list.append(team["name"])
217
218 proj_unlink = set(team_list) - set(proj_list)
219 for prj in proj_unlink:
220 response_team = requests.request(
221 "GET",
222 self.url + "/api/teams/search?name={}".format(prj),
223 headers=self.headers,
224 )
225 team_id = json.loads(response_team.text)["teams"][0]["id"]
226 requests.request(
227 "DELETE",
228 self.url + "/api/teams/{}/members/{}".format(team_id, user_id),
229 headers=self.headers,
230 )
231 if project_name != "admin":
232 # Add member to team
233 response_team = requests.request(
234 "GET",
235 self.url + "/api/teams/search?name={}".format(project_name),
236 headers=self.headers,
237 )
238
239 # Search if team in Grafana corresponding to the project already exists
240 if not json.loads(response_team.text)["teams"]:
241 self.create_grafana_teams(project_name)
242 response_team = requests.request(
243 "GET",
244 self.url + "/api/teams/search?name={}".format(project_name),
245 headers=self.headers,
246 )
247 team_id = json.loads(response_team.text)["teams"][0]["id"]
248 if project_name not in team_list:
249 # Create a team in Grafana corresponding to the project as it doesn't exist
250 member_payload = {"userId": user_id}
251 requests.request(
252 "POST",
253 self.url + "/api/teams/{}/members".format(team_id),
254 json=member_payload,
255 headers=self.headers,
256 )
257 # Check if user role or project name is admin
258 if is_admin or project_name == "admin":
259 # Give admin righsts to user
260 url = "/api/org/users/{}/".format(user_id)
261 permission_payload = {
262 "role": "Admin",
263 }
264 requests.request(
265 "PATCH", self.url + url, json=permission_payload, headers=self.headers
266 )
267 log.info("User %s is assigned Admin permission", user_name)
268 else:
269 # Give editor rights to user
270 url = "/api/org/users/{}/".format(user_id)
271 permission_payload = {
272 "role": "Editor",
273 }
274 requests.request(
275 "PATCH", self.url + url, json=permission_payload, headers=self.headers
276 )
277 log.info("User %s is assigned Editor permission", user_name)
278
279 # Create team in Grafana
280 def create_grafana_teams(self, team_name):
281 team_payload = {
282 "name": team_name,
283 }
284 requests.request(
285 "POST", self.url + "/api/teams", json=team_payload, headers=self.headers
286 )
287 log.info("New team %s created in Grafana", team_name)
288
289 # Create folder in Grafana
290 def create_grafana_folders(self, folder_name):
291 folder_payload = {"uid": folder_name, "title": folder_name}
292 requests.request(
293 "POST", self.url + "/api/folders", json=folder_payload, headers=self.headers
294 )
295 log.info("Dashboard folder %s created", folder_name)
296
297 response_team = requests.request(
298 "GET",
299 self.url + "/api/teams/search?name={}".format(folder_name),
300 headers=self.headers,
301 )
302 # Create team if it doesn't already exists
303 if len(json.loads(response_team.text)["teams"]) == 0:
304 self.create_grafana_teams(folder_name)
305 response_team = requests.request(
306 "GET",
307 self.url + "/api/teams/search?name={}".format(folder_name),
308 headers=self.headers,
309 )
310 # Assign required permission to the team's folder
311 team_id = json.loads(response_team.text)["teams"][0]["id"]
312 permission_data = {
313 "items": [
314 {"teamId": team_id, "permission": 2},
315 ]
316 }
317 requests.request(
318 "POST",
319 self.url + "/api/folders/{}/permissions".format(folder_name),
320 json=permission_data,
321 headers=self.headers,
322 )
323
324 # delete user from grafana
325 def delete_grafana_users(self, user_name):
326 # Get user id
327 response_id = requests.request(
328 "GET",
329 self.url + "/api/users/lookup?loginOrEmail={}".format(user_name),
330 headers=self.headers,
331 )
332 try:
333 user_id = json.loads(response_id.text)["id"]
334 except Exception:
335 log.exception("Exception processing message: ")
336 # Delete user
337 response = requests.request(
338 "DELETE",
339 self.url + "/api/admin/users/{}".format(user_id),
340 headers=self.headers,
341 )
342 log.info("User %s deleted in Grafana", user_name)
343 return response
344
345 # delete team from grafana
346 def delete_grafana_team(self, project_name):
347 # Delete Grafana folder
348 requests.request(
349 "DELETE",
350 self.url + "/api/folders/{}".format(project_name),
351 headers=self.headers,
352 )
353 # Delete Grafana team
354 team_obj = requests.request(
355 "GET",
356 self.url + "/api/teams/search?name={}".format(project_name),
357 headers=self.headers,
358 )
359 team_id = json.loads(team_obj.text)["teams"][0]["id"]
360 response = requests.request(
361 "DELETE", self.url + "/api/teams/{}".format(team_id), headers=self.headers
362 )
363 log.info("Team %s deleted in Grafana", project_name)
364 return response
365
366 # update grafana team
367 def update_grafana_teams(self, project_new_name, project_old_name):
368 team_obj = requests.request(
369 "GET",
370 self.url + "/api/teams/search?name={}".format(project_old_name),
371 headers=self.headers,
372 )
373 team_id = json.loads(team_obj.text)["teams"][0]["id"]
374 data = {
375 "name": project_new_name,
376 }
377 response = requests.request(
378 "PUT",
379 self.url + "/api/teams/{}".format(team_id),
380 json=data,
381 headers=self.headers,
382 )
383 log.info("Grafana team updated %s", response.text)
384 return response
385
386 # remove member from grafana team
387 def remove_grafana_team_member(self, user_name, project_data):
388 # Get user id
389 response_id = requests.request(
390 "GET",
391 self.url + "/api/users/lookup?loginOrEmail={}".format(user_name),
392 headers=self.headers,
393 )
394 user_id = json.loads(response_id.text)["id"]
395 for project in project_data:
396 # Get team id
397 team_obj = requests.request(
398 "GET",
399 self.url + "/api/teams/search?name={}".format(project["project"]),
400 headers=self.headers,
401 )
402 team_id = json.loads(team_obj.text)["teams"][0]["id"]
403 response = requests.request(
404 "DELETE",
405 self.url + "/api/teams/{}/members/{}".format(team_id, user_id),
406 headers=self.headers,
407 )
408 return response