Virtual thread related
Using asynchronous methods for logic such as checking if room information exists may lead to unexpected issues.
This commit is contained in:
@@ -32,6 +32,7 @@ import java.security.MessageDigest;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class TalkClient {
|
||||
|
||||
@@ -91,7 +92,7 @@ public class TalkClient {
|
||||
public void onError(Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}, Executors.newFixedThreadPool(1));
|
||||
byte[] body = new CheckInOut(loginData.userId).toBson();
|
||||
checkInSocket.connect();
|
||||
LocoPacket checkinResponse = checkInSocket.writeAndRead(new LocoPacket(1000, "CHECKIN", body));
|
||||
@@ -111,7 +112,7 @@ public class TalkClient {
|
||||
rp = ByteUtil.hexStringToByteArray("0100ffff0100"); // 이게 도대체 뭐임
|
||||
}
|
||||
|
||||
socket = new LocoSocket(checkInData.getHost(), checkInData.getPort(), new LocoSocketHandlerImpl(this));
|
||||
socket = new LocoSocket(checkInData.getHost(), checkInData.getPort(), new LocoSocketHandlerImpl(this), Executors.newFixedThreadPool(1));
|
||||
socket.connect();
|
||||
LoginListOut req = new LoginListOut();
|
||||
req.setDuuid(deviceUuid);
|
||||
@@ -182,7 +183,7 @@ public class TalkClient {
|
||||
int status = jsonObject.get("status").getAsInt();
|
||||
future.complete(status);
|
||||
}
|
||||
});
|
||||
}, Executors.newFixedThreadPool(1));
|
||||
postSocket.connect();
|
||||
|
||||
PostOut po = new PostOut();
|
||||
|
||||
@@ -36,14 +36,17 @@ public class LocoSocket {
|
||||
|
||||
private final LocoSocketHandler locoSocketHandler;
|
||||
|
||||
private final ExecutorService handlerLoop;
|
||||
|
||||
private final Map<Integer, Future<LocoPacket>> waitList = new HashMap<>();
|
||||
|
||||
private int packetIdCounter = 1000;
|
||||
|
||||
public LocoSocket(String ip, int port, LocoSocketHandler locoSocketHandler) {
|
||||
public LocoSocket(String ip, int port, LocoSocketHandler locoSocketHandler, ExecutorService handlerLoop) {
|
||||
this.ip = ip;
|
||||
this.port = port;
|
||||
this.locoSocketHandler = locoSocketHandler;
|
||||
this.handlerLoop = handlerLoop;
|
||||
}
|
||||
|
||||
public void connect() throws IOException {
|
||||
@@ -66,10 +69,10 @@ public class LocoSocket {
|
||||
alive = true;
|
||||
channel.writeAndFlush(cryptoManager.generateHandshakeMessage()).sync();
|
||||
channel.pipeline().addLast(new SecureLayerCodec(cryptoManager));
|
||||
channel.pipeline().addLast(new LocoCodec(locoSocketHandler, waitList));
|
||||
Thread.ofVirtual().start(locoSocketHandler::onConnect);
|
||||
channel.pipeline().addLast(new LocoCodec(locoSocketHandler, waitList, handlerLoop));
|
||||
handlerLoop.execute(locoSocketHandler::onConnect);
|
||||
|
||||
final CompletableFuture<?> closeFuture = new CompletableFuture<>(); // 네티 퓨처 sync함수가 virtual thread에서 제대로 작동하지 않습니다.(쓰레드 양보를 안함) 그래서 이렇게 해야됨
|
||||
final CompletableFuture<?> closeFuture = new CompletableFuture<>(); // 네티 퓨처 sync함수가 virtual thread에서 제대로 작동하지 않습니다.(쓰레드 양보를 안함)
|
||||
channel.closeFuture().addListener(future -> {
|
||||
closeFuture.complete(null);
|
||||
});
|
||||
@@ -77,7 +80,7 @@ public class LocoSocket {
|
||||
try {
|
||||
closeFuture.get();
|
||||
eventLoopGroup.shutdownGracefully();
|
||||
locoSocketHandler.onDisconnect();
|
||||
handlerLoop.execute(locoSocketHandler::onDisconnect);
|
||||
alive = false;
|
||||
} catch (Exception e) {
|
||||
locoSocketHandler.onError(e);
|
||||
@@ -85,7 +88,7 @@ public class LocoSocket {
|
||||
});
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
Thread.ofVirtual().start(() -> {
|
||||
handlerLoop.execute(() -> {
|
||||
locoSocketHandler.onError(e);
|
||||
});
|
||||
}
|
||||
@@ -114,7 +117,9 @@ public class LocoSocket {
|
||||
waitList.remove(packetId);
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
handlerLoop.execute(() -> {
|
||||
locoSocketHandler.onError(e);
|
||||
});
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@@ -123,7 +128,7 @@ public class LocoSocket {
|
||||
if (!alive) return;
|
||||
eventLoopGroup.shutdownGracefully();
|
||||
channel.close();
|
||||
Thread.ofVirtual().start(locoSocketHandler::onDisconnect);
|
||||
handlerLoop.execute(locoSocketHandler::onDisconnect);
|
||||
alive = false;
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import io.netty.handler.codec.MessageToMessageCodec;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> {
|
||||
@@ -21,9 +22,12 @@ public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> {
|
||||
|
||||
private final LocoSocketHandler locoSocketHandler;
|
||||
|
||||
public LocoCodec(LocoSocketHandler locoSocketHandler, Map<Integer, Future<LocoPacket>> waitList) {
|
||||
private final ExecutorService handlerLoop;
|
||||
|
||||
public LocoCodec(LocoSocketHandler locoSocketHandler, Map<Integer, Future<LocoPacket>> waitList, ExecutorService handlerLoop) {
|
||||
this.locoSocketHandler = locoSocketHandler;
|
||||
this.waitList = waitList;
|
||||
this.handlerLoop = handlerLoop;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -72,7 +76,7 @@ public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> {
|
||||
((CompletableFuture<LocoPacket>) waitList.get(currentLocoPacket.getPacketId())).complete(currentLocoPacket);
|
||||
} else {
|
||||
final LocoPacket p = currentLocoPacket;
|
||||
Thread.ofVirtual().start(() -> {
|
||||
handlerLoop.execute(() -> {
|
||||
locoSocketHandler.onPacket(p);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user