-
Notifications
You must be signed in to change notification settings - Fork 0
/
create_topics.py
45 lines (34 loc) · 1.43 KB
/
create_topics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from confluent_kafka.admin import AdminClient, NewTopic
from config import *
def create_topic(topics=("Frames", "Bboxes", "Logs"), num_partitions=1,
                 replication_factor=1):
    """Create Kafka topics on the cluster at ``BOOTSTRAP_SERVERS``.

    Called with no arguments this creates the "Frames", "Bboxes" and "Logs"
    topics with one partition and a replication factor of one.

    Args:
        topics: Topic name, or iterable of topic names, to create.
        num_partitions: Partition count applied to every new topic.
        replication_factor: Replication factor applied to every new topic.
            Note: in a multi-cluster production scenario, it is more typical
            to use a replication_factor of 3 for durability.

    Returns:
        None. One line per topic is printed reporting success or failure.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(topics, str):
        topics = [topics]

    new_topics = [
        NewTopic(
            name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        for name in topics
    ]
    if not new_topics:
        # Nothing to request; skip the broker round-trip entirely.
        return

    admin = AdminClient({'bootstrap.servers': BOOTSTRAP_SERVERS})

    # Call create_topics to asynchronously create topics. A dict
    # of <topic,future> is returned.
    futures = admin.create_topics(new_topics)

    # Wait for each operation to finish.
    for topic, future in futures.items():
        try:
            future.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            # Best-effort: report the failure and keep going so one bad
            # topic does not prevent the remaining ones from being reported.
            print("Failed to create topic {}: {}".format(topic, e))
def list_topics():
    """Print the name of every topic reported by the Kafka cluster.

    Connects to the brokers at ``BOOTSTRAP_SERVERS``, fetches the cluster
    metadata and writes a header line followed by one topic name per line
    to stdout. Returns None.
    """
    client = AdminClient({"bootstrap.servers": BOOTSTRAP_SERVERS})

    # The metadata's ``topics`` mapping is keyed by topic name; snapshot the
    # keys before printing anything.
    topic_names = list(client.list_topics().topics)

    print("List of topics:")
    for name in topic_names:
        print(name)
if __name__ == "__main__":
    # Script entry point: create the topics first, then print the topic
    # list the cluster reports afterwards.
    create_topic()
    list_topics()