okhttp-ws into okhttp

This commit is contained in:
Yu-Hsuan Lin
2016-12-02 16:26:28 +08:00
parent 8f8e060037
commit 657d8e89b5
2 changed files with 30 additions and 55 deletions

View File

@@ -48,8 +48,8 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>com.squareup.okhttp3</groupId> <groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-ws</artifactId> <artifactId>okhttp</artifactId>
<version>3.4.2</version> <version>3.5.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.json</groupId> <groupId>org.json</groupId>

View File

@@ -9,9 +9,7 @@ import io.socket.thread.EventThread;
import io.socket.utf8.UTF8Exception; import io.socket.utf8.UTF8Exception;
import io.socket.yeast.Yeast; import io.socket.yeast.Yeast;
import okhttp3.*; import okhttp3.*;
import okhttp3.ws.WebSocketCall; import okio.ByteString;
import okhttp3.ws.WebSocketListener;
import okio.Buffer;
import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.SSLSocketFactory;
import java.io.IOException; import java.io.IOException;
@@ -22,8 +20,6 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Logger; import java.util.logging.Logger;
import static okhttp3.ws.WebSocket.BINARY;
import static okhttp3.ws.WebSocket.TEXT;
public class WebSocket extends Transport { public class WebSocket extends Transport {
@@ -31,8 +27,7 @@ public class WebSocket extends Transport {
private static final Logger logger = Logger.getLogger(PollingXHR.class.getName()); private static final Logger logger = Logger.getLogger(PollingXHR.class.getName());
private okhttp3.ws.WebSocket ws; private okhttp3.WebSocket ws;
private WebSocketCall wsCall;
public WebSocket(Options opts) { public WebSocket(Options opts) {
super(opts); super(opts);
@@ -80,11 +75,9 @@ public class WebSocket extends Transport {
} }
final Request request = builder.build(); final Request request = builder.build();
final OkHttpClient client = clientBuilder.build(); final OkHttpClient client = clientBuilder.build();
wsCall = WebSocketCall.create(client, request); ws = client.newWebSocket(request, new WebSocketListener() {
wsCall.enqueue(new WebSocketListener() {
@Override @Override
public void onOpen(okhttp3.ws.WebSocket webSocket, Response response) { public void onOpen(okhttp3.WebSocket webSocket, Response response) {
ws = webSocket;
final Map<String, List<String>> headers = response.headers().toMultimap(); final Map<String, List<String>> headers = response.headers().toMultimap();
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@Override @Override
@@ -96,44 +89,33 @@ public class WebSocket extends Transport {
} }
@Override @Override
public void onMessage(final ResponseBody responseBody) throws IOException { public void onMessage(okhttp3.WebSocket webSocket, final String text) {
Object data = null; if (text == null) {
if (responseBody.contentType() == TEXT) { return;
data = responseBody.string();
} else if (responseBody.contentType() == BINARY) {
data = responseBody.source().readByteArray();
} else {
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onError("Unknown payload type: " + responseBody.contentType(), new IllegalStateException());
}
});
} }
responseBody.source().close();
final Object finalData = data;
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@Override @Override
public void run() { public void run() {
if (finalData == null) { self.onData(text);
return;
}
if (finalData instanceof String) {
self.onData((String) finalData);
} else {
self.onData((byte[]) finalData);
}
} }
}); });
} }
@Override @Override
public void onPong(Buffer payload) { public void onMessage(okhttp3.WebSocket webSocket, final ByteString bytes) {
if (bytes == null) {
return;
}
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onData(bytes.toByteArray());
}
});
} }
@Override @Override
public void onClose(int code, String reason) { public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@Override @Override
public void run() { public void run() {
@@ -143,11 +125,14 @@ public class WebSocket extends Transport {
} }
@Override @Override
public void onFailure(final IOException e, final Response response) { public void onFailure(okhttp3.WebSocket webSocket, final Throwable t, Response response) {
if (!(t instanceof Exception)) {
return;
}
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@Override @Override
public void run() { public void run() {
self.onError("websocket error", e); self.onError("websocket error", (Exception) t);
} }
}); });
} }
@@ -186,15 +171,12 @@ public class WebSocket extends Transport {
public void call(Object packet) { public void call(Object packet) {
try { try {
if (packet instanceof String) { if (packet instanceof String) {
self.ws.sendMessage(RequestBody.create(TEXT, (String) packet)); self.ws.send((String) packet);
} else if (packet instanceof byte[]) { } else if (packet instanceof byte[]) {
self.ws.sendMessage(RequestBody.create(BINARY, (byte[]) packet)); self.ws.send(ByteString.of((byte[]) packet));
} }
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
logger.fine("websocket closed before we could write"); logger.fine("websocket closed before we could write");
} catch (IOException e) {
logger.fine("websocket closed before onclose event");
doClose();
} }
if (0 == --total[0]) done.run(); if (0 == --total[0]) done.run();
@@ -203,23 +185,16 @@ public class WebSocket extends Transport {
} }
} }
@Override
protected void onClose() {
super.onClose();
}
protected void doClose() { protected void doClose() {
if (ws != null) { if (ws != null) {
try { try {
ws.close(1000, ""); ws.close(1000, "");
} catch (IOException e) {
// websocket already closed
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// websocket already closed // websocket already closed
} }
} }
if (wsCall != null) { if (ws != null) {
wsCall.cancel(); ws.cancel();
} }
} }