1 /*
2  * Copyright (C) Mellanox Technologies Ltd. 2019. ALL RIGHTS RESERVED.
3  * See file LICENSE for terms.
4  */
5 package org.openucx.jucx.ucp;
6 
7 import org.openucx.jucx.*;
8 
9 import java.io.Closeable;
10 import java.nio.ByteBuffer;
11 
12 public class UcpEndpoint extends UcxNativeStruct implements Closeable {
13     private final String paramsString;
14     // Keep a reference to errorHandler to prevent it from GC and have valid ref
15     // from JNI error handler.
16     private final UcpEndpointErrorHandler errorHandler;
17 
18     @Override
toString()19     public String toString() {
20         return "UcpEndpoint(id=" + getNativeId() + ", " + paramsString + ")";
21     }
22 
UcpEndpoint(UcpWorker worker, UcpEndpointParams params)23     public UcpEndpoint(UcpWorker worker, UcpEndpointParams params) {
24         // For backward compatibility and better error tracking always set ep error handler.
25         if (params.errorHandler == null) {
26             params.setErrorHandler((ep, status, errorMsg) -> {
27                 throw new UcxException("Endpoint " + ep.toString() +
28                     " error: " + errorMsg);
29             });
30         }
31         this.errorHandler = params.errorHandler;
32         this.paramsString = params.toString();
33         setNativeId(createEndpointNative(params, worker.getNativeId()));
34     }
35 
36     @Override
close()37     public void close() {
38         destroyEndpointNative(getNativeId());
39         setNativeId(null);
40     }
41 
42     /**
43      * This routine unpacks the remote key (RKEY) object into the local memory
44      * such that it can be accessed and used by UCP routines.
45      * @param rkeyBuffer - Packed remote key buffer
46      *                     (see {@link UcpMemory#getRemoteKeyBuffer()}).
47      */
unpackRemoteKey(ByteBuffer rkeyBuffer)48     public UcpRemoteKey unpackRemoteKey(ByteBuffer rkeyBuffer) {
49         return unpackRemoteKey(getNativeId(),
50                     UcxUtils.getAddress(rkeyBuffer));
51     }
52 
checkRemoteAccessParams(ByteBuffer buf, UcpRemoteKey remoteKey)53     private void checkRemoteAccessParams(ByteBuffer buf, UcpRemoteKey remoteKey) {
54         if (!buf.isDirect()) {
55             throw new UcxException("Data buffer must be direct.");
56         }
57         if (remoteKey.getNativeId() == null) {
58             throw new UcxException("Remote key is null.");
59         }
60     }
61 
62     /**
63      * Non-blocking remote memory put operation.
64      * This routine initiates a storage of contiguous block of data that is
65      * described by the local {@code src} buffer, starting of it's {@code src.position()}
66      * and size {@code src.remaining()} in the remote contiguous memory
67      * region described by {@code remoteAddress} address and the {@code remoteKey} "memory
68      * handle". The routine returns immediately and <strong>does</strong> not
69      * guarantee re-usability of the source {@code data} buffer.
70      * {@code callback} is invoked on completion of this operation.
71      */
putNonBlocking(ByteBuffer src, long remoteAddress, UcpRemoteKey remoteKey, UcxCallback callback)72     public UcpRequest putNonBlocking(ByteBuffer src, long remoteAddress, UcpRemoteKey remoteKey,
73                                      UcxCallback callback) {
74 
75         checkRemoteAccessParams(src, remoteKey);
76 
77         return putNonBlocking(UcxUtils.getAddress(src), src.remaining(), remoteAddress,
78             remoteKey, callback);
79     }
80 
putNonBlocking(long localAddress, long size, long remoteAddress, UcpRemoteKey remoteKey, UcxCallback callback)81     public UcpRequest putNonBlocking(long localAddress, long size,
82                                      long remoteAddress, UcpRemoteKey remoteKey,
83                                      UcxCallback callback) {
84 
85         return putNonBlockingNative(getNativeId(), localAddress,
86             size, remoteAddress, remoteKey.getNativeId(), callback);
87     }
88 
89     /**
90      * This routine initiates a storage of contiguous block of data that is
91      * described by the local {@code buffer} in the remote contiguous memory
92      * region described by {@code remoteAddress} and the {@code remoteKey}
93      * "memory handle". The routine returns immediately and does not
94      * guarantee re-usability of the source {@code src} buffer.
95      */
putNonBlockingImplicit(ByteBuffer src, long remoteAddress, UcpRemoteKey remoteKey)96     public void putNonBlockingImplicit(ByteBuffer src, long remoteAddress,
97                                        UcpRemoteKey remoteKey) {
98         checkRemoteAccessParams(src, remoteKey);
99 
100         putNonBlockingImplicit(UcxUtils.getAddress(src), src.remaining(), remoteAddress,
101             remoteKey);
102     }
103 
104     /**
105      * This routine initiates a storage of contiguous block of data that is
106      * described by the local {@code localAddress} in the remote contiguous memory
107      * region described by {@code remoteAddress} and the {@code remoteKey}
108      * "memory handle". The routine returns immediately and does not
109      * guarantee re-usability of the source {@code localAddress} address.
110      */
putNonBlockingImplicit(long localAddress, long size, long remoteAddress, UcpRemoteKey remoteKey)111     public void putNonBlockingImplicit(long localAddress, long size,
112                                        long remoteAddress, UcpRemoteKey remoteKey) {
113         putNonBlockingImplicitNative(getNativeId(), localAddress, size, remoteAddress,
114             remoteKey.getNativeId());
115     }
116 
117     /**
118      * Non-blocking remote memory get operation.
119      * This routine initiates a load of a contiguous block of data that is
120      * described by the remote memory address {@code remoteAddress} and the
121      * {@code remoteKey} "memory handle". The routine returns immediately and <strong>does</strong>
122      * not guarantee that remote data is loaded and stored under the local {@code dst} buffer
123      * starting of it's {@code dst.position()} and size {@code dst.remaining()}.
124      * {@code callback} is invoked on completion of this operation.
125      * @return {@link UcpRequest} object that can be monitored for completion.
126      */
getNonBlocking(long remoteAddress, UcpRemoteKey remoteKey, ByteBuffer dst, UcxCallback callback)127     public UcpRequest getNonBlocking(long remoteAddress, UcpRemoteKey remoteKey,
128                                      ByteBuffer dst, UcxCallback callback) {
129 
130         checkRemoteAccessParams(dst, remoteKey);
131 
132         return getNonBlocking(remoteAddress, remoteKey, UcxUtils.getAddress(dst),
133             dst.remaining(), callback);
134     }
135 
getNonBlocking(long remoteAddress, UcpRemoteKey remoteKey, long localAddress, long size, UcxCallback callback)136     public UcpRequest getNonBlocking(long remoteAddress, UcpRemoteKey remoteKey,
137                                      long localAddress, long size, UcxCallback callback) {
138 
139         return getNonBlockingNative(getNativeId(), remoteAddress, remoteKey.getNativeId(),
140             localAddress, size, callback);
141     }
142 
143     /**
144      * Non-blocking implicit remote memory get operation.
145      * This routine initiate a load of contiguous block of data that is described
146      * by the remote memory address {@code remoteAddress} and the
147      * {@code remoteKey} "memory handle" in the local contiguous memory region described
148      * by {@code dst} buffer. The routine returns immediately and does not guarantee that
149      * remote data is loaded and stored under the local buffer.
150      */
getNonBlockingImplicit(long remoteAddress, UcpRemoteKey remoteKey, ByteBuffer dst)151     public void getNonBlockingImplicit(long remoteAddress, UcpRemoteKey remoteKey,
152                                        ByteBuffer dst) {
153         checkRemoteAccessParams(dst, remoteKey);
154 
155         getNonBlockingImplicit(remoteAddress, remoteKey, UcxUtils.getAddress(dst),
156             dst.remaining());
157     }
158 
159     /**
160      * Non-blocking implicit remote memory get operation.
161      * This routine initiate a load of contiguous block of data that is described
162      * by the remote memory address {@code remoteAddress} and the
163      * {@code remoteKey} "memory handle" in the local contiguous memory region described
164      * by {@code localAddress} the local address. The routine returns immediately
165      * and does not guarantee that remote data is loaded and stored under the local buffer.
166      */
getNonBlockingImplicit(long remoteAddress, UcpRemoteKey remoteKey, long localAddress, long size)167     public void getNonBlockingImplicit(long remoteAddress, UcpRemoteKey remoteKey,
168                                        long localAddress, long size) {
169 
170         getNonBlockingImplicitNative(getNativeId(), remoteAddress, remoteKey.getNativeId(),
171               localAddress, size);
172     }
173 
174     /**
175      * Non-blocking tagged-send operations
176      * This routine sends a messages that is described by the local buffer {@code sendBuffer},
177      * starting of it's {@code sendBuffer.position()} and size {@code sendBuffer.remaining()}.
178      * to the destination endpoint. Each message is associated with a {@code tag} value
179      * that is used for message matching on the
180      * {@link UcpWorker#recvTaggedNonBlocking(ByteBuffer, long, long, UcxCallback)}
181      * "receiver". The routine is non-blocking and therefore returns immediately,
182      * however the actual send operation may be delayed.
183      * The send operation is considered completed when  it is safe to reuse the source
184      * {@code data} buffer. {@code callback} is invoked on completion of this operation.
185      */
sendTaggedNonBlocking(ByteBuffer sendBuffer, long tag, UcxCallback callback)186     public UcpRequest sendTaggedNonBlocking(ByteBuffer sendBuffer, long tag, UcxCallback callback) {
187         if (!sendBuffer.isDirect()) {
188             throw new UcxException("Send buffer must be direct.");
189         }
190         return sendTaggedNonBlocking(UcxUtils.getAddress(sendBuffer),
191           sendBuffer.remaining(), tag, callback);
192     }
193 
sendTaggedNonBlocking(long localAddress, long size, long tag, UcxCallback callback)194     public UcpRequest sendTaggedNonBlocking(long localAddress, long size,
195                                             long tag, UcxCallback callback) {
196 
197         return sendTaggedNonBlockingNative(getNativeId(),
198             localAddress, size, tag, callback);
199     }
200 
201     /**
202      * Non blocking send operation. Invokes
203      * {@link UcpEndpoint#sendTaggedNonBlocking(ByteBuffer, long, UcxCallback)} with default 0 tag.
204      */
sendTaggedNonBlocking(ByteBuffer sendBuffer, UcxCallback callback)205     public UcpRequest sendTaggedNonBlocking(ByteBuffer sendBuffer, UcxCallback callback) {
206         return sendTaggedNonBlocking(sendBuffer, 0, callback);
207     }
208 
209     /**
210      * This routine sends data that is described by the local address to the destination endpoint.
211      * The routine is non-blocking and therefore returns immediately, however the actual send
212      * operation may be delayed. The send operation is considered completed when it is safe
213      * to reuse the source buffer. The UCP library will schedule invocation of the call-back upon
214      * completion of the send operation.
215      */
sendStreamNonBlocking(long localAddress, long size, UcxCallback callback)216     public UcpRequest sendStreamNonBlocking(long localAddress, long size, UcxCallback callback) {
217         return sendStreamNonBlockingNative(getNativeId(), localAddress, size, callback);
218     }
219 
sendStreamNonBlocking(ByteBuffer buffer, UcxCallback callback)220     public UcpRequest sendStreamNonBlocking(ByteBuffer buffer, UcxCallback callback) {
221         return sendStreamNonBlockingNative(getNativeId(), UcxUtils.getAddress(buffer),
222             buffer.remaining(), callback);
223     }
224 
225     /**
226      * This routine receives data that is described by the local address and a size on the endpoint.
227      * The routine is non-blocking and therefore returns immediately. The receive operation is
228      * considered complete when the message is delivered to the buffer.
229      * In order to notify the application about completion of a scheduled receive operation,
230      * the UCP library will invoke the call-back when data is in the receive buffer
231      * and ready for application access.
232      */
recvStreamNonBlocking(long localAddress, long size, long flags, UcxCallback callback)233     public UcpRequest recvStreamNonBlocking(long localAddress, long size, long flags,
234                                             UcxCallback callback) {
235         return recvStreamNonBlockingNative(getNativeId(), localAddress, size, flags, callback);
236     }
237 
recvStreamNonBlocking(ByteBuffer buffer, long flags, UcxCallback callback)238     public UcpRequest recvStreamNonBlocking(ByteBuffer buffer, long flags, UcxCallback callback) {
239         return recvStreamNonBlocking(UcxUtils.getAddress(buffer), buffer.remaining(), flags,
240             callback);
241     }
242 
243     /**
244      * This routine flushes all outstanding AMO and RMA communications on this endpoint.
245      * All the AMO and RMA operations issued on this endpoint prior to this call
246      * are completed both at the origin and at the target.
247      */
flushNonBlocking(UcxCallback callback)248     public UcpRequest flushNonBlocking(UcxCallback callback) {
249         return flushNonBlockingNative(getNativeId(), callback);
250     }
251 
252     /**
253      * Releases the endpoint without any confirmation from the peer. All
254      * outstanding requests will be completed with UCS_ERR_CANCELED error.
255      * This mode may cause transport level errors on remote side, so it requires set
256      * {@link UcpEndpointParams#setPeerErrorHandlingMode()} for all endpoints created on
257      * both (local and remote) sides to avoid undefined behavior.
258      */
closeNonBlockingForce()259     public UcpRequest closeNonBlockingForce() {
260         return closeNonBlockingNative(getNativeId(), UcpConstants.UCP_EP_CLOSE_MODE_FORCE);
261     }
262 
263     /**
264      * Releases the endpoint by scheduling flushes on all outstanding operations.
265      */
closeNonBlockingFlush()266     public UcpRequest closeNonBlockingFlush() {
267         return closeNonBlockingNative(getNativeId(), UcpConstants.UCP_EP_CLOSE_MODE_FLUSH);
268     }
269 
createEndpointNative(UcpEndpointParams params, long workerId)270     private native long createEndpointNative(UcpEndpointParams params, long workerId);
271 
destroyEndpointNative(long epId)272     private static native void destroyEndpointNative(long epId);
273 
unpackRemoteKey(long epId, long rkeyAddress)274     private static native UcpRemoteKey unpackRemoteKey(long epId, long rkeyAddress);
275 
putNonBlockingNative(long enpointId, long localAddress, long size, long remoteAddr, long ucpRkeyId, UcxCallback callback)276     private static native UcpRequest putNonBlockingNative(long enpointId, long localAddress,
277                                                           long size, long remoteAddr,
278                                                           long ucpRkeyId, UcxCallback callback);
279 
putNonBlockingImplicitNative(long enpointId, long localAddress, long size, long remoteAddr, long ucpRkeyId)280     private static native void putNonBlockingImplicitNative(long enpointId, long localAddress,
281                                                             long size, long remoteAddr,
282                                                             long ucpRkeyId);
283 
getNonBlockingNative(long enpointId, long remoteAddress, long ucpRkeyId, long localAddress, long size, UcxCallback callback)284     private static native UcpRequest getNonBlockingNative(long enpointId, long remoteAddress,
285                                                           long ucpRkeyId, long localAddress,
286                                                           long size, UcxCallback callback);
287 
getNonBlockingImplicitNative(long enpointId, long remoteAddress, long ucpRkeyId, long localAddress, long size)288     private static native void getNonBlockingImplicitNative(long enpointId, long remoteAddress,
289                                                             long ucpRkeyId, long localAddress,
290                                                             long size);
291 
sendTaggedNonBlockingNative(long enpointId, long localAddress, long size, long tag, UcxCallback callback)292     private static native UcpRequest sendTaggedNonBlockingNative(long enpointId, long localAddress,
293                                                                  long size, long tag,
294                                                                  UcxCallback callback);
295 
sendStreamNonBlockingNative(long enpointId, long localAddress, long size, UcxCallback callback)296     private static native UcpRequest sendStreamNonBlockingNative(long enpointId, long localAddress,
297                                                                  long size, UcxCallback callback);
298 
recvStreamNonBlockingNative(long enpointId, long localAddress, long size, long flags, UcxCallback callback)299     private static native UcpRequest recvStreamNonBlockingNative(long enpointId, long localAddress,
300                                                                  long size, long flags,
301                                                                  UcxCallback callback);
302 
flushNonBlockingNative(long enpointId, UcxCallback callback)303     private static native UcpRequest flushNonBlockingNative(long enpointId, UcxCallback callback);
304 
closeNonBlockingNative(long endpointId, int mode)305     private static native UcpRequest closeNonBlockingNative(long endpointId, int mode);
306 }
307