-
Notifications
You must be signed in to change notification settings - Fork 0
/
collect.py
68 lines (51 loc) · 1.76 KB
/
collect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import argparse
import json
import logging
import sys
import queue
import threading
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from signal import signal, SIGPIPE, SIG_DFL
import certstream
parser = argparse.ArgumentParser(description='Connect to the CertStream and process CTL list updates.')
parser.add_argument('--verbose', action='store_true', default=False, dest='verbose', help='Display debug logging.')
parser.add_argument('--url', default="wss://certstream.calidog.io", dest='url', help='Connect to a certstream server.')
def uploader(queue):
es = Elasticsearch()
id = 0
while True:
actions = []
for i in range(100):
actions.append(
{
"_index": "certstream-test",
"_type": "_doc",
"_id": id,
"_source": {
"message": queue.get(),
"timestamp": datetime.utcnow()}
}
)
id+=1
print(helpers.bulk(es, actions))
def main():
args = parser.parse_args()
# Ignore broken pipes
signal(SIGPIPE, SIG_DFL)
log_level = logging.INFO
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(format='[%(levelname)s:%(name)s] %(asctime)s - %(message)s', level=log_level)
q = queue.Queue()
t = threading.Thread(target=uploader,args=(q,))
t.start()
def _handle_messages(message, context):
q.put(message, block=False);
#sys.stdout.flush()
#sys.stdout.write(json.dumps(message) + "\n")
#sys.stdout.flush()
certstream.listen_for_events(_handle_messages, args.url, skip_heartbeats=True)
if __name__ == "__main__":
main()