1 /*- 2 * Copyright (c) 2010, 2020 Oracle and/or its affiliates. All rights reserved. 3 * 4 * See the file LICENSE for license information. 5 * 6 */ 7 8 package rep; 9 10 import static org.junit.Assert.assertEquals; 11 import static org.junit.Assert.assertTrue; 12 13 import com.sleepycat.db.*; 14 15 import repmgrtests.Util; 16 import org.junit.Test; 17 import java.util.*; 18 19 public class TestMirandaTimeout { 20 class /* struct */ Message { 21 DatabaseEntry ctl, rec; 22 int sourceEID; 23 boolean perm; Message(DatabaseEntry ctl, DatabaseEntry rec, int eid, boolean p)24 Message(DatabaseEntry ctl, DatabaseEntry rec, int eid, boolean p) { 25 this.ctl = ctl; 26 this.rec = rec; 27 sourceEID = eid; 28 perm = p; 29 } 30 } 31 32 List<Queue<Message>> queues = new ArrayList<Queue<Message>>(); 33 Environment[] envs = new Environment[2]; 34 35 public static final int SELF = Integer.MAX_VALUE; 36 deferredArrival()37 @Test public void deferredArrival() throws Exception { 38 EnvironmentConfig ec = new EnvironmentConfig(); 39 ec.setAllowCreate(true); 40 ec.setInitializeCache(true); 41 ec.setInitializeLocking(true); 42 ec.setInitializeLogging(true); 43 ec.setInitializeReplication(true); 44 ec.setTransactional(true); 45 ec.setThreaded(true); 46 ec.setReplicationNumSites(2); 47 if (Boolean.getBoolean("VERB_REPLICATION")) 48 ec.setVerbose(VerboseConfig.REPLICATION, true); 49 50 // The EID serves double duty as an index into the array of 51 // ArrayLists which are used as the message queues. Master is 52 // EID 0 and client is EID 1. 53 // 54 queues.add(new LinkedList<Message>()); 55 queues.add(new LinkedList<Message>()); 56 ec.setReplicationTransport(SELF, new Transport(0)); 57 Environment master = new Environment(Util.mkdir("master"), ec); 58 envs[0] = master; 59 ec.setReplicationTransport(SELF, new Transport(1)); 60 Environment client = new Environment(Util.mkdir("client"), ec); 61 envs[1] = client; 62 63 master.startReplication(null, true); 64 client.startReplication(null, false); 65 66 processMessages(); 67 68 DatabaseConfig dc = new DatabaseConfig(); 69 dc.setTransactional(true); 70 dc.setAllowCreate(true); 71 dc.setType(DatabaseType.BTREE); 72 Database db = master.openDatabase(null, "test.db", null, dc); 73 processMessages(); 74 75 Transaction txn = master.beginTransaction(null, null); 76 DatabaseEntry key = new DatabaseEntry(); 77 DatabaseEntry value = new DatabaseEntry(); 78 key.setData("mykey".getBytes()); 79 value.setData("myvalue".getBytes()); 80 db.put(txn, key, value); 81 txn.commit(); 82 byte[] token1 = txn.getCommitToken(); 83 84 txn = master.beginTransaction(null, null); 85 key = new DatabaseEntry(); 86 value = new DatabaseEntry(); 87 key.setData("one,two".getBytes()); 88 value.setData("buckle my shoe".getBytes()); 89 db.put(txn, key, value); 90 txn.commit(); 91 byte[] token2 = txn.getCommitToken(); 92 93 // Since we haven't sent pending msgs to the client yet, the 94 // transaction shouldn't be there yet. 95 // 96 assertEquals(TransactionStatus.TIMEOUT, 97 client.isTransactionApplied(token1, 0)); 98 99 100 // Start 2 client threads to wait for the transactions in 101 // reverse order, to make sure the waiting completes in the 102 // correct order. 103 // 104 Client clientTh2 = new Client(client, token2); 105 Thread t2 = new Thread(clientTh2, "clientThread2"); 106 t2.start(); 107 Thread.sleep(5000); 108 109 Client clientTh1 = new Client(client, token1); 110 Thread t1 = new Thread(clientTh1, "clientThread1"); 111 t1.start(); 112 Thread.sleep(5000); 113 114 processOnePerm(); 115 Thread.sleep(5000); 116 processMessages(); 117 118 t1.join(); 119 t2.join(); 120 121 assertEquals(TransactionStatus.APPLIED, clientTh1.getResult()); 122 assertTrue(clientTh1.getDuration() > 1000 && 123 clientTh1.getDuration() < 100000); 124 assertEquals(TransactionStatus.APPLIED, clientTh2.getResult()); 125 126 // We started thread2 (way) before starting thread1, so its 127 // start time should be less (assuming the system isn't 128 // ridiculously busy). But thread1 still should have 129 // completed (way) before thread2, because its transaction was 130 // replicated first (before a pause). 131 // 132 assertTrue(clientTh2.getStartTime() < clientTh1.getStartTime()); 133 assertTrue(clientTh1.getEndTime() < clientTh2.getEndTime()); 134 135 db.close(); 136 client.close(); 137 master.close(); 138 } 139 140 class Client implements Runnable { 141 private Environment env; 142 private byte[] token; 143 private TransactionStatus result; 144 private long duration, endTime, startTime; 145 public void run() { 146 try { 147 startTime = System.currentTimeMillis(); 148 result = env.isTransactionApplied(token, 100000000); 149 endTime = System.currentTimeMillis(); 150 duration = endTime - startTime; 151 } catch (Exception e) { 152 // if an exception happens, we leave "result" 153 // unset, which will eventually cause a test failure 154 // in the parent thread. 155 e.printStackTrace(); 156 } 157 } 158 159 Client(Environment env, byte[] token) { 160 this.env = env; 161 this.token = token; 162 } 163 164 long getDuration() { return duration; } 165 long getStartTime() { return startTime; } 166 long getEndTime() { return endTime; } 167 TransactionStatus getResult() { return result; } 168 } 169 170 171 class Transport implements ReplicationTransport { 172 private int myEID; 173 Transport(int eid) { myEID = eid; } 174 public int send(Environment env, DatabaseEntry ctl, DatabaseEntry rec, 175 LogSequenceNumber lsn, int eid, boolean noBuf, 176 boolean perm, boolean anywhere, boolean isRetry) { 177 int target = 1 - myEID; 178 queues.get(target).add(new Message(ctl, rec, myEID, perm)); 179 return (0); 180 } 181 } 182 183 void processOnePerm() throws Exception { 184 Queue<Message> q = queues.get(1); // the client is site 1 185 while (!q.isEmpty()) { 186 Message m = q.remove(); 187 envs[1].processReplicationMessage(m.ctl, m.rec, m.sourceEID); 188 if (m.perm) 189 return; 190 } 191 } 192 193 void processMessages() throws Exception { 194 boolean done = false; 195 while (!done) { 196 boolean got = false; 197 for (int eid = 0; eid < 2; eid++) { 198 Queue<Message> q = queues.get(eid); 199 while (!q.isEmpty()) { 200 got = true; 201 Message m = q.remove(); 202 envs[eid].processReplicationMessage(m.ctl, m.rec, m.sourceEID); 203 } 204 } 205 if (!got) 206 done = true; 207 } 208 } 209 } 210