1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package pubsub 15 16import ( 17 "time" 18) 19 20// AckHandler implements ack/nack handling. 21type AckHandler interface { 22 // OnAck processes a message ack. 23 OnAck() 24 25 // OnNack processes a message nack. 26 OnNack() 27} 28 29// Message represents a Pub/Sub message. 30type Message struct { 31 // ID identifies this message. This ID is assigned by the server and is 32 // populated for Messages obtained from a subscription. 33 // 34 // This field is read-only. 35 ID string 36 37 // Data is the actual data in the message. 38 Data []byte 39 40 // Attributes represents the key-value pairs the current message is 41 // labelled with. 42 Attributes map[string]string 43 44 // PublishTime is the time at which the message was published. This is 45 // populated by the server for Messages obtained from a subscription. 46 // 47 // This field is read-only. 48 PublishTime time.Time 49 50 // DeliveryAttempt is the number of times a message has been delivered. 51 // This is part of the dead lettering feature that forwards messages that 52 // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. 53 // If dead lettering is enabled, this will be set on all attempts, starting 54 // with value 1. Otherwise, the value will be nil. 55 // This field is read-only. 56 DeliveryAttempt *int 57 58 // OrderingKey identifies related messages for which publish order should 59 // be respected. If empty string is used, message will be sent unordered. 60 OrderingKey string 61 62 // ackh handles Ack() or Nack(). 63 ackh AckHandler 64} 65 66// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback. 67// It should not be called on any other Message value. 68// If message acknowledgement fails, the Message will be redelivered. 69// Client code must call Ack or Nack when finished for each received Message. 70// Calls to Ack or Nack have no effect after the first call. 71func (m *Message) Ack() { 72 if m.ackh != nil { 73 m.ackh.OnAck() 74 } 75} 76 77// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback. 78// It should not be called on any other Message value. 79// Nack will result in the Message being redelivered more quickly than if it were allowed to expire. 80// Client code must call Ack or Nack when finished for each received Message. 81// Calls to Ack or Nack have no effect after the first call. 82func (m *Message) Nack() { 83 if m.ackh != nil { 84 m.ackh.OnNack() 85 } 86} 87 88// NewMessage creates a message with an AckHandler implementation, which should 89// not be nil. 90func NewMessage(ackh AckHandler) *Message { 91 return &Message{ackh: ackh} 92} 93 94// MessageAckHandler provides access to the internal field Message.ackh. 95func MessageAckHandler(m *Message) AckHandler { 96 return m.ackh 97} 98