1 ########################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 from osm_common
.dataclasses
.temporal_dataclasses
import (
22 from osm_common
.temporal_constants
import (
28 WORKFLOW_NS_INSTANTIATE
,
30 from osm_common
.wftemporal
import WFTemporal
# Workflow name for each VIM action; start_vim_workflow indexes this
# mapping with its `action` argument.
workflow_mappings = dict(
    created=WORKFLOW_VIM_CREATE,
    edited=WORKFLOW_VIM_UPDATE,
    delete=WORKFLOW_VIM_DELETE,
)
def start_vim_workflow(self, action: str, content: dict) -> None:
    """Start the workflow mapped to a VIM action.

    Args:
        action: Key into ``NbiTemporal.workflow_mappings``.
        content: Mapping that provides ``"_id"`` and ``"op_id"``; the
            operation id is the second ``":"``-separated field of
            ``"op_id"``.

    Raises:
        KeyError: If ``content`` lacks ``"_id"``/``"op_id"`` or ``action``
            is not a key of the mapping.
        IndexError: If ``content["op_id"]`` contains no ``":"``.
    """
    vim_id = content["_id"]
    # Derive the operation id (although for a create it will always be 0)
    operation_id = content["op_id"].split(":")[1]
    selected_workflow = NbiTemporal.workflow_mappings[action]
    payload = VimOperationInput(vim_id, operation_id)

    temporal = WFTemporal(logger_name="nbi.vim_workflow")
    temporal.start_workflow(
        task_queue=LCM_TASK_QUEUE,
        workflow_name=selected_workflow,
        workflow_data=payload,
    )
# Workflow name for each key that start_ns_workflow looks up via
# nslcmop["lcmOperationType"]. Only "instantiate" has its own workflow;
# every other key is routed to the no-op workflow.
lcm_workflow_mappings = {
    "instantiate": WORKFLOW_NS_INSTANTIATE,
    **dict.fromkeys(
        (
            "terminate",
            "vca_status_refresh",
            "action",
            "update",
            "scale",
            "heal",
            "migrate",
            "verticalscale",
            "deleted",
            "vnf_terminated",
            "policy_updated",
            "terminated",
            "instantiated",
            "scaled",
            "healed",
            "actioned",
            "updated",
            "migrated",
            "verticalscaled",
        ),
        WORKFLOW_NSLCM_NO_OP,
    ),
}
78 def start_ns_workflow(self
, nslcmop
: dict) -> None:
80 WFTemporal(logger_name
="nbi.lcm_workflow").start_workflow(
81 task_queue
=LCM_TASK_QUEUE
,
82 workflow_name
=NbiTemporal
.lcm_workflow_mappings
[
83 nslcmop
["lcmOperationType"]
85 workflow_data
=NsLcmOperationInput(nslcmop
=nslcmop
),
86 id=nslcmop
["nsInstanceId"],