1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Telefonica S.A.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
19 from http
import HTTPStatus
20 from osm_common
.common_utils
import FakeLock
21 from threading
import Lock
23 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
26 class MsgException(Exception):
28 Base Exception class for all msgXXXX exceptions
31 def __init__(self
, message
, http_code
=HTTPStatus
.SERVICE_UNAVAILABLE
):
34 :param message: descriptive text
35 :param http_code: <http.HTTPStatus> type. It contains ".value" (http error code) and ".name" (http error name
37 self
.http_code
= http_code
38 Exception.__init
__(self
, "messaging exception " + message
)
41 class MsgBase(object):
43 Base class for all msgXXXX classes
46 def __init__(self
, logger_name
='msg', lock
=False):
49 :param logger_name: logging name
50 :param lock: Used to protect simultaneous access to the same instance class by several threads:
51 False, None: Do not protect, this object will only be accessed by one thread
52 True: This object needs to be protected by several threads accessing.
53 Lock object. Use thi Lock for the threads access protection
55 self
.logger
= logging
.getLogger(logger_name
)
57 self
.lock
= FakeLock()
60 elif isinstance(lock
, Lock
):
63 raise ValueError("lock parameter must be a Lock class or boolean")
65 def connect(self
, config
):
71 def write(self
, topic
, key
, msg
):
72 raise MsgException("Method 'write' not implemented", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
74 def read(self
, topic
):
75 raise MsgException("Method 'read' not implemented", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
77 async def aiowrite(self
, topic
, key
, msg
, loop
=None):
78 raise MsgException("Method 'aiowrite' not implemented", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
80 async def aioread(self
, topic
, loop
=None, callback
=None, aiocallback
=None, group_id
=None, **kwargs
):
81 raise MsgException("Method 'aioread' not implemented", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)