Lines Matching refs:rc

87 	rc := &raftNode{
106 go rc.startRaft()
107 return commitC, errorC, rc.snapshotterReady
110 func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
118 if err := rc.wal.SaveSnapshot(walSnap); err != nil {
121 if err := rc.snapshotter.SaveSnap(snap); err != nil {
124 return rc.wal.ReleaseLockTo(snap.Metadata.Index)
127 func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
132 if firstIdx > rc.appliedIndex+1 {
133 …rst index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, rc.appliedIndex)
135 if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) {
136 nents = ents[rc.appliedIndex-firstIdx+1:]
143 func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
153 case rc.commitC <- &s:
154 case <-rc.stopc:
161 rc.confState = *rc.node.ApplyConfChange(cc)
165 rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
168 if cc.NodeID == uint64(rc.id) {
172 rc.transport.RemovePeer(types.ID(cc.NodeID))
177 rc.appliedIndex = ents[i].Index
180 if ents[i].Index == rc.lastIndex {
182 case rc.commitC <- nil:
183 case <-rc.stopc:
191 func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
192 snapshot, err := rc.snapshotter.Load()
200 func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL {
201 if !wal.Exist(rc.waldir) {
202 if err := os.Mkdir(rc.waldir, 0750); err != nil {
206 w, err := wal.Create(zap.NewExample(), rc.waldir, nil)
218 w, err := wal.Open(zap.NewExample(), rc.waldir, walsnap)
227 func (rc *raftNode) replayWAL() *wal.WAL {
228 log.Printf("replaying WAL of member %d", rc.id)
229 snapshot := rc.loadSnapshot()
230 w := rc.openWAL(snapshot)
235 rc.raftStorage = raft.NewMemoryStorage()
237 rc.raftStorage.ApplySnapshot(*snapshot)
239 rc.raftStorage.SetHardState(st)
242 rc.raftStorage.Append(ents)
245 rc.lastIndex = ents[len(ents)-1].Index
247 rc.commitC <- nil
252 func (rc *raftNode) writeError(err error) {
253 rc.stopHTTP()
254 close(rc.commitC)
255 rc.errorC <- err
256 close(rc.errorC)
257 rc.node.Stop()
260 func (rc *raftNode) startRaft() {
261 if !fileutil.Exist(rc.snapdir) {
262 if err := os.Mkdir(rc.snapdir, 0750); err != nil {
266 rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
267 rc.snapshotterReady <- rc.snapshotter
269 oldwal := wal.Exist(rc.waldir)
270 rc.wal = rc.replayWAL()
272 rpeers := make([]raft.Peer, len(rc.peers))
277 ID: uint64(rc.id),
280 Storage: rc.raftStorage,
287 rc.node = raft.RestartNode(c)
290 if rc.join {
293 rc.node = raft.StartNode(c, startPeers)
296 rc.transport = &rafthttp.Transport{
298 ID: types.ID(rc.id),
300 Raft: rc,
302 LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
306 rc.transport.Start()
307 for i := range rc.peers {
308 if i+1 != rc.id {
309 rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
313 go rc.serveRaft()
314 go rc.serveChannels()
318 func (rc *raftNode) stop() {
319 rc.stopHTTP()
320 close(rc.commitC)
321 close(rc.errorC)
322 rc.node.Stop()
325 func (rc *raftNode) stopHTTP() {
326 rc.transport.Stop()
327 close(rc.httpstopc)
328 <-rc.httpdonec
331 func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
336 log.Printf("publishing snapshot at index %d", rc.snapshotIndex)
337 defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex)
339 if snapshotToSave.Metadata.Index <= rc.appliedIndex {
340 …ot index [%d] should > progress.appliedIndex [%d]", snapshotToSave.Metadata.Index, rc.appliedIndex)
342 rc.commitC <- nil // trigger kvstore to load snapshot
344 rc.confState = snapshotToSave.Metadata.ConfState
345 rc.snapshotIndex = snapshotToSave.Metadata.Index
346 rc.appliedIndex = snapshotToSave.Metadata.Index
351 func (rc *raftNode) maybeTriggerSnapshot() {
352 if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
356 …log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.sna…
357 data, err := rc.getSnapshot()
361 snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
365 if err := rc.saveSnap(snap); err != nil {
370 if rc.appliedIndex > snapshotCatchUpEntriesN {
371 compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
373 if err := rc.raftStorage.Compact(compactIndex); err != nil {
378 rc.snapshotIndex = rc.appliedIndex
381 func (rc *raftNode) serveChannels() {
382 snap, err := rc.raftStorage.Snapshot()
386 rc.confState = snap.Metadata.ConfState
387 rc.snapshotIndex = snap.Metadata.Index
388 rc.appliedIndex = snap.Metadata.Index
390 defer rc.wal.Close()
399 for rc.proposeC != nil && rc.confChangeC != nil {
401 case prop, ok := <-rc.proposeC:
403 rc.proposeC = nil
406 rc.node.Propose(context.TODO(), []byte(prop))
409 case cc, ok := <-rc.confChangeC:
411 rc.confChangeC = nil
415 rc.node.ProposeConfChange(context.TODO(), cc)
420 close(rc.stopc)
427 rc.node.Tick()
430 case rd := <-rc.node.Ready():
431 rc.wal.Save(rd.HardState, rd.Entries)
433 rc.saveSnap(rd.Snapshot)
434 rc.raftStorage.ApplySnapshot(rd.Snapshot)
435 rc.publishSnapshot(rd.Snapshot)
437 rc.raftStorage.Append(rd.Entries)
438 rc.transport.Send(rd.Messages)
439 if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
440 rc.stop()
443 rc.maybeTriggerSnapshot()
444 rc.node.Advance()
446 case err := <-rc.transport.ErrorC:
447 rc.writeError(err)
450 case <-rc.stopc:
451 rc.stop()
457 func (rc *raftNode) serveRaft() {
458 url, err := url.Parse(rc.peers[rc.id-1])
463 ln, err := newStoppableListener(url.Host, rc.httpstopc)
468 err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
470 case <-rc.httpstopc:
474 close(rc.httpdonec)
477 func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
478 return rc.node.Step(ctx, m)
480 func (rc *raftNode) IsIDRemoved(id uint64) bool { return false }
481 func (rc *raftNode) ReportUnreachable(id uint64) {}
482 func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}