1### Tunnel service 2 3Table Store tunnel service golang sdk. 4 5### Install 6 7* download tunnel client source code 8 9```bash 10go get github.com/aliyun/aliyun-tablestore-go-sdk/tunnel 11``` 12 13 14* use dep to install dependencies under tunnel directory 15 * install [dep](https://github.com/golang/dep#installation) 16 * dep ensure -v 17* or use `go get` to install dependencies 18 19```bash 20go get -u go.uber.org/zap 21go get github.com/cenkalti/backoff 22go get github.com/golang/protobuf/proto 23go get github.com/satori/go.uuid 24go get github.com/stretchr/testify/assert 25go get github.com/smartystreets/goconvey/convey 26go get github.com/golang/mock/gomock 27go get gopkg.in/natefinch/lumberjack.v2 28``` 29 30 31### Quick Start 32 33* tunnel type 34 35 * TunnelTypeStream:stream data(增量数据流) 36 * TunnelTypeBaseData: full data(全量数据流) 37 * TunnelTypeBaseStream: full and stream data(先全量后增量数据流) 38 39* init tunnel client 40 41```go 42 tunnelClient := tunnel.NewTunnelClient(endpoint, instance, 43 accessKeyId, accessKeySecret) 44``` 45 46* create new tunnel 47 48```go 49 req := &tunnel.CreateTunnelRequest{ 50 TableName: "testTable", 51 TunnelName: "testTunnel", 52 Type: tunnel.TunnelTypeBaseStream, //base and stream data tunnel 53 } 54 resp, err := tunnelClient.CreateTunnel(req) 55 if err != nil { 56 log.Fatal("create test tunnel failed", err) 57 } 58 log.Println("tunnel id is", resp.TunnelId) 59``` 60 61* get existing tunnel detail information 62 63```go 64 req := &tunnel.DescribeTunnelRequest{ 65 TableName: "testTable", 66 TunnelName: "testTunnel", 67 } 68 resp, err := tunnelClient.DescribeTunnel(req) 69 if err != nil { 70 log.Fatal("create test tunnel failed", err) 71 } 72 log.Println("tunnel id is", resp.Tunnel.TunnelId) 73``` 74 75* consume tunnel data with callback function 76 77```go 78//user-defined callback function 79func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { 80 fmt.Println("user-defined information", ctx.CustomValue) 81 for _, rec := range records { 82 fmt.Println("tunnel record detail:", rec.String()) 83 } 84 fmt.Println("a round of records consumption finished") 85 return nil 86} 87 88//set callback to SimpleProcessFactory 89workConfig := &tunnel.TunnelWorkerConfig{ 90 ProcessorFactory: &tunnel.SimpleProcessFactory{ 91 CustomValue: "user custom interface{} value", 92 ProcessFunc: exampleConsumeFunction, 93 }, 94} 95 96//use TunnelDaemon to consume tunnel with specified tunnelId 97daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig) 98log.Fatal(daemon.Run()) 99``` 100 101* delete tunnel 102```go 103req := &tunnel.DeleteTunnelRequest { 104 TableName: "testTable", 105 TunnelName: "testTunnel", 106} 107_, err := tunnelClient.DeleteTunnel(req) 108if err != nil { 109 log.Fatal("delete test tunnel failed", err) 110} 111``` 112 113See the sample directory for more details. 114 115### tunnel document 116 117* [Godoc](todo) 118* [API document](todo) 119 120### configuration 121 122* Default TunnelConfig definition 123 124```go 125var DefaultTunnelConfig = &TunnelConfig{ 126 //Max backoff retry duration. 127 MaxRetryElapsedTime: 45 * time.Second, 128 //HTTP request timeout. 129 RequestTimeout: 30 * time.Second, 130 //http.DefaultTransport. 131 Transport: http.DefaultTransport, 132} 133``` 134 135* TunnelWorkerConfig definition 136 137```go 138type TunnelWorkerConfig struct { 139 //The heartbeat timeout time of the worker. If nil, the default value is used. 140 HeartbeatTimeout time.Duration 141 //The heartbeat interval time of the worker. If nil, the default value is used. 142 HeartbeatInterval time.Duration 143 //The channel connection dial interface. If nil, the default dialer is used. 144 //Usually the default dialer is fine. 145 ChannelDialer ChannelDialer 146 147 //The channel processor creation interface. 148 //It's recomended to use the pre-defined SimpleChannelProcessorFactory. 149 ProcessorFactory ChannelProcessorFactory 150 151 //zap log config. If nil, the DefaultLogConfig is used. 152 LogConfig *zap.Config 153 //zap log rotate config. If nil, the DefaultSyncer is used. 154 LogWriteSyncer zapcore.WriteSyncer 155} 156``` 157