X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fcommon%2Flcm_client.py;h=c94a424b5c9870fd502f8ff3d97d240d8fb8bcd1;hb=be42d54170ca40d8d52e2c9fc8d888621585d6cf;hp=05378d7c4797f1b8234e335fdfa7e3d5dc05c149;hpb=312c16596975a42d6294a1a2ca7af98b0ff2ffb5;p=osm%2FPOL.git diff --git a/osm_policy_module/common/lcm_client.py b/osm_policy_module/common/lcm_client.py index 05378d7..c94a424 100644 --- a/osm_policy_module/common/lcm_client.py +++ b/osm_policy_module/common/lcm_client.py @@ -28,45 +28,74 @@ import logging import time import uuid -from aiokafka import AIOKafkaProducer -from osm_common import dbmongo - +from osm_policy_module.common.common_db_client import CommonDbClient +from osm_policy_module.common.message_bus_client import MessageBusClient from osm_policy_module.core.config import Config log = logging.getLogger(__name__) class LcmClient: - def __init__(self, loop=None): - cfg = Config.instance() - self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) - self.common_db = dbmongo.DbMongo() - self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI, - 'name': 'osm'}) + """ + Client to communicate with LCM through the message bus. + """ + + def __init__(self, config: Config, loop=None): + self.db_client = CommonDbClient(config) + self.msg_bus = MessageBusClient(config) if not loop: loop = asyncio.get_event_loop() self.loop = loop - async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str): - log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action) - nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action) - self.common_db.create("nslcmops", nslcmop) - log.info("Sending scale action message: %s", json.dumps(nslcmop)) - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - await producer.start() - try: - # Produce message - await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop)) - finally: - # Wait for all pending messages to be delivered or expire. - await producer.stop() - - def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str): - log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action) + async def scale( + self, nsr_id: str, scaling_group_name: str, vnf_member_index: str, action: str + ): + """ + Sends scaling action to LCM through the message bus. + + :param nsr_id: Network service record id + :param scaling_group_name: Scaling group name + :param vnf_member_index: VNF member index + :param action: Scaling action to be executed. Valid values: scale_in, scale_out + :return: + """ + log.debug( + "scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action + ) + nsr = self.db_client.get_nsr(nsr_id) + nslcmop = self._generate_nslcmop( + nsr_id, scaling_group_name, vnf_member_index, action, nsr["_admin"] + ) + self.db_client.create_nslcmop(nslcmop) + log.debug("Sending scale action message: %s", json.dumps(nslcmop)) + await self.msg_bus.aiowrite("ns", "scale", nslcmop) + + def _generate_nslcmop( + self, + nsr_id: str, + scaling_group_name: str, + vnf_member_index: str, + action: str, + admin: dict, + ): + """ + Builds scaling nslcmop. + + :param nsr_id: Network service record id + :param scaling_group_name: Scaling group name + :param vnf_member_index: VNF member index + :param action: Scaling action to be executed. Valid values: scale_in, scale_out + :param admin: Dict corresponding to the _admin section of the nsr. Required keys: projects_read, projects_write. + :return: + """ + log.debug( + "_generate_nslcmop %s %s %s %s %s", + nsr_id, + scaling_group_name, + vnf_member_index, + action, + admin, + ) _id = str(uuid.uuid4()) now = time.time() params = { @@ -75,10 +104,10 @@ class LcmClient: "scaleVnfType": action.upper(), "scaleByStepData": { "scaling-group-descriptor": scaling_group_name, - "member-vnf-index": str(vnf_member_index) - } + "member-vnf-index": vnf_member_index, + }, }, - "scaleTime": "{}Z".format(datetime.datetime.utcnow().isoformat()) + "scaleTime": "{}Z".format(datetime.datetime.utcnow().isoformat()), } nslcmop = { @@ -95,6 +124,138 @@ class LcmClient: "links": { "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id, "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id, - } + }, + "_admin": { + "projects_read": admin["projects_read"], + "projects_write": admin["projects_write"], + }, + } + return nslcmop + + async def heal( + self, + nsr_id: str, + vnfinstance_id: str, + vdur_name: str, + vdu_id: str, + vnf_member_index: str, + heal_type: str, + day1: bool, + count_index: int, + ): + """ + Sends healing action to LCM through the message bus. + + param nsr_id: Network service record id + param vdu_id: Scaling vdu id + param vnf_member_index: VNF member index + param heal_type: healing action to be executed. Valid values: restart,respawn + param day1: To run day1 operations + param cause: cause of healing + return + """ + log.debug( + "heal %s %s %s %s %s %s %s %s", + nsr_id, + vnfinstance_id, + vdur_name, + vdu_id, + vnf_member_index, + heal_type, + day1, + count_index, + ) + nsr = self.db_client.get_nsr(nsr_id) + nslcmop = self._generate_nslcmop_heal( + nsr_id, + vnfinstance_id, + vdur_name, + vdu_id, + vnf_member_index, + heal_type, + day1, + count_index, + nsr["_admin"], + ) + self.db_client.create_nslcmop(nslcmop) + log.debug("Sending heal action message: %s", json.dumps(nslcmop)) + await self.msg_bus.aiowrite("ns", "heal", nslcmop) + + def _generate_nslcmop_heal( + self, + nsr_id: str, + vnfinstance_id: str, + vdur_name: str, + vdu_id: str, + vnf_member_index: str, + heal_type: str, + day1: bool, + count_index: int, + admin: dict, + ): + """ + Builds healing nslcmop. + param nsr_id: Network service record id + param vnf_member_index: VNF member index + param action: healing action to be executed. Valid values: restart, respawn + param admin: Dict corresponding to the _admin section of the nsr. Required keys: projects_read, projects_write. + return: + """ + log.debug( + "_generate_nslcmop_heal %s %s %s %s %s %s %s %s %s", + nsr_id, + vnfinstance_id, + vdur_name, + vdu_id, + vnf_member_index, + heal_type, + day1, + count_index, + admin, + ) + _id = str(uuid.uuid4()) + now = time.time() + params = { + "lcmOperationType": "heal", + "nsInstanceId": nsr_id, + "healVnfData": [ + { + "vnfInstanceId": vnfinstance_id, + "cause": "default", + "additionalParams": { + "run-day1": day1, + "vdu": [ + { + "run-day1": day1, + "count-index": count_index, + "vdu-id": vdu_id, + } + ], + }, + } + ], + } + + nslcmop = { + "id": _id, + "_id": _id, + "operationState": "PROCESSING", + "statusEnteredTime": now, + "nsInstanceId": nsr_id, + "member-vnf-index": vnf_member_index, + "lcmOperationType": "heal", + "startTime": now, + "location": "default", + "isAutomaticInvocation": True, + "operationParams": params, + "isCancelPending": False, + "links": { + "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id, + "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id, + }, + "_admin": { + "projects_read": admin["projects_read"], + "projects_write": admin["projects_write"], + }, } return nslcmop