remove cookie option, and add support for handling headers

This commit is contained in:
Naoyuki Kanezawa
2014-02-01 03:38:10 +09:00
parent 84f97d8caf
commit 4e4d51bd04
5 changed files with 109 additions and 62 deletions

View File

@@ -76,6 +76,11 @@ public abstract class Socket extends Emitter {
public static final String EVENT_HEARTBEAT = "heartbeat"; public static final String EVENT_HEARTBEAT = "heartbeat";
public static final String EVENT_DATA = "data"; public static final String EVENT_DATA = "data";
/**
* Called on a new transport is created.
*/
public static final String EVENT_TRANSPORT = "transport";
private static final Runnable noop = new Runnable() { private static final Runnable noop = new Runnable() {
@Override @Override
public void run() {} public void run() {}
@@ -99,7 +104,6 @@ public abstract class Socket extends Emitter {
private String hostname; private String hostname;
private String path; private String path;
private String timestampParam; private String timestampParam;
private String cookie;
private List<String> transports; private List<String> transports;
private List<String> upgrades; private List<String> upgrades;
private Map<String, String> query; private Map<String, String> query;
@@ -163,7 +167,6 @@ public abstract class Socket extends Emitter {
this.transports = new ArrayList<String>(Arrays.asList(opts.transports != null ? this.transports = new ArrayList<String>(Arrays.asList(opts.transports != null ?
opts.transports : new String[]{Polling.NAME, WebSocket.NAME})); opts.transports : new String[]{Polling.NAME, WebSocket.NAME}));
this.policyPort = opts.policyPort != 0 ? opts.policyPort : 843; this.policyPort = opts.policyPort != 0 ? opts.policyPort : 843;
this.cookie = opts.cookie;
} }
/** /**
@@ -201,7 +204,6 @@ public abstract class Socket extends Emitter {
opts.timestampRequests = this.timestampRequests; opts.timestampRequests = this.timestampRequests;
opts.timestampParam = this.timestampParam; opts.timestampParam = this.timestampParam;
opts.policyPort = this.policyPort; opts.policyPort = this.policyPort;
opts.cookie = this.cookie;
if (WebSocket.NAME.equals(name)) { if (WebSocket.NAME.equals(name)) {
return new WebSocket(opts); return new WebSocket(opts);
@@ -223,6 +225,8 @@ public abstract class Socket extends Emitter {
this.transport = transport; this.transport = transport;
self.emit(EVENT_TRANSPORT, transport);
transport.on(Transport.EVENT_DRAIN, new Listener() { transport.on(Transport.EVENT_DRAIN, new Listener() {
@Override @Override
public void call(Object... args) { public void call(Object... args) {

View File

@@ -23,6 +23,8 @@ public abstract class Transport extends Emitter {
public static final String EVENT_PACKET = "packet"; public static final String EVENT_PACKET = "packet";
public static final String EVENT_DRAIN = "drain"; public static final String EVENT_DRAIN = "drain";
public static final String EVENT_ERROR = "error"; public static final String EVENT_ERROR = "error";
public static final String EVENT_REQUEST_HEADERS = "requestHeaders";
public static final String EVENT_RESPONSE_HEADERS = "responseHeaders";
public boolean writable; public boolean writable;
public String name; public String name;
@@ -121,11 +123,6 @@ public abstract class Transport extends Emitter {
public static class Options { public static class Options {
/**
* Cookie value for handshake.
*/
public String cookie;
public String hostname; public String hostname;
public String path; public String path;
public String timestampParam; public String timestampParam;

View File

@@ -7,6 +7,8 @@ import com.github.nkzawa.engineio.client.EventThread;
import java.io.*; import java.io.*;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -17,11 +19,9 @@ public class PollingXHR extends Polling {
private Request sendXhr; private Request sendXhr;
private Request pollXhr; private Request pollXhr;
private String cookie;
public PollingXHR(Options opts) { public PollingXHR(Options opts) {
super(opts); super(opts);
this.cookie = opts.cookie;
} }
protected Request request() { protected Request request() {
@@ -33,8 +33,34 @@ public class PollingXHR extends Polling {
opts = new Request.Options(); opts = new Request.Options();
} }
opts.uri = this.uri(); opts.uri = this.uri();
opts.cookie = this.cookie;
return new Request(opts); Request req = new Request(opts);
final PollingXHR self = this;
req.on(Request.EVENT_REQUEST_HEADERS, new Listener() {
@Override
public void call(Object... args) {
// Never execute asynchronously for support to modify headers.
@SuppressWarnings("unchecked")
Map<String, String> headers = args.length > 0 && args[0] instanceof Map ?
(Map<String, String>)args[0] : new HashMap<String, String>();
self.emit(EVENT_REQUEST_HEADERS, headers);
}
}).on(Request.EVENT_RESPONSE_HEADERS, new Listener() {
@Override
public void call(final Object... args) {
EventThread.exec(new Runnable() {
@Override
public void run() {
@SuppressWarnings("unchecked")
final Map<String, String> headers = args.length > 0 && args[0] instanceof Map ?
(Map<String, String>)args[0] : new HashMap<String, String>();
self.emit(EVENT_RESPONSE_HEADERS, headers);
}
});
}
});
return req;
} }
protected void doWrite(String data, final Runnable fn) { protected void doWrite(String data, final Runnable fn) {
@@ -107,20 +133,20 @@ public class PollingXHR extends Polling {
public static final String EVENT_SUCCESS = "success"; public static final String EVENT_SUCCESS = "success";
public static final String EVENT_DATA = "data"; public static final String EVENT_DATA = "data";
public static final String EVENT_ERROR = "error"; public static final String EVENT_ERROR = "error";
public static final String EVENT_REQUEST_HEADERS = "requestHeaders";
public static final String EVENT_RESPONSE_HEADERS = "responseHeaders";
private static final ExecutorService xhrService = Executors.newCachedThreadPool(); private static final ExecutorService xhrService = Executors.newCachedThreadPool();
String method; String method;
String uri; String uri;
String data; String data;
String cookie;
HttpURLConnection xhr; HttpURLConnection xhr;
public Request(Options opts) { public Request(Options opts) {
this.method = opts.method != null ? opts.method : "GET"; this.method = opts.method != null ? opts.method : "GET";
this.uri = opts.uri; this.uri = opts.uri;
this.data = opts.data; this.data = opts.data;
this.cookie = opts.cookie;
} }
public void create() { public void create() {
@@ -135,13 +161,16 @@ public class PollingXHR extends Polling {
return; return;
} }
Map<String, String> headers = new HashMap<String, String>();
if ("POST".equals(this.method)) { if ("POST".equals(this.method)) {
xhr.setDoOutput(true); xhr.setDoOutput(true);
xhr.setRequestProperty("Content-type", "text/plain;charset=UTF-8"); headers.put("Content-type", "text/plain;charset=UTF-8");
} }
if (this.cookie != null) { self.onRequestHeaders(headers);
xhr.setRequestProperty("Cookie", this.cookie); for (Map.Entry<String, String> header : headers.entrySet()) {
xhr.setRequestProperty(header.getKey(), header.getValue());
} }
logger.fine(String.format("sending xhr with url %s | data %s", this.uri, this.data)); logger.fine(String.format("sending xhr with url %s | data %s", this.uri, this.data));
@@ -159,6 +188,12 @@ public class PollingXHR extends Polling {
writer.flush(); writer.flush();
} }
Map<String, String> headers = new HashMap<String, String>();
for (String key : xhr.getHeaderFields().keySet()) {
headers.put(key, xhr.getHeaderField(key));
}
self.onResponseHeaders(headers);
StringBuilder data = null; StringBuilder data = null;
final int statusCode = xhr.getResponseCode(); final int statusCode = xhr.getResponseCode();
@@ -205,6 +240,14 @@ public class PollingXHR extends Polling {
this.cleanup(); this.cleanup();
} }
private void onRequestHeaders(Map<String, String> headers) {
this.emit(EVENT_REQUEST_HEADERS, headers);
}
private void onResponseHeaders(Map<String, String> headers) {
this.emit(EVENT_RESPONSE_HEADERS, headers);
}
private void cleanup() { private void cleanup() {
if (xhr != null) { if (xhr != null) {
xhr.disconnect(); xhr.disconnect();
@@ -221,7 +264,6 @@ public class PollingXHR extends Polling {
public String uri; public String uri;
public String method; public String method;
public String data; public String data;
public String cookie;
} }
} }
} }

View File

@@ -1,6 +1,8 @@
package com.github.nkzawa.engineio.client; package com.github.nkzawa.engineio.client;
import com.github.nkzawa.emitter.Emitter; import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.engineio.client.transports.Polling;
import com.github.nkzawa.engineio.client.transports.PollingXHR;
import com.github.nkzawa.engineio.parser.HandshakeData; import com.github.nkzawa.engineio.parser.HandshakeData;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@@ -12,6 +14,7 @@ import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@@ -93,7 +96,6 @@ public class ServerConnectionTest {
socket = new Socket("ws://localhost:" + PORT) { socket = new Socket("ws://localhost:" + PORT) {
@Override @Override
public void onopen() { public void onopen() {
System.out.println("onopen:");
events.offer("onopen"); events.offer("onopen");
} }
@@ -102,7 +104,6 @@ public class ServerConnectionTest {
@Override @Override
public void onclose() { public void onclose() {
System.out.println("onclose:");
events.offer("onclose"); events.offer("onclose");
} }
@@ -123,19 +124,16 @@ public class ServerConnectionTest {
socket = new Socket("ws://localhost:" + PORT) { socket = new Socket("ws://localhost:" + PORT) {
@Override @Override
public void onopen() { public void onopen() {
System.out.println("onopen:");
socket.send("hi"); socket.send("hi");
} }
@Override @Override
public void onmessage(String data) { public void onmessage(String data) {
System.out.println("onmessage: " + data);
events.offer(data); events.offer(data);
} }
@Override @Override
public void onclose() {} public void onclose() {}
@Override @Override
public void onerror(Exception err) {} public void onerror(Exception err) {}
}; };
@@ -163,7 +161,6 @@ public class ServerConnectionTest {
socket.on(Socket.EVENT_HANDSHAKE, new Emitter.Listener() { socket.on(Socket.EVENT_HANDSHAKE, new Emitter.Listener() {
@Override @Override
public void call(Object... args) { public void call(Object... args) {
System.out.println(String.format("on handshake: %s", args.length));
events.offer(args); events.offer(args);
} }
}); });
@@ -178,7 +175,7 @@ public class ServerConnectionTest {
assertThat(data.upgrades, is(notNullValue())); assertThat(data.upgrades, is(notNullValue()));
assertThat(data.upgrades, is(not(empty()))); assertThat(data.upgrades, is(not(empty())));
assertThat(data.pingTimeout, is(greaterThan((long)0))); assertThat(data.pingTimeout, is(greaterThan((long)0)));
assertThat(data.pingInterval, is(greaterThan((long)0))); assertThat(data.pingInterval, is(greaterThan((long) 0)));
socket.close(); socket.close();
} }
@@ -200,14 +197,12 @@ public class ServerConnectionTest {
socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() { socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() {
@Override @Override
public void call(Object... args) { public void call(Object... args) {
System.out.println(String.format("on upgrading: %s", args.length));
events.offer(args); events.offer(args);
} }
}); });
socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() {
@Override @Override
public void call(Object... args) { public void call(Object... args) {
System.out.println(String.format("on upgrade: %s", args.length));
events.offer(args); events.offer(args);
} }
}); });
@@ -229,32 +224,49 @@ public class ServerConnectionTest {
} }
@Test(timeout = TIMEOUT) @Test(timeout = TIMEOUT)
public void cookie() throws URISyntaxException, InterruptedException { public void pollingHeaders() throws URISyntaxException, InterruptedException {
final BlockingQueue<String> messages = new LinkedBlockingQueue<String>(); final BlockingQueue<String> messages = new LinkedBlockingQueue<String>();
Socket.Options opts = new Socket.Options(); Socket.Options opts = new Socket.Options();
opts.cookie = "foo=1;"; opts.transports = new String[] {Polling.NAME};
socket = new Socket("ws://localhost:" + PORT, opts) { socket = new Socket("ws://localhost:" + PORT, opts) {
@Override @Override
public void onopen() {} public void onopen() {}
@Override @Override
public void onmessage(String data) { public void onmessage(String data) {}
System.out.println("onmessage: " + data);
messages.offer(data);
}
@Override @Override
public void onclose() {} public void onclose() {}
@Override @Override
public void onerror(Exception err) {} public void onerror(Exception err) {}
}; };
socket.on(Socket.EVENT_TRANSPORT, new Emitter.Listener() {
@Override
public void call(Object... args) {
Transport transport = (Transport)args[0];
if (!(transport instanceof PollingXHR)) return;
transport.on(Transport.EVENT_REQUEST_HEADERS, new Emitter.Listener() {
@Override
public void call(Object... args) {
@SuppressWarnings("unchecked")
Map<String, String> headers = (Map<String, String>)args[0];
headers.put("X-EngineIO", "foo");
}
}).on(Transport.EVENT_RESPONSE_HEADERS, new Emitter.Listener() {
@Override
public void call(Object... args) {
@SuppressWarnings("unchecked")
Map<String, String> headers = (Map<String, String>)args[0];
messages.offer(headers.get("X-EngineIO"));
}
});
}
});
socket.open(); socket.open();
assertThat(messages.take(), is("hello client")); assertThat(messages.take(), is("foo"));
assertThat(messages.take(), is(opts.cookie)); assertThat(messages.take(), is("foo"));
socket.close(); socket.close();
} }
} }

View File

@@ -1,36 +1,28 @@
var engine = require('engine.io') var engine = require('engine.io');
, port = parseInt(process.argv[2], 10) || 3000
, server = engine.listen(port, function() { var port = parseInt(process.argv[2], 10) || 3000
var server = engine.listen(port, function() {
console.log('Engine.IO server listening on port', port); console.log('Engine.IO server listening on port', port);
}); });
server.on('connection', function(socket) { server.on('connection', function(socket) {
socket.send('hello client'); socket.send('hello client');
if (socket.request.headers.cookie) {
console.log('cookie:', socket.request.headers.cookie);
socket.send(socket.request.headers.cookie);
}
socket.on('packet', function(packet) {
console.log('packet:', packet);
});
socket.on('packetCreate', function(packet) {
console.log('packetCreate:', packet);
});
socket.on('message', function(message) { socket.on('message', function(message) {
console.log('message:', message);
socket.send(message); socket.send(message);
}); });
socket.on('close', function(reason, desc) {
console.log('close:', reason, desc);
});
socket.on('error', function(err) { socket.on('error', function(err) {
console.log('error:', err); throw err;
}); });
}); });
var handleRequest = server.handleRequest;
server.handleRequest = function(req, res) {
var header = req.headers['x-engineio'];
if (header) {
res.setHeader('X-EngineIO', header);
}
handleRequest.call(this, req, res);
};