1// +build !confonly 2 3package reverse 4 5import ( 6 "context" 7 "time" 8 9 "google.golang.org/protobuf/proto" 10 11 "github.com/v2fly/v2ray-core/v4/common/mux" 12 "github.com/v2fly/v2ray-core/v4/common/net" 13 "github.com/v2fly/v2ray-core/v4/common/session" 14 "github.com/v2fly/v2ray-core/v4/common/task" 15 "github.com/v2fly/v2ray-core/v4/features/routing" 16 "github.com/v2fly/v2ray-core/v4/transport" 17 "github.com/v2fly/v2ray-core/v4/transport/pipe" 18) 19 20// Bridge is a component in reverse proxy, that relays connections from Portal to local address. 21type Bridge struct { 22 dispatcher routing.Dispatcher 23 tag string 24 domain string 25 workers []*BridgeWorker 26 monitorTask *task.Periodic 27} 28 29// NewBridge creates a new Bridge instance. 30func NewBridge(config *BridgeConfig, dispatcher routing.Dispatcher) (*Bridge, error) { 31 if config.Tag == "" { 32 return nil, newError("bridge tag is empty") 33 } 34 if config.Domain == "" { 35 return nil, newError("bridge domain is empty") 36 } 37 38 b := &Bridge{ 39 dispatcher: dispatcher, 40 tag: config.Tag, 41 domain: config.Domain, 42 } 43 b.monitorTask = &task.Periodic{ 44 Execute: b.monitor, 45 Interval: time.Second * 2, 46 } 47 return b, nil 48} 49 50func (b *Bridge) cleanup() { 51 var activeWorkers []*BridgeWorker 52 53 for _, w := range b.workers { 54 if w.IsActive() { 55 activeWorkers = append(activeWorkers, w) 56 } 57 } 58 59 if len(activeWorkers) != len(b.workers) { 60 b.workers = activeWorkers 61 } 62} 63 64func (b *Bridge) monitor() error { 65 b.cleanup() 66 67 var numConnections uint32 68 var numWorker uint32 69 70 for _, w := range b.workers { 71 if w.IsActive() { 72 numConnections += w.Connections() 73 numWorker++ 74 } 75 } 76 77 if numWorker == 0 || numConnections/numWorker > 16 { 78 worker, err := NewBridgeWorker(b.domain, b.tag, b.dispatcher) 79 if err != nil { 80 newError("failed to create bridge worker").Base(err).AtWarning().WriteToLog() 81 return nil 82 } 83 b.workers = append(b.workers, worker) 84 } 85 86 return nil 87} 88 89func (b *Bridge) Start() error { 90 return b.monitorTask.Start() 91} 92 93func (b *Bridge) Close() error { 94 return b.monitorTask.Close() 95} 96 97type BridgeWorker struct { 98 tag string 99 worker *mux.ServerWorker 100 dispatcher routing.Dispatcher 101 state Control_State 102} 103 104func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) { 105 ctx := context.Background() 106 ctx = session.ContextWithInbound(ctx, &session.Inbound{ 107 Tag: tag, 108 }) 109 link, err := d.Dispatch(ctx, net.Destination{ 110 Network: net.Network_TCP, 111 Address: net.DomainAddress(domain), 112 Port: 0, 113 }) 114 if err != nil { 115 return nil, err 116 } 117 118 w := &BridgeWorker{ 119 dispatcher: d, 120 tag: tag, 121 } 122 123 worker, err := mux.NewServerWorker(context.Background(), w, link) 124 if err != nil { 125 return nil, err 126 } 127 w.worker = worker 128 129 return w, nil 130} 131 132func (w *BridgeWorker) Type() interface{} { 133 return routing.DispatcherType() 134} 135 136func (w *BridgeWorker) Start() error { 137 return nil 138} 139 140func (w *BridgeWorker) Close() error { 141 return nil 142} 143 144func (w *BridgeWorker) IsActive() bool { 145 return w.state == Control_ACTIVE && !w.worker.Closed() 146} 147 148func (w *BridgeWorker) Connections() uint32 { 149 return w.worker.ActiveConnections() 150} 151 152func (w *BridgeWorker) handleInternalConn(link transport.Link) { 153 go func() { 154 reader := link.Reader 155 for { 156 mb, err := reader.ReadMultiBuffer() 157 if err != nil { 158 break 159 } 160 for _, b := range mb { 161 var ctl Control 162 if err := proto.Unmarshal(b.Bytes(), &ctl); err != nil { 163 newError("failed to parse proto message").Base(err).WriteToLog() 164 break 165 } 166 if ctl.State != w.state { 167 w.state = ctl.State 168 } 169 } 170 } 171 }() 172} 173 174func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) { 175 if !isInternalDomain(dest) { 176 ctx = session.ContextWithInbound(ctx, &session.Inbound{ 177 Tag: w.tag, 178 }) 179 return w.dispatcher.Dispatch(ctx, dest) 180 } 181 182 opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)} 183 uplinkReader, uplinkWriter := pipe.New(opt...) 184 downlinkReader, downlinkWriter := pipe.New(opt...) 185 186 w.handleInternalConn(transport.Link{ 187 Reader: downlinkReader, 188 Writer: uplinkWriter, 189 }) 190 191 return &transport.Link{ 192 Reader: uplinkReader, 193 Writer: downlinkWriter, 194 }, nil 195} 196