Browse Source

[protocol] added ALREADYHAVE packet; [node] fixed logical path error when receiving a single file; ALREADY HAVE FILE FEATURE

main
Unbewohnte 3 years ago
parent
commit
fab8aa947b
  1. 135
      src/node/node.go
  2. 7
      src/protocol/headers.go

135
src/node/node.go

@ -35,12 +35,13 @@ type netInfoInfo struct {
// Sending-side node information // Sending-side node information
type sending struct { type sending struct {
ServingPath string // path to the thing that will be sent ServingPath string // path to the thing that will be sent
IsDirectory bool // is ServingPath a directory IsDirectory bool // is ServingPath a directory
Recursive bool // recursively send directory Recursive bool // recursively send directory
CanSendBytes bool // is the other node ready to receive another piece CanSendBytes bool // is the other node ready to receive another piece
FilesToSend []*fsys.File FilesToSend []*fsys.File
CurrentFileIndex uint64 // an id of a file that is currently being transported // CurrentFileIndex uint64 // an index of a file with a specific ID that is currently being transported
CurrentFileID uint64 // an id of a file that is currently being transported
} }
// Receiving-side node information // Receiving-side node information
@ -313,7 +314,7 @@ func (node *Node) Start() {
node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend, file) node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend, file)
// set current file id to the first file // set current file id to the first file
node.transferInfo.Sending.CurrentFileIndex = 0 node.transferInfo.Sending.CurrentFileID = 0
filePacket, err := protocol.CreateFilePacket(file) filePacket, err := protocol.CreateFilePacket(file)
if err != nil { if err != nil {
@ -342,7 +343,7 @@ func (node *Node) Start() {
node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend, fileToSend) node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend, fileToSend)
// set current file index to the first and only file // set current file index to the first and only file
node.transferInfo.Sending.CurrentFileIndex = 0 node.transferInfo.Sending.CurrentFileID = 0
filePacket, err := protocol.CreateFilePacket(node.transferInfo.Sending.FilesToSend[0]) filePacket, err := protocol.CreateFilePacket(node.transferInfo.Sending.FilesToSend[0])
if err != nil { if err != nil {
@ -373,6 +374,23 @@ func (node *Node) Start() {
case protocol.HeaderDisconnecting: case protocol.HeaderDisconnecting:
node.state.Stopped = true node.state.Stopped = true
fmt.Printf("%s disconnected\n", node.netInfo.Conn.RemoteAddr()) fmt.Printf("%s disconnected\n", node.netInfo.Conn.RemoteAddr())
case protocol.HeaderAlreadyHave:
// the other node already has a file with such ID.
// do not send it
fileIDReader := bytes.NewReader(incomingPacket.Body)
var fileID uint64
binary.Read(fileIDReader, binary.BigEndian, &fileID)
for index, fileToSend := range node.transferInfo.Sending.FilesToSend {
if fileToSend.ID == fileID {
node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend[:index], node.transferInfo.Sending.FilesToSend[index+1:]...)
node.transferInfo.Sending.CurrentFileID++
}
}
} }
// Transfer section // Transfer section
@ -386,6 +404,8 @@ func (node *Node) Start() {
fmt.Printf("Transfer ended successfully\n") fmt.Printf("Transfer ended successfully\n")
node.state.Stopped = true node.state.Stopped = true
continue
} }
// if allowed to transfer and the other node is ready to receive packets - send one piece // if allowed to transfer and the other node is ready to receive packets - send one piece
@ -394,7 +414,15 @@ func (node *Node) Start() {
// sending a piece of a single file // sending a piece of a single file
currentFileIndex := node.transferInfo.Sending.CurrentFileIndex // determine an index of a file with current ID
var currentFileIndex uint64 = 0
for index, fileToSend := range node.transferInfo.Sending.FilesToSend {
if fileToSend.ID == node.transferInfo.Sending.CurrentFileID {
currentFileIndex = uint64(index)
break
}
}
err = protocol.SendPiece(node.transferInfo.Sending.FilesToSend[currentFileIndex], node.netInfo.Conn, node.netInfo.EncryptionKey) err = protocol.SendPiece(node.transferInfo.Sending.FilesToSend[currentFileIndex], node.netInfo.Conn, node.netInfo.EncryptionKey)
switch err { switch err {
case protocol.ErrorSentAll: case protocol.ErrorSentAll:
@ -419,21 +447,10 @@ func (node *Node) Start() {
protocol.SendPacket(node.netInfo.Conn, endFilePacket) protocol.SendPacket(node.netInfo.Conn, endFilePacket)
// start sending the next file node.transferInfo.Sending.FilesToSend = append(node.transferInfo.Sending.FilesToSend[:currentFileIndex], node.transferInfo.Sending.FilesToSend[currentFileIndex+1:]...)
if uint64(len(node.transferInfo.Sending.FilesToSend)) == (node.transferInfo.Sending.CurrentFileIndex + 1) {
// all files has been sent
node.state.Stopped = true
// send DONE packet
donePacket := protocol.Packet{
Header: protocol.HeaderDone,
}
protocol.SendPacket(node.netInfo.Conn, donePacket)
} else { // start sending the next file
node.transferInfo.Sending.CurrentFileIndex++ node.transferInfo.Sending.CurrentFileID++
}
case nil: case nil:
node.transferInfo.Sending.CanSendBytes = false node.transferInfo.Sending.CanSendBytes = false
@ -441,8 +458,7 @@ func (node *Node) Start() {
default: default:
node.state.Stopped = true node.state.Stopped = true
CurrentFileIndex := node.transferInfo.Sending.CurrentFileIndex fmt.Printf("An error occured while sending a piece of \"%s\": %s\n", node.transferInfo.Sending.FilesToSend[currentFileIndex].Name, err)
fmt.Printf("An error occured while sending a piece of \"%s\": %s\n", node.transferInfo.Sending.FilesToSend[CurrentFileIndex].Name, err)
panic(err) panic(err)
} }
@ -585,7 +601,12 @@ func (node *Node) Start() {
panic(err) panic(err)
} }
file.Path = filepath.Join(node.transferInfo.Receiving.DownloadsPath, file.RelativeParentPath) if file.RelativeParentPath == "" {
// does not have a parent dir
file.Path = filepath.Join(node.transferInfo.Receiving.DownloadsPath, file.Name)
} else {
file.Path = filepath.Join(node.transferInfo.Receiving.DownloadsPath, file.RelativeParentPath)
}
// create all underlying directories right ahead // create all underlying directories right ahead
err = os.MkdirAll(filepath.Dir(file.Path), os.ModePerm) err = os.MkdirAll(filepath.Dir(file.Path), os.ModePerm)
@ -593,17 +614,54 @@ func (node *Node) Start() {
panic(err) panic(err)
} }
// check if the file already exists; if yes - remove it and replace with a new one // check if the file already exists
_, err = os.Stat(file.Path) _, err = os.Stat(file.Path)
if err == nil { if err == nil {
// exists // exists
// remove it // check if it is the exact file
os.Remove(file.Path) existingFileHandler, err := os.Open(file.Path)
} if err != nil {
os.Remove(file.Path)
}
node.mutex.Lock() existingFileChecksum, _ := checksum.GetPartialCheckSum(existingFileHandler)
node.transferInfo.Receiving.AcceptedFiles = append(node.transferInfo.Receiving.AcceptedFiles, file)
node.mutex.Unlock() if existingFileChecksum == file.Checksum {
// it`s the exact same file. No need to receive it again
// notify the other node
fmt.Printf("Already have \"%s\". Skipping...\n\n", file.Name)
alreadyHavePacketBodyBuffer := new(bytes.Buffer)
binary.Write(alreadyHavePacketBodyBuffer, binary.BigEndian, file.ID)
alreadyHavePacket := protocol.Packet{
Header: protocol.HeaderAlreadyHave,
Body: alreadyHavePacketBodyBuffer.Bytes(),
}
encryptedBody, err := encryption.Encrypt(node.netInfo.EncryptionKey, alreadyHavePacket.Body)
if err != nil {
panic(err)
}
alreadyHavePacket.Body = encryptedBody
protocol.SendPacket(node.netInfo.Conn, alreadyHavePacket)
} else {
// not the same file. Remove it and await new bytes
os.Remove(file.Path)
}
existingFileHandler.Close()
} else {
// does not exist
node.mutex.Lock()
node.transferInfo.Receiving.AcceptedFiles = append(node.transferInfo.Receiving.AcceptedFiles, file)
node.mutex.Unlock()
}
case protocol.HeaderFileBytes: case protocol.HeaderFileBytes:
// check if this file has been accepted to receive // check if this file has been accepted to receive
@ -632,17 +690,6 @@ func (node *Node) Start() {
wrote, err := acceptedFile.Handler.WriteAt(fileBytes, int64(acceptedFile.SentBytes)) wrote, err := acceptedFile.Handler.WriteAt(fileBytes, int64(acceptedFile.SentBytes))
if err != nil { if err != nil {
panic(err) panic(err)
// // fmt.Printf("[Debug] %+v\n", acceptedFile)
// // this file won`t be completed, so it`ll be ignored
// // remove this file from the pool
// node.transferInfo.Receiving.AcceptedFiles = append(node.transferInfo.Receiving.AcceptedFiles[:index], node.transferInfo.Receiving.AcceptedFiles[index+1:]...)
// fmt.Printf("[ERROR] an error occured when receiving a file \"%s\": %s. This file will not be completed", acceptedFile.Name, err)
// // remove it from the filesystem
// os.Remove(acceptedFile.Path)
} }
acceptedFile.SentBytes += uint64(wrote) acceptedFile.SentBytes += uint64(wrote)

7
src/protocol/headers.go

@ -87,3 +87,10 @@ const HeaderEndfile Header = "ENDFILE"
// between a file and a directory. // between a file and a directory.
// ie: DIRECTORY~(dirname size in binary)(dirname)(dirsize) // ie: DIRECTORY~(dirname size in binary)(dirname)(dirsize)
const HeaderDirectory Header = "DIRECTORY" const HeaderDirectory Header = "DIRECTORY"
// ALREADYHAVE
// Sent by receiver in case there is the same file that already exists.
// Sender upon receiving such packet with specified file ID must not send it.
// Body must contain a file ID.
// ie: ALREADYHAVE~(file ID in binary)
const HeaderAlreadyHave Header = "ALREADYHAVE"

Loading…
Cancel
Save