1package notifiers
2
3import (
4	"strconv"
5
6	"fmt"
7
8	"github.com/grafana/grafana/pkg/bus"
9	"github.com/grafana/grafana/pkg/components/simplejson"
10	"github.com/grafana/grafana/pkg/infra/log"
11	"github.com/grafana/grafana/pkg/models"
12	"github.com/grafana/grafana/pkg/services/alerting"
13)
14
15func init() {
16	alerting.RegisterNotifier(&alerting.NotifierPlugin{
17		Type:        "kafka",
18		Name:        "Kafka REST Proxy",
19		Description: "Sends notifications to Kafka Rest Proxy",
20		Heading:     "Kafka settings",
21		Factory:     NewKafkaNotifier,
22		Options: []alerting.NotifierOption{
23			{
24				Label:        "Kafka REST Proxy",
25				Element:      alerting.ElementTypeInput,
26				InputType:    alerting.InputTypeText,
27				Placeholder:  "http://localhost:8082",
28				PropertyName: "kafkaRestProxy",
29				Required:     true,
30			},
31			{
32				Label:        "Topic",
33				Element:      alerting.ElementTypeInput,
34				InputType:    alerting.InputTypeText,
35				Placeholder:  "topic1",
36				PropertyName: "kafkaTopic",
37				Required:     true,
38			},
39		},
40	})
41}
42
43// NewKafkaNotifier is the constructor function for the Kafka notifier.
44func NewKafkaNotifier(model *models.AlertNotification, _ alerting.GetDecryptedValueFn) (alerting.Notifier, error) {
45	endpoint := model.Settings.Get("kafkaRestProxy").MustString()
46	if endpoint == "" {
47		return nil, alerting.ValidationError{Reason: "Could not find kafka rest proxy endpoint property in settings"}
48	}
49	topic := model.Settings.Get("kafkaTopic").MustString()
50	if topic == "" {
51		return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"}
52	}
53
54	return &KafkaNotifier{
55		NotifierBase: NewNotifierBase(model),
56		Endpoint:     endpoint,
57		Topic:        topic,
58		log:          log.New("alerting.notifier.kafka"),
59	}, nil
60}
61
// KafkaNotifier is responsible for sending
// alert notifications to Kafka.
//
// It embeds NotifierBase. Endpoint holds the value of the "kafkaRestProxy"
// setting and Topic the value of the "kafkaTopic" setting, both populated by
// NewKafkaNotifier; Notify POSTs to Endpoint + "/topics/" + Topic. The log
// field is the logger created under the name "alerting.notifier.kafka".
type KafkaNotifier struct {
	NotifierBase
	Endpoint string
	Topic    string
	log      log.Logger
}
70
71// Notify sends the alert notification.
72func (kn *KafkaNotifier) Notify(evalContext *alerting.EvalContext) error {
73	state := evalContext.Rule.State
74
75	customData := triggMetrString
76	for _, evt := range evalContext.EvalMatches {
77		customData += fmt.Sprintf("%s: %v\n", evt.Metric, evt.Value)
78	}
79
80	kn.log.Info("Notifying Kafka", "alert_state", state)
81
82	recordJSON := simplejson.New()
83	records := make([]interface{}, 1)
84
85	bodyJSON := simplejson.New()
86	// get alert state in the kafka output issue #11401
87	bodyJSON.Set("alert_state", state)
88	bodyJSON.Set("description", evalContext.Rule.Name+" - "+evalContext.Rule.Message)
89	bodyJSON.Set("client", "Grafana")
90	bodyJSON.Set("details", customData)
91	bodyJSON.Set("incident_key", "alertId-"+strconv.FormatInt(evalContext.Rule.ID, 10))
92
93	ruleURL, err := evalContext.GetRuleURL()
94	if err != nil {
95		kn.log.Error("Failed get rule link", "error", err)
96		return err
97	}
98	bodyJSON.Set("client_url", ruleURL)
99
100	if kn.NeedsImage() && evalContext.ImagePublicURL != "" {
101		contexts := make([]interface{}, 1)
102		imageJSON := simplejson.New()
103		imageJSON.Set("type", "image")
104		imageJSON.Set("src", evalContext.ImagePublicURL)
105		contexts[0] = imageJSON
106		bodyJSON.Set("contexts", contexts)
107	}
108
109	valueJSON := simplejson.New()
110	valueJSON.Set("value", bodyJSON)
111	records[0] = valueJSON
112	recordJSON.Set("records", records)
113	body, _ := recordJSON.MarshalJSON()
114
115	topicURL := kn.Endpoint + "/topics/" + kn.Topic
116
117	cmd := &models.SendWebhookSync{
118		Url:        topicURL,
119		Body:       string(body),
120		HttpMethod: "POST",
121		HttpHeader: map[string]string{
122			"Content-Type": "application/vnd.kafka.json.v2+json",
123			"Accept":       "application/vnd.kafka.v2+json",
124		},
125	}
126
127	if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil {
128		kn.log.Error("Failed to send notification to Kafka", "error", err, "body", string(body))
129		return err
130	}
131
132	return nil
133}
134