1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"go.etcd.io/etcd/client/pkg/v3/types"
19	"go.etcd.io/etcd/raft/v3/raftpb"
20
21	"go.uber.org/zap"
22)
23
24type remote struct {
25	lg       *zap.Logger
26	localID  types.ID
27	id       types.ID
28	status   *peerStatus
29	pipeline *pipeline
30}
31
32func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
33	picker := newURLPicker(urls)
34	status := newPeerStatus(tr.Logger, tr.ID, id)
35	pipeline := &pipeline{
36		peerID: id,
37		tr:     tr,
38		picker: picker,
39		status: status,
40		raft:   tr.Raft,
41		errorc: tr.ErrorC,
42	}
43	pipeline.start()
44
45	return &remote{
46		lg:       tr.Logger,
47		localID:  tr.ID,
48		id:       id,
49		status:   status,
50		pipeline: pipeline,
51	}
52}
53
54func (g *remote) send(m raftpb.Message) {
55	select {
56	case g.pipeline.msgc <- m:
57	default:
58		if g.status.isActive() {
59			if g.lg != nil {
60				g.lg.Warn(
61					"dropped internal Raft message since sending buffer is full (overloaded network)",
62					zap.String("message-type", m.Type.String()),
63					zap.String("local-member-id", g.localID.String()),
64					zap.String("from", types.ID(m.From).String()),
65					zap.String("remote-peer-id", g.id.String()),
66					zap.Bool("remote-peer-active", g.status.isActive()),
67				)
68			}
69		} else {
70			if g.lg != nil {
71				g.lg.Warn(
72					"dropped Raft message since sending buffer is full (overloaded network)",
73					zap.String("message-type", m.Type.String()),
74					zap.String("local-member-id", g.localID.String()),
75					zap.String("from", types.ID(m.From).String()),
76					zap.String("remote-peer-id", g.id.String()),
77					zap.Bool("remote-peer-active", g.status.isActive()),
78				)
79			}
80		}
81		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
82	}
83}
84
85func (g *remote) stop() {
86	g.pipeline.stop()
87}
88
89func (g *remote) Pause() {
90	g.stop()
91}
92
93func (g *remote) Resume() {
94	g.pipeline.start()
95}
96