1package amqp 2 3import ( 4 "context" 5 "time" 6 7 "github.com/streadway/amqp" 8) 9 10// RequestFunc may take information from a publisher request and put it into a 11// request context. In Subscribers, RequestFuncs are executed prior to invoking 12// the endpoint. 13type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context 14 15// SubscriberResponseFunc may take information from a request context and use it to 16// manipulate a Publisher. SubscriberResponseFuncs are only executed in 17// subscribers, after invoking the endpoint but prior to publishing a reply. 18type SubscriberResponseFunc func(context.Context, 19 *amqp.Delivery, 20 Channel, 21 *amqp.Publishing, 22) context.Context 23 24// PublisherResponseFunc may take information from an AMQP request and make the 25// response available for consumption. PublisherResponseFunc are only executed 26// in publishers, after a request has been made, but prior to it being decoded. 27type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context 28 29// SetPublishExchange returns a RequestFunc that sets the Exchange field 30// of an AMQP Publish call. 31func SetPublishExchange(publishExchange string) RequestFunc { 32 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 33 return context.WithValue(ctx, ContextKeyExchange, publishExchange) 34 } 35} 36 37// SetPublishKey returns a RequestFunc that sets the Key field 38// of an AMQP Publish call. 39func SetPublishKey(publishKey string) RequestFunc { 40 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 41 return context.WithValue(ctx, ContextKeyPublishKey, publishKey) 42 } 43} 44 45// SetPublishDeliveryMode sets the delivery mode of a Publishing. 46// Please refer to AMQP delivery mode constants in the AMQP package. 47func SetPublishDeliveryMode(dmode uint8) RequestFunc { 48 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 49 pub.DeliveryMode = dmode 50 return ctx 51 } 52} 53 54// SetNackSleepDuration returns a RequestFunc that sets the amount of time 55// to sleep in the event of a Nack. 56// This has to be used in conjunction with an error encoder that Nack and sleeps. 57// One example is the SingleNackRequeueErrorEncoder. 58// It is designed to be used by Subscribers. 59func SetNackSleepDuration(duration time.Duration) RequestFunc { 60 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 61 return context.WithValue(ctx, ContextKeyNackSleepDuration, duration) 62 } 63} 64 65// SetConsumeAutoAck returns a RequestFunc that sets whether or not to autoAck 66// messages when consuming. 67// When set to false, the publisher will Ack the first message it receives with 68// a matching correlationId. 69// It is designed to be used by Publishers. 70func SetConsumeAutoAck(autoAck bool) RequestFunc { 71 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 72 return context.WithValue(ctx, ContextKeyAutoAck, autoAck) 73 } 74} 75 76// SetConsumeArgs returns a RequestFunc that set the arguments for amqp Consume 77// function. 78// It is designed to be used by Publishers. 79func SetConsumeArgs(args amqp.Table) RequestFunc { 80 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 81 return context.WithValue(ctx, ContextKeyConsumeArgs, args) 82 } 83} 84 85// SetContentType returns a RequestFunc that sets the ContentType field of 86// an AMQP Publishing. 87func SetContentType(contentType string) RequestFunc { 88 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 89 pub.ContentType = contentType 90 return ctx 91 } 92} 93 94// SetContentEncoding returns a RequestFunc that sets the ContentEncoding field 95// of an AMQP Publishing. 96func SetContentEncoding(contentEncoding string) RequestFunc { 97 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 98 pub.ContentEncoding = contentEncoding 99 return ctx 100 } 101} 102 103// SetCorrelationID returns a RequestFunc that sets the CorrelationId field 104// of an AMQP Publishing. 105func SetCorrelationID(cid string) RequestFunc { 106 return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context { 107 pub.CorrelationId = cid 108 return ctx 109 } 110} 111 112// SetAckAfterEndpoint returns a SubscriberResponseFunc that prompts the service 113// to Ack the Delivery object after successfully evaluating the endpoint, 114// and before it encodes the response. 115// It is designed to be used by Subscribers. 116func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc { 117 return func(ctx context.Context, 118 deliv *amqp.Delivery, 119 ch Channel, 120 pub *amqp.Publishing, 121 ) context.Context { 122 deliv.Ack(multiple) 123 return ctx 124 } 125} 126 127func getPublishExchange(ctx context.Context) string { 128 if exchange := ctx.Value(ContextKeyExchange); exchange != nil { 129 return exchange.(string) 130 } 131 return "" 132} 133 134func getPublishKey(ctx context.Context) string { 135 if publishKey := ctx.Value(ContextKeyPublishKey); publishKey != nil { 136 return publishKey.(string) 137 } 138 return "" 139} 140 141func getNackSleepDuration(ctx context.Context) time.Duration { 142 if duration := ctx.Value(ContextKeyNackSleepDuration); duration != nil { 143 return duration.(time.Duration) 144 } 145 return 0 146} 147 148func getConsumeAutoAck(ctx context.Context) bool { 149 if autoAck := ctx.Value(ContextKeyAutoAck); autoAck != nil { 150 return autoAck.(bool) 151 } 152 return false 153} 154 155func getConsumeArgs(ctx context.Context) amqp.Table { 156 if args := ctx.Value(ContextKeyConsumeArgs); args != nil { 157 return args.(amqp.Table) 158 } 159 return nil 160} 161 162type contextKey int 163 164const ( 165 // ContextKeyExchange is the value of the reply Exchange in 166 // amqp.Publish. 167 ContextKeyExchange contextKey = iota 168 // ContextKeyPublishKey is the value of the ReplyTo field in 169 // amqp.Publish. 170 ContextKeyPublishKey 171 // ContextKeyNackSleepDuration is the duration to sleep for if the 172 // service Nack and requeues a message. 173 // This is to prevent sporadic send-resending of message 174 // when a message is constantly Nack'd and requeued. 175 ContextKeyNackSleepDuration 176 // ContextKeyAutoAck is the value of autoAck field when calling 177 // amqp.Channel.Consume. 178 ContextKeyAutoAck 179 // ContextKeyConsumeArgs is the value of consumeArgs field when calling 180 // amqp.Channel.Consume. 181 ContextKeyConsumeArgs 182) 183