Implements aiokafka and modifies code to support asyncio
[osm/POL.git] / osm_policy_module / common / lcm_client.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 import datetime
26 import json
27 import logging
28 import time
29 import uuid
30
31 from aiokafka import AIOKafkaProducer
32 from osm_common import dbmongo
33
34 from osm_policy_module.core.config import Config
35
36 log = logging.getLogger(__name__)
37
38
39 class LcmClient:
40 def __init__(self, loop=None):
41 cfg = Config.instance()
42 self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
43 cfg.OSMPOL_MESSAGE_PORT)
44 self.common_db = dbmongo.DbMongo()
45 self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
46 'port': int(cfg.OSMPOL_DATABASE_PORT),
47 'name': 'osm'})
48 if not loop:
49 loop = asyncio.get_event_loop()
50 self.loop = loop
51
52 async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
53 log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
54 nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
55 self.common_db.create("nslcmops", nslcmop)
56 log.info("Sending scale action message: %s", json.dumps(nslcmop))
57 producer = AIOKafkaProducer(loop=self.loop,
58 bootstrap_servers=self.kafka_server,
59 key_serializer=str.encode,
60 value_serializer=str.encode)
61 await producer.start()
62 try:
63 # Produce message
64 await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
65 finally:
66 # Wait for all pending messages to be delivered or expire.
67 await producer.stop()
68
69 def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
70 log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
71 _id = str(uuid.uuid4())
72 now = time.time()
73 params = {
74 "scaleType": "SCALE_VNF",
75 "scaleVnfData": {
76 "scaleVnfType": action.upper(),
77 "scaleByStepData": {
78 "scaling-group-descriptor": scaling_group_name,
79 "member-vnf-index": str(vnf_member_index)
80 }
81 },
82 "scaleTime": "{}Z".format(datetime.datetime.utcnow().isoformat())
83 }
84
85 nslcmop = {
86 "id": _id,
87 "_id": _id,
88 "operationState": "PROCESSING",
89 "statusEnteredTime": now,
90 "nsInstanceId": nsr_id,
91 "lcmOperationType": "scale",
92 "startTime": now,
93 "isAutomaticInvocation": True,
94 "operationParams": params,
95 "isCancelPending": False,
96 "links": {
97 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
98 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
99 }
100 }
101 return nslcmop