2 Copyright (c) 2015 SONATA-NFV and Paderborn University
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
Monitor the skewness of the resource usage probability distribution
and export it to a Prometheus Push Gateway.
(c) 2017 by Steven Van Rossem <steven.vanrossem@intec.ugent.be>
import json
import logging
import math
import os
import threading
from time import sleep, time, perf_counter

from prometheus_client import (start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY,
                               CollectorRegistry, pushadd_to_gateway, push_to_gateway,
                               delete_from_gateway)
# Module logger "skewmon": records from DEBUG upwards go to a StreamHandler
# (stderr by default).
LOG = logging.getLogger('skewmon')
LOG.setLevel(logging.DEBUG)
LOG.addHandler(logging.StreamHandler())
# Push Gateway location. Override it with the PUSHGATEWAY_IP / PUSHGATEWAY_PORT
# environment variables (e.g. set in the Dockerfile). Without them the
# previous hard-coded values are used (172.17.0.1 is presumably the default
# docker0 bridge address of the host - confirm for your setup).
PUSHGATEWAY_IP = os.environ.get('PUSHGATEWAY_IP', '172.17.0.1')
PUSHGATEWAY_PORT = int(os.environ.get('PUSHGATEWAY_PORT', 9091))
# "host:port" form expected by the prometheus_client push functions
PUSHGATEWAY_ADDR = ':'.join([PUSHGATEWAY_IP, str(PUSHGATEWAY_PORT)])
# General settings, both in milliseconds and both mandatory (a missing
# variable raises KeyError at import time):
#   SAMPLE_PERIOD - pause between two counter readings;
#   TOTAL_PERIOD  - length of the window one skewness value is computed over,
#                   also the polling interval of the main loop.
SAMPLE_PERIOD, TOTAL_PERIOD = (int(os.environ[_name]) for _name in ('SAMPLE_PERIOD', 'TOTAL_PERIOD'))
# Global metric state: a dedicated registry (this is what export_metrics()
# pushes) and the single gauge all monitors report into, with one labelled
# series per (vnf_id, vnf_name, vnf_metric).
registry = CollectorRegistry()
exported_metric = Gauge(
    'skewness',
    'Skewness of docker vnf resource usage',
    labelnames=['vnf_id', 'vnf_name', 'vnf_metric'],
    registry=registry,
)
def get_vnfs_to_monitor(config):
    """Find the VNFs to monitor in a parsed config.

    *config* maps a short id to a dict describing one VNF:
    ``{metric_shortId: {VNF_NAME: <>, VNF_ID: <>, VNF_METRIC: <>}}``.

    :param config: mapping as loaded from the JSON config file.
    :returns: generator of ``(vnf_id, vnf_name, vnf_metric)`` tuples, one per
        entry, in the mapping's iteration order; a missing key yields ``None``
        in its position.
    """
    for entry in config.values():
        yield (entry.get('VNF_ID'), entry.get('VNF_NAME'), entry.get('VNF_METRIC'))
def export_metrics(key=None):
    """Export the metrics in ``registry`` to the Prometheus Push Gateway.

    Pushes with ``pushadd_to_gateway`` under the job ``sonemu-skewmon`` at
    PUSHGATEWAY_ADDR.

    :param key: optional grouping key, passed through unchanged.

    This is best effort: any push failure is logged as a warning and swallowed,
    so an unreachable gateway does not stop the monitor.
    """
    try:
        pushadd_to_gateway(PUSHGATEWAY_ADDR, job='sonemu-skewmon',
                           registry=registry, grouping_key=key)
    except Exception as e:
        LOG.warning("Pushgateway not reachable: {0}".format(str(e)))
class skewness_monitor():
    """Monitor the skewness of one docker container resource counter.

    The monitor reads a cgroup accounting file of the container (``cpu`` ->
    ``cpuacct.usage``, ``mem`` -> ``memory.usage_in_bytes``). It samples the file
    every SAMPLE_PERIOD ms over a TOTAL_PERIOD ms window and sets its labelled
    series of the shared ``exported_metric`` gauge to the sample skewness of the
    measured rates. start() repeats this in a background thread until stop().

    :param docker_id: docker container id; used in the cgroup path and as the
        ``vnf_id`` label.
    :param docker_name: container name; used as the ``vnf_name`` label and in logs.
    :param metric: ``'cpu'`` or ``'mem'``; used as the ``vnf_metric`` label.
    :raises KeyError: if *metric* is neither ``'cpu'`` nor ``'mem'``.
    :raises OSError: if the accounting file cannot be opened.
    """

    def __init__(self, docker_id, docker_name, metric):
        # Prometheus metric to export (shared by all monitors, told apart by labels)
        self.prom_skewness = exported_metric
        self.docker_id = docker_id
        self.docker_name = docker_name
        self.vnf_metric = metric

        # https://www.datadoghq.com/blog/how-to-collect-docker-metrics/
        self.cpu_proc_file = '/sys/fs/cgroup/cpuacct/docker/{0}/cpuacct.usage'.format(self.docker_id)
        self.mem_proc_file = '/sys/fs/cgroup/memory/docker/{0}/memory.usage_in_bytes'.format(self.docker_id)
        metric_dict = {'cpu': self.cpu_proc_file,
                       'mem': self.mem_proc_file}

        self.proc_file = metric_dict[metric]

        # Kept open for the monitor's lifetime and re-read from the start on
        # every sample. It is closed when the export loop ends, or in stop() if
        # the thread was never started.
        self.fp = open(self.proc_file)

        # monitoring thread and the event that ends it
        self.export_thread = None
        self.monitor_stop = threading.Event()

    def _read_counter(self):
        """Return the integer currently stored in the accounting file.

        The file is rewound first. Without the rewind, every read after the
        first would return '' and int() would fail.
        """
        self.fp.seek(0)
        return int(self.fp.read().strip())

    def _set_gauge(self, value):
        """Set this monitor's labelled series of the skewness gauge to *value*."""
        self.prom_skewness.labels(vnf_id=self.docker_id,
                                  vnf_name=self.docker_name,
                                  vnf_metric=self.vnf_metric).set(value)

    @staticmethod
    def _skewness(samples):
        """Return the adjusted Fisher-Pearson sample skewness of *samples*.

        G1 = sqrt(n * (n - 1)) / (n - 2) * m3 / m2 ** 1.5, where m2 and m3 are
        the biased 2nd and 3rd central moments. The moments are accumulated
        around the mean instead of from raw power sums. With raw sums, round-off
        could make m2 slightly negative and ``m2 ** 1.5`` complex.

        :raises ZeroDivisionError: with fewer than 3 samples or zero variance
            (the export loop publishes NaN in that case).
        """
        n = len(samples)
        if n < 3:
            raise ZeroDivisionError('skewness needs at least 3 samples, got {0}'.format(n))
        mean = sum(samples) / n
        deviations = [x - mean for x in samples]
        m2 = sum(d * d for d in deviations) / n
        m3 = sum(d * d * d for d in deviations) / n
        if m2 <= 0:
            raise ZeroDivisionError('skewness undefined: samples have zero variance')
        return math.sqrt(n * (n - 1)) / (n - 2) * m3 / m2 ** 1.5

    def _calc_skewness(self):
        """Sample the counter for one TOTAL_PERIOD window and return its skewness.

        Every SAMPLE_PERIOD ms the counter is read and turned into a rate:
        counter delta per nanosecond of elapsed perf_counter time. For ``cpu``
        (cpuacct.usage counts nanoseconds) this is CPU time per wall-clock time.
        The first reading only sets the baseline. The baseline is retaken while
        the counter still reads <= 0.

        NOTE(review): for ``mem`` this is the rate of change of the memory usage
        (bytes per ns), not the usage level itself - confirm that is intended.

        :raises ValueError: if SAMPLE_PERIOD <= 0 or the window holds fewer than
            one baseline plus three samples. This is a config error that cannot
            recover, and it also prevented a busy loop with zero sleeps.
        :raises ZeroDivisionError: see _skewness.
        """
        # milliseconds
        stat_delta = SAMPLE_PERIOD
        sample_T = TOTAL_PERIOD
        if stat_delta <= 0:
            raise ValueError('SAMPLE_PERIOD must be positive, got {0}'.format(stat_delta))
        n_steps = round(sample_T / stat_delta)
        if n_steps < 4:
            raise ValueError('TOTAL_PERIOD/SAMPLE_PERIOD gives {0} readings per window, '
                             'at least 4 are needed'.format(n_steps))

        samples = []
        prev_count = 0
        prev_time = 0.0
        for _ in range(n_steps):
            # perf_counter in seconds; counter value from the cgroup file
            now = perf_counter()
            count = self._read_counter()
            if prev_count > 0:
                # counter delta per nanosecond of elapsed time
                samples.append((count - prev_count) / ((now - prev_time) * 1e9))
            prev_count, prev_time = count, now
            sleep(stat_delta / 1000)

        skewness = self._skewness(samples)

        LOG.info("docker_name: {0} metric: {1}".format(self.docker_name, self.vnf_metric))
        LOG.info("Nsamples: {0}".format(len(samples)))
        LOG.info("skewness: {0:.2f}".format(skewness))
        return skewness

    def _export_skewness_loop(self, stop_event):
        """Thread body: publish one skewness value per window until *stop_event* is set.

        A ZeroDivisionError (too few samples or zero variance) publishes NaN and
        the loop continues. Any other error is logged and ends the loop. On exit
        the series is set to NaN and the accounting file is closed.
        """
        try:
            while not stop_event.is_set():
                try:
                    self._set_gauge(self._calc_skewness())
                except ZeroDivisionError as e:
                    self._set_gauge(float('nan'))
                    LOG.warning("{1}: Skewness cannot be calculated: {0}".format(str(e), self.docker_name))
                except Exception as e:
                    LOG.warning("Skewness cannot be calculated, stop thread: {0}".format(str(e)))
                    self.monitor_stop.set()
                    break
            # the loop has ended, so the monitoring thread stops
            self._set_gauge(float('nan'))
        finally:
            self.fp.close()

    def start(self):
        """Start the monitoring thread. Warns and does nothing if one was already started."""
        if self.export_thread is not None:
            LOG.warning('monitor thread is already running for: {0}'.format(self.docker_name))
            return

        # daemon: the endless main loop must be able to exit (e.g. Ctrl+C)
        # without waiting for monitors whose stop event is never set
        self.export_thread = threading.Thread(target=self._export_skewness_loop,
                                              args=(self.monitor_stop,), daemon=True)
        self.export_thread.start()
        LOG.info('started thread: {0}'.format(self.docker_name))

    def stop(self):
        """Stop the monitoring thread once its current sampling window ends.

        If the thread was never started, the accounting file is closed here instead.
        """
        self.monitor_stop.set()
        if self.export_thread is None:
            self.fp.close()
if __name__ == '__main__':

    # started monitors: {'<vnf_metric>_<vnf_id>': skewness_monitor}
    vnfs_monitored = {}

    # Endless loop: every TOTAL_PERIOD ms, reconcile the running monitors with
    # /config.txt and push the current values to the gateway.
    while True:
        # check config.txt for docker ids/names
        try:
            with open('/config.txt', 'r') as configfile:
                config = json.load(configfile)
        except (OSError, ValueError) as e:
            # missing or half-written config: keep the current monitors and retry
            LOG.warning("Config cannot be read: {0}".format(str(e)))
            sleep(TOTAL_PERIOD / 1000)
            continue

        # Monitors requested by the config, keyed like vnfs_monitored. For
        # duplicate keys the first entry wins.
        vnfs_to_monitor = {}
        for vnf_id, vnf_name, vnf_metric in get_vnfs_to_monitor(config):
            if vnf_id is None or vnf_metric is None:
                LOG.warning("Config entry without VNF_ID/VNF_METRIC ignored: {0}".format(vnf_name))
                continue
            vnfs_to_monitor.setdefault('_'.join([vnf_metric, vnf_id]), (vnf_id, vnf_name, vnf_metric))

        # for each new docker id in the config, start a thread to monitor skewness
        for key, (vnf_id, vnf_name, vnf_metric) in vnfs_to_monitor.items():
            if key not in vnfs_monitored:
                try:
                    vnfs_monitored[key] = skewness_monitor(vnf_id, vnf_name, vnf_metric)
                    vnfs_monitored[key].start()
                except Exception as e:
                    LOG.warning("Monitor cannot be started: {0}".format(str(e)))

        # for each docker id removed from the config, stop export
        for vnf_key in [k for k in vnfs_monitored if k not in vnfs_to_monitor]:
            monitor = vnfs_monitored.pop(vnf_key)
            monitor.stop()
            LOG.info('stop monitored VNFs: {0}'.format(monitor.docker_name))
            # The stopped monitor's labelled series is never removed from
            # `registry`, and the Push Gateway remembers the last pushed value.
            # So clear the whole job on the gateway; export_metrics() below
            # pushes the registry again. A gateway error must not end the loop.
            try:
                delete_from_gateway(PUSHGATEWAY_ADDR, job='sonemu-skewmon')
            except Exception as e:
                LOG.warning("Pushgateway not reachable: {0}".format(str(e)))

        # push to Prometheus gateway
        export_metrics()
        LOG.info('monitored VNFs: {0}'.format([monitor.docker_name for monitor in vnfs_monitored.values()]))
        # wait before checking again
        sleep(TOTAL_PERIOD / 1000)