Remove common files and use the osm/common package
Change-Id: I60f0cee9338fb5f1595e13bfab62805a4fec9913
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
diff --git a/osm_nbi/dbbase.py b/osm_nbi/dbbase.py
deleted file mode 100644
index b101cb4..0000000
--- a/osm_nbi/dbbase.py
+++ /dev/null
@@ -1,38 +0,0 @@
-from http import HTTPStatus
-
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-
-
-class DbException(Exception):
-
- def __init__(self, message, http_code=HTTPStatus.NOT_FOUND):
- # TODO change to http.HTTPStatus instead of int that allows .value and .name
- self.http_code = http_code
- Exception.__init__(self, "database exception " + message)
-
-
-class DbBase(object):
-
- def __init__(self):
- pass
-
- def db_connect(self, config):
- pass
-
- def db_disconnect(self):
- pass
-
- def get_list(self, table, filter={}):
- pass
-
- def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
- pass
-
- def create(self, table, indata):
- pass
-
- def del_list(self, table, filter={}):
- pass
-
- def del_one(self, table, filter={}, fail_on_empty=True):
- pass
diff --git a/osm_nbi/dbmemory.py b/osm_nbi/dbmemory.py
deleted file mode 100644
index cdb0482..0000000
--- a/osm_nbi/dbmemory.py
+++ /dev/null
@@ -1,124 +0,0 @@
-import logging
-from dbbase import DbException, DbBase
-from http import HTTPStatus
-from uuid import uuid4
-from copy import deepcopy
-
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-
-
-class DbMemory(DbBase):
-
- def __init__(self, logger_name='db'):
- self.logger = logging.getLogger(logger_name)
- self.db = {}
-
- def db_connect(self, config):
- if "logger_name" in config:
- self.logger = logging.getLogger(config["logger_name"])
-
- @staticmethod
- def _format_filter(filter):
- return filter # TODO
-
- def _find(self, table, filter):
- for i, row in enumerate(self.db.get(table, ())):
- match = True
- if filter:
- for k, v in filter.items():
- if k not in row or v != row[k]:
- match = False
- if match:
- yield i, row
-
- def get_list(self, table, filter={}):
- try:
- l = []
- for _, row in self._find(table, self._format_filter(filter)):
- l.append(deepcopy(row))
- return l
- except DbException:
- raise
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
- try:
- l = None
- for _, row in self._find(table, self._format_filter(filter)):
- if not fail_on_more:
- return deepcopy(row)
- if l:
- raise DbException("Found more than one entry with filter='{}'".format(filter),
- HTTPStatus.CONFLICT.value)
- l = row
- if not l and fail_on_empty:
- raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
- return deepcopy(l)
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def del_list(self, table, filter={}):
- try:
- id_list = []
- for i, _ in self._find(table, self._format_filter(filter)):
- id_list.append(i)
- deleted = len(id_list)
- for i in id_list:
- del self.db[table][i]
- return {"deleted": deleted}
- except DbException:
- raise
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def del_one(self, table, filter={}, fail_on_empty=True):
- try:
- for i, _ in self._find(table, self._format_filter(filter)):
- break
- else:
- if fail_on_empty:
- raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
- return None
- del self.db[table][i]
- return {"deleted": 1}
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def replace(self, table, filter, indata, fail_on_empty=True):
- try:
- for i, _ in self._find(table, self._format_filter(filter)):
- break
- else:
- if fail_on_empty:
- raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
- return None
- self.db[table][i] = deepcopy(indata)
- return {"upadted": 1}
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def create(self, table, indata):
- try:
- id = indata.get("_id")
- if not id:
- id = str(uuid4())
- indata["_id"] = id
- if table not in self.db:
- self.db[table] = []
- self.db[table].append(deepcopy(indata))
- return id
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
-
-if __name__ == '__main__':
- # some test code
- db = dbmemory()
- db.create("test", {"_id": 1, "data": 1})
- db.create("test", {"_id": 2, "data": 2})
- db.create("test", {"_id": 3, "data": 3})
- print("must be 3 items:", db.get_list("test"))
- print("must return item 2:", db.get_list("test", {"_id": 2}))
- db.del_one("test", {"_id": 2})
- print("must be emtpy:", db.get_list("test", {"_id": 2}))
diff --git a/osm_nbi/dbmongo.py b/osm_nbi/dbmongo.py
deleted file mode 100644
index a8ea1ca..0000000
--- a/osm_nbi/dbmongo.py
+++ /dev/null
@@ -1,190 +0,0 @@
-#import pymongo
-import logging
-from pymongo import MongoClient, errors
-from dbbase import DbException, DbBase
-from http import HTTPStatus
-from time import time, sleep
-
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-
-# TODO consider use this decorator for database access retries
-# @retry_mongocall
-# def retry_mongocall(call):
-# def _retry_mongocall(*args, **kwargs):
-# retry = 1
-# while True:
-# try:
-# return call(*args, **kwargs)
-# except pymongo.AutoReconnect as e:
-# if retry == 4:
-# raise DbException(str(e))
-# sleep(retry)
-# return _retry_mongocall
-
-
-class DbMongo(DbBase):
- conn_initial_timout = 120
- conn_timout = 10
-
- def __init__(self, logger_name='db'):
- self.logger = logging.getLogger(logger_name)
-
- def db_connect(self, config):
- try:
- if "logger_name" in config:
- self.logger = logging.getLogger(config["logger_name"])
- self.client = MongoClient(config["host"], config["port"])
- self.db = self.client[config["name"]]
- if "loglevel" in config:
- self.logger.setLevel(getattr(logging, config['loglevel']))
- # get data to try a connection
- now = time()
- while True:
- try:
- self.db.users.find_one({"username": "admin"})
- return
- except errors.ConnectionFailure as e:
- if time() - now >= self.conn_initial_timout:
- raise
- self.logger.info("Waiting to database up {}".format(e))
- sleep(2)
- except errors.PyMongoError as e:
- raise DbException(str(e))
-
- def db_disconnect(self):
- pass # TODO
-
- @staticmethod
- def _format_filter(filter):
- try:
- db_filter = {}
- for query_k, query_v in filter.items():
- dot_index = query_k.rfind(".")
- if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont",
- "ncont", "neq"):
- operator = "$" + query_k[dot_index+1:]
- if operator == "$neq":
- operator = "$ne"
- k = query_k[:dot_index]
- else:
- operator = "$eq"
- k = query_k
-
- v = query_v
- if isinstance(v, list):
- if operator in ("$eq", "$cont"):
- operator = "$in"
- v = query_v
- elif operator in ("$ne", "$ncont"):
- operator = "$nin"
- v = query_v
- else:
- v = query_v.join(",")
-
- if operator in ("$eq", "$cont"):
- # v cannot be a comma separated list, because operator would have been changed to $in
- db_filter[k] = v
- elif operator == "$ncount":
- # v cannot be a comma separated list, because operator would have been changed to $nin
- db_filter[k] = {"$ne": v}
- else:
- # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8"
- if k not in db_filter:
- db_filter[k] = {}
- db_filter[k][operator] = v
-
- return db_filter
- except Exception as e:
- raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
- http_code=HTTPStatus.BAD_REQUEST)
-
- def get_list(self, table, filter={}):
- try:
- l = []
- collection = self.db[table]
- rows = collection.find(self._format_filter(filter))
- for row in rows:
- l.append(row)
- return l
- except DbException:
- raise
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True):
- try:
- if filter:
- filter = self._format_filter(filter)
- collection = self.db[table]
- if not (fail_on_empty and fail_on_more):
- return collection.find_one(filter)
- rows = collection.find(filter)
- if rows.count() == 0:
- if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.NOT_FOUND)
- return None
- elif rows.count() > 1:
- if fail_on_more:
- raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.CONFLICT)
- return rows[0]
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def del_list(self, table, filter={}):
- try:
- collection = self.db[table]
- rows = collection.delete_many(self._format_filter(filter))
- return {"deleted": rows.deleted_count}
- except DbException:
- raise
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def del_one(self, table, filter={}, fail_on_empty=True):
- try:
- collection = self.db[table]
- rows = collection.delete_one(self._format_filter(filter))
- if rows.deleted_count == 0:
- if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.NOT_FOUND)
- return None
- return {"deleted": rows.deleted_count}
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def create(self, table, indata):
- try:
- collection = self.db[table]
- data = collection.insert_one(indata)
- return data.inserted_id
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def set_one(self, table, filter, update_dict, fail_on_empty=True):
- try:
- collection = self.db[table]
- rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
- if rows.updated_count == 0:
- if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.NOT_FOUND)
- return None
- return {"deleted": rows.deleted_count}
- except Exception as e: # TODO refine
- raise DbException(str(e))
-
- def replace(self, table, id, indata, fail_on_empty=True):
- try:
- collection = self.db[table]
- rows = collection.replace_one({"_id": id}, indata)
- if rows.modified_count == 0:
- if fail_on_empty:
- raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
- HTTPStatus.NOT_FOUND)
- return None
- return {"replace": rows.modified_count}
- except Exception as e: # TODO refine
- raise DbException(str(e))
diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py
index 7b8799d..d771f96 100644
--- a/osm_nbi/engine.py
+++ b/osm_nbi/engine.py
@@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
-import dbmongo
-import dbmemory
-import fslocal
-import msglocal
-import msgkafka
+from osm_common import dbmongo
+from osm_common import dbmemory
+from osm_common import fslocal
+from osm_common import msglocal
+from osm_common import msgkafka
import tarfile
import yaml
import json
@@ -12,9 +12,9 @@
from random import choice as random_choice
from uuid import uuid4
from hashlib import sha256, md5
-from dbbase import DbException
-from fsbase import FsException
-from msgbase import MsgException
+from osm_common.dbbase import DbException
+from osm_common.fsbase import FsException
+from osm_common.msgbase import MsgException
from http import HTTPStatus
from time import time
from copy import deepcopy
diff --git a/osm_nbi/fsbase.py b/osm_nbi/fsbase.py
deleted file mode 100644
index 7b6cd0c..0000000
--- a/osm_nbi/fsbase.py
+++ /dev/null
@@ -1,43 +0,0 @@
-
-from http import HTTPStatus
-
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-
-
-class FsException(Exception):
- def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
- self.http_code = http_code
- Exception.__init__(self, "storage exception " + message)
-
-
-class FsBase(object):
- def __init__(self):
- pass
-
- def get_params(self):
- return {}
-
- def fs_connect(self, config):
- pass
-
- def fs_disconnect(self):
- pass
-
- def mkdir(self, folder):
- pass
-
- def file_exists(self, storage):
- pass
-
- def file_size(self, storage):
- pass
-
- def file_extract(self, tar_object, path):
- pass
-
- def file_open(self, storage, mode):
- pass
-
- def file_delete(self, storage, ignore_non_exist=False):
- pass
-
diff --git a/osm_nbi/fslocal.py b/osm_nbi/fslocal.py
deleted file mode 100644
index b7dd839..0000000
--- a/osm_nbi/fslocal.py
+++ /dev/null
@@ -1,142 +0,0 @@
-import os
-import logging
-import tarfile
-from http import HTTPStatus
-from shutil import rmtree
-from fsbase import FsBase, FsException
-
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-
-
-class FsLocal(FsBase):
-
- def __init__(self, logger_name='fs'):
- self.logger = logging.getLogger(logger_name)
- self.path = None
-
- def get_params(self):
- return {"fs": "local", "path": self.path}
-
- def fs_connect(self, config):
- try:
- if "logger_name" in config:
- self.logger = logging.getLogger(config["logger_name"])
- self.path = config["path"]
- if not self.path.endswith("/"):
- self.path += "/"
- if not os.path.exists(self.path):
- raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
- config["path"]))
- except FsException:
- raise
- except Exception as e: # TODO refine
- raise FsException(str(e))
-
- def fs_disconnect(self):
- pass # TODO
-
- def mkdir(self, folder):
- """
- Creates a folder or parent object location
- :param folder:
- :return: None or raises and exception
- """
- try:
- os.mkdir(self.path + folder)
- except Exception as e:
- raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
-
- def file_exists(self, storage, mode=None):
- """
- Indicates if "storage" file exist
- :param storage: can be a str or a str list
- :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
- :return: True, False
- """
- if isinstance(storage, str):
- f = storage
- else:
- f = "/".join(storage)
- if os.path.exists(self.path + f):
- if mode == "file" and os.path.isfile(self.path + f):
- return True
- if mode == "dir" and os.path.isdir(self.path + f):
- return True
- return False
-
- def file_size(self, storage):
- """
- return file size
- :param storage: can be a str or a str list
- :return: file size
- """
- if isinstance(storage, str):
- f = storage
- else:
- f = "/".join(storage)
- return os.path.getsize(self.path + f)
-
- def file_extract(self, tar_object, path):
- """
- extract a tar file
- :param tar_object: object of type tar
- :param path: can be a str or a str list, or a tar object where to extract the tar_object
- :return: None
- """
- if isinstance(path, str):
- f = self.path + path
- else:
- f = self.path + "/".join(path)
- tar_object.extractall(path=f)
-
- def file_open(self, storage, mode):
- """
- Open a file
- :param storage: can be a str or list of str
- :param mode: file mode
- :return: file object
- """
- try:
- if isinstance(storage, str):
- f = storage
- else:
- f = "/".join(storage)
- return open(self.path + f, mode)
- except FileNotFoundError:
- raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
- except IOError:
- raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
-
- def dir_ls(self, storage):
- """
- return folder content
- :param storage: can be a str or list of str
- :return: folder content
- """
- try:
- if isinstance(storage, str):
- f = storage
- else:
- f = "/".join(storage)
- return os.listdir(self.path + f)
- except NotADirectoryError:
- raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
- except IOError:
- raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
-
- def file_delete(self, storage, ignore_non_exist=False):
- """
- Delete storage content recursivelly
- :param storage: can be a str or list of str
- :param ignore_non_exist: not raise exception if storage does not exist
- :return: None
- """
-
- if isinstance(storage, str):
- f = self.path + storage
- else:
- f = self.path + "/".join(storage)
- if os.path.exists(f):
- rmtree(f)
- elif not ignore_non_exist:
- raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
diff --git a/osm_nbi/msgbase.py b/osm_nbi/msgbase.py
deleted file mode 100644
index 25e8c80..0000000
--- a/osm_nbi/msgbase.py
+++ /dev/null
@@ -1,47 +0,0 @@
-
-import asyncio
-from http import HTTPStatus
-
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-
-
-class MsgException(Exception):
- """
- Base Exception class for all msgXXXX exceptions
- """
-
- def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
- """
- General exception
- :param message: descriptive text
- :param http_code: <http.HTTPStatus> type. It contains ".value" (http error code) and ".name" (http error name
- """
- self.http_code = http_code
- Exception.__init__(self, "messaging exception " + message)
-
-
-class MsgBase(object):
- """
- Base class for all msgXXXX classes
- """
-
- def __init__(self):
- pass
-
- def connect(self, config):
- pass
-
- def disconnect(self):
- pass
-
- def write(self, topic, key, msg):
- pass
-
- def read(self, topic):
- pass
-
- async def aiowrite(self, topic, key, msg, loop):
- pass
-
- async def aioread(self, topic, loop):
- pass
diff --git a/osm_nbi/msgkafka.py b/osm_nbi/msgkafka.py
deleted file mode 100644
index 96456af..0000000
--- a/osm_nbi/msgkafka.py
+++ /dev/null
@@ -1,96 +0,0 @@
-import logging
-import asyncio
-import yaml
-from aiokafka import AIOKafkaConsumer
-from aiokafka import AIOKafkaProducer
-from aiokafka.errors import KafkaError
-from msgbase import MsgBase, MsgException
-#import json
-
-
-class MsgKafka(MsgBase):
- def __init__(self, logger_name='msg'):
- self.logger = logging.getLogger(logger_name)
- self.host = None
- self.port = None
- self.consumer = None
- self.producer = None
- # create a different file for each topic
- #self.files = {}
-
- def connect(self, config):
- try:
- if "logger_name" in config:
- self.logger = logging.getLogger(config["logger_name"])
- self.host = config["host"]
- self.port = config["port"]
- self.topic_lst = []
- self.loop = asyncio.get_event_loop()
- self.broker = str(self.host) + ":" + str(self.port)
-
- except Exception as e: # TODO refine
- raise MsgException(str(e))
-
- def disconnect(self):
- try:
- self.loop.close()
- except Exception as e: # TODO refine
- raise MsgException(str(e))
-
- def write(self, topic, key, msg):
- try:
- self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
- msg=yaml.safe_dump(msg, default_flow_style=True),
- loop=self.loop))
-
- except Exception as e:
- raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
-
- def read(self, topic):
- """
- Read from one or several topics. it is non blocking returning None if nothing is available
- :param topic: can be str: single topic; or str list: several topics
- :return: topic, key, message; or None
- """
- try:
- return self.loop.run_until_complete(self.aioread(topic, self.loop))
- except MsgException:
- raise
- except Exception as e:
- raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
-
- async def aiowrite(self, topic, key, msg, loop):
- try:
- self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
- bootstrap_servers=self.broker)
- await self.producer.start()
- await self.producer.send(topic=topic, key=key, value=msg)
- except Exception as e:
- raise MsgException("Error publishing to {} topic: {}".format(topic, str(e)))
- finally:
- await self.producer.stop()
-
- async def aioread(self, topic, loop):
- """
- Asyncio read from one or several topics. It blocks
- :param topic: can be str: single topic; or str list: several topics
- :param loop: asyncio loop
- :return: topic, key, message
- """
- try:
- if isinstance(topic, (list, tuple)):
- topic_list = topic
- else:
- topic_list = (topic,)
-
- self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
- await self.consumer.start()
- self.consumer.subscribe(topic_list)
- async for message in self.consumer:
- return message.topic, yaml.load(message.key), yaml.load(message.value)
- except KafkaError as e:
- raise MsgException(str(e))
- finally:
- await self.consumer.stop()
-
-
diff --git a/osm_nbi/msglocal.py b/osm_nbi/msglocal.py
deleted file mode 100644
index 337321f..0000000
--- a/osm_nbi/msglocal.py
+++ /dev/null
@@ -1,111 +0,0 @@
-import logging
-import os
-import yaml
-import asyncio
-from msgbase import MsgBase, MsgException
-from time import sleep
-
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-
-"""
-This emulated kafka bus by just using a shared file system. Usefull for testing or devops.
-One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
-access to the same file. e.g. same volume if running with docker.
-One text line per message is used in yaml format
-"""
-
-class MsgLocal(MsgBase):
-
- def __init__(self, logger_name='msg'):
- self.logger = logging.getLogger(logger_name)
- self.path = None
- # create a different file for each topic
- self.files = {}
- self.buffer = {}
-
- def connect(self, config):
- try:
- if "logger_name" in config:
- self.logger = logging.getLogger(config["logger_name"])
- self.path = config["path"]
- if not self.path.endswith("/"):
- self.path += "/"
- if not os.path.exists(self.path):
- os.mkdir(self.path)
- except MsgException:
- raise
- except Exception as e: # TODO refine
- raise MsgException(str(e))
-
- def disconnect(self):
- for f in self.files.values():
- try:
- f.close()
- except Exception as e: # TODO refine
- pass
-
- def write(self, topic, key, msg):
- """
- Insert a message into topic
- :param topic: topic
- :param key: key text to be inserted
- :param msg: value object to be inserted, can be str, object ...
- :return: None or raises and exception
- """
- try:
- if topic not in self.files:
- self.files[topic] = open(self.path + topic, "a+")
- yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
- self.files[topic].flush()
- except Exception as e: # TODO refine
- raise MsgException(str(e))
-
- def read(self, topic, blocks=True):
- """
- Read from one or several topics. it is non blocking returning None if nothing is available
- :param topic: can be str: single topic; or str list: several topics
- :param blocks: indicates if it should wait and block until a message is present or returns None
- :return: topic, key, message; or None if blocks==True
- """
- try:
- if isinstance(topic, (list, tuple)):
- topic_list = topic
- else:
- topic_list = (topic, )
- while True:
- for single_topic in topic_list:
- if single_topic not in self.files:
- self.files[single_topic] = open(self.path + single_topic, "a+")
- self.buffer[single_topic] = ""
- self.buffer[single_topic] += self.files[single_topic].readline()
- if not self.buffer[single_topic].endswith("\n"):
- continue
- msg_dict = yaml.load(self.buffer[single_topic])
- self.buffer[single_topic] = ""
- assert len(msg_dict) == 1
- for k, v in msg_dict.items():
- return single_topic, k, v
- if not blocks:
- return None
- sleep(2)
- except Exception as e: # TODO refine
- raise MsgException(str(e))
-
- async def aioread(self, topic, loop):
- """
- Asyncio read from one or several topics. It blocks
- :param topic: can be str: single topic; or str list: several topics
- :param loop: asyncio loop
- :return: topic, key, message
- """
- try:
- while True:
- msg = self.read(topic, blocks=False)
- if msg:
- return msg
- await asyncio.sleep(2, loop=loop)
- except MsgException:
- raise
- except Exception as e: # TODO refine
- raise MsgException(str(e))
-
diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py
index a8f891d..ba2e14e 100644
--- a/osm_nbi/nbi.py
+++ b/osm_nbi/nbi.py
@@ -8,9 +8,9 @@
import html_out as html
import logging
from engine import Engine, EngineException
-from dbbase import DbException
-from fsbase import FsException
-from msgbase import MsgException
+from osm_common.dbbase import DbException
+from osm_common.fsbase import FsException
+from osm_common.msgbase import MsgException
from base64 import standard_b64decode
#from os import getenv
from http import HTTPStatus