searcher: make indexing of repos concurrent #272

Merged (6 commits, Jan 18, 2018)
searcher: make indexing of repos concurrent
This patch improves Hound's startup time in deployments with a huge number
of repositories, where startup used to take a long time because the repos
were indexed sequentially.

Startup indexing now runs concurrently while respecting the
config.MaxConcurrentIndexers parameter that users set in config.json.

Fixes #250
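For context, `MaxConcurrentIndexers` comes from the user's config.json. A minimal sketch of such a config (key names follow Hound's documented config.json format; the repo entry and URL are hypothetical):

```json
{
  "max-concurrent-indexers": 2,
  "dbpath": "data",
  "repos": {
    "SomeRepo": {
      "url": "https://www.github.com/example/some-repo.git"
    }
  }
}
```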
sahildua2305 committed Dec 30, 2017
commit 74815e94efdfd32adfd7f6c1e66d7ad17b86f012
92 changes: 84 additions & 8 deletions searcher/searcher.go
@@ -34,6 +34,24 @@ type Searcher struct {
doneCh chan empty
}

// searcherWrapper wraps a Searcher together with the `name` of the repo it
// belongs to, so that both can be sent over a channel once `newSearcher`
// returns successfully.
type searcherWrapper struct {
searcher *Searcher
name string
}

// errorWrapper wraps an error together with the `name` of the repo it belongs
// to, so that both can be sent over a channel when `newSearcher` fails.
type errorWrapper struct {
error error
name string
}

type empty struct{}
type limiter chan bool

@@ -277,21 +295,49 @@ func MakeAll(cfg *config.Config) (map[string]*Searcher, map[string]error, error)

lim := makeLimiter(cfg.MaxConcurrentIndexers)

for name, repo := range cfg.Repos {
s, err := newSearcher(cfg.DbPath, name, repo, refs, lim)
if err != nil {
log.Print(err)
errs[name] = err
continue
}
// Channel to receive the successfully created searchers.
searcherCh := make(chan searcherWrapper, 1)
// Channel to receive the errors in creation of searchers.
errorCh := make(chan errorWrapper, 1)
// Channel to signal the goroutine listening for new searchers and errors to quit.
quitCh := make(chan struct{}, 1)

// Create a wait group sized to the total number of repos so that we can
// proceed once the goroutines creating searchers for all repos have returned.
var wg sync.WaitGroup
wg.Add(len(cfg.Repos))

searchers[name] = s
for name, repo := range cfg.Repos {
go newSearcherConcurrent(cfg.DbPath, name, repo, refs, lim, searcherCh, errorCh)
}

// Create a listener on searcherCh and errorCh.
// It also listens on quitCh to decide when to return.
go func() {
for {
select {
case sw := <-searcherCh:
searchers[sw.name] = sw.searcher
wg.Done()
case ew := <-errorCh:
log.Print(ew.error)
errs[ew.name] = ew.error
wg.Done()
case <-quitCh:
return
}
}
}()

if err := refs.removeUnclaimed(); err != nil {
return nil, nil, err
}

// Wait for all go routines to finish.
wg.Wait()
// Close quitCh to signal the listening goroutine to return.
close(quitCh)

// after all the repos are in good shape, we start their polling
for _, s := range searchers {
s.begin()
Expand Down Expand Up @@ -464,3 +510,33 @@ func newSearcher(

return s, nil
}

// newSearcherConcurrent is a wrapper around the `newSearcher` function that
// makes searcher creation for multiple repositories concurrent, while
// respecting `cfg.MaxConcurrentIndexers` by acquiring a slot from the
// limiter before indexing begins.
func newSearcherConcurrent(
dbpath, name string,
repo *config.Repo,
refs *foundRefs,
lim limiter,
searcherCh chan searcherWrapper,
errorCh chan errorWrapper) {

// acquire a token from the rate limiter
lim.Acquire()
defer lim.Release()

s, err := newSearcher(dbpath, name, repo, refs, lim)
if err != nil {
errorCh <- errorWrapper{
error: err,
name: name,
}
return
}

searcherCh <- searcherWrapper{
searcher: s,
name: name,
}
}