1package amqp
2
3import (
4	"context"
5	"time"
6
7	"github.com/streadway/amqp"
8)
9
10// RequestFunc may take information from a publisher request and put it into a
11// request context. In Subscribers, RequestFuncs are executed prior to invoking
12// the endpoint.
13type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context
14
15// SubscriberResponseFunc may take information from a request context and use it to
16// manipulate a Publisher. SubscriberResponseFuncs are only executed in
17// subscribers, after invoking the endpoint but prior to publishing a reply.
18type SubscriberResponseFunc func(context.Context,
19	*amqp.Delivery,
20	Channel,
21	*amqp.Publishing,
22) context.Context
23
24// PublisherResponseFunc may take information from an AMQP request and make the
25// response available for consumption. PublisherResponseFunc are only executed
26// in publishers, after a request has been made, but prior to it being decoded.
27type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context
28
29// SetPublishExchange returns a RequestFunc that sets the Exchange field
30// of an AMQP Publish call.
31func SetPublishExchange(publishExchange string) RequestFunc {
32	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
33		return context.WithValue(ctx, ContextKeyExchange, publishExchange)
34	}
35}
36
37// SetPublishKey returns a RequestFunc that sets the Key field
38// of an AMQP Publish call.
39func SetPublishKey(publishKey string) RequestFunc {
40	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
41		return context.WithValue(ctx, ContextKeyPublishKey, publishKey)
42	}
43}
44
45// SetPublishDeliveryMode sets the delivery mode of a Publishing.
46// Please refer to AMQP delivery mode constants in the AMQP package.
47func SetPublishDeliveryMode(dmode uint8) RequestFunc {
48	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
49		pub.DeliveryMode = dmode
50		return ctx
51	}
52}
53
54// SetNackSleepDuration returns a RequestFunc that sets the amount of time
55// to sleep in the event of a Nack.
56// This has to be used in conjunction with an error encoder that Nack and sleeps.
57// One example is the SingleNackRequeueErrorEncoder.
58// It is designed to be used by Subscribers.
59func SetNackSleepDuration(duration time.Duration) RequestFunc {
60	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
61		return context.WithValue(ctx, ContextKeyNackSleepDuration, duration)
62	}
63}
64
65// SetConsumeAutoAck returns a RequestFunc that sets whether or not to autoAck
66// messages when consuming.
67// When set to false, the publisher will Ack the first message it receives with
68// a matching correlationId.
69// It is designed to be used by Publishers.
70func SetConsumeAutoAck(autoAck bool) RequestFunc {
71	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
72		return context.WithValue(ctx, ContextKeyAutoAck, autoAck)
73	}
74}
75
76// SetConsumeArgs returns a RequestFunc that set the arguments for amqp Consume
77// function.
78// It is designed to be used by Publishers.
79func SetConsumeArgs(args amqp.Table) RequestFunc {
80	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
81		return context.WithValue(ctx, ContextKeyConsumeArgs, args)
82	}
83}
84
85// SetContentType returns a RequestFunc that sets the ContentType field of
86// an AMQP Publishing.
87func SetContentType(contentType string) RequestFunc {
88	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
89		pub.ContentType = contentType
90		return ctx
91	}
92}
93
94// SetContentEncoding returns a RequestFunc that sets the ContentEncoding field
95// of an AMQP Publishing.
96func SetContentEncoding(contentEncoding string) RequestFunc {
97	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
98		pub.ContentEncoding = contentEncoding
99		return ctx
100	}
101}
102
103// SetCorrelationID returns a RequestFunc that sets the CorrelationId field
104// of an AMQP Publishing.
105func SetCorrelationID(cid string) RequestFunc {
106	return func(ctx context.Context, pub *amqp.Publishing, _ *amqp.Delivery) context.Context {
107		pub.CorrelationId = cid
108		return ctx
109	}
110}
111
112// SetAckAfterEndpoint returns a SubscriberResponseFunc that prompts the service
113// to Ack the Delivery object after successfully evaluating the endpoint,
114// and before it encodes the response.
115// It is designed to be used by Subscribers.
116func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc {
117	return func(ctx context.Context,
118		deliv *amqp.Delivery,
119		ch Channel,
120		pub *amqp.Publishing,
121	) context.Context {
122		deliv.Ack(multiple)
123		return ctx
124	}
125}
126
127func getPublishExchange(ctx context.Context) string {
128	if exchange := ctx.Value(ContextKeyExchange); exchange != nil {
129		return exchange.(string)
130	}
131	return ""
132}
133
134func getPublishKey(ctx context.Context) string {
135	if publishKey := ctx.Value(ContextKeyPublishKey); publishKey != nil {
136		return publishKey.(string)
137	}
138	return ""
139}
140
141func getNackSleepDuration(ctx context.Context) time.Duration {
142	if duration := ctx.Value(ContextKeyNackSleepDuration); duration != nil {
143		return duration.(time.Duration)
144	}
145	return 0
146}
147
148func getConsumeAutoAck(ctx context.Context) bool {
149	if autoAck := ctx.Value(ContextKeyAutoAck); autoAck != nil {
150		return autoAck.(bool)
151	}
152	return false
153}
154
155func getConsumeArgs(ctx context.Context) amqp.Table {
156	if args := ctx.Value(ContextKeyConsumeArgs); args != nil {
157		return args.(amqp.Table)
158	}
159	return nil
160}
161
// contextKey is the type of the keys this package uses to store values in a
// context.Context.
type contextKey int

const (
	// ContextKeyExchange is the context key under which SetPublishExchange
	// stores the exchange name.
	ContextKeyExchange contextKey = iota

	// ContextKeyPublishKey is the context key under which SetPublishKey
	// stores the publish key.
	ContextKeyPublishKey

	// ContextKeyNackSleepDuration is the context key under which
	// SetNackSleepDuration stores the duration to sleep for if the service
	// Nacks and requeues a message.
	// Sleeping prevents rapid send-resend cycles when a message is
	// constantly Nack'd and requeued.
	ContextKeyNackSleepDuration

	// ContextKeyAutoAck is the context key under which SetConsumeAutoAck
	// stores the autoAck flag.
	ContextKeyAutoAck

	// ContextKeyConsumeArgs is the context key under which SetConsumeArgs
	// stores the consume arguments table.
	ContextKeyConsumeArgs
)
183