diff --git a/.gitignore b/.gitignore
index 10565a6..1ce36a8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,4 +7,5 @@ conf_mega_ita.json
 wecr
 release/
 scraped/
-extracted_data.txt
\ No newline at end of file
+extracted_data.txt
+visit_queue.tmp
\ No newline at end of file
diff --git a/src/config/config.go b/src/config/config.go
index c6c7af2..ae61715 100644
--- a/src/config/config.go
+++ b/src/config/config.go
@@ -71,6 +71,7 @@ type Conf struct {
 	InitialPages       []string `json:"initial_pages"`
 	AllowedDomains     []string `json:"allowed_domains"`
 	BlacklistedDomains []string `json:"blacklisted_domains"`
+	InMemoryVisitQueue bool     `json:"in_memory_visit_queue"`
 	Save               Save     `json:"save"`
 	Logging            Logging  `json:"logging"`
 }
@@ -98,6 +99,7 @@ func Default() *Conf {
 		Workers:            20,
 		AllowedDomains:     []string{""},
 		BlacklistedDomains: []string{""},
+		InMemoryVisitQueue: false,
 		Logging: Logging{
 			OutputLogs: true,
 			LogsFile:   "logs.log",
diff --git a/src/main.go b/src/main.go
index 6fff163..6038a98 100644
--- a/src/main.go
+++ b/src/main.go
@@ -33,6 +33,7 @@ import (
 	"time"
 	"unbewohnte/wecr/config"
 	"unbewohnte/wecr/logger"
+	"unbewohnte/wecr/queue"
 	"unbewohnte/wecr/utilities"
 	"unbewohnte/wecr/web"
 	"unbewohnte/wecr/worker"
@@ -335,20 +336,19 @@ func main() {
 		}
 		defer func() {
 			visitQueueFile.Close()
-			// os.Remove(filepath.Join(workingDirectory, defaultVisitQueueFile))
+			os.Remove(filepath.Join(workingDirectory, defaultVisitQueueFile))
 		}()
 	}
 
 	// create initial jobs
 	if !conf.InMemoryVisitQueue {
-		encoder := json.NewEncoder(visitQueueFile)
 		for _, initialPage := range conf.InitialPages {
 			var newJob web.Job = web.Job{
 				URL:    initialPage,
 				Search: conf.Search,
 				Depth:  conf.Depth,
 			}
-			err = encoder.Encode(&newJob)
+			err = queue.InsertNewJob(visitQueueFile, newJob)
 			if err != nil {
 				logger.Error("Failed to encode an initial job to the visit queue: %s", err)
 				continue
diff --git a/src/queue/visitqueue.go b/src/queue/visitqueue.go
new file mode 100644
index 0000000..200e11c
--- /dev/null
+++ b/src/queue/visitqueue.go
@@ -0,0 +1,61 @@
+package queue
+
+import (
+	"encoding/json"
+	"io"
+	"os"
+	"unbewohnte/wecr/web"
+)
+
+// PopLastJob removes the last decodable job from the queue file and
+// returns it. It returns (nil, nil) when no job is available.
+func PopLastJob(queue *os.File) (*web.Job, error) {
+	stats, err := queue.Stat()
+	if err != nil {
+		return nil, err
+	}
+
+	if stats.Size() == 0 {
+		return nil, nil
+	}
+
+	// scan backwards from the end of the file until a whole job decodes
+	var offset int64 = -1
+	for {
+		if -offset > stats.Size() {
+			// scanned past the start of the file: nothing decodable left
+			return nil, nil
+		}
+
+		currentOffset, err := queue.Seek(offset, io.SeekEnd)
+		if err != nil {
+			return nil, err
+		}
+
+		// decode into a fresh job each attempt so a partial decode
+		// cannot leak stale fields into the next iteration's checks
+		var job web.Job
+		decoder := json.NewDecoder(queue)
+		err = decoder.Decode(&job)
+		// NOTE(review): an empty search query marks the job invalid —
+		// confirm that crawl-only configs never enqueue such jobs
+		if err != nil || job.URL == "" || job.Search.Query == "" {
+			offset -= 1
+			continue
+		}
+
+		// cut the job off the end of the queue before handing it out
+		if err := queue.Truncate(currentOffset); err != nil {
+			return nil, err
+		}
+		return &job, nil
+	}
+}
+
+// InsertNewJob appends a JSON-encoded job to the end of the queue file.
+func InsertNewJob(queue *os.File, newJob web.Job) error {
+	_, err := queue.Seek(0, io.SeekEnd)
+	if err != nil {
+		return err
+	}
+
+	return json.NewEncoder(queue).Encode(&newJob)
+}
diff --git a/src/web/job.go b/src/web/job.go
index 4c66a7a..ee09ba7 100644
--- a/src/web/job.go
+++ b/src/web/job.go
@@ -22,7 +22,7 @@ import "unbewohnte/wecr/config"
 
 // Job to pass around workers
 type Job struct {
-	URL    string
-	Search config.Search
-	Depth  uint
+	URL    string        `json:"u"`
+	Search config.Search `json:"s"`
+	Depth  uint          `json:"d"`
 }
diff --git a/src/worker/worker.go b/src/worker/worker.go
index 40769e7..9c58219 100644
--- a/src/worker/worker.go
+++ b/src/worker/worker.go
@@ -25,18 +25,26 @@ import (
 	"path"
 	"path/filepath"
 	"regexp"
+	"sync"
 	"time"
 	"unbewohnte/wecr/config"
 	"unbewohnte/wecr/logger"
+	"unbewohnte/wecr/queue"
 	"unbewohnte/wecr/web"
 )
 
+type VisitQueue struct {
+	VisitQueue *os.File
+	Lock       *sync.Mutex
+}
+
 // Worker configuration
 type WorkerConf struct {
 	Requests           config.Requests
 	Save               config.Save
 	BlacklistedDomains []string
 	AllowedDomains     []string
+	VisitQueue         VisitQueue
 }
 
 // Web worker
@@ -133,7 +141,33 @@ func (w *Worker) Work() {
 		return
 	}
 
-	for job := range w.Jobs {
+	for {
+		// check the stop flag up front: the empty-queue path below
+		// continues the loop and would otherwise never observe it
+		if w.Stopped {
+			return
+		}
+
+		var job web.Job
+		if w.Conf.VisitQueue.VisitQueue != nil {
+			w.Conf.VisitQueue.Lock.Lock()
+			newJob, err := queue.PopLastJob(w.Conf.VisitQueue.VisitQueue)
+			w.Conf.VisitQueue.Lock.Unlock()
+			if err != nil {
+				logger.Error("Failed to get a new job from visit queue: %s", err)
+				time.Sleep(time.Second)
+				continue
+			}
+			if newJob == nil {
+				// the queue is currently empty; back off instead of spinning
+				time.Sleep(time.Second)
+				continue
+			}
+			job = *newJob
+		} else {
+			job = <-w.Jobs
+		}
+
 		// check if the worker has been stopped
 		if w.Stopped {
 			// stop working
@@ -209,18 +243,41 @@ func (w *Worker) Work() {
 		go func() {
 			if job.Depth > 1 {
-				// decrement depth and add new jobs to the channel
+				// decrement depth and add new jobs
 				job.Depth--
-				for _, link := range pageLinks {
-					if link != job.URL {
-						w.Jobs <- web.Job{
-							URL:    link,
-							Search: job.Search,
-							Depth:  job.Depth,
+				if w.Conf.VisitQueue.VisitQueue != nil {
+					// add to the visit queue
+					w.Conf.VisitQueue.Lock.Lock()
+					for _, link := range pageLinks {
+						if link != job.URL {
+							// use a goroutine-local error so this does not
+							// race on the worker's shared err variable
+							pushErr := queue.InsertNewJob(w.Conf.VisitQueue.VisitQueue, web.Job{
+								URL:    link,
+								Search: job.Search,
+								Depth:  job.Depth,
+							})
+							if pushErr != nil {
+								logger.Error("Failed to encode a new job to a visit queue: %s", pushErr)
+								continue
+							}
+						}
+					}
+					w.Conf.VisitQueue.Lock.Unlock()
+				} else {
+					// add to the in-memory channel
+					for _, link := range pageLinks {
+						if link != job.URL {
+							w.Jobs <- web.Job{
+								URL:    link,
+								Search: job.Search,
+								Depth:  job.Depth,
+							}
 						}
 					}
 				}
+			}
 
 			pageLinks = nil
 		}()