1// +build !confonly
2
3package reverse
4
import (
	"context"
	"sync/atomic"
	"time"

	"google.golang.org/protobuf/proto"

	"github.com/v2fly/v2ray-core/v4/common/mux"
	"github.com/v2fly/v2ray-core/v4/common/net"
	"github.com/v2fly/v2ray-core/v4/common/session"
	"github.com/v2fly/v2ray-core/v4/common/task"
	"github.com/v2fly/v2ray-core/v4/features/routing"
	"github.com/v2fly/v2ray-core/v4/transport"
	"github.com/v2fly/v2ray-core/v4/transport/pipe"
)
19
// Bridge is a component in reverse proxy, that relays connections from Portal to local address.
type Bridge struct {
	dispatcher  routing.Dispatcher // handed to every BridgeWorker this bridge creates
	tag         string             // tag passed to each new BridgeWorker
	domain      string             // domain passed to each new BridgeWorker
	workers     []*BridgeWorker    // workers appended by monitor and pruned by cleanup
	monitorTask *task.Periodic     // periodic task whose Execute function is monitor
}
28
29// NewBridge creates a new Bridge instance.
30func NewBridge(config *BridgeConfig, dispatcher routing.Dispatcher) (*Bridge, error) {
31	if config.Tag == "" {
32		return nil, newError("bridge tag is empty")
33	}
34	if config.Domain == "" {
35		return nil, newError("bridge domain is empty")
36	}
37
38	b := &Bridge{
39		dispatcher: dispatcher,
40		tag:        config.Tag,
41		domain:     config.Domain,
42	}
43	b.monitorTask = &task.Periodic{
44		Execute:  b.monitor,
45		Interval: time.Second * 2,
46	}
47	return b, nil
48}
49
50func (b *Bridge) cleanup() {
51	var activeWorkers []*BridgeWorker
52
53	for _, w := range b.workers {
54		if w.IsActive() {
55			activeWorkers = append(activeWorkers, w)
56		}
57	}
58
59	if len(activeWorkers) != len(b.workers) {
60		b.workers = activeWorkers
61	}
62}
63
64func (b *Bridge) monitor() error {
65	b.cleanup()
66
67	var numConnections uint32
68	var numWorker uint32
69
70	for _, w := range b.workers {
71		if w.IsActive() {
72			numConnections += w.Connections()
73			numWorker++
74		}
75	}
76
77	if numWorker == 0 || numConnections/numWorker > 16 {
78		worker, err := NewBridgeWorker(b.domain, b.tag, b.dispatcher)
79		if err != nil {
80			newError("failed to create bridge worker").Base(err).AtWarning().WriteToLog()
81			return nil
82		}
83		b.workers = append(b.workers, worker)
84	}
85
86	return nil
87}
88
// Start starts the periodic monitor task and returns the error, if any,
// reported by the task's Start method.
func (b *Bridge) Start() error {
	return b.monitorTask.Start()
}
92
// Close closes the periodic monitor task and returns the error, if any,
// reported by the task's Close method. It does not touch the workers held in
// b.workers.
func (b *Bridge) Close() error {
	return b.monitorTask.Close()
}
96
// BridgeWorker couples a mux server worker with the dispatcher that serves the
// requests arriving through it. It is itself passed to mux.NewServerWorker as
// the dispatcher, so it provides Type, Start, Close and Dispatch.
type BridgeWorker struct {
	tag        string             // inbound tag attached to non-internal dispatches
	worker     *mux.ServerWorker  // mux server worker created in NewBridgeWorker
	dispatcher routing.Dispatcher // dispatcher used for non-internal destinations
	state      Control_State      // state taken from the most recent Control message
}
103
104func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) {
105	ctx := context.Background()
106	ctx = session.ContextWithInbound(ctx, &session.Inbound{
107		Tag: tag,
108	})
109	link, err := d.Dispatch(ctx, net.Destination{
110		Network: net.Network_TCP,
111		Address: net.DomainAddress(domain),
112		Port:    0,
113	})
114	if err != nil {
115		return nil, err
116	}
117
118	w := &BridgeWorker{
119		dispatcher: d,
120		tag:        tag,
121	}
122
123	worker, err := mux.NewServerWorker(context.Background(), w, link)
124	if err != nil {
125		return nil, err
126	}
127	w.worker = worker
128
129	return w, nil
130}
131
// Type returns routing.DispatcherType(), i.e. the same type value that the
// routing package reports for dispatchers.
func (w *BridgeWorker) Type() interface{} {
	return routing.DispatcherType()
}
135
// Start does nothing and always returns nil.
func (w *BridgeWorker) Start() error {
	return nil
}
139
// Close does nothing and always returns nil; in particular it does not close
// the underlying mux server worker.
func (w *BridgeWorker) Close() error {
	return nil
}
143
144func (w *BridgeWorker) IsActive() bool {
145	return w.state == Control_ACTIVE && !w.worker.Closed()
146}
147
148func (w *BridgeWorker) Connections() uint32 {
149	return w.worker.ActiveConnections()
150}
151
152func (w *BridgeWorker) handleInternalConn(link transport.Link) {
153	go func() {
154		reader := link.Reader
155		for {
156			mb, err := reader.ReadMultiBuffer()
157			if err != nil {
158				break
159			}
160			for _, b := range mb {
161				var ctl Control
162				if err := proto.Unmarshal(b.Bytes(), &ctl); err != nil {
163					newError("failed to parse proto message").Base(err).WriteToLog()
164					break
165				}
166				if ctl.State != w.state {
167					w.state = ctl.State
168				}
169			}
170		}
171	}()
172}
173
174func (w *BridgeWorker) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
175	if !isInternalDomain(dest) {
176		ctx = session.ContextWithInbound(ctx, &session.Inbound{
177			Tag: w.tag,
178		})
179		return w.dispatcher.Dispatch(ctx, dest)
180	}
181
182	opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
183	uplinkReader, uplinkWriter := pipe.New(opt...)
184	downlinkReader, downlinkWriter := pipe.New(opt...)
185
186	w.handleInternalConn(transport.Link{
187		Reader: downlinkReader,
188		Writer: uplinkWriter,
189	})
190
191	return &transport.Link{
192		Reader: uplinkReader,
193		Writer: downlinkWriter,
194	}, nil
195}
196