diff --git a/src/main/java/com/github/nkzawa/engineio/client/Socket.java b/src/main/java/com/github/nkzawa/engineio/client/Socket.java index 44d81bb..377ca7e 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Socket.java @@ -76,6 +76,11 @@ public abstract class Socket extends Emitter { public static final String EVENT_HEARTBEAT = "heartbeat"; public static final String EVENT_DATA = "data"; + /** + * Called on a new transport is created. + */ + public static final String EVENT_TRANSPORT = "transport"; + private static final Runnable noop = new Runnable() { @Override public void run() {} @@ -99,7 +104,6 @@ public abstract class Socket extends Emitter { private String hostname; private String path; private String timestampParam; - private String cookie; private List transports; private List upgrades; private Map query; @@ -163,7 +167,6 @@ public abstract class Socket extends Emitter { this.transports = new ArrayList(Arrays.asList(opts.transports != null ? opts.transports : new String[]{Polling.NAME, WebSocket.NAME})); this.policyPort = opts.policyPort != 0 ? opts.policyPort : 843; - this.cookie = opts.cookie; } /** @@ -201,7 +204,6 @@ public abstract class Socket extends Emitter { opts.timestampRequests = this.timestampRequests; opts.timestampParam = this.timestampParam; opts.policyPort = this.policyPort; - opts.cookie = this.cookie; if (WebSocket.NAME.equals(name)) { return new WebSocket(opts); @@ -223,6 +225,8 @@ public abstract class Socket extends Emitter { this.transport = transport; + self.emit(EVENT_TRANSPORT, transport); + transport.on(Transport.EVENT_DRAIN, new Listener() { @Override public void call(Object... args) { diff --git a/src/main/java/com/github/nkzawa/engineio/client/Transport.java b/src/main/java/com/github/nkzawa/engineio/client/Transport.java index 5a61b53..2dbb7c2 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Transport.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Transport.java @@ -23,6 +23,8 @@ public abstract class Transport extends Emitter { public static final String EVENT_PACKET = "packet"; public static final String EVENT_DRAIN = "drain"; public static final String EVENT_ERROR = "error"; + public static final String EVENT_REQUEST_HEADERS = "requestHeaders"; + public static final String EVENT_RESPONSE_HEADERS = "responseHeaders"; public boolean writable; public String name; @@ -121,11 +123,6 @@ public abstract class Transport extends Emitter { public static class Options { - /** - * Cookie value for handshake. - */ - public String cookie; - public String hostname; public String path; public String timestampParam; diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java index 240e70e..3e81c8f 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java @@ -7,6 +7,8 @@ import com.github.nkzawa.engineio.client.EventThread; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger; @@ -17,11 +19,9 @@ public class PollingXHR extends Polling { private Request sendXhr; private Request pollXhr; - private String cookie; public PollingXHR(Options opts) { super(opts); - this.cookie = opts.cookie; } protected Request request() { @@ -33,8 +33,34 @@ public class PollingXHR extends Polling { opts = new Request.Options(); } opts.uri = this.uri(); - opts.cookie = this.cookie; - return new Request(opts); + + Request req = new Request(opts); + + final PollingXHR self = this; + req.on(Request.EVENT_REQUEST_HEADERS, new Listener() { + @Override + public void call(Object... args) { + // Never execute asynchronously for support to modify headers. + @SuppressWarnings("unchecked") + Map headers = args.length > 0 && args[0] instanceof Map ? + (Map)args[0] : new HashMap(); + self.emit(EVENT_REQUEST_HEADERS, headers); + } + }).on(Request.EVENT_RESPONSE_HEADERS, new Listener() { + @Override + public void call(final Object... args) { + EventThread.exec(new Runnable() { + @Override + public void run() { + @SuppressWarnings("unchecked") + final Map headers = args.length > 0 && args[0] instanceof Map ? + (Map)args[0] : new HashMap(); + self.emit(EVENT_RESPONSE_HEADERS, headers); + } + }); + } + }); + return req; } protected void doWrite(String data, final Runnable fn) { @@ -107,20 +133,20 @@ public class PollingXHR extends Polling { public static final String EVENT_SUCCESS = "success"; public static final String EVENT_DATA = "data"; public static final String EVENT_ERROR = "error"; + public static final String EVENT_REQUEST_HEADERS = "requestHeaders"; + public static final String EVENT_RESPONSE_HEADERS = "responseHeaders"; private static final ExecutorService xhrService = Executors.newCachedThreadPool(); String method; String uri; String data; - String cookie; HttpURLConnection xhr; public Request(Options opts) { this.method = opts.method != null ? opts.method : "GET"; this.uri = opts.uri; this.data = opts.data; - this.cookie = opts.cookie; } public void create() { @@ -135,13 +161,16 @@ public class PollingXHR extends Polling { return; } + Map headers = new HashMap(); + if ("POST".equals(this.method)) { xhr.setDoOutput(true); - xhr.setRequestProperty("Content-type", "text/plain;charset=UTF-8"); + headers.put("Content-type", "text/plain;charset=UTF-8"); } - if (this.cookie != null) { - xhr.setRequestProperty("Cookie", this.cookie); + self.onRequestHeaders(headers); + for (Map.Entry header : headers.entrySet()) { + xhr.setRequestProperty(header.getKey(), header.getValue()); } logger.fine(String.format("sending xhr with url %s | data %s", this.uri, this.data)); @@ -159,6 +188,12 @@ public class PollingXHR extends Polling { writer.flush(); } + Map headers = new HashMap(); + for (String key : xhr.getHeaderFields().keySet()) { + headers.put(key, xhr.getHeaderField(key)); + } + self.onResponseHeaders(headers); + StringBuilder data = null; final int statusCode = xhr.getResponseCode(); @@ -205,6 +240,14 @@ public class PollingXHR extends Polling { this.cleanup(); } + private void onRequestHeaders(Map headers) { + this.emit(EVENT_REQUEST_HEADERS, headers); + } + + private void onResponseHeaders(Map headers) { + this.emit(EVENT_RESPONSE_HEADERS, headers); + } + private void cleanup() { if (xhr != null) { xhr.disconnect(); @@ -221,7 +264,6 @@ public class PollingXHR extends Polling { public String uri; public String method; public String data; - public String cookie; } } } diff --git a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java index d0abc28..6d93597 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java @@ -1,6 +1,8 @@ package com.github.nkzawa.engineio.client; import com.github.nkzawa.emitter.Emitter; +import com.github.nkzawa.engineio.client.transports.Polling; +import com.github.nkzawa.engineio.client.transports.PollingXHR; import com.github.nkzawa.engineio.parser.HandshakeData; import org.junit.After; import org.junit.Before; @@ -12,6 +14,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URISyntaxException; +import java.util.Map; import java.util.concurrent.*; import static org.hamcrest.CoreMatchers.is; @@ -93,7 +96,6 @@ public class ServerConnectionTest { socket = new Socket("ws://localhost:" + PORT) { @Override public void onopen() { - System.out.println("onopen:"); events.offer("onopen"); } @@ -102,7 +104,6 @@ public class ServerConnectionTest { @Override public void onclose() { - System.out.println("onclose:"); events.offer("onclose"); } @@ -123,19 +124,16 @@ public class ServerConnectionTest { socket = new Socket("ws://localhost:" + PORT) { @Override public void onopen() { - System.out.println("onopen:"); socket.send("hi"); } @Override public void onmessage(String data) { - System.out.println("onmessage: " + data); events.offer(data); } @Override public void onclose() {} - @Override public void onerror(Exception err) {} }; @@ -163,7 +161,6 @@ public class ServerConnectionTest { socket.on(Socket.EVENT_HANDSHAKE, new Emitter.Listener() { @Override public void call(Object... args) { - System.out.println(String.format("on handshake: %s", args.length)); events.offer(args); } }); @@ -178,7 +175,7 @@ public class ServerConnectionTest { assertThat(data.upgrades, is(notNullValue())); assertThat(data.upgrades, is(not(empty()))); assertThat(data.pingTimeout, is(greaterThan((long)0))); - assertThat(data.pingInterval, is(greaterThan((long)0))); + assertThat(data.pingInterval, is(greaterThan((long) 0))); socket.close(); } @@ -200,14 +197,12 @@ public class ServerConnectionTest { socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() { @Override public void call(Object... args) { - System.out.println(String.format("on upgrading: %s", args.length)); events.offer(args); } }); socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { @Override public void call(Object... args) { - System.out.println(String.format("on upgrade: %s", args.length)); events.offer(args); } }); @@ -229,32 +224,49 @@ public class ServerConnectionTest { } @Test(timeout = TIMEOUT) - public void cookie() throws URISyntaxException, InterruptedException { + public void pollingHeaders() throws URISyntaxException, InterruptedException { final BlockingQueue messages = new LinkedBlockingQueue(); Socket.Options opts = new Socket.Options(); - opts.cookie = "foo=1;"; + opts.transports = new String[] {Polling.NAME}; socket = new Socket("ws://localhost:" + PORT, opts) { @Override public void onopen() {} - @Override - public void onmessage(String data) { - System.out.println("onmessage: " + data); - messages.offer(data); - } - + public void onmessage(String data) {} @Override public void onclose() {} - @Override public void onerror(Exception err) {} }; + socket.on(Socket.EVENT_TRANSPORT, new Emitter.Listener() { + @Override + public void call(Object... args) { + Transport transport = (Transport)args[0]; + if (!(transport instanceof PollingXHR)) return; + + transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() { + @Override + public void call(Object... args) { + @SuppressWarnings("unchecked") + Map headers = (Map)args[0]; + headers.put("X-EngineIO", "foo"); + } + }).on(Transport.EVENT_RESPONSE_HEADERS, new Emitter.Listener() { + @Override + public void call(Object... args) { + @SuppressWarnings("unchecked") + Map headers = (Map)args[0]; + messages.offer(headers.get("X-EngineIO")); + } + }); + } + }); socket.open(); - assertThat(messages.take(), is("hello client")); - assertThat(messages.take(), is(opts.cookie)); + assertThat(messages.take(), is("foo")); + assertThat(messages.take(), is("foo")); socket.close(); } } diff --git a/src/test/resources/index.js b/src/test/resources/index.js index 86732b9..b179185 100644 --- a/src/test/resources/index.js +++ b/src/test/resources/index.js @@ -1,36 +1,28 @@ -var engine = require('engine.io') - , port = parseInt(process.argv[2], 10) || 3000 - , server = engine.listen(port, function() { - console.log('Engine.IO server listening on port', port); - }); +var engine = require('engine.io'); + +var port = parseInt(process.argv[2], 10) || 3000 +var server = engine.listen(port, function() { + console.log('Engine.IO server listening on port', port); +}); server.on('connection', function(socket) { socket.send('hello client'); - if (socket.request.headers.cookie) { - console.log('cookie:', socket.request.headers.cookie); - socket.send(socket.request.headers.cookie); - } - - socket.on('packet', function(packet) { - console.log('packet:', packet); - }); - - socket.on('packetCreate', function(packet) { - console.log('packetCreate:', packet); - }); - socket.on('message', function(message) { - console.log('message:', message); socket.send(message); }); - socket.on('close', function(reason, desc) { - console.log('close:', reason, desc); - }); - socket.on('error', function(err) { - console.log('error:', err); + throw err; }); }); +var handleRequest = server.handleRequest; +server.handleRequest = function(req, res) { + var header = req.headers['x-engineio']; + if (header) { + res.setHeader('X-EngineIO', header); + } + + handleRequest.call(this, req, res); +};