Browse Source

v2.2.0; Fixed freezing on big amount of files; Improved transfer data printing; Introduction to some minor bugs with it as well, but that is not a severe problem anyway

main
Unbewohnte 3 years ago
parent
commit
0ac9cd2b71
  1. 2
      src/main.go
  2. 288
      src/node/node.go
  3. 16
      src/protocol/headers.go
  4. 23
      src/protocol/send.go

2
src/main.go

@ -30,7 +30,7 @@ import (
)
var (
VERSION string = "v2.1.4"
VERSION string = "v2.2.0"
versionInformation string = fmt.Sprintf("ftu %s\n\nCopyright (C) 2021 Kasyanov Nikolay Alexeevich (Unbewohnte (https://unbewohnte.xyz/))\nThis program comes with ABSOLUTELY NO WARRANTY.\nThis is free software, and you are welcome to redistribute it under certain conditions; type \"ftu -l\" for details.\n", VERSION)

288
src/node/node.go

@ -39,12 +39,6 @@ import (
"github.com/Unbewohnte/ftu/protocol"
)
// node-controlling states
type nodeInnerstates struct {
Stopped bool // the way to exit the mainloop in case of an external error or a successful end of a transfer
AllowedToTransfer bool // the way to notify the mainloop of a sending node to start sending pieces of files
}
// netInfowork specific settings
type netInfo struct {
ConnAddr string // address to connect to. Does not include port
@ -59,14 +53,20 @@ type sending struct {
IsDirectory bool // is ServingPath a directory
Recursive bool // recursively send directory
CanSendBytes bool // is the other node ready to receive another piece
AllowedToTransfer bool // the way to notify the mainloop of a sending node to start sending pieces of files
InTransfer bool // already transferring|receiving files
FilesToSend []*fsys.File
CurrentFileID uint64 // an id of a file that is currently being transported
SentBytes uint64 // how many bytes sent already
TotalTransferSize uint64 // how many bytes will be sent in total
}
// Receiving-side node information
type receiving struct {
AcceptedFiles []*fsys.File // files that`ve been accepted to be received
DownloadsPath string // where to download
TotalDownloadSize uint64 // how many bytes will be received in total
ReceivedBytes uint64 // how many bytes downloaded so far
}
// Both sending-side and receiving-side information
@ -81,8 +81,8 @@ type Node struct {
mutex *sync.Mutex
packetPipe chan *protocol.Packet // a way to receive incoming packets from another goroutine
isSending bool // sending or a receiving node
stopped bool // the way to exit the mainloop in case of an external error or a successful end of a transfer
netInfo *netInfo
state *nodeInnerstates
transferInfo *transferInfo
}
@ -129,19 +129,20 @@ func NewNode(options *NodeOptions) (*Node, error) {
EncryptionKey: nil,
Conn: nil,
},
state: &nodeInnerstates{
AllowedToTransfer: false,
Stopped: false,
},
stopped: false,
transferInfo: &transferInfo{
Sending: &sending{
ServingPath: options.ServerSide.ServingPath,
Recursive: options.ServerSide.Recursive,
IsDirectory: isDir,
TotalTransferSize: 0,
SentBytes: 0,
},
Receiving: &receiving{
AcceptedFiles: nil,
DownloadsPath: options.ClientSide.DownloadsFolderPath,
ReceivedBytes: 0,
TotalDownloadSize: 0,
},
},
}
@ -184,7 +185,7 @@ func (node *Node) disconnect() error {
return err
}
node.state.Stopped = true
node.stopped = true
}
return nil
@ -216,24 +217,25 @@ func (node *Node) printTransferInfo(delay time.Duration) error {
switch node.isSending {
case true:
if !node.state.AllowedToTransfer {
if !node.transferInfo.Sending.AllowedToTransfer {
// do not print if the transfer has not been accepted yet
break
}
fmt.Printf("\r| files(s) left to send: %4d", len(node.transferInfo.Sending.FilesToSend))
fmt.Printf("\r| (%.2f/%.2f MB)",
float32(node.transferInfo.Sending.SentBytes)/1024/1024,
float32(node.transferInfo.Sending.TotalTransferSize)/1024/1024,
)
case false:
if len(node.transferInfo.Receiving.AcceptedFiles) <= 0 {
break
}
fmt.Printf("\r| file(s) left to receive: %4d", len(node.transferInfo.Receiving.AcceptedFiles))
fmt.Printf("\r| (%.2f/%.2f MB)",
float32(node.transferInfo.Receiving.ReceivedBytes)/1024/1024,
float32(node.transferInfo.Receiving.TotalDownloadSize)/1024/1024,
)
}
return nil
}
// Starts the node in either sending or receiving state and performs the transfer
func (node *Node) Start() {
switch node.isSending {
case true:
func (node *Node) send() {
// SENDER NODE
localIP, err := addr.GetLocal()
@ -242,40 +244,44 @@ func (node *Node) Start() {
}
// retrieve information about the file|directory
var fileToSend *fsys.File
var dirToSend *fsys.Directory
var FILETOSEND *fsys.File
var DIRTOSEND *fsys.Directory
switch node.transferInfo.Sending.IsDirectory {
case true:
dirToSend, err = fsys.GetDir(node.transferInfo.Sending.ServingPath, node.transferInfo.Sending.Recursive)
DIRTOSEND, err = fsys.GetDir(node.transferInfo.Sending.ServingPath, node.transferInfo.Sending.Recursive)
if err != nil {
panic(err)
}
case false:
fileToSend, err = fsys.GetFile(node.transferInfo.Sending.ServingPath)
FILETOSEND, err = fsys.GetFile(node.transferInfo.Sending.ServingPath)
if err != nil {
panic(err)
}
}
if dirToSend != nil {
size := float32(dirToSend.Size) / 1024 / 1024
if DIRTOSEND != nil {
node.transferInfo.Sending.TotalTransferSize = DIRTOSEND.Size
displaySize := float32(DIRTOSEND.Size) / 1024 / 1024
sizeLevel := "MiB"
if size >= 1024 {
if displaySize >= 1024 {
// GiB
size = size / 1024
displaySize = displaySize / 1024
sizeLevel = "GiB"
}
fmt.Printf("\nSending \"%s\" (%.3f %s) locally on %s:%d", dirToSend.Name, size, sizeLevel, localIP, node.netInfo.Port)
fmt.Printf("\nSending \"%s\" (%.3f %s) locally on %s:%d and remotely (if configured)", DIRTOSEND.Name, displaySize, sizeLevel, localIP, node.netInfo.Port)
} else {
size := float32(fileToSend.Size) / 1024 / 1024
node.transferInfo.Sending.TotalTransferSize = FILETOSEND.Size
displaySize := float32(FILETOSEND.Size) / 1024 / 1024
sizeLevel := "MiB"
if size >= 1024 {
if displaySize >= 1024 {
// GiB
size = size / 1024
displaySize = displaySize / 1024
sizeLevel = "GiB"
}
fmt.Printf("\nSending \"%s\" (%.3f %s) locally on %s:%d and remotely", fileToSend.Name, size, sizeLevel, localIP, node.netInfo.Port)
fmt.Printf("\nSending \"%s\" (%.3f %s) locally on %s:%d and remotely (if configured)", FILETOSEND.Name, displaySize, sizeLevel, localIP, node.netInfo.Port)
}
@ -299,11 +305,11 @@ func (node *Node) Start() {
go protocol.ReceivePackets(node.netInfo.Conn, node.packetPipe)
// send info about file/directory
go protocol.SendTransferOffer(node.netInfo.Conn, fileToSend, dirToSend, node.netInfo.EncryptionKey)
go protocol.SendTransferOffer(node.netInfo.Conn, FILETOSEND, DIRTOSEND, node.netInfo.EncryptionKey)
// mainloop
for {
if node.state.Stopped {
if node.stopped {
fmt.Printf("\n")
node.disconnect()
break
@ -333,114 +339,44 @@ func (node *Node) Start() {
case protocol.HeaderAccept:
// the receiving node has accepted the transfer
node.state.AllowedToTransfer = true
fmt.Printf("\nTransfer allowed. Sending...")
node.transferInfo.Sending.AllowedToTransfer = true
// notify it about all the files that are going to be sent
// prepare files to send
switch node.transferInfo.Sending.IsDirectory {
case true:
// send file packets for the files in the directory
err = dirToSend.SetRelativePaths(dirToSend.Path, node.transferInfo.Sending.Recursive)
err = DIRTOSEND.SetRelativePaths(DIRTOSEND.Path, node.transferInfo.Sending.Recursive)
if err != nil {
panic(err)
}
filesToSend := dirToSend.GetAllFiles(node.transferInfo.Sending.Recursive)
filesToSend := DIRTOSEND.GetAllFiles(node.transferInfo.Sending.Recursive)
// notify the other node about all the files that are going to be sent
for counter, file := range filesToSend {
// assign ID and add it to the node sendlist
file.ID = uint64(counter)
node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend, file)
}
// set current file id to the first file
node.transferInfo.Sending.CurrentFileID = 0
filePacket, err := protocol.CreateFilePacket(file)
if err != nil {
panic(err)
}
// encrypt if necessary
if node.netInfo.EncryptionKey != nil {
encryptedBody, err := encryption.Encrypt(node.netInfo.EncryptionKey, filePacket.Body)
if err != nil {
panic(err)
}
filePacket.Body = encryptedBody
}
err = protocol.SendPacket(node.netInfo.Conn, *filePacket)
if err != nil {
panic(err)
}
if node.verboseOutput {
fmt.Printf("\n[File] Sent filepacket for \"%s\"", file.Name)
}
time.Sleep(time.Microsecond * 3)
}
filesInfoDonePacket := protocol.Packet{
Header: protocol.HeaderFilesInfoDone,
}
protocol.SendPacket(node.netInfo.Conn, filesInfoDonePacket)
if node.verboseOutput {
fmt.Printf("\n[File] Done sending filepackets")
}
case false:
// send a filepacket of a single file
fileToSend.ID = 0
node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend, fileToSend)
FILETOSEND.ID = 0
node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend, FILETOSEND)
// set current file index to the first and only file
node.transferInfo.Sending.CurrentFileID = 0
filePacket, err := protocol.CreateFilePacket(node.transferInfo.Sending.FilesToSend[0])
if err != nil {
panic(err)
}
// encrypt if necessary
if node.netInfo.EncryptionKey != nil {
encryptedBody, err := encryption.Encrypt(node.netInfo.EncryptionKey, filePacket.Body)
if err != nil {
panic(err)
}
filePacket.Body = encryptedBody
}
err = protocol.SendPacket(node.netInfo.Conn, *filePacket)
if err != nil {
panic(err)
}
filesInfoDonePacket := protocol.Packet{
Header: protocol.HeaderFilesInfoDone,
}
protocol.SendPacket(node.netInfo.Conn, filesInfoDonePacket)
if node.verboseOutput {
fmt.Printf("\n[File] Sent filepacket for \"%s\"", fileToSend.Name)
}
if node.verboseOutput {
fmt.Printf("\n[File] Done sending filepackets")
}
}
fmt.Printf("\n")
case protocol.HeaderReject:
node.state.Stopped = true
node.stopped = true
fmt.Printf("\nTransfer rejected. Disconnecting...")
case protocol.HeaderDisconnecting:
node.state.Stopped = true
node.stopped = true
fmt.Printf("\n%s disconnected", node.netInfo.Conn.RemoteAddr())
case protocol.HeaderAlreadyHave:
@ -457,6 +393,8 @@ func (node *Node) Start() {
node.transferInfo.Sending.CurrentFileID++
node.transferInfo.Sending.SentBytes += fileToSend.Size
if node.verboseOutput {
fmt.Printf("\n[File] receiver already has \"%s\"", fileToSend.Name)
}
@ -465,10 +403,6 @@ func (node *Node) Start() {
}
if !node.verboseOutput {
go node.printTransferInfo(time.Second)
}
// Transfer section
if len(node.transferInfo.Sending.FilesToSend) == 0 {
@ -478,14 +412,52 @@ func (node *Node) Start() {
})
fmt.Printf("\nTransfer ended successfully")
node.state.Stopped = true
node.stopped = true
continue
}
if node.transferInfo.Sending.AllowedToTransfer && !node.transferInfo.Sending.InTransfer {
// notify the node about the next file to be sent
// determine an index of a file with current ID
var currentFileIndex uint64 = 0
for index, fileToSend := range node.transferInfo.Sending.FilesToSend {
if fileToSend.ID == node.transferInfo.Sending.CurrentFileID {
currentFileIndex = uint64(index)
break
}
}
fpacket, err := protocol.CreateFilePacket(node.transferInfo.Sending.FilesToSend[currentFileIndex])
if err != nil {
panic(err)
}
if node.netInfo.EncryptionKey != nil {
err = fpacket.EncryptBody(node.netInfo.EncryptionKey)
if err != nil {
panic(err)
}
}
err = protocol.SendPacket(node.netInfo.Conn, *fpacket)
if err != nil {
panic(err)
}
// initiate the transfer for this file on the next iteration
node.transferInfo.Sending.InTransfer = true
continue
}
if !node.verboseOutput {
go node.printTransferInfo(time.Second)
}
// if allowed to transfer and the other node is ready to receive packets - send one piece
// and wait for it to be ready again
if node.state.AllowedToTransfer && node.transferInfo.Sending.CanSendBytes {
if node.transferInfo.Sending.AllowedToTransfer && node.transferInfo.Sending.CanSendBytes && node.transferInfo.Sending.InTransfer {
// sending a piece of a single file
// determine an index of a file with current ID
@ -497,7 +469,8 @@ func (node *Node) Start() {
}
}
err = protocol.SendPiece(node.transferInfo.Sending.FilesToSend[currentFileIndex], node.netInfo.Conn, node.netInfo.EncryptionKey)
sentBytes, err := protocol.SendPiece(node.transferInfo.Sending.FilesToSend[currentFileIndex], node.netInfo.Conn, node.netInfo.EncryptionKey)
node.transferInfo.Sending.SentBytes += sentBytes
switch err {
case protocol.ErrorSentAll:
// the file has been sent fully
@ -526,24 +499,27 @@ func (node *Node) Start() {
protocol.SendPacket(node.netInfo.Conn, endFilePacket)
// remove this file from the queue
node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend[:currentFileIndex], node.transferInfo.Sending.FilesToSend[currentFileIndex+1:]...)
// start sending the next file
// set counter to the next file ID
node.transferInfo.Sending.CurrentFileID++
node.transferInfo.Sending.InTransfer = false
case nil:
node.transferInfo.Sending.CanSendBytes = false
default:
node.state.Stopped = true
node.stopped = true
fmt.Printf("\nAn error occured while sending a piece of \"%s\": %s", node.transferInfo.Sending.FilesToSend[currentFileIndex].Name, err)
panic(err)
}
}
}
}
case false:
func (node *Node) receive() {
// RECEIVER NODE
// connect to the sending node
@ -559,7 +535,7 @@ func (node *Node) Start() {
// mainloop
for {
node.mutex.Lock()
stopped := node.state.Stopped
stopped := node.stopped
node.mutex.Unlock()
if stopped {
@ -571,7 +547,7 @@ func (node *Node) Start() {
// receive incoming packets and decrypt them if necessary
incomingPacket, ok := <-node.packetPipe
if !ok {
fmt.Printf("\nThe connection has been closed unexpectedly\n")
fmt.Printf("\nConnection has been closed unexpectedly\n")
os.Exit(-1)
}
@ -595,6 +571,8 @@ func (node *Node) Start() {
}
if file != nil {
node.transferInfo.Receiving.TotalDownloadSize = file.Size
size := float32(file.Size) / 1024 / 1024
sizeLevel := "MiB"
if size >= 1024 {
@ -603,7 +581,10 @@ func (node *Node) Start() {
sizeLevel = "GiB"
}
fmt.Printf("\n| Filename: %s\n| Size: %.3f %s\n| Checksum: %s\n", file.Name, size, sizeLevel, file.Checksum)
} else if dir != nil {
node.transferInfo.Receiving.TotalDownloadSize = dir.Size
size := float32(dir.Size) / 1024 / 1024
sizeLevel := "MiB"
if size >= 1024 {
@ -627,7 +608,7 @@ func (node *Node) Start() {
err = os.MkdirAll(filepath.Join(node.transferInfo.Receiving.DownloadsPath, dir.Name), os.ModePerm)
if err != nil {
// well, just download all files in the default downloads folder then
fmt.Printf("\n[ERROR] could not create a directory")
fmt.Printf("\n[ERROR] could not create a directory, downloading directly to the specified location")
} else {
// also download everything in a newly created directory
node.transferInfo.Receiving.DownloadsPath = filepath.Join(node.transferInfo.Receiving.DownloadsPath, dir.Name)
@ -658,14 +639,13 @@ func (node *Node) Start() {
}
node.mutex.Lock()
node.state.Stopped = true
node.stopped = true
node.mutex.Unlock()
}
}()
case protocol.HeaderFile:
// add file to the accepted files;
file, err := protocol.DecodeFilePacket(incomingPacket)
if err != nil {
panic(err)
@ -725,6 +705,8 @@ func (node *Node) Start() {
protocol.SendPacket(node.netInfo.Conn, alreadyHavePacket)
node.transferInfo.Receiving.ReceivedBytes += file.Size
if node.verboseOutput {
fmt.Printf("\n[File] already have \"%s\"", file.Name)
}
@ -732,6 +714,10 @@ func (node *Node) Start() {
} else {
// not the same file. Remove it and await new bytes
os.Remove(file.Path)
node.mutex.Lock()
node.transferInfo.Receiving.AcceptedFiles = append(node.transferInfo.Receiving.AcceptedFiles, file)
node.mutex.Unlock()
}
existingFileHandler.Close()
@ -743,6 +729,13 @@ func (node *Node) Start() {
node.mutex.Unlock()
}
err = protocol.SendPacket(node.netInfo.Conn, protocol.Packet{
Header: protocol.HeaderReady,
})
if err != nil {
panic(err)
}
case protocol.HeaderFileBytes:
// check if this file has been accepted to receive
@ -772,6 +765,7 @@ func (node *Node) Start() {
panic(err)
}
acceptedFile.SentBytes += uint64(wrote)
node.transferInfo.Receiving.ReceivedBytes += uint64(wrote)
err = acceptedFile.Close()
if err != nil {
@ -785,21 +779,8 @@ func (node *Node) Start() {
}
protocol.SendPacket(node.netInfo.Conn, readyPacket)
case protocol.HeaderFilesInfoDone:
// have all information about the files
// notify the other node that this one is ready
err = protocol.SendPacket(node.netInfo.Conn, protocol.Packet{
Header: protocol.HeaderReady,
})
if err != nil {
panic(err)
}
case protocol.HeaderEndfile:
// one of the files has been received completely,
// compare checksums and check if it is the last
// file in the transfer
// one of the files has been received completely
fileIDReader := bytes.NewReader(incomingPacket.Body)
var fileID uint64
@ -864,20 +845,29 @@ func (node *Node) Start() {
case protocol.HeaderDone:
node.mutex.Lock()
node.state.Stopped = true
node.stopped = true
node.mutex.Unlock()
case protocol.HeaderDisconnecting:
node.mutex.Lock()
node.state.Stopped = true
node.stopped = true
node.mutex.Unlock()
fmt.Printf("\n%s disconnected", node.netInfo.Conn.RemoteAddr())
}
if !node.verboseOutput {
if !node.verboseOutput && node.transferInfo.Receiving.ReceivedBytes != 0 {
go node.printTransferInfo(time.Second)
}
}
}
// Starts the node in either sending or receiving state and performs the transfer
func (node *Node) Start() {
switch node.isSending {
case true:
node.send()
case false:
node.receive()
}
}

16
src/protocol/headers.go

@ -29,7 +29,7 @@ type Header string
//// and (size) is 8 bytes long big-endian binary encoded uint64
// ENCRKEY.
// The FIRST header to be sent. Sent immediately after the connection has been established
// The FIRST header to be sent if you`re going to encrypt the transfer. Sent immediately after the connection has been established
// by sender. Body contains a size of a key and the key itself.
// ie: ENCRKEY~(size)(encryption key)
const HeaderEncryptionKey Header = "ENCRKEY"
@ -41,7 +41,7 @@ const HeaderReject Header = "REJECT"
// ACCEPT.
// The opposite of the previous REJECT. Sent by receiver when
// he has agreed to download the file|directory.
// it has agreed to download the file|directory.
// ie: ACCEPT~
const HeaderAccept Header = "ACCEPT"
@ -55,19 +55,11 @@ const HeaderDone Header = "DONE"
// READY.
// Sent by receiver when it has read and processed the last
// FILEBYTES packet or when it has information about all the files and it`s ready to
// receive bytes (FILESINFODONE). The sender is not allowed to "spam" FILEBYTES
// packets without the permission of receiver.
// FILEBYTES or FILE packet. The sender is not allowed to "spam" FILEBYTES or FILE
// packets without the permission (packet with this header) from receiver.
// ie: READY!~
const HeaderReady Header = "READY"
// FILESINFODONE.
// Sent by sender after it`s announced about all the files that are
// going to be sent. It is not allowed to send any file bytes before
// packet with this header was sent.
// ie: FILESINFODONE~
const HeaderFilesInfoDone Header = "FILESINFODONE"
// BYE!.
// Packet with this header can be sent both by receiver and sender.
// It`s used when the sender or the receiver are going to disconnect

23
src/protocol/send.go

@ -137,13 +137,15 @@ func SendTransferOffer(connection net.Conn, file *fsys.File, dir *fsys.Directory
var ErrorSentAll error = fmt.Errorf("sent the whole file")
// sends a piece of file to the connection; The next calls will send
// Sends a piece of file to the connection; The next calls will send
// another piece util the file has been fully sent. If encrKey is not nil - encrypts each packet with
// this key
func SendPiece(file *fsys.File, connection net.Conn, encrKey []byte) error {
// this key. Returns amount of filebytes written to the connection
func SendPiece(file *fsys.File, connection net.Conn, encrKey []byte) (uint64, error) {
var sentBytes uint64 = 0
err := file.Open()
if err != nil {
return err
return sentBytes, err
}
defer file.Close()
@ -152,7 +154,7 @@ func SendPiece(file *fsys.File, connection net.Conn, encrKey []byte) error {
}
if file.Size == file.SentBytes {
return ErrorSentAll
return sentBytes, ErrorSentAll
}
fileBytesPacket := Packet{
@ -164,7 +166,7 @@ func SendPiece(file *fsys.File, connection net.Conn, encrKey []byte) error {
// write file ID first
err = binary.Write(packetBodyBuff, binary.BigEndian, file.ID)
if err != nil {
return err
return sentBytes, err
}
// fill the remaining space of packet with the contents of a file
@ -183,9 +185,10 @@ func SendPiece(file *fsys.File, connection net.Conn, encrKey []byte) error {
read, err := file.Handler.ReadAt(fileBytes, int64(file.SentBytes))
if err != nil {
return err
return sentBytes, err
}
file.SentBytes += uint64(read)
sentBytes += uint64(canSendBytes)
packetBodyBuff.Write(fileBytes)
@ -194,15 +197,15 @@ func SendPiece(file *fsys.File, connection net.Conn, encrKey []byte) error {
if encrKey != nil {
err = fileBytesPacket.EncryptBody(encrKey)
if err != nil {
return err
return sentBytes, err
}
}
// send it to the other side
err = SendPacket(connection, fileBytesPacket)
if err != nil {
return err
return 0, err
}
return nil
return sentBytes, nil
}

Loading…
Cancel
Save