diff --git a/gradle.properties b/gradle.properties
index c582403..f36e95b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,14 +1,12 @@
group = org.xbib
name = netty-http
-version = 4.1.68.0
+version = 4.1.69.0
gradle.wrapper.version = 6.6.1
-netty.version = 4.1.68.Final
-tcnative.version = 2.0.43.Final
+netty.version = 4.1.69.Final
+tcnative.version = 2.0.44.Final
bouncycastle.version = 1.69
-reactivestreams.version = 1.0.3
-reactivex.version = 1.3.8
conscrypt.version = 2.5.2
javassist.version = 3.28.0-GA
jackson.version = 2.11.4
@@ -16,5 +14,3 @@ mockito.version = 3.10.0
xbib.net.version = 2.1.1
xbib-guice.version = 4.4.2
junit.version = 5.7.1
-# uuhh, too many tests to update to jupiter in rx...
-junit4.version = 4.13.1
diff --git a/netty-http-rx/NOTICE.txt b/netty-http-rx/NOTICE.txt
deleted file mode 100644
index 3241e88..0000000
--- a/netty-http-rx/NOTICE.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-This work is based on
-
-https://github.com/ReactiveX/RxNetty
-
-(branch 0.5.x as of 22-Sep-2019)
diff --git a/netty-http-rx/build.gradle b/netty-http-rx/build.gradle
deleted file mode 100644
index df6cb51..0000000
--- a/netty-http-rx/build.gradle
+++ /dev/null
@@ -1,8 +0,0 @@
-dependencies {
- api "io.reactivex:rxjava:${project.property('reactivex.version')}"
- implementation "io.netty:netty-codec-http:${project.property('netty.version')}"
- implementation "io.netty:netty-transport-native-epoll:${project.property('netty.version')}"
- testImplementation "org.mockito:mockito-core:${project.property('mockito.version')}"
- testRuntimeOnly "org.junit.vintage:junit-vintage-engine:${project.property('junit.version')}"
- testImplementation "junit:junit:${project.property('junit4.version')}"
-}
diff --git a/netty-http-rx/src/main/java/io/reactivex/netty/HandlerNames.java b/netty-http-rx/src/main/java/io/reactivex/netty/HandlerNames.java
deleted file mode 100644
index 14bf1f7..0000000
--- a/netty-http-rx/src/main/java/io/reactivex/netty/HandlerNames.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package io.reactivex.netty;
-
-/**
- * A list of all handler names added by the framework. This is just to ensure consistency in naming.
- */
-public enum HandlerNames {
-
- SslHandler("ssl-handler"),
- SslConnectionEmissionHandler("ssl-connection-emitter"),
- WireLogging("wire-logging-handler"),
- WriteTransformer("write-transformer"),
- ClientReadTimeoutHandler("client-read-timeout-handler"),
- ClientChannelActiveBufferingHandler("client-channel-active-buffer-handler"),
- ;
-
- private final String name;
-
- HandlerNames(String name) {
- this.name = qualify(name);
- }
-
- public String getName() {
- return name;
- }
-
- private static String qualify(String name) {
- return "_rx_netty_" + name;
-
- }
-}
diff --git a/netty-http-rx/src/main/java/io/reactivex/netty/RxNetty.java b/netty-http-rx/src/main/java/io/reactivex/netty/RxNetty.java
deleted file mode 100644
index 1eb4986..0000000
--- a/netty-http-rx/src/main/java/io/reactivex/netty/RxNetty.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright 2015 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package io.reactivex.netty;
-
-import io.reactivex.netty.threads.RxEventLoopProvider;
-import io.reactivex.netty.threads.SingleNioLoopProvider;
-
-public final class RxNetty {
-
- private static volatile RxEventLoopProvider rxEventLoopProvider = new SingleNioLoopProvider(Runtime.getRuntime().availableProcessors());
-
- private static volatile boolean usingNativeTransport;
- private static volatile boolean disableEventPublishing;
-
- private RxNetty() {
- }
-
- /**
- * An implementation of {@link RxEventLoopProvider} to be used by all clients and servers created after this call.
- *
- * @param provider New provider to use.
- *
- * @return Existing provider.
- */
- public static RxEventLoopProvider useEventLoopProvider(RxEventLoopProvider provider) {
- RxEventLoopProvider oldProvider = rxEventLoopProvider;
- rxEventLoopProvider = provider;
- return oldProvider;
- }
-
- public static RxEventLoopProvider getRxEventLoopProvider() {
- return rxEventLoopProvider;
- }
-
- /**
- * A global flag to start using netty's native protocol
- * if applicable for a client or server.
- *
- * This does not evaluate whether the native transport is available for the OS or not.
- *
- * So, this method should be called conditionally when the caller is sure that the OS supports the native protocol.
- *
- * Alternatively, this can be done selectively per client and server instance.
- */
- public static void useNativeTransportIfApplicable() {
- usingNativeTransport = true;
- }
-
- /**
- * A global flag to disable the effects of calling {@link #useNativeTransportIfApplicable()}
- */
- public static void disableNativeTransport() {
- usingNativeTransport = false;
- }
-
- /**
- * Enables publishing of events for RxNetty.
- */
- public static void enableEventPublishing() {
- disableEventPublishing = false;
- }
-
- /**
- * Disables publishing of events for RxNetty.
- */
- public static void disableEventPublishing() {
- disableEventPublishing = true;
- }
-
- /**
- * Returns {@code true} if event publishing is disabled.
- *
- * @return {@code true} if event publishing is disabled.
- */
- public static boolean isEventPublishingDisabled() {
- return disableEventPublishing;
- }
-
- public static boolean isUsingNativeTransport() {
- return usingNativeTransport;
- }
-}
diff --git a/netty-http-rx/src/main/java/io/reactivex/netty/channel/AbstractConnectionToChannelBridge.java b/netty-http-rx/src/main/java/io/reactivex/netty/channel/AbstractConnectionToChannelBridge.java
deleted file mode 100644
index e5bd22c..0000000
--- a/netty-http-rx/src/main/java/io/reactivex/netty/channel/AbstractConnectionToChannelBridge.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Copyright 2015 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package io.reactivex.netty.channel;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOption;
-import io.netty.util.AttributeKey;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.internal.EmptyArrays;
-import io.reactivex.netty.channel.events.ConnectionEventListener;
-import io.reactivex.netty.events.EventPublisher;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import rx.Producer;
-import rx.Subscriber;
-import rx.exceptions.MissingBackpressureException;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
-/**
- * A bridge between a {@link Connection} instance and the associated {@link Channel}.
- *
- * All operations on {@link Connection} will pass through this bridge to an appropriate action on the {@link Channel}
- *
- *
Lazy {@link Connection#getInput()} subscription
- *
- * Lazy subscriptions are allowed on {@link Connection#getInput()} if and only if the channel is configured to
- * not read data automatically (i.e. {@link ChannelOption#AUTO_READ} is set to {@code false}). Otherwise,
- * if {@link Connection#getInput()} is subscribed lazily, the subscriber always receives an error. The content
- * in this case is disposed upon reading.
- *
- * @param Type read from the connection held by this handler.
- * @param Type written to the connection held by this handler.
- */
-public abstract class AbstractConnectionToChannelBridge extends BackpressureManagingHandler {
-
- private static final Logger logger = Logger.getLogger(AbstractConnectionToChannelBridge.class.getName());
-
- @SuppressWarnings("ThrowableInstanceNeverThrown")
- private static final IllegalStateException ONLY_ONE_CONN_SUB_ALLOWED =
- new IllegalStateException("Only one subscriber allowed for connection observable.");
- @SuppressWarnings("ThrowableInstanceNeverThrown")
- private static final IllegalStateException ONLY_ONE_CONN_INPUT_SUB_ALLOWED =
- new IllegalStateException("Only one subscriber allowed for connection input.");
- @SuppressWarnings("ThrowableInstanceNeverThrown")
- private static final IllegalStateException LAZY_CONN_INPUT_SUB =
- new IllegalStateException("Channel is set to auto-read but the subscription was lazy.");
-
- @SuppressWarnings("ThrowableInstanceNeverThrown")
- private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
-
- static {
- ONLY_ONE_CONN_INPUT_SUB_ALLOWED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
- ONLY_ONE_CONN_SUB_ALLOWED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
- LAZY_CONN_INPUT_SUB.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
- CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
- }
-
- private final AttributeKey eventListenerAttributeKey;
- private final AttributeKey eventPublisherAttributeKey;
-
- protected ConnectionEventListener eventListener;
- protected EventPublisher eventPublisher;
- private Subscriber super Channel> newChannelSub;
- private ReadProducer readProducer;
- private boolean raiseErrorOnInputSubscription;
- private boolean connectionEmitted;
-
- protected AbstractConnectionToChannelBridge(String thisHandlerName, ConnectionEventListener eventListener,
- EventPublisher eventPublisher) {
- super(thisHandlerName);
- if (null == eventListener) {
- throw new IllegalArgumentException("Event listener can not be null.");
- }
- if (null == eventPublisher) {
- throw new IllegalArgumentException("Event publisher can not be null.");
- }
- this.eventListener = eventListener;
- this.eventPublisher = eventPublisher;
- eventListenerAttributeKey = null;
- eventPublisherAttributeKey = null;
- }
-
- protected AbstractConnectionToChannelBridge(String thisHandlerName,
- AttributeKey eventListenerAttributeKey,
- AttributeKey eventPublisherAttributeKey) {
- super(thisHandlerName);
- this.eventListenerAttributeKey = eventListenerAttributeKey;
- this.eventPublisherAttributeKey = eventPublisherAttributeKey;
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- if (null == eventListener && null == eventPublisher) {
- eventListener = ctx.channel().attr(eventListenerAttributeKey).get();
- eventPublisher = ctx.channel().attr(eventPublisherAttributeKey).get();
- }
-
- if (null == eventPublisher) {
- logger.log(Level.SEVERE, "No Event publisher bound to the channel, closing channel.");
- ctx.channel().close();
- return;
- }
-
- if (eventPublisher.publishingEnabled() && null == eventListener) {
- logger.log(Level.SEVERE, "No Event listener bound to the channel and publising is enabled, closing channel.");
- ctx.channel().close();
- return;
- }
-
- ctx.pipeline().addFirst(new BytesInspector(eventPublisher, eventListener));
-
- super.handlerAdded(ctx);
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- if (!connectionEmitted && isValidToEmit(newChannelSub)) {
- emitNewConnection(ctx.channel());
- connectionEmitted = true;
- }
- super.channelInactive(ctx);
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-
- if (isValidToEmitToReadSubscriber(readProducer)) {
- /*If the subscriber is still active, then it expects data but the channel is closed.*/
- readProducer.sendOnError(CLOSED_CHANNEL_EXCEPTION);
- }
-
- super.channelUnregistered(ctx);
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof EmitConnectionEvent) {
- if (!connectionEmitted) {
- emitNewConnection(ctx.channel());
- connectionEmitted = true;
- }
- } else if (evt instanceof ConnectionCreationFailedEvent) {
- if (isValidToEmit(newChannelSub)) {
- newChannelSub.onError(((ConnectionCreationFailedEvent)evt).getThrowable());
- }
- } else if (evt instanceof ChannelSubscriberEvent) {
- @SuppressWarnings("unchecked")
- final ChannelSubscriberEvent channelSubscriberEvent = (ChannelSubscriberEvent) evt;
-
- newConnectionSubscriber(channelSubscriberEvent);
- } else if (evt instanceof ConnectionInputSubscriberEvent) {
- @SuppressWarnings("unchecked")
- ConnectionInputSubscriberEvent event = (ConnectionInputSubscriberEvent) evt;
-
- newConnectionInputSubscriber(ctx.channel(), event.getSubscriber(), false);
- } else if (evt instanceof ConnectionInputSubscriberResetEvent) {
- resetConnectionInputSubscriber();
- } else if (evt instanceof ConnectionInputSubscriberReplaceEvent) {
- @SuppressWarnings("unchecked")
- ConnectionInputSubscriberReplaceEvent event = (ConnectionInputSubscriberReplaceEvent) evt;
- replaceConnectionInputSubscriber(ctx.channel(), event);
- }
-
- super.userEventTriggered(ctx, evt);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void newMessage(ChannelHandlerContext ctx, Object msg) {
- if (isValidToEmitToReadSubscriber(readProducer)) {
- try {
- readProducer.sendOnNext((R) msg);
- } catch (ClassCastException e) {
- ReferenceCountUtil.release(msg); // Since, this was not sent to the subscriber, release the msg.
- readProducer.sendOnError(e);
- }
- } else {
- logger.log(Level.WARNING, "Data received on channel, but no subscriber registered. Discarding data. Message class: "
- + msg.getClass().getName() + ", channel: " + ctx.channel());
- ReferenceCountUtil.release(msg); // No consumer of the message, so discard.
- }
- }
-
- @Override
- public boolean shouldReadMore(ChannelHandlerContext ctx) {
- return null != readProducer && readProducer.shouldReadMore(ctx);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (!connectionEmitted && isValidToEmit(newChannelSub)) {
- newChannelSub.onError(cause);
- } else if (isValidToEmitToReadSubscriber(readProducer)) {
- readProducer.sendOnError(cause);
- } else {
- logger.log(Level.INFO, "Exception in the pipeline and none of the subscribers are active.", cause);
- }
- }
-
- protected static boolean isValidToEmit(Subscriber> subscriber) {
- return null != subscriber && !subscriber.isUnsubscribed();
- }
-
- private static boolean isValidToEmitToReadSubscriber(ReadProducer> readProducer) {
- return null != readProducer && !readProducer.subscriber.isUnsubscribed();
- }
-
- protected boolean connectionInputSubscriberExists(Channel channel) {
- assert channel.eventLoop().inEventLoop();
-
- return null != readProducer && null != readProducer.subscriber && !readProducer.subscriber.isUnsubscribed();
- }
-
- protected void onNewReadSubscriber(Subscriber super R> subscriber) {
- // NOOP
- }
-
- protected final void checkEagerSubscriptionIfConfigured(Channel channel) {
- if (channel.config().isAutoRead() && null == readProducer) {
- // If the channel is set to auto-read and there is no eager subscription then, we should raise errors
- // when a subscriber arrives.
- raiseErrorOnInputSubscription = true;
- final Subscriber super R> discardAll = ConnectionInputSubscriberEvent.discardAllInput()
- .getSubscriber();
- final ReadProducer producer = new ReadProducer<>(discardAll, channel);
- discardAll.setProducer(producer);
- readProducer = producer;
- }
- }
-
- protected final Subscriber super Channel> getNewChannelSub() {
- return newChannelSub;
- }
-
- private void emitNewConnection(Channel channel) {
- if (isValidToEmit(newChannelSub)) {
- try {
- newChannelSub.onNext(channel);
- connectionEmitted = true;
- checkEagerSubscriptionIfConfigured(channel);
- newChannelSub.onCompleted();
- } catch (Exception e) {
- logger.log(Level.SEVERE, "Error emitting a new connection. Closing this channel.", e);
- channel.close();
- }
- } else {
- channel.close(); // Closing the connection if not sent to a subscriber.
- }
- }
-
- private void resetConnectionInputSubscriber() {
- final Subscriber super R> connInputSub = null == readProducer? null : readProducer.subscriber;
- if (isValidToEmit(connInputSub)) {
- connInputSub.onCompleted();
- }
- raiseErrorOnInputSubscription = false;
- readProducer = null; // A subsequent event should set it to the desired subscriber.
- }
-
- private void newConnectionInputSubscriber(final Channel channel, final Subscriber super R> subscriber,
- boolean replace) {
- final Subscriber super R> connInputSub = null == readProducer ? null : readProducer.subscriber;
- if (isValidToEmit(connInputSub)) {
- if (!replace) {
- /*Allow only once concurrent input subscriber but allow concatenated subscribers*/
- subscriber.onError(ONLY_ONE_CONN_INPUT_SUB_ALLOWED);
- } else {
- setNewReadProducer(channel, subscriber);
- connInputSub.onCompleted();
- }
- } else if (raiseErrorOnInputSubscription) {
- subscriber.onError(LAZY_CONN_INPUT_SUB);
- } else {
- setNewReadProducer(channel, subscriber);
- }
- }
-
- private void setNewReadProducer(Channel channel, Subscriber super R> subscriber) {
- final ReadProducer producer = new ReadProducer<>(subscriber, channel);
- subscriber.setProducer(producer);
- onNewReadSubscriber(subscriber);
- readProducer = producer;
- }
-
- private void replaceConnectionInputSubscriber(Channel channel, ConnectionInputSubscriberReplaceEvent event) {
- ConnectionInputSubscriberEvent newSubEvent = event.getNewSubEvent();
- newConnectionInputSubscriber(channel, newSubEvent.getSubscriber(),
- true);
- }
-
- private void newConnectionSubscriber(ChannelSubscriberEvent event) {
- if (null == newChannelSub) {
- newChannelSub = event.getSubscriber();
- } else {
- event.getSubscriber().onError(ONLY_ONE_CONN_SUB_ALLOWED);
- }
- }
-
- /*Visible for testing*/ static final class ReadProducer extends RequestReadIfRequiredEvent implements Producer {
-
- @SuppressWarnings("rawtypes")
- private static final AtomicLongFieldUpdater REQUEST_UPDATER =
- AtomicLongFieldUpdater.newUpdater(ReadProducer.class, "requested");/*Updater for requested*/
- private volatile long requested; // Updated by REQUEST_UPDATER, required to be volatile.
-
- private final Subscriber super T> subscriber;
- private final Channel channel;
-
- /*Visible for testing*/ ReadProducer(Subscriber super T> subscriber, Channel channel) {
- this.subscriber = subscriber;
- this.channel = channel;
- }
-
- @Override
- public void request(long n) {
- if (Long.MAX_VALUE != requested) {
- if (Long.MAX_VALUE == n) {
- // Now turning off backpressure
- REQUEST_UPDATER.set(this, Long.MAX_VALUE);
- } else {
- // add n to field but check for overflow
- while (true) {
- final long current = requested;
- long next = current + n;
- // check for overflow
- if (next < 0) {
- next = Long.MAX_VALUE;
- }
- if (REQUEST_UPDATER.compareAndSet(this, current, next)) {
- break;
- }
- }
- }
- }
-
- if (!channel.config().isAutoRead()) {
- channel.pipeline().fireUserEventTriggered(this);
- }
- }
-
- public void sendOnError(Throwable throwable) {
- subscriber.onError(throwable);
- }
-
- public void sendOnComplete() {
- subscriber.onCompleted();
- }
-
- public void sendOnNext(T nextItem) {
- if (requested > 0) {
- if (REQUEST_UPDATER.get(this) != Long.MAX_VALUE) {
- REQUEST_UPDATER.decrementAndGet(this);
- }
- subscriber.onNext(nextItem);
- } else {
- subscriber.onError(new MissingBackpressureException(
- "Received more data on the channel than demanded by the subscriber."));
- }
- }
-
- @Override
- protected boolean shouldReadMore(ChannelHandlerContext ctx) {
- return !subscriber.isUnsubscribed() && REQUEST_UPDATER.get(this) > 0;
- }
-
- /*Visible for testing*/long getRequested() {
- return requested;
- }
-
- @Override
- public String toString() {
- return "ReadProducer{" + "requested=" + requested + '}';
- }
- }
-}
diff --git a/netty-http-rx/src/main/java/io/reactivex/netty/channel/AllocatingTransformer.java b/netty-http-rx/src/main/java/io/reactivex/netty/channel/AllocatingTransformer.java
deleted file mode 100644
index 01b0fa2..0000000
--- a/netty-http-rx/src/main/java/io/reactivex/netty/channel/AllocatingTransformer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package io.reactivex.netty.channel;
-
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.util.internal.TypeParameterMatcher;
-import rx.annotations.Beta;
-
-import java.util.List;
-
-/**
- * A transformer to be used for modifying the type of objects written on a {@link Connection}.
- *
- *
Why is this required?
- *
- * The type of an object can usually be transformed using {@code Observable.map()}, however, while writing on a
- * {@link Connection}, typically one requires to allocate buffers. Although a {@code Connection} provides a way to
- * retrieve the {@link ByteBufAllocator} via the {@code Channel}, allocating buffers from outside the eventloop will
- * lead to buffer bloats as the allocators will typically use thread-local buffer pools.
- *
- * This transformer is always invoked from within the eventloop and hence does not have buffer bloating issues, even
- * when transformations happen outside the eventloop.
- *
- * @param Source type.
- * @param Target type.
- */
-@Beta
-public abstract class AllocatingTransformer {
-
- private final TypeParameterMatcher matcher;
-
- protected AllocatingTransformer() {
- matcher = TypeParameterMatcher.find(this, AllocatingTransformer.class, "T");
- }
-
- /**
- * Asserts whether the passed message can be transformed using this transformer.
- *
- * @param msg Message to transform.
- *
- * @return {@code true} if the message can be transformed.
- */
- protected boolean acceptMessage(Object msg) {
- return matcher.match(msg);
- }
-
- /**
- * Transforms the passed message and adds the output to the returned list.
- *
- * @param toTransform Message to transform.
- * @param allocator Allocating for allocating buffers, if required.
- *
- * @return Output of the transformation.
- */
- public abstract List transform(T toTransform, ByteBufAllocator allocator);
-
-}
diff --git a/netty-http-rx/src/main/java/io/reactivex/netty/channel/AppendTransformerEvent.java b/netty-http-rx/src/main/java/io/reactivex/netty/channel/AppendTransformerEvent.java
deleted file mode 100644
index 9cbc39e..0000000
--- a/netty-http-rx/src/main/java/io/reactivex/netty/channel/AppendTransformerEvent.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package io.reactivex.netty.channel;
-
-/**
- * An event to register a custom transformer of data written on a channel.
- *
- * @param Source type for the transformer.
- * @param Target type for the transformer.
- */
-public final class AppendTransformerEvent {
-
- private final AllocatingTransformer transformer;
-
- public AppendTransformerEvent(AllocatingTransformer transformer) {
- if (null == transformer) {
- throw new NullPointerException("Transformer can not be null.");
- }
- this.transformer = transformer;
- }
-
- public AllocatingTransformer getTransformer() {
- return transformer;
- }
-}
diff --git a/netty-http-rx/src/main/java/io/reactivex/netty/channel/AutoReleaseOperator.java b/netty-http-rx/src/main/java/io/reactivex/netty/channel/AutoReleaseOperator.java
deleted file mode 100644
index dcf977f..0000000
--- a/netty-http-rx/src/main/java/io/reactivex/netty/channel/AutoReleaseOperator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2015 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package io.reactivex.netty.channel;
-
-import io.netty.util.ReferenceCountUtil;
-import rx.Observable.Operator;
-import rx.Subscriber;
-
-class AutoReleaseOperator implements Operator {
-
- @Override
- public Subscriber super T> call(final Subscriber super T> subscriber) {
- return new Subscriber(subscriber) {
- @Override
- public void onCompleted() {
- subscriber.onCompleted();
- }
-
- @Override
- public void onError(Throwable e) {
- subscriber.onError(e);
- }
-
- @Override
- public void onNext(T t) {
- try {
- subscriber.onNext(t);
- } finally {
- ReferenceCountUtil.release(t);
- }
- }
- };
- }
-}
diff --git a/netty-http-rx/src/main/java/io/reactivex/netty/channel/BackpressureManagingHandler.java b/netty-http-rx/src/main/java/io/reactivex/netty/channel/BackpressureManagingHandler.java
deleted file mode 100644
index 6b7a53a..0000000
--- a/netty-http-rx/src/main/java/io/reactivex/netty/channel/BackpressureManagingHandler.java
+++ /dev/null
@@ -1,710 +0,0 @@
-/*
- * Copyright 2015 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package io.reactivex.netty.channel;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.internal.RecyclableArrayList;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import rx.Observable;
-import rx.Scheduler;
-import rx.Subscriber;
-import rx.functions.Action0;
-import rx.schedulers.Schedulers;
-import rx.subscriptions.Subscriptions;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public abstract class BackpressureManagingHandler extends ChannelDuplexHandler {
-
- private static final Logger logger = Logger.getLogger(BackpressureManagingHandler.class.getName());
-
- /*Visible for testing*/ enum State {
- ReadRequested,
- Reading,
- Buffering,
- DrainingBuffer,
- Stopped,
- }
-
- private RecyclableArrayList buffer;
- private int currentBufferIndex;
- private State currentState = State.Buffering; /*Buffer unless explicitly asked to read*/
- private boolean continueDraining;
- private final BytesWriteInterceptor bytesWriteInterceptor;
-
- protected BackpressureManagingHandler(String thisHandlerName) {
- bytesWriteInterceptor = new BytesWriteInterceptor(thisHandlerName);
- }
-
- @SuppressWarnings("fallthrough")
- @Override
- public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
- if (State.Stopped != currentState && !shouldReadMore(ctx)) {
- currentState = State.Buffering;
- }
-
- switch (currentState) {
- case ReadRequested:
- currentState = State.Reading;
- case Reading:
- newMessage(ctx, msg);
- break;
- case Buffering:
- case DrainingBuffer:
- if (null == buffer) {
- buffer = RecyclableArrayList.newInstance();
- }
- buffer.add(msg);
- break;
- case Stopped:
- logger.log(Level.WARNING, "Message read after handler removed, discarding the same. Message class: "
- + msg.getClass().getName());
- ReferenceCountUtil.release(msg);
- break;
- }
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- ctx.pipeline().addFirst(bytesWriteInterceptor);
- currentState = State.Buffering;
- }
-
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- /*On shut down, all the handlers are removed from the pipeline, so we don't need to explicitly remove the
- additional handlers added in handlerAdded()*/
- currentState = State.Stopped;
- if (null != buffer) {
- if (!buffer.isEmpty()) {
- for (Object item : buffer) {
- ReferenceCountUtil.release(item);
- }
- }
- buffer.recycle();
- buffer = null;
- }
- }
-
- @Override
- public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-
- switch (currentState) {
- case ReadRequested:
- /*Nothing read from the last request, forward to read() and let it take the decision on what to do.*/
- break;
- case Reading:
- /*
- * After read completion, move to Buffering, unless an explicit read is issued, which moves to an
- * appropriate state.
- */
- currentState = State.Buffering;
- break;
- case Buffering:
- /*Keep buffering, unless the buffer drains and more items are requested*/
- break;
- case DrainingBuffer:
- /*Keep draining, unless the buffer drains and more items are requested*/
- break;
- case Stopped:
- break;
- }
-
- ctx.fireChannelReadComplete();
-
- if (!ctx.channel().config().isAutoRead() && shouldReadMore(ctx)) {
- read(ctx);
- }
- }
-
- @Override
- public final void read(ChannelHandlerContext ctx) throws Exception {
- switch (currentState) {
- case ReadRequested:
- /*Nothing read since last request, but requested more, so push the demand upstream.*/
- ctx.read();
- break;
- case Reading:
- /*
- * We are already reading data and the read has not completed as that would move the state to buffering.
- * So, ignore this read, or otherwise, read is requested on the channel, unnecessarily.
- */
- break;
- case Buffering:
- /*
- * We were buffering and now a read was requested, so start draining the buffer.
- */
- currentState = State.DrainingBuffer;
- continueDraining = true;
- /*
- * Looping here to drain, instead of having it done via readComplete -> read -> readComplete loop to reduce
- * call stack depth. Otherwise, the stackdepth is proportional to number of items in the buffer and hence
- * for large buffers will overflow stack.
- */
- while (continueDraining && null != buffer && currentBufferIndex < buffer.size()) {
- Object nextItem = buffer.get(currentBufferIndex++);
- newMessage(ctx, nextItem); /*Send the next message.*/
- /*
- * If there is more read demand then that should come as part of read complete or later as another
- * read (this method) invocation. */
- continueDraining = false;
- channelReadComplete(ctx);
- }
-
- if (continueDraining) {
- if (null != buffer) {
- /*Outstanding read demand and buffer is empty, so recycle the buffer and pass the read upstream.*/
- recycleBuffer();
- }
- /*
- * Since, continueDraining is true and we have broken out of the drain loop, it means that there are no
- * items in the buffer and there is more read demand. Switch to read requested and send the read demand
- * downstream.
- */
- currentState = State.ReadRequested;
- ctx.read();
- } else {
- /*
- * There is no more demand, so set the state to buffering and so another read invocation can start
- * draining.
- */
- currentState = State.Buffering;
- /*If buffer is empty, then recycle.*/
- if (null != buffer && currentBufferIndex >= buffer.size()) {
- recycleBuffer();
- }
- }
- break;
- case DrainingBuffer:
- /*Already draining buffer, so break the call stack and let the caller keep draining.*/
- continueDraining = true;
- break;
- case Stopped:
- /*Invalid, pass it downstream.*/
- ctx.read();
- break;
- }
- }
-
- /**
- * Intercepts a write on the channel. The following message types are handled:
- *
- *
-
String: If the pipeline is not configured to write a String, this converts the string to a {@link io.netty.buffer.ByteBuf} and
- then writes it on the channel.
-
byte[]: If the pipeline is not configured to write a byte[], this converts the byte[] to a {@link io.netty.buffer.ByteBuf} and
- then writes it on the channel.
-
Observable: Subscribes to the {@link Observable} and writes all items, requesting the next item if and only if
- the channel is writable as indicated by {@link Channel#isWritable()}
-
- *
- * @param ctx Channel handler context.
- * @param msg Message to write.
- * @param promise Promise for the completion of write.
- *
- * @throws Exception If there is an error handling this write.
- */
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (msg instanceof Observable) {
- @SuppressWarnings("rawtypes")
- Observable observable = (Observable) msg; /*One can write heterogeneous objects on a channel.*/
- final WriteStreamSubscriber subscriber = bytesWriteInterceptor.newSubscriber(ctx, promise);
- subscriber.subscribeTo(observable);
- } else {
- ctx.write(msg, promise);
- }
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof RequestReadIfRequiredEvent) {
- RequestReadIfRequiredEvent requestReadIfRequiredEvent = (RequestReadIfRequiredEvent) evt;
- if (requestReadIfRequiredEvent.shouldReadMore(ctx)) {
- read(ctx);
- }
- }
-
- super.userEventTriggered(ctx, evt);
- }
-
- protected abstract void newMessage(ChannelHandlerContext ctx, Object msg);
-
- protected abstract boolean shouldReadMore(ChannelHandlerContext ctx);
-
- /*Visible for testing*/ RecyclableArrayList getBuffer() {
- return buffer;
- }
-
- /*Visible for testing*/ int getCurrentBufferIndex() {
- return currentBufferIndex;
- }
-
- /*Visible for testing*/ State getCurrentState() {
- return currentState;
- }
-
- private void recycleBuffer() {
- buffer.recycle();
- currentBufferIndex = 0;
- buffer = null;
- }
-
- protected static abstract class RequestReadIfRequiredEvent {
-
- protected abstract boolean shouldReadMore(ChannelHandlerContext ctx);
- }
-
- /**
- * This handler inspects write to see if a write made it to {@link BytesWriteInterceptor} inline with a write call.
- * The reasons why a write would not make it to the channel, would be:
- *
-
If there is a handler in the pipeline that runs in a different group.
-
If there is a handler that collects many items to produce a single item.
-
- *
- * When a write did not reach the {@link BytesWriteInterceptor}, no request for more items will be generated and
- * we could get into a deadlock where a handler is waiting for more items (collect case) but no more items arrive as
- * no more request is generated. In order to avoid this deadlock, this handler will detect the situation and
- * trigger more request in this case.
- *
- * Why a separate handler?
- *
- * This needs to be different than {@link BytesWriteInterceptor} as we need it immediately after
- * {@link BackpressureManagingHandler} so that no other handler eats a write and {@link BytesWriteInterceptor} is
- * always the first handler in the pipeline to be right before the channel and hence maintain proper demand.
- */
- static final class WriteInspector extends ChannelDuplexHandler {
-
- private final BytesWriteInterceptor bytesWriteInterceptor;
-
- WriteInspector(BytesWriteInterceptor bytesWriteInterceptor) {
- this.bytesWriteInterceptor = bytesWriteInterceptor;
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- /*Both these handlers always run in the same executor, so it's safe to access this variable.*/
- bytesWriteInterceptor.messageReceived = false; /*reset flag for this write*/
- ctx.write(msg, promise);
- if (!bytesWriteInterceptor.messageReceived) {
- bytesWriteInterceptor.requestMoreIfWritable(ctx.channel());
- }
- }
- }
-
- /**
- * Regulates write->request more->write process on the channel.
- *
- * Why is this a separate handler?
- * The sole purpose of this handler is to request more items from each of the Observable streams producing items to
- * write. It is important to request more items only when the current item is written on the channel i.e. added to
- * the ChannelOutboundBuffer. If we request more from outside the pipeline (from WriteStreamSubscriber.onNext())
- * then it may so happen that the onNext is not from within this eventloop and hence instead of being written to
- * the channel, is added to the task queue of the EventLoop. Requesting more items in such a case, would mean we
- * keep adding the writes to the eventloop queue and not on the channel buffer. This would mean that the channel
- * writability would not truly indicate the buffer.
- */
- /*Visible for testing*/ static final class BytesWriteInterceptor extends ChannelDuplexHandler implements Runnable {
-
- /*Visible for testing*/ static final String WRITE_INSPECTOR_HANDLER_NAME = "write-inspector";
- /*Visible for testing*/ static final int MAX_PER_SUBSCRIBER_REQUEST = 64;
-
- /*
- * Since, unsubscribes can happen on a different thread, this has to be thread-safe.
- */
- private final ConcurrentLinkedQueue subscribers = new ConcurrentLinkedQueue<>();
- private final String parentHandlerName;
-
- /* This should always be access from the eventloop and can be used to manage state before and after a write to
- * see if a write started from {@link WriteInspector} made it to this handler.
- */
- private boolean messageReceived;
-
- /**
- * The intent here is to equally divide the request to all subscribers but do not put a hard-bound on whether
- * the subscribers are actually adhering to the limit (by not throwing MissingBackpressureException). This keeps
- * the request distribution simple and still give opprotunities for subscribers to optimize (increase the limit)
- * if there is a signal that the consumption is slower than the producer.
- *
- * Worst case of this scheme is request-1 per subscriber which happens when there are as many subscribers as
- * the max limit.
- */
- private int perSubscriberMaxRequest = MAX_PER_SUBSCRIBER_REQUEST;
- private Channel channel;
- private boolean removeTaskScheduled; // Guarded by this
-
- BytesWriteInterceptor(String parentHandlerName) {
- this.parentHandlerName = parentHandlerName;
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
- ctx.write(msg, promise);
- messageReceived = true;
- requestMoreIfWritable(ctx.channel());
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- channel = ctx.channel();
- WriteInspector writeInspector = new WriteInspector(this);
- ChannelHandler parent = ctx.pipeline().get(parentHandlerName);
- if (null != parent) {
- ctx.pipeline().addBefore(parentHandlerName, WRITE_INSPECTOR_HANDLER_NAME, writeInspector);
- }
- }
-
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- if (ctx.channel().isWritable()) {
- requestMoreIfWritable(ctx.channel());
- }
- super.channelWritabilityChanged(ctx);
- }
-
- public WriteStreamSubscriber newSubscriber(final ChannelHandlerContext ctx, ChannelPromise promise) {
- int currentSubCount = subscribers.size();
- recalculateMaxPerSubscriber(currentSubCount, currentSubCount + 1);
-
- final WriteStreamSubscriber sub = new WriteStreamSubscriber(ctx, promise, perSubscriberMaxRequest);
- sub.add(Subscriptions.create(new Action0() {
- @Override
- public void call() {
- boolean _schedule;
- /*Schedule the task once as the task runs through and removes all unsubscribed subscribers*/
- synchronized (BytesWriteInterceptor.this) {
- _schedule = !removeTaskScheduled;
- removeTaskScheduled = true;
- }
- if (_schedule) {
- ctx.channel().eventLoop().execute(BytesWriteInterceptor.this);
- }
- }
- }));
-
- subscribers.add(sub);
- return sub;
- }
-
- /*Visible for testing*/List getSubscribers() {
- return Collections.unmodifiableList(new ArrayList<>(subscribers));
- }
-
- private void requestMoreIfWritable(Channel channel) {
- assert channel.eventLoop().inEventLoop();
-
- for (WriteStreamSubscriber subscriber: subscribers) {
- if (!subscriber.isUnsubscribed() && channel.isWritable()) {
- subscriber.requestMoreIfNeeded(perSubscriberMaxRequest);
- }
- }
- }
-
- @Override
- public void run() {
- synchronized (this) {
- removeTaskScheduled = false;
- }
- int oldSubCount = subscribers.size();
- for (Iterator iterator = subscribers.iterator(); iterator.hasNext(); ) {
- WriteStreamSubscriber subscriber = iterator.next();
- if (subscriber.isUnsubscribed()) {
- iterator.remove();
- }
- }
- int newSubCount = subscribers.size();
- recalculateMaxPerSubscriber(oldSubCount, newSubCount);
- }
-
- /**
- * Called from within the eventloop, whenever the subscriber queue is modified. This modifies the per subscriber
- * request limit by equally distributing the demand. Minimum demand to any subscriber is 1.
- */
- private void recalculateMaxPerSubscriber(int oldSubCount, int newSubCount) {
- assert channel.eventLoop().inEventLoop();
- perSubscriberMaxRequest = newSubCount == 0 || oldSubCount == 0
- ? MAX_PER_SUBSCRIBER_REQUEST
- : perSubscriberMaxRequest * oldSubCount / newSubCount;
-
- perSubscriberMaxRequest = Math.max(1, perSubscriberMaxRequest);
-
- if (logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, "Channel " + channel +
- " modifying per subscriber max request. Old subscribers count " + oldSubCount +
- " new subscribers count " + newSubCount +
- " new Value {} " + perSubscriberMaxRequest);
- }
- }
- }
-
- /**
- * Backpressure enabled subscriber to an Observable written on this channel. This connects the promise for writing
- * the Observable to all the promises created per write (per onNext).
- */
- /*Visible for testing*/static class WriteStreamSubscriber extends Subscriber