Browse Source

On-disk visit queue! No more insane RAM consumption

master
parent
commit
d877a483a2
  1. 3
      .gitignore
  2. 2
      src/config/config.go
  3. 6
      src/main.go
  4. 57
      src/queue/visitqueue.go
  5. 6
      src/web/job.go
  6. 60
      src/worker/worker.go

3
.gitignore vendored

@ -7,4 +7,5 @@ conf_mega_ita.json
wecr
release/
scraped/
extracted_data.txt
extracted_data.txt
visit_queue.tmp

2
src/config/config.go

@ -71,6 +71,7 @@ type Conf struct {
InitialPages []string `json:"initial_pages"`
AllowedDomains []string `json:"allowed_domains"`
BlacklistedDomains []string `json:"blacklisted_domains"`
InMemoryVisitQueue bool `json:"in_memory_visit_queue"`
Save Save `json:"save"`
Logging Logging `json:"logging"`
}
@ -98,6 +99,7 @@ func Default() *Conf {
Workers: 20,
AllowedDomains: []string{""},
BlacklistedDomains: []string{""},
InMemoryVisitQueue: false,
Logging: Logging{
OutputLogs: true,
LogsFile: "logs.log",

6
src/main.go

@ -33,6 +33,7 @@ import (
"time"
"unbewohnte/wecr/config"
"unbewohnte/wecr/logger"
"unbewohnte/wecr/queue"
"unbewohnte/wecr/utilities"
"unbewohnte/wecr/web"
"unbewohnte/wecr/worker"
@ -335,20 +336,19 @@ func main() {
}
defer func() {
visitQueueFile.Close()
// os.Remove(filepath.Join(workingDirectory, defaultVisitQueueFile))
os.Remove(filepath.Join(workingDirectory, defaultVisitQueueFile))
}()
}
// create initial jobs
if !conf.InMemoryVisitQueue {
encoder := json.NewEncoder(visitQueueFile)
for _, initialPage := range conf.InitialPages {
var newJob web.Job = web.Job{
URL: initialPage,
Search: conf.Search,
Depth: conf.Depth,
}
err = encoder.Encode(&newJob)
err = queue.InsertNewJob(visitQueueFile, newJob)
if err != nil {
logger.Error("Failed to encode an initial job to the visit queue: %s", err)
continue

57
src/queue/visitqueue.go

@ -0,0 +1,57 @@
package queue
import (
"encoding/json"
"io"
"os"
"unbewohnte/wecr/logger"
"unbewohnte/wecr/web"
)
// PopLastJob removes and returns the most recently appended job from the
// on-disk visit queue file.
//
// The file is a stream of newline-delimited JSON-encoded web.Job values.
// The function scans backwards from the end of the file one byte at a time
// until a complete job decodes, truncates the file at that job's start
// offset (removing it from the queue), and returns it.
//
// Returns (nil, nil) when the queue is empty or contains no decodable job.
// The caller is responsible for serializing access to the file: this
// function moves the shared file offset.
func PopLastJob(queue *os.File) (*web.Job, error) {
	stats, err := queue.Stat()
	if err != nil {
		return nil, err
	}
	if stats.Size() == 0 {
		// empty queue — nothing to pop
		return nil, nil
	}

	// Walk backwards from the end of the file; stop once the whole file
	// has been scanned instead of relying on Seek failing on a negative
	// position.
	for offset := int64(-1); -offset <= stats.Size(); offset-- {
		currentOffset, err := queue.Seek(offset, io.SeekEnd)
		if err != nil {
			return nil, err
		}

		// Use a fresh Job on every attempt so fields left over from a
		// previous partial decode cannot leak into the result and pass
		// the validity checks below.
		var job web.Job
		err = json.NewDecoder(queue).Decode(&job)
		if err != nil || job.URL == "" || job.Search.Query == "" {
			// Not positioned at the start of a valid job yet; step back
			// one more byte.
			// NOTE(review): the empty-Query heuristic guards against
			// partial decodes, but it would also drop a legitimate job
			// whose search query is "" — confirm the config never allows
			// an empty query.
			continue
		}

		logger.Info("Found job: %+v", job)

		// Remove the popped job from the file; surface truncation
		// failures instead of silently leaving the job queued.
		if err := queue.Truncate(currentOffset); err != nil {
			return nil, err
		}
		return &job, nil
	}

	// Scanned the entire file without finding a valid job.
	return nil, nil
}
// InsertNewJob appends newJob to the end of the on-disk visit queue file
// as one newline-delimited JSON document.
//
// The caller is responsible for serializing access to the file: both this
// function and PopLastJob move the shared file offset.
func InsertNewJob(queue *os.File, newJob web.Job) error {
	// Seek to the end explicitly on every insert: PopLastJob moves the
	// shared file offset while scanning, so the current position is
	// unreliable.
	if _, err := queue.Seek(0, io.SeekEnd); err != nil {
		return err
	}
	// json.Encoder terminates each value with '\n', keeping the file a
	// stream of one job per line.
	return json.NewEncoder(queue).Encode(&newJob)
}

6
src/web/job.go

@ -22,7 +22,7 @@ import "unbewohnte/wecr/config"
// Job to pass around workers
type Job struct {
URL string
Search config.Search
Depth uint
URL string `json:"u"`
Search config.Search `json:"s"`
Depth uint `json:"d"`
}

60
src/worker/worker.go

@ -25,18 +25,26 @@ import (
"path"
"path/filepath"
"regexp"
"sync"
"time"
"unbewohnte/wecr/config"
"unbewohnte/wecr/logger"
"unbewohnte/wecr/queue"
"unbewohnte/wecr/web"
)
// VisitQueue bundles the on-disk visit queue file with the mutex that
// serializes access to it: both queue.PopLastJob and queue.InsertNewJob
// move the shared file offset, so they must never interleave across
// workers.
type VisitQueue struct {
	VisitQueue *os.File
	Lock       *sync.Mutex
}
// Worker configuration
type WorkerConf struct {
	Requests           config.Requests
	Save               config.Save
	BlacklistedDomains []string
	AllowedDomains     []string
	// VisitQueue, when its file handle is non-nil, switches workers from
	// the in-memory jobs channel to the shared on-disk queue.
	VisitQueue VisitQueue
}
// Web worker
@ -133,7 +141,22 @@ func (w *Worker) Work() {
return
}
for job := range w.Jobs {
for {
var job web.Job
if w.Conf.VisitQueue.VisitQueue != nil {
w.Conf.VisitQueue.Lock.Lock()
newJob, err := queue.PopLastJob(w.Conf.VisitQueue.VisitQueue)
if err != nil || newJob == nil {
logger.Error("Failed to get a new job from visit queue: %s", err)
w.Conf.VisitQueue.Lock.Unlock()
continue
}
job = *newJob
w.Conf.VisitQueue.Lock.Unlock()
} else {
job = <-w.Jobs
}
// check if the worker has been stopped
if w.Stopped {
// stop working
@ -209,18 +232,39 @@ func (w *Worker) Work() {
go func() {
if job.Depth > 1 {
// decrement depth and add new jobs to the channel
// decrement depth and add new jobs
job.Depth--
for _, link := range pageLinks {
if link != job.URL {
w.Jobs <- web.Job{
URL: link,
Search: job.Search,
Depth: job.Depth,
if w.Conf.VisitQueue.VisitQueue != nil {
// add to the visit queue
w.Conf.VisitQueue.Lock.Lock()
for _, link := range pageLinks {
if link != job.URL {
err = queue.InsertNewJob(w.Conf.VisitQueue.VisitQueue, web.Job{
URL: link,
Search: job.Search,
Depth: job.Depth,
})
if err != nil {
logger.Error("Failed to encode a new job to a visit queue: %s", err)
continue
}
}
}
w.Conf.VisitQueue.Lock.Unlock()
} else {
// add to the in-memory channel
for _, link := range pageLinks {
if link != job.URL {
w.Jobs <- web.Job{
URL: link,
Search: job.Search,
Depth: job.Depth,
}
}
}
}
}
pageLinks = nil
}()

Loading…
Cancel
Save