1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.elections; 9 10 import java.io.BufferedReader; 11 import java.io.IOException; 12 import java.io.InputStreamReader; 13 import java.io.PrintWriter; 14 import java.nio.channels.Channels; 15 import java.util.logging.Level; 16 17 import com.sleepycat.je.EnvironmentFailureException; 18 import com.sleepycat.je.JEVersion; 19 import com.sleepycat.je.rep.elections.Proposer.Proposal; 20 import com.sleepycat.je.rep.elections.Protocol.Accept; 21 import com.sleepycat.je.rep.elections.Protocol.Propose; 22 import com.sleepycat.je.rep.elections.Protocol.Value; 23 import com.sleepycat.je.rep.impl.TextProtocol.InvalidMessageException; 24 import com.sleepycat.je.rep.impl.TextProtocol.RequestMessage; 25 import com.sleepycat.je.rep.impl.TextProtocol.ResponseMessage; 26 import com.sleepycat.je.rep.impl.node.RepNode; 27 import com.sleepycat.je.rep.net.DataChannel; 28 import com.sleepycat.je.rep.utilint.ServiceDispatcher; 29 import com.sleepycat.je.utilint.LoggerUtils; 30 31 /** 32 * Plays the role of Acceptor in the consensus algorithm. It runs in its 33 * own thread listening for and responding to messages sent by Proposers. 34 */ 35 public class Acceptor extends ElectionAgentThread { 36 37 /* The RepNode associated with the Acceptor */ 38 private final RepNode repNode; 39 40 /* 41 * The currently promised proposal. Proposals below this one are rejected. 42 */ 43 private Proposal promisedProposal = null; 44 45 private Value acceptedValue = null; 46 47 /* Used to return suggestions in response to Propose requests. */ 48 private final SuggestionGenerator suggestionGenerator; 49 50 /* Identifies the Acceptor Service. */ 51 public static final String SERVICE_NAME = "Acceptor"; 52 53 /** 54 * Creates an Acceptor 55 */ Acceptor(Protocol protocol, RepNode repNode, SuggestionGenerator suggestionGenerator)56 public Acceptor(Protocol protocol, 57 RepNode repNode, 58 SuggestionGenerator suggestionGenerator) { 59 60 super(repNode, protocol, 61 "Acceptor Thread " + repNode.getNameIdPair().getName()); 62 this.repNode = repNode; 63 64 this.suggestionGenerator = suggestionGenerator; 65 } 66 67 /** 68 * The Acceptor thread body. 69 */ 70 @Override run()71 public void run() { 72 final ServiceDispatcher serviceDispatcher = 73 repNode.getServiceDispatcher(); 74 serviceDispatcher.register(SERVICE_NAME, channelQueue); 75 LoggerUtils.logMsg 76 (logger, envImpl, formatter, Level.FINE, "Acceptor started"); 77 DataChannel channel = null; 78 try { 79 while (true) { 80 channel = serviceDispatcher.takeChannel 81 (SERVICE_NAME, true /* block */, 82 protocol.getReadTimeout()); 83 84 if (channel == null) { 85 /* A soft shutdown. */ 86 return; 87 } 88 89 BufferedReader in = null; 90 PrintWriter out = null; 91 try { 92 in = new BufferedReader( 93 new InputStreamReader( 94 Channels.newInputStream(channel))); 95 out = new PrintWriter( 96 Channels.newOutputStream(channel), true); 97 String requestLine = in.readLine(); 98 if (requestLine == null) { 99 LoggerUtils.logMsg(logger, envImpl, 100 formatter, Level.FINE, 101 "Acceptor: EOF on request"); 102 continue; 103 } 104 RequestMessage requestMessage = null; 105 try { 106 requestMessage = protocol.parseRequest(requestLine); 107 } catch (InvalidMessageException e) { 108 LoggerUtils.logMsg(logger, envImpl, 109 formatter, Level.WARNING, 110 "Message format error: " + 111 e.getMessage()); 112 out.println 113 (protocol.new ProtocolError(e).wireFormat()); 114 continue; 115 } 116 ResponseMessage responseMessage = null; 117 if (requestMessage.getOp() == protocol.PROPOSE) { 118 responseMessage = process((Propose) requestMessage); 119 } else if (requestMessage.getOp() == protocol.ACCEPT) { 120 responseMessage = process((Accept) requestMessage); 121 } else if (requestMessage.getOp() == protocol.SHUTDOWN) { 122 break; 123 } else { 124 LoggerUtils.logMsg(logger, envImpl, 125 formatter, Level.SEVERE, 126 "Unrecognized request: " + 127 requestLine); 128 continue; 129 } 130 131 /* 132 * The request message may be of an earlier version. If so, 133 * this node transparently read the older version. JE only 134 * throws out InvalidMesageException when the version of 135 * the request message is newer than the current protocol. 136 * To avoid sending a repsonse that the requester cannot 137 * understand, we send a response in the same version as 138 * that of the original request message. 139 */ 140 responseMessage.setSendVersion 141 (requestMessage.getSendVersion()); 142 out.println(responseMessage.wireFormat()); 143 } catch (IOException e) { 144 LoggerUtils.logMsg 145 (logger, envImpl, formatter, Level.WARNING, 146 "IO error on socket: " + e.getMessage()); 147 continue; 148 } finally { 149 Utils.cleanup(logger, envImpl, formatter, channel, in, out); 150 cleanup(); 151 } 152 } 153 } catch (InterruptedException e) { 154 if (isShutdown()) { 155 /* Treat it like a shutdown, exit the thread. */ 156 return; 157 } 158 LoggerUtils.logMsg(logger, envImpl, formatter, Level.WARNING, 159 "Acceptor unexpected interrupted"); 160 throw EnvironmentFailureException.unexpectedException(e); 161 } finally { 162 serviceDispatcher.cancel(SERVICE_NAME); 163 cleanup(); 164 } 165 } 166 167 /** 168 * Responds to a Propose request. 169 * 170 * @param propose the request proposal 171 * 172 * @return the response: a Promise if the request was accepted, a Reject 173 * otherwise. 174 */ process(Propose propose)175 ResponseMessage process(Propose propose) { 176 177 if ((promisedProposal != null) && 178 (promisedProposal.compareTo(propose.getProposal()) > 0)) { 179 LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE, 180 "Reject Propose: " + propose.getProposal() + 181 " Promised proposal: " + promisedProposal); 182 return protocol.new Reject(promisedProposal); 183 } 184 185 promisedProposal = propose.getProposal(); 186 final Value suggestedValue = suggestionGenerator.get(promisedProposal); 187 final long suggestionRanking = 188 suggestionGenerator.getRanking(promisedProposal); 189 LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE, 190 "Promised: " + promisedProposal + 191 " Suggested Value: " + suggestedValue + 192 " Suggestion Ranking: " + suggestionRanking); 193 return protocol.new Promise 194 (promisedProposal, 195 acceptedValue, 196 suggestedValue, 197 suggestionRanking, 198 repNode.getElectionPriority(), 199 repNode.getLogVersion(), 200 JEVersion.CURRENT_VERSION); 201 } 202 203 /** 204 * Responds to Accept request 205 * 206 * @param accept the request 207 * @return an Accepted or Reject response as appropriate. 208 */ process(Accept accept)209 ResponseMessage process(Accept accept) { 210 if ((promisedProposal != null) && 211 (promisedProposal.compareTo(accept.getProposal()) != 0)) { 212 LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE, 213 "Reject Accept: " + accept.getProposal() + 214 " Promised proposal: " + promisedProposal); 215 return protocol.new Reject(promisedProposal); 216 } 217 acceptedValue = accept.getValue(); 218 LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE, 219 "Promised: " + promisedProposal + " Accepted: " + 220 accept.getProposal() + " Value: " + acceptedValue); 221 return protocol.new Accepted(accept.getProposal(), acceptedValue); 222 } 223 224 public interface SuggestionGenerator { 225 226 /** 227 * Used to generate a suggested value for use by a Proposer. It's a 228 * hint. The proposal argument may be used to freeze values like the 229 * VLSN number from advancing (if they were used in the ranking) until 230 * an election has completed. 231 * 232 * @param proposal the Proposal for which the value is being suggested. 233 * 234 * @return the suggested value. 235 */ get(Proposal proposal)236 abstract Value get(Proposal proposal); 237 238 /** 239 * The importance associated with the above suggestion. Acceptors have 240 * to agree on a common system for ranking importance so that the 241 * relative importance of different suggestions can be meaningfully 242 * compared. 243 * 244 * @param the proposal associated with the ranking 245 * 246 * @return the importance of the suggestion as a number 247 */ getRanking(Proposal proposal)248 abstract long getRanking(Proposal proposal); 249 } 250 } 251