1// Package pipelined provides the traditional Sensu event pipeline.
2package pipelined
3
4import (
5	"context"
6	"encoding/json"
7	"errors"
8	"fmt"
9	"io/ioutil"
10	"net"
11	"os"
12	"reflect"
13	"strings"
14	"testing"
15
16	corev2 "github.com/sensu/sensu-go/api/core/v2"
17	"github.com/sensu/sensu-go/command"
18	"github.com/sensu/sensu-go/rpc"
19	"github.com/sensu/sensu-go/testing/mockstore"
20	"github.com/sensu/sensu-go/types"
21	"github.com/stretchr/testify/assert"
22	"github.com/stretchr/testify/mock"
23	"github.com/stretchr/testify/require"
24)
25
26type mockExec struct {
27	mock.Mock
28}
29
30func (m *mockExec) HandleEvent(evt *types.Event, mut []byte) (rpc.HandleEventResponse, error) {
31	args := m.Called(evt, mut)
32	return args.Get(0).(rpc.HandleEventResponse), args.Error(1)
33}
34
35func (m *mockExec) MutateEvent(evt *types.Event) ([]byte, error) {
36	args := m.Called(evt)
37	return args.Get(0).([]byte), args.Error(1)
38}
39
40func (m *mockExec) FilterEvent(evt *types.Event) (bool, error) {
41	args := m.Called(evt)
42	return args.Get(0).(bool), args.Error(1)
43}
44
45// No need to override this method, consumers only log its error
46func (m *mockExec) Close() error {
47	return nil
48}
49
// TestHelperHandlerProcess is not a real test. When the environment
// variable GO_WANT_HELPER_HANDLER_PROCESS is "1" it acts as a stand-in
// external command: it joins os.Args[3:] into a command line, reads all of
// stdin, emulates the command, and exits the process. For any other value
// of the variable it returns immediately so a normal test run is unaffected.
//
// The only emulated command is "cat", which echoes stdin to stdout.
func TestHelperHandlerProcess(t *testing.T) {
	if os.Getenv("GO_WANT_HELPER_HANDLER_PROCESS") != "1" {
		return
	}

	// Guard the slice: os.Args[3:] panics when fewer than three arguments
	// are present. An empty command line simply matches no case below.
	var cmdline string
	if len(os.Args) > 3 {
		cmdline = strings.Join(os.Args[3:], " ")
	}

	stdin, err := ioutil.ReadAll(os.Stdin)
	if err != nil {
		// Report the failure through the exit status instead of echoing
		// truncated input and claiming success.
		fmt.Fprintf(os.Stderr, "reading stdin: %v\n", err)
		os.Exit(1)
	}

	switch cmdline {
	case "cat":
		fmt.Fprintf(os.Stdout, "%s", stdin)
	}
	os.Exit(0)
}
64
65func TestPipelinedHandleEvent(t *testing.T) {
66	t.SkipNow()
67	p := &Pipelined{}
68
69	store := &mockstore.MockStore{}
70	p.store = store
71
72	entity := types.FixtureEntity("entity1")
73	check := types.FixtureCheck("check1")
74	handler := types.FixtureHandler("handler1")
75	handler.Type = "udp"
76	handler.Socket = &types.HandlerSocket{
77		Host: "127.0.0.1",
78		Port: 6789,
79	}
80	event := &types.Event{
81		Entity: entity,
82		Check:  check,
83	}
84	extension := &types.Extension{
85		URL: "http://127.0.0.1",
86	}
87
88	// Currently fire and forget. You may choose to return a map
89	// of handler execution information in the future, don't know
90	// how useful this would be.
91	assert.NoError(t, p.handleEvent(event))
92
93	event.Check.Handlers = []string{"handler1", "handler2"}
94
95	store.On("GetHandlerByName", mock.Anything, "handler1").Return(handler, nil)
96	store.On("GetHandlerByName", mock.Anything, "handler2").Return((*types.Handler)(nil), nil)
97	store.On("GetExtension", mock.Anything, "handler2").Return(extension, nil)
98	m := &mockExec{}
99	m.On("HandleEvent", event, mock.Anything).Return(rpc.HandleEventResponse{
100		Output: "ok",
101		Error:  "",
102	}, nil)
103	p.extensionExecutor = func(*types.Extension) (rpc.ExtensionExecutor, error) {
104		return m, nil
105	}
106
107	assert.NoError(t, p.handleEvent(event))
108	m.AssertCalled(t, "HandleEvent", event, mock.Anything)
109}
110
111func TestPipelinedExpandHandlers(t *testing.T) {
112	type storeFunc func(*mockstore.MockStore)
113
114	var nilHandler *corev2.Handler
115	pipeHandler := corev2.FixtureHandler("pipeHandler")
116	setHandler := &corev2.Handler{
117		ObjectMeta: corev2.NewObjectMeta("setHandler", "default"),
118		Type:       corev2.HandlerSetType,
119		Handlers:   []string{"pipeHandler"},
120	}
121	nestedHandler := &corev2.Handler{
122		ObjectMeta: corev2.NewObjectMeta("nestedHandler", "default"),
123		Type:       corev2.HandlerSetType,
124		Handlers:   []string{"setHandler"},
125	}
126	recursiveLoopHandler := &corev2.Handler{
127		ObjectMeta: corev2.NewObjectMeta("recursiveLoopHandler", "default"),
128		Type:       corev2.HandlerSetType,
129		Handlers:   []string{"recursiveLoopHandler"},
130	}
131
132	tests := []struct {
133		name      string
134		handlers  []string
135		storeFunc storeFunc
136		want      map[string]handlerExtensionUnion
137	}{
138		{
139			name:     "pipe handler",
140			handlers: []string{"pipeHandler"},
141			storeFunc: func(s *mockstore.MockStore) {
142				s.On("GetHandlerByName", mock.Anything, "pipeHandler").Return(pipeHandler, nil)
143			},
144			want: map[string]handlerExtensionUnion{
145				"pipeHandler": handlerExtensionUnion{Handler: pipeHandler},
146			},
147		},
148		{
149			name:     "store error",
150			handlers: []string{"pipeHandler"},
151			storeFunc: func(s *mockstore.MockStore) {
152				s.On("GetHandlerByName", mock.Anything, "pipeHandler").Return(nilHandler, errors.New("error"))
153			},
154			want: map[string]handlerExtensionUnion{},
155		},
156		{
157			name:     "set handler",
158			handlers: []string{"setHandler"},
159			storeFunc: func(s *mockstore.MockStore) {
160				s.On("GetHandlerByName", mock.Anything, "setHandler").Return(setHandler, nil)
161				s.On("GetHandlerByName", mock.Anything, "pipeHandler").Return(pipeHandler, nil)
162			},
163			want: map[string]handlerExtensionUnion{
164				"pipeHandler": handlerExtensionUnion{Handler: pipeHandler},
165			},
166		},
167		{
168			name:     "too deeply nested set handler",
169			handlers: []string{"recursiveLoopHandler"},
170			storeFunc: func(s *mockstore.MockStore) {
171				s.On("GetHandlerByName", mock.Anything, "recursiveLoopHandler").Return(recursiveLoopHandler, nil)
172			},
173			want: map[string]handlerExtensionUnion{},
174		},
175		{
176			name:     "multiple nested set handlers",
177			handlers: []string{"recursiveLoopHandler", "nestedHandler"},
178			storeFunc: func(s *mockstore.MockStore) {
179				s.On("GetHandlerByName", mock.Anything, "recursiveLoopHandler").Return(recursiveLoopHandler, nil)
180				s.On("GetHandlerByName", mock.Anything, "nestedHandler").Return(nestedHandler, nil)
181				s.On("GetHandlerByName", mock.Anything, "setHandler").Return(setHandler, nil)
182				s.On("GetHandlerByName", mock.Anything, "pipeHandler").Return(pipeHandler, nil)
183			},
184			want: map[string]handlerExtensionUnion{
185				"pipeHandler": handlerExtensionUnion{Handler: pipeHandler},
186			},
187		},
188	}
189	for _, tt := range tests {
190		t.Run(tt.name, func(t *testing.T) {
191			store := &mockstore.MockStore{}
192			if tt.storeFunc != nil {
193				tt.storeFunc(store)
194			}
195
196			p := &Pipelined{store: store}
197			got, _ := p.expandHandlers(context.Background(), tt.handlers, 1)
198			if !reflect.DeepEqual(got, tt.want) {
199				t.Errorf("Pipelined.expandHandlers() = %#v, want %#v", got, tt.want)
200			}
201		})
202	}
203}
204
205func TestPipelinedPipeHandler(t *testing.T) {
206	p := &Pipelined{}
207	p.executor = &command.ExecutionRequest{}
208
209	handler := types.FakeHandlerCommand("cat")
210	handler.Type = "pipe"
211
212	event := &types.Event{}
213	eventData, _ := json.Marshal(event)
214
215	handlerExec, err := p.pipeHandler(handler, eventData)
216
217	assert.NoError(t, err)
218	assert.Equal(t, string(eventData[:]), handlerExec.Output)
219	assert.Equal(t, 0, handlerExec.Status)
220}
221
222func TestPipelinedTcpHandler(t *testing.T) {
223	ready := make(chan struct{})
224	done := make(chan struct{})
225
226	p := &Pipelined{}
227
228	handlerSocket := &types.HandlerSocket{
229		Host: "127.0.0.1",
230		Port: 5678,
231	}
232
233	handler := &types.Handler{
234		Type:   "tcp",
235		Socket: handlerSocket,
236	}
237
238	event := &types.Event{}
239	eventData, _ := json.Marshal(event)
240
241	go func() {
242		listener, err := net.Listen("tcp", "127.0.0.1:5678")
243		assert.NoError(t, err)
244		if err != nil {
245			return
246		}
247
248		defer func() {
249			require.NoError(t, listener.Close())
250		}()
251
252		ready <- struct{}{}
253
254		conn, err := listener.Accept()
255		if err != nil {
256			return
257		}
258		defer func() {
259			require.NoError(t, conn.Close())
260		}()
261
262		buffer, err := ioutil.ReadAll(conn)
263		if err != nil {
264			return
265		}
266
267		assert.Equal(t, eventData, buffer)
268		done <- struct{}{}
269	}()
270
271	<-ready
272	_, err := p.socketHandler(handler, eventData)
273
274	assert.NoError(t, err)
275	<-done
276}
277
278func TestPipelinedUdpHandler(t *testing.T) {
279	ready := make(chan struct{})
280	done := make(chan struct{})
281
282	p := &Pipelined{}
283
284	handlerSocket := &types.HandlerSocket{
285		Host: "127.0.0.1",
286		Port: 5678,
287	}
288
289	handler := &types.Handler{
290		Type:   "udp",
291		Socket: handlerSocket,
292	}
293
294	event := &types.Event{}
295	eventData, _ := json.Marshal(event)
296
297	go func() {
298		listener, err := net.ListenPacket("udp", ":5678")
299		assert.NoError(t, err)
300		if err != nil {
301			return
302		}
303
304		defer func() {
305			require.NoError(t, listener.Close())
306		}()
307
308		ready <- struct{}{}
309
310		buffer := make([]byte, 1024)
311		rlen, _, err := listener.ReadFrom(buffer)
312
313		assert.NoError(t, err)
314		assert.Equal(t, eventData, buffer[0:rlen])
315		done <- struct{}{}
316	}()
317
318	<-ready
319
320	_, err := p.socketHandler(handler, eventData)
321
322	assert.NoError(t, err)
323	<-done
324}
325
326func TestPipelinedGRPCHandler(t *testing.T) {
327	extension := &types.Extension{}
328	event := types.FixtureEvent("foo", "bar")
329	execFn := func(ext *types.Extension) (rpc.ExtensionExecutor, error) {
330		mock := &mockExec{}
331		mock.On("HandleEvent", event, []byte(nil)).Return(rpc.HandleEventResponse{
332			Output: "ok",
333			Error:  "",
334		}, nil)
335		return mock, nil
336	}
337	p := &Pipelined{
338		extensionExecutor: execFn,
339	}
340	result, err := p.grpcHandler(extension, event, nil)
341
342	assert.NoError(t, err)
343	assert.Equal(t, "ok", result.Output)
344	assert.Equal(t, "", result.Error)
345}
346