Lines Matching refs:rc

85 	rc := &raftNode{
104 go rc.startRaft()
105 return commitC, errorC, rc.snapshotterReady
108 func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
116 if err := rc.wal.SaveSnapshot(walSnap); err != nil {
119 if err := rc.snapshotter.SaveSnap(snap); err != nil {
122 return rc.wal.ReleaseLockTo(snap.Metadata.Index)
125 func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
130 if firstIdx > rc.appliedIndex+1 {
131 …rst index of committed entry[%d] should <= progress.appliedIndex[%d] 1", firstIdx, rc.appliedIndex)
133 if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) {
134 nents = ents[rc.appliedIndex-firstIdx+1:]
141 func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
151 case rc.commitC <- &s:
152 case <-rc.stopc:
159 rc.confState = *rc.node.ApplyConfChange(cc)
163 rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
166 if cc.NodeID == uint64(rc.id) {
170 rc.transport.RemovePeer(types.ID(cc.NodeID))
175 rc.appliedIndex = ents[i].Index
178 if ents[i].Index == rc.lastIndex {
180 case rc.commitC <- nil:
181 case <-rc.stopc:
189 func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
190 snapshot, err := rc.snapshotter.Load()
198 func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL {
199 if !wal.Exist(rc.waldir) {
200 if err := os.Mkdir(rc.waldir, 0750); err != nil {
204 w, err := wal.Create(rc.waldir, nil)
216 w, err := wal.Open(rc.waldir, walsnap)
225 func (rc *raftNode) replayWAL() *wal.WAL {
226 log.Printf("replaying WAL of member %d", rc.id)
227 snapshot := rc.loadSnapshot()
228 w := rc.openWAL(snapshot)
233 rc.raftStorage = raft.NewMemoryStorage()
235 rc.raftStorage.ApplySnapshot(*snapshot)
237 rc.raftStorage.SetHardState(st)
240 rc.raftStorage.Append(ents)
243 rc.lastIndex = ents[len(ents)-1].Index
245 rc.commitC <- nil
250 func (rc *raftNode) writeError(err error) {
251 rc.stopHTTP()
252 close(rc.commitC)
253 rc.errorC <- err
254 close(rc.errorC)
255 rc.node.Stop()
258 func (rc *raftNode) startRaft() {
259 if !fileutil.Exist(rc.snapdir) {
260 if err := os.Mkdir(rc.snapdir, 0750); err != nil {
264 rc.snapshotter = snap.New(rc.snapdir)
265 rc.snapshotterReady <- rc.snapshotter
267 oldwal := wal.Exist(rc.waldir)
268 rc.wal = rc.replayWAL()
270 rpeers := make([]raft.Peer, len(rc.peers))
275 ID: uint64(rc.id),
278 Storage: rc.raftStorage,
284 rc.node = raft.RestartNode(c)
287 if rc.join {
290 rc.node = raft.StartNode(c, startPeers)
293 rc.transport = &rafthttp.Transport{
294 ID: types.ID(rc.id),
296 Raft: rc,
298 LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
302 rc.transport.Start()
303 for i := range rc.peers {
304 if i+1 != rc.id {
305 rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
309 go rc.serveRaft()
310 go rc.serveChannels()
314 func (rc *raftNode) stop() {
315 rc.stopHTTP()
316 close(rc.commitC)
317 close(rc.errorC)
318 rc.node.Stop()
321 func (rc *raftNode) stopHTTP() {
322 rc.transport.Stop()
323 close(rc.httpstopc)
324 <-rc.httpdonec
327 func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
332 log.Printf("publishing snapshot at index %d", rc.snapshotIndex)
333 defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex)
335 if snapshotToSave.Metadata.Index <= rc.appliedIndex {
336 …ndex [%d] should > progress.appliedIndex [%d] + 1", snapshotToSave.Metadata.Index, rc.appliedIndex)
338 rc.commitC <- nil // trigger kvstore to load snapshot
340 rc.confState = snapshotToSave.Metadata.ConfState
341 rc.snapshotIndex = snapshotToSave.Metadata.Index
342 rc.appliedIndex = snapshotToSave.Metadata.Index
347 func (rc *raftNode) maybeTriggerSnapshot() {
348 if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
352 …log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.sna…
353 data, err := rc.getSnapshot()
357 snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
361 if err := rc.saveSnap(snap); err != nil {
366 if rc.appliedIndex > snapshotCatchUpEntriesN {
367 compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
369 if err := rc.raftStorage.Compact(compactIndex); err != nil {
374 rc.snapshotIndex = rc.appliedIndex
377 func (rc *raftNode) serveChannels() {
378 snap, err := rc.raftStorage.Snapshot()
382 rc.confState = snap.Metadata.ConfState
383 rc.snapshotIndex = snap.Metadata.Index
384 rc.appliedIndex = snap.Metadata.Index
386 defer rc.wal.Close()
395 for rc.proposeC != nil && rc.confChangeC != nil {
397 case prop, ok := <-rc.proposeC:
399 rc.proposeC = nil
402 rc.node.Propose(context.TODO(), []byte(prop))
405 case cc, ok := <-rc.confChangeC:
407 rc.confChangeC = nil
411 rc.node.ProposeConfChange(context.TODO(), cc)
416 close(rc.stopc)
423 rc.node.Tick()
426 case rd := <-rc.node.Ready():
427 rc.wal.Save(rd.HardState, rd.Entries)
429 rc.saveSnap(rd.Snapshot)
430 rc.raftStorage.ApplySnapshot(rd.Snapshot)
431 rc.publishSnapshot(rd.Snapshot)
433 rc.raftStorage.Append(rd.Entries)
434 rc.transport.Send(rd.Messages)
435 if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
436 rc.stop()
439 rc.maybeTriggerSnapshot()
440 rc.node.Advance()
442 case err := <-rc.transport.ErrorC:
443 rc.writeError(err)
446 case <-rc.stopc:
447 rc.stop()
453 func (rc *raftNode) serveRaft() {
454 url, err := url.Parse(rc.peers[rc.id-1])
459 ln, err := newStoppableListener(url.Host, rc.httpstopc)
464 err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
466 case <-rc.httpstopc:
470 close(rc.httpdonec)
473 func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
474 return rc.node.Step(ctx, m)
476 func (rc *raftNode) IsIDRemoved(id uint64) bool { return false }
477 func (rc *raftNode) ReportUnreachable(id uint64) {}
478 func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}