1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13
14package pubsub
15
16import (
17	"time"
18)
19
20// AckHandler implements ack/nack handling.
21type AckHandler interface {
22	// OnAck processes a message ack.
23	OnAck()
24
25	// OnNack processes a message nack.
26	OnNack()
27}
28
29// Message represents a Pub/Sub message.
30type Message struct {
31	// ID identifies this message. This ID is assigned by the server and is
32	// populated for Messages obtained from a subscription.
33	//
34	// This field is read-only.
35	ID string
36
37	// Data is the actual data in the message.
38	Data []byte
39
40	// Attributes represents the key-value pairs the current message is
41	// labelled with.
42	Attributes map[string]string
43
44	// PublishTime is the time at which the message was published. This is
45	// populated by the server for Messages obtained from a subscription.
46	//
47	// This field is read-only.
48	PublishTime time.Time
49
50	// DeliveryAttempt is the number of times a message has been delivered.
51	// This is part of the dead lettering feature that forwards messages that
52	// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
53	// If dead lettering is enabled, this will be set on all attempts, starting
54	// with value 1. Otherwise, the value will be nil.
55	// This field is read-only.
56	DeliveryAttempt *int
57
58	// OrderingKey identifies related messages for which publish order should
59	// be respected. If empty string is used, message will be sent unordered.
60	OrderingKey string
61
62	// ackh handles Ack() or Nack().
63	ackh AckHandler
64}
65
66// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
67// It should not be called on any other Message value.
68// If message acknowledgement fails, the Message will be redelivered.
69// Client code must call Ack or Nack when finished for each received Message.
70// Calls to Ack or Nack have no effect after the first call.
71func (m *Message) Ack() {
72	if m.ackh != nil {
73		m.ackh.OnAck()
74	}
75}
76
77// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
78// It should not be called on any other Message value.
79// Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
80// Client code must call Ack or Nack when finished for each received Message.
81// Calls to Ack or Nack have no effect after the first call.
82func (m *Message) Nack() {
83	if m.ackh != nil {
84		m.ackh.OnNack()
85	}
86}
87
88// NewMessage creates a message with an AckHandler implementation, which should
89// not be nil.
90func NewMessage(ackh AckHandler) *Message {
91	return &Message{ackh: ackh}
92}
93
94// MessageAckHandler provides access to the internal field Message.ackh.
95func MessageAckHandler(m *Message) AckHandler {
96	return m.ackh
97}
98