add reactive streams, add chunked file service

This commit is contained in:
Jörg Prante 2019-06-20 23:05:03 +02:00
parent b62e72a222
commit 71a912d7cd
56 changed files with 3300 additions and 525 deletions

View file

@ -131,12 +131,6 @@ subprojects {
archives javadocJar, sourcesJar
}
if (project.hasProperty('signing.keyId')) {
signing {
sign configurations.archives
}
}
ext {
user = 'jprante'
name = 'netty-http'

View file

@ -1,6 +1,6 @@
group = org.xbib
name = netty-http
version = 4.1.36.3
version = 4.1.36.4
# main packages
netty.version = 4.1.36.Final
@ -12,6 +12,7 @@ xbib-net-url.version = 1.3.2
# server
bouncycastle.version = 1.61
reactivestreams.version = 1.0.2
# server-rest
xbib-guice.version = 4.0.4

View file

@ -19,6 +19,7 @@ public class NettyHttpExtension implements BeforeAllCallback {
//System.setProperty("io.netty.recycler.maxCapacity", Integer.toString(0));
//System.setProperty("io.netty.leakDetection.level", "paranoid");
Level level = Level.INFO;
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %5$s %6$s%n");
LogManager.getLogManager().reset();
@ -26,10 +27,10 @@ public class NettyHttpExtension implements BeforeAllCallback {
Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler);
rootLogger.setLevel(Level.FINE);
rootLogger.setLevel(level);
for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter());
h.setLevel(Level.FINE);
h.setLevel(level);
}
}
}

View file

@ -6,5 +6,9 @@ dependencies {
implementation "io.netty:netty-codec-http2:${project.property('netty.version')}"
implementation "org.xbib:net-url:${project.property('xbib-net-url.version')}"
implementation "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
implementation "org.reactivestreams:reactive-streams:${project.property('reactivestreams.version')}"
testImplementation project(":netty-http-client")
testImplementation("org.reactivestreams:reactive-streams-tck:${project.property('reactivestreams.version')}") {
exclude module: 'testng'
}
}

View file

@ -156,10 +156,15 @@ public final class Server {
/**
* Start accepting incoming connections.
* @return the channel future
* @throws IOException if channel future sync is interrupted
*/
public ChannelFuture accept() {
public ChannelFuture accept() throws IOException {
logger.log(Level.INFO, () -> "trying to bind to " + serverConfig.getAddress());
this.channelFuture = bootstrap.bind(serverConfig.getAddress().getInetSocketAddress());
try {
this.channelFuture = bootstrap.bind(serverConfig.getAddress().getInetSocketAddress()).await().sync();
} catch (InterruptedException e) {
throw new IOException(e);
}
logger.log(Level.INFO, () -> ServerName.getServerName() + " ready, listening on " + serverConfig.getAddress());
return channelFuture;
}
@ -179,14 +184,14 @@ public final class Server {
logger.log(level, NetworkUtils::displayNetworkInterfaces);
}
public ServerRequest newRequest() {
/*public ServerRequest newRequest() {
return new HttpServerRequest();
}
}*/
public ServerResponse newResponse(ServerRequest serverRequest) {
/*public ServerResponse newResponse(ServerRequest serverRequest) {
return serverRequest.getNamedServer().getHttpAddress().getVersion().majorVersion() == 1 ?
new HttpServerResponse(serverRequest) : new Http2ServerResponse(serverRequest);
}
}*/
public ServerTransport newTransport(HttpVersion httpVersion) {
return httpVersion.majorVersion() == 1 ? new HttpServerTransport(this) : new Http2ServerTransport(this);
@ -198,7 +203,9 @@ public final class Server {
childEventLoopGroup.shutdownGracefully();
parentEventLoopGroup.shutdownGracefully();
try {
channelFuture.channel().closeFuture().sync();
if (channelFuture != null) {
channelFuture.channel().closeFuture().sync();
}
} catch (InterruptedException e) {
throw new IOException(e);
}

View file

@ -2,26 +2,30 @@ package org.xbib.netty.http.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import org.xbib.net.URL;
import org.xbib.netty.http.common.HttpParameters;
import org.xbib.netty.http.server.endpoint.NamedServer;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public interface ServerRequest {
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
NamedServer getNamedServer();
public interface ServerRequest {
ChannelHandlerContext getChannelHandlerContext();
FullHttpRequest getRequest();
URL getURL();
EndpointInfo getEndpointInfo();
void setContext(List<String> context);
List<String> getContext();
void setPathParameters(Map<String, String> rawParameters);
void addPathParameter(String key, String value) throws IOException;
Map<String, String> getPathParameters();
@ -38,4 +42,60 @@ public interface ServerRequest {
Integer streamId();
Integer requestId();
class EndpointInfo implements Comparable<EndpointInfo> {
private final String path;
private final String method;
private final String contentType;
public EndpointInfo(ServerRequest serverRequest) {
this.path = extractPath(serverRequest.getRequest().uri());
this.method = serverRequest.getRequest().method().name();
this.contentType = serverRequest.getRequest().headers().get(CONTENT_TYPE);
}
public String getPath() {
return path;
}
public String getMethod() {
return method;
}
public String getContentType() {
return contentType;
}
@Override
public String toString() {
return "[EndpointInfo:path=" + path + ",method=" + method + ",contentType=" + contentType + "]";
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public boolean equals(Object o) {
return o instanceof EndpointInfo && toString().equals(o.toString());
}
@Override
public int compareTo(EndpointInfo o) {
return toString().compareTo(o.toString());
}
private static String extractPath(String uri) {
String path = uri;
int pos = uri.lastIndexOf('#');
path = pos >= 0 ? path.substring(0, pos) : path;
pos = uri.lastIndexOf('?');
path = pos >= 0 ? path.substring(0, pos) : path;
return path;
}
}
}

View file

@ -1,9 +1,13 @@
package org.xbib.netty.http.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.AsciiString;
import java.nio.CharBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
/**
@ -13,20 +17,37 @@ public interface ServerResponse {
void setHeader(AsciiString name, String value);
ChannelHandlerContext getChannelHandlerContext();
HttpResponseStatus getLastStatus();
void write(String text);
void writeError(HttpResponseStatus status);
void writeError(HttpResponseStatus status, String text);
void write(HttpResponseStatus status);
void write(HttpResponseStatus status, String contentType, String text);
void write(HttpResponseStatus status, String contentType, String text, Charset charset);
void write(HttpResponseStatus status, String contentType, ByteBuf byteBuf);
void write(HttpResponseStatus status, String contentType, ReadableByteChannel byteChannel);
static void write(ServerResponse serverResponse, HttpResponseStatus status) {
write(serverResponse, status, status.reasonPhrase());
}
static void write(ServerResponse serverResponse, String text) {
write(serverResponse, HttpResponseStatus.OK, text);
}
static void write(ServerResponse serverResponse, HttpResponseStatus status, String text) {
write(serverResponse, status, "text/plain; charset=utf-8", text);
}
static void write(ServerResponse serverResponse,
HttpResponseStatus status, String contentType, String text) {
serverResponse.write(status, contentType,
ByteBufUtil.writeUtf8(serverResponse.getChannelHandlerContext().alloc(), text));
}
static void write(ServerResponse serverResponse,
HttpResponseStatus status, String contentType, String text, Charset charset) {
serverResponse.write(status, contentType,
ByteBufUtil.encodeString(serverResponse.getChannelHandlerContext().alloc(),
CharBuffer.allocate(text.length()).append(text), charset));
}
}

View file

@ -2,6 +2,7 @@ package org.xbib.netty.http.server.endpoint;
import org.xbib.net.QueryParameters;
import org.xbib.net.path.PathMatcher;
import org.xbib.net.path.PathNormalizer;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.service.Service;
@ -10,17 +11,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
public class Endpoint {
private static final PathMatcher pathMatcher = new PathMatcher();
public static final List<String> DEFAULT_METHODS = Arrays.asList("GET", "HEAD");
public static final List<String> DEFAULT_METHODS = Arrays.asList("GET", "HEAD");
private final String prefix;
@ -34,9 +31,8 @@ public class Endpoint {
private Endpoint(String prefix, String path,
List<String> methods, List<String> contentTypes, List<Service> filters) {
this.prefix = prefix;
this.path = path == null || path.isEmpty() ?
prefix + "/**" : path.startsWith("/") ? prefix + path : prefix + "/" + path;
this.prefix = PathNormalizer.normalize(prefix);
this.path = PathNormalizer.normalize(path);
this.methods = methods;
this.contentTypes = contentTypes;
this.filters = filters;
@ -63,21 +59,19 @@ public class Endpoint {
return path;
}
public boolean matches(EndpointInfo info) {
return pathMatcher.match(path, info.path) &&
(methods == null || methods.isEmpty() || (methods.contains(info.method))) &&
(contentTypes == null || contentTypes.isEmpty() || info.contentType == null ||
contentTypes.stream().anyMatch(info.contentType::startsWith));
public boolean matches(ServerRequest.EndpointInfo info) {
return pathMatcher.match(prefix + path, info.getPath()) &&
(methods == null || methods.isEmpty() || (methods.contains(info.getMethod()))) &&
(contentTypes == null || contentTypes.isEmpty() || info.getContentType() == null ||
contentTypes.stream().anyMatch(info.getContentType()::startsWith));
}
public void resolveUriTemplate(ServerRequest serverRequest) {
if (pathMatcher.match(path, serverRequest.getEffectiveRequestPath())) {
QueryParameters queryParameters = pathMatcher.extractUriTemplateVariables(path, serverRequest.getEffectiveRequestPath());
Map<String, String> map = new LinkedHashMap<>();
public void resolveUriTemplate(ServerRequest serverRequest) throws IOException {
if (pathMatcher.match(prefix + path, serverRequest.getRequest().uri())) {
QueryParameters queryParameters = pathMatcher.extractUriTemplateVariables(prefix + path, serverRequest.getRequest().uri());
for (QueryParameters.Pair<String, String> pair : queryParameters) {
map.put(pair.getFirst(), pair.getSecond());
serverRequest.addPathParameter(pair.getFirst(), pair.getSecond());
}
serverRequest.setPathParameters(map);
}
}
@ -93,42 +87,7 @@ public class Endpoint {
@Override
public String toString() {
return path + "_" + methods + "_" + contentTypes + " --> " + filters;
}
public static class EndpointInfo implements Comparable<EndpointInfo> {
private final String path;
private final String method;
private final String contentType;
public EndpointInfo(ServerRequest serverRequest) {
this.path = serverRequest.getEffectiveRequestPath();
this.method = serverRequest.getRequest().method().name();
this.contentType = serverRequest.getRequest().headers().get(CONTENT_TYPE);
}
@Override
public String toString() {
return path + "_" + method + "_" + contentType;
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public boolean equals(Object o) {
return o instanceof EndpointInfo && toString().equals(o.toString());
}
@Override
public int compareTo(EndpointInfo o) {
return toString().compareTo(o.toString());
}
return "Endpoint[prefix=" + prefix + ",path=" + path + ",methods=" + methods + ",contentTypes=" + contentTypes + " --> " + filters +"]";
}
static class EndpointPathComparator implements Comparator<Endpoint> {

View file

@ -12,17 +12,21 @@ import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
public class EndpointResolver {
private static final Logger logger = Logger.getLogger(EndpointResolver.class.getName());
private final Endpoint defaultEndpoint;
private final List<Endpoint> endpoints;
private final EndpointDispatcher endpointDispatcher;
private final LRUCache<Endpoint.EndpointInfo, List<Endpoint>> cache;
private final LRUCache<ServerRequest.EndpointInfo, List<Endpoint>> cache;
private EndpointResolver(Endpoint defaultEndpoint,
List<Endpoint> endpoints,
@ -35,11 +39,14 @@ public class EndpointResolver {
}
public void resolve(ServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
Endpoint.EndpointInfo endpointInfo = new Endpoint.EndpointInfo(serverRequest);
ServerRequest.EndpointInfo endpointInfo = serverRequest.getEndpointInfo();
cache.putIfAbsent(endpointInfo, endpoints.stream()
.filter(endpoint -> endpoint.matches(endpointInfo))
.sorted(new Endpoint.EndpointPathComparator(serverRequest.getEffectiveRequestPath())).collect(Collectors.toList()));
.sorted(new Endpoint.EndpointPathComparator(endpointInfo.getPath())).collect(Collectors.toList()));
List<Endpoint> matchingEndpoints = cache.get(endpointInfo);
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "endpoint info = " + endpointInfo + " matching endpoints = " + matchingEndpoints + " cache size=" + cache.size());
}
if (matchingEndpoints.isEmpty()) {
if (defaultEndpoint != null) {
defaultEndpoint.resolveUriTemplate(serverRequest);
@ -48,7 +55,7 @@ public class EndpointResolver {
endpointDispatcher.dispatch(defaultEndpoint, serverRequest, serverResponse);
}
} else {
serverResponse.write(HttpResponseStatus.NOT_IMPLEMENTED);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_IMPLEMENTED);
}
} else {
for (Endpoint endpoint : matchingEndpoints) {
@ -69,7 +76,7 @@ public class EndpointResolver {
}
}
public LRUCache<Endpoint.EndpointInfo, List<Endpoint>> getCache() {
public LRUCache<ServerRequest.EndpointInfo, List<Endpoint>> getCache() {
return cache;
}
@ -79,7 +86,7 @@ public class EndpointResolver {
.addMethod("GET")
.addMethod("HEAD")
.addFilter((req, resp) -> {
resp.writeError(HttpResponseStatus.NOT_FOUND,"No endpoint configured");
ServerResponse.write(resp, HttpResponseStatus.NOT_FOUND,"No endpoint configured");
}).build();
}
@ -100,7 +107,7 @@ public class EndpointResolver {
}
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() >= cacheSize;
return size() > cacheSize;
}
}
@ -148,8 +155,11 @@ public class EndpointResolver {
*/
public Builder addEndpoint(Endpoint endpoint) {
if (endpoint.getPrefix().equals("/") && prefix != null && !prefix.isEmpty()) {
endpoints.add(Endpoint.builder(endpoint).setPrefix(prefix).build());
Endpoint thisEndpoint = Endpoint.builder(endpoint).setPrefix(prefix).build();
logger.log(Level.FINEST, "adding endpoint = " + thisEndpoint);
endpoints.add(thisEndpoint);
} else {
logger.log(Level.FINEST, "adding endpoint = " + endpoint);
endpoints.add(endpoint);
}
return this;

View file

@ -109,7 +109,7 @@ public class NamedServer {
endpointResolver.resolve(serverRequest, serverResponse);
}
} else {
serverResponse.writeError(HttpResponseStatus.NOT_IMPLEMENTED);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_IMPLEMENTED);
}
}

View file

@ -0,0 +1,51 @@
package org.xbib.netty.http.server.endpoint.service;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.util.MimeTypeUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ChunkedFileService implements Service {
private static final Logger logger = Logger.getLogger(ChunkedFileService.class.getName());
private final Path prefix;
public ChunkedFileService(Path prefix) {
this.prefix = prefix;
if (!Files.exists(prefix)) {
throw new IllegalArgumentException("prefix: " + prefix + " (does not exist)");
}
if (!Files.exists(prefix) || !Files.isDirectory(prefix)) {
throw new IllegalArgumentException("prefix: " + prefix + " (not a directory)");
}
}
@Override
public void handle(ServerRequest serverRequest, ServerResponse serverResponse) {
String requestPath = serverRequest.getEffectiveRequestPath().substring(1); // always starts with '/'
Path path = prefix.resolve(requestPath);
if (Files.isReadable(path)) {
try (InputStream inputStream = Files.newInputStream(path);
ReadableByteChannel byteChannel = Channels.newChannel(inputStream)) {
String contentType = MimeTypeUtils.guessFromPath(requestPath, false);
serverResponse.write(HttpResponseStatus.OK, contentType, byteChannel);
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_FOUND);
}
} else {
logger.log(Level.WARNING, "failed to access path " + path + " prefix = " + prefix + " requestPath=" + requestPath);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_FOUND);
}
}
}

View file

@ -0,0 +1,73 @@
package org.xbib.netty.http.server.endpoint.service;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.util.MimeTypeUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ClassLoaderService implements Service {
private static final Logger logger = Logger.getLogger(ClassLoaderService.class.getName());
private Class<?> clazz;
private final String prefix;
public ClassLoaderService(Class<?> clazz, String prefix) {
this.clazz = clazz;
this.prefix = prefix;
}
@Override
public void handle(ServerRequest serverRequest, ServerResponse serverResponse) {
String requestPath = serverRequest.getEffectiveRequestPath().substring(1);
String contentType = MimeTypeUtils.guessFromPath(requestPath, false);
URL url = clazz.getResource(prefix + "/" + requestPath);
if (url != null) {
if ("file".equals(url.getProtocol())) {
doMappedResource(url, contentType, serverResponse);
} else {
doResource(url, contentType, serverResponse);
}
} else {
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_FOUND);
}
}
private void doMappedResource(URL url, String contentType, ServerResponse serverResponse) {
try {
FileChannel fileChannel = (FileChannel) Files.newByteChannel(Paths.get(url.toURI()));
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
ByteBuf byteBuf = Unpooled.wrappedBuffer(mappedByteBuffer);
serverResponse.write(HttpResponseStatus.OK, contentType, byteBuf);
} catch (URISyntaxException | IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_FOUND);
}
}
private void doResource(URL url, String contentType, ServerResponse serverResponse) {
try (InputStream inputStream = url.openStream();
ReadableByteChannel byteChannel = Channels.newChannel(inputStream)) {
serverResponse.write(HttpResponseStatus.OK, contentType, byteChannel);
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_FOUND);
}
}
}

View file

@ -1,94 +0,0 @@
package org.xbib.netty.http.server.endpoint.service;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.util.MimeTypeUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystem;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ClasspathService implements Service {
private static final Logger logger = Logger.getLogger(ClasspathService.class.getName());
private Class<?> clazz;
private final String prefix;
private final Map<String, String> env;
public ClasspathService(Class<?> clazz, String prefix) {
this.clazz = clazz;
this.prefix = prefix;
this.env = new HashMap<>();
env.put("create", "true");
}
@Override
public void handle(ServerRequest serverRequest, ServerResponse serverResponse) {
String requestPath = serverRequest.getEffectiveRequestPath();
String contentType = MimeTypeUtils.guessFromPath(requestPath, false);
URL url = clazz.getResource(prefix + "/" + requestPath);
if (url != null) {
try {
if ("jar".equals(url.getProtocol())) {
doJarResource(url.toURI(), contentType, serverResponse);
} else {
doFileResource(url.toURI(), contentType, serverResponse);
}
} catch (IOException | URISyntaxException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
serverResponse.write(HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
} else {
serverResponse.write(HttpResponseStatus.NOT_FOUND);
}
}
private void doFileResource(URI uri, String contentType, ServerResponse serverResponse) {
try {
FileChannel fileChannel = (FileChannel) Files.newByteChannel(Paths.get(uri));
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
ByteBuf byteBuf = Unpooled.wrappedBuffer(mappedByteBuffer);
serverResponse.write(HttpResponseStatus.OK, contentType, byteBuf);
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
serverResponse.write(HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
@SuppressWarnings("try")
private void doJarResource(URI uri, String contentType, ServerResponse serverResponse) throws IOException {
FileSystem zipfs = null;
try {
try {
zipfs = FileSystems.getFileSystem(uri);
} catch (FileSystemNotFoundException e) {
zipfs = FileSystems.newFileSystem(uri, env);
}
ByteBuf byteBuf = Unpooled.wrappedBuffer(Files.readAllBytes(Paths.get(uri)));
serverResponse.write(HttpResponseStatus.OK, contentType, byteBuf);
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
serverResponse.write(HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
}

View file

@ -12,23 +12,30 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;
public class NioService implements Service {
public class MappedFileService implements Service {
private static final Logger logger = Logger.getLogger(MappedFileService.class.getName());
private final Path prefix;
public NioService(Path prefix) {
public MappedFileService(Path prefix) {
this.prefix = prefix;
if (!Files.exists(prefix) || !Files.isDirectory(prefix)) {
throw new IllegalArgumentException("prefix: " + prefix + " (not a directory");
if (!Files.exists(prefix)) {
throw new IllegalArgumentException("prefix: " + prefix + " (does not exist)");
}
if (!Files.isDirectory(prefix)) {
throw new IllegalArgumentException("prefix: " + prefix + " (not a directory)");
}
}
@Override
public void handle(ServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
String requestPath = serverRequest.getEffectiveRequestPath();
String requestPath = serverRequest.getEffectiveRequestPath().substring(1); // always starts with '/'
Path path = prefix.resolve(requestPath);
if (Files.exists(path) && Files.isReadable(path)) {
if (Files.isReadable(path)) {
try (FileChannel fileChannel = (FileChannel) Files.newByteChannel(path)) {
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
ByteBuf byteBuf = Unpooled.wrappedBuffer(mappedByteBuffer);
@ -36,7 +43,8 @@ public class NioService implements Service {
serverResponse.write(HttpResponseStatus.OK, contentType, byteBuf);
}
} else {
serverResponse.write(HttpResponseStatus.NOT_FOUND);
logger.log(Level.WARNING, "failed to access path " + path + " prefix = " + prefix + " requestPath=" + requestPath);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_FOUND);
}
}
}

View file

@ -1,46 +0,0 @@
package org.xbib.netty.http.server.endpoint.service;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
public class PathReaderService implements Service {
private Path path;
private ByteBufAllocator allocator;
public PathReaderService(Path path, ByteBufAllocator allocator) {
this.path = path;
this.allocator = allocator;
}
@Override
public void handle(ServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
ByteBuf byteBuf = read(allocator, path.resolve(serverRequest.getEffectiveRequestPath()));
try {
serverResponse.write(HttpResponseStatus.OK, "application/octet-stream", byteBuf);
} finally {
byteBuf.release();
}
}
private static ByteBuf read(ByteBufAllocator allocator, Path path) throws IOException {
try (SeekableByteChannel sbc = Files.newByteChannel(path);
InputStream in = Channels.newInputStream(sbc)) {
int size = Math.toIntExact(sbc.size());
ByteBuf byteBuf = allocator.directBuffer(size, size);
byteBuf.writeBytes(in, size);
return byteBuf;
}
}
}

View file

@ -0,0 +1,21 @@
package org.xbib.netty.http.server.endpoint.service;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import java.io.IOException;
public abstract class ResourceService implements Service {
@Override
public void handle(ServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
String resourcePath = getResourcePath(serverRequest);
handleResource(resourcePath, serverRequest, serverResponse);
}
protected abstract void handleResource(String resourcePath, ServerRequest serverRequest, ServerResponse serverResponse) throws IOException;
protected String getResourcePath(ServerRequest serverRequest) {
return serverRequest.getEffectiveRequestPath().substring(1);
}
}

View file

@ -0,0 +1,32 @@
package org.xbib.netty.http.server.endpoint.service;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import java.io.IOException;
import java.net.URL;
public abstract class URLService extends ResourceService {
@Override
protected void handleResource(String resourcePath, ServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
URL url = getResourceURL(resourcePath);
if (url != null) {
streamResource(url, serverRequest, serverResponse);
}
}
protected abstract URL getResourceURL(String resourcePath);
protected void streamResource(URL resourceUrl, ServerRequest serverRequest,
ServerResponse serverResponse) throws IOException {
/*long lastModified = resourceUrl.openConnection().getLastModified();
serverResponse.addEtag(serverRequest, lastModified);
if (serverResponse.getLastStatus() == HttpResponseStatus.NOT_MODIFIED) {
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_MODIFIED);
} else {
sendResource(resourceUrl, serverRequest, serverResponse);
}*/
}
}

View file

@ -16,6 +16,7 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.DomainNameMapping;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
@ -76,16 +77,18 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
private void configureCleartext(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec(serverConfig.getMaxInitialLineLength(),
serverConfig.getMaxHeadersSize(), serverConfig.getMaxChunkSize()));
pipeline.addLast("http-server-codec",
new HttpServerCodec(serverConfig.getMaxInitialLineLength(),
serverConfig.getMaxHeadersSize(), serverConfig.getMaxChunkSize()));
if (serverConfig.isEnableGzip()) {
pipeline.addLast(new HttpContentDecompressor());
pipeline.addLast("http-server-decompressor", new HttpContentDecompressor());
}
HttpObjectAggregator httpObjectAggregator = new HttpObjectAggregator(serverConfig.getMaxContentLength(),
false);
httpObjectAggregator.setMaxCumulationBufferComponents(serverConfig.getMaxCompositeBufferComponents());
pipeline.addLast(httpObjectAggregator);
pipeline.addLast(new HttpPipeliningHandler(1024));
pipeline.addLast("http-server-aggregator", httpObjectAggregator);
pipeline.addLast("http-server-pipelining", new HttpPipeliningHandler(1024));
pipeline.addLast("http-server-chunked-write", new ChunkedWriteHandler());
pipeline.addLast(httpHandler);
}

View file

@ -21,6 +21,7 @@ import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AsciiString;
import io.netty.util.DomainNameMapping;
import org.xbib.netty.http.common.HttpAddress;
@ -83,12 +84,14 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel> {
protected void initChannel(Channel channel) {
ServerTransport serverTransport = server.newTransport(httpAddress.getVersion());
channel.attr(ServerTransport.TRANSPORT_ATTRIBUTE_KEY).set(serverTransport);
ChannelPipeline p = channel.pipeline();
p.addLast("multiplex-server-frame-converter",
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("multiplex-server-frame-converter",
new Http2StreamFrameToHttpObjectCodec(true));
p.addLast("multiplex-server-chunk-aggregator",
pipeline.addLast("multiplex-server-chunk-aggregator",
new HttpObjectAggregator(serverConfig.getMaxContentLength()));
p.addLast("multiplex-server-request-handler",
pipeline.addLast("multiplex-server-chunked-write",
new ChunkedWriteHandler());
pipeline.addLast("multiplex-server-request-handler",
new ServerRequestHandler());
}
})

View file

@ -0,0 +1,34 @@
package org.xbib.netty.http.server.reactive;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* A cancelled subscriber.
*/
public final class CancelledSubscriber<T> implements Subscriber<T> {
@Override
public void onSubscribe(Subscription subscription) {
if (subscription == null) {
throw new NullPointerException("Null subscription");
} else {
subscription.cancel();
}
}
@Override
public void onNext(T t) {
}
@Override
public void onError(Throwable error) {
if (error == null) {
throw new NullPointerException("Null error published");
}
}
@Override
public void onComplete() {
}
}

View file

@ -0,0 +1,58 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpVersion;
class DelegateHttpMessage implements HttpMessage {
protected final HttpMessage message;
public DelegateHttpMessage(HttpMessage message) {
this.message = message;
}
@Override
@Deprecated
public HttpVersion getProtocolVersion() {
return message.protocolVersion();
}
@Override
public HttpVersion protocolVersion() {
return message.protocolVersion();
}
@Override
public HttpMessage setProtocolVersion(HttpVersion version) {
message.setProtocolVersion(version);
return this;
}
@Override
public HttpHeaders headers() {
return message.headers();
}
@Override
@Deprecated
public DecoderResult getDecoderResult() {
return message.decoderResult();
}
@Override
public DecoderResult decoderResult() {
return message.decoderResult();
}
@Override
public void setDecoderResult(DecoderResult result) {
message.setDecoderResult(result);
}
@Override
public String toString() {
return this.getClass().getName() + "(" + message.toString() + ")";
}
}

View file

@ -0,0 +1,55 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
class DelegateHttpRequest extends DelegateHttpMessage implements HttpRequest {
protected final HttpRequest request;
public DelegateHttpRequest(HttpRequest request) {
super(request);
this.request = request;
}
@Override
public HttpRequest setMethod(HttpMethod method) {
request.setMethod(method);
return this;
}
@Override
public HttpRequest setUri(String uri) {
request.setUri(uri);
return this;
}
@Override
@Deprecated
public HttpMethod getMethod() {
return request.method();
}
@Override
public HttpMethod method() {
return request.method();
}
@Override
@Deprecated
public String getUri() {
return request.uri();
}
@Override
public String uri() {
return request.uri();
}
@Override
public HttpRequest setProtocolVersion(HttpVersion version) {
super.setProtocolVersion(version);
return this;
}
}

View file

@ -0,0 +1,38 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
class DelegateHttpResponse extends DelegateHttpMessage implements HttpResponse {
protected final HttpResponse response;
public DelegateHttpResponse(HttpResponse response) {
super(response);
this.response = response;
}
@Override
public HttpResponse setStatus(HttpResponseStatus status) {
response.setStatus(status);
return this;
}
@Override
@Deprecated
public HttpResponseStatus getStatus() {
return response.status();
}
@Override
public HttpResponseStatus status() {
return response.status();
}
@Override
public HttpResponse setProtocolVersion(HttpVersion version) {
super.setProtocolVersion(version);
return this;
}
}

View file

@ -0,0 +1,21 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
final class DelegateStreamedHttpRequest extends DelegateHttpRequest implements StreamedHttpRequest {
private final Publisher<HttpContent> stream;
public DelegateStreamedHttpRequest(HttpRequest request, Publisher<HttpContent> stream) {
super(request);
this.stream = stream;
}
@Override
public void subscribe(Subscriber<? super HttpContent> subscriber) {
stream.subscribe(subscriber);
}
}

View file

@ -0,0 +1,21 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
final class DelegateStreamedHttpResponse extends DelegateHttpResponse implements StreamedHttpResponse {
private final Publisher<HttpContent> stream;
public DelegateStreamedHttpResponse(HttpResponse response, Publisher<HttpContent> stream) {
super(response);
this.stream = stream;
}
@Override
public void subscribe(Subscriber<? super HttpContent> subscriber) {
stream.subscribe(subscriber);
}
}

View file

@ -0,0 +1,135 @@
package org.xbib.netty.http.server.reactive;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
class EmptyHttpRequest extends DelegateHttpRequest implements FullHttpRequest {
public EmptyHttpRequest(HttpRequest request) {
super(request);
}
@Override
public FullHttpRequest setUri(String uri) {
super.setUri(uri);
return this;
}
@Override
public FullHttpRequest setMethod(HttpMethod method) {
super.setMethod(method);
return this;
}
@Override
public FullHttpRequest setProtocolVersion(HttpVersion version) {
super.setProtocolVersion(version);
return this;
}
@Override
public FullHttpRequest copy() {
if (request instanceof FullHttpRequest) {
return new EmptyHttpRequest(((FullHttpRequest) request).copy());
} else {
DefaultHttpRequest copy = new DefaultHttpRequest(protocolVersion(), method(), uri());
copy.headers().set(headers());
return new EmptyHttpRequest(copy);
}
}
@Override
public FullHttpRequest retain(int increment) {
ReferenceCountUtil.retain(message, increment);
return this;
}
@Override
public FullHttpRequest retain() {
ReferenceCountUtil.retain(message);
return this;
}
@Override
public FullHttpRequest touch() {
if (request instanceof FullHttpRequest) {
return ((FullHttpRequest) request).touch();
} else {
return this;
}
}
@Override
public FullHttpRequest touch(Object o) {
if (request instanceof FullHttpRequest) {
return ((FullHttpRequest) request).touch(o);
} else {
return this;
}
}
@Override
public HttpHeaders trailingHeaders() {
return new DefaultHttpHeaders();
}
@Override
public FullHttpRequest duplicate() {
if (request instanceof FullHttpRequest) {
return ((FullHttpRequest) request).duplicate();
} else {
return this;
}
}
@Override
public FullHttpRequest retainedDuplicate() {
if (request instanceof FullHttpRequest) {
return ((FullHttpRequest) request).retainedDuplicate();
} else {
return this;
}
}
@Override
public FullHttpRequest replace(ByteBuf byteBuf) {
if (message instanceof FullHttpRequest) {
return ((FullHttpRequest) request).replace(byteBuf);
} else {
return this;
}
}
@Override
public ByteBuf content() {
return Unpooled.EMPTY_BUFFER;
}
@Override
public int refCnt() {
if (message instanceof ReferenceCounted) {
return ((ReferenceCounted) message).refCnt();
} else {
return 1;
}
}
@Override
public boolean release() {
return ReferenceCountUtil.release(message);
}
@Override
public boolean release(int decrement) {
return ReferenceCountUtil.release(message, decrement);
}
}

View file

@ -0,0 +1,129 @@
package org.xbib.netty.http.server.reactive;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
class EmptyHttpResponse extends DelegateHttpResponse implements FullHttpResponse {
public EmptyHttpResponse(HttpResponse response) {
super(response);
}
@Override
public FullHttpResponse setStatus(HttpResponseStatus status) {
super.setStatus(status);
return this;
}
@Override
public FullHttpResponse setProtocolVersion(HttpVersion version) {
super.setProtocolVersion(version);
return this;
}
@Override
public FullHttpResponse copy() {
if (response instanceof FullHttpResponse) {
return new EmptyHttpResponse(((FullHttpResponse) response).copy());
} else {
DefaultHttpResponse copy = new DefaultHttpResponse(protocolVersion(), status());
copy.headers().set(headers());
return new EmptyHttpResponse(copy);
}
}
@Override
public FullHttpResponse retain(int increment) {
ReferenceCountUtil.retain(message, increment);
return this;
}
@Override
public FullHttpResponse retain() {
ReferenceCountUtil.retain(message);
return this;
}
@Override
public FullHttpResponse touch() {
if (response instanceof FullHttpResponse) {
return ((FullHttpResponse) response).touch();
} else {
return this;
}
}
@Override
public FullHttpResponse touch(Object o) {
if (response instanceof FullHttpResponse) {
return ((FullHttpResponse) response).touch(o);
} else {
return this;
}
}
@Override
public HttpHeaders trailingHeaders() {
return new DefaultHttpHeaders();
}
@Override
public FullHttpResponse duplicate() {
if (response instanceof FullHttpResponse) {
return ((FullHttpResponse) response).duplicate();
} else {
return this;
}
}
@Override
public FullHttpResponse retainedDuplicate() {
if (response instanceof FullHttpResponse) {
return ((FullHttpResponse) response).retainedDuplicate();
} else {
return this;
}
}
@Override
public FullHttpResponse replace(ByteBuf byteBuf) {
if (response instanceof FullHttpResponse) {
return ((FullHttpResponse) response).replace(byteBuf);
} else {
return this;
}
}
@Override
public ByteBuf content() {
return Unpooled.EMPTY_BUFFER;
}
@Override
public int refCnt() {
if (message instanceof ReferenceCounted) {
return ((ReferenceCounted) message).refCnt();
} else {
return 1;
}
}
@Override
public boolean release() {
return ReferenceCountUtil.release(message);
}
@Override
public boolean release(int decrement) {
return ReferenceCountUtil.release(message, decrement);
}
}

View file

@ -0,0 +1,477 @@
package org.xbib.netty.http.server.reactive;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.TypeParameterMatcher;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.BUFFERING;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.DEMANDING;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.DONE;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.DRAINING;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.IDLE;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.NO_CONTEXT;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.NO_SUBSCRIBER;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.NO_SUBSCRIBER_ERROR;
import static org.xbib.netty.http.server.reactive.HandlerPublisher.State.NO_SUBSCRIBER_OR_CONTEXT;
/**
* Publisher for a Netty Handler.
*
* This publisher supports only one subscriber.
*
* All interactions with the subscriber are done from the handlers executor, hence, they provide the same happens before
* semantics that Netty provides.
*
* The handler publishes all messages that match the type as specified by the passed in class. Any non matching messages
* are forwarded to the next handler.
*
* The publisher will signal complete if it receives a channel inactive event.
*
* The publisher will release any messages that it drops (for example, messages that are buffered when the subscriber
* cancels), but other than that, it does not release any messages. It is up to the subscriber to release messages.
*
* If the subscriber cancels, the publisher will send a close event up the channel pipeline.
*
* All errors will short circuit the buffer, and cause publisher to immediately call the subscribers onError method,
* dropping the buffer.
*
* The publisher can be subscribed to or placed in a handler chain in any order.
*/
public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publisher<T> {
private final EventExecutor executor;
private final TypeParameterMatcher matcher;
/**
* Create a handler publisher.
*
* The supplied executor must be the same event loop as the event loop that this handler is eventually registered
* with, if not, an exception will be thrown when the handler is registered.
*
* @param executor The executor to execute asynchronous events from the subscriber on.
* @param subscriberMessageType The type of message this publisher accepts.
*/
public HandlerPublisher(EventExecutor executor, Class<? extends T> subscriberMessageType) {
this.executor = executor;
this.matcher = TypeParameterMatcher.get(subscriberMessageType);
}
/**
* Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
* {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* @param msg The message to check.
* @return True if the message should be accepted.
*/
protected boolean acceptInboundMessage(Object msg) {
return matcher.match(msg);
}
/**
* Override to handle when a subscriber cancels the subscription.
*
* By default, this method will simply close the channel.
*/
protected void cancelled() {
ctx.close();
}
/**
* Override to intercept when demand is requested.
*
* By default, a channel read is invoked.
*/
protected void requestDemand() {
ctx.read();
}
enum State {
/**
* Initial state. There's no subscriber, and no context.
*/
NO_SUBSCRIBER_OR_CONTEXT,
/**
* A subscriber has been provided, but no context has been provided.
*/
NO_CONTEXT,
/**
* A context has been provided, but no subscriber has been provided.
*/
NO_SUBSCRIBER,
/**
* An error has been received, but there's no subscriber to receive it.
*/
NO_SUBSCRIBER_ERROR,
/**
* There is no demand, and we have nothing buffered.
*/
IDLE,
/**
* There is no demand, and we're buffering elements.
*/
BUFFERING,
/**
* We have nothing buffered, but there is demand.
*/
DEMANDING,
/**
* The stream is complete, however there are still elements buffered for which no demand has come from the subscriber.
*/
DRAINING,
/**
* We're done, in the terminal state.
*/
DONE
}
private final Queue<Object> buffer = new LinkedList<>();
/**
* Whether a subscriber has been provided. This is used to detect whether two subscribers are subscribing
* simultaneously.
*/
private final AtomicBoolean hasSubscriber = new AtomicBoolean();
private State state = NO_SUBSCRIBER_OR_CONTEXT;
private volatile Subscriber<? super T> subscriber;
private ChannelHandlerContext ctx;
private long outstandingDemand = 0;
private Throwable noSubscriberError;
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
if (subscriber == null) {
throw new NullPointerException("Null subscriber");
}
if (!hasSubscriber.compareAndSet(false, true)) {
// Must call onSubscribe first.
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
});
subscriber.onError(new IllegalStateException("This publisher only supports one subscriber"));
} else {
executor.execute(new Runnable() {
@Override
public void run() {
provideSubscriber(subscriber);
}
});
}
}
private void provideSubscriber(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
switch (state) {
case NO_SUBSCRIBER_OR_CONTEXT:
state = NO_CONTEXT;
break;
case NO_SUBSCRIBER:
if (buffer.isEmpty()) {
state = IDLE;
} else {
state = BUFFERING;
}
subscriber.onSubscribe(new ChannelSubscription());
break;
case DRAINING:
subscriber.onSubscribe(new ChannelSubscription());
break;
case NO_SUBSCRIBER_ERROR:
cleanup();
state = DONE;
subscriber.onSubscribe(new ChannelSubscription());
subscriber.onError(noSubscriberError);
break;
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
// If the channel is not yet registered, then it's not safe to invoke any methods on it, eg read() or close()
// So don't provide the context until it is registered.
if (ctx.channel().isRegistered()) {
provideChannelContext(ctx);
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
provideChannelContext(ctx);
ctx.fireChannelRegistered();
}
private void provideChannelContext(ChannelHandlerContext ctx) {
switch(state) {
case NO_SUBSCRIBER_OR_CONTEXT:
verifyRegisteredWithRightExecutor(ctx);
this.ctx = ctx;
// It's set, we don't have a subscriber
state = NO_SUBSCRIBER;
break;
case NO_CONTEXT:
verifyRegisteredWithRightExecutor(ctx);
this.ctx = ctx;
state = IDLE;
subscriber.onSubscribe(new ChannelSubscription());
break;
default:
// Ignore, this could be invoked twice by both handlerAdded and channelRegistered.
}
}
private void verifyRegisteredWithRightExecutor(ChannelHandlerContext ctx) {
if (!executor.inEventLoop()) {
throw new IllegalArgumentException("Channel handler MUST be registered with the same EventExecutor that it is created with.");
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
// If we subscribed before the channel was active, then our read would have been ignored.
if (state == DEMANDING) {
requestDemand();
}
ctx.fireChannelActive();
}
private void receivedDemand(long demand) {
switch (state) {
case BUFFERING:
case DRAINING:
if (addDemand(demand)) {
flushBuffer();
}
break;
case DEMANDING:
addDemand(demand);
break;
case IDLE:
if (addDemand(demand)) {
// Important to change state to demanding before doing a read, in case we get a synchronous
// read back.
state = DEMANDING;
requestDemand();
}
break;
default:
}
}
private boolean addDemand(long demand) {
if (demand <= 0) {
illegalDemand();
return false;
} else {
if (outstandingDemand < Long.MAX_VALUE) {
outstandingDemand += demand;
if (outstandingDemand < 0) {
outstandingDemand = Long.MAX_VALUE;
}
}
return true;
}
}
private void illegalDemand() {
cleanup();
subscriber.onError(new IllegalArgumentException("Request for 0 or negative elements in violation of Section 3.9 of the Reactive Streams specification"));
ctx.close();
state = DONE;
}
private void flushBuffer() {
while (!buffer.isEmpty() && outstandingDemand > 0) {
publishMessage(buffer.remove());
}
if (buffer.isEmpty()) {
if (outstandingDemand > 0) {
if (state == BUFFERING) {
state = DEMANDING;
} // otherwise we're draining
requestDemand();
} else if (state == BUFFERING) {
state = IDLE;
}
}
}
private void receivedCancel() {
switch (state) {
case BUFFERING:
case DEMANDING:
case IDLE:
cancelled();
case DRAINING:
state = DONE;
break;
}
cleanup();
subscriber = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
if (acceptInboundMessage(message)) {
switch (state) {
case IDLE:
buffer.add(message);
state = BUFFERING;
break;
case NO_SUBSCRIBER:
case BUFFERING:
buffer.add(message);
break;
case DEMANDING:
publishMessage(message);
break;
case DRAINING:
case DONE:
ReferenceCountUtil.release(message);
break;
case NO_CONTEXT:
case NO_SUBSCRIBER_OR_CONTEXT:
throw new IllegalStateException("Message received before added to the channel context");
}
} else {
ctx.fireChannelRead(message);
}
}
private void publishMessage(Object message) {
if (COMPLETE.equals(message)) {
subscriber.onComplete();
state = DONE;
} else {
@SuppressWarnings("unchecked")
T next = (T) message;
subscriber.onNext(next);
if (outstandingDemand < Long.MAX_VALUE) {
outstandingDemand--;
if (outstandingDemand == 0 && state != DRAINING) {
if (buffer.isEmpty()) {
state = IDLE;
} else {
state = BUFFERING;
}
}
}
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
if (state == DEMANDING) {
requestDemand();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
complete();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
complete();
}
private void complete() {
switch (state) {
case NO_SUBSCRIBER:
case BUFFERING:
buffer.add(COMPLETE);
state = DRAINING;
break;
case DEMANDING:
case IDLE:
subscriber.onComplete();
state = DONE;
break;
case NO_SUBSCRIBER_ERROR:
// Ignore, we're already going to complete the stream with an error
// when the subscriber subscribes.
break;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
switch (state) {
case NO_SUBSCRIBER:
noSubscriberError = cause;
state = NO_SUBSCRIBER_ERROR;
cleanup();
break;
case BUFFERING:
case DEMANDING:
case IDLE:
case DRAINING:
state = DONE;
cleanup();
subscriber.onError(cause);
break;
}
}
/**
* Release all elements from the buffer.
*/
private void cleanup() {
while (!buffer.isEmpty()) {
ReferenceCountUtil.release(buffer.remove());
}
}
private class ChannelSubscription implements Subscription {
@Override
public void request(final long demand) {
executor.execute(() -> receivedDemand(demand));
}
@Override
public void cancel() {
executor.execute(() -> receivedCancel());
}
}
/**
* Used for buffering a completion signal.
*/
private static final Object COMPLETE = new Object() {
@Override
public String toString() {
return "COMPLETE";
}
};
}

View file

@ -0,0 +1,263 @@
package org.xbib.netty.http.server.reactive;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.xbib.netty.http.server.reactive.HandlerSubscriber.State.CANCELLED;
import static org.xbib.netty.http.server.reactive.HandlerSubscriber.State.COMPLETE;
import static org.xbib.netty.http.server.reactive.HandlerSubscriber.State.INACTIVE;
import static org.xbib.netty.http.server.reactive.HandlerSubscriber.State.NO_CONTEXT;
import static org.xbib.netty.http.server.reactive.HandlerSubscriber.State.NO_SUBSCRIPTION;
import static org.xbib.netty.http.server.reactive.HandlerSubscriber.State.NO_SUBSCRIPTION_OR_CONTEXT;
import static org.xbib.netty.http.server.reactive.HandlerSubscriber.State.RUNNING;
/**
* Subscriber that publishes received messages to the handler pipeline.
*/
public class HandlerSubscriber<T> extends ChannelDuplexHandler implements Subscriber<T> {
private static final long DEFAULT_LOW_WATERMARK = 4;
private static final long DEFAULT_HIGH_WATERMARK = 16;
/**
* Create a new handler subscriber.
*
* The supplied executor must be the same event loop as the event loop that this handler is eventually registered
* with, if not, an exception will be thrown when the handler is registered.
*
* @param executor The executor to execute asynchronous events from the publisher on.
* @param demandLowWatermark The low watermark for demand. When demand drops below this, more will be requested.
* @param demandHighWatermark The high watermark for demand. This is the maximum that will be requested.
*/
public HandlerSubscriber(EventExecutor executor, long demandLowWatermark, long demandHighWatermark) {
this.executor = executor;
this.demandLowWatermark = demandLowWatermark;
this.demandHighWatermark = demandHighWatermark;
}
/**
* Create a new handler subscriber with the default low and high watermarks.
*
* The supplied executor must be the same event loop as the event loop that this handler is eventually registered
* with, if not, an exception will be thrown when the handler is registered.
*
* @param executor The executor to execute asynchronous events from the publisher on.
* @see #HandlerSubscriber(EventExecutor, long, long)
*/
public HandlerSubscriber(EventExecutor executor) {
this(executor, DEFAULT_LOW_WATERMARK, DEFAULT_HIGH_WATERMARK);
}
/**
* Override for custom error handling. By default, it closes the channel.
*
* @param error The error to handle.
*/
protected void error(Throwable error) {
doClose();
}
/**
* Override for custom completion handling. By default, it closes the channel.
*/
protected void complete() {
doClose();
}
private final EventExecutor executor;
private final long demandLowWatermark;
private final long demandHighWatermark;
enum State {
NO_SUBSCRIPTION_OR_CONTEXT,
NO_SUBSCRIPTION,
NO_CONTEXT,
INACTIVE,
RUNNING,
CANCELLED,
COMPLETE
}
private final AtomicBoolean hasSubscription = new AtomicBoolean();
private volatile Subscription subscription;
private volatile ChannelHandlerContext ctx;
private State state = NO_SUBSCRIPTION_OR_CONTEXT;
private long outstandingDemand = 0;
private ChannelFuture lastWriteFuture;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
verifyRegisteredWithRightExecutor(ctx);
switch (state) {
case NO_SUBSCRIPTION_OR_CONTEXT:
this.ctx = ctx;
// We were in no subscription or context, now we just don't have a subscription.
state = NO_SUBSCRIPTION;
break;
case NO_CONTEXT:
this.ctx = ctx;
// We were in no context, we're now fully initialised
maybeStart();
break;
case COMPLETE:
// We are complete, close
state = COMPLETE;
ctx.close();
break;
default:
throw new IllegalStateException("This handler must only be added to a pipeline once " + state);
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
verifyRegisteredWithRightExecutor(ctx);
ctx.fireChannelRegistered();
}
private void verifyRegisteredWithRightExecutor(ChannelHandlerContext ctx) {
if (ctx.channel().isRegistered() && !executor.inEventLoop()) {
throw new IllegalArgumentException("Channel handler MUST be registered with the same EventExecutor that it is created with.");
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
maybeRequestMore();
ctx.fireChannelWritabilityChanged();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
if (state == INACTIVE) {
state = RUNNING;
maybeRequestMore();
}
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
cancel();
ctx.fireChannelInactive();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
cancel();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cancel();
ctx.fireExceptionCaught(cause);
}
private void cancel() {
switch (state) {
case NO_SUBSCRIPTION:
state = CANCELLED;
break;
case RUNNING:
case INACTIVE:
subscription.cancel();
state = CANCELLED;
break;
}
}
@Override
public void onSubscribe(final Subscription subscription) {
if (subscription == null) {
throw new NullPointerException("Null subscription");
} else if (!hasSubscription.compareAndSet(false, true)) {
subscription.cancel();
} else {
this.subscription = subscription;
executor.execute(this::provideSubscription);
}
}
private void provideSubscription() {
switch (state) {
case NO_SUBSCRIPTION_OR_CONTEXT:
state = NO_CONTEXT;
break;
case NO_SUBSCRIPTION:
maybeStart();
break;
case CANCELLED:
subscription.cancel();
break;
}
}
private void maybeStart() {
if (ctx.channel().isActive()) {
state = RUNNING;
maybeRequestMore();
} else {
state = INACTIVE;
}
}
@Override
public void onNext(T t) {
lastWriteFuture = ctx.writeAndFlush(t);
lastWriteFuture.addListener((ChannelFutureListener) future -> {
outstandingDemand--;
maybeRequestMore();
});
}
@Override
public void onError(final Throwable error) {
if (error == null) {
throw new NullPointerException("Null error published");
}
error(error);
}
@Override
public void onComplete() {
if (lastWriteFuture == null) {
complete();
} else {
lastWriteFuture.addListener((ChannelFutureListener) channelFuture -> complete());
}
}
private void doClose() {
executor.execute(() -> {
switch (state) {
case NO_SUBSCRIPTION:
case INACTIVE:
case RUNNING:
ctx.close();
state = COMPLETE;
break;
}
});
}
private void maybeRequestMore() {
if (outstandingDemand <= demandLowWatermark && ctx.channel().isWritable()) {
long toRequest = demandHighWatermark - outstandingDemand;
outstandingDemand = demandHighWatermark;
subscription.request(toRequest);
}
}
}

View file

@ -0,0 +1,157 @@
package org.xbib.netty.http.server.reactive;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* Handler that converts written {@link StreamedHttpRequest} messages into {@link HttpRequest} messages
* followed by {@link HttpContent} messages and reads {@link HttpResponse} messages followed by
* {@link HttpContent} messages and produces {@link StreamedHttpResponse} messages.
*
* This allows request and response bodies to be handled using reactive streams.
*
* There are two types of messages that this handler accepts for writing, {@link StreamedHttpRequest} and
* {@link FullHttpRequest}. Writing any other messages may potentially lead to HTTP message mangling.
*
* There are two types of messages that this handler will send down the chain, {@link StreamedHttpResponse},
* and {@link FullHttpResponse}. If {@link io.netty.channel.ChannelOption#AUTO_READ} is false for the channel,
* then any {@link StreamedHttpResponse} messages <em>must</em> be subscribed to consume the body, otherwise
* it's possible that no read will be done of the messages.
*
* As long as messages are returned in the order that they arrive, this handler implicitly supports HTTP
* pipelining.
*/
public class HttpStreamsClientHandler extends HttpStreamsHandler<HttpResponse, HttpRequest> {
private int inFlight = 0;
private int withServer = 0;
private ChannelPromise closeOnZeroInFlight = null;
private Subscriber<HttpContent> awaiting100Continue;
private StreamedHttpMessage awaiting100ContinueMessage;
private boolean ignoreResponseBody = false;
public HttpStreamsClientHandler() {
super(HttpResponse.class, HttpRequest.class);
}
@Override
protected boolean hasBody(HttpResponse response) {
if (response.status().code() >= 100 && response.status().code() < 200) {
return false;
}
if (response.status().equals(HttpResponseStatus.NO_CONTENT) ||
response.status().equals(HttpResponseStatus.NOT_MODIFIED)) {
return false;
}
if (HttpUtil.isTransferEncodingChunked(response)) {
return true;
}
if (HttpUtil.isContentLengthSet(response)) {
return HttpUtil.getContentLength(response) > 0;
}
return true;
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
if (inFlight == 0) {
ctx.close(future);
} else {
closeOnZeroInFlight = future;
}
}
@Override
protected void consumedInMessage(ChannelHandlerContext ctx) {
inFlight--;
withServer--;
if (inFlight == 0 && closeOnZeroInFlight != null) {
ctx.close(closeOnZeroInFlight);
}
}
@Override
protected void receivedOutMessage(ChannelHandlerContext ctx) {
inFlight++;
}
@Override
protected void sentOutMessage(ChannelHandlerContext ctx) {
withServer++;
}
@Override
protected HttpResponse createEmptyMessage(HttpResponse response) {
return new EmptyHttpResponse(response);
}
@Override
protected HttpResponse createStreamedMessage(HttpResponse response, Publisher<HttpContent> stream) {
return new DelegateStreamedHttpResponse(response, stream);
}
@Override
protected void subscribeSubscriberToStream(StreamedHttpMessage msg, Subscriber<HttpContent> subscriber) {
if (HttpUtil.is100ContinueExpected(msg)) {
awaiting100Continue = subscriber;
awaiting100ContinueMessage = msg;
} else {
super.subscribeSubscriberToStream(msg, subscriber);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse && awaiting100Continue != null && withServer == 0) {
HttpResponse response = (HttpResponse) msg;
if (response.status().equals(HttpResponseStatus.CONTINUE)) {
super.subscribeSubscriberToStream(awaiting100ContinueMessage, awaiting100Continue);
awaiting100Continue = null;
awaiting100ContinueMessage = null;
if (msg instanceof FullHttpResponse) {
ReferenceCountUtil.release(msg);
} else {
ignoreResponseBody = true;
}
} else {
awaiting100ContinueMessage.subscribe(new CancelledSubscriber<HttpContent>());
awaiting100ContinueMessage = null;
awaiting100Continue.onSubscribe(new Subscription() {
public void request(long n) {
}
public void cancel() {
}
});
awaiting100Continue.onComplete();
awaiting100Continue = null;
super.channelRead(ctx, msg);
}
} else if (ignoreResponseBody && msg instanceof HttpContent) {
ReferenceCountUtil.release(msg);
if (msg instanceof LastHttpContent) {
ignoreResponseBody = false;
}
} else {
super.channelRead(ctx, msg);
}
}
}

View file

@ -0,0 +1,377 @@
package org.xbib.netty.http.server.reactive;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.LinkedList;
import java.util.Queue;
abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessage> extends ChannelDuplexHandler {
private final Queue<Outgoing> outgoing = new LinkedList<>();
private final Class<In> inClass;
private final Class<Out> outClass;
public HttpStreamsHandler(Class<In> inClass, Class<Out> outClass) {
this.inClass = inClass;
this.outClass = outClass;
}
/**
* The incoming message that is currently being streamed out to a subscriber.
*
* This is tracked so that if its subscriber cancels, we can go into a mode where we ignore the rest of the body.
* Since subscribers may cancel as many times as they like, including well after they've received all their content,
* we need to track what the current message that's being streamed out is so that we can ignore it if it's not
* currently being streamed out.
*/
private In currentlyStreamedMessage;
/**
* Ignore the remaining reads for the incoming message.
*
* This is used in conjunction with currentlyStreamedMessage, as well as in situations where we have received the
* full body, but still might be expecting a last http content message.
*/
private boolean ignoreBodyRead;
/**
* Whether a LastHttpContent message needs to be written once the incoming publisher completes.
*
* Since the publisher may itself publish a LastHttpContent message, we need to track this fact, because if it
* doesn't, then we need to write one ourselves.
*/
private boolean sendLastHttpContent;
/**
* Whether the given incoming message has a body.
*/
protected abstract boolean hasBody(In in);
/**
* Create an empty incoming message. This must be of type FullHttpMessage, and is invoked when we've determined
* that an incoming message can't have a body, so we send it on as a FullHttpMessage.
* @param in input
* @return incoming message
*/
protected abstract In createEmptyMessage(In in);
/**
* Create a streamed incoming message with the given stream.
* @param in input
* @param stream stream
*/
protected abstract In createStreamedMessage(In in, Publisher<HttpContent> stream);
/**
* Invoked when an incoming message is first received.
*
* Overridden by sub classes for state tracking.
* @param ctx channel handler context
*/
protected void receivedInMessage(ChannelHandlerContext ctx) {
}
/**
* Invoked when an incoming message is fully consumed.
*
* Overridden by sub classes for state tracking.
* @param ctx channel handler context
*/
protected void consumedInMessage(ChannelHandlerContext ctx) {
}
/**
* Invoked when an outgoing message is first received.
*
* Overridden by sub classes for state tracking.
* @param ctx channel handler context
*/
protected void receivedOutMessage(ChannelHandlerContext ctx) {
}
/**
* Invoked when an outgoing message is fully sent.
*
* Overridden by sub classes for state tracking.
* @param ctx channel handler context
*/
protected void sentOutMessage(ChannelHandlerContext ctx) {
}
/**
* Subscribe the given subscriber to the given streamed message.
*
* Provided so that the client subclass can intercept this to hold off sending the body of an expect 100 continue
* request.
*/
protected void subscribeSubscriberToStream(StreamedHttpMessage msg, Subscriber<HttpContent> subscriber) {
msg.subscribe(subscriber);
}
/**
* Invoked every time a read of the incoming body is requested by the subscriber.
*
* Provided so that the server subclass can intercept this to send a 100 continue response.
* @param ctx channel handler context
*/
protected void bodyRequested(ChannelHandlerContext ctx) {}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (inClass.isInstance(msg)) {
receivedInMessage(ctx);
final In inMsg = inClass.cast(msg);
if (inMsg instanceof FullHttpMessage) {
// Forward as is
ctx.fireChannelRead(inMsg);
consumedInMessage(ctx);
} else if (!hasBody(inMsg)) {
// Wrap in empty message
ctx.fireChannelRead(createEmptyMessage(inMsg));
consumedInMessage(ctx);
// There will be a LastHttpContent message coming after this, ignore it
ignoreBodyRead = true;
} else {
currentlyStreamedMessage = inMsg;
// It has a body, stream it
HandlerPublisher<HttpContent> publisher = new HandlerPublisher<HttpContent>(ctx.executor(), HttpContent.class) {
@Override
protected void cancelled() {
if (ctx.executor().inEventLoop()) {
handleCancelled(ctx, inMsg);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
handleCancelled(ctx, inMsg);
}
});
}
}
@Override
protected void requestDemand() {
bodyRequested(ctx);
super.requestDemand();
}
};
ctx.channel().pipeline().addAfter(ctx.name(), ctx.name() + "-body-publisher", publisher);
ctx.fireChannelRead(createStreamedMessage(inMsg, publisher));
}
} else if (msg instanceof HttpContent) {
handleReadHttpContent(ctx, (HttpContent) msg);
}
}
private void handleCancelled(ChannelHandlerContext ctx, In msg) {
if (currentlyStreamedMessage == msg) {
ignoreBodyRead = true;
// Need to do a read in case the subscriber ignored a read completed.
ctx.read();
}
}
private void handleReadHttpContent(ChannelHandlerContext ctx, HttpContent content) {
if (!ignoreBodyRead) {
if (content instanceof LastHttpContent) {
if (content.content().readableBytes() > 0 ||
!((LastHttpContent) content).trailingHeaders().isEmpty()) {
// It has data or trailing headers, send them
ctx.fireChannelRead(content);
} else {
ReferenceCountUtil.release(content);
}
removeHandlerIfActive(ctx, ctx.name() + "-body-publisher");
currentlyStreamedMessage = null;
consumedInMessage(ctx);
} else {
ctx.fireChannelRead(content);
}
} else {
ReferenceCountUtil.release(content);
if (content instanceof LastHttpContent) {
ignoreBodyRead = false;
if (currentlyStreamedMessage != null) {
removeHandlerIfActive(ctx, ctx.name() + "-body-publisher");
}
currentlyStreamedMessage = null;
}
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (ignoreBodyRead) {
ctx.read();
} else {
ctx.fireChannelReadComplete();
}
}
@Override
public void write(final ChannelHandlerContext ctx, Object msg, final ChannelPromise promise) throws Exception {
if (outClass.isInstance(msg)) {
Outgoing out = new Outgoing(outClass.cast(msg), promise);
receivedOutMessage(ctx);
if (outgoing.isEmpty()) {
outgoing.add(out);
flushNext(ctx);
} else {
outgoing.add(out);
}
} else if (msg instanceof LastHttpContent) {
sendLastHttpContent = false;
ctx.write(msg, promise);
} else {
ctx.write(msg, promise);
}
}
protected void unbufferedWrite(final ChannelHandlerContext ctx, final Outgoing out) {
if (out.message instanceof FullHttpMessage) {
// Forward as is
ctx.writeAndFlush(out.message, out.promise);
out.promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
executeInEventLoop(ctx, new Runnable() {
@Override
public void run() {
sentOutMessage(ctx);
outgoing.remove();
flushNext(ctx);
}
});
}
});
} else if (out.message instanceof StreamedHttpMessage) {
StreamedHttpMessage streamed = (StreamedHttpMessage) out.message;
HandlerSubscriber<HttpContent> subscriber = new HandlerSubscriber<HttpContent>(ctx.executor()) {
@Override
protected void error(Throwable error) {
out.promise.tryFailure(error);
ctx.close();
}
@Override
protected void complete() {
executeInEventLoop(ctx, new Runnable() {
@Override
public void run() {
completeBody(ctx);
}
});
}
};
sendLastHttpContent = true;
// DON'T pass the promise through, create a new promise instead.
ctx.writeAndFlush(out.message);
ctx.pipeline().addAfter(ctx.name(), ctx.name() + "-body-subscriber", subscriber);
subscribeSubscriberToStream(streamed, subscriber);
}
}
private void completeBody(final ChannelHandlerContext ctx) {
removeHandlerIfActive(ctx, ctx.name() + "-body-subscriber");
if (sendLastHttpContent) {
ChannelPromise promise = outgoing.peek().promise;
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, promise).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
executeInEventLoop(ctx, new Runnable() {
@Override
public void run() {
outgoing.remove();
sentOutMessage(ctx);
flushNext(ctx);
}
});
}
}
);
} else {
outgoing.remove().promise.setSuccess();
sentOutMessage(ctx);
flushNext(ctx);
}
}
/**
* Most operations we want to do even if the channel is not active, because if it's not, then we want to encounter
* the error that occurs when that operation happens and so that it can be passed up to the user. However, removing
* handlers should only be done if the channel is active, because the error that is encountered when they aren't
* makes no sense to the user (NoSuchElementException).
*/
private void removeHandlerIfActive(ChannelHandlerContext ctx, String name) {
if (ctx.channel().isActive()) {
ctx.pipeline().remove(name);
}
}
private void flushNext(ChannelHandlerContext ctx) {
if (!outgoing.isEmpty()) {
unbufferedWrite(ctx, outgoing.element());
} else {
ctx.fireChannelWritabilityChanged();
}
}
private void executeInEventLoop(ChannelHandlerContext ctx, Runnable runnable) {
if (ctx.executor().inEventLoop()) {
runnable.run();
} else {
ctx.executor().execute(runnable);
}
}
class Outgoing {
final Out message;
final ChannelPromise promise;
public Outgoing(Out message, ChannelPromise promise) {
this.message = message;
this.promise = promise;
}
}
}

View file

@ -0,0 +1,239 @@
package org.xbib.netty.http.server.reactive;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import org.reactivestreams.Publisher;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
/**
* Handler that reads {@link HttpRequest} messages followed by {@link HttpContent} messages and produces
* {@link StreamedHttpRequest} messages, and converts written {@link StreamedHttpResponse} messages into
* {@link HttpResponse} messages followed by {@link HttpContent} messages.
*
* This allows request and response bodies to be handled using reactive streams.
*
* There are two types of messages that this handler will send down the chain, {@link StreamedHttpRequest},
* and {@link FullHttpRequest}. If {@link io.netty.channel.ChannelOption#AUTO_READ} is false for the channel,
* then any {@link StreamedHttpRequest} messages <em>must</em> be subscribed to consume the body, otherwise
* it's possible that no read will be done of the messages.
*
* There are three types of messages that this handler accepts for writing, {@link StreamedHttpResponse},
* {@link WebSocketHttpResponse} and {@link FullHttpResponse}. Writing any other messages may potentially
* lead to HTTP message mangling.
*
* As long as messages are returned in the order that they arrive, this handler implicitly supports HTTP
* pipelining.
*/
public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, HttpResponse> {
private HttpRequest lastRequest = null;
private Outgoing webSocketResponse = null;
private int inFlight = 0;
private boolean continueExpected = true;
private boolean sendContinue = false;
private boolean close = false;
private final List<ChannelHandler> dependentHandlers;
public HttpStreamsServerHandler() {
this(Collections.<ChannelHandler>emptyList());
}
/**
* Create a new handler that is depended on by the given handlers.
*
* The list of dependent handlers will be removed from the chain when this handler is removed from the chain,
* for example, when the connection is upgraded to use websockets. This is useful, for example, for removing
* the reactive streams publisher/subscriber from the chain in that event.
*
* @param dependentHandlers The handlers that depend on this handler.
*/
public HttpStreamsServerHandler(List<ChannelHandler> dependentHandlers) {
super(HttpRequest.class, HttpResponse.class);
this.dependentHandlers = dependentHandlers;
}
@Override
protected boolean hasBody(HttpRequest request) {
// Http requests don't have a body if they define 0 content length, or no content length and no transfer
// encoding
return HttpUtil.getContentLength(request, 0) != 0 || HttpUtil.isTransferEncodingChunked(request);
}
@Override
protected HttpRequest createEmptyMessage(HttpRequest request) {
return new EmptyHttpRequest(request);
}
@Override
protected HttpRequest createStreamedMessage(HttpRequest httpRequest, Publisher<HttpContent> stream) {
return new DelegateStreamedHttpRequest(httpRequest, stream);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Set to false, since if it was true, and the client is sending data, then the
// client must no longer be expecting it (due to a timeout, for example).
continueExpected = false;
sendContinue = false;
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
lastRequest = request;
if (HttpUtil.is100ContinueExpected(request)) {
continueExpected = true;
}
}
super.channelRead(ctx, msg);
}
@Override
protected void receivedInMessage(ChannelHandlerContext ctx) {
inFlight++;
}
@Override
protected void sentOutMessage(ChannelHandlerContext ctx) {
inFlight--;
if (inFlight == 1 && continueExpected && sendContinue) {
ctx.writeAndFlush(new DefaultFullHttpResponse(lastRequest.protocolVersion(), HttpResponseStatus.CONTINUE));
sendContinue = false;
continueExpected = false;
}
if (close) {
ctx.close();
}
}
@Override
protected void unbufferedWrite(ChannelHandlerContext ctx, HttpStreamsHandler<HttpRequest, HttpResponse>.Outgoing out) {
if (out.message instanceof WebSocketHttpResponse) {
if ((lastRequest instanceof FullHttpRequest) || !hasBody(lastRequest)) {
handleWebSocketResponse(ctx, out);
} else {
// If the response has a streamed body, then we can't send the WebSocket response until we've received
// the body.
webSocketResponse = out;
}
} else {
String connection = out.message.headers().get(HttpHeaderNames.CONNECTION);
if (lastRequest.protocolVersion().isKeepAliveDefault()) {
if ("close".equalsIgnoreCase(connection)) {
close = true;
}
} else {
if (!"keep-alive".equalsIgnoreCase(connection)) {
close = true;
}
}
if (inFlight == 1 && continueExpected) {
HttpUtil.setKeepAlive(out.message, false);
close = true;
continueExpected = false;
}
// According to RFC 7230 a server MUST NOT send a Content-Length or a Transfer-Encoding when the status
// code is 1xx or 204, also a status code 304 may not have a Content-Length or Transfer-Encoding set.
if (!HttpUtil.isContentLengthSet(out.message) && !HttpUtil.isTransferEncodingChunked(out.message)
&& canHaveBody(out.message)) {
HttpUtil.setKeepAlive(out.message, false);
close = true;
}
super.unbufferedWrite(ctx, out);
}
}
private boolean canHaveBody(HttpResponse message) {
HttpResponseStatus status = message.status();
// All 1xx (Informational), 204 (No Content), and 304 (Not Modified)
// responses do not include a message body
return !(status == HttpResponseStatus.CONTINUE || status == HttpResponseStatus.SWITCHING_PROTOCOLS ||
status == HttpResponseStatus.PROCESSING || status == HttpResponseStatus.NO_CONTENT ||
status == HttpResponseStatus.NOT_MODIFIED);
}
@Override
protected void consumedInMessage(ChannelHandlerContext ctx) {
if (webSocketResponse != null) {
handleWebSocketResponse(ctx, webSocketResponse);
webSocketResponse = null;
}
}
private void handleWebSocketResponse(ChannelHandlerContext ctx, Outgoing out) {
WebSocketHttpResponse response = (WebSocketHttpResponse) out.message;
WebSocketServerHandshaker handshaker = response.handshakerFactory().newHandshaker(lastRequest);
if (handshaker == null) {
HttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.UPGRADE_REQUIRED);
res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, WebSocketVersion.V13.toHttpHeaderValue());
HttpUtil.setContentLength(res, 0);
super.unbufferedWrite(ctx, new Outgoing(res, out.promise));
response.subscribe(new CancelledSubscriber<>());
} else {
// First, insert new handlers in the chain after us for handling the websocket
ChannelPipeline pipeline = ctx.pipeline();
HandlerPublisher<WebSocketFrame> publisher = new HandlerPublisher<>(ctx.executor(), WebSocketFrame.class);
HandlerSubscriber<WebSocketFrame> subscriber = new HandlerSubscriber<>(ctx.executor());
pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-subscriber", subscriber);
pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-publisher", publisher);
// Now remove ourselves from the chain
ctx.pipeline().remove(ctx.name());
// Now do the handshake
// Wrap the request in an empty request because we don't need the WebSocket handshaker ignoring the body,
// we already have handled the body.
handshaker.handshake(ctx.channel(), new EmptyHttpRequest(lastRequest));
// And hook up the subscriber/publishers
response.subscribe(subscriber);
publisher.subscribe(response);
}
}
@Override
protected void bodyRequested(ChannelHandlerContext ctx) {
if (continueExpected) {
if (inFlight == 1) {
ctx.writeAndFlush(new DefaultFullHttpResponse(lastRequest.protocolVersion(), HttpResponseStatus.CONTINUE));
continueExpected = false;
} else {
sendContinue = true;
}
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
for (ChannelHandler dependent: dependentHandlers) {
try {
ctx.pipeline().remove(dependent);
} catch (NoSuchElementException e) {
// Ignore, maybe something else removed it
}
}
}
}

View file

@ -0,0 +1,16 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMessage;
import org.reactivestreams.Publisher;
/**
* Combines {@link HttpMessage} and {@link Publisher} into one
* message. So it represents an http message with a stream of {@link HttpContent}
* messages that can be subscribed to.
*
* Note that receivers of this message <em>must</em> consume the publisher,
* since the publisher will exert back pressure up the stream if not consumed.
*/
public interface StreamedHttpMessage extends HttpMessage, Publisher<HttpContent> {
}

View file

@ -0,0 +1,11 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.http.HttpRequest;
/**
* Combines {@link HttpRequest} and {@link StreamedHttpMessage} into one
* message. So it represents an http request with a stream of
* {@link io.netty.handler.codec.http.HttpContent} messages that can be subscribed to.
*/
public interface StreamedHttpRequest extends HttpRequest, StreamedHttpMessage {
}

View file

@ -0,0 +1,11 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.http.HttpResponse;
/**
* Combines {@link HttpResponse} and {@link StreamedHttpMessage} into one
* message. So it represents an http response with a stream of
* {@link io.netty.handler.codec.http.HttpContent} messages that can be subscribed to.
*/
public interface StreamedHttpResponse extends HttpResponse, StreamedHttpMessage {
}

View file

@ -0,0 +1,24 @@
package org.xbib.netty.http.server.reactive;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import org.reactivestreams.Processor;
/**
* Combines {@link HttpResponse} and {@link Processor}
* into one message. So it represents an http response with a processor that can handle
* a WebSocket.
*
* This is only used for server side responses. For client side websocket requests, it's
* better to configure the reactive streams pipeline directly.
*/
public interface WebSocketHttpResponse extends HttpResponse, Processor<WebSocketFrame, WebSocketFrame> {
/**
* Get the handshaker factory to use to reconfigure the channel.
*
* @return The handshaker factory.
*/
WebSocketServerHandshakerFactory handshakerFactory();
}

View file

@ -8,6 +8,7 @@ import io.netty.handler.codec.http.HttpVersion;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.NamedServer;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
@ -36,51 +37,49 @@ abstract class BaseServerTransport implements ServerTransport {
* and required special header handling, possibly returning an
* appropriate response.
*
* @param namedServer the named server
* @param serverRequest the request
* @param serverResponse the response
* @return whether further processing should be performed
*/
static boolean acceptRequest(ServerRequest serverRequest, ServerResponse serverResponse) {
static boolean acceptRequest(NamedServer namedServer, ServerRequest serverRequest, ServerResponse serverResponse) {
HttpHeaders reqHeaders = serverRequest.getRequest().headers();
HttpVersion version = serverRequest.getNamedServer().getHttpAddress().getVersion();
switch (version.majorVersion()) {
case 1:
case 2:
if (!reqHeaders.contains(HttpHeaderNames.HOST)) {
// RFC2616#14.23: missing Host header gets 400
serverResponse.writeError(HttpResponseStatus.BAD_REQUEST, "missing 'Host' header");
HttpVersion version = namedServer.getHttpAddress().getVersion();
if (version.majorVersion() == 1 || version.majorVersion() == 2) {
if (!reqHeaders.contains(HttpHeaderNames.HOST)) {
// RFC2616#14.23: missing Host header gets 400
ServerResponse.write(serverResponse, HttpResponseStatus.BAD_REQUEST, "missing 'Host' header");
return false;
}
// return a continue response before reading body
String expect = reqHeaders.get(HttpHeaderNames.EXPECT);
if (expect != null) {
if (expect.equalsIgnoreCase("100-continue")) {
//ServerResponse tempResp = new ServerResponse(serverResponse);
//tempResp.sendHeaders(100);
} else {
// RFC2616#14.20: if unknown expect, send 417
ServerResponse.write(serverResponse, HttpResponseStatus.EXPECTATION_FAILED);
return false;
}
// return a continue response before reading body
String expect = reqHeaders.get(HttpHeaderNames.EXPECT);
if (expect != null) {
if (expect.equalsIgnoreCase("100-continue")) {
//ServerResponse tempResp = new ServerResponse(serverResponse);
//tempResp.sendHeaders(100);
} else {
// RFC2616#14.20: if unknown expect, send 417
serverResponse.writeError(HttpResponseStatus.EXPECTATION_FAILED);
return false;
}
}
break;
default:
serverResponse.writeError(HttpResponseStatus.BAD_REQUEST, "Unknown version: " + version);
return false;
}
} else {
ServerResponse.write(serverResponse, HttpResponseStatus.BAD_REQUEST, "unsupported HTTP version: " + version);
return false;
}
return true;
}
/**
* Handles a request according to the request method.
*
* @param namedServer the named server
* @param serverRequest the request
* @param serverResponse the response (into which the response is written)
* @throws IOException if and error occurs
*/
static void handle(HttpServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
// parse parameters from path and parse body, if required
static void handle(NamedServer namedServer, HttpServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
// create server URL and parse parameters from query string, path, and parse body, if exists
serverRequest.createParameters();
serverRequest.getNamedServer().execute(serverRequest, serverResponse);
namedServer.execute(serverRequest, serverResponse);
}
}

View file

@ -1,8 +1,10 @@
package org.xbib.netty.http.server.transport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
@ -13,13 +15,14 @@ import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.stream.ChunkedInput;
import io.netty.handler.stream.ChunkedNioStream;
import io.netty.util.AsciiString;
import org.xbib.netty.http.server.ServerName;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.channels.ReadableByteChannel;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
@ -52,47 +55,16 @@ public class Http2ServerResponse implements ServerResponse {
headers.set(name, value);
}
@Override
public ChannelHandlerContext getChannelHandlerContext() {
return ctx;
}
@Override
public HttpResponseStatus getLastStatus() {
return httpResponseStatus;
}
@Override
public void write(String text) {
write(HttpResponseStatus.OK, "text/plain; charset=utf-8", text);
}
@Override
public void writeError(HttpResponseStatus status) {
writeError(status, status.reasonPhrase());
}
/**
* Sends an error response with the given status and detailed message.
*
* @param status the response status
* @param text the text body
*/
@Override
public void writeError(HttpResponseStatus status, String text) {
write(status, "text/plain; charset=utf-8", status.code() + " " + text);
}
@Override
public void write(HttpResponseStatus status) {
write(status, null, (ByteBuf) null);
}
@Override
public void write(HttpResponseStatus status, String contentType, String text) {
write(status, contentType, ByteBufUtil.writeUtf8(ctx.alloc(), text));
}
@Override
public void write(HttpResponseStatus status, String contentType, String text, Charset charset) {
write(status, contentType, ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.allocate(text.length()).append(text), charset));
}
@Override
public void write(HttpResponseStatus status, String contentType, ByteBuf byteBuf) {
if (byteBuf != null) {
@ -124,60 +96,55 @@ public class Http2ServerResponse implements ServerResponse {
headers.setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), streamId);
}
}
Http2Headers http2Headers = new DefaultHttp2Headers().status(status.codeAsText()).add(headers);
Http2HeadersFrame http2HeadersFrame = new DefaultHttp2HeadersFrame(http2Headers,byteBuf == null);
logger.log(Level.FINEST, http2HeadersFrame::toString);
ctx.channel().write(http2HeadersFrame);
this.httpResponseStatus = status;
if (byteBuf != null) {
Http2DataFrame http2DataFrame = new DefaultHttp2DataFrame(byteBuf, true);
logger.log(Level.FINEST, http2DataFrame::toString);
ctx.channel().write(http2DataFrame);
if (ctx.channel().isWritable()) {
Http2Headers http2Headers = new DefaultHttp2Headers().status(status.codeAsText()).add(headers);
Http2HeadersFrame http2HeadersFrame = new DefaultHttp2HeadersFrame(http2Headers, byteBuf == null);
logger.log(Level.FINEST, http2HeadersFrame::toString);
ctx.channel().write(http2HeadersFrame);
this.httpResponseStatus = status;
if (byteBuf != null) {
Http2DataFrame http2DataFrame = new DefaultHttp2DataFrame(byteBuf, true);
logger.log(Level.FINEST, http2DataFrame::toString);
ctx.channel().write(http2DataFrame);
}
ctx.channel().flush();
}
ctx.channel().flush();
}
/**
* Returns an HTML-escaped version of the given string for safe display
* within a web page. The characters '&amp;', '&gt;' and '&lt;' must always
* be escaped, and single and double quotes must be escaped within
* attribute values; this method escapes them always. This method can
* be used for generating both HTML and XHTML valid content.
* Chunked response from a readable byte channel.
*
* @param s the string to escape
* @return the escaped string
* @see <a href="http://www.w3.org/International/questions/qa-escapes">The W3C FAQ</a>
* @param status status
* @param contentType content type
* @param byteChannel byte channel
*/
private static String escapeHTML(String s) {
int len = s.length();
StringBuilder es = new StringBuilder(len + 30);
int start = 0;
for (int i = 0; i < len; i++) {
String ref = null;
switch (s.charAt(i)) {
case '&':
ref = "&amp;";
break;
case '>':
ref = "&gt;";
break;
case '<':
ref = "&lt;";
break;
case '"':
ref = "&quot;";
break;
case '\'':
ref = "&#39;";
break;
default:
break;
}
if (ref != null) {
es.append(s, start, i).append(ref);
start = i + 1;
}
@Override
public void write(HttpResponseStatus status, String contentType, ReadableByteChannel byteChannel) {
CharSequence s = headers.get(HttpHeaderNames.CONTENT_TYPE);
if (s == null) {
s = contentType != null ? contentType : HttpHeaderValues.APPLICATION_OCTET_STREAM;
headers.add(HttpHeaderNames.CONTENT_TYPE, s);
}
headers.add(HttpHeaderNames.TRANSFER_ENCODING, "chunked");
if (!headers.contains(HttpHeaderNames.DATE)) {
headers.add(HttpHeaderNames.DATE, DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC)));
}
headers.add(HttpHeaderNames.SERVER, ServerName.getServerName());
if (ctx.channel().isWritable()) {
Http2Headers http2Headers = new DefaultHttp2Headers().status(status.codeAsText()).add(headers);
Http2HeadersFrame http2HeadersFrame = new DefaultHttp2HeadersFrame(http2Headers,false);
logger.log(Level.FINEST, http2HeadersFrame::toString);
ctx.channel().write(http2HeadersFrame);
ChunkedInput<ByteBuf> input = new ChunkedNioStream(byteChannel);
HttpChunkedInput httpChunkedInput = new HttpChunkedInput(input);
ChannelFuture channelFuture = ctx.channel().writeAndFlush(httpChunkedInput);
if ("close".equalsIgnoreCase(serverRequest.getRequest().headers().get(HttpHeaderNames.CONNECTION)) &&
!headers.contains(HttpHeaderNames.CONNECTION)) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
httpResponseStatus = status;
} else {
logger.log(Level.WARNING, "channel not writeable");
}
return start == 0 ? s : es.append(s.substring(start)).toString();
}
}

View file

@ -32,17 +32,16 @@ public class Http2ServerTransport extends BaseServerTransport {
}
Integer streamId = fullHttpRequest.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
HttpServerRequest serverRequest = new HttpServerRequest();
serverRequest.setNamedServer(namedServer);
serverRequest.setChannelHandlerContext(ctx);
serverRequest.setRequest(fullHttpRequest);
serverRequest.setSequenceId(sequenceId);
serverRequest.setRequestId(requestId);
serverRequest.setStreamId(streamId);
ServerResponse serverResponse = new Http2ServerResponse(serverRequest);
if (acceptRequest(serverRequest, serverResponse)) {
handle(serverRequest, serverResponse);
if (acceptRequest(namedServer, serverRequest, serverResponse)) {
handle(namedServer, serverRequest, serverResponse);
} else {
serverResponse.write(HttpResponseStatus.NOT_ACCEPTABLE);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_ACCEPTABLE);
}
}

View file

@ -8,12 +8,12 @@ import org.xbib.net.QueryParameters;
import org.xbib.net.URL;
import org.xbib.netty.http.common.HttpParameters;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.endpoint.NamedServer;
import java.io.IOException;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnmappableCharacterException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
@ -30,33 +30,28 @@ public class HttpServerRequest implements ServerRequest {
private static final CharSequence APPLICATION_FORM_URL_ENCODED = "application/x-www-form-urlencoded";
private NamedServer namedServer;
private ChannelHandlerContext ctx;
private List<String> context;
private Map<String, String> pathParameters;
private String contextPath;
private Map<String, String> pathParameters = new LinkedHashMap<>();
private FullHttpRequest httpRequest;
private EndpointInfo info;
private HttpParameters parameters;
private URL url;
private Integer sequenceId;
private Integer streamId;
private Integer requestId;
public void setNamedServer(NamedServer namedServer) {
this.namedServer = namedServer;
}
@Override
public NamedServer getNamedServer() {
return namedServer;
}
public void setChannelHandlerContext(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@ -68,6 +63,7 @@ public class HttpServerRequest implements ServerRequest {
public void setRequest(FullHttpRequest fullHttpRequest) {
this.httpRequest = fullHttpRequest;
this.info = new EndpointInfo(this);
}
@Override
@ -75,8 +71,20 @@ public class HttpServerRequest implements ServerRequest {
return httpRequest;
}
@Override
public URL getURL() {
return url;
}
@Override
public EndpointInfo getEndpointInfo() {
return info;
}
@Override
public void setContext(List<String> context) {
this.context = context;
this.contextPath = context != null ? PATH_SEPARATOR + String.join(PATH_SEPARATOR, context) : null;
}
@Override
@ -86,18 +94,23 @@ public class HttpServerRequest implements ServerRequest {
@Override
public String getContextPath() {
return String.join(PATH_SEPARATOR, context);
return contextPath;
}
@Override
public String getEffectiveRequestPath() {
String uri = httpRequest.uri();
return context != null && !context.isEmpty() && uri.length() > 1 ?
uri.substring(getContextPath().length() + 2) : uri;
String path = getEndpointInfo().getPath();
String effective = contextPath != null && !PATH_SEPARATOR.equals(contextPath) && path.startsWith(contextPath) ?
path.substring(contextPath.length()) : path;
effective = effective.isEmpty() ? PATH_SEPARATOR : effective;
logger.log(Level.FINE, "path=" + path + " contextpath=" + contextPath + " effective=" + effective);
return effective;
}
public void setPathParameters(Map<String, String> pathParameters) {
this.pathParameters = pathParameters;
@Override
public void addPathParameter(String key, String value) throws IOException {
pathParameters.put(key, value);
parameters.add(key, value);
}
@Override
@ -108,7 +121,19 @@ public class HttpServerRequest implements ServerRequest {
@Override
public void createParameters() throws IOException {
try {
buildParameters();
HttpParameters httpParameters = new HttpParameters();
URL.Builder builder = URL.builder().path(getRequest().uri());
this.url = builder.build();
QueryParameters queryParameters = url.getQueryParams();
ByteBuf byteBuf = httpRequest.content();
if (APPLICATION_FORM_URL_ENCODED.equals(HttpUtil.getMimeType(httpRequest)) && byteBuf != null) {
String content = byteBuf.toString(HttpUtil.getCharset(httpRequest, StandardCharsets.ISO_8859_1));
queryParameters.addPercentEncodedBody(content);
}
for (QueryParameters.Pair<String, String> pair : queryParameters) {
httpParameters.add(pair.getFirst(), pair.getSecond());
}
this.parameters = httpParameters;
} catch (MalformedInputException | UnmappableCharacterException e) {
throw new IOException(e);
}
@ -146,30 +171,7 @@ public class HttpServerRequest implements ServerRequest {
return requestId;
}
private void buildParameters() throws MalformedInputException, UnmappableCharacterException {
HttpParameters httpParameters = new HttpParameters();
URL.Builder builder = URL.builder().path(getEffectiveRequestPath());
if (pathParameters != null && !pathParameters.isEmpty()) {
for (Map.Entry<String, String> entry : pathParameters.entrySet()) {
builder.queryParam(entry.getKey(), entry.getValue());
}
}
QueryParameters queryParameters = builder.build().getQueryParams();
ByteBuf byteBuf = httpRequest.content();
if (APPLICATION_FORM_URL_ENCODED.equals(HttpUtil.getMimeType(httpRequest)) && byteBuf != null) {
String content = byteBuf.toString(HttpUtil.getCharset(httpRequest, StandardCharsets.ISO_8859_1));
queryParameters.addPercentEncodedBody(content);
}
for (QueryParameters.Pair<String, String> pair : queryParameters) {
httpParameters.add(pair.getFirst(), pair.getSecond());
}
this.parameters = httpParameters;
}
public String toString() {
return "ServerRequest[namedServer=" + namedServer +
",context=" + context +
",request=" + httpRequest +
"]";
return "ServerRequest[request=" + httpRequest + "]";
}
}

View file

@ -1,24 +1,29 @@
package org.xbib.netty.http.server.transport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.stream.ChunkedInput;
import io.netty.handler.stream.ChunkedNioStream;
import io.netty.util.AsciiString;
import org.xbib.netty.http.server.ServerName;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.handler.http.HttpPipelinedResponse;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.channels.ReadableByteChannel;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
@ -26,12 +31,12 @@ import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT;
public class HttpServerResponse implements ServerResponse {
private static final Logger logger = Logger.getLogger(HttpServerResponse.class.getName());
private static final String EMPTY_STRING = "";
private final ServerRequest serverRequest;
private final ChannelHandlerContext ctx;
@ -56,52 +61,16 @@ public class HttpServerResponse implements ServerResponse {
headers.set(name, value);
}
@Override
public ChannelHandlerContext getChannelHandlerContext() {
return ctx;
}
@Override
public HttpResponseStatus getLastStatus() {
return httpResponseStatus;
}
@Override
public void write(String text) {
write(HttpResponseStatus.OK, "text/plain; charset=utf-8", text);
}
/**
* Sends an error response with the given status and default body.
*
* @param status the response status
*/
@Override
public void writeError(HttpResponseStatus status) {
writeError(status, status.reasonPhrase());
}
/**
* Sends an error response with the given status and detailed message.
*
* @param status the response status
* @param text the text body
*/
@Override
public void writeError(HttpResponseStatus status, String text) {
write(status, "text/plain; charset=utf-8", status.code() + " " + text);
}
@Override
public void write(HttpResponseStatus status) {
write(status, "application/octet-stream", EMPTY_STRING);
}
@Override
public void write(HttpResponseStatus status, String contentType, String text) {
write(status, contentType, ByteBufUtil.writeUtf8(ctx.alloc(), text));
}
@Override
public void write(HttpResponseStatus status, String contentType, String text, Charset charset) {
write(status, contentType, ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.allocate(text.length()).append(text), charset));
}
@Override
public void write(HttpResponseStatus status, String contentType, ByteBuf byteBuf) {
Objects.requireNonNull(byteBuf);
@ -112,11 +81,7 @@ public class HttpServerResponse implements ServerResponse {
}
if (!headers.contains(HttpHeaderNames.CONTENT_LENGTH) && !headers.contains(HttpHeaderNames.TRANSFER_ENCODING)) {
int length = byteBuf.readableBytes();
if (length < 0) {
headers.add(HttpHeaderNames.TRANSFER_ENCODING, "chunked");
} else {
headers.add(HttpHeaderNames.CONTENT_LENGTH, Long.toString(length));
}
headers.add(HttpHeaderNames.CONTENT_LENGTH, Long.toString(length));
}
if (serverRequest != null && "close".equalsIgnoreCase(serverRequest.getRequest().headers().get(HttpHeaderNames.CONNECTION)) &&
!headers.contains(HttpHeaderNames.CONNECTION)) {
@ -126,26 +91,57 @@ public class HttpServerResponse implements ServerResponse {
headers.add(HttpHeaderNames.DATE, DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC)));
}
headers.add(HttpHeaderNames.SERVER, ServerName.getServerName());
FullHttpResponse fullHttpResponse =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, byteBuf, headers, trailingHeaders);
if (serverRequest != null && serverRequest.getSequenceId() != null) {
HttpPipelinedResponse httpPipelinedResponse = new HttpPipelinedResponse(fullHttpResponse,
ctx.channel().newPromise(), serverRequest.getSequenceId());
if (ctx.channel().isWritable()) {
logger.log(Level.FINEST, fullHttpResponse::toString);
if (ctx.channel().isWritable()) {
FullHttpResponse fullHttpResponse =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, byteBuf, headers, trailingHeaders);
if (serverRequest != null && serverRequest.getSequenceId() != null) {
HttpPipelinedResponse httpPipelinedResponse = new HttpPipelinedResponse(fullHttpResponse,
ctx.channel().newPromise(), serverRequest.getSequenceId());
ctx.channel().writeAndFlush(httpPipelinedResponse);
httpResponseStatus = status;
} else {
logger.log(Level.WARNING, "channel not writeable");
}
} else {
if (ctx.channel().isWritable()) {
logger.log(Level.FINEST, fullHttpResponse::toString);
ctx.channel().writeAndFlush(fullHttpResponse);
httpResponseStatus = status;
} else {
logger.log(Level.WARNING, "channel not writeable");
}
httpResponseStatus = status;
} else {
logger.log(Level.WARNING, "channel not writeable");
}
}
/**
* Chunked response from a readable byte channel.
*
* @param status status
* @param contentType content type
* @param byteChannel byte channel
*/
@Override
public void write(HttpResponseStatus status, String contentType, ReadableByteChannel byteChannel) {
CharSequence s = headers.get(HttpHeaderNames.CONTENT_TYPE);
if (s == null) {
s = contentType != null ? contentType : HttpHeaderValues.APPLICATION_OCTET_STREAM;
headers.add(HttpHeaderNames.CONTENT_TYPE, s);
}
headers.add(HttpHeaderNames.TRANSFER_ENCODING, "chunked");
if (!headers.contains(HttpHeaderNames.DATE)) {
headers.add(HttpHeaderNames.DATE, DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC)));
}
headers.add(HttpHeaderNames.SERVER, ServerName.getServerName());
if (ctx.channel().isWritable()) {
HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);
httpResponse.headers().add(headers);
ctx.channel().write(httpResponse);
logger.log(Level.FINE, "written response " + httpResponse);
ChunkedInput<ByteBuf> input = new ChunkedNioStream(byteChannel);
HttpChunkedInput httpChunkedInput = new HttpChunkedInput(input);
ctx.channel().writeAndFlush(httpChunkedInput);
ChannelFuture channelFuture = ctx.channel().writeAndFlush(EMPTY_LAST_CONTENT);
if ("close".equalsIgnoreCase(serverRequest.getRequest().headers().get(HttpHeaderNames.CONNECTION)) &&
!headers.contains(HttpHeaderNames.CONNECTION)) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
httpResponseStatus = status;
} else {
logger.log(Level.WARNING, "channel not writeable");
}
}
}

View file

@ -6,6 +6,7 @@ import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.Http2Settings;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.NamedServer;
import java.io.IOException;
@ -30,16 +31,15 @@ public class HttpServerTransport extends BaseServerTransport {
namedServer = server.getDefaultNamedServer();
}
HttpServerRequest serverRequest = new HttpServerRequest();
serverRequest.setNamedServer(namedServer);
serverRequest.setChannelHandlerContext(ctx);
serverRequest.setRequest(fullHttpRequest);
serverRequest.setSequenceId(sequenceId);
serverRequest.setRequestId(requestId);
HttpServerResponse serverResponse = new HttpServerResponse(serverRequest);
if (acceptRequest(serverRequest, serverResponse)) {
handle(serverRequest, serverResponse);
if (acceptRequest(namedServer, serverRequest, serverResponse)) {
handle(namedServer, serverRequest, serverResponse);
} else {
serverResponse.write(HttpResponseStatus.NOT_ACCEPTABLE);
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_ACCEPTABLE);
}
}

View file

@ -0,0 +1,48 @@
package org.xbib.netty.http.server.util;
public class HtmlUtils {
/**
* Returns an HTML-escaped version of the given string for safe display
* within a web page. The characters '&amp;', '&gt;' and '&lt;' must always
* be escaped, and single and double quotes must be escaped within
* attribute values; this method escapes them always. This method can
* be used for generating both HTML and XHTML valid content.
*
* @param s the string to escape
* @return the escaped string
* @see <a href="http://www.w3.org/International/questions/qa-escapes">The W3C FAQ</a>
*/
public static String escapeHTML(String s) {
int len = s.length();
StringBuilder es = new StringBuilder(len + 30);
int start = 0;
for (int i = 0; i < len; i++) {
String ref = null;
switch (s.charAt(i)) {
case '&':
ref = "&amp;";
break;
case '>':
ref = "&gt;";
break;
case '<':
ref = "&lt;";
break;
case '"':
ref = "&quot;";
break;
case '\'':
ref = "&#39;";
break;
default:
break;
}
if (ref != null) {
es.append(s, start, i).append(ref);
start = i + 1;
}
}
return start == 0 ? s : es.append(s.substring(start)).toString();
}
}

View file

@ -9,7 +9,7 @@ import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.endpoint.NamedServer;
import org.xbib.netty.http.server.endpoint.service.ClasspathService;
import org.xbib.netty.http.server.endpoint.service.ClassLoaderService;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
@ -24,17 +24,18 @@ class ClassloaderServiceTest {
private static final Logger logger = Logger.getLogger(ClassloaderServiceTest.class.getName());
@Test
void testClassloader() throws Exception {
void testSimpleClassloader() throws Exception {
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
NamedServer namedServer = NamedServer.builder(httpAddress)
.singleEndpoint("/classloader", "/**", new ClasspathService(ClassloaderServiceTest.class, "/cl"))
.singleEndpoint("/classloader", "/**",
new ClassLoaderService(ClassloaderServiceTest.class, "/cl"))
.build();
Server server = Server.builder(namedServer)
.build();
server.logDiagnostics(Level.INFO);
Client client = Client.builder()
.build();
int max = 100;
int max = 1;
final AtomicInteger count = new AtomicInteger(0);
try {
server.accept();

View file

@ -9,6 +9,7 @@ import org.xbib.netty.http.client.listener.ResponseListener;
import org.xbib.netty.http.client.transport.Transport;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.NamedServer;
import java.io.IOException;
@ -116,11 +117,11 @@ class CleartextHttp2Test {
@Test
void testMultithreadPooledClearTextHttp2() throws Exception {
int threads = 2;
int loop = 4 * 1024;
int loop = 2 * 1024;
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
NamedServer namedServer = NamedServer.builder(httpAddress)
.singleEndpoint("/", (request, response) ->
response.write(HttpResponseStatus.OK, "text/plain",
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getRequest().content().toString(StandardCharsets.UTF_8)))
.build();
Server server = Server.builder(namedServer).build();
@ -184,7 +185,7 @@ class CleartextHttp2Test {
AtomicInteger counter1 = new AtomicInteger();
NamedServer namedServer1 = NamedServer.builder(httpAddress1)
.singleEndpoint("/", (request, response) -> {
response.write(HttpResponseStatus.OK, "text/plain",
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getRequest().content().toString(StandardCharsets.UTF_8));
counter1.incrementAndGet();
})
@ -195,7 +196,7 @@ class CleartextHttp2Test {
AtomicInteger counter2 = new AtomicInteger();
NamedServer namedServer2 = NamedServer.builder(httpAddress2)
.singleEndpoint("/", (request, response) -> {
response.write(HttpResponseStatus.OK, "text/plain",
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getRequest().content().toString(StandardCharsets.UTF_8));
counter2.incrementAndGet();
})

View file

@ -0,0 +1,37 @@
package org.xbib.netty.http.server.test;
import io.netty.channel.ChannelFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.NamedServer;
import java.io.IOException;
import java.net.BindException;
import static org.junit.jupiter.api.Assertions.assertNotNull;
class DoubleServerTest {
@Test
void testDoubleServer() throws IOException {
NamedServer namedServer = NamedServer.builder(HttpAddress.http1("localhost", 8008), "*")
.singleEndpoint("/", (request, response) -> ServerResponse.write(response, "Hello World"))
.build();
Server server1 = Server.builder(namedServer).build();
Server server2 = Server.builder(namedServer).build();
try {
Assertions.assertThrows(BindException.class, () ->{
ChannelFuture channelFuture1 = server1.accept();
assertNotNull(channelFuture1);
ChannelFuture channelFuture2 = server2.accept();
// should crash with BindException
});
} finally {
server1.shutdownGracefully();
server2.shutdownGracefully();
}
}
}

View file

@ -8,10 +8,11 @@ import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.Endpoint;
import org.xbib.netty.http.server.endpoint.EndpointResolver;
import org.xbib.netty.http.server.endpoint.NamedServer;
import org.xbib.netty.http.server.endpoint.service.NioService;
import org.xbib.netty.http.server.endpoint.service.MappedFileService;
import org.xbib.netty.http.server.endpoint.service.Service;
import java.io.IOException;
@ -33,16 +34,97 @@ class EndpointTest {
private static final Logger logger = Logger.getLogger(EndpointTest.class.getName());
@Test
void testEndpoints() throws Exception {
void testEmptyPrefixEndpoint() throws Exception {
Path vartmp = Paths.get("/var/tmp/");
Service service = new NioService(vartmp);
Service service = new MappedFileService(vartmp);
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
EndpointResolver endpointResolver = EndpointResolver.builder()
.addEndpoint(Endpoint.builder().setPrefix("/static").setPath("/**").build())
.addEndpoint(Endpoint.builder().setPrefix("/static1").setPath("/**").build())
.addEndpoint(Endpoint.builder().setPrefix("/static2").setPath("/**").build())
.addEndpoint(Endpoint.builder().setPath("/**").build())
.setDispatcher((endpoint, req, resp) -> {
logger.log(Level.FINE, "endpoint=" + endpoint + " req=" + req);
logger.log(Level.FINE, "dispatching endpoint=" + endpoint + " req=" + req);
service.handle(req, resp);
})
.build();
NamedServer namedServer = NamedServer.builder(httpAddress)
.addEndpointResolver(endpointResolver)
.build();
Server server = Server.builder(namedServer)
.build();
Client client = Client.builder()
.build();
final AtomicBoolean success = new AtomicBoolean(false);
try {
Files.write(vartmp.resolve("test.txt"), "Hello Jörg".getBytes(StandardCharsets.UTF_8));
server.accept();
Request request = Request.get().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/test.txt"))
.build()
.setResponseListener(r -> {
assertEquals("Hello Jörg", r.content().toString(StandardCharsets.UTF_8));
success.set(true);
});
client.execute(request).get();
} finally {
server.shutdownGracefully();
client.shutdownGracefully();
Files.delete(vartmp.resolve("test.txt"));
logger.log(Level.INFO, "server and client shut down");
}
assertTrue(success.get());
}
@Test
void testPlainPrefixEndpoint() throws Exception {
Path vartmp = Paths.get("/var/tmp/");
Service service = new MappedFileService(vartmp);
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
EndpointResolver endpointResolver = EndpointResolver.builder()
.addEndpoint(Endpoint.builder().setPrefix("/").setPath("/**").build())
.setDispatcher((endpoint, req, resp) -> {
logger.log(Level.FINE, "dispatching endpoint=" + endpoint + " req=" + req);
service.handle(req, resp);
})
.build();
NamedServer namedServer = NamedServer.builder(httpAddress)
.addEndpointResolver(endpointResolver)
.build();
Server server = Server.builder(namedServer)
.build();
Client client = Client.builder()
.build();
final AtomicBoolean success = new AtomicBoolean(false);
try {
Files.write(vartmp.resolve("test.txt"), "Hello Jörg".getBytes(StandardCharsets.UTF_8));
server.accept();
Request request = Request.get().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/test.txt"))
.build()
.setResponseListener(r -> {
assertEquals("Hello Jörg", r.content().toString(StandardCharsets.UTF_8));
success.set(true);
});
client.execute(request).get();
} finally {
server.shutdownGracefully();
client.shutdownGracefully();
Files.delete(vartmp.resolve("test.txt"));
logger.log(Level.INFO, "server and client shut down");
}
assertTrue(success.get());
}
@Test
void testSimplePathEndpoints() throws Exception {
Path vartmp = Paths.get("/var/tmp/");
Service service = new MappedFileService(vartmp);
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
EndpointResolver endpointResolver = EndpointResolver.builder()
.addEndpoint(Endpoint.builder().setPrefix("/static").setPath("/**").build())
.addEndpoint(Endpoint.builder().setPrefix("/static1").setPath("/**").build())
.addEndpoint(Endpoint.builder().setPrefix("/static2").setPath("/**").build())
.setDispatcher((endpoint, req, resp) -> {
logger.log(Level.FINE, "dispatching endpoint=" + endpoint + " req=" + req);
service.handle(req, resp);
})
.build();
@ -51,7 +133,6 @@ class EndpointTest {
.build();
Server server = Server.builder(namedServer)
.build();
server.logDiagnostics(Level.INFO);
Client client = Client.builder()
.build();
final AtomicBoolean success = new AtomicBoolean(false);
@ -99,27 +180,105 @@ class EndpointTest {
assertTrue(success2.get());
}
@Test
void testQueryAndFragmentEndpoints() throws Exception {
Path vartmp = Paths.get("/var/tmp/");
Service service = new MappedFileService(vartmp);
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
EndpointResolver endpointResolver = EndpointResolver.builder()
.addEndpoint(Endpoint.builder().setPrefix("/static").setPath("/**").build())
.addEndpoint(Endpoint.builder().setPrefix("/static1").setPath("/**").build())
.addEndpoint(Endpoint.builder().setPrefix("/static2").setPath("/**").build())
.setDispatcher((endpoint, req, resp) -> {
logger.log(Level.FINE, "dispatching endpoint=" + endpoint + " req=" + req +
" fragment=" + req.getURL().getFragment());
service.handle(req, resp);
})
.build();
NamedServer namedServer = NamedServer.builder(httpAddress)
.addEndpointResolver(endpointResolver)
.build();
Server server = Server.builder(namedServer)
.build();
Client client = Client.builder()
.build();
final AtomicBoolean success = new AtomicBoolean(false);
final AtomicBoolean success1 = new AtomicBoolean(false);
final AtomicBoolean success2 = new AtomicBoolean(false);
try {
Files.write(vartmp.resolve("test.txt"), "Hello Jörg".getBytes(StandardCharsets.UTF_8));
Files.write(vartmp.resolve("test1.txt"), "Hello Jörg 1".getBytes(StandardCharsets.UTF_8));
Files.write(vartmp.resolve("test2.txt"), "Hello Jörg 2".getBytes(StandardCharsets.UTF_8));
server.accept();
Request request = Request.get().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/static/test.txt"))
.addParameter("a", "b")
.build()
.setResponseListener(r -> {
if (r.status().equals(HttpResponseStatus.OK)) {
assertEquals("Hello Jörg", r.content().toString(StandardCharsets.UTF_8));
success.set(true);
} else {
logger.log(Level.WARNING, r.toString());
}
});
client.execute(request).get();
Request request1 = Request.get().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base()
.resolve("/static1/test1.txt").newBuilder().fragment("frag").build())
.build()
.setResponseListener(r -> {
if (r.status().equals(HttpResponseStatus.OK)) {
assertEquals("Hello Jörg 1", r.content().toString(StandardCharsets.UTF_8));
success1.set(true);
} else {
logger.log(Level.WARNING, r.toString());
}
});
client.execute(request1).get();
Request request2 = Request.get().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/static2/test2.txt"))
.content("{\"a\":\"b\"}","application/json")
.build()
.setResponseListener(r -> {
if (r.status().equals(HttpResponseStatus.OK)) {
assertEquals("Hello Jörg 2", r.content().toString(StandardCharsets.UTF_8));
success2.set(true);
} else {
logger.log(Level.WARNING, r.toString());
}
});
client.execute(request2).get();
} finally {
server.shutdownGracefully();
client.shutdownGracefully();
Files.delete(vartmp.resolve("test.txt"));
Files.delete(vartmp.resolve("test1.txt"));
Files.delete(vartmp.resolve("test2.txt"));
logger.log(Level.INFO, "server and client shut down");
}
assertTrue(success.get());
assertTrue(success1.get());
assertTrue(success2.get());
}
@Test
void testMassiveEndpoints() throws IOException {
int max = 1000;
int max = 2; // more than 1024
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
EndpointResolver.Builder endpointResolverBuilder = EndpointResolver.builder()
.setPrefix("/static");
for (int i = 0; i < max; i++) {
endpointResolverBuilder.addEndpoint(Endpoint.builder()
.setPath(i + "/**")
.addFilter((req, resp) -> resp.write(HttpResponseStatus.OK))
.setPath("/" + i + "/**")
.addFilter((req, resp) -> ServerResponse.write(resp, HttpResponseStatus.OK))
.build());
}
endpointResolverBuilder.setDispatcher((endpoint, req, resp) -> {
logger.log(Level.FINEST, "endpoint=" + endpoint + " req=" + req + " resp=" + resp);
});
NamedServer namedServer = NamedServer.builder(httpAddress)
.addEndpointResolver(endpointResolverBuilder.build())
.build();
Server server = Server.builder(namedServer)
.build();
server.logDiagnostics(Level.INFO);
Client client = Client.builder()
.build();
final AtomicInteger count = new AtomicInteger(0);
@ -132,7 +291,6 @@ class EndpointTest {
.setResponseListener(r -> {
if (r.status().equals(HttpResponseStatus.OK)) {
count.incrementAndGet();
logger.log(Level.INFO, r.status().reasonPhrase());
} else {
logger.log(Level.WARNING, r.status().reasonPhrase());
}

View file

@ -24,6 +24,7 @@ public class NettyHttpExtension implements BeforeAllCallback {
//System.setProperty("io.netty.recycler.maxCapacity", Integer.toString(0));
//System.setProperty("io.netty.leakDetection.level", "paranoid");
Level level = Level.INFO;
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %5$s %6$s%n");
LogManager.getLogManager().reset();
@ -31,10 +32,10 @@ public class NettyHttpExtension implements BeforeAllCallback {
Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler);
rootLogger.setLevel(Level.FINE);
rootLogger.setLevel(level);
for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter());
h.setLevel(Level.FINE);
h.setLevel(level);
}
}
}

View file

@ -9,6 +9,7 @@ import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.HttpParameters;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.NamedServer;
import java.util.concurrent.atomic.AtomicBoolean;
@ -29,7 +30,7 @@ class PostTest {
.singleEndpoint("/post", "/**", (req, resp) -> {
HttpParameters parameters = req.getParameters();
logger.log(Level.INFO, "got post " + parameters.toString());
resp.write(HttpResponseStatus.OK);
ServerResponse.write(resp, HttpResponseStatus.OK);
}, "POST")
.build();
Server server = Server.builder(namedServer)
@ -67,7 +68,7 @@ class PostTest {
.singleEndpoint("/post", "/**", (req, resp) -> {
HttpParameters parameters = req.getParameters();
logger.log(Level.INFO, "got post " + parameters.toString());
resp.write(HttpResponseStatus.OK);
ServerResponse.write(resp, HttpResponseStatus.OK);
}, "POST")
.build();
Server server = Server.builder(namedServer)

View file

@ -8,7 +8,7 @@ import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.endpoint.NamedServer;
import org.xbib.netty.http.server.endpoint.service.NioService;
import org.xbib.netty.http.server.endpoint.service.MappedFileService;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@ -33,7 +33,7 @@ class SecureStaticFileServiceTest {
Server server = Server.builder(NamedServer.builder(httpAddress, "*")
.setJdkSslProvider()
.setSelfCert()
.singleEndpoint("/static", "/**", new NioService(vartmp))
.singleEndpoint("/static", "/**", new MappedFileService(vartmp))
.build())
.setChildThreadCount(8)
.build();
@ -73,7 +73,7 @@ class SecureStaticFileServiceTest {
Server server = Server.builder(NamedServer.builder(httpAddress, "*")
.setOpenSSLSslProvider()
.setSelfCert()
.singleEndpoint("/static", "/**", new NioService(vartmp))
.singleEndpoint("/static", "/**", new MappedFileService(vartmp))
.build())
.build();
Client client = Client.builder()

View file

@ -4,6 +4,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.NamedServer;
@Disabled
@ -12,7 +13,7 @@ class ServerTest {
@Test
void testServer() throws Exception {
NamedServer namedServer = NamedServer.builder(HttpAddress.http1("localhost", 8008), "*")
.singleEndpoint("/", (request, response) -> response.write("Hello World"))
.singleEndpoint("/", (request, response) -> ServerResponse.write(response, "Hello World"))
.build();
Server server = Server.builder(namedServer).build();
try {

View file

@ -8,7 +8,8 @@ import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.endpoint.NamedServer;
import org.xbib.netty.http.server.endpoint.service.NioService;
import org.xbib.netty.http.server.endpoint.service.ChunkedFileService;
import org.xbib.netty.http.server.endpoint.service.MappedFileService;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@ -31,11 +32,44 @@ class StaticFileServiceTest {
Path vartmp = Paths.get("/var/tmp/");
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
NamedServer namedServer = NamedServer.builder(httpAddress)
.singleEndpoint("/static", "/**", new NioService(vartmp))
.singleEndpoint("/static", "/**", new MappedFileService(vartmp))
.build();
Server server = Server.builder(namedServer)
.build();
Client client = Client.builder()
.build();
final AtomicBoolean success = new AtomicBoolean(false);
try {
Files.write(vartmp.resolve("test.txt"), "Hello Jörg".getBytes(StandardCharsets.UTF_8));
server.accept();
Request request = Request.get().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/static/test.txt"))
.build()
.setResponseListener(r -> {
assertEquals("Hello Jörg", r.content().toString(StandardCharsets.UTF_8));
success.set(true);
});
logger.log(Level.INFO, request.toString());
client.execute(request).get();
logger.log(Level.INFO, "request complete");
} finally {
server.shutdownGracefully();
client.shutdownGracefully();
Files.delete(vartmp.resolve("test.txt"));
logger.log(Level.INFO, "server and client shut down");
}
assertTrue(success.get());
}
@Test
void testChunkedFileServerHttp1() throws Exception {
Path vartmp = Paths.get("/var/tmp/");
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
NamedServer namedServer = NamedServer.builder(httpAddress)
.singleEndpoint("/static", "/**", new ChunkedFileService(vartmp))
.build();
Server server = Server.builder(namedServer)
.build();
server.logDiagnostics(Level.INFO);
Client client = Client.builder()
.build();
final AtomicBoolean success = new AtomicBoolean(false);
@ -66,11 +100,44 @@ class StaticFileServiceTest {
Path vartmp = Paths.get("/var/tmp/");
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
NamedServer namedServer = NamedServer.builder(httpAddress)
.singleEndpoint("/static", "/**", new NioService(vartmp))
.singleEndpoint("/static", "/**", new MappedFileService(vartmp))
.build();
Server server = Server.builder(namedServer)
.build();
Client client = Client.builder()
.build();
final AtomicBoolean success = new AtomicBoolean(false);
try {
Files.write(vartmp.resolve("test.txt"), "Hello Jörg".getBytes(StandardCharsets.UTF_8));
server.accept();
Request request = Request.get().setVersion(HttpVersion.valueOf("HTTP/2.0"))
.url(server.getServerConfig().getAddress().base().resolve("/static/test.txt"))
.build()
.setResponseListener(r -> {
assertEquals("Hello Jörg", r.content().toString(StandardCharsets.UTF_8));
success.set(true);
});
logger.log(Level.INFO, request.toString());
client.execute(request).get();
logger.log(Level.INFO, "request complete");
} finally {
server.shutdownGracefully();
client.shutdownGracefully();
Files.delete(vartmp.resolve("test.txt"));
logger.log(Level.INFO, "server and client shut down");
}
assertTrue(success.get());
}
@Test
void testChunkedFileServerHttp2() throws Exception {
Path vartmp = Paths.get("/var/tmp/");
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
NamedServer namedServer = NamedServer.builder(httpAddress)
.singleEndpoint("/static", "/**", new ChunkedFileService(vartmp))
.build();
Server server = Server.builder(namedServer)
.build();
server.logDiagnostics(Level.INFO);
Client client = Client.builder()
.build();
final AtomicBoolean success = new AtomicBoolean(false);

View file

@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.endpoint.NamedServer;
import java.io.IOException;
@ -22,8 +23,7 @@ class ThreadLeakTest {
@Test
void testForLeaks() throws IOException {
NamedServer namedServer = NamedServer.builder()
.singleEndpoint("/", (request, response) ->
response.write("Hello World"))
.singleEndpoint("/", (request, response) -> ServerResponse.write(response, "Hello World"))
.build();
Server server = Server.builder(namedServer)
.setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)

View file

@ -0,0 +1,57 @@
package org.xbib.netty.http.server.test.reactive;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.atomic.AtomicLong;
/**
* A batched producer.
*
* Responds to read requests with batches of elements according to batch size. When eofOn is reached, it closes the
* channel.
*/
public class BatchedProducer extends ChannelOutboundHandlerAdapter {
protected final long eofOn;
protected final int batchSize;
protected final AtomicLong sequence;
public BatchedProducer(long eofOn, int batchSize, long sequence) {
this.eofOn = eofOn;
this.batchSize = batchSize;
this.sequence = new AtomicLong(sequence);
}
private boolean cancelled = false;
@Override
public void read(final ChannelHandlerContext ctx) throws Exception {
if (cancelled) {
throw new IllegalStateException("Received demand after being cancelled");
}
ctx.pipeline().channel().eventLoop().parent().execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < batchSize && sequence.get() != eofOn; i++) {
ctx.fireChannelRead(sequence.getAndIncrement());
}
if (eofOn == sequence.get()) {
ctx.fireChannelInactive();
} else {
ctx.fireChannelReadComplete();
}
}
});
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (cancelled) {
throw new IllegalStateException("Cancelled twice");
}
cancelled = true;
}
}

View file

@ -0,0 +1,158 @@
package org.xbib.netty.http.server.test.reactive;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.xbib.netty.http.server.reactive.HandlerPublisher;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
class ChannelPublisherTest {
private EventLoopGroup group;
private Channel channel;
private Publisher<Channel> publisher;
private SubscriberProbe<Channel> subscriber;
@BeforeEach
void start() throws Exception {
group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
HandlerPublisher<Channel> handlerPublisher = new HandlerPublisher<>(eventLoop, Channel.class);
Bootstrap bootstrap = new Bootstrap();
bootstrap
.channel(NioServerSocketChannel.class)
.group(eventLoop)
.option(ChannelOption.AUTO_READ, false)
.handler(handlerPublisher)
.localAddress("127.0.0.1", 0);
channel = bootstrap.bind().await().channel();
this.publisher = handlerPublisher;
subscriber = new SubscriberProbe<>();
}
@AfterEach
void stop() throws Exception {
channel.unsafe().closeForcibly();
group.shutdownGracefully();
}
@Test
void test() throws Exception {
publisher.subscribe(subscriber);
Subscription sub = subscriber.takeSubscription();
// Try one cycle
sub.request(1);
Socket socket1 = connect();
receiveConnection();
readWriteData(socket1, 1);
// Check back pressure
Socket socket2 = connect();
subscriber.expectNoElements();
// Now request the next connection
sub.request(1);
receiveConnection();
readWriteData(socket2, 2);
// Close the channel
channel.close();
subscriber.expectNoElements();
subscriber.expectComplete();
}
private Socket connect() throws Exception {
InetSocketAddress address = (InetSocketAddress) channel.localAddress();
return new Socket(address.getAddress(), address.getPort());
}
private void readWriteData(Socket socket, int data) throws Exception {
OutputStream os = socket.getOutputStream();
os.write(data);
os.flush();
InputStream is = socket.getInputStream();
int received = is.read();
socket.close();
assertEquals(received, data);
}
private void receiveConnection() throws Exception {
Channel channel = subscriber.take();
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(msg);
}
});
group.register(channel);
}
private class SubscriberProbe<T> implements Subscriber<T> {
final BlockingQueue<Subscription> subscriptions = new LinkedBlockingQueue<>();
final BlockingQueue<T> elements = new LinkedBlockingQueue<>();
final Promise<Void> promise = new DefaultPromise<>(group.next());
public void onSubscribe(Subscription s) {
subscriptions.add(s);
}
public void onNext(T t) {
elements.add(t);
}
public void onError(Throwable t) {
promise.setFailure(t);
}
public void onComplete() {
promise.setSuccess(null);
}
Subscription takeSubscription() throws Exception {
Subscription sub = subscriptions.poll(100, TimeUnit.MILLISECONDS);
assertNotNull(sub);
return sub;
}
T take() throws Exception {
T t = elements.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(t);
return t;
}
void expectNoElements() throws Exception {
T t = elements.poll(100, TimeUnit.MILLISECONDS);
assertNull(t);
}
void expectComplete() throws Exception {
promise.get(100, TimeUnit.MILLISECONDS);
}
}
}

View file

@ -0,0 +1,108 @@
package org.xbib.netty.http.server.test.reactive;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import java.net.SocketAddress;
/**
* A closed loop channel that sends no events and receives no events, for testing purposes.
*
* Any outgoing events that reach the channel will throw an exception. All events should be caught
* be inserting a handler that catches them and responds accordingly.
*/
public class ClosedLoopChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(this);
private static final ChannelMetadata metadata = new ChannelMetadata(false);
private volatile boolean open = true;
private volatile boolean active = true;
public ClosedLoopChannel() {
super(null);
}
public void setOpen(boolean open) {
this.open = open;
}
public void setActive(boolean active) {
this.active = active;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new AbstractUnsafe() {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
};
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
throw new UnsupportedOperationException();
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doClose() throws Exception {
this.open = false;
}
@Override
protected void doBeginRead() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public ChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return open;
}
@Override
public boolean isActive() {
return active;
}
@Override
public ChannelMetadata metadata() {
return metadata;
}
}