diff --git a/src/args/parser.rs b/src/args/parser.rs index db66d34..a14e16d 100644 --- a/src/args/parser.rs +++ b/src/args/parser.rs @@ -28,6 +28,7 @@ pub enum DataType { pub struct Args { pub help: bool, pub version: bool, + pub verbose_output: bool, pub mode: RunMode, pub send_type: DataType, pub address_str: String, @@ -42,6 +43,7 @@ impl Args { return Args{ help: false, version: false, + verbose_output: false, mode: RunMode::DAEMON, send_type: DataType::TEXT, address_str: "".to_string(), @@ -72,6 +74,10 @@ pub fn parse(raw_args: &Vec) -> Result { parsed_args.version = true; } + "verbose" => { + parsed_args.verbose_output = true; + } + "send" => { if arg_amount < 5 { // not enough arguments ! diff --git a/src/main.rs b/src/main.rs index 5a0d734..0f98fdb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,8 +22,7 @@ mod nodes; use protocol::specs::RunMode; use args::parser::Args; -use std::net; -use crate::protocol::specs::{Packet, PacketType}; +use nodes::recv_node::RecvOptions; const VERSION: &str = "v0.1.0"; const HELP_MESSAGE: &str = @@ -34,6 +33,7 @@ ddtu [FLAG] [MODE] [MODE-specific arguments] [FLAG] [help]: ddtu help -> print help message [version]: ddtu version -> print version +[verbose]: ddtu verbose [MODE] [MODE-specific arguments] -> print almost everything the program does [MODE] [send]: ddtu send [TYPE] [SRC] [ADDR] -> send [SRC] of [TYPE] to [ADDR] @@ -49,6 +49,8 @@ ddtu [FLAG] [MODE] [MODE-specific arguments] (c) Kasyanov Nikolay Alexeyevich (Unbewohnte) "; +const DEFAULT_PORT: u16 = 55761; + fn main() { // get and parse arguments @@ -81,86 +83,18 @@ fn main() { match args.mode { RunMode::RECEIVE => { - let mut conn: net::TcpStream; - match net::TcpStream::connect(&args.address_str) { - Ok(c) => { - conn = c; - } - - Err(error) => { - println!("could not connect to {}: {}", args.address_str, error); + match nodes::recv_node::start( + RecvOptions::new(format!("0.0.0.0:{}", DEFAULT_PORT), args.save_directory), args.verbose_output) { + None => {} + Some(error) => { + println!("Error receiving: {}", error.text); return; } } - - println!("connected to {}", args.address_str); - - match protocol::packets::read_next_packet(&mut conn) { - Ok(packet) => { - match packet.get_type() { - PacketType::TextData => { - let mut text_packet = protocol::packets::Text::empty(); - text_packet.from_bytes(packet.as_bytes()); - println!("{}", text_packet.text); - } - - _ => { - - } - } - } - - Err(error) => { - println!("error reading packet: {}", error.text); - } - } } RunMode::SEND => { - let listener: net::TcpListener; - match net::TcpListener::bind("0.0.0.0:10555") { - Ok(l) => { - listener = l; - } - - Err(error) => { - println!("error creating a listener: {}", error); - return; - } - } - - let mut conn: net::TcpStream; - let addr: net::SocketAddr; - match listener.accept() { - Ok((c, a)) => { - conn = c; - addr = a; - } - - Err(error) => { - println!("error establising a new connection: {}", error); - return; - } - } - - println!("{} has connected", addr); - - // send text - let Text = protocol::packets::Text::new(&args.text_to_send); - match protocol::packets::send_packet(&mut conn, &Text) { - Ok(()) => {} - Err(error) => { - println!("{}", error.text); - } - } - - match conn.shutdown(net::Shutdown::Both) { - Ok(()) => {} - Err(error) => { - println!("could not shutdown the connection: {}", error); - return; - } - } + // match nodes::send_node::start(options: SendOptions, verbose_output: bool); } RunMode::DAEMON => { diff --git a/src/nodes/recv_node.rs b/src/nodes/recv_node.rs index 87cfaff..bfe09b0 100644 --- a/src/nodes/recv_node.rs +++ b/src/nodes/recv_node.rs @@ -16,14 +16,32 @@ GNU Affero General Public License for more details. use crate::util::error::Error; use crate::protocol::specs::PacketType; use crate::protocol::specs::Packet; +use crate::protocol::packets::{read_next_packet, send_packet}; use crate::protocol::packets; -use crate::protocol; use std::path; use std::net; +use std::sync::mpsc::{Sender, Receiver}; +use std::sync::mpsc; +use std::thread; -pub fn start(address_string: String, save_directory: path::PathBuf, verbose_output: bool) -> Option { +pub struct RecvOptions { + pub listen_address: String, + pub save_directory: path::PathBuf, +} + +impl RecvOptions { + pub fn new(addr: String, save_dir: path::PathBuf) -> RecvOptions { + return RecvOptions{ + listen_address: addr, + save_directory: save_dir, + } + } +} + + +pub fn start(options: RecvOptions, verbose_output: bool) -> Option { let listener: net::TcpListener; - match net::TcpListener::bind(address_string) { + match net::TcpListener::bind(options.listen_address) { Ok(l) => { listener = l; } @@ -55,7 +73,7 @@ pub fn start(address_string: String, save_directory: path::PathBuf, verbose_outp // read the first packet let incoming_packet: Box; - match protocol::packets::read_next_packet(&mut connection) { + match read_next_packet(&mut connection) { Ok(packet) => { incoming_packet = packet; } @@ -86,6 +104,16 @@ pub fn start(address_string: String, save_directory: path::PathBuf, verbose_outp println!("Handshake from {}", address); } + // accept handshake + match send_packet(&mut connection, &packets::HandshakeAccept{}) { + Ok(()) => {} + Err(error) => { + return Some( + Error::new(format!("error accepting handshake from {}: {}", address, error.text).as_str()) + ); + } + } + // break out of loop break; } @@ -101,6 +129,93 @@ pub fn start(address_string: String, save_directory: path::PathBuf, verbose_outp } // now read and process incoming packets; essentialy receiving something ! + + // launch a new packet reading thread + let (ch_send, ch_recv): (Sender>, Receiver>) = mpsc::channel(); + + let mut cloned_connection: net::TcpStream; + match connection.try_clone() { + Ok(cc) => { + cloned_connection = cc; + } + + Err(_) => { + return Some( + Error::new("could not clone connection") + ); + } + } + + thread::spawn(move || { + loop { + match read_next_packet(&mut cloned_connection) { + Ok(packet) => { + if let Err(_) = ch_send.send(packet) { + // the channel has been hung up + // return from this thread + return; + } + } + + Err(error) => { + if verbose_output { + println!("Could not read packet: {}", error.text); + } + continue; + } + } + } + }); + + + // handle incoming packets + loop { + let incoming_packet: Box; + match ch_recv.recv() { + Ok(p) => { + incoming_packet = p; + } + + Err(error) => { + return Some(Error::new( + format!("error receiving a new packet from listener thread: {}", error).as_str() + )); + } + } + + match incoming_packet.get_type() { + PacketType::ConnectionShutdown => { + if verbose_output { + println!("{} shuts down the connection", address); + } + connection.shutdown(net::Shutdown::Both); + + break; + } + + PacketType::TextData => { + let mut text_packet: packets::Text = packets::Text::empty(); + match text_packet.from_bytes(incoming_packet.as_bytes()) { + None => { + // print received text ! + println!("{}", text_packet.text); + } + + Some(error) => { + // close connection and exit + send_packet(&mut connection, &packets::ConnectionShutdown{}); + connection.shutdown(net::Shutdown::Both); + + return Some( + Error::new(format!("could not get text packet: {}", error.text).as_str()) + ); + } + } + } + + _ => {} + } + } return None; } diff --git a/src/nodes/send_node.rs b/src/nodes/send_node.rs index c4e9df0..5ac6cd7 100644 --- a/src/nodes/send_node.rs +++ b/src/nodes/send_node.rs @@ -13,9 +13,17 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. */ +use crate::args::parser::DataType; use std::net; use std::path; -pub fn start(address: String, source: path::PathBuf) { +pub struct SendOptions { + pub address: String, + pub send_type: DataType, + pub text_to_send: String, + pub source_path: path::PathBuf, +} + +pub fn start(options: SendOptions, verbose_output: bool) { } diff --git a/src/protocol/packets.rs b/src/protocol/packets.rs index 8338a62..3daff97 100644 --- a/src/protocol/packets.rs +++ b/src/protocol/packets.rs @@ -43,7 +43,7 @@ pub fn send_packet(conn: &mut net::TcpStream, packet: &dyn Packet) -> Result<(), } } -pub fn read_next_packet(conn: &mut net::TcpStream) -> Result, Error> { +pub fn read_next_packet(conn: &mut net::TcpStream) -> Result, Error> { let mut u128_buf: [u8; 16] = [0; 16]; match conn.read_exact(&mut u128_buf) { @@ -446,6 +446,22 @@ impl Packet for Handshake { self.encryption_key[i-17] = packet_bytes[i].to_be(); } + return None; + } +} + +pub struct HandshakeAccept {} + +impl Packet for HandshakeAccept { + fn get_type(&self) -> PacketType { + return PacketType::HandshakeAccept; + } + + fn as_bytes(&self) -> Vec { + return vec!(HANDSHAKE_ACCEPT_PACKET_ID.to_be_bytes()[0]); + } + + fn from_bytes(&mut self, _packet_bytes: Vec) -> Option { return None; } } \ No newline at end of file diff --git a/src/protocol/specs.rs b/src/protocol/specs.rs index 21f2673..4db400c 100644 --- a/src/protocol/specs.rs +++ b/src/protocol/specs.rs @@ -14,18 +14,18 @@ GNU Affero General Public License for more details. */ use crate::util::error::Error; -use crate::protocol::packets::*; -// Protocol versioning +// Versioning pub const PROTOCOL_VERSION_1: u8 = 1; pub const PROTOCOL_LATEST_VERSION: u8 = PROTOCOL_VERSION_1; // Packets' IDs pub const HANDSHAKE_PACKET_ID: u8 = 1; -pub const CONNECTION_SHUTDOWN_PACKET_ID: u8 = 2; -pub const TEXT_PACKET_ID: u8 = 3; -pub const FILEINFO_PACKET_ID: u8 = 4; -pub const FILEDATA_PACKET_ID: u8 = 5; +pub const HANDSHAKE_ACCEPT_PACKET_ID: u8 = 2; +pub const CONNECTION_SHUTDOWN_PACKET_ID: u8 = 3; +pub const TEXT_PACKET_ID: u8 = 4; +pub const FILEINFO_PACKET_ID: u8 = 5; +pub const FILEDATA_PACKET_ID: u8 = 6; // File chunk size pub const CHUNK_SIZE: usize = 262_144; // 256 KB @@ -49,9 +49,10 @@ pub enum PacketType { FileInfo, FileData, Handshake, + HandshakeAccept, } -pub trait Packet { +pub trait Packet : Send { fn get_type(&self) -> PacketType; fn as_bytes(&self) -> Vec; fn from_bytes(&mut self, packet_bytes: Vec) -> Option;