class BaseOpenStackInfraCollector(BaseVimInfraCollector):
def __init__(self, config: Config, vim_account_id: str):
super().__init__(config, vim_account_id)
- self.keystone = self._build_keystone_client(vim_account_id)
- self.nova = self._build_nova_client(vim_account_id)
- self.vim_account_id = vim_account_id
+ self.conf = config
self.common_db = CommonDbClient(config)
+ self.vim_account = self.common_db.get_vim_account(vim_account_id)
+ self.keystone = self._build_keystone_client(self.vim_account)
+ self.nova = self._build_nova_client(self.vim_account)
def collect(self) -> List[Metric]:
metrics = []
vim_status = self.is_vim_ok()
- vim_status_metric = Metric({'vim_account_id': self.vim_account_id}, 'vim_status', vim_status)
+ vim_status_metric = Metric({'vim_account_id': self.vim_account['_id']}, 'vim_status', vim_status)
metrics.append(vim_status_metric)
- vnfrs = self.common_db.get_vnfrs(vim_account_id=self.vim_account_id)
+ vnfrs = self.common_db.get_vnfrs(vim_account_id=self.vim_account['_id'])
for vnfr in vnfrs:
nsr_id = vnfr['nsr-id-ref']
vnf_member_index = vnfr['member-vnf-index-ref']
continue
resource_uuid = vdur['vim-id']
tags = {
- 'vim_account_id': self.vim_account_id,
+ 'vim_account_id': self.vim_account['_id'],
'resource_uuid': resource_uuid,
'nsr_id': nsr_id,
'vnf_member_index': vnf_member_index,
log.warning("VIM status is not OK: %s" % e)
return False
- def _build_keystone_client(self, vim_account_id) -> keystone_client.Client:
- sess = OpenstackUtils.get_session(vim_account_id)
+ def _build_keystone_client(self, vim_account: dict) -> keystone_client.Client:
+ sess = OpenstackUtils.get_session(vim_account)
return keystone_client.Client(session=sess)
- def _build_nova_client(self, vim_account_id) -> nova_client_v2.Client:
- sess = OpenstackUtils.get_session(vim_account_id)
+ def _build_nova_client(self, vim_account: dict) -> nova_client_v2.Client:
+ sess = OpenstackUtils.get_session(vim_account)
return nova_client.Client("2", session=sess)
# contact: osslegalrouting@vmware.com
##
-import json
import logging
from typing import List
from xml.etree import ElementTree as XmlElementTree
from osm_mon.collector.infra_collectors.base_vim import BaseVimInfraCollector
from osm_mon.collector.metric import Metric
-from osm_mon.collector.utils.collector import CollectorUtils
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
return - dict with vim account details
"""
vim_account = {}
- vim_account_info = CollectorUtils.get_credentials(vim_account_id)
-
- vim_account['name'] = vim_account_info.name
- vim_account['vim_tenant_name'] = vim_account_info.tenant_name
- vim_account['vim_type'] = vim_account_info.type
- vim_account['vim_url'] = vim_account_info.url
- vim_account['org_user'] = vim_account_info.user
- vim_account['org_password'] = vim_account_info.password
- vim_account['vim_uuid'] = vim_account_info.uuid
-
- vim_config = json.loads(vim_account_info.config)
+ vim_account_info = self.common_db.get_vim_account(vim_account_id)
+
+ vim_account['name'] = vim_account_info['name']
+ vim_account['vim_tenant_name'] = vim_account_info['vim_tenant_name']
+ vim_account['vim_type'] = vim_account_info['vim_type']
+ vim_account['vim_url'] = vim_account_info['vim_url']
+ vim_account['org_user'] = vim_account_info['vim_user']
+ vim_account['org_password'] = self.common_db.decrypt_vim_password(vim_account_info['vim_password'],
+ vim_account_info['schema_version'],
+ vim_account_id)
+ vim_account['vim_uuid'] = vim_account_info['_id']
+
+ vim_config = vim_account_info['config']
vim_account['admin_username'] = vim_config['admin_username']
- vim_account['admin_password'] = vim_config['admin_password']
+ vim_account['admin_password'] = self.common_db.decrypt_vim_password(vim_config['admin_password'],
+ vim_account_info['schema_version'],
+ vim_account_id)
if vim_config['orgname'] is not None:
vim_account['orgname'] = vim_config['orgname']
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
import logging
import multiprocessing
from typing import List
from osm_mon.collector.infra_collectors.onos import OnosInfraCollector
from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
-from osm_mon.collector.infra_collectors.vmware import VMwareInfraCollector
from osm_mon.collector.infra_collectors.vio import VIOInfraCollector
+from osm_mon.collector.infra_collectors.vmware import VMwareInfraCollector
from osm_mon.collector.metric import Metric
-from osm_mon.collector.utils.collector import CollectorUtils
from osm_mon.collector.vnf_collectors.juju import VCACollector
from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
from osm_mon.collector.vnf_collectors.vio import VIOCollector
def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str):
# TODO(diazb) Add support for aws
- vim_type = CollectorUtils.get_vim_type(vim_account_id)
+ common_db = CommonDbClient(self.conf)
+ vim_type = common_db.get_vim_account(vim_account_id)['vim_type']
if vim_type in VIM_COLLECTORS:
collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id)
metrics = collector.collect(vnfr)
log.debug("vimtype %s is not supported.", vim_type)
def _collect_vim_infra_metrics(self, vim_account_id: str):
- vim_type = CollectorUtils.get_vim_type(vim_account_id)
+ common_db = CommonDbClient(self.conf)
+ vim_type = common_db.get_vim_account(vim_account_id)['vim_type']
if vim_type in VIM_INFRA_COLLECTORS:
collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id)
metrics = collector.collect()
+++ /dev/null
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-import json
-
-from osm_mon.collector.metric import Metric
-from osm_mon.core import database
-from osm_mon.core.database import VimCredentials, VimCredentialsRepository
-
-
-class CollectorUtils(Metric):
-
- @staticmethod
- def get_vim_type(vim_account_id) -> str:
- credentials = CollectorUtils.get_credentials(vim_account_id)
- config = json.loads(credentials.config)
- if 'vim_type' in config:
- vim_type = config['vim_type']
- return str(vim_type.lower())
- else:
- return str(credentials.type)
-
- @staticmethod
- def get_credentials(vim_account_id) -> VimCredentials:
- database.db.connect()
- try:
- with database.db.atomic():
- return VimCredentialsRepository.get(VimCredentials.uuid == vim_account_id)
- finally:
- database.db.close()
-
- @staticmethod
- def is_verify_ssl(vim_credentials: VimCredentials):
- vim_config = json.loads(vim_credentials.config)
- return 'insecure' not in vim_config or vim_config['insecure'] is False
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import json
from keystoneauth1 import session
from keystoneauth1.identity import v3
-from osm_mon.collector.utils.collector import CollectorUtils
-
class OpenstackUtils:
@staticmethod
- def get_session(vim_account_id: str):
- creds = CollectorUtils.get_credentials(vim_account_id)
- verify_ssl = CollectorUtils.is_verify_ssl(creds)
- vim_config = json.loads(creds.config)
+ def get_session(creds: dict):
+ verify_ssl = False if 'insecure' in creds['config'] and creds['config']['insecure'] else True
+ vim_config = creds['config']
project_domain_name = 'Default'
user_domain_name = 'Default'
if 'project_domain_name' in vim_config:
project_domain_name = vim_config['project_domain_name']
if 'user_domain_name' in vim_config:
user_domain_name = vim_config['user_domain_name']
- auth = v3.Password(auth_url=creds.url,
- username=creds.user,
- password=creds.password,
- project_name=creds.tenant_name,
+ auth = v3.Password(auth_url=creds['vim_url'],
+ username=creds['vim_user'],
+ password=creds['vim_password'],
+ project_name=creds['vim_tenant_name'],
project_domain_name=project_domain_name,
user_domain_name=user_domain_name)
return session.Session(auth=auth, verify=verify_ssl)
class OpenstackCollector(BaseVimCollector):
def __init__(self, config: Config, vim_account_id: str):
super().__init__(config, vim_account_id)
- self.conf = config
self.common_db = CommonDbClient(config)
- self.backend = self._get_backend(vim_account_id)
+ vim_account = self.common_db.get_vim_account(vim_account_id)
+ self.backend = self._get_backend(vim_account)
- def _build_keystone_client(self, vim_account_id: str) -> keystone_client.Client:
- sess = OpenstackUtils.get_session(vim_account_id)
+ def _build_keystone_client(self, vim_account: dict) -> keystone_client.Client:
+ sess = OpenstackUtils.get_session(vim_account)
return keystone_client.Client(session=sess)
def _get_resource_uuid(self, nsr_id: str, vnf_member_index: str, vdur_name: str) -> str:
log.exception("Error collecting metric %s for vdu %s" % (metric_name, vdur['name']))
return metrics
- def _get_backend(self, vim_account_id: str):
+ def _get_backend(self, vim_account: dict):
try:
- ceilometer = CeilometerBackend(vim_account_id)
+ ceilometer = CeilometerBackend(vim_account)
ceilometer.client.capabilities.get()
return ceilometer
except EndpointNotFound:
- gnocchi = GnocchiBackend(vim_account_id)
+ gnocchi = GnocchiBackend(vim_account)
gnocchi.client.status.get()
return gnocchi
class GnocchiBackend(OpenstackBackend):
- def __init__(self, vim_account_id: str):
- self.client = self._build_gnocchi_client(vim_account_id)
- self.neutron = self._build_neutron_client(vim_account_id)
+ def __init__(self, vim_account: dict):
+ self.client = self._build_gnocchi_client(vim_account)
+ self.neutron = self._build_neutron_client(vim_account)
- def _build_gnocchi_client(self, vim_account_id: str) -> gnocchi_client.Client:
- sess = OpenstackUtils.get_session(vim_account_id)
+ def _build_gnocchi_client(self, vim_account: dict) -> gnocchi_client.Client:
+ sess = OpenstackUtils.get_session(vim_account)
return gnocchi_client.Client(session=sess)
- def _build_neutron_client(self, vim_account_id: str) -> neutron_client.Client:
- sess = OpenstackUtils.get_session(vim_account_id)
+ def _build_neutron_client(self, vim_account: dict) -> neutron_client.Client:
+ sess = OpenstackUtils.get_session(vim_account)
return neutron_client.Client(session=sess)
def collect_metric(self, metric_type: MetricType, metric_name: str, resource_id: str, interface_name: str):
class CeilometerBackend(OpenstackBackend):
- def __init__(self, vim_account_id: str):
- self.client = self._build_ceilometer_client(vim_account_id)
+ def __init__(self, vim_account: dict):
+ self.client = self._build_ceilometer_client(vim_account)
- def _build_ceilometer_client(self, vim_account_id: str) -> ceilometer_client.Client:
- sess = OpenstackUtils.get_session(vim_account_id)
+ def _build_ceilometer_client(self, vim_account: dict) -> ceilometer_client.Client:
+ sess = OpenstackUtils.get_session(vim_account)
return ceilometer_client.Client(session=sess)
def collect_metric(self, metric_type: MetricType, metric_name: str, resource_id: str, interface_name: str):
# contact: osslegalrouting@vmware.com
##
-import json
import logging
-from osm_mon.collector.utils.collector import CollectorUtils
from osm_mon.collector.vnf_collectors.base_vim import BaseVimCollector
+from osm_mon.collector.vnf_collectors.vrops.vrops_helper import vROPS_Helper
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
-from osm_mon.collector.vnf_collectors.vrops.vrops_helper import vROPS_Helper
log = logging.getLogger(__name__)
vrops_password=cfg['vrops_password'])
def get_vim_account(self, vim_account_id: str):
- vim_account_info = CollectorUtils.get_credentials(vim_account_id)
- return json.loads(vim_account_info.config)
+ vim_account_info = self.common_db.get_vim_account(vim_account_id)
+ return vim_account_info['config']
def collect(self, vnfr: dict):
vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
# contact: osslegalrouting@vmware.com
##
-import json
import logging
import traceback
from xml.etree import ElementTree as XmlElementTree
from pyvcloud.vcd.client import BasicLoginCredentials
from pyvcloud.vcd.client import Client
-from osm_mon.collector.utils.collector import CollectorUtils
from osm_mon.collector.vnf_collectors.base_vim import BaseVimCollector
+from osm_mon.collector.vnf_collectors.vrops.vrops_helper import vROPS_Helper
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
-from osm_mon.collector.vnf_collectors.vrops.vrops_helper import vROPS_Helper
log = logging.getLogger(__name__)
return - dict with vim account details
"""
vim_account = {}
- vim_account_info = CollectorUtils.get_credentials(vim_account_id)
+ vim_account_info = self.common_db.get_vim_account(vim_account_id)
- vim_account['vim_url'] = vim_account_info.url
+ vim_account['vim_url'] = vim_account_info['vim_url']
- vim_config = json.loads(vim_account_info.config)
+ vim_config = vim_account_info['config']
vim_account['admin_username'] = vim_config['admin_username']
- vim_account['admin_password'] = vim_config['admin_password']
+ vim_account['admin_password'] = self.common_db.decrypt_vim_password(vim_config['admin_password'],
+ vim_account_info['schema_version'],
+ vim_account_id)
vim_account['vrops_site'] = vim_config['vrops_site']
vim_account['vrops_user'] = vim_config['vrops_user']
- vim_account['vrops_password'] = vim_config['vrops_password']
+ vim_account['vrops_password'] = self.common_db.decrypt_vim_password(vim_config['vrops_password'],
+ vim_account_info['schema_version'],
+ vim_account_id)
return vim_account
if vdur['name'] == vdur_name:
return vdur
raise ValueError('vdur not found for nsr-id {}, member_index {} and vdur_name {}'.format(nsr_id, member_index,
- vdur_name))
+ vdur_name))
def decrypt_vim_password(self, vim_password: str, schema_version: str, vim_id: str):
return self.common_db.decrypt(vim_password, schema_version, vim_id)
def get_vim_accounts(self):
return self.common_db.get_list('vim_accounts')
+ def get_vim_account(self, vim_account_id: str) -> dict:
+ vim_account = self.common_db.get_one('vim_accounts', {"_id": vim_account_id})
+ vim_account['vim_password'] = self.decrypt_vim_password(vim_account['vim_password'],
+ vim_account['schema_version'],
+ vim_account_id)
+ vim_config_encrypted = ("admin_password", "nsx_password", "vcenter_password")
+ for key in vim_account['config']:
+ if key in vim_config_encrypted:
+ vim_account['config'][key] = self.decrypt_vim_password(vim_account['config'][key],
+ vim_account['schema_version'],
+ vim_account_id)
+ return vim_account
+
def get_sdncs(self):
return self.common_db.get_list('sdns')
import os
from typing import Iterable
-from peewee import CharField, TextField, FloatField, Model, AutoField, Proxy
+from peewee import CharField, FloatField, Model, AutoField, Proxy
from peewee_migrate import Router
from playhouse.db_url import connect
database = db
-class VimCredentials(BaseModel):
- uuid = CharField(unique=True)
- name = CharField()
- type = CharField()
- url = CharField()
- user = CharField()
- password = CharField()
- tenant_name = CharField()
- config = TextField()
-
-
class Alarm(BaseModel):
uuid = CharField(unique=True)
name = CharField()
db.close()
-class VimCredentialsRepository:
- @staticmethod
- def upsert(**query) -> VimCredentials:
- vim_credentials = VimCredentials.get_or_none(**query)
- if vim_credentials:
- query.update({'id': vim_credentials.id})
- vim_id = VimCredentials.insert(**query).on_conflict_replace().execute()
- return VimCredentials.get(id=vim_id)
-
- @staticmethod
- def get(*expressions) -> VimCredentials:
- return VimCredentials.select().where(*expressions).get()
-
-
class AlarmRepository:
@staticmethod
def create(**query) -> Alarm:
async def start(self):
topics = [
- "vim_account",
"alarm_request"
]
await self.msg_bus.aioread(topics, self._process_msg)
async def _process_msg(self, topic, key, values):
log.info("Message arrived: %s", values)
try:
- if topic == "vim_account":
- if key == "create" or key == "edit":
- if 'config' not in values:
- values['config'] = {}
- self.service.upsert_vim_account(values['_id'],
- values['name'],
- values['vim_type'],
- values['vim_url'],
- values['vim_user'],
- values['vim_password'],
- values['vim_tenant_name'],
- values['schema_version'],
- values['config'])
-
- if key == "delete":
- self.service.delete_vim_account(values['_id'])
-
- elif topic == "alarm_request":
+
+ if topic == "alarm_request":
if key == "create_alarm_request":
alarm_details = values['alarm_create_request']
cor_id = alarm_details['correlation_id']
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import json
import logging
import uuid
from osm_mon.core import database
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
-from osm_mon.core.database import VimCredentialsRepository, VimCredentials, AlarmRepository, Alarm
+from osm_mon.core.database import AlarmRepository, Alarm
log = logging.getLogger(__name__)
def __init__(self, config: Config):
self.common_db = CommonDbClient(config)
- def upsert_vim_account(self,
- vim_uuid: str,
- name: str,
- vim_type: str,
- url: str,
- user: str,
- password: str,
- tenant_name: str,
- schema_version: str,
- config: dict) -> VimCredentials:
- decrypted_vim_password = self.common_db.decrypt_vim_password(password,
- schema_version,
- vim_uuid)
-
- vim_config_encrypted = ("admin_password", "nsx_password", "vcenter_password")
- for key in config:
- if key in vim_config_encrypted:
- config[key] = self.common_db.decrypt_vim_password(config[key],
- schema_version,
- vim_uuid)
- database.db.connect()
- try:
- with database.db.atomic():
- return VimCredentialsRepository.upsert(
- uuid=vim_uuid,
- name=name,
- type=vim_type,
- url=url,
- user=user,
- password=decrypted_vim_password,
- tenant_name=tenant_name,
- config=json.dumps(config)
- )
- finally:
- database.db.close()
-
- def delete_vim_account(self, vim_uuid: str) -> None:
- database.db.connect()
- try:
- with database.db.atomic():
- vim_credentials = VimCredentialsRepository.get(VimCredentials.uuid == vim_uuid)
- vim_credentials.delete_instance()
- finally:
- database.db.close()
-
def create_alarm(self,
name: str,
threshold: str,
from unittest import TestCase, mock
from osm_mon.collector.service import CollectorService
-from osm_mon.collector.utils.collector import CollectorUtils
from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
@mock.patch.object(OpenstackCollector, "__init__", lambda *args, **kwargs: None)
@mock.patch.object(OpenstackCollector, "collect")
- @mock.patch.object(CollectorUtils, "get_vim_type")
- def test_init_vim_collector_and_collect_openstack(self, _get_vim_type, collect):
- _get_vim_type.return_value = 'openstack'
+ @mock.patch.object(CommonDbClient, "get_vim_account")
+ def test_init_vim_collector_and_collect_openstack(self, _get_vim_account, collect):
+ _get_vim_account.return_value = {'vim_type': 'openstack'}
collector = CollectorService(self.config)
collector._collect_vim_metrics({}, 'test_vim_account_id')
collect.assert_called_once_with({})
@mock.patch.object(OpenstackCollector, "collect")
- @mock.patch.object(CollectorUtils, "get_vim_type")
- def test_init_vim_collector_and_collect_unknown(self, _get_vim_type, openstack_collect):
- _get_vim_type.return_value = 'unknown'
+ @mock.patch.object(CommonDbClient, "get_vim_account")
+ def test_init_vim_collector_and_collect_unknown(self, _get_vim_account, openstack_collect):
+ _get_vim_account.return_value = {'vim_type': 'unknown'}
collector = CollectorService(self.config)
collector._collect_vim_metrics({}, 'test_vim_account_id')
openstack_collect.assert_not_called()
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-from unittest import TestCase, mock
-
-from osm_mon.collector.utils.collector import CollectorUtils
-from osm_mon.core.config import Config
-from osm_mon.core.database import VimCredentialsRepository, VimCredentials
-
-
-class CollectorServiceTest(TestCase):
- def setUp(self):
- super().setUp()
- self.config = Config()
-
- @mock.patch.object(VimCredentialsRepository, "get")
- @mock.patch('osm_mon.core.database.db')
- def test_get_vim_type(self, db, get_credentials):
- mock_creds = VimCredentials()
- mock_creds.id = 'test_id'
- mock_creds.user = 'user'
- mock_creds.url = 'url'
- mock_creds.password = 'password'
- mock_creds.tenant_name = 'tenant_name'
- mock_creds.type = 'openstack'
- mock_creds.config = '{}'
-
- get_credentials.return_value = mock_creds
- vim_type = CollectorUtils.get_vim_type('test_id')
- self.assertEqual(vim_type, 'openstack')
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+# #
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+# #
+from unittest import TestCase, mock
+
+from osm_mon.collector.utils.openstack import OpenstackUtils
+
+
+@mock.patch('osm_mon.collector.utils.openstack.session')
+class OpenstackUtilsTest(TestCase):
+
+ def setUp(self):
+ super().setUp()
+
+ def test_session_without_insecure(self, mock_session):
+ creds = {
+ 'config': {
+ },
+ 'vim_url': 'url',
+ 'vim_user': 'user',
+ 'vim_password': 'password',
+ 'vim_tenant_name': 'tenant_name'
+ }
+ OpenstackUtils.get_session(creds)
+
+ mock_session.Session.assert_called_once_with(
+ auth=mock.ANY, verify=True)
+
+ def test_session_with_insecure(self, mock_session):
+ creds = {
+ 'config': {
+ 'insecure': True
+ },
+ 'vim_url': 'url',
+ 'vim_user': 'user',
+ 'vim_password': 'password',
+ 'vim_tenant_name': 'tenant_name'
+ }
+ OpenstackUtils.get_session(creds)
+
+ mock_session.Session.assert_called_once_with(
+ auth=mock.ANY, verify=False)
+
+ def test_session_with_insecure_false(self, mock_session):
+ creds = {
+ 'config': {
+ 'insecure': False
+ },
+ 'vim_url': 'url',
+ 'vim_user': 'user',
+ 'vim_password': 'password',
+ 'vim_tenant_name': 'tenant_name'
+ }
+ OpenstackUtils.get_session(creds)
+ mock_session.Session.assert_called_once_with(
+ auth=mock.ANY, verify=True)
'+00:00')), 60.0, 0.0333070363)]
build_gnocchi_client.return_value = mock_gnocchi_client
- backend = GnocchiBackend('test_uuid')
+ backend = GnocchiBackend({'_id': 'test_uuid'})
value = backend._collect_instance_metric('cpu_utilization', 'test_resource_id')
self.assertEqual(value, 0.0333070363)
mock_gnocchi_client.metric.get_measures.assert_called_once_with('cpu_utilization',
build_gnocchi_client.return_value = mock_gnocchi_client
build_neutron_client.return_value = mock_neutron_client
- backend = GnocchiBackend('test_uuid')
+ backend = GnocchiBackend({'_id': 'test_uuid'})
value = backend._collect_interface_one_metric('packets_received', 'test_resource_id', 'eth0')
self.assertEqual(value, 0.0333070363)
mock_gnocchi_client.metric.get_measures.assert_called_once_with('packets_received', resource_id='test_id',
build_gnocchi_client.return_value = mock_gnocchi_client
- backend = GnocchiBackend('test_uuid')
+ backend = GnocchiBackend({'_id': 'test_uuid'})
value = backend._collect_interface_all_metric('packets_received', 'test_resource_id')
self.assertEqual(value, 0.0666140726)
mock_gnocchi_client.metric.get_measures.assert_any_call('packets_received', resource_id='test_id_1',
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import json
from unittest import TestCase, mock
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
-from osm_mon.core.database import VimCredentialsRepository, VimCredentials
-from osm_mon.server.service import ServerService
@mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
super().setUp()
self.config = Config()
- @mock.patch.object(CommonDbClient, "decrypt_vim_password")
- @mock.patch.object(VimCredentialsRepository, "upsert")
- @mock.patch('osm_mon.core.database.db')
- def test_upsert_vim_account(self, db, upsert_credentials, decrypt_vim_password):
- def _mock_decrypt_vim_password(password: str, schema_version: str, vim_uuid: str):
- return password.replace('encrypted', 'decrypted')
-
- decrypt_vim_password.side_effect = _mock_decrypt_vim_password
-
- mock_config = {
- 'admin_password': 'encrypted_admin_password',
- 'nsx_password': 'encrypted_nsx_password',
- 'vcenter_password': 'encrypted_vcenter_password'
- }
-
- mock_expected_config = {
- 'admin_password': 'decrypted_admin_password',
- 'nsx_password': 'decrypted_nsx_password',
- 'vcenter_password': 'decrypted_vcenter_password'
- }
-
- service = ServerService(self.config)
- service.upsert_vim_account('test_uuid', 'test_name', 'test_type', 'test_url', 'test_user', 'encrypted_password',
- 'test_tenant_name', '1.1', mock_config)
-
- upsert_credentials.assert_called_with(
- uuid=mock.ANY,
- name='test_name',
- type='test_type',
- url='test_url',
- user='test_user',
- password='decrypted_password',
- tenant_name='test_tenant_name',
- config=json.dumps(mock_expected_config)
- )
-
- @mock.patch.object(VimCredentialsRepository, "get")
- @mock.patch('osm_mon.core.database.db')
- def test_delete_vim_account(self, db, get_credentials):
- mock_creds = mock.Mock()
- get_credentials.return_value = mock_creds
-
- service = ServerService(self.config)
- service.delete_vim_account('test_uuid')
-
- get_credentials.assert_called_with(VimCredentials.uuid == 'test_uuid')
- mock_creds.delete_instance.assert_called_with()
+ def test_create_alarm(self):
+ # TODO
+ pass