1/* 2 * Copyright (c) 2013 IBM Corp. 3 * 4 * All rights reserved. This program and the accompanying materials 5 * are made available under the terms of the Eclipse Public License v1.0 6 * which accompanies this distribution, and is available at 7 * http://www.eclipse.org/legal/epl-v10.html 8 * 9 * Contributors: 10 * Seth Hoenig 11 * Allan Stockdill-Mander 12 * Mike Robertson 13 */ 14 15package mqtt 16 17import ( 18 "errors" 19 "sync/atomic" 20 "time" 21 22 "github.com/eclipse/paho.mqtt.golang/packets" 23) 24 25func keepalive(c *client) { 26 defer c.workers.Done() 27 DEBUG.Println(PNG, "keepalive starting") 28 var checkInterval int64 29 var pingSent time.Time 30 31 if c.options.KeepAlive > 10 { 32 checkInterval = 5 33 } else { 34 checkInterval = c.options.KeepAlive / 2 35 } 36 37 intervalTicker := time.NewTicker(time.Duration(checkInterval * int64(time.Second))) 38 defer intervalTicker.Stop() 39 40 for { 41 select { 42 case <-c.stop: 43 DEBUG.Println(PNG, "keepalive stopped") 44 return 45 case <-intervalTicker.C: 46 lastSent := c.lastSent.Load().(time.Time) 47 lastReceived := c.lastReceived.Load().(time.Time) 48 49 DEBUG.Println(PNG, "ping check", time.Since(lastSent).Seconds()) 50 if time.Since(lastSent) >= time.Duration(c.options.KeepAlive*int64(time.Second)) || time.Since(lastReceived) >= time.Duration(c.options.KeepAlive*int64(time.Second)) { 51 if atomic.LoadInt32(&c.pingOutstanding) == 0 { 52 DEBUG.Println(PNG, "keepalive sending ping") 53 ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket) 54 //We don't want to wait behind large messages being sent, the Write call 55 //will block until it it able to send the packet. 56 atomic.StoreInt32(&c.pingOutstanding, 1) 57 ping.Write(c.conn) 58 c.lastSent.Store(time.Now()) 59 pingSent = time.Now() 60 } 61 } 62 if atomic.LoadInt32(&c.pingOutstanding) > 0 && time.Now().Sub(pingSent) >= c.options.PingTimeout { 63 CRITICAL.Println(PNG, "pingresp not received, disconnecting") 64 c.errors <- errors.New("pingresp not received, disconnecting") 65 return 66 } 67 } 68 } 69} 70