Skip to content

Commit

Permalink
fix indexing huge doc
Browse files Browse the repository at this point in the history
  • Loading branch information
huahaiy committed Jan 16, 2023
1 parent 0b1abae commit 54d8df7
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 144 deletions.
16 changes: 9 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@
## WIP
### Changed
- [Search] **breaking** search index storage format change. Data re-indexing is necessary.
### Improved
- [Search] orders of magnitude indexing speed improvement. #151
- [Search] removing and updating documents are no longer slow.
- [Search] added caching for term and document indices.
### Added
- [Search] `:index-position?` option to indicate whether to record term
positions inside documents, default `false`.
- [Search] `:check-exist?` argument to `add-doc` to indicate whether to check the
existence of the document in the index, default `true`. Set it to `false` when
importing data to improve ingestion speed.
### Fixed
- [Search] error when indexing huge documents.
- [KV] spillable results exception in certain cases.
### Removed
- [Search] `doc-refs` function.
- [Search] `search-index-writer` as well as related `write` and `commit` functions.
### Improved
- [Search] orders of magnitude indexing speed improvement. #151
- [Search] removing and updating documents are no longer slow
- [Search] caching term and document indices
### Fixed
- [KV] spillable results exception in certain cases
- [Search] `search-index-writer` as well as related `write` and
`commit` functions, as they are no longer needed.

## 0.7.12 (2023-01-11)
### Fixed
Expand Down
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
[[babashka/babashka.pods "0.2.0"]
[com.cognitect/transit-clj "1.0.329"]
[com.fasterxml.jackson.core/jackson-core "2.14.1"]
[com.fasterxml.jackson.core/jackson-databind "2.14.1"]
[com.github.clj-easy/graal-build-time "0.1.4"]
[com.github.jnr/jnr-ffi "2.2.13"]
[com.taoensso/encore "3.46.0"]
Expand Down
28 changes: 28 additions & 0 deletions search-bench/project.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
;; Benchmark artifact version; presumably tracks the main Datalevin
;; release version -- confirm when bumping.
(def version "0.7.12")

;; Leiningen project for the Datalevin search benchmark. It compiles the
;; main library sources directly (see :source-paths / :java-source-paths)
;; instead of depending on a released Datalevin artifact.
(defproject org.clojars.huahaiy/search-bench version
  :description "Datalevin search benchmark"
  ;; Inherit dependency versions and build settings from the repository
  ;; root project via the lein-parent plugin, so the benchmark stays in
  ;; lockstep with the main library.
  :parent-project {:path "../project.clj"
                   :inherit [:managed-dependencies :profiles
                             :global-vars
                             :uberjar-exclusions]}
  ;; Versions are intentionally omitted: they resolve through the
  ;; parent's :managed-dependencies.
  :dependencies [[org.clojure/clojure]
                 [com.github.jnr/jnr-ffi]
                 [com.taoensso/encore]
                 [com.taoensso/nippy]
                 [com.fasterxml.jackson.core/jackson-core]
                 [com.fasterxml.jackson.core/jackson-databind]
                 [org.roaringbitmap/RoaringBitmap]
                 [org.eclipse.collections/eclipse-collections]
                 [me.lemire.integercompression/JavaFastPFOR]
                 [com.cognitect/transit-clj]
                 [org.lmdbjava/lmdbjava]]
  ;; --add-opens flags expose JDK internals (java.nio, sun.nio.ch);
  ;; presumably required by lmdbjava's direct buffer access -- confirm.
  :jvm-opts ["--add-opens" "java.base/java.nio=ALL-UNNAMED"
             "--add-opens" "java.base/sun.nio.ch=ALL-UNNAMED"
             "-Djdk.attach.allowAttachSelf"
             "-Dclojure.compiler.direct-linking=true"]
  ;; Pull in the library's own Clojure and Java sources alongside the
  ;; benchmark code.
  :source-paths ["../src" "src"]
  :java-source-paths ["../src/java"]
  :plugins [[lein-parent "0.3.8"]]
  ;; Select which benchmark main to run: lein with-profile datalevin run, etc.
  :profiles {:datalevin {:main datalevin.bench}
             :lucene    {:main lucene.bench}})
18 changes: 11 additions & 7 deletions search-bench/src/datalevin/bench.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,20 @@
(let [pool (Executors/newWorkStealingPool)]
(search 0 pool dir filename n)))

(defn run [opts]
(defn -main []
(println)
(println "Datalevin:")
(index-wiki-json "data/wiki-datalevin-100" "data/test.json")

;; (index-wiki-json "data/wiki-datalevin-100" "data/wiki100.json")
;; (println "Done indexing.")
;; (query (d/new-search-engine (d/open-kv "data/wiki-datalevin-100"))
;; "data/queries100.txt" 100)
;; (println "Done query.")

(index-wiki-json "data/wiki-datalevin-all" "data/wiki.json")
(println "Done indexing.")
(query (d/new-search-engine (d/open-kv "data/wiki-datalevin-100"))
"data/queries100.txt" 100)
(query (d/new-search-engine (d/open-kv "data/wiki-datalevin-all"))
"queries40k.txt" 40000)
(println "Done query.")
;; (index-wiki-json "data/wiki-datalevin-all" "data/wiki.json")
;; (query (d/new-search-engine (d/open-kv "data/wiki-datalevin-all"))
;; "queries40k.txt" 40000)

)
36 changes: 20 additions & 16 deletions search-bench/src/lucene/bench.clj
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
(ns lucene.bench
(:require [clojure.java.io :as io])
(:import [org.apache.lucene.analysis.standard ClassicAnalyzer]
[org.apache.lucene.document Document Field Field$Store StringField
TextField]
[org.apache.lucene.index IndexWriter IndexWriterConfig IndexReader
DirectoryReader]
[org.apache.lucene.queryparser.classic QueryParser]
[org.apache.lucene.search IndexSearcher TopScoreDocCollector Query ScoreDoc TopDocs]
[org.apache.lucene.store FSDirectory]
[java.nio.file Paths]
[java.util HashMap Arrays]
[java.util.concurrent Executors TimeUnit ConcurrentLinkedQueue]
[java.io FileInputStream]
[com.fasterxml.jackson.databind ObjectMapper]
[com.fasterxml.jackson.core JsonFactory JsonParser JsonToken]))
(:import
[org.apache.lucene.analysis.standard ClassicAnalyzer]
[org.apache.lucene.document Document Field Field$Store StringField
TextField]
[org.apache.lucene.index IndexWriter IndexWriterConfig IndexReader
DirectoryReader]
[org.apache.lucene.queryparser.classic QueryParser]
[org.apache.lucene.search IndexSearcher TopScoreDocCollector Query ScoreDoc TopDocs]
[org.apache.lucene.store FSDirectory]
[java.nio.file Paths]
[java.util HashMap Arrays]
[java.util.concurrent Executors TimeUnit ConcurrentLinkedQueue]
[java.io FileInputStream]
[com.fasterxml.jackson.databind ObjectMapper]
[com.fasterxml.jackson.core JsonFactory JsonParser JsonToken]))

(defn index-wiki-json
[dir ^String filename]
Expand Down Expand Up @@ -96,8 +97,11 @@
(let [pool (Executors/newWorkStealingPool)]
(search 0 pool dir filename n)))

(defn run [opts]
(defn -main []
(println)
(println "Lucene:")
(index-wiki-json "data/wiki-lucene-all" "wiki.json")
(query "data/wiki-lucene-all" "queries40k.txt" 40000))
(println "Done indexing.")
(query "data/wiki-lucene-all" "queries40k.txt" 40000)
(println "Done query.")
)
4 changes: 2 additions & 2 deletions src/datalevin/binding/java.clj
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@
(do (reset-write-txn this)
(raise "DB resized" {:resized true}))))
(catch Exception e
;; (st/print-stack-trace e)
(raise "Fail to transact to LMDB: " (ex-message e) {}))))))
(st/print-stack-trace e)
(raise "Fail to transact to LMDB: " e {}))))))

(get-value [this dbi-name k]
(.get-value this dbi-name k :data :data true))
Expand Down
2 changes: 1 addition & 1 deletion src/datalevin/main.clj
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@
(prn res))))
(recur))))))))

(defn ^:no-doc -main [& args]
(defn -main [& args]
(let [{:keys [command options arguments summary exit-message ok?]}
(validate-args args)]
(if exit-message
Expand Down
27 changes: 14 additions & 13 deletions src/datalevin/search.clj
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@
(doto (FastList.) (.add [position offset]))))))
terms))

(defn- idf
(defn idf
  "Inverse document frequency of a term: log10(N / freq), where N is the
  total number of documents. A term appearing in no document (freq = 0)
  contributes an idf of 0."
  [^long freq N]
  (if (pos? freq)
    (Math/log10 (/ ^long N freq))
    0))

(defn- tf*
(defn tf*
  "Log-weighted term frequency: 1 + log10(freq), or 0 when the term is
  absent (freq = 0)."
  [freq]
  (if (pos? ^short freq)
    (inc (Math/log10 ^short freq))
    0))
Expand Down Expand Up @@ -328,16 +328,17 @@
ISearchEngine
(add-doc [this doc-ref doc-text check-exist?]
(locking docs
(try
(when-not (s/blank? doc-text)
(when check-exist?
(when-let [doc-id (doc-ref->id this doc-ref)]
(remove-doc* this doc-id doc-ref)))
(add-doc* this doc-ref doc-text))
(catch Exception e
(st/print-stack-trace e)
(u/raise "Error indexing document:" (ex-message e)
{:doc-ref doc-ref :doc-text doc-text})))))
(when-not (s/blank? doc-text)
(when check-exist?
(when-let [doc-id (doc-ref->id this doc-ref)]
(remove-doc* this doc-id doc-ref)))
(add-doc* this doc-ref doc-text))
#_(try

(catch Exception e
(st/print-stack-trace e)
(u/raise "Error indexing document:" (ex-message e)
{:doc-ref doc-ref :doc-text doc-text})))))
(add-doc [this doc-ref doc-text]
(.add-doc this doc-ref doc-text true))

Expand Down Expand Up @@ -553,7 +554,7 @@
(assert (not (l/closed-kv? lmdb)) "LMDB env is closed.")

;; term -> term-id,max-weight,doc-freq
(l/open-dbi lmdb terms-dbi {:key-size c/+max-term-length+})
(l/open-dbi lmdb terms-dbi {:key-size c/+max-key-size+})

;; doc-ref -> doc-id,norm,term-bm
(l/open-dbi lmdb docs-dbi {:key-size c/+max-key-size+})
Expand Down
75 changes: 36 additions & 39 deletions src/datalevin/sparselist.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
(ns ^:no-doc datalevin.sparselist
"Sparse array list of integers"
(:refer-clojure :exclude [get set remove])
(:import [java.nio ByteBuffer]
[java.io Writer]
[datalevin.utl GrowingIntArray]
[me.lemire.integercompression IntCompressor]
[org.roaringbitmap RoaringBitmap]))
(:require
[taoensso.nippy :as nippy])
(:import
[java.io Writer DataInput DataOutput]
[java.nio ByteBuffer]
[java.io Writer]
[datalevin.utl GrowingIntArray]
[me.lemire.integercompression IntCompressor]
[org.roaringbitmap RoaringBitmap]))

(defonce compressor (IntCompressor.))

Expand All @@ -30,7 +34,6 @@
(.get items (dec (.rank indices index)))))

(set [this index item]

(let [index (int index)]
(if (.contains indices index)
(.set items (dec (.rank indices index)) item)
Expand All @@ -46,30 +49,53 @@
(size [_]
(.getCardinality indices))

(select [this nth]
(select [_ nth]
(.get items nth))

(serialize [this bf]
;; Writes the list into ByteBuffer bf: the item ints are compressed with
;; the shared IntCompressor and written as a length-prefixed int array,
;; followed by the RoaringBitmap of occupied indices in its own
;; serialized form. Must mirror deserialize below.
(serialize [_ bf]
  (let [ar   (.toArray items)
        car  (.compress ^IntCompressor compressor ar)
        size (alength car)]
    (.putInt ^ByteBuffer bf size)
    (dotimes [i size] (.putInt ^ByteBuffer bf (aget car i))))
  (.serialize indices ^ByteBuffer bf))

(deserialize [this bf]
;; Reads back exactly what serialize wrote: the length-prefixed
;; compressed int array is uncompressed into items, then the indices
;; bitmap is restored. Read order must match the write order above.
(deserialize [_ bf]
  (let [size (.getInt ^ByteBuffer bf)
        car  (int-array size)]
    (dotimes [i size] (aset car i (.getInt ^ByteBuffer bf)))
    (.addAll items (.uncompress ^IntCompressor compressor car)))
  (.deserialize indices ^ByteBuffer bf))

Object
(equals [this other]
(equals [_ other]
(and (instance? SparseIntArrayList other)
(.equals indices (.-indices ^SparseIntArrayList other))
(.equals items (.-items ^SparseIntArrayList other)))))

;; Nippy freezer for SparseIntArrayList (custom tag :dtlv/sial).
;; Layout: an int count of compressed item values, the compressed values
;; themselves, then the frozen RoaringBitmap of indices. extend-thaw
;; below must read fields in this exact order.
(nippy/extend-freeze
  SparseIntArrayList :dtlv/sial
  [^SparseIntArrayList x ^DataOutput out]
  (let [ar   (.toArray ^GrowingIntArray (.-items x))
        car  (.compress ^IntCompressor compressor ar)
        size (alength car)]
    (.writeInt out size)
    (dotimes [i size] (.writeInt out (aget car i))))
  (let [^RoaringBitmap bm (.-indices x)]
    ;; run-length optimize the bitmap before freezing to shrink output
    (.runOptimize bm)
    (nippy/freeze-to-out! out bm)))

;; Nippy thawer for :dtlv/sial: reads the length-prefixed compressed item
;; array, then the frozen indices bitmap, and rebuilds the
;; SparseIntArrayList (constructor args: indices bitmap, items array).
(nippy/extend-thaw
  :dtlv/sial
  [^DataInput in]
  (let [size (.readInt in)
        car  (int-array size)]
    (dotimes [i size] (aset car i (.readInt in)))
    (let [items (GrowingIntArray.)]
      (->SparseIntArrayList
        ;; the bitmap was frozen after the item ints, so it is thawed
        ;; only after all ints have been consumed from the stream
        (nippy/thaw-from-in! in)
        ;; NOTE(review): assumes GrowingIntArray.addAll returns the
        ;; array itself -- confirm against its Java source
        (.addAll items (.uncompress ^IntCompressor compressor car))))))

(defn sparse-arraylist
([]
(->SparseIntArrayList (RoaringBitmap.) (GrowingIntArray.)))
Expand All @@ -87,32 +113,3 @@
(.write w (str "#datalevin/SparseList "))
(binding [*out* w]
(pr (for [i (.-indices s)] [i (get s i)]))))

(comment


(-> (sparse-arraylist)
(set 2 2)
(set 1 1)
(set 3 3)
(set 1 5)
)

(= (-> (sparse-arraylist)
(set 2 2)
(set 1 1)
(set 3 3))
(-> (sparse-arraylist)
(set 1 1)
(set 2 2)
(set 3 3)))


(.size (.-items (-> (sparse-arraylist)
(set 2 2)
(set 1 1)
(set 3 3)
(set 1 5))))


)
Loading

0 comments on commit 54d8df7

Please sign in to comment.