1// Copyright 2017 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package loadtest implements load testing for pubsub, 16// following the interface defined in https://github.com/GoogleCloudPlatform/pubsub/tree/master/load-test-framework/ . 17// 18// This package is experimental. 19package loadtest 20 21import ( 22 "bytes" 23 "context" 24 "errors" 25 "log" 26 "runtime" 27 "strconv" 28 "sync" 29 "sync/atomic" 30 "time" 31 32 "cloud.google.com/go/pubsub" 33 pb "cloud.google.com/go/pubsub/loadtest/pb" 34 "github.com/golang/protobuf/ptypes" 35 "golang.org/x/time/rate" 36) 37 38type pubServerConfig struct { 39 topic *pubsub.Topic 40 msgData []byte 41 batchSize int32 42} 43 44// PubServer is a dummy Pub/Sub server for load testing. 45type PubServer struct { 46 ID string 47 48 cfg atomic.Value 49 seqNum int32 50} 51 52// Start starts the server. 53func (l *PubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartResponse, error) { 54 log.Println("received start") 55 c, err := pubsub.NewClient(ctx, req.Project) 56 if err != nil { 57 return nil, err 58 } 59 dur, err := ptypes.Duration(req.PublishBatchDuration) 60 if err != nil { 61 return nil, err 62 } 63 l.init(c, req.Topic, req.MessageSize, req.PublishBatchSize, dur) 64 log.Println("started") 65 return &pb.StartResponse{}, nil 66} 67 68func (l *PubServer) init(c *pubsub.Client, topicName string, msgSize, batchSize int32, batchDur time.Duration) { 69 topic := c.Topic(topicName) 70 topic.PublishSettings = pubsub.PublishSettings{ 71 DelayThreshold: batchDur, 72 CountThreshold: 950, 73 ByteThreshold: 9500000, 74 } 75 76 l.cfg.Store(pubServerConfig{ 77 topic: topic, 78 msgData: bytes.Repeat([]byte{'A'}, int(msgSize)), 79 batchSize: batchSize, 80 }) 81} 82 83// Execute executes a request. 84func (l *PubServer) Execute(ctx context.Context, _ *pb.ExecuteRequest) (*pb.ExecuteResponse, error) { 85 latencies, err := l.publishBatch() 86 if err != nil { 87 log.Printf("error: %v", err) 88 return nil, err 89 } 90 return &pb.ExecuteResponse{Latencies: latencies}, nil 91} 92 93func (l *PubServer) publishBatch() ([]int64, error) { 94 var cfg pubServerConfig 95 if c, ok := l.cfg.Load().(pubServerConfig); ok { 96 cfg = c 97 } else { 98 return nil, errors.New("config not loaded") 99 } 100 101 start := time.Now() 102 latencies := make([]int64, cfg.batchSize) 103 startStr := strconv.FormatInt(start.UnixNano()/1e6, 10) 104 seqNum := atomic.AddInt32(&l.seqNum, cfg.batchSize) - cfg.batchSize 105 106 rs := make([]*pubsub.PublishResult, cfg.batchSize) 107 for i := int32(0); i < cfg.batchSize; i++ { 108 rs[i] = cfg.topic.Publish(context.TODO(), &pubsub.Message{ 109 Data: cfg.msgData, 110 Attributes: map[string]string{ 111 "sendTime": startStr, 112 "clientId": l.ID, 113 "sequenceNumber": strconv.Itoa(int(seqNum + i)), 114 }, 115 }) 116 } 117 for i, r := range rs { 118 _, err := r.Get(context.Background()) 119 if err != nil { 120 return nil, err 121 } 122 // TODO(jba,pongad): fix latencies 123 // Later values will be skewed by earlier ones, since we wait for the 124 // results in order. (On the other hand, it may not matter much, since 125 // messages are added to bundles in order and bundles get sent more or 126 // less in order.) If we want more accurate values, we can either start 127 // a goroutine for each result (similar to the original code using a 128 // callback), or call reflect.Select with the Ready channels of the 129 // results. 130 latencies[i] = time.Since(start).Nanoseconds() / 1e6 131 } 132 return latencies, nil 133} 134 135// SubServer is a dummy Pub/Sub server for load testing. 136type SubServer struct { 137 // TODO(deklerk): what is this actually for? 138 lim *rate.Limiter 139 140 mu sync.Mutex 141 idents []*pb.MessageIdentifier 142 latencies []int64 143} 144 145// Start starts the server. 146func (s *SubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartResponse, error) { 147 log.Println("received start") 148 s.lim = rate.NewLimiter(rate.Every(time.Second), 1) 149 150 c, err := pubsub.NewClient(ctx, req.Project) 151 if err != nil { 152 return nil, err 153 } 154 155 // Load test API doesn't define any way to stop right now. 156 go func() { 157 sub := c.Subscription(req.GetPubsubOptions().Subscription) 158 sub.ReceiveSettings.NumGoroutines = 10 * runtime.GOMAXPROCS(0) 159 err := sub.Receive(context.Background(), s.callback) 160 log.Fatal(err) 161 }() 162 163 log.Println("started") 164 return &pb.StartResponse{}, nil 165} 166 167func (s *SubServer) callback(_ context.Context, m *pubsub.Message) { 168 id, err := strconv.ParseInt(m.Attributes["clientId"], 10, 64) 169 if err != nil { 170 log.Println(err) 171 m.Nack() 172 return 173 } 174 175 seqNum, err := strconv.ParseInt(m.Attributes["sequenceNumber"], 10, 32) 176 if err != nil { 177 log.Println(err) 178 m.Nack() 179 return 180 } 181 182 sendTimeMillis, err := strconv.ParseInt(m.Attributes["sendTime"], 10, 64) 183 if err != nil { 184 log.Println(err) 185 m.Nack() 186 return 187 } 188 189 latency := time.Now().UnixNano()/1e6 - sendTimeMillis 190 ident := &pb.MessageIdentifier{ 191 PublisherClientId: id, 192 SequenceNumber: int32(seqNum), 193 } 194 195 s.mu.Lock() 196 s.idents = append(s.idents, ident) 197 s.latencies = append(s.latencies, latency) 198 s.mu.Unlock() 199 m.Ack() 200} 201 202// Execute executes the request. 203func (s *SubServer) Execute(ctx context.Context, _ *pb.ExecuteRequest) (*pb.ExecuteResponse, error) { 204 // Throttle so the load tester doesn't spam us and consume all our CPU. 205 if err := s.lim.Wait(ctx); err != nil { 206 return nil, err 207 } 208 209 s.mu.Lock() 210 idents := s.idents 211 s.idents = nil 212 latencies := s.latencies 213 s.latencies = nil 214 s.mu.Unlock() 215 216 return &pb.ExecuteResponse{ 217 Latencies: latencies, 218 ReceivedMessages: idents, 219 }, nil 220} 221