1 // 2 // PackageHandler.java 3 // Adjust 4 // 5 // Created by Christian Wellenbrock on 2013-06-25. 6 // Copyright (c) 2013 adjust GmbH. All rights reserved. 7 // See the file MIT-LICENSE for copying permission. 8 // 9 10 package com.adjust.sdk; 11 12 import android.content.Context; 13 import android.os.Handler; 14 import android.os.HandlerThread; 15 import android.os.Looper; 16 import android.os.Message; 17 18 import org.json.JSONObject; 19 20 import java.io.BufferedInputStream; 21 import java.io.BufferedOutputStream; 22 import java.io.FileInputStream; 23 import java.io.FileNotFoundException; 24 import java.io.FileOutputStream; 25 import java.io.IOException; 26 import java.io.NotSerializableException; 27 import java.io.ObjectInputStream; 28 import java.io.ObjectOutputStream; 29 import java.io.OptionalDataException; 30 import java.lang.ref.WeakReference; 31 import java.util.ArrayList; 32 import java.util.List; 33 import java.util.concurrent.atomic.AtomicBoolean; 34 35 // persistent 36 public class PackageHandler extends HandlerThread implements IPackageHandler { 37 private static final String PACKAGE_QUEUE_FILENAME = "AdjustIoPackageQueue"; 38 39 private final InternalHandler internalHandler; 40 private IRequestHandler requestHandler; 41 private IActivityHandler activityHandler; 42 private List<ActivityPackage> packageQueue; 43 private AtomicBoolean isSending; 44 private boolean paused; 45 private Context context; 46 private ILogger logger; 47 PackageHandler(IActivityHandler activityHandler, Context context, boolean startPaused)48 public PackageHandler(IActivityHandler activityHandler, 49 Context context, 50 boolean startPaused) { 51 super(Constants.LOGTAG, MIN_PRIORITY); 52 setDaemon(true); 53 start(); 54 this.internalHandler = new InternalHandler(getLooper(), this); 55 this.logger = AdjustFactory.getLogger(); 56 57 init(activityHandler, context, startPaused); 58 59 Message message = Message.obtain(); 60 message.arg1 = InternalHandler.INIT; 61 internalHandler.sendMessage(message); 62 } 63 64 @Override init(IActivityHandler activityHandler, Context context, boolean startPaused)65 public void init(IActivityHandler activityHandler, Context context, boolean startPaused) { 66 this.activityHandler = activityHandler; 67 this.context = context; 68 this.paused = startPaused; 69 } 70 71 // add a package to the queue 72 @Override addPackage(ActivityPackage pack)73 public void addPackage(ActivityPackage pack) { 74 Message message = Message.obtain(); 75 message.arg1 = InternalHandler.ADD; 76 message.obj = pack; 77 internalHandler.sendMessage(message); 78 } 79 80 // try to send the oldest package 81 @Override sendFirstPackage()82 public void sendFirstPackage() { 83 Message message = Message.obtain(); 84 message.arg1 = InternalHandler.SEND_FIRST; 85 internalHandler.sendMessage(message); 86 } 87 88 // remove oldest package and try to send the next one 89 // (after success or possibly permanent failure) 90 @Override sendNextPackage()91 public void sendNextPackage() { 92 Message message = Message.obtain(); 93 message.arg1 = InternalHandler.SEND_NEXT; 94 internalHandler.sendMessage(message); 95 } 96 97 // close the package to retry in the future (after temporary failure) 98 @Override closeFirstPackage()99 public void closeFirstPackage() { 100 isSending.set(false); 101 } 102 103 // interrupt the sending loop after the current request has finished 104 @Override pauseSending()105 public void pauseSending() { 106 paused = true; 107 } 108 109 // allow sending requests again 110 @Override resumeSending()111 public void resumeSending() { 112 paused = false; 113 } 114 115 // short info about how failing packages are handled 116 @Override getFailureMessage()117 public String getFailureMessage() { 118 return "Will retry later."; 119 } 120 121 @Override finishedTrackingActivity(JSONObject jsonResponse)122 public void finishedTrackingActivity(JSONObject jsonResponse) { 123 activityHandler.finishedTrackingActivity(jsonResponse); 124 } 125 126 @Override sendClickPackage(ActivityPackage clickPackage)127 public void sendClickPackage(ActivityPackage clickPackage) { 128 logger.debug("Sending click package (%s)", clickPackage); 129 logger.verbose("%s", clickPackage.getExtendedString()); 130 requestHandler.sendClickPackage(clickPackage); 131 } 132 133 private static final class InternalHandler extends Handler { 134 private static final int INIT = 1; 135 private static final int ADD = 2; 136 private static final int SEND_NEXT = 3; 137 private static final int SEND_FIRST = 4; 138 139 private final WeakReference<PackageHandler> packageHandlerReference; 140 InternalHandler(Looper looper, PackageHandler packageHandler)141 protected InternalHandler(Looper looper, PackageHandler packageHandler) { 142 super(looper); 143 this.packageHandlerReference = new WeakReference<PackageHandler>(packageHandler); 144 } 145 146 @Override handleMessage(Message message)147 public void handleMessage(Message message) { 148 super.handleMessage(message); 149 150 PackageHandler packageHandler = packageHandlerReference.get(); 151 if (null == packageHandler) { 152 return; 153 } 154 155 switch (message.arg1) { 156 case INIT: 157 packageHandler.initInternal(); 158 break; 159 case ADD: 160 ActivityPackage activityPackage = (ActivityPackage) message.obj; 161 packageHandler.addInternal(activityPackage); 162 break; 163 case SEND_FIRST: 164 packageHandler.sendFirstInternal(); 165 break; 166 case SEND_NEXT: 167 packageHandler.sendNextInternal(); 168 break; 169 } 170 } 171 } 172 173 // internal methods run in dedicated queue thread 174 initInternal()175 private void initInternal() { 176 requestHandler = AdjustFactory.getRequestHandler(this); 177 178 isSending = new AtomicBoolean(); 179 180 readPackageQueue(); 181 } 182 addInternal(ActivityPackage newPackage)183 private void addInternal(ActivityPackage newPackage) { 184 packageQueue.add(newPackage); 185 logger.debug("Added package %d (%s)", packageQueue.size(), newPackage); 186 logger.verbose("%s", newPackage.getExtendedString()); 187 188 writePackageQueue(); 189 } 190 sendFirstInternal()191 private void sendFirstInternal() { 192 if (packageQueue.isEmpty()) { 193 return; 194 } 195 196 if (paused) { 197 logger.debug("Package handler is paused"); 198 return; 199 } 200 if (isSending.getAndSet(true)) { 201 logger.verbose("Package handler is already sending"); 202 return; 203 } 204 205 ActivityPackage firstPackage = packageQueue.get(0); 206 requestHandler.sendPackage(firstPackage); 207 } 208 sendNextInternal()209 private void sendNextInternal() { 210 packageQueue.remove(0); 211 writePackageQueue(); 212 isSending.set(false); 213 sendFirstInternal(); 214 } 215 readPackageQueue()216 private void readPackageQueue() { 217 try { 218 FileInputStream inputStream = context.openFileInput(PACKAGE_QUEUE_FILENAME); 219 BufferedInputStream bufferedStream = new BufferedInputStream(inputStream); 220 ObjectInputStream objectStream = new ObjectInputStream(bufferedStream); 221 222 try { 223 Object object = objectStream.readObject(); 224 @SuppressWarnings("unchecked") 225 List<ActivityPackage> packageQueue = (List<ActivityPackage>) object; 226 logger.debug("Package handler read %d packages", packageQueue.size()); 227 this.packageQueue = packageQueue; 228 return; 229 } catch (ClassNotFoundException e) { 230 logger.error("Failed to find package queue class"); 231 } catch (OptionalDataException e) { 232 /* no-op */ 233 } catch (IOException e) { 234 logger.error("Failed to read package queue object"); 235 } catch (ClassCastException e) { 236 logger.error("Failed to cast package queue object"); 237 } finally { 238 objectStream.close(); 239 } 240 } catch (FileNotFoundException e) { 241 logger.verbose("Package queue file not found"); 242 } catch (Exception e) { 243 logger.error("Failed to read package queue file"); 244 } 245 246 // start with a fresh package queue in case of any exception 247 packageQueue = new ArrayList<ActivityPackage>(); 248 } 249 deletePackageQueue(Context context)250 public static Boolean deletePackageQueue(Context context) { 251 return context.deleteFile(PACKAGE_QUEUE_FILENAME); 252 } 253 254 writePackageQueue()255 private void writePackageQueue() { 256 try { 257 FileOutputStream outputStream = context.openFileOutput(PACKAGE_QUEUE_FILENAME, Context.MODE_PRIVATE); 258 BufferedOutputStream bufferedStream = new BufferedOutputStream(outputStream); 259 ObjectOutputStream objectStream = new ObjectOutputStream(bufferedStream); 260 261 try { 262 objectStream.writeObject(packageQueue); 263 logger.debug("Package handler wrote %d packages", packageQueue.size()); 264 } catch (NotSerializableException e) { 265 logger.error("Failed to serialize packages"); 266 } finally { 267 objectStream.close(); 268 } 269 } catch (Exception e) { 270 logger.error("Failed to write packages (%s)", e.getLocalizedMessage()); 271 e.printStackTrace(); 272 } 273 } 274 } 275