package org.xbib.elasticsearch.client.transport;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.PlainTransportFuture;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportFuture;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportResponseOptions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Client-side transport service.
 *
 * <p>Sends requests over a {@link Transport}, keeps the pending response handlers keyed by
 * request id, applies optional per-request timeouts, performs the connection handshake and
 * short-circuits requests addressed to the local node so they are dispatched in-process.
 * Request/response events can be traced through a dedicated {@code .tracer} logger that is
 * filtered by the {@code transport.tracer.include} / {@code transport.tracer.exclude} settings.
 */
public class XbibTransportService extends AbstractLifecycleComponent {

    private static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";

    private static final Setting<List<String>> TRACE_LOG_INCLUDE_SETTING =
            Setting.listSetting("transport.tracer.include", Collections.emptyList(),
                    Function.identity(), Property.Dynamic, Property.NodeScope);

    private static final Setting<List<String>> TRACE_LOG_EXCLUDE_SETTING =
            Setting.listSetting("transport.tracer.exclude",
                    Arrays.asList("internal:discovery/zen/fd*", TransportLivenessAction.NAME),
                    Function.identity(), Property.Dynamic, Property.NodeScope);

    /** Released by {@link #acceptIncomingRequests()}; incoming requests wait on it until then. */
    private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1);

    private final Transport transport;

    private final ThreadPool threadPool;

    private final ClusterName clusterName;

    private final TaskManager taskManager;

    private final TransportInterceptor.AsyncSender asyncSender;

    private final Function<BoundTransportAddress, DiscoveryNode> localNodeFactory;

    /** Copy-on-write map of action name to handler; replaced under {@link #requestHandlerMutex}. */
    private volatile Map<String, RequestHandlerRegistry<TransportRequest>> requestHandlers = Collections.emptyMap();

    private final Object requestHandlerMutex = new Object();

    /** Pending requests, keyed by request id, that are still waiting for a response. */
    private final ConcurrentMapLong<RequestHolder<?>> clientHandlers =
            ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();

    private final TransportInterceptor interceptor;

    // An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they
    // do show up, we can print more descriptive information about them
    private final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers =
            Collections.synchronizedMap(new LinkedHashMap<Long, TimeoutInfoHolder>(100, .75F, true) {
                private static final long serialVersionUID = 9174428975922394994L;

                @Override
                protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {
                    return size() > 100;
                }
            });

    /**
     * Callback object used for local dispatch, tracing and byte metrics. The original code
     * referenced {@code adapter} without ever declaring or creating it.
     */
    private final Adapter adapter = new Adapter();

    private final Logger tracerLog;

    private volatile String[] tracerLogInclude;

    private volatile String[] tracerLogExclude;

    /** The local node; assigned in {@link #doStart()} once the transport has a bound address. */
    private volatile DiscoveryNode localNode = null;

    /** Connection that loops requests for the local node back into this service. */
    private final Transport.Connection localNodeConnection = new Transport.Connection() {
        @Override
        public DiscoveryNode getNode() {
            return localNode;
        }

        @Override
        public void sendRequest(long requestId, String action, TransportRequest request,
                                TransportRequestOptions options) throws IOException, TransportException {
            sendLocalRequest(requestId, action, request, options);
        }

        @Override
        public void close() throws IOException {
        }
    };

    /**
     * Build the service.
     *
     * @param settings             node settings; the cluster name and tracer filters are read from them
     * @param transport            the transport used for remote communication
     * @param threadPool           thread pool used for callbacks, timeouts and local dispatch
     * @param transportInterceptor interceptor applied to the sender and to registered handlers
     * @param localNodeFactory     creates the local {@link DiscoveryNode} from the bound address
     * @param clusterSettings      if non null the {@linkplain XbibTransportService} will register
     *                             with the {@link ClusterSettings} for settings updates for
     *                             {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
     */
    XbibTransportService(Settings settings, Transport transport, ThreadPool threadPool,
                         TransportInterceptor transportInterceptor,
                         Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
                         @Nullable ClusterSettings clusterSettings) {
        super(settings);
        this.transport = transport;
        this.threadPool = threadPool;
        this.localNodeFactory = localNodeFactory;
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        setTracerLogInclude(TRACE_LOG_INCLUDE_SETTING.get(settings));
        setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings));
        tracerLog = Loggers.getLogger(logger, ".tracer");
        taskManager = createTaskManager();
        this.interceptor = transportInterceptor;
        this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
        if (clusterSettings != null) {
            clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
            clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
        }
    }

    private TaskManager createTaskManager() {
        return new TaskManager(settings);
    }

    private void setTracerLogInclude(List<String> tracerLogInclude) {
        this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY);
    }

    private void setTracerLogExclude(List<String> tracerLogExclude) {
        this.tracerLogExclude = tracerLogExclude.toArray(Strings.EMPTY_ARRAY);
    }

    /**
     * Resets the byte metrics, starts the transport, creates the local node from the bound
     * address and registers the handshake request handler.
     */
    @Override
    protected void doStart() {
        adapter.rxMetric.clear();
        adapter.txMetric.clear();
        transport.setTransportService(this);
        transport.start();
        if (transport.boundAddress() != null && logger.isInfoEnabled()) {
            logger.info("{}", transport.boundAddress());
            for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {
                logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
            }
        }
        localNode = localNodeFactory.apply(transport.boundAddress());
        registerRequestHandler(HANDSHAKE_ACTION_NAME,
                () -> HandshakeRequest.INSTANCE,
                ThreadPool.Names.SAME,
                (request, channel) -> channel.sendResponse(
                        new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
    }

    /**
     * Stops the transport and fails every request that is still pending with a
     * {@link TransportException}.
     */
    @Override
    protected void doStop() {
        try {
            transport.stop();
        } finally {
            // in case the transport is not connected to our local node (thus cleaned on node disconnect)
            // make sure to clean any leftover on going handles
            for (Map.Entry<Long, RequestHolder<?>> entry : clientHandlers.entrySet()) {
                final RequestHolder<?> holderToNotify = clientHandlers.remove(entry.getKey());
                if (holderToNotify != null) {
                    // callback that an exception happened, but on a different thread since we don't
                    // want handlers to worry about stack overflows
                    threadPool.generic().execute(new AbstractRunnable() {
                        @Override
                        public void onRejection(Exception e) {
                            // if we get rejected during node shutdown we don't wanna bubble it up
                            logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                                    "failed to notify response handler on rejection, action: {}",
                                    holderToNotify.action()), e);
                        }

                        @Override
                        public void onFailure(Exception e) {
                            logger.warn((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                                    "failed to notify response handler on exception, action: {}",
                                    holderToNotify.action()), e);
                        }

                        @Override
                        public void doRun() {
                            TransportException ex =
                                    new TransportException("transport stopped, action: " + holderToNotify.action());
                            holderToNotify.handler().handleException(ex);
                        }
                    });
                }
            }
        }
    }

    @Override
    protected void doClose() {
        transport.close();
    }

    /**
     * Start accepting incoming requests.
     * When the transport layer starts up it will block any incoming requests until
     * this method is called.
     */
    final void acceptIncomingRequests() {
        blockIncomingRequestsLatch.countDown();
    }

    /**
     * Returns true iff the given node is already connected.
     * The local node always counts as connected.
     */
    boolean nodeConnected(DiscoveryNode node) {
        return isLocalNode(node) || transport.nodeConnected(node);
    }

    /**
     * Connect to the specified node. Connecting to the local node is a no-op.
     *
     * @param node the node to connect to
     */
    void connectToNode(final DiscoveryNode node) {
        if (isLocalNode(node)) {
            return;
        }
        transport.connectToNode(node, null, (newConnection, actualProfile) ->
                handshake(newConnection, actualProfile.getHandshakeTimeout().millis()));
    }

    /**
     * Executes a high-level handshake using the given connection
     * and returns the discovery node of the node the connection
     * was established with. The handshake will fail if the cluster
     * name on the target node mismatches the local cluster name.
     *
     * @param connection       the connection to a specific node
     * @param handshakeTimeout handshake timeout
     * @return the connected node
     * @throws ConnectTransportException if the connection failed
     * @throws IllegalStateException     if the handshake failed
     */
    private DiscoveryNode handshake(final Transport.Connection connection,
                                    final long handshakeTimeout) throws ConnectTransportException {
        return handshake(connection, handshakeTimeout, clusterName::equals);
    }

    /**
     * Executes a high-level handshake using the given connection
     * and returns the discovery node of the node the connection
     * was established with. The handshake will fail if the cluster
     * name on the target node is rejected by the predicate or if the
     * remote version is not compatible with the local node's version.
     *
     * @param connection           the connection to a specific node
     * @param handshakeTimeout     handshake timeout
     * @param clusterNamePredicate cluster name validation predicate
     * @return the connected node
     * @throws ConnectTransportException if the connection failed
     * @throws IllegalStateException     if the handshake failed; any exception raised while
     *                                   sending or waiting is wrapped in this type
     */
    private DiscoveryNode handshake(final Transport.Connection connection,
                                    final long handshakeTimeout,
                                    Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
        final HandshakeResponse response;
        final DiscoveryNode node = connection.getNode();
        try {
            PlainTransportFuture<HandshakeResponse> futureHandler = new PlainTransportFuture<>(
                    new FutureTransportResponseHandler<HandshakeResponse>() {
                        @Override
                        public HandshakeResponse newInstance() {
                            return new HandshakeResponse();
                        }
                    });
            sendRequest(connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE,
                    TransportRequestOptions.builder().withTimeout(handshakeTimeout).build(), futureHandler);
            response = futureHandler.txGet();
        } catch (Exception e) {
            throw new IllegalStateException("handshake failed with " + node, e);
        }
        if (!clusterNamePredicate.test(response.clusterName)) {
            throw new IllegalStateException("handshake failed, mismatched cluster name ["
                    + response.clusterName + "] - " + node);
        } else if (!response.version.isCompatible(localNode.getVersion())) {
            throw new IllegalStateException("handshake failed, incompatible version ["
                    + response.version + "] - " + node);
        }
        return response.discoveryNode;
    }

    /**
     * Disconnects from the given node. Disconnecting from the local node is a no-op.
     *
     * @param node the node to disconnect from
     */
    void disconnectFromNode(DiscoveryNode node) {
        if (isLocalNode(node)) {
            return;
        }
        transport.disconnectFromNode(node);
    }

    /**
     * Sends a request and returns a future for its response. If the node is not connected
     * the returned future is completed with the {@link NodeNotConnectedException}.
     *
     * @param node    target node
     * @param action  action name
     * @param request the request to send
     * @param options request options (timeout etc.)
     * @param handler handler that the returned future delegates to
     * @param <T>     response type
     * @return a future that completes with the response or the failure
     */
    <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryNode node, String action,
                                                                   TransportRequest request,
                                                                   TransportRequestOptions options,
                                                                   TransportResponseHandler<T> handler)
            throws TransportException {
        PlainTransportFuture<T> futureHandler = new PlainTransportFuture<>(handler);
        try {
            Transport.Connection connection = getConnection(node);
            sendRequest(connection, action, request, options, futureHandler);
        } catch (NodeNotConnectedException ex) {
            futureHandler.handleException(ex);
        }
        return futureHandler;
    }

    /**
     * Sends a request to the given node. If the node is not connected the handler is
     * notified with the {@link NodeNotConnectedException} on the calling thread.
     *
     * @param node    target node
     * @param action  action name
     * @param request the request to send
     * @param options request options (timeout etc.)
     * @param handler response handler
     * @param <T>     response type
     */
    final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                         final TransportRequest request,
                                                         final TransportRequestOptions options,
                                                         TransportResponseHandler<T> handler) {
        try {
            Transport.Connection connection = getConnection(node);
            sendRequest(connection, action, request, options, handler);
        } catch (NodeNotConnectedException ex) {
            handler.handleException(ex);
        }
    }

    /** Sends a request over an already resolved connection through the intercepted sender. */
    private <T extends TransportResponse> void sendRequest(final Transport.Connection connection,
                                                           final String action,
                                                           final TransportRequest request,
                                                           final TransportRequestOptions options,
                                                           TransportResponseHandler<T> handler) {
        asyncSender.sendRequest(connection, action, request, options, handler);
    }

    /**
     * Returns either a real transport connection or a local node connection
     * if we are using the local node optimization.
     *
     * @throws NodeNotConnectedException if the given node is not connected
     */
    private Transport.Connection getConnection(DiscoveryNode node) {
        if (isLocalNode(node)) {
            return localNodeConnection;
        } else {
            return transport.getConnection(node);
        }
    }

    /**
     * Registers the response handler under a fresh request id, schedules the optional timeout
     * and hands the request to the connection. Any failure removes the handler again and
     * notifies it with a {@link SendRequestTransportException} on the generic thread pool.
     *
     * @throws IllegalStateException if {@code connection} is null
     */
    private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection,
                                                                   final String action,
                                                                   final TransportRequest request,
                                                                   final TransportRequestOptions options,
                                                                   TransportResponseHandler<T> handler) {
        if (connection == null) {
            throw new IllegalStateException("can't send request to a null connection");
        }
        DiscoveryNode node = connection.getNode();
        final long requestId = transport.newRequestId();
        final TimeoutHandler timeoutHandler;
        try {
            if (options.timeout() == null) {
                timeoutHandler = null;
            } else {
                timeoutHandler = new TimeoutHandler(requestId);
            }
            Supplier<ThreadContext.StoredContext> storedContextSupplier =
                    threadPool.getThreadContext().newRestorableContext(true);
            TransportResponseHandler<T> responseHandler =
                    new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
            clientHandlers.put(requestId, new RequestHolder<>(responseHandler, node, action, timeoutHandler));
            if (lifecycle.stoppedOrClosed()) {
                // if we are not started the exception handling will remove the RequestHolder again
                // and calls the handler to notify the caller. It will only notify if the toStop code
                // hasn't done the work yet.
                throw new TransportException("TransportService is closed stopped can't send request");
            }
            if (timeoutHandler != null) {
                assert options.timeout() != null;
                timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
            }
            connection.sendRequest(requestId, action, request, options);
        } catch (final Exception e) {
            // usually happen either because we failed to connect to the node
            // or because we failed serializing the message
            final RequestHolder<?> holderToNotify = clientHandlers.remove(requestId);
            // If holderToNotify == null then handler has already been taken care of.
            if (holderToNotify != null) {
                holderToNotify.cancelTimeout();
                // callback that an exception happened, but on a different thread since we don't
                // want handlers to worry about stack overflows
                final SendRequestTransportException sendRequestException =
                        new SendRequestTransportException(node, action, e);
                threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
                    @Override
                    public void onRejection(Exception e) {
                        // if we get rejected during node shutdown we don't wanna bubble it up
                        logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                                "failed to notify response handler on rejection, action: {}",
                                holderToNotify.action()), e);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        logger.warn((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                                "failed to notify response handler on exception, action: {}",
                                holderToNotify.action()), e);
                    }

                    @Override
                    protected void doRun() throws Exception {
                        holderToNotify.handler().handleException(sendRequestException);
                    }
                });
            } else {
                logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
            }
        }
    }

    /**
     * Dispatches a request addressed to the local node directly to its registered handler,
     * either inline (executor {@code SAME}) or on the handler's executor. Failures are
     * reported back through the {@link DirectResponseChannel}.
     */
    private void sendLocalRequest(long requestId, final String action, final TransportRequest request,
                                  TransportRequestOptions options) {
        final DirectResponseChannel channel =
                new DirectResponseChannel(logger, localNode, action, requestId, adapter, threadPool);
        try {
            adapter.onRequestSent(localNode, requestId, action, request, options);
            adapter.onRequestReceived(requestId, action);
            final RequestHandlerRegistry<TransportRequest> reg = adapter.getRequestHandler(action);
            if (reg == null) {
                throw new ActionNotFoundTransportException("Action [" + action + "] not found");
            }
            final String executor = reg.getExecutor();
            if (ThreadPool.Names.SAME.equals(executor)) {
                reg.processMessageReceived(request, channel);
            } else {
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    @Override
                    protected void doRun() throws Exception {
                        reg.processMessageReceived(request, channel);
                    }

                    @Override
                    public boolean isForceExecution() {
                        return reg.isForceExecution();
                    }

                    @Override
                    public void onFailure(Exception e) {
                        try {
                            channel.sendResponse(e);
                        } catch (Exception inner) {
                            inner.addSuppressed(e);
                            logger.warn((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                                    "failed to notify channel of error message for action [{}]", action), inner);
                        }
                    }
                });
            }
        } catch (Exception e) {
            try {
                channel.sendResponse(e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.warn((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                        "failed to notify channel of error message for action [{}]", action), inner);
            }
        }
    }

    /**
     * Decides whether an action is traced: it must match the include patterns (when any are
     * configured) and must not match the exclude patterns.
     */
    private boolean shouldTraceAction(String action) {
        if (tracerLogInclude.length > 0) {
            if (!Regex.simpleMatch(tracerLogInclude, action)) {
                return false;
            }
        }
        return tracerLogExclude.length <= 0 || !Regex.simpleMatch(tracerLogExclude, action);
    }

    /**
     * Registers a new request handler.
     *
     * @param action   the action the request handler is associated with
     * @param request  supplier of new request instances used for streaming
     * @param executor the executor the request handling will be executed on
     * @param handler  the handler itself that implements the request handling
     * @param <Request> request type
     */
    private <Request extends TransportRequest> void registerRequestHandler(String action,
                                                                           Supplier<Request> request,
                                                                           String executor,
                                                                           TransportRequestHandler<Request> handler) {
        handler = interceptor.interceptHandler(action, executor, false, handler);
        RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
                action, request, taskManager, handler, executor, false, false);
        registerRequestHandler(reg);
    }

    /**
     * Adds the registry to the copy-on-write handler map.
     *
     * @throws IllegalArgumentException if a handler is already registered for the action
     */
    @SuppressWarnings("unchecked")
    private <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
        synchronized (requestHandlerMutex) {
            if (requestHandlers.containsKey(reg.getAction())) {
                throw new IllegalArgumentException("transport handlers for action " + reg.getAction()
                        + " is already registered");
            }
            requestHandlers = MapBuilder.newMapBuilder(requestHandlers)
                    .put(reg.getAction(), (RequestHandlerRegistry<TransportRequest>) reg)
                    .immutableMap();
        }
    }

    private boolean isLocalNode(DiscoveryNode discoveryNode) {
        return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode);
    }

    /** Payload-free handshake request; a single shared instance is used. */
    static class HandshakeRequest extends TransportRequest {

        static final HandshakeRequest INSTANCE = new HandshakeRequest();

        private HandshakeRequest() {
        }
    }

    /**
     * Handshake response carrying the remote node, its cluster name and its version.
     */
    public static class HandshakeResponse extends TransportResponse {

        private DiscoveryNode discoveryNode;

        private ClusterName clusterName;

        private Version version;

        /**
         * For external construction; the fields are populated by {@link #readFrom(StreamInput)}.
         */
        public HandshakeResponse() {
        }

        HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) {
            this.discoveryNode = discoveryNode;
            this.version = version;
            this.clusterName = clusterName;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
            clusterName = new ClusterName(in);
            version = Version.readVersion(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeOptionalWriteable(discoveryNode);
            clusterName.writeTo(out);
            Version.writeVersion(version, out);
        }
    }

    /**
     * Callbacks invoked around request/response processing: byte metrics, tracing,
     * handler lookup and cleanup of pending requests.
     */
    private final class Adapter implements TransportServiceAdapter {

        final MeanMetric rxMetric = new MeanMetric();

        final MeanMetric txMetric = new MeanMetric();

        @Override
        public void addBytesReceived(long size) {
            rxMetric.inc(size);
        }

        @Override
        public void addBytesSent(long size) {
            txMetric.inc(size);
        }

        @Override
        public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
                                  TransportRequestOptions options) {
            if (traceEnabled() && shouldTraceAction(action)) {
                traceRequestSent(node, requestId, action, options);
            }
        }

        boolean traceEnabled() {
            return tracerLog.isTraceEnabled();
        }

        @Override
        public void onResponseSent(long requestId, String action, TransportResponse response,
                                   TransportResponseOptions options) {
            if (traceEnabled() && shouldTraceAction(action)) {
                traceResponseSent(requestId, action);
            }
        }

        @Override
        public void onResponseSent(long requestId, String action, Exception e) {
            if (traceEnabled() && shouldTraceAction(action)) {
                traceResponseSent(requestId, action, e);
            }
        }

        void traceResponseSent(long requestId, String action, Exception e) {
            tracerLog.trace((org.apache.logging.log4j.util.Supplier<?>) () ->
                    new ParameterizedMessage("[{}][{}] sent error response", requestId, action), e);
        }

        /**
         * Blocks until {@link #acceptIncomingRequests()} has been called, then traces the request.
         */
        @Override
        public void onRequestReceived(long requestId, String action) {
            try {
                blockIncomingRequestsLatch.await();
            } catch (InterruptedException e) {
                // keep the interrupt visible to the code further up the stack
                Thread.currentThread().interrupt();
                logger.trace("interrupted while waiting for incoming requests block to be removed");
            }
            if (traceEnabled() && shouldTraceAction(action)) {
                traceReceivedRequest(requestId, action);
            }
        }

        @Override
        public RequestHandlerRegistry<TransportRequest> getRequestHandler(String action) {
            return requestHandlers.get(action);
        }

        /**
         * Removes and returns the handler waiting for the given request id, cancelling its
         * timeout. Returns {@code null} when no handler is pending (for example after a timeout).
         */
        @Override
        public TransportResponseHandler onResponseReceived(final long requestId) {
            RequestHolder<?> holder = clientHandlers.remove(requestId);
            if (holder == null) {
                checkForTimeout(requestId);
                return null;
            }
            holder.cancelTimeout();
            if (traceEnabled() && shouldTraceAction(holder.action())) {
                traceReceivedResponse(requestId, holder.node(), holder.action());
            }
            return holder.handler();
        }

        /**
         * Logs a warning for a response that has no pending handler, using the recorded
         * timeout information when available.
         *
         * @throws IllegalStateException if a handler is still registered for the id
         */
        void checkForTimeout(long requestId) {
            // look the id up in the (synchronized) timeout info map so a late response
            // can be reported with details about the request that timed out
            final DiscoveryNode sourceNode;
            final String action;
            if (clientHandlers.get(requestId) != null) {
                throw new IllegalStateException();
            }
            TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
            if (timeoutInfoHolder != null) {
                long time = System.currentTimeMillis();
                logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, "
                                + "action [{}], node [{}], id [{}]",
                        time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(),
                        timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
                action = timeoutInfoHolder.action();
                sourceNode = timeoutInfoHolder.node();
            } else {
                logger.warn("Transport response handler not found of id [{}]", requestId);
                action = null;
                sourceNode = null;
            }
            if (!traceEnabled()) {
                return;
            }
            if (action == null) {
                assert sourceNode == null;
                traceUnresolvedResponse(requestId);
            } else if (shouldTraceAction(action)) {
                traceReceivedResponse(requestId, sourceNode, action);
            }
        }

        @Override
        public void onNodeConnected(final DiscoveryNode node) {
        }

        @Override
        public void onConnectionOpened(DiscoveryNode node) {
        }

        /**
         * Fails every pending request addressed to the disconnected node with a
         * {@link NodeDisconnectedException}. A rejected execution ends the loop and is
         * logged at debug level.
         */
        @Override
        public void onNodeDisconnected(final DiscoveryNode node) {
            try {
                for (Map.Entry<Long, RequestHolder<?>> entry : clientHandlers.entrySet()) {
                    RequestHolder<?> holder = entry.getValue();
                    if (holder.node().equals(node)) {
                        final RequestHolder<?> holderToNotify = clientHandlers.remove(entry.getKey());
                        if (holderToNotify != null) {
                            // callback that an exception happened, but on a different thread since we don't
                            // want handlers to worry about stack overflows
                            threadPool.generic().execute(() -> holderToNotify.handler()
                                    .handleException(new NodeDisconnectedException(node, holderToNotify.action())));
                        }
                    }
                }
            } catch (EsRejectedExecutionException ex) {
                logger.debug("Rejected execution on NodeDisconnected", ex);
            }
        }

        void traceReceivedRequest(long requestId, String action) {
            tracerLog.trace("[{}][{}] received request", requestId, action);
        }

        void traceResponseSent(long requestId, String action) {
            tracerLog.trace("[{}][{}] sent response", requestId, action);
        }

        void traceReceivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
            tracerLog.trace("[{}][{}] received response from [{}]", requestId, action, sourceNode);
        }

        void traceUnresolvedResponse(long requestId) {
            tracerLog.trace("[{}] received response but can't resolve it to a request", requestId);
        }

        void traceRequestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
            tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
        }
    }

    /**
     * Scheduled task that fails a pending request with a
     * {@link ReceiveTimeoutTransportException} if it is still pending when the task runs.
     */
    private final class TimeoutHandler implements Runnable {

        private final long requestId;

        private final long sentTime = System.currentTimeMillis();

        volatile ScheduledFuture<?> future;

        TimeoutHandler(long requestId) {
            this.requestId = requestId;
        }

        @Override
        public void run() {
            // we get first to make sure we only add the TimeoutInfoHandler if needed.
            final RequestHolder<?> holder = clientHandlers.get(requestId);
            if (holder != null) {
                // add it to the timeout information holder, in case we are going to get a response later
                long timeoutTime = System.currentTimeMillis();
                timeoutInfoHandlers.put(requestId,
                        new TimeoutInfoHolder(holder.node(), holder.action(), sentTime, timeoutTime));
                // now that we have the information visible via timeoutInfoHandlers, we try to remove the request id
                final RequestHolder<?> removedHolder = clientHandlers.remove(requestId);
                if (removedHolder != null) {
                    assert removedHolder == holder : "two different holder instances for request [" + requestId + "]";
                    removedHolder.handler().handleException(
                            new ReceiveTimeoutTransportException(holder.node(), holder.action(),
                                    "request_id [" + requestId + "] timed out after ["
                                            + (timeoutTime - sentTime) + "ms]"));
                } else {
                    // response was processed, remove timeout info.
                    timeoutInfoHandlers.remove(requestId);
                }
            }
        }

        /**
         * Cancels timeout handling. This is a best effort only to avoid running it.
         * Remove the requestId from {@link XbibTransportService#clientHandlers} to make sure
         * this doesn't run.
         *
         * @throws IllegalStateException if the request id is still registered
         */
        void cancel() {
            if (clientHandlers.get(requestId) != null) {
                throw new IllegalStateException("cancel must be called after the requestId ["
                        + requestId + "] has been removed from clientHandlers");
            }
            FutureUtils.cancel(future);
        }
    }

    /** Immutable record of a timed out request, kept so that a late response can be reported. */
    private static class TimeoutInfoHolder {

        private final DiscoveryNode node;

        private final String action;

        private final long sentTime;

        private final long timeoutTime;

        TimeoutInfoHolder(DiscoveryNode node, String action, long sentTime, long timeoutTime) {
            this.node = node;
            this.action = action;
            this.sentTime = sentTime;
            this.timeoutTime = timeoutTime;
        }

        public DiscoveryNode node() {
            return node;
        }

        String action() {
            return action;
        }

        long sentTime() {
            return sentTime;
        }

        long timeoutTime() {
            return timeoutTime;
        }
    }

    /**
     * A pending request: its response handler, target node, action and optional timeout task.
     *
     * @param <T> transport response type
     */
    private static class RequestHolder<T extends TransportResponse> {

        private final TransportResponseHandler<T> handler;

        private final DiscoveryNode node;

        private final String action;

        private final TimeoutHandler timeoutHandler;

        RequestHolder(TransportResponseHandler<T> handler, DiscoveryNode node, String action,
                      TimeoutHandler timeoutHandler) {
            this.handler = handler;
            this.node = node;
            this.action = action;
            this.timeoutHandler = timeoutHandler;
        }

        TransportResponseHandler<T> handler() {
            return handler;
        }

        public DiscoveryNode node() {
            return this.node;
        }

        String action() {
            return this.action;
        }

        void cancelTimeout() {
            if (timeoutHandler != null) {
                timeoutHandler.cancel();
            }
        }
    }

    /**
     * This handler wrapper ensures that the response thread executes with the correct thread context.
     * Before any of the handle methods are invoked we restore the context.
     *
     * @param <T> transport response type
     */
    public static final class ContextRestoreResponseHandler<T extends TransportResponse>
            implements TransportResponseHandler<T> {

        private final TransportResponseHandler<T> delegate;

        private final Supplier<ThreadContext.StoredContext> contextSupplier;

        ContextRestoreResponseHandler(Supplier<ThreadContext.StoredContext> contextSupplier,
                                      TransportResponseHandler<T> delegate) {
            this.delegate = delegate;
            this.contextSupplier = contextSupplier;
        }

        @Override
        public T newInstance() {
            return delegate.newInstance();
        }

        @SuppressWarnings("try")
        @Override
        public void handleResponse(T response) {
            try (ThreadContext.StoredContext ignore = contextSupplier.get()) {
                delegate.handleResponse(response);
            }
        }

        @SuppressWarnings("try")
        @Override
        public void handleException(TransportException exp) {
            try (ThreadContext.StoredContext ignore = contextSupplier.get()) {
                delegate.handleException(exp);
            }
        }

        @Override
        public String executor() {
            return delegate.executor();
        }

        @Override
        public String toString() {
            return getClass().getName() + "/" + delegate.toString();
        }
    }

    /**
     * Channel used for requests handled on the local node: responses are handed straight to
     * the pending response handler instead of going through the transport.
     */
    static class DirectResponseChannel implements TransportChannel {

        private static final String DIRECT_RESPONSE_PROFILE = ".direct";

        private final Logger logger;

        private final DiscoveryNode localNode;

        private final String action;

        private final long requestId;

        private final TransportServiceAdapter adapter;

        private final ThreadPool threadPool;

        DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId,
                              TransportServiceAdapter adapter, ThreadPool threadPool) {
            this.logger = logger;
            this.localNode = localNode;
            this.action = action;
            this.requestId = requestId;
            this.adapter = adapter;
            this.threadPool = threadPool;
        }

        @Override
        public String action() {
            return action;
        }

        @Override
        public String getProfileName() {
            return DIRECT_RESPONSE_PROFILE;
        }

        @Override
        public void sendResponse(TransportResponse response) throws IOException {
            sendResponse(response, TransportResponseOptions.EMPTY);
        }

        /**
         * Delivers the response to the pending handler, inline for executor {@code SAME} and
         * on the handler's executor otherwise. Does nothing if no handler is pending.
         */
        @SuppressWarnings("unchecked")
        @Override
        public void sendResponse(final TransportResponse response, TransportResponseOptions options)
                throws IOException {
            adapter.onResponseSent(requestId, action, response, options);
            final TransportResponseHandler<TransportResponse> handler = adapter.onResponseReceived(requestId);
            if (handler != null) {
                final String executor = handler.executor();
                if (ThreadPool.Names.SAME.equals(executor)) {
                    processResponse(handler, response);
                } else {
                    threadPool.executor(executor).execute(() -> processResponse(handler, response));
                }
            }
        }

        /** Invokes the handler; a failure inside it is reported back through handleException. */
        void processResponse(TransportResponseHandler<TransportResponse> handler, TransportResponse response) {
            try {
                handler.handleResponse(response);
            } catch (Exception e) {
                processException(handler, wrapInRemote(new ResponseHandlerFailureTransportException(e)));
            }
        }

        /**
         * Delivers the failure, wrapped as a {@link RemoteTransportException}, to the pending
         * handler. Does nothing if no handler is pending.
         */
        @SuppressWarnings("unchecked")
        @Override
        public void sendResponse(Exception exception) throws IOException {
            adapter.onResponseSent(requestId, action, exception);
            final TransportResponseHandler<TransportResponse> handler = adapter.onResponseReceived(requestId);
            if (handler != null) {
                final RemoteTransportException rtx = wrapInRemote(exception);
                final String executor = handler.executor();
                if (ThreadPool.Names.SAME.equals(executor)) {
                    processException(handler, rtx);
                } else {
                    threadPool.executor(executor).execute(() -> processException(handler, rtx));
                }
            }
        }

        RemoteTransportException wrapInRemote(Exception e) {
            if (e instanceof RemoteTransportException) {
                return (RemoteTransportException) e;
            }
            return new RemoteTransportException(localNode.getName(), localNode.getAddress(), action, e);
        }

        void processException(final TransportResponseHandler<?> handler, final RemoteTransportException rtx) {
            try {
                handler.handleException(rtx);
            } catch (Exception e) {
                logger.error((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                        "failed to handle exception for action [{}], handler [{}]", action, handler), e);
            }
        }

        @Override
        public long getRequestId() {
            return requestId;
        }

        @Override
        public String getChannelType() {
            return "direct";
        }

        @Override
        public Version getVersion() {
            return localNode.getVersion();
        }
    }
}