1// Package pipelined provides the traditional Sensu event pipeline. 2package pipelined 3 4import ( 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io/ioutil" 10 "net" 11 "os" 12 "reflect" 13 "strings" 14 "testing" 15 16 corev2 "github.com/sensu/sensu-go/api/core/v2" 17 "github.com/sensu/sensu-go/command" 18 "github.com/sensu/sensu-go/rpc" 19 "github.com/sensu/sensu-go/testing/mockstore" 20 "github.com/sensu/sensu-go/types" 21 "github.com/stretchr/testify/assert" 22 "github.com/stretchr/testify/mock" 23 "github.com/stretchr/testify/require" 24) 25 26type mockExec struct { 27 mock.Mock 28} 29 30func (m *mockExec) HandleEvent(evt *types.Event, mut []byte) (rpc.HandleEventResponse, error) { 31 args := m.Called(evt, mut) 32 return args.Get(0).(rpc.HandleEventResponse), args.Error(1) 33} 34 35func (m *mockExec) MutateEvent(evt *types.Event) ([]byte, error) { 36 args := m.Called(evt) 37 return args.Get(0).([]byte), args.Error(1) 38} 39 40func (m *mockExec) FilterEvent(evt *types.Event) (bool, error) { 41 args := m.Called(evt) 42 return args.Get(0).(bool), args.Error(1) 43} 44 45// No need to override this method, consumers only log its error 46func (m *mockExec) Close() error { 47 return nil 48} 49 50func TestHelperHandlerProcess(t *testing.T) { 51 if os.Getenv("GO_WANT_HELPER_HANDLER_PROCESS") != "1" { 52 return 53 } 54 55 command := strings.Join(os.Args[3:], " ") 56 stdin, _ := ioutil.ReadAll(os.Stdin) 57 58 switch command { 59 case "cat": 60 fmt.Fprintf(os.Stdout, "%s", stdin) 61 } 62 os.Exit(0) 63} 64 65func TestPipelinedHandleEvent(t *testing.T) { 66 t.SkipNow() 67 p := &Pipelined{} 68 69 store := &mockstore.MockStore{} 70 p.store = store 71 72 entity := types.FixtureEntity("entity1") 73 check := types.FixtureCheck("check1") 74 handler := types.FixtureHandler("handler1") 75 handler.Type = "udp" 76 handler.Socket = &types.HandlerSocket{ 77 Host: "127.0.0.1", 78 Port: 6789, 79 } 80 event := &types.Event{ 81 Entity: entity, 82 Check: check, 83 } 84 extension := &types.Extension{ 85 URL: "http://127.0.0.1", 86 } 87 88 // Currently fire and forget. You may choose to return a map 89 // of handler execution information in the future, don't know 90 // how useful this would be. 91 assert.NoError(t, p.handleEvent(event)) 92 93 event.Check.Handlers = []string{"handler1", "handler2"} 94 95 store.On("GetHandlerByName", mock.Anything, "handler1").Return(handler, nil) 96 store.On("GetHandlerByName", mock.Anything, "handler2").Return((*types.Handler)(nil), nil) 97 store.On("GetExtension", mock.Anything, "handler2").Return(extension, nil) 98 m := &mockExec{} 99 m.On("HandleEvent", event, mock.Anything).Return(rpc.HandleEventResponse{ 100 Output: "ok", 101 Error: "", 102 }, nil) 103 p.extensionExecutor = func(*types.Extension) (rpc.ExtensionExecutor, error) { 104 return m, nil 105 } 106 107 assert.NoError(t, p.handleEvent(event)) 108 m.AssertCalled(t, "HandleEvent", event, mock.Anything) 109} 110 111func TestPipelinedExpandHandlers(t *testing.T) { 112 type storeFunc func(*mockstore.MockStore) 113 114 var nilHandler *corev2.Handler 115 pipeHandler := corev2.FixtureHandler("pipeHandler") 116 setHandler := &corev2.Handler{ 117 ObjectMeta: corev2.NewObjectMeta("setHandler", "default"), 118 Type: corev2.HandlerSetType, 119 Handlers: []string{"pipeHandler"}, 120 } 121 nestedHandler := &corev2.Handler{ 122 ObjectMeta: corev2.NewObjectMeta("nestedHandler", "default"), 123 Type: corev2.HandlerSetType, 124 Handlers: []string{"setHandler"}, 125 } 126 recursiveLoopHandler := &corev2.Handler{ 127 ObjectMeta: corev2.NewObjectMeta("recursiveLoopHandler", "default"), 128 Type: corev2.HandlerSetType, 129 Handlers: []string{"recursiveLoopHandler"}, 130 } 131 132 tests := []struct { 133 name string 134 handlers []string 135 storeFunc storeFunc 136 want map[string]handlerExtensionUnion 137 }{ 138 { 139 name: "pipe handler", 140 handlers: []string{"pipeHandler"}, 141 storeFunc: func(s *mockstore.MockStore) { 142 s.On("GetHandlerByName", mock.Anything, "pipeHandler").Return(pipeHandler, nil) 143 }, 144 want: map[string]handlerExtensionUnion{ 145 "pipeHandler": handlerExtensionUnion{Handler: pipeHandler}, 146 }, 147 }, 148 { 149 name: "store error", 150 handlers: []string{"pipeHandler"}, 151 storeFunc: func(s *mockstore.MockStore) { 152 s.On("GetHandlerByName", mock.Anything, "pipeHandler").Return(nilHandler, errors.New("error")) 153 }, 154 want: map[string]handlerExtensionUnion{}, 155 }, 156 { 157 name: "set handler", 158 handlers: []string{"setHandler"}, 159 storeFunc: func(s *mockstore.MockStore) { 160 s.On("GetHandlerByName", mock.Anything, "setHandler").Return(setHandler, nil) 161 s.On("GetHandlerByName", mock.Anything, "pipeHandler").Return(pipeHandler, nil) 162 }, 163 want: map[string]handlerExtensionUnion{ 164 "pipeHandler": handlerExtensionUnion{Handler: pipeHandler}, 165 }, 166 }, 167 { 168 name: "too deeply nested set handler", 169 handlers: []string{"recursiveLoopHandler"}, 170 storeFunc: func(s *mockstore.MockStore) { 171 s.On("GetHandlerByName", mock.Anything, "recursiveLoopHandler").Return(recursiveLoopHandler, nil) 172 }, 173 want: map[string]handlerExtensionUnion{}, 174 }, 175 { 176 name: "multiple nested set handlers", 177 handlers: []string{"recursiveLoopHandler", "nestedHandler"}, 178 storeFunc: func(s *mockstore.MockStore) { 179 s.On("GetHandlerByName", mock.Anything, "recursiveLoopHandler").Return(recursiveLoopHandler, nil) 180 s.On("GetHandlerByName", mock.Anything, "nestedHandler").Return(nestedHandler, nil) 181 s.On("GetHandlerByName", mock.Anything, "setHandler").Return(setHandler, nil) 182 s.On("GetHandlerByName", mock.Anything, "pipeHandler").Return(pipeHandler, nil) 183 }, 184 want: map[string]handlerExtensionUnion{ 185 "pipeHandler": handlerExtensionUnion{Handler: pipeHandler}, 186 }, 187 }, 188 } 189 for _, tt := range tests { 190 t.Run(tt.name, func(t *testing.T) { 191 store := &mockstore.MockStore{} 192 if tt.storeFunc != nil { 193 tt.storeFunc(store) 194 } 195 196 p := &Pipelined{store: store} 197 got, _ := p.expandHandlers(context.Background(), tt.handlers, 1) 198 if !reflect.DeepEqual(got, tt.want) { 199 t.Errorf("Pipelined.expandHandlers() = %#v, want %#v", got, tt.want) 200 } 201 }) 202 } 203} 204 205func TestPipelinedPipeHandler(t *testing.T) { 206 p := &Pipelined{} 207 p.executor = &command.ExecutionRequest{} 208 209 handler := types.FakeHandlerCommand("cat") 210 handler.Type = "pipe" 211 212 event := &types.Event{} 213 eventData, _ := json.Marshal(event) 214 215 handlerExec, err := p.pipeHandler(handler, eventData) 216 217 assert.NoError(t, err) 218 assert.Equal(t, string(eventData[:]), handlerExec.Output) 219 assert.Equal(t, 0, handlerExec.Status) 220} 221 222func TestPipelinedTcpHandler(t *testing.T) { 223 ready := make(chan struct{}) 224 done := make(chan struct{}) 225 226 p := &Pipelined{} 227 228 handlerSocket := &types.HandlerSocket{ 229 Host: "127.0.0.1", 230 Port: 5678, 231 } 232 233 handler := &types.Handler{ 234 Type: "tcp", 235 Socket: handlerSocket, 236 } 237 238 event := &types.Event{} 239 eventData, _ := json.Marshal(event) 240 241 go func() { 242 listener, err := net.Listen("tcp", "127.0.0.1:5678") 243 assert.NoError(t, err) 244 if err != nil { 245 return 246 } 247 248 defer func() { 249 require.NoError(t, listener.Close()) 250 }() 251 252 ready <- struct{}{} 253 254 conn, err := listener.Accept() 255 if err != nil { 256 return 257 } 258 defer func() { 259 require.NoError(t, conn.Close()) 260 }() 261 262 buffer, err := ioutil.ReadAll(conn) 263 if err != nil { 264 return 265 } 266 267 assert.Equal(t, eventData, buffer) 268 done <- struct{}{} 269 }() 270 271 <-ready 272 _, err := p.socketHandler(handler, eventData) 273 274 assert.NoError(t, err) 275 <-done 276} 277 278func TestPipelinedUdpHandler(t *testing.T) { 279 ready := make(chan struct{}) 280 done := make(chan struct{}) 281 282 p := &Pipelined{} 283 284 handlerSocket := &types.HandlerSocket{ 285 Host: "127.0.0.1", 286 Port: 5678, 287 } 288 289 handler := &types.Handler{ 290 Type: "udp", 291 Socket: handlerSocket, 292 } 293 294 event := &types.Event{} 295 eventData, _ := json.Marshal(event) 296 297 go func() { 298 listener, err := net.ListenPacket("udp", ":5678") 299 assert.NoError(t, err) 300 if err != nil { 301 return 302 } 303 304 defer func() { 305 require.NoError(t, listener.Close()) 306 }() 307 308 ready <- struct{}{} 309 310 buffer := make([]byte, 1024) 311 rlen, _, err := listener.ReadFrom(buffer) 312 313 assert.NoError(t, err) 314 assert.Equal(t, eventData, buffer[0:rlen]) 315 done <- struct{}{} 316 }() 317 318 <-ready 319 320 _, err := p.socketHandler(handler, eventData) 321 322 assert.NoError(t, err) 323 <-done 324} 325 326func TestPipelinedGRPCHandler(t *testing.T) { 327 extension := &types.Extension{} 328 event := types.FixtureEvent("foo", "bar") 329 execFn := func(ext *types.Extension) (rpc.ExtensionExecutor, error) { 330 mock := &mockExec{} 331 mock.On("HandleEvent", event, []byte(nil)).Return(rpc.HandleEventResponse{ 332 Output: "ok", 333 Error: "", 334 }, nil) 335 return mock, nil 336 } 337 p := &Pipelined{ 338 extensionExecutor: execFn, 339 } 340 result, err := p.grpcHandler(extension, event, nil) 341 342 assert.NoError(t, err) 343 assert.Equal(t, "ok", result.Output) 344 assert.Equal(t, "", result.Error) 345} 346