1package notifiers 2 3import ( 4 "strconv" 5 6 "fmt" 7 8 "github.com/grafana/grafana/pkg/bus" 9 "github.com/grafana/grafana/pkg/components/simplejson" 10 "github.com/grafana/grafana/pkg/infra/log" 11 "github.com/grafana/grafana/pkg/models" 12 "github.com/grafana/grafana/pkg/services/alerting" 13) 14 15func init() { 16 alerting.RegisterNotifier(&alerting.NotifierPlugin{ 17 Type: "kafka", 18 Name: "Kafka REST Proxy", 19 Description: "Sends notifications to Kafka Rest Proxy", 20 Heading: "Kafka settings", 21 Factory: NewKafkaNotifier, 22 Options: []alerting.NotifierOption{ 23 { 24 Label: "Kafka REST Proxy", 25 Element: alerting.ElementTypeInput, 26 InputType: alerting.InputTypeText, 27 Placeholder: "http://localhost:8082", 28 PropertyName: "kafkaRestProxy", 29 Required: true, 30 }, 31 { 32 Label: "Topic", 33 Element: alerting.ElementTypeInput, 34 InputType: alerting.InputTypeText, 35 Placeholder: "topic1", 36 PropertyName: "kafkaTopic", 37 Required: true, 38 }, 39 }, 40 }) 41} 42 43// NewKafkaNotifier is the constructor function for the Kafka notifier. 44func NewKafkaNotifier(model *models.AlertNotification, _ alerting.GetDecryptedValueFn) (alerting.Notifier, error) { 45 endpoint := model.Settings.Get("kafkaRestProxy").MustString() 46 if endpoint == "" { 47 return nil, alerting.ValidationError{Reason: "Could not find kafka rest proxy endpoint property in settings"} 48 } 49 topic := model.Settings.Get("kafkaTopic").MustString() 50 if topic == "" { 51 return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"} 52 } 53 54 return &KafkaNotifier{ 55 NotifierBase: NewNotifierBase(model), 56 Endpoint: endpoint, 57 Topic: topic, 58 log: log.New("alerting.notifier.kafka"), 59 }, nil 60} 61 62// KafkaNotifier is responsible for sending 63// alert notifications to Kafka. 64type KafkaNotifier struct { 65 NotifierBase 66 Endpoint string 67 Topic string 68 log log.Logger 69} 70 71// Notify sends the alert notification. 72func (kn *KafkaNotifier) Notify(evalContext *alerting.EvalContext) error { 73 state := evalContext.Rule.State 74 75 customData := triggMetrString 76 for _, evt := range evalContext.EvalMatches { 77 customData += fmt.Sprintf("%s: %v\n", evt.Metric, evt.Value) 78 } 79 80 kn.log.Info("Notifying Kafka", "alert_state", state) 81 82 recordJSON := simplejson.New() 83 records := make([]interface{}, 1) 84 85 bodyJSON := simplejson.New() 86 // get alert state in the kafka output issue #11401 87 bodyJSON.Set("alert_state", state) 88 bodyJSON.Set("description", evalContext.Rule.Name+" - "+evalContext.Rule.Message) 89 bodyJSON.Set("client", "Grafana") 90 bodyJSON.Set("details", customData) 91 bodyJSON.Set("incident_key", "alertId-"+strconv.FormatInt(evalContext.Rule.ID, 10)) 92 93 ruleURL, err := evalContext.GetRuleURL() 94 if err != nil { 95 kn.log.Error("Failed get rule link", "error", err) 96 return err 97 } 98 bodyJSON.Set("client_url", ruleURL) 99 100 if kn.NeedsImage() && evalContext.ImagePublicURL != "" { 101 contexts := make([]interface{}, 1) 102 imageJSON := simplejson.New() 103 imageJSON.Set("type", "image") 104 imageJSON.Set("src", evalContext.ImagePublicURL) 105 contexts[0] = imageJSON 106 bodyJSON.Set("contexts", contexts) 107 } 108 109 valueJSON := simplejson.New() 110 valueJSON.Set("value", bodyJSON) 111 records[0] = valueJSON 112 recordJSON.Set("records", records) 113 body, _ := recordJSON.MarshalJSON() 114 115 topicURL := kn.Endpoint + "/topics/" + kn.Topic 116 117 cmd := &models.SendWebhookSync{ 118 Url: topicURL, 119 Body: string(body), 120 HttpMethod: "POST", 121 HttpHeader: map[string]string{ 122 "Content-Type": "application/vnd.kafka.json.v2+json", 123 "Accept": "application/vnd.kafka.v2+json", 124 }, 125 } 126 127 if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil { 128 kn.log.Error("Failed to send notification to Kafka", "error", err, "body", string(body)) 129 return err 130 } 131 132 return nil 133} 134