1 /* 2 * Copyright (C) Mellanox Technologies Ltd. 2019. ALL RIGHTS RESERVED. 3 * See file LICENSE for terms. 4 */ 5 package org.openucx.jucx.ucp; 6 7 import org.openucx.jucx.*; 8 9 import java.io.Closeable; 10 import java.nio.ByteBuffer; 11 12 public class UcpEndpoint extends UcxNativeStruct implements Closeable { 13 private final String paramsString; 14 // Keep a reference to errorHandler to prevent it from GC and have valid ref 15 // from JNI error handler. 16 private final UcpEndpointErrorHandler errorHandler; 17 18 @Override toString()19 public String toString() { 20 return "UcpEndpoint(id=" + getNativeId() + ", " + paramsString + ")"; 21 } 22 UcpEndpoint(UcpWorker worker, UcpEndpointParams params)23 public UcpEndpoint(UcpWorker worker, UcpEndpointParams params) { 24 // For backward compatibility and better error tracking always set ep error handler. 25 if (params.errorHandler == null) { 26 params.setErrorHandler((ep, status, errorMsg) -> { 27 throw new UcxException("Endpoint " + ep.toString() + 28 " error: " + errorMsg); 29 }); 30 } 31 this.errorHandler = params.errorHandler; 32 this.paramsString = params.toString(); 33 setNativeId(createEndpointNative(params, worker.getNativeId())); 34 } 35 36 @Override close()37 public void close() { 38 destroyEndpointNative(getNativeId()); 39 setNativeId(null); 40 } 41 42 /** 43 * This routine unpacks the remote key (RKEY) object into the local memory 44 * such that it can be accessed and used by UCP routines. 45 * @param rkeyBuffer - Packed remote key buffer 46 * (see {@link UcpMemory#getRemoteKeyBuffer()}). 47 */ unpackRemoteKey(ByteBuffer rkeyBuffer)48 public UcpRemoteKey unpackRemoteKey(ByteBuffer rkeyBuffer) { 49 return unpackRemoteKey(getNativeId(), 50 UcxUtils.getAddress(rkeyBuffer)); 51 } 52 checkRemoteAccessParams(ByteBuffer buf, UcpRemoteKey remoteKey)53 private void checkRemoteAccessParams(ByteBuffer buf, UcpRemoteKey remoteKey) { 54 if (!buf.isDirect()) { 55 throw new UcxException("Data buffer must be direct."); 56 } 57 if (remoteKey.getNativeId() == null) { 58 throw new UcxException("Remote key is null."); 59 } 60 } 61 62 /** 63 * Non-blocking remote memory put operation. 64 * This routine initiates a storage of contiguous block of data that is 65 * described by the local {@code src} buffer, starting of it's {@code src.position()} 66 * and size {@code src.remaining()} in the remote contiguous memory 67 * region described by {@code remoteAddress} address and the {@code remoteKey} "memory 68 * handle". The routine returns immediately and <strong>does</strong> not 69 * guarantee re-usability of the source {@code data} buffer. 70 * {@code callback} is invoked on completion of this operation. 71 */ putNonBlocking(ByteBuffer src, long remoteAddress, UcpRemoteKey remoteKey, UcxCallback callback)72 public UcpRequest putNonBlocking(ByteBuffer src, long remoteAddress, UcpRemoteKey remoteKey, 73 UcxCallback callback) { 74 75 checkRemoteAccessParams(src, remoteKey); 76 77 return putNonBlocking(UcxUtils.getAddress(src), src.remaining(), remoteAddress, 78 remoteKey, callback); 79 } 80 putNonBlocking(long localAddress, long size, long remoteAddress, UcpRemoteKey remoteKey, UcxCallback callback)81 public UcpRequest putNonBlocking(long localAddress, long size, 82 long remoteAddress, UcpRemoteKey remoteKey, 83 UcxCallback callback) { 84 85 return putNonBlockingNative(getNativeId(), localAddress, 86 size, remoteAddress, remoteKey.getNativeId(), callback); 87 } 88 89 /** 90 * This routine initiates a storage of contiguous block of data that is 91 * described by the local {@code buffer} in the remote contiguous memory 92 * region described by {@code remoteAddress} and the {@code remoteKey} 93 * "memory handle". The routine returns immediately and does not 94 * guarantee re-usability of the source {@code src} buffer. 95 */ putNonBlockingImplicit(ByteBuffer src, long remoteAddress, UcpRemoteKey remoteKey)96 public void putNonBlockingImplicit(ByteBuffer src, long remoteAddress, 97 UcpRemoteKey remoteKey) { 98 checkRemoteAccessParams(src, remoteKey); 99 100 putNonBlockingImplicit(UcxUtils.getAddress(src), src.remaining(), remoteAddress, 101 remoteKey); 102 } 103 104 /** 105 * This routine initiates a storage of contiguous block of data that is 106 * described by the local {@code localAddress} in the remote contiguous memory 107 * region described by {@code remoteAddress} and the {@code remoteKey} 108 * "memory handle". The routine returns immediately and does not 109 * guarantee re-usability of the source {@code localAddress} address. 110 */ putNonBlockingImplicit(long localAddress, long size, long remoteAddress, UcpRemoteKey remoteKey)111 public void putNonBlockingImplicit(long localAddress, long size, 112 long remoteAddress, UcpRemoteKey remoteKey) { 113 putNonBlockingImplicitNative(getNativeId(), localAddress, size, remoteAddress, 114 remoteKey.getNativeId()); 115 } 116 117 /** 118 * Non-blocking remote memory get operation. 119 * This routine initiates a load of a contiguous block of data that is 120 * described by the remote memory address {@code remoteAddress} and the 121 * {@code remoteKey} "memory handle". The routine returns immediately and <strong>does</strong> 122 * not guarantee that remote data is loaded and stored under the local {@code dst} buffer 123 * starting of it's {@code dst.position()} and size {@code dst.remaining()}. 124 * {@code callback} is invoked on completion of this operation. 125 * @return {@link UcpRequest} object that can be monitored for completion. 126 */ getNonBlocking(long remoteAddress, UcpRemoteKey remoteKey, ByteBuffer dst, UcxCallback callback)127 public UcpRequest getNonBlocking(long remoteAddress, UcpRemoteKey remoteKey, 128 ByteBuffer dst, UcxCallback callback) { 129 130 checkRemoteAccessParams(dst, remoteKey); 131 132 return getNonBlocking(remoteAddress, remoteKey, UcxUtils.getAddress(dst), 133 dst.remaining(), callback); 134 } 135 getNonBlocking(long remoteAddress, UcpRemoteKey remoteKey, long localAddress, long size, UcxCallback callback)136 public UcpRequest getNonBlocking(long remoteAddress, UcpRemoteKey remoteKey, 137 long localAddress, long size, UcxCallback callback) { 138 139 return getNonBlockingNative(getNativeId(), remoteAddress, remoteKey.getNativeId(), 140 localAddress, size, callback); 141 } 142 143 /** 144 * Non-blocking implicit remote memory get operation. 145 * This routine initiate a load of contiguous block of data that is described 146 * by the remote memory address {@code remoteAddress} and the 147 * {@code remoteKey} "memory handle" in the local contiguous memory region described 148 * by {@code dst} buffer. The routine returns immediately and does not guarantee that 149 * remote data is loaded and stored under the local buffer. 150 */ getNonBlockingImplicit(long remoteAddress, UcpRemoteKey remoteKey, ByteBuffer dst)151 public void getNonBlockingImplicit(long remoteAddress, UcpRemoteKey remoteKey, 152 ByteBuffer dst) { 153 checkRemoteAccessParams(dst, remoteKey); 154 155 getNonBlockingImplicit(remoteAddress, remoteKey, UcxUtils.getAddress(dst), 156 dst.remaining()); 157 } 158 159 /** 160 * Non-blocking implicit remote memory get operation. 161 * This routine initiate a load of contiguous block of data that is described 162 * by the remote memory address {@code remoteAddress} and the 163 * {@code remoteKey} "memory handle" in the local contiguous memory region described 164 * by {@code localAddress} the local address. The routine returns immediately 165 * and does not guarantee that remote data is loaded and stored under the local buffer. 166 */ getNonBlockingImplicit(long remoteAddress, UcpRemoteKey remoteKey, long localAddress, long size)167 public void getNonBlockingImplicit(long remoteAddress, UcpRemoteKey remoteKey, 168 long localAddress, long size) { 169 170 getNonBlockingImplicitNative(getNativeId(), remoteAddress, remoteKey.getNativeId(), 171 localAddress, size); 172 } 173 174 /** 175 * Non-blocking tagged-send operations 176 * This routine sends a messages that is described by the local buffer {@code sendBuffer}, 177 * starting of it's {@code sendBuffer.position()} and size {@code sendBuffer.remaining()}. 178 * to the destination endpoint. Each message is associated with a {@code tag} value 179 * that is used for message matching on the 180 * {@link UcpWorker#recvTaggedNonBlocking(ByteBuffer, long, long, UcxCallback)} 181 * "receiver". The routine is non-blocking and therefore returns immediately, 182 * however the actual send operation may be delayed. 183 * The send operation is considered completed when it is safe to reuse the source 184 * {@code data} buffer. {@code callback} is invoked on completion of this operation. 185 */ sendTaggedNonBlocking(ByteBuffer sendBuffer, long tag, UcxCallback callback)186 public UcpRequest sendTaggedNonBlocking(ByteBuffer sendBuffer, long tag, UcxCallback callback) { 187 if (!sendBuffer.isDirect()) { 188 throw new UcxException("Send buffer must be direct."); 189 } 190 return sendTaggedNonBlocking(UcxUtils.getAddress(sendBuffer), 191 sendBuffer.remaining(), tag, callback); 192 } 193 sendTaggedNonBlocking(long localAddress, long size, long tag, UcxCallback callback)194 public UcpRequest sendTaggedNonBlocking(long localAddress, long size, 195 long tag, UcxCallback callback) { 196 197 return sendTaggedNonBlockingNative(getNativeId(), 198 localAddress, size, tag, callback); 199 } 200 201 /** 202 * Non blocking send operation. Invokes 203 * {@link UcpEndpoint#sendTaggedNonBlocking(ByteBuffer, long, UcxCallback)} with default 0 tag. 204 */ sendTaggedNonBlocking(ByteBuffer sendBuffer, UcxCallback callback)205 public UcpRequest sendTaggedNonBlocking(ByteBuffer sendBuffer, UcxCallback callback) { 206 return sendTaggedNonBlocking(sendBuffer, 0, callback); 207 } 208 209 /** 210 * This routine sends data that is described by the local address to the destination endpoint. 211 * The routine is non-blocking and therefore returns immediately, however the actual send 212 * operation may be delayed. The send operation is considered completed when it is safe 213 * to reuse the source buffer. The UCP library will schedule invocation of the call-back upon 214 * completion of the send operation. 215 */ sendStreamNonBlocking(long localAddress, long size, UcxCallback callback)216 public UcpRequest sendStreamNonBlocking(long localAddress, long size, UcxCallback callback) { 217 return sendStreamNonBlockingNative(getNativeId(), localAddress, size, callback); 218 } 219 sendStreamNonBlocking(ByteBuffer buffer, UcxCallback callback)220 public UcpRequest sendStreamNonBlocking(ByteBuffer buffer, UcxCallback callback) { 221 return sendStreamNonBlockingNative(getNativeId(), UcxUtils.getAddress(buffer), 222 buffer.remaining(), callback); 223 } 224 225 /** 226 * This routine receives data that is described by the local address and a size on the endpoint. 227 * The routine is non-blocking and therefore returns immediately. The receive operation is 228 * considered complete when the message is delivered to the buffer. 229 * In order to notify the application about completion of a scheduled receive operation, 230 * the UCP library will invoke the call-back when data is in the receive buffer 231 * and ready for application access. 232 */ recvStreamNonBlocking(long localAddress, long size, long flags, UcxCallback callback)233 public UcpRequest recvStreamNonBlocking(long localAddress, long size, long flags, 234 UcxCallback callback) { 235 return recvStreamNonBlockingNative(getNativeId(), localAddress, size, flags, callback); 236 } 237 recvStreamNonBlocking(ByteBuffer buffer, long flags, UcxCallback callback)238 public UcpRequest recvStreamNonBlocking(ByteBuffer buffer, long flags, UcxCallback callback) { 239 return recvStreamNonBlocking(UcxUtils.getAddress(buffer), buffer.remaining(), flags, 240 callback); 241 } 242 243 /** 244 * This routine flushes all outstanding AMO and RMA communications on this endpoint. 245 * All the AMO and RMA operations issued on this endpoint prior to this call 246 * are completed both at the origin and at the target. 247 */ flushNonBlocking(UcxCallback callback)248 public UcpRequest flushNonBlocking(UcxCallback callback) { 249 return flushNonBlockingNative(getNativeId(), callback); 250 } 251 252 /** 253 * Releases the endpoint without any confirmation from the peer. All 254 * outstanding requests will be completed with UCS_ERR_CANCELED error. 255 * This mode may cause transport level errors on remote side, so it requires set 256 * {@link UcpEndpointParams#setPeerErrorHandlingMode()} for all endpoints created on 257 * both (local and remote) sides to avoid undefined behavior. 258 */ closeNonBlockingForce()259 public UcpRequest closeNonBlockingForce() { 260 return closeNonBlockingNative(getNativeId(), UcpConstants.UCP_EP_CLOSE_MODE_FORCE); 261 } 262 263 /** 264 * Releases the endpoint by scheduling flushes on all outstanding operations. 265 */ closeNonBlockingFlush()266 public UcpRequest closeNonBlockingFlush() { 267 return closeNonBlockingNative(getNativeId(), UcpConstants.UCP_EP_CLOSE_MODE_FLUSH); 268 } 269 createEndpointNative(UcpEndpointParams params, long workerId)270 private native long createEndpointNative(UcpEndpointParams params, long workerId); 271 destroyEndpointNative(long epId)272 private static native void destroyEndpointNative(long epId); 273 unpackRemoteKey(long epId, long rkeyAddress)274 private static native UcpRemoteKey unpackRemoteKey(long epId, long rkeyAddress); 275 putNonBlockingNative(long enpointId, long localAddress, long size, long remoteAddr, long ucpRkeyId, UcxCallback callback)276 private static native UcpRequest putNonBlockingNative(long enpointId, long localAddress, 277 long size, long remoteAddr, 278 long ucpRkeyId, UcxCallback callback); 279 putNonBlockingImplicitNative(long enpointId, long localAddress, long size, long remoteAddr, long ucpRkeyId)280 private static native void putNonBlockingImplicitNative(long enpointId, long localAddress, 281 long size, long remoteAddr, 282 long ucpRkeyId); 283 getNonBlockingNative(long enpointId, long remoteAddress, long ucpRkeyId, long localAddress, long size, UcxCallback callback)284 private static native UcpRequest getNonBlockingNative(long enpointId, long remoteAddress, 285 long ucpRkeyId, long localAddress, 286 long size, UcxCallback callback); 287 getNonBlockingImplicitNative(long enpointId, long remoteAddress, long ucpRkeyId, long localAddress, long size)288 private static native void getNonBlockingImplicitNative(long enpointId, long remoteAddress, 289 long ucpRkeyId, long localAddress, 290 long size); 291 sendTaggedNonBlockingNative(long enpointId, long localAddress, long size, long tag, UcxCallback callback)292 private static native UcpRequest sendTaggedNonBlockingNative(long enpointId, long localAddress, 293 long size, long tag, 294 UcxCallback callback); 295 sendStreamNonBlockingNative(long enpointId, long localAddress, long size, UcxCallback callback)296 private static native UcpRequest sendStreamNonBlockingNative(long enpointId, long localAddress, 297 long size, UcxCallback callback); 298 recvStreamNonBlockingNative(long enpointId, long localAddress, long size, long flags, UcxCallback callback)299 private static native UcpRequest recvStreamNonBlockingNative(long enpointId, long localAddress, 300 long size, long flags, 301 UcxCallback callback); 302 flushNonBlockingNative(long enpointId, UcxCallback callback)303 private static native UcpRequest flushNonBlockingNative(long enpointId, UcxCallback callback); 304 closeNonBlockingNative(long endpointId, int mode)305 private static native UcpRequest closeNonBlockingNative(long endpointId, int mode); 306 } 307