Merge branch 'java21'
The user handler now runs on virtual threads. We don't need to configure a thread pool, and even with blocking I/O inside the handler, it works without blocking the system—similar to how Node.js behaves.
This commit is contained in:
@@ -72,11 +72,14 @@ TalkClient client = new TalkClient(email, password, deviceName, deviceUuid, new
|
||||
```
|
||||
./gradlew jar
|
||||
```
|
||||
jdk 필요
|
||||
jdk21 이상 필요
|
||||
|
||||
jar 파일은 build/libs 디렉터리 안에 생성됩니다.
|
||||
|
||||
## Usage
|
||||
|
||||
자바21 이상 필요합니다.
|
||||
|
||||
첫 로그인시에 기기등록이 필요합니다. 콘솔창에 방법 나오니 따라하세요.
|
||||
|
||||
로그인하면 로그인 정보(토큰 등)가 email_deviceName 폴더 안에 저장됩니다. 서버 연결이 안되면 삭제하고 시도하세요.
|
||||
|
||||
@@ -4,6 +4,8 @@ plugins {
|
||||
|
||||
group = 'com.github.netricecake'
|
||||
version = '1.0-SNAPSHOT'
|
||||
sourceCompatibility = '21'
|
||||
targetCompatibility = '21'
|
||||
|
||||
jar {
|
||||
manifest {
|
||||
|
||||
@@ -21,6 +21,10 @@ public class Main {
|
||||
static String deviceUuid = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; // 64자 랜덤 hex-string, 이것도 에시니까 무조건 다른걸로 바꾸세요.
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
System.setProperty("jdk.virtualThreadScheduler.parallelism", "1");
|
||||
System.setProperty("jdk.virtualThreadScheduler.maxPoolSize", "1");
|
||||
|
||||
TalkClient client = new TalkClient(email, password, deviceName, deviceUuid, new TalkHandler() {
|
||||
@Override
|
||||
public void onMessage(Message msg) {
|
||||
|
||||
@@ -16,12 +16,10 @@ import com.github.netricecake.loco.packet.outbound.room.InfoLinkOut;
|
||||
import com.github.netricecake.loco.packet.outbound.message.MessageOut;
|
||||
import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonObject;
|
||||
import lombok.Getter;
|
||||
|
||||
public class LocoSocketHandlerImpl extends LocoSocketHandler {
|
||||
|
||||
@Getter
|
||||
private TalkClient client;
|
||||
private final TalkClient client;
|
||||
|
||||
public LocoSocketHandlerImpl(TalkClient client) {
|
||||
this.client = client;
|
||||
@@ -41,7 +39,9 @@ public class LocoSocketHandlerImpl extends LocoSocketHandler {
|
||||
Member member = room.getMembers().get(in.getAuthorId());
|
||||
|
||||
Message msg = new Message(in.getLogId(), room, member, in.getType(), in.getMessage(), in.getAttachment());
|
||||
Thread.ofVirtual().start(() -> {
|
||||
client.getTalkHandler().onMessage(msg);
|
||||
});
|
||||
} else if (packet.getMethod().equals("NEWMEM")) {
|
||||
NewMemIn in = new NewMemIn();
|
||||
in.fromBson(packet.getBody());
|
||||
@@ -50,7 +50,9 @@ public class LocoSocketHandlerImpl extends LocoSocketHandler {
|
||||
|
||||
ChatRoom room = client.getChatRooms().get(in.getChatId());
|
||||
if (!room.getType().equals("OM")) return;
|
||||
Thread.ofVirtual().start(() -> {
|
||||
client.getTalkHandler().onNewMember(room, room.getMembers().get(in.getUserId()));
|
||||
});
|
||||
} else if (packet.getMethod().equals("DELMEM")) {
|
||||
DelMemIn in = new DelMemIn();
|
||||
in.fromBson(packet.getBody());
|
||||
@@ -58,7 +60,9 @@ public class LocoSocketHandlerImpl extends LocoSocketHandler {
|
||||
|
||||
ChatRoom room = client.getChatRooms().get(in.getChatId());
|
||||
if (!room.getType().equals("OM")) return;
|
||||
Thread.ofVirtual().start(() -> {
|
||||
client.getTalkHandler().onDelMember(room, new Member(in.getUserId(), in.getNickname(), 2));
|
||||
});
|
||||
room.getMembers().remove(in.getUserId());
|
||||
} else if (packet.getMethod().equals("SYNCLINKPF")) {
|
||||
SyncLinkPfIn si = new SyncLinkPfIn();
|
||||
|
||||
@@ -32,19 +32,18 @@ import java.security.MessageDigest;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
public class TalkClient {
|
||||
|
||||
private String email;
|
||||
private String password;
|
||||
private String deviceName;
|
||||
private String deviceUuid;
|
||||
private String sessionDir;
|
||||
private final String email;
|
||||
private final String password;
|
||||
private final String deviceName;
|
||||
private final String deviceUuid;
|
||||
private final String sessionDir;
|
||||
|
||||
@Getter
|
||||
private Map<Long, ChatRoom> chatRooms = new HashMap<>();
|
||||
private final Map<Long, ChatRoom> chatRooms = new HashMap<>();
|
||||
|
||||
@Getter
|
||||
protected boolean connected;
|
||||
@@ -54,8 +53,6 @@ public class TalkClient {
|
||||
private CheckInIn checkInData;
|
||||
private LoginListIn loginListData;
|
||||
|
||||
private ExecutorService locoHandlerPool;
|
||||
|
||||
@Getter
|
||||
private TalkHandler talkHandler;
|
||||
|
||||
@@ -115,9 +112,7 @@ public class TalkClient {
|
||||
rp = ByteUtil.hexStringToByteArray("0100ffff0100"); // 이게 도대체 뭐임
|
||||
}
|
||||
|
||||
locoHandlerPool = Executors.newFixedThreadPool(1);
|
||||
|
||||
socket = new LocoSocket(checkInData.getHost(), checkInData.getPort(), new LocoSocketHandlerImpl(this), locoHandlerPool);
|
||||
socket = new LocoSocket(checkInData.getHost(), checkInData.getPort(), new LocoSocketHandlerImpl(this), Executors.newFixedThreadPool(1));
|
||||
socket.connect();
|
||||
LoginListOut req = new LoginListOut();
|
||||
req.setDuuid(deviceUuid);
|
||||
@@ -139,7 +134,7 @@ public class TalkClient {
|
||||
|
||||
connected = true;
|
||||
|
||||
new Thread(() -> {
|
||||
Thread.ofVirtual().start(() -> {
|
||||
try {
|
||||
while (socket.isAlive()) {
|
||||
Thread.sleep(5 * 60 * 1000);
|
||||
@@ -150,7 +145,7 @@ public class TalkClient {
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}).start();
|
||||
});
|
||||
}
|
||||
|
||||
public boolean sendMessage(long chatId, int type, String message, String extra) {
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
package com.github.netricecake.loco;
|
||||
|
||||
public class LocoSocektHandler {
|
||||
|
||||
public void onPacket(LocoPacket packet) {}
|
||||
|
||||
public void onConnect() {}
|
||||
|
||||
public void onDisconnect() {}
|
||||
|
||||
public void onError(Exception e) {}
|
||||
|
||||
}
|
||||
@@ -34,19 +34,19 @@ public class LocoSocket {
|
||||
@Getter
|
||||
private boolean alive = false;
|
||||
|
||||
private final LocoSocketHandler locoSocektHandler;
|
||||
private final LocoSocketHandler locoSocketHandler;
|
||||
|
||||
private final ExecutorService handlerPool;
|
||||
private final ExecutorService handlerLoop;
|
||||
|
||||
private final Map<Integer, Future<LocoPacket>> waitList = new HashMap<>();
|
||||
|
||||
private int packetIdCounter = 1000;
|
||||
|
||||
public LocoSocket(String ip, int port, LocoSocketHandler locoSocektHandler, ExecutorService handlerPool) {
|
||||
public LocoSocket(String ip, int port, LocoSocketHandler locoSocketHandler, ExecutorService handlerLoop) {
|
||||
this.ip = ip;
|
||||
this.port = port;
|
||||
this.locoSocektHandler = locoSocektHandler;
|
||||
this.handlerPool = handlerPool;
|
||||
this.locoSocketHandler = locoSocketHandler;
|
||||
this.handlerLoop = handlerLoop;
|
||||
}
|
||||
|
||||
public void connect() throws IOException {
|
||||
@@ -69,26 +69,27 @@ public class LocoSocket {
|
||||
alive = true;
|
||||
channel.writeAndFlush(cryptoManager.generateHandshakeMessage()).sync();
|
||||
channel.pipeline().addLast(new SecureLayerCodec(cryptoManager));
|
||||
channel.pipeline().addLast(new LocoCodec(locoSocektHandler, handlerPool, waitList));
|
||||
handlerPool.execute(locoSocektHandler::onConnect);
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
channel.pipeline().addLast(new LocoCodec(locoSocketHandler, waitList, handlerLoop));
|
||||
handlerLoop.execute(locoSocketHandler::onConnect);
|
||||
|
||||
final CompletableFuture<?> closeFuture = new CompletableFuture<>(); // 네티 퓨처 sync함수가 virtual thread에서 제대로 작동하지 않습니다.(쓰레드 양보를 안함)
|
||||
channel.closeFuture().addListener(future -> {
|
||||
closeFuture.complete(null);
|
||||
});
|
||||
Thread.ofVirtual().start(() -> {
|
||||
try {
|
||||
channel.closeFuture().sync();
|
||||
closeFuture.get();
|
||||
eventLoopGroup.shutdownGracefully();
|
||||
handlerPool.execute(locoSocektHandler::onDisconnect);
|
||||
handlerLoop.execute(locoSocketHandler::onDisconnect);
|
||||
alive = false;
|
||||
} catch (Exception e) {
|
||||
handlerPool.execute(() -> {
|
||||
locoSocektHandler.onError(e);
|
||||
locoSocketHandler.onError(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
handlerPool.execute(() -> {
|
||||
locoSocektHandler.onError(e);
|
||||
handlerLoop.execute(() -> {
|
||||
locoSocketHandler.onError(e);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -116,8 +117,8 @@ public class LocoSocket {
|
||||
waitList.remove(packetId);
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
handlerPool.execute(() -> {
|
||||
locoSocektHandler.onError(e);
|
||||
handlerLoop.execute(() -> {
|
||||
locoSocketHandler.onError(e);
|
||||
});
|
||||
}
|
||||
return null;
|
||||
@@ -125,12 +126,9 @@ public class LocoSocket {
|
||||
|
||||
public void close() {
|
||||
if (!alive) return;
|
||||
handlerPool.execute(() -> {
|
||||
locoSocektHandler.onDisconnect();
|
||||
});
|
||||
eventLoopGroup.shutdownGracefully();
|
||||
channel.close();
|
||||
handlerPool.execute(locoSocektHandler::onDisconnect);
|
||||
handlerLoop.execute(locoSocketHandler::onDisconnect);
|
||||
alive = false;
|
||||
}
|
||||
|
||||
|
||||
@@ -20,14 +20,14 @@ public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> {
|
||||
|
||||
private final Map<Integer, Future<LocoPacket>> waitList;
|
||||
|
||||
private final LocoSocketHandler locoSocektHandler;
|
||||
private final LocoSocketHandler locoSocketHandler;
|
||||
|
||||
private final ExecutorService handlerPool;
|
||||
private final ExecutorService handlerLoop;
|
||||
|
||||
public LocoCodec(LocoSocketHandler locoSocektHandler, ExecutorService handlerPool, Map<Integer, Future<LocoPacket>> waitList) {
|
||||
this.locoSocektHandler = locoSocektHandler;
|
||||
this.handlerPool = handlerPool;
|
||||
public LocoCodec(LocoSocketHandler locoSocketHandler, Map<Integer, Future<LocoPacket>> waitList, ExecutorService handlerLoop) {
|
||||
this.locoSocketHandler = locoSocketHandler;
|
||||
this.waitList = waitList;
|
||||
this.handlerLoop = handlerLoop;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -68,16 +68,16 @@ public class LocoCodec extends MessageToMessageCodec<byte[], LocoPacket> {
|
||||
}
|
||||
if (currentLocoPacket.getBodyLength() > buffer.length) break;
|
||||
byte[] body = ByteUtil.sliceBytes(buffer, 0, currentLocoPacket.getBodyLength());
|
||||
//System.out.println(currentLocoPacket.getMethod());
|
||||
//System.out.println(BsonUtil.bsonToJson(body));
|
||||
System.out.println(currentLocoPacket.getMethod());
|
||||
System.out.println(BsonUtil.bsonToJson(body));
|
||||
buffer = ByteUtil.sliceBytes(buffer, currentLocoPacket.getBodyLength(), buffer.length - currentLocoPacket.getBodyLength());
|
||||
currentLocoPacket.setBody(body);
|
||||
if (waitList.containsKey(currentLocoPacket.getPacketId())) {
|
||||
((CompletableFuture<LocoPacket>) waitList.get(currentLocoPacket.getPacketId())).complete(currentLocoPacket);
|
||||
} else {
|
||||
final LocoPacket p = currentLocoPacket;
|
||||
handlerPool.execute(() -> {
|
||||
locoSocektHandler.onPacket(p);
|
||||
handlerLoop.execute(() -> {
|
||||
locoSocketHandler.onPacket(p);
|
||||
});
|
||||
}
|
||||
currentLocoPacket = null;
|
||||
|
||||
@@ -9,7 +9,7 @@ import java.util.List;
|
||||
|
||||
public class SecureLayerCodec extends MessageToMessageCodec<byte[], byte[]> {
|
||||
|
||||
private CryptoManager cryptoManager;
|
||||
private final CryptoManager cryptoManager;
|
||||
|
||||
private int currentLength = -1;
|
||||
private byte[] buffer = new byte[0];
|
||||
@@ -20,6 +20,7 @@ public class SecureLayerCodec extends MessageToMessageCodec<byte[], byte[]> {
|
||||
|
||||
@Override
|
||||
protected void encode(ChannelHandlerContext channelHandlerContext, byte[] bytes, List<Object> list) throws Exception {
|
||||
// TODO 길이 길면 짤라서 암호화해서 보내야됨
|
||||
byte[] encryptedBody = cryptoManager.encryptMessage(bytes);
|
||||
byte[] packet = ByteUtil.concatBytes(ByteUtil.intToByteArrayLE(encryptedBody.length), encryptedBody);
|
||||
list.add(packet);
|
||||
|
||||
@@ -55,10 +55,9 @@ public class CryptoManager {
|
||||
byte[] length = ByteUtil.intToByteArrayLE(HANDSHAKE_BODY_SIZE);
|
||||
return ByteUtil.concatBytes(length, ByteUtil.intToByteArrayLE(RSA_LOCO_HEADER), ByteUtil.intToByteArrayLE(AES_LOCO_HEADER), encryptedKey);
|
||||
} catch (Exception e) {}
|
||||
return new byte[0];
|
||||
return null;
|
||||
}
|
||||
|
||||
// 바디 사이즈가 131067가 최대인거 같은데 잘 모르겠음
|
||||
public byte[] encryptMessage(byte[] message) {
|
||||
try {
|
||||
byte[] nonce = new byte[AES_NONCE_SIZE];
|
||||
@@ -67,7 +66,7 @@ public class CryptoManager {
|
||||
cipher.init(Cipher.ENCRYPT_MODE, aesKey, new GCMParameterSpec(AES_KEY_SIZE, nonce));
|
||||
return ByteUtil.concatBytes(nonce, cipher.doFinal(message));
|
||||
} catch (Exception e) {}
|
||||
return new byte[0];
|
||||
return null;
|
||||
}
|
||||
|
||||
public byte[] decryptMessage(byte[] message) {
|
||||
@@ -77,7 +76,7 @@ public class CryptoManager {
|
||||
cipher.init(Cipher.DECRYPT_MODE, aesKey, new GCMParameterSpec(AES_KEY_SIZE, nonce));
|
||||
return cipher.doFinal(ByteUtil.sliceBytes(message, AES_NONCE_SIZE, message.length - nonce.length));
|
||||
} catch (Exception e) {}
|
||||
return new byte[0];
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user