From 450b15e4dfc5847b833c8ff50cdd27cfd37cb5e9 Mon Sep 17 00:00:00 2001 From: TheRedApricot Date: Sat, 30 Aug 2025 21:25:17 +0800 Subject: [PATCH] (tools, client, server) feat: Complete ProtoBuf message transmission with both TCP and UDP --- Client/Assets/Scenes/Test.unity | 2 +- .../Assets/Scripts/Network/UnityTcpClient.cs | 8 ++-- Client/Assets/Scripts/Protocol/Message.cs | 15 ++++-- Client/Assets/Scripts/Test/TcpClientTest.cs | 30 ++++++++++-- Client/Assets/Scripts/Test/UdpClientTest.cs | 30 ++++++++++-- Server/src/main.rs | 2 +- Server/src/message_dispatcher.rs | 48 +++++++++++++++++++ Server/src/servers.rs | 2 + Server/src/servers/tcp_server.rs | 24 ++++++---- Server/src/servers/udp_server.rs | 14 +++--- Server/src/services.rs | 1 - Server/src/services/game_service.rs | 1 - Tools/ProtoBuf/proto/message.proto | 7 +++ 13 files changed, 149 insertions(+), 35 deletions(-) create mode 100644 Server/src/message_dispatcher.rs delete mode 100644 Server/src/services.rs delete mode 100644 Server/src/services/game_service.rs diff --git a/Client/Assets/Scenes/Test.unity b/Client/Assets/Scenes/Test.unity index 9ea8d64..307d820 100644 --- a/Client/Assets/Scenes/Test.unity +++ b/Client/Assets/Scenes/Test.unity @@ -584,7 +584,7 @@ GameObject: m_Icon: {fileID: 0} m_NavMeshLayer: 0 m_StaticEditorFlags: 0 - m_IsActive: 0 + m_IsActive: 1 --- !u!4 &1388451206 Transform: m_ObjectHideFlags: 0 diff --git a/Client/Assets/Scripts/Network/UnityTcpClient.cs b/Client/Assets/Scripts/Network/UnityTcpClient.cs index 15d52a4..ea5dbca 100644 --- a/Client/Assets/Scripts/Network/UnityTcpClient.cs +++ b/Client/Assets/Scripts/Network/UnityTcpClient.cs @@ -8,6 +8,8 @@ namespace Network { public class UnityTcpClient : Singleton, IDisposable { + private const int TcpMaxPayloadSize = 1460; + private TcpClient _client; private bool _disposed; @@ -34,9 +36,9 @@ namespace Network await stream.WriteAsync(data, 0, data.Length); - var buffer = new byte[1024]; - await stream.ReadAsync(buffer); - return buffer; + var buffer = new byte[TcpMaxPayloadSize]; + var len = await stream.ReadAsync(buffer); + return buffer[..len]; } catch (Exception ex) { diff --git a/Client/Assets/Scripts/Protocol/Message.cs b/Client/Assets/Scripts/Protocol/Message.cs index dd43c40..67408c6 100644 --- a/Client/Assets/Scripts/Protocol/Message.cs +++ b/Client/Assets/Scripts/Protocol/Message.cs @@ -30,11 +30,13 @@ namespace Protocol { "Eg8KB21lc3NhZ2UYAiABKAkiMwoNU2lnbnVwUmVxdWVzdBIQCgh1c2VybmFt", "ZRgBIAEoCRIQCghwYXNzd29yZBgCIAEoCSJKCg5TaWdudXBSZXNwb25zZRIn", "CgZyZXN1bHQYASABKA4yFy5wcm90b2NvbC5SZXF1ZXN0UmVzdWx0Eg8KB21l", - "c3NhZ2UYAiABKAkqJgoNUmVxdWVzdFJlc3VsdBILCgdTdWNjZXNzEAASCAoE", - "RmFpbBABYgZwcm90bzM=")); + "c3NhZ2UYAiABKAkqWQoLTWVzc2FnZVR5cGUSEAoMbG9naW5SZXF1ZXN0EAAS", + "EQoNbG9naW5SZXNwb25zZRABEhEKDXNpZ251cFJlcXVlc3QQAhISCg5zaWdu", + "dXBSZXNwb25zZRADKiYKDVJlcXVlc3RSZXN1bHQSCwoHU3VjY2VzcxAAEggK", + "BEZhaWwQAWIGcHJvdG8z")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedClrTypeInfo(new[] {typeof(global::Protocol.RequestResult), }, null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(new[] {typeof(global::Protocol.MessageType), typeof(global::Protocol.RequestResult), }, null, new pbr::GeneratedClrTypeInfo[] { new pbr::GeneratedClrTypeInfo(typeof(global::Protocol.LoginRequest), global::Protocol.LoginRequest.Parser, new[]{ "Username", "Password" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Protocol.LoginResponse), global::Protocol.LoginResponse.Parser, new[]{ "Result", "Message" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::Protocol.SignupRequest), global::Protocol.SignupRequest.Parser, new[]{ "Username", "Password" }, null, null, null, null), @@ -45,6 +47,13 @@ namespace Protocol { } #region Enums + public enum MessageType { + [pbr::OriginalName("loginRequest")] LoginRequest = 0, + [pbr::OriginalName("loginResponse")] LoginResponse = 1, + [pbr::OriginalName("signupRequest")] SignupRequest = 2, + [pbr::OriginalName("signupResponse")] SignupResponse = 3, + } + public enum RequestResult { [pbr::OriginalName("Success")] Success = 0, [pbr::OriginalName("Fail")] Fail = 1, diff --git a/Client/Assets/Scripts/Test/TcpClientTest.cs b/Client/Assets/Scripts/Test/TcpClientTest.cs index dbf4616..78d0584 100644 --- a/Client/Assets/Scripts/Test/TcpClientTest.cs +++ b/Client/Assets/Scripts/Test/TcpClientTest.cs @@ -1,5 +1,7 @@ +using Google.Protobuf; using Network; -using System.Text; +using Protocol; +using System.Collections.Generic; using UnityEngine; namespace Test @@ -8,12 +10,30 @@ namespace Test { private async void Start() { - var sendBytes = Encoding.UTF8.GetBytes("This is a test string sent via TCP."); + var request = new LoginRequest + { + Username = "原神,启动!(通过TCP)", + Password = "20200928", + }; - var receivedBytes = await UnityTcpClient.Instance.SendAndReceiveData(sendBytes); - var receivedString = Encoding.UTF8.GetString(receivedBytes); + var requestBytes = new byte[request.CalculateSize()]; + request.WriteTo(requestBytes); - Debug.Log($"Received string: {receivedString}"); + var sendBytes = new List + { + (byte)MessageType.LoginRequest + }; + sendBytes.AddRange(requestBytes); + + var responseBytes = await UnityTcpClient.Instance.SendAndReceiveData(sendBytes.ToArray()); + + if (responseBytes.Length == 0) return; + else if (responseBytes[0] == (byte)MessageType.LoginResponse) + { + var response = LoginResponse.Parser.ParseFrom(responseBytes[1..]); + + Debug.Log($"Received response: {response}"); + } } } } \ No newline at end of file diff --git a/Client/Assets/Scripts/Test/UdpClientTest.cs b/Client/Assets/Scripts/Test/UdpClientTest.cs index bfd2d68..02dfe0a 100644 --- a/Client/Assets/Scripts/Test/UdpClientTest.cs +++ b/Client/Assets/Scripts/Test/UdpClientTest.cs @@ -1,5 +1,7 @@ +using Google.Protobuf; using Network; -using System.Text; +using Protocol; +using System.Collections.Generic; using UnityEngine; namespace Test @@ -8,12 +10,30 @@ namespace Test { private async void Start() { - var sendBytes = Encoding.UTF8.GetBytes("This is a test string sent via UDP."); + var request = new LoginRequest + { + Username = "原神,启动!(谁会通过UDP启动啊喂!)", + Password = "20200928", + }; - var receivedBytes = await UnityUdpClient.Instance.SendAndReceiveData(sendBytes); - var receivedString = Encoding.UTF8.GetString(receivedBytes); + var requestBytes = new byte[request.CalculateSize()]; + request.WriteTo(requestBytes); - Debug.Log($"Received string: {receivedString}"); + var sendBytes = new List + { + (byte)MessageType.LoginRequest + }; + sendBytes.AddRange(requestBytes); + + var responseBytes = await UnityUdpClient.Instance.SendAndReceiveData(sendBytes.ToArray()); + + if (responseBytes.Length == 0) return; + else if (responseBytes[0] == (byte)MessageType.LoginResponse) + { + var response = LoginResponse.Parser.ParseFrom(responseBytes[1..]); + + Debug.Log($"Received response: {response}"); + } } } } \ No newline at end of file diff --git a/Server/src/main.rs b/Server/src/main.rs index 6fb867e..145df8e 100644 --- a/Server/src/main.rs +++ b/Server/src/main.rs @@ -1,8 +1,8 @@ mod command_helper; +mod message_dispatcher; mod protocol; mod server_logger; mod servers; -mod services; use server_logger::ServerLogger; use servers::tcp_server::TCP_SERVER; diff --git a/Server/src/message_dispatcher.rs b/Server/src/message_dispatcher.rs new file mode 100644 index 0000000..99a0390 --- /dev/null +++ b/Server/src/message_dispatcher.rs @@ -0,0 +1,48 @@ +use prost::Message; + +use crate::protocol::{ + LoginRequest, LoginResponse, MessageType, RequestResult, SignupRequest, SignupResponse, +}; + +pub(crate) fn dispatch_message(msg_type: u8, msg: &[u8]) -> Vec { + let mut buf = Vec::new(); + + match msg_type { + // Owing to Rust disallows converting an integer to an + // enumeration (yet allows in reverse! ಠ_ಠ), we have to + // use such this way to make this pattern matching works. + // + // This seems not as elegable as what we are expected, + // but it could work well, provided that items in + // `MessageType` doesn't excceed 256. + val if val == MessageType::LoginRequest as u8 => { + let msg = LoginRequest::decode(msg).unwrap(); + + log::info!("{msg:?}"); + + LoginResponse { + result: RequestResult::Success.into(), + message: "Successfully logged in!".into(), + } + .encode(&mut buf) + .unwrap(); + + [vec![MessageType::LoginResponse as u8], buf].concat() + } + val if val == MessageType::SignupRequest as u8 => { + let msg = SignupRequest::decode(msg).unwrap(); + + log::info!("{msg:?}"); + + SignupResponse { + result: RequestResult::Success.into(), + message: "Successfully signed up!".into(), + } + .encode(&mut buf) + .unwrap(); + + [vec![MessageType::SignupResponse as u8], buf].concat() + } + _ => unreachable!(), + } +} diff --git a/Server/src/servers.rs b/Server/src/servers.rs index e55b41d..718fb79 100644 --- a/Server/src/servers.rs +++ b/Server/src/servers.rs @@ -2,3 +2,5 @@ pub(crate) mod tcp_server; pub(crate) mod udp_server; const SERVER_ADDR: &str = "127.0.0.1:12345"; +const TCP_MAX_PAYLOAD_SIZE: usize = 1460; +const UDP_MAX_PAYLOAD_SIZE: usize = 1472; diff --git a/Server/src/servers/tcp_server.rs b/Server/src/servers/tcp_server.rs index 74c5aaf..051c2ec 100644 --- a/Server/src/servers/tcp_server.rs +++ b/Server/src/servers/tcp_server.rs @@ -5,10 +5,12 @@ use std::sync::LazyLock; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{Mutex, mpsc}; +use tokio::sync::Mutex; +use tokio::sync::mpsc::{self, Sender}; use tokio::task::JoinHandle; -use super::SERVER_ADDR; +use super::{SERVER_ADDR, TCP_MAX_PAYLOAD_SIZE}; +use crate::message_dispatcher; pub(crate) static TCP_SERVER: LazyLock> = LazyLock::new(|| Mutex::new(TcpServer::new())); @@ -16,7 +18,7 @@ pub(crate) static TCP_SERVER: LazyLock> = pub(crate) struct TcpServer { is_running: bool, clients: HashMap>, - shutdown_tx: Option>, + shutdown_tx: Option>, } impl TcpServer { @@ -31,13 +33,14 @@ impl TcpServer { pub(crate) async fn start(&mut self) { if self.is_running { log::warn!("TCP server is already running"); + return; } match TcpListener::bind(SERVER_ADDR).await { Ok(listener) => { - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); self.is_running = true; + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); self.shutdown_tx = Some(shutdown_tx); tokio::spawn(async move { @@ -63,6 +66,7 @@ impl TcpServer { for (addr, connection) in self.clients.drain() { log::info!("Closing connection to {}", addr); + connection.abort(); } } @@ -79,6 +83,7 @@ impl TcpServer { if let Err(e) = Self::handle_client(socket, addr).await { log::error!("Client {addr} error: {e}"); } + log::info!("Client {addr} disconnected"); }); @@ -91,6 +96,7 @@ impl TcpServer { _ = shutdown_rx.recv() => { log::info!("TCP Server shutting down"); + break; } } @@ -98,18 +104,18 @@ impl TcpServer { } async fn handle_client(mut socket: TcpStream, addr: SocketAddr) -> io::Result<()> { - let mut buffer = [0; 1024]; - loop { + let mut buffer = [0; TCP_MAX_PAYLOAD_SIZE]; let len = socket.read(&mut buffer).await?; if len == 0 { break; } - log::debug!("Received {} bytes from {}", len, addr); + log::info!("Received {} bytes from {}", len, addr); - // TODO: Deserialize data - socket.write_all(&buffer[..len]).await?; + let response = message_dispatcher::dispatch_message(buffer[0], &buffer[1..len]); + + socket.write_all(&response).await?; } let mut server = TCP_SERVER.lock().await; diff --git a/Server/src/servers/udp_server.rs b/Server/src/servers/udp_server.rs index 86ed477..1c70235 100644 --- a/Server/src/servers/udp_server.rs +++ b/Server/src/servers/udp_server.rs @@ -4,7 +4,8 @@ use std::sync::LazyLock; use tokio::net::UdpSocket; use tokio::sync::Mutex; -use super::SERVER_ADDR; +use super::{SERVER_ADDR, UDP_MAX_PAYLOAD_SIZE}; +use crate::message_dispatcher; pub(crate) static UDP_SERVER: LazyLock> = LazyLock::new(|| Mutex::new(UdpServer::new())); @@ -21,6 +22,7 @@ impl UdpServer { pub(crate) async fn start(&mut self) { if self.is_running { log::warn!("UDP server is already running"); + return; } @@ -50,17 +52,17 @@ impl UdpServer { async fn handle_client(socket: &UdpSocket) -> io::Result<()> { loop { - let mut buffer = [0; 1024]; + let mut buffer = [0; UDP_MAX_PAYLOAD_SIZE]; let (len, addr) = socket.recv_from(&mut buffer).await?; if len == 0 { break; } - log::info!("Received message from client {addr}"); + log::info!("Received {} bytes from {}", len, addr); - // TODO: Deserialize data - let buffer = &buffer[..len]; - socket.send_to(buffer, addr).await?; + let response = message_dispatcher::dispatch_message(buffer[0], &buffer[1..len]); + + socket.send_to(&response, addr).await?; } Ok(()) diff --git a/Server/src/services.rs b/Server/src/services.rs deleted file mode 100644 index 8df9e50..0000000 --- a/Server/src/services.rs +++ /dev/null @@ -1 +0,0 @@ -pub(crate) mod game_service; diff --git a/Server/src/services/game_service.rs b/Server/src/services/game_service.rs deleted file mode 100644 index 8b13789..0000000 --- a/Server/src/services/game_service.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/Tools/ProtoBuf/proto/message.proto b/Tools/ProtoBuf/proto/message.proto index bbad498..e52936b 100644 --- a/Tools/ProtoBuf/proto/message.proto +++ b/Tools/ProtoBuf/proto/message.proto @@ -2,6 +2,13 @@ syntax = "proto3"; package protocol; +enum MessageType { + loginRequest = 0; + loginResponse = 1; + signupRequest = 2; + signupResponse = 3; +} + enum RequestResult { Success = 0; Fail = 1;