05378d7c4797f1b8234e335fdfa7e3d5dc05c149
[osm/POL.git] / osm_policy_module / common / lcm_client.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 import datetime
26 import json
27 import logging
28 import time
29 import uuid
30
31 from aiokafka import AIOKafkaProducer
32 from osm_common import dbmongo
33
34 from osm_policy_module.core.config import Config
35
log = logging.getLogger(__name__)


class LcmClient:
    """Client that records NS scale operations and announces them over Kafka.

    Each request is stored as an ``nslcmop`` document under ``nslcmops`` in
    the common database and then published on the ``ns`` topic with the
    ``scale`` key.
    """

    def __init__(self, loop=None):
        """Read the module configuration and connect to the common database.

        :param loop: asyncio event loop handed to the Kafka producer. When it
            is ``None`` the loop returned by ``asyncio.get_event_loop()`` is
            used.
        """
        cfg = Config.instance()
        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                           cfg.OSMPOL_MESSAGE_PORT)
        self.common_db = dbmongo.DbMongo()
        self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI,
                                   'name': 'osm'})
        if loop is None:
            loop = asyncio.get_event_loop()
        self.loop = loop

    async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str) -> None:
        """Store a scale operation and publish it on the ``ns`` topic.

        :param nsr_id: id of the NS instance the operation refers to.
        :param scaling_group_name: name of the scaling group descriptor.
        :param vnf_member_index: member index of the VNF to scale.
        :param action: scale action; it is upper-cased into ``scaleVnfType``.

        Errors raised by the database driver or by the Kafka producer are
        propagated to the caller. Once the producer has been started it is
        always stopped, whether or not the operation succeeded.
        """
        log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
        nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
        producer = AIOKafkaProducer(loop=self.loop,
                                    bootstrap_servers=self.kafka_server,
                                    key_serializer=str.encode,
                                    value_serializer=str.encode)
        # Connect to the broker *before* writing to the database: if Kafka is
        # unreachable we fail here and do not leave behind a "PROCESSING"
        # record for which no message was ever sent.
        await producer.start()
        try:
            # The record is written before the message is produced, so it
            # already exists when the message is published.
            self.common_db.create("nslcmops", nslcmop)
            # Serialize once; the very same payload is logged and sent.
            payload = json.dumps(nslcmop)
            log.info("Sending scale action message: %s", payload)
            await producer.send_and_wait("ns", key="scale", value=payload)
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

    def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str) -> dict:
        """Build the ``nslcmop`` document describing a VNF scale operation.

        :param nsr_id: id of the NS instance the operation refers to.
        :param scaling_group_name: name of the scaling group descriptor.
        :param vnf_member_index: member index of the VNF; stored as a string.
        :param action: scale action; stored upper-cased as ``scaleVnfType``.
        :return: a JSON-serializable dict with a freshly generated UUID as
            both ``id`` and ``_id``.
        """
        log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
        _id = str(uuid.uuid4())
        now = time.time()
        # Derive scaleTime from the same clock reading as startTime so every
        # timestamp in the document agrees. The value is converted through an
        # aware UTC datetime (utcnow() is deprecated) and made naive again so
        # the rendered text keeps the original "<naive ISO-8601>Z" form.
        scale_time = datetime.datetime.fromtimestamp(now, tz=datetime.timezone.utc).replace(tzinfo=None)
        params = {
            "scaleType": "SCALE_VNF",
            "scaleVnfData": {
                "scaleVnfType": action.upper(),
                "scaleByStepData": {
                    "scaling-group-descriptor": scaling_group_name,
                    "member-vnf-index": str(vnf_member_index)
                }
            },
            "scaleTime": "{}Z".format(scale_time.isoformat())
        }

        nslcmop = {
            "id": _id,
            "_id": _id,
            "operationState": "PROCESSING",
            "statusEnteredTime": now,
            "nsInstanceId": nsr_id,
            "lcmOperationType": "scale",
            "startTime": now,
            "isAutomaticInvocation": True,
            "operationParams": params,
            "isCancelPending": False,
            "links": {
                "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
                "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
            }
        }
        return nslcmop