X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fmsglocal.py;h=1e8e089b22913215ce7835cdab383ccd2a3dce51;hb=c837a7819c5388b276ada7f29a085cd251119e4b;hp=bfa30b77ca5d77bb248b76f42d3eeb0a4a9e0709;hpb=3054f783ac759b221233fd0a82424aa105e4ea2e;p=osm%2Fcommon.git diff --git a/osm_common/msglocal.py b/osm_common/msglocal.py index bfa30b7..1e8e089 100644 --- a/osm_common/msglocal.py +++ b/osm_common/msglocal.py @@ -1,9 +1,27 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Telefonica S.A. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging import os import yaml import asyncio from osm_common.msgbase import MsgBase, MsgException from time import sleep +from http import HTTPStatus __author__ = "Alfonso Tierno " @@ -17,11 +35,12 @@ One text line per message is used in yaml format. class MsgLocal(MsgBase): - def __init__(self, logger_name='msg'): - self.logger = logging.getLogger(logger_name) + def __init__(self, logger_name='msg', lock=False): + super().__init__(logger_name, lock) self.path = None # create a different file for each topic - self.files = {} + self.files_read = {} + self.files_write = {} self.buffer = {} def connect(self, config): @@ -36,12 +55,19 @@ class MsgLocal(MsgBase): except MsgException: raise except Exception as e: # TODO refine - raise MsgException(str(e)) + raise MsgException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) def disconnect(self): - for f in self.files.values(): + for topic, f in self.files_read.items(): + try: + f.close() + self.files_read[topic] = None + except Exception: # TODO refine + pass + for topic, f in self.files_write.items(): try: f.close() + self.files_write[topic] = None except Exception: # TODO refine pass @@ -54,12 +80,13 @@ class MsgLocal(MsgBase): :return: None or raises and exception """ try: - if topic not in self.files: - self.files[topic] = open(self.path + topic, "a+") - yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000) - self.files[topic].flush() + with self.lock: + if topic not in self.files_write: + self.files_write[topic] = open(self.path + topic, "a+") + yaml.safe_dump({key: msg}, self.files_write[topic], default_flow_style=True, width=20000) + self.files_write[topic].flush() except Exception as e: # TODO refine - raise MsgException(str(e)) + raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) def read(self, topic, blocks=True): """ @@ -75,24 +102,25 @@ class MsgLocal(MsgBase): topic_list = (topic, ) while True: for single_topic in topic_list: - if single_topic not in self.files: - self.files[single_topic] = open(self.path + single_topic, "a+") + with self.lock: + if single_topic not in self.files_read: + self.files_read[single_topic] = open(self.path + single_topic, "a+") + self.buffer[single_topic] = "" + self.buffer[single_topic] += self.files_read[single_topic].readline() + if not self.buffer[single_topic].endswith("\n"): + continue + msg_dict = yaml.load(self.buffer[single_topic]) self.buffer[single_topic] = "" - self.buffer[single_topic] += self.files[single_topic].readline() - if not self.buffer[single_topic].endswith("\n"): - continue - msg_dict = yaml.load(self.buffer[single_topic]) - self.buffer[single_topic] = "" - assert len(msg_dict) == 1 - for k, v in msg_dict.items(): - return single_topic, k, v + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return single_topic, k, v if not blocks: return None sleep(2) except Exception as e: # TODO refine - raise MsgException(str(e)) + raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) - async def aioread(self, topic, loop): + async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs): """ Asyncio read from one or several topics. It blocks :param topic: can be str: single topic; or str list: several topics @@ -103,9 +131,25 @@ class MsgLocal(MsgBase): while True: msg = self.read(topic, blocks=False) if msg: - return msg + if callback: + callback(*msg, **kwargs) + elif aiocallback: + await aiocallback(*msg, **kwargs) + else: + return msg await asyncio.sleep(2, loop=loop) except MsgException: raise except Exception as e: # TODO refine - raise MsgException(str(e)) + raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) + + async def aiowrite(self, topic, key, msg, loop=None): + """ + Asyncio write. It blocks + :param topic: str + :param key: str + :param msg: message, can be str or yaml + :param loop: asyncio loop + :return: nothing if ok or raises an exception + """ + return self.write(topic, key, msg)