Skip to content

Commit

Permalink
Made some changes to hound-search#116.
Browse files Browse the repository at this point in the history
  • Loading branch information
kellegous committed Apr 28, 2015
1 parent c360072 commit 95306e0
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 67 deletions.
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,7 @@ not work.
## Keeping Repos Updated
By default Hound polls the URL in the config for updates every 30 seconds. You can override this value by setting the `ms-between-poll` key on a per repo basis in the config. You can see how this works in the [example config](config-example.json).
### Connection Limiting
During the update phase, Hound will limit the number of concurrent connections open against your vcs system to 1.
To override this setting, add a "max-connections" value to your top-level config.json file. When going against an
internal service, you can probably set this value to whatever you want, but against an externally-hosted provider,
we recommend limiting the connections to 20 or 50. We recommend working with your external host to find the right
value for your organization.
By default Hound polls the URL in the config for updates every 30 seconds. You can override this value by setting the `ms-between-poll` key on a per repo basis in the config. If you are indexing a large number of repositories, you may also be interested in tweaking the `max-concurrent-indexers` property. You can see how these work in the [example config](config-example.json).
## Editor Integration
Expand Down
13 changes: 1 addition & 12 deletions cmds/houndd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@ var (
error_log *log.Logger
)

// createLimiterChannel returns a buffered channel used as a counting
// semaphore to limit concurrent VCS connections. Its capacity is taken
// from max-connections in the config, falling back to 1 when the key is
// unset (zero value).
func createLimiterChannel(cfg *config.Config) chan int {
	// Idiomatic early return instead of if/else after a terminating branch.
	if cfg.MaxConcurrentVCSConnections == 0 {
		return make(chan int, 1)
	}
	return make(chan int, cfg.MaxConcurrentVCSConnections)
}

func makeSearchers(cfg *config.Config) (map[string]*searcher.Searcher, bool, error) {
// Ensure we have a dbpath
if _, err := os.Stat(cfg.DbPath); err != nil {
Expand All @@ -36,9 +27,7 @@ func makeSearchers(cfg *config.Config) (map[string]*searcher.Searcher, bool, err
}
}

connectionLimiter := createLimiterChannel(cfg)

searchers, errs, err := searcher.MakeAll(cfg, connectionLimiter)
searchers, errs, err := searcher.MakeAll(cfg)
if err != nil {
return nil, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion config-example.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"max-connections" : 25,
"max-concurrent-indexers" : 2,
"dbpath" : "data",
"repos" : {
"SomeGitRepo" : {
Expand Down
28 changes: 19 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
)

const (
defaultMsBetweenPoll = 30000
defaultPushEnabled = false
defaultPollEnabled = true
defaultVcs = "git"
defaultBaseUrl = "{url}/blob/master/{path}{anchor}"
defaultAnchor = "#L{line}"
defaultMsBetweenPoll = 30000
defaultMaxConcurrentIndexers = 2
defaultPushEnabled = false
defaultPollEnabled = true
defaultVcs = "git"
defaultBaseUrl = "{url}/blob/master/{path}{anchor}"
defaultAnchor = "#L{line}"
)

type UrlPattern struct {
Expand Down Expand Up @@ -52,9 +53,9 @@ func (r *Repo) PushUpdatesEnabled() bool {
}

type Config struct {
DbPath string `json:"dbpath"`
MaxConcurrentVCSConnections int `json:"max-connections"`
Repos map[string]*Repo `json:"repos"`
DbPath string `json:"dbpath"`
Repos map[string]*Repo `json:"repos"`
MaxConcurrentIndexers int `json:"max-concurrent-indexers"`
}

// SecretMessage is just like json.RawMessage but it will not
Expand Down Expand Up @@ -111,6 +112,13 @@ func initRepo(r *Repo) {
}
}

// initConfig fills in default values for any top-level config settings
// that were left unset (zero-valued) in the loaded JSON.
func initConfig(c *Config) {
	if c.MaxConcurrentIndexers != 0 {
		return
	}
	c.MaxConcurrentIndexers = defaultMaxConcurrentIndexers
}

func (c *Config) LoadFromFile(filename string) error {
r, err := os.Open(filename)
if err != nil {
Expand All @@ -135,6 +143,8 @@ func (c *Config) LoadFromFile(filename string) error {
initRepo(repo)
}

initConfig(c)

return nil
}

Expand Down
116 changes: 80 additions & 36 deletions searcher/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Searcher struct {
updateCh chan time.Time
}

type limiter chan bool

/**
* Holds a set of IndexRefs that were found in the dbpath at startup,
* these indexes can be 'claimed' and re-used by newly created searchers.
Expand All @@ -39,6 +41,18 @@ type foundRefs struct {
claimed map[*index.IndexRef]bool
}

// makeLimiter builds a limiter that permits at most n concurrent holders.
func makeLimiter(n int) limiter {
	return make(limiter, n)
}

// Acquire takes one slot from the limiter, blocking until a slot is
// free (i.e. fewer than cap(l) holders currently exist).
func (l limiter) Acquire() {
	l <- true
}

// Release returns a slot previously taken with Acquire, allowing a
// blocked Acquire in another goroutine to proceed.
func (l limiter) Release() {
	<-l
}

/**
* Find an Index ref for the repo url and rev, returns nil if no such
* ref exists.
Expand Down Expand Up @@ -232,7 +246,7 @@ func init() {
// NOTE: The keys in the searcher map will be normalized to lower case, but no such
// transformation will be done on the error map to make it easier to match those errors
// back to the original repo name.
func MakeAll(cfg *config.Config, connectionLimiter) (map[string]*Searcher, map[string]error, error) {
func MakeAll(cfg *config.Config) (map[string]*Searcher, map[string]error, error) {
errs := map[string]error{}
searchers := map[string]*Searcher{}

Expand All @@ -241,8 +255,10 @@ func MakeAll(cfg *config.Config, connectionLimiter) (map[string]*Searcher, map[s
return nil, nil, err
}

lim := makeLimiter(cfg.MaxConcurrentIndexers)

for name, repo := range cfg.Repos {
s, err := newSearcher(cfg.DbPath, name, repo, refs, connectionLimiter)
s, err := newSearcher(cfg.DbPath, name, repo, refs, lim)
if err != nil {
log.Print(err)
errs[name] = err
Expand All @@ -266,8 +282,8 @@ func MakeAll(cfg *config.Config, connectionLimiter) (map[string]*Searcher, map[s

// Creates a new Searcher that is available for searches as soon as this returns.
// This will pull or clone the target repo and start watching the repo for changes.
func New(dbpath, name string, repo *config.Repo, connectionLimiter chan int) (*Searcher, error) {
s, err := newSearcher(dbpath, name, repo, &foundRefs{}, connectionLimiter)
func New(dbpath, name string, repo *config.Repo) (*Searcher, error) {
s, err := newSearcher(dbpath, name, repo, &foundRefs{}, makeLimiter(1))
if err != nil {
return nil, err
}
Expand All @@ -277,9 +293,65 @@ func New(dbpath, name string, repo *config.Repo, connectionLimiter chan int) (*S
return s, nil
}

// updateAndReindex pulls the latest revision of the searcher's repo and,
// if the revision changed, builds a fresh index and swaps it in. It
// returns the revision the searcher is now on and whether a reindex
// actually occurred. The limiter bounds how many updates run at once.
func updateAndReindex(
	s *Searcher,
	dbpath,
	vcsDir,
	name,
	rev string,
	wd *vcs.WorkDir,
	opt *index.IndexOptions,
	lim limiter) (string, bool) {

	// Hold a concurrency-limiter slot for the whole update so that only
	// a bounded number of pulls/index builds are in flight at a time.
	lim.Acquire()
	defer lim.Release()

	repo := s.Repo

	pulledRev, err := wd.PullOrClone(vcsDir, repo.Url)
	if err != nil {
		log.Printf("vcs pull error (%s - %s): %s", name, repo.Url, err)
		return rev, false
	}

	// Already at this revision; nothing to rebuild.
	if pulledRev == rev {
		return rev, false
	}

	log.Printf("Rebuilding %s for %s", name, pulledRev)
	freshIdx, err := buildAndOpenIndex(
		opt,
		dbpath,
		vcsDir,
		nextIndexDir(dbpath),
		repo.Url,
		pulledRev)
	if err != nil {
		log.Printf("failed index build (%s): %s", name, err)
		return rev, false
	}

	if err := s.swapIndexes(freshIdx); err != nil {
		log.Printf("failed index swap (%s): %s", name, err)
		// The new index was never installed; best-effort cleanup.
		if err := freshIdx.Destroy(); err != nil {
			log.Printf("failed to destroy index (%s): %s\n", name, err)
		}
		return rev, false
	}

	return pulledRev, true
}

// Creates a new Searcher that is capable of re-claiming an existing index directory
// from a set of existing manifests.
func newSearcher(dbpath, name string, repo *config.Repo, refs *foundRefs, connectionLimiter chan int) (*Searcher, error) {
func newSearcher(
dbpath, name string,
repo *config.Repo,
refs *foundRefs,
lim limiter) (*Searcher, error) {

vcsDir := filepath.Join(dbpath, vcsDirFor(repo))

log.Printf("Searcher started for %s", name)
Expand Down Expand Up @@ -345,37 +417,9 @@ func newSearcher(dbpath, name string, repo *config.Repo, refs *foundRefs, connec
// Wait for a signal to proceed
s.waitForUpdate(delay)

connectionLimiter <- 1
newRev, err := wd.PullOrClone(vcsDir, repo.Url)
<-connectionLimiter

if err != nil {
log.Printf("vcs pull error (%s - %s): %s", name, repo.Url, err)
continue
}

if newRev == rev {
continue
}

log.Printf("Rebuilding %s for %s", name, newRev)
idx, err := buildAndOpenIndex(
opt,
dbpath,
vcsDir,
nextIndexDir(dbpath),
repo.Url,
newRev)
if err != nil {
log.Printf("failed index build (%s): %s", name, err)
continue
}

if err := s.swapIndexes(idx); err != nil {
log.Printf("failed index swap (%s): %s", name, err)
if err := idx.Destroy(); err != nil {
log.Printf("failed to destroy index (%s): %s\n", name, err)
}
// attempt to update and reindex this searcher
newRev, ok := updateAndReindex(s, dbpath, vcsDir, name, rev, wd, opt, lim)
if !ok {
continue
}

Expand Down

0 comments on commit 95306e0

Please sign in to comment.