-
Notifications
You must be signed in to change notification settings - Fork 0
/
get.go
135 lines (130 loc) · 2.93 KB
/
get.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package lmdb
import (
"github.com/bmatsuo/lmdb-go/lmdb"
"github.com/dingyuqi/lmdb-storage/util"
"log"
"sync"
)
// Get looks up a batch of keys concurrently across every partition file found
// under l.dbRoot. A key that is not present in the store is simply absent from
// the returned map.
//
// A fixed pool of worker goroutines pulls file paths from an unbuffered
// channel; each worker publishes its per-file hits on a buffered channel that
// util.CollectRes drains (presumably merging them into a single map — confirm
// against util.CollectRes).
//
// If listing the partition files fails, an empty map and that error are
// returned.
func (l *Driver) Get(data map[string]struct{}) (map[string]string, error) {
	result := make(map[string]string)
	paths, err := util.FetchAllParFiles(l.dbRoot)
	if err != nil {
		return result, err
	}

	var (
		workers sync.WaitGroup
		partial = make(chan map[string]string, ChLen)
		pending = make(chan string)
		merged  = make(chan map[string]string)
	)

	// The collector must be running before workers start publishing results.
	go util.CollectRes(partial, merged)

	// Start the worker pool: 31 goroutines in total.
	for remaining := 31; remaining > 0; remaining-- {
		workers.Add(1)
		go l.getSingleFileData(pending, data, partial, &workers)
	}

	// Hand out the files, then close the queue so idle workers terminate.
	for _, p := range paths {
		pending <- p
	}
	close(pending)

	// Only once every worker has finished is it safe to close the result
	// channel and wait for the merged map.
	workers.Wait()
	close(partial)
	result = <-merged

	// err is necessarily nil here; it was checked right after the file listing.
	return result, err
}
// GetAllReverse walks every record in all partition files under l.dbRoot and
// returns them with key and value swapped (stored value -> stored key).
//
// One goroutine is started per partition file; the per-file maps are sent on
// a buffered channel and gathered by util.CollectRes (presumably merged into
// one map — confirm against util.CollectRes).
//
// If listing the partition files fails, an empty map and that error are
// returned.
func (l *Driver) GetAllReverse() (map[string]string, error) {
	reversed := make(map[string]string)
	paths, err := util.FetchAllParFiles(l.dbRoot)
	if err != nil {
		return reversed, err
	}

	partial := make(chan map[string]string, ChLen)

	// Launch one scanner per partition file.
	var scanners sync.WaitGroup
	for _, p := range paths {
		scanners.Add(1)
		go l.cursorAllReverse(p, partial, &scanners)
	}

	// Gather the per-file maps while the scanners are running.
	merged := make(chan map[string]string)
	go util.CollectRes(partial, merged)

	// Close the result channel only after every scanner is done, then take
	// the collected map.
	scanners.Wait()
	close(partial)
	reversed = <-merged

	// err is necessarily nil here; it was checked right after the file listing.
	return reversed, err
}
// cursorAllReverse scans every record of the LMDB environment at path with a
// cursor and sends a single map of value -> key on resCh.
//
// group.Done is called exactly once, when the function returns, whichever
// path is taken. On any failure (opening the environment, opening the root
// database or cursor, or a cursor read error) the error is logged and nothing
// is sent on resCh.
func (l *Driver) cursorAllReverse(path string, resCh chan map[string]string, group *sync.WaitGroup) {
	// A single deferred Done guarantees the WaitGroup counter is decremented
	// once and only once; decrementing it on several error paths can drive
	// the counter negative (panic) or release the waiter too early.
	defer group.Done()

	env, err := l.newEnv(path)
	if err != nil {
		log.Println(err)
		return
	}
	defer func() {
		if err := env.Close(); err != nil {
			log.Println(err)
		}
	}()

	ret := make(map[string]string)
	err = env.View(func(txn *lmdb.Txn) error {
		dbi, err := txn.OpenRoot(0)
		if err != nil {
			return err
		}
		cur, err := txn.OpenCursor(dbi)
		if err != nil {
			return err
		}
		defer cur.Close()

		for {
			k, v, err := cur.Get(nil, nil, lmdb.Next)
			if lmdb.IsNotFound(err) {
				// Cursor moved past the last record: the scan is complete.
				return nil
			}
			if err != nil {
				// Any other error must abort the scan; ignoring it would
				// keep looping on a broken cursor.
				return err
			}
			// string(...) copies the bytes, so the entries stay valid after
			// the transaction ends.
			ret[string(v)] = string(k)
		}
	})
	if err != nil {
		log.Println(err)
		return
	}
	resCh <- ret
}
// getSingleFileData is a worker loop: it receives LMDB file paths from fileCh
// until that channel is closed, looks up every key of blockData in each file
// and sends the per-file hits on resCh. A key that is not found in a file is
// left out of that file's result map.
//
// group.Done is called exactly once, when the worker exits. A file that
// cannot be opened or read is logged and skipped, and the worker keeps
// draining fileCh so the producer never blocks on a queue nobody reads.
func (l *Driver) getSingleFileData(fileCh chan string, blockData map[string]struct{}, resCh chan map[string]string, group *sync.WaitGroup) {
	defer group.Done()

	// lookup handles one file. Being its own function scope, the deferred
	// Close runs after each file rather than when the whole worker exits.
	lookup := func(path string) (map[string]string, error) {
		env, err := l.newEnv(path)
		if err != nil {
			return nil, err
		}
		defer func() {
			// A close failure is reported but does not discard values that
			// were already read successfully.
			if err := env.Close(); err != nil {
				log.Println("关闭env出现错误: ", err)
			}
		}()

		found := make(map[string]string)
		err = env.View(func(txn *lmdb.Txn) error {
			dbi, err := txn.OpenRoot(0)
			if err != nil {
				return err
			}
			for k := range blockData {
				v, err := txn.Get(dbi, util.Convert2Bytes(k))
				if lmdb.IsNotFound(err) {
					// Missing keys are omitted from the result.
					continue
				}
				if err != nil {
					// A real read error must not be recorded as an
					// empty-valued hit.
					return err
				}
				// string(v) copies the bytes out of the transaction.
				found[k] = string(v)
			}
			return nil
		})
		if err != nil {
			return nil, err
		}
		return found, nil
	}

	for path := range fileCh {
		found, err := lookup(path)
		if err != nil {
			log.Println("getAFileData出现错误: ", err)
			continue
		}
		resCh <- found
	}
}