-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
+#!/bin/bash
+# Copyright 2017 Intel Research and Development Ireland Limited
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-OUTPUT=$(TOX_PARALLEL_NO_SPINNER=1 tox --parallel=auto)
-printf "$OUTPUT"
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+echo "Launching tox"
+TOX_PARALLEL_NO_SPINNER=1 tox --parallel=auto
metrics.append(vim_status_metric)
vnfrs = self.common_db.get_vnfrs(vim_account_id=self.vim_account["_id"])
if self.conf.get("collector", "vm_infra_metrics"):
- vm_infra_metrics_enabled = str(self.conf.get("collector", "vm_infra_metrics")).lower() in ("yes", "true", "1")
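+ # Only the strings "yes", "true" and "1" (case-insensitive) enable VM infra metrics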
+ vm_infra_metrics_enabled = str(
+ self.conf.get("collector", "vm_infra_metrics")
+ ).lower() in ("yes", "true", "1")
else:
vm_infra_metrics_enabled = True
if vm_infra_metrics_enabled:
}
try:
vm = self.nova.servers.get(resource_uuid)
- vm_status = (0 if (vm.status == 'ERROR') else 1)
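+ # Map the OpenStack server state to a binary metric: 1 = healthy, 0 = ERROR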
+ vm_status = 0 if (vm.status == "ERROR") else 1
vm_status_metric = Metric(tags, "vm_status", vm_status)
except Exception as e:
log.warning("VM status is not OK: %s" % e)
vimacc_resources = self.vim_account["resources"]
# Compute resources
try:
- com_lim = self.nova.limits.get()._info['absolute']
- if ("compute" in vimacc_resources) \
- and ((vimacc_resources["compute"]["ram"]["total"] != com_lim['maxTotalRAMSize'])
- or (vimacc_resources["compute"]["vcpus"]["total"] != com_lim['maxTotalCores'])
- or (vimacc_resources["compute"]["ram"]["used"] != com_lim['totalRAMUsed'])
- or (vimacc_resources["compute"]["vcpus"]["used"] != com_lim['totalCoresUsed'])
- or (vimacc_resources["compute"]["instances"]["total"] != com_lim['maxTotalInstances'])
- or (vimacc_resources["compute"]["instances"]["used"] != com_lim['totalInstancesUsed'])):
- update_dict = {"resources.compute": {"ram": {"total": com_lim['maxTotalRAMSize'],
- "used": com_lim['totalRAMUsed']},
- "vcpus": {"total": com_lim['maxTotalCores'],
- "used": com_lim['totalCoresUsed']},
- "instances": {"total": com_lim['maxTotalInstances'],
- "used": com_lim['totalInstancesUsed']}}}
- suc_value = self.common_db.set_vim_account(str(self.vim_account['_id']), update_dict)
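+ # Nova "absolute" limits carry both the quota ceilings (maxTotal*) and current usage (total*Used)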
+ com_lim = self.nova.limits.get()._info["absolute"]
+ if ("compute" in vimacc_resources) and (
+ (
+ vimacc_resources["compute"]["ram"]["total"]
+ != com_lim["maxTotalRAMSize"]
+ )
+ or (
+ vimacc_resources["compute"]["vcpus"]["total"]
+ != com_lim["maxTotalCores"]
+ )
+ or (
+ vimacc_resources["compute"]["ram"]["used"]
+ != com_lim["totalRAMUsed"]
+ )
+ or (
+ vimacc_resources["compute"]["vcpus"]["used"]
+ != com_lim["totalCoresUsed"]
+ )
+ or (
+ vimacc_resources["compute"]["instances"]["total"]
+ != com_lim["maxTotalInstances"]
+ )
+ or (
+ vimacc_resources["compute"]["instances"]["used"]
+ != com_lim["totalInstancesUsed"]
+ )
+ ):
+ update_dict = {
+ "resources.compute": {
+ "ram": {
+ "total": com_lim["maxTotalRAMSize"],
+ "used": com_lim["totalRAMUsed"],
+ },
+ "vcpus": {
+ "total": com_lim["maxTotalCores"],
+ "used": com_lim["totalCoresUsed"],
+ },
+ "instances": {
+ "total": com_lim["maxTotalInstances"],
+ "used": com_lim["totalInstancesUsed"],
+ },
+ }
+ }
+ suc_value = self.common_db.set_vim_account(
+ str(self.vim_account["_id"]), update_dict
+ )
log.info("Compute resources update in mongoDB = %s" % suc_value)
except Exception as e:
log.warning("Error in updating compute resources: %s" % e)
# Volume resources
try:
- vol_lim = self.cinder.limits.get()._info['absolute']
- if ("storage" in vimacc_resources) and\
- ((vimacc_resources["storage"]["volumes"]["total"] != vol_lim['maxTotalVolumes'])
- or (vimacc_resources["storage"]["snapshots"]["total"] != vol_lim['maxTotalSnapshots'])
- or (vimacc_resources["storage"]["volumes"]["used"] != vol_lim['totalVolumesUsed'])
- or (vimacc_resources["storage"]["snapshots"]["used"] != vol_lim['totalSnapshotsUsed'])
- or (vimacc_resources["storage"]["storage"]["total"] != vol_lim['maxTotalVolumeGigabytes'])
- or (vimacc_resources["storage"]["storage"]["used"] != vol_lim['totalGigabytesUsed'])):
- update_dict = {"resources.storage": {"volumes": {"total": vol_lim['maxTotalVolumes'],
- "used": vol_lim['totalVolumesUsed']},
- "snapshots": {"total": vol_lim['maxTotalSnapshots'],
- "used": vol_lim['totalSnapshotsUsed']},
- "storage": {"total": vol_lim['maxTotalVolumeGigabytes'],
- "used": vol_lim['totalGigabytesUsed']}}}
- suc_value = self.common_db.set_vim_account(str(self.vim_account['_id']), update_dict)
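+ # Cinder absolute limits cover volumes, snapshots and storage gigabytes, each with a ceiling and a usage counter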
+ vol_lim = self.cinder.limits.get()._info["absolute"]
+ if ("storage" in vimacc_resources) and (
+ (
+ vimacc_resources["storage"]["volumes"]["total"]
+ != vol_lim["maxTotalVolumes"]
+ )
+ or (
+ vimacc_resources["storage"]["snapshots"]["total"]
+ != vol_lim["maxTotalSnapshots"]
+ )
+ or (
+ vimacc_resources["storage"]["volumes"]["used"]
+ != vol_lim["totalVolumesUsed"]
+ )
+ or (
+ vimacc_resources["storage"]["snapshots"]["used"]
+ != vol_lim["totalSnapshotsUsed"]
+ )
+ or (
+ vimacc_resources["storage"]["storage"]["total"]
+ != vol_lim["maxTotalVolumeGigabytes"]
+ )
+ or (
+ vimacc_resources["storage"]["storage"]["used"]
+ != vol_lim["totalGigabytesUsed"]
+ )
+ ):
+ update_dict = {
+ "resources.storage": {
+ "volumes": {
+ "total": vol_lim["maxTotalVolumes"],
+ "used": vol_lim["totalVolumesUsed"],
+ },
+ "snapshots": {
+ "total": vol_lim["maxTotalSnapshots"],
+ "used": vol_lim["totalSnapshotsUsed"],
+ },
+ "storage": {
+ "total": vol_lim["maxTotalVolumeGigabytes"],
+ "used": vol_lim["totalGigabytesUsed"],
+ },
+ }
+ }
+ suc_value = self.common_db.set_vim_account(
+ str(self.vim_account["_id"]), update_dict
+ )
log.info("Volume resources update in mongoDB = %s" % suc_value)
except Exception as e:
log.warning("Error in updating volume resources: %s" % e)
# Network resources
try:
net_lim = self.neutron.show_quota_details(self.tenant_id)["quota"]
- if ("network" in vimacc_resources) and\
- ((vimacc_resources["network"]["networks"]["total"] != net_lim["network"]["limit"])
- or (vimacc_resources["network"]["networks"]["used"] != net_lim['network']['used'])
- or (vimacc_resources["network"]["subnets"]["total"] != net_lim['subnet']['limit'])
- or (vimacc_resources["network"]["subnets"]["used"] != net_lim['subnet']['used'])
- or (vimacc_resources["network"]["floating_ips"]["total"] != net_lim['floatingip']['limit'])
- or (vimacc_resources["network"]["floating_ips"]["used"] != net_lim['floatingip']['used'])):
- update_dict = {"resources.network": {"networks": {"total": net_lim['network']['limit'],
- "used": net_lim['network']['used']},
- "subnets": {"total": net_lim['subnet']['limit'],
- "used": net_lim['subnet']['used']},
- "floating_ips": {"total": net_lim['floatingip']['limit'],
- "used": net_lim['floatingip']['used']}}}
- suc_value = self.common_db.set_vim_account(str(self.vim_account['_id']), update_dict)
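+ # Neutron quota details report a "limit" and "used" counter per resource (network, subnet, floatingip)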
+ if ("network" in vimacc_resources) and (
+ (
+ vimacc_resources["network"]["networks"]["total"]
+ != net_lim["network"]["limit"]
+ )
+ or (
+ vimacc_resources["network"]["networks"]["used"]
+ != net_lim["network"]["used"]
+ )
+ or (
+ vimacc_resources["network"]["subnets"]["total"]
+ != net_lim["subnet"]["limit"]
+ )
+ or (
+ vimacc_resources["network"]["subnets"]["used"]
+ != net_lim["subnet"]["used"]
+ )
+ or (
+ vimacc_resources["network"]["floating_ips"]["total"]
+ != net_lim["floatingip"]["limit"]
+ )
+ or (
+ vimacc_resources["network"]["floating_ips"]["used"]
+ != net_lim["floatingip"]["used"]
+ )
+ ):
+ update_dict = {
+ "resources.network": {
+ "networks": {
+ "total": net_lim["network"]["limit"],
+ "used": net_lim["network"]["used"],
+ },
+ "subnets": {
+ "total": net_lim["subnet"]["limit"],
+ "used": net_lim["subnet"]["used"],
+ },
+ "floating_ips": {
+ "total": net_lim["floatingip"]["limit"],
+ "used": net_lim["floatingip"]["used"],
+ },
+ }
+ }
+ suc_value = self.common_db.set_vim_account(
+ str(self.vim_account["_id"]), update_dict
+ )
log.info("Network resources update in mongoDB = %s" % suc_value)
except Exception as e:
log.warning("Error in updating network resources: %s" % e)
def _build_neutron_client(self, vim_account: dict) -> tuple:
# sess = OpenstackUtils.get_session(vim_account)
tenant_id = self.vim_session.get_project_id()
- return neutron_client.Client("2", session=self.vim_session, timeout=10), tenant_id
+ return (
+ neutron_client.Client("2", session=self.vim_session, timeout=10),
+ tenant_id,
+ )
metrics.append(vim_status_metric)
vnfrs = self.common_db.get_vnfrs(vim_account_id=vim_account_id)
if self.conf.get("collector", "vm_infra_metrics"):
- vm_infra_metrics_enabled = str(self.conf.get("collector", "vm_infra_metrics")).lower() in ("yes", "true", "1")
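+ # Only the strings "yes", "true" and "1" (case-insensitive) enable VM infra metrics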
+ vm_infra_metrics_enabled = str(
+ self.conf.get("collector", "vm_infra_metrics")
+ ).lower() in ("yes", "true", "1")
else:
vm_infra_metrics_enabled = True
if vm_infra_metrics_enabled:
vim_type = CollectorService._get_vim_type(conf, vim_account_id)
log.debug("vim type.....{}".format(vim_type))
if vim_type in VIM_COLLECTORS:
- collector = VIM_COLLECTORS[vim_type](conf, vim_account_id, vim_sess_map[vim_account_id])
+ collector = VIM_COLLECTORS[vim_type](
+ conf, vim_account_id, vim_sess_map[vim_account_id]
+ )
metrics = collector.collect(vnfr)
log.debug("Collecting vim metrics.....{}".format(metrics))
else:
# Starting executor pool with pool size process_pool_size. Default process_pool_size is 20
# init_session is called to assign the session map to the global vim session map variable
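# The initializer runs once in each worker process before it executes any collection task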
with concurrent.futures.ProcessPoolExecutor(
- self.conf.get("collector", "process_pool_size"), initializer=init_session, initargs=(vim_sess_map,)
+ self.conf.get("collector", "process_pool_size"),
+ initializer=init_session,
+ initargs=(vim_sess_map,),
) as executor:
log.info(
"Started metric collector process pool with pool size %s"
day12ops = ops_config.get("day1-2", [])
for day12op in day12ops:
if day12op and "metrics" in day12op:
- vdur = next(filter(lambda vdur: vdur["vdu-id-ref"] == day12op["id"], vnfr["vdur"]))
+ vdur = next(
+ filter(
+ lambda vdur: vdur["vdu-id-ref"] == day12op["id"], vnfr["vdur"]
+ )
+ )
# This avoids errors when vdur records have not been completely filled
if vdur and "name" in vdur:
log.warning(repr(e))
continue
# This avoids errors until the application and model are ready
- if vca_deployment_info.get("model") and vca_deployment_info.get("application"):
+ if vca_deployment_info.get("model") and vca_deployment_info.get(
+ "application"
+ ):
measures = self.loop.run_until_complete(
self.n2vc.get_metrics(
vca_deployment_info["model"],
vca_deployment_info["application"],
vca_id=vnfr.get("vca-id"),
- )
)
+ )
log.debug("Measures: %s", measures)
for measure_list in measures.values():
for measure in measure_list:
"disk_read_bytes": "disk.device.read.bytes",
"disk_write_bytes": "disk.device.write.bytes",
"packets_received": "network.incoming.packets",
- "packets_sent": "network.outgoing.packets"
+ "packets_sent": "network.outgoing.packets",
}
METRIC_MULTIPLIERS = {"cpu": 0.0000001}
INSTANCE = "instance"
INTERFACE_ALL = "interface_all"
INTERFACE_ONE = "interface_one"
- INSTANCEDISK = 'instancedisk'
+ INSTANCEDISK = "instancedisk"
class OpenstackCollector(BaseVimCollector):
metric_type, openstack_metric_name, resource_id
)
- if value is None and metric_name in METRIC_MAPPINGS_FOR_ROCKY_AND_NEWER_RELEASES and type(self.backend) is not PrometheusTSBDBackend:
+ if (
+ value is None
+ and metric_name
+ in METRIC_MAPPINGS_FOR_ROCKY_AND_NEWER_RELEASES
+ and type(self.backend) is not PrometheusTSBDBackend
+ ):
# Reattempting metric collection with new metric names.
# Some metric names have changed in newer Openstack releases
log.info(
"Reattempting metric collection for type: %s and name: %s and resource_id %s",
metric_type,
metric_name,
- resource_id
+ resource_id,
+ )
+ openstack_metric_name = (
+ METRIC_MAPPINGS_FOR_ROCKY_AND_NEWER_RELEASES[
+ metric_name
+ ]
)
- openstack_metric_name = METRIC_MAPPINGS_FOR_ROCKY_AND_NEWER_RELEASES[metric_name]
value = self.backend.collect_metric(
metric_type, openstack_metric_name, resource_id
)
def _build_prometheus_client(self, vim_account: dict) -> prometheus_client:
url = vim_account["prometheus-config"]["prometheus_url"]
- return prometheus_client(url, disable_ssl = True)
+ return prometheus_client(url, disable_ssl=True)
def collect_metric(
self, metric_type: MetricType, metric_name: str, resource_id: str
- ):
+ ):
metric = self.query_metric(metric_name, resource_id)
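# A Prometheus instant-query sample is a [timestamp, value] pair; index 1 is the value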
return metric["value"][1] if metric else None
def map_metric(self, metric_name: str):
return self.map[metric_name]
- def query_metric(self, metric_name, resource_id = None):
- metrics = self.client.get_current_metric_value(metric_name = metric_name)
+ def query_metric(self, metric_name, resource_id=None):
+ metrics = self.client.get_current_metric_value(metric_name=metric_name)
if resource_id:
- metric = next(filter(lambda x: resource_id in x["metric"]["resource_id"], metrics))
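+ # Keep the first sample whose resource_id label matches the requested resource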
+ metric = next(
+ filter(lambda x: resource_id in x["metric"]["resource_id"], metrics)
+ )
return metric
return metrics
self.client = self._build_gnocchi_client(vim_account, vim_session)
self.neutron = self._build_neutron_client(vim_account, vim_session)
- def _build_gnocchi_client(self, vim_account: dict, vim_session: object) -> gnocchi_client.Client:
+ def _build_gnocchi_client(
+ self, vim_account: dict, vim_session: object
+ ) -> gnocchi_client.Client:
return gnocchi_client.Client(session=vim_session)
- def _build_neutron_client(self, vim_account: dict, vim_session: object) -> neutron_client.Client:
+ def _build_neutron_client(
+ self, vim_account: dict, vim_session: object
+ ) -> neutron_client.Client:
return neutron_client.Client(session=vim_session)
def collect_metric(
def _collect_instance_disk_metric(self, openstack_metric_name, resource_id):
value = None
instances = self.client.resource.search(
- resource_type='instance_disk',
- query={'=': {'instance_id': resource_id}},
+ resource_type="instance_disk",
+ query={"=": {"instance_id": resource_id}},
)
for instance in instances:
try:
measures = self.client.metric.get_measures(
- openstack_metric_name, resource_id=instance['id'], limit=1
+ openstack_metric_name, resource_id=instance["id"], limit=1
)
if measures:
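# Gnocchi measures are (timestamp, granularity, value) triples; take the value of the newest one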
value = measures[-1][2]
except gnocchiclient.exceptions.NotFound as e:
- log.debug("No metric %s found for instance disk %s: %s", openstack_metric_name,
- instance['id'], e)
+ log.debug(
+ "No metric %s found for instance disk %s: %s",
+ openstack_metric_name,
+ instance["id"],
+ e,
+ )
return value
def _collect_instance_metric(self, openstack_metric_name, resource_id):
def __init__(self, vim_account: dict, vim_session: object):
self.client = self._build_ceilometer_client(vim_account, vim_session)
- def _build_ceilometer_client(self, vim_account: dict, vim_session: object) -> ceilometer_client.Client:
+ def _build_ceilometer_client(
+ self, vim_account: dict, vim_session: object
+ ) -> ceilometer_client.Client:
return ceilometer_client.Client("2", session=vim_session)
def collect_metric(
def set_vim_account(self, vim_account_id: str, update_dict: dict) -> bool:
try:
# Set vim_account resources in mongo
- self.common_db.set_one('vim_accounts', {"_id": vim_account_id}, update_dict)
+ self.common_db.set_one("vim_accounts", {"_id": vim_account_id}, update_dict)
# self.common_db.set_one('vim_accounts', {"name": "test-vim"}, update_dict)
return True
except Exception:
return alarms
def update_alarm_status(self, alarm_state: str, uuid):
- modified_count = self.common_db.set_one("alarms", {"uuid": uuid}, {"alarm_status": alarm_state})
+ modified_count = self.common_db.set_one(
+ "alarms", {"uuid": uuid}, {"alarm_status": alarm_state}
+ )
return modified_count
def get_alarm_by_uuid(self, uuid: str):
log.debug("Searching for dashboard result: %s", response.text)
return response
- def create_dashboard(self, uid, name, json_file, project_name=None, datasource_name=None):
+ def create_dashboard(
+ self, uid, name, json_file, project_name=None, datasource_name=None
+ ):
try:
with open(json_file) as f:
dashboard_data = f.read()
"OSM_NAME", name
)
if datasource_name:
- dashboard_data = dashboard_data.replace("OSM_DATASOURCE_NAME", datasource_name)
+ dashboard_data = dashboard_data.replace(
+ "OSM_DATASOURCE_NAME", datasource_name
+ )
dashboard_json_data = json.loads(dashboard_data)
# Get folder id
if project_name:
"url": datasource_url,
"access": "proxy",
"readOnly": False,
- "basicAuth": False
+ "basicAuth": False,
}
response = requests.request(
"POST",
def delete_datasource(self, datasource_name):
response = requests.request(
- "DELETE", self.url + "/api/datasources/name/" + datasource_name, headers=self.headers
+ "DELETE",
+ self.url + "/api/datasources/name/" + datasource_name,
+ headers=self.headers,
)
log.debug("Datasource %s deleted from Grafana", datasource_name)
return response
k8scluster_id = k8scluster["_id"]
k8scluster_name = k8scluster["name"]
osm_resource_uids.append(k8scluster_id)
- osm_datasource_names.append("{}-{}".format(datasource_name_substr, k8scluster_name))
+ osm_datasource_names.append(
+ "{}-{}".format(datasource_name_substr, k8scluster_name)
+ )
if k8scluster_id not in dashboard_uids:
projects_read = k8scluster["_admin"]["projects_read"]
if len(projects_read) and projects_read[0] == project_id:
# Collect K8S Cluster IDs for periodic dashboard clean-up
- k8scluster_address = k8scluster["credentials"]["clusters"][0]["cluster"]["server"]
+ k8scluster_address = k8scluster["credentials"]["clusters"][0][
+ "cluster"
+ ]["server"]
# Extract the K8S cluster IP from the URL
- k8scluster_ip = re.findall(r'://([\w\-\.]+)', k8scluster_address)[0]
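+ # Pull the host part (IP or hostname) out of the cluster server URL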
+ k8scluster_ip = re.findall(
+ r"://([\w\-\.]+)", k8scluster_address
+ )[0]
# prometheus-operator url
- datasource_url = "http://{}:{}".format(k8scluster_ip, prom_operator_port)
+ datasource_url = "http://{}:{}".format(
+ k8scluster_ip, prom_operator_port
+ )
# Create datasource for prometheus-operator in Grafana
datasource_type = "prometheus"
- datasource_name = "{}-{}".format(datasource_name_substr, k8scluster_name)
+ datasource_name = "{}-{}".format(
+ datasource_name_substr, k8scluster_name
+ )
if datasource_name not in datasource_names:
- self.grafana.create_datasource(datasource_name, datasource_type, datasource_url)
- log.debug("Created datasource for k8scluster: %s", k8scluster_id)
+ self.grafana.create_datasource(
+ datasource_name, datasource_type, datasource_url
+ )
+ log.debug(
+ "Created datasource for k8scluster: %s", k8scluster_id
+ )
if project["name"] != "admin":
self.grafana.create_dashboard(
- k8scluster_id, k8scluster_name, cnf_dashboard_path, project_name=project["name"],
- datasource_name=datasource_name)
+ k8scluster_id,
+ k8scluster_name,
+ cnf_dashboard_path,
+ project_name=project["name"],
+ datasource_name=datasource_name,
+ )
else:
self.grafana.create_dashboard(
- k8scluster_id, k8scluster_name, cnf_dashboard_path, datasource_name=datasource_name)
+ k8scluster_id,
+ k8scluster_name,
+ cnf_dashboard_path,
+ datasource_name=datasource_name,
+ )
log.debug("Created dashboard for k8scluster: %s", k8scluster_id)
else:
- log.debug("Dashboard already exist for k8scluster: %s", k8scluster_id)
+ log.debug(
+ "Dashboard already exists for k8scluster: %s", k8scluster_id
+ )
# Reads existing NS list and creates a dashboard for each
# TODO lavado: only create for ACTIVE NSRs
# If there are metrics, create dashboard (if exists)
if vnfd.get("vdu"):
vdu_found = find_in_list(
- vnfd.get("vdu"), lambda a_vdu: "monitoring-parameter" in a_vdu
+ vnfd.get("vdu"),
+ lambda a_vdu: "monitoring-parameter" in a_vdu,
)
else:
vdu_found = None
log.info("Project %s not found", project_id)
log.debug("Exception %s" % e)
self.grafana.create_dashboard(
- nsr_id, nsr_name, dashboard_path, project_name=project_name
+ nsr_id,
+ nsr_name,
+ dashboard_path,
+ project_name=project_name,
)
log.debug("Created dashboard for NS: %s", nsr_id)
else:
# without side-effects.
log.info("Setting up MON Exporter in prometheus")
result = self.common_db.common_db.set_one(
- "prometheus_jobs", {"job_name": "mon_exporter"}, {
- "job_name": "mon_exporter",
- "static_configs": [
- {
- "targets": [
- "mon:8000"
- ]
- }
- ]
- },
+ "prometheus_jobs",
+ {"job_name": "mon_exporter"},
+ {"job_name": "mon_exporter", "static_configs": [{"targets": ["mon:8000"]}]},
fail_on_empty=False,
- upsert=True
+ upsert=True,
)
log.info("Prometheus Jobs added > {}".format(result))
self.mock_db = mock_db
mock_vim_session = mock.Mock()
mock_get_vim_account.return_value = VIM_ACCOUNT
- self.collector = VIOCollector(Config(), "9de6df67-b820-48c3-bcae-ee4838c5c5f4", mock_vim_session)
+ self.collector = VIOCollector(
+ Config(), "9de6df67-b820-48c3-bcae-ee4838c5c5f4", mock_vim_session
+ )
with open(
os.path.join(os.path.dirname(__file__), "osm_mocks", "VNFR.json"), "r"
) as f:
@mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
@mock.patch.object(dbmongo.DbMongo, "create")
def test_create_alarm(self, create):
- alarm = Alarm("name", "severity", 50.0, "operation", "statistic", "metric", "scale_out", {}, "ok")
+ alarm = Alarm(
+ "name",
+ "severity",
+ 50.0,
+ "operation",
+ "statistic",
+ "metric",
+ "scale_out",
+ {},
+ "ok",
+ )
alarm.uuid = "1"
common_db_client = CommonDbClient(self.config)
common_db_client.create_alarm(alarm)
def test_build_headers(self):
prometheus = PrometheusBackend(self.config)
headers = prometheus._build_headers()
- self.assertEqual(headers, {'Authorization': 'Basic YWRtaW46YWRtaW4='})
+ self.assertEqual(headers, {"Authorization": "Basic YWRtaW46YWRtaW4="})
setenv = VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE = 1
deps = -r{toxinidir}/requirements.txt
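+# Show each environment's output when tox runs in parallel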
+parallel_show_output = true
#######################################################################################
[testenv:black]
deps = black
skip_install = true
commands =
- - black --check --diff osm_mon/
- - black --check --diff setup.py
+ black --check --diff osm_mon/
+ black --check --diff setup.py
#######################################################################################
-r{toxinidir}/requirements-test.txt
pylint
commands =
- pylint -E osm_mon
+ pylint -E osm_mon
#######################################################################################