convert for virtual thread

This commit is contained in:
NetRiceCake
2025-12-07 03:32:30 +09:00
parent 4739f63391
commit b2bd90097b
6 changed files with 44 additions and 67 deletions

View File

@@ -6,6 +6,8 @@
<U>**device uuid 무조건 바꾸시오.**</U> <U>**device uuid 무조건 바꾸시오.**</U>
자바21에 추가된 virtual thread 사용하도록 바꾸는중...
## Example ## Example
![ex](./ex.png) ![ex](./ex.png)
@@ -72,7 +74,8 @@ TalkClient client = new TalkClient(email, password, deviceName, deviceUuid, new
``` ```
./gradlew jar ./gradlew jar
``` ```
jdk 필요 jdk21 이상 필요
jar 파일은 build/libs 디렉터리 안에 생성됩니다. jar 파일은 build/libs 디렉터리 안에 생성됩니다.
## Usage ## Usage

View File

@@ -21,6 +21,10 @@ public class Main {
static String deviceUuid = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; // 64자 랜덤 hex-string, 이것도 에시니까 무조건 다른걸로 바꾸세요. static String deviceUuid = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; // 64자 랜덤 hex-string, 이것도 에시니까 무조건 다른걸로 바꾸세요.
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
System.setProperty("jdk.virtualThreadScheduler.parallelism", "1");
System.setProperty("jdk.virtualThreadScheduler.maxPoolSize", "1");
TalkClient client = new TalkClient(email, password, deviceName, deviceUuid, new TalkHandler() { TalkClient client = new TalkClient(email, password, deviceName, deviceUuid, new TalkHandler() {
@Override @Override
public void onMessage(Message msg) { public void onMessage(Message msg) {

View File

@@ -32,8 +32,6 @@ import java.security.MessageDigest;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TalkClient { public class TalkClient {
@@ -54,8 +52,6 @@ public class TalkClient {
private CheckInIn checkInData; private CheckInIn checkInData;
private LoginListIn loginListData; private LoginListIn loginListData;
private ExecutorService locoHandlerPool;
@Getter @Getter
private TalkHandler talkHandler; private TalkHandler talkHandler;
@@ -95,7 +91,7 @@ public class TalkClient {
public void onError(Exception e) { public void onError(Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
}, Executors.newFixedThreadPool(1)); });
byte[] body = new CheckInOut(loginData.userId).toBson(); byte[] body = new CheckInOut(loginData.userId).toBson();
checkInSocket.connect(); checkInSocket.connect();
LocoPacket checkinResponse = checkInSocket.writeAndRead(new LocoPacket(1000, "CHECKIN", body)); LocoPacket checkinResponse = checkInSocket.writeAndRead(new LocoPacket(1000, "CHECKIN", body));
@@ -115,9 +111,7 @@ public class TalkClient {
rp = ByteUtil.hexStringToByteArray("0100ffff0100"); // 이게 도대체 뭐임 rp = ByteUtil.hexStringToByteArray("0100ffff0100"); // 이게 도대체 뭐임
} }
locoHandlerPool = Executors.newFixedThreadPool(1); socket = new LocoSocket(checkInData.getHost(), checkInData.getPort(), new LocoSocketHandlerImpl(this));
socket = new LocoSocket(checkInData.getHost(), checkInData.getPort(), new LocoSocketHandlerImpl(this), locoHandlerPool);
socket.connect(); socket.connect();
LoginListOut req = new LoginListOut(); LoginListOut req = new LoginListOut();
req.setDuuid(deviceUuid); req.setDuuid(deviceUuid);
@@ -139,7 +133,7 @@ public class TalkClient {
connected = true; connected = true;
new Thread(() -> { Thread.ofVirtual().start(() -> {
try { try {
while (socket.isAlive()) { while (socket.isAlive()) {
Thread.sleep(5 * 60 * 1000); Thread.sleep(5 * 60 * 1000);
@@ -150,7 +144,7 @@ public class TalkClient {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
}).start(); });
} }
public boolean sendMessage(long chatId, int type, String message, String extra) { public boolean sendMessage(long chatId, int type, String message, String extra) {
@@ -188,7 +182,7 @@ public class TalkClient {
int status = jsonObject.get("status").getAsInt(); int status = jsonObject.get("status").getAsInt();
future.complete(status); future.complete(status);
} }
}, Executors.newFixedThreadPool(1)); });
postSocket.connect(); postSocket.connect();
PostOut po = new PostOut(); PostOut po = new PostOut();

View File

@@ -1,13 +0,0 @@
package com.github.netricecake.loco;
public class LocoSocektHandler {
public void onPacket(LocoPacket packet) {}
public void onConnect() {}
public void onDisconnect() {}
public void onError(Exception e) {}
}

View File

@@ -34,19 +34,16 @@ public class LocoSocket {
@Getter @Getter
private boolean alive = false; private boolean alive = false;
private final LocoSocketHandler locoSocektHandler; private final LocoSocketHandler locoSocketHandler;
private final ExecutorService handlerPool;
private final Map<Integer, Future<LocoPacket>> waitList = new HashMap<>(); private final Map<Integer, Future<LocoPacket>> waitList = new HashMap<>();
private int packetIdCounter = 1000; private int packetIdCounter = 1000;
public LocoSocket(String ip, int port, LocoSocketHandler locoSocektHandler, ExecutorService handlerPool) { public LocoSocket(String ip, int port, LocoSocketHandler locoSocketHandler) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
this.locoSocektHandler = locoSocektHandler; this.locoSocketHandler = locoSocketHandler;
this.handlerPool = handlerPool;
} }
public void connect() throws IOException { public void connect() throws IOException {
@@ -69,26 +66,27 @@ public class LocoSocket {
alive = true; alive = true;
channel.writeAndFlush(cryptoManager.generateHandshakeMessage()).sync(); channel.writeAndFlush(cryptoManager.generateHandshakeMessage()).sync();
channel.pipeline().addLast(new SecureLayerCodec(cryptoManager)); channel.pipeline().addLast(new SecureLayerCodec(cryptoManager));
channel.pipeline().addLast(new LocoCodec(locoSocektHandler, handlerPool, waitList)); channel.pipeline().addLast(new LocoCodec(locoSocketHandler, waitList));
handlerPool.execute(locoSocektHandler::onConnect); Thread.ofVirtual().start(locoSocketHandler::onConnect);
new Thread() {
@Override final CompletableFuture<?> closeFuture = new CompletableFuture<>(); // 네티 퓨처 sync함수가 virtual thread에서 제대로 작동하지 않습니다.(쓰레드 양보를 안함) 그래서 이렇게 해야됨
public void run() { channel.closeFuture().addListener(future -> {
try { closeFuture.complete(null);
channel.closeFuture().sync(); });
eventLoopGroup.shutdownGracefully(); Thread.ofVirtual().start(() -> {
handlerPool.execute(locoSocektHandler::onDisconnect); try {
alive = false; closeFuture.get();
} catch (Exception e) { eventLoopGroup.shutdownGracefully();
handlerPool.execute(() -> { locoSocketHandler.onDisconnect();
locoSocektHandler.onError(e); alive = false;
}); } catch (Exception e) {
} locoSocketHandler.onError(e);
} }
}.start(); });
} catch (InterruptedException e) { } catch (InterruptedException e) {
handlerPool.execute(() -> { Thread.ofVirtual().start(() -> {
locoSocektHandler.onError(e); locoSocketHandler.onError(e);
}); });
} }
} }
@@ -116,21 +114,16 @@ public class LocoSocket {
waitList.remove(packetId); waitList.remove(packetId);
return result; return result;
} catch (Exception e) { } catch (Exception e) {
handlerPool.execute(() -> { locoSocketHandler.onError(e);
locoSocektHandler.onError(e);
});
} }
return null; return null;
} }
public void close() { public void close() {
if (!alive) return; if (!alive) return;
handlerPool.execute(() -> {
locoSocektHandler.onDisconnect();
});
eventLoopGroup.shutdownGracefully(); eventLoopGroup.shutdownGracefully();
channel.close(); channel.close();
handlerPool.execute(locoSocektHandler::onDisconnect); Thread.ofVirtual().start(locoSocketHandler::onDisconnect);
alive = false; alive = false;
} }

View File

@@ -10,7 +10,6 @@ import io.netty.handler.codec.MessageToMessageCodec;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> { public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> {
@@ -20,13 +19,10 @@ public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> {
private final Map<Integer, Future<LocoPacket>> waitList; private final Map<Integer, Future<LocoPacket>> waitList;
private final LocoSocketHandler locoSocektHandler; private final LocoSocketHandler locoSocketHandler;
private final ExecutorService handlerPool; public LocoCodec(LocoSocketHandler locoSocketHandler, Map<Integer, Future<LocoPacket>> waitList) {
this.locoSocketHandler = locoSocketHandler;
public LocoCodec(LocoSocketHandler locoSocektHandler, ExecutorService handlerPool, Map<Integer, Future<LocoPacket>> waitList) {
this.locoSocektHandler = locoSocektHandler;
this.handlerPool = handlerPool;
this.waitList = waitList; this.waitList = waitList;
} }
@@ -68,16 +64,16 @@ public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> {
} }
if (currentLocoPacket.getBodyLength() > buffer.length) break; if (currentLocoPacket.getBodyLength() > buffer.length) break;
byte[] body = ByteUtil.sliceBytes(buffer, 0, currentLocoPacket.getBodyLength()); byte[] body = ByteUtil.sliceBytes(buffer, 0, currentLocoPacket.getBodyLength());
//System.out.println(currentLocoPacket.getMethod()); System.out.println(currentLocoPacket.getMethod());
//System.out.println(BsonUtil.bsonToJson(body)); System.out.println(BsonUtil.bsonToJson(body));
buffer = ByteUtil.sliceBytes(buffer, currentLocoPacket.getBodyLength(), buffer.length - currentLocoPacket.getBodyLength()); buffer = ByteUtil.sliceBytes(buffer, currentLocoPacket.getBodyLength(), buffer.length - currentLocoPacket.getBodyLength());
currentLocoPacket.setBody(body); currentLocoPacket.setBody(body);
if (waitList.containsKey(currentLocoPacket.getPacketId())) { if (waitList.containsKey(currentLocoPacket.getPacketId())) {
((CompletableFuture<LocoPacket>) waitList.get(currentLocoPacket.getPacketId())).complete(currentLocoPacket); ((CompletableFuture<LocoPacket>) waitList.get(currentLocoPacket.getPacketId())).complete(currentLocoPacket);
} else { } else {
final LocoPacket p = currentLocoPacket; final LocoPacket p = currentLocoPacket;
handlerPool.execute(() -> { Thread.ofVirtual().start(() -> {
locoSocektHandler.onPacket(p); locoSocketHandler.onPacket(p);
}); });
} }
currentLocoPacket = null; currentLocoPacket = null;