Feature/add pg source #159
base: pre-release/v0.3.0
Conversation
session = self._service.get_service_session()
try:
    logs = session.query(Logs).filter(query_filter).order_by(*Logs.__query_order__).all()
I'm a bit worried about query performance here.
Also, do you need ordering since you are fetching all the rows? Or could the ordering be done in memory?
I will add the partition key "timestamp" to the query to improve its performance.
The ORDER BY only sorts the result set; I use it for uniform processing.
If the ordering has a significant negative impact on execution time, I will sort in memory instead.
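For reference, the in-memory alternative discussed above could look like the sketch below. The `LogRow` stand-in and its sort key are assumptions for illustration; the real model and the columns in `Logs.__query_order__` come from the PR.

```python
from dataclasses import dataclass

# Hypothetical stand-in for a Logs row; the real SQLAlchemy model
# lives in the PR and may order by different columns.
@dataclass
class LogRow:
    block_number: int
    log_index: int

def sort_logs_in_memory(rows):
    # Equivalent of ORDER BY block_number, log_index done client-side:
    # fetch all rows without ORDER BY, then sort in the application,
    # trading database CPU for application memory.
    return sorted(rows, key=lambda r: (r.block_number, r.log_index))

rows = [LogRow(2, 0), LogRow(1, 1), LogRow(1, 0)]
ordered = sort_logs_in_memory(rows)
```

This only helps if the database's sort is the bottleneck; when a suitable index already covers the ORDER BY columns, the server-side sort is usually free.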
session = self._service.get_service_session()
try:
    transactions = (
        session.query(Transactions).filter(query_filter).order_by(*Transactions.__query_order__).all()
    )
Same question here. We need to test the performance.
Check whether there are enough indexes.
The indexes have been checked.
I will add the partition key "timestamp" to the query to improve its performance.
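The partition-key idea above can be illustrated with a minimal self-contained query (table and column names are illustrative, not the PR's schema): constraining the query to a timestamp range lets the database prune partitions or use a timestamp index instead of scanning every row.

```python
import sqlite3

# Toy table standing in for the partitioned logs table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (timestamp INTEGER, block_number INTEGER)")
conn.executemany(
    "INSERT INTO logs VALUES (?, ?)",
    [(100, 1), (200, 2), (300, 3)],
)

start_ts, end_ts = 150, 250
rows = conn.execute(
    "SELECT block_number FROM logs "
    "WHERE timestamp BETWEEN ? AND ? "  # partition-key predicate
    "ORDER BY block_number",
    (start_ts, end_ts),
).fetchall()
```

In PostgreSQL, the same `WHERE timestamp BETWEEN …` predicate is what allows the planner to skip partitions entirely.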
if self._service is None:
    raise FastShutdownError("-pg or --postgres-url is required to run PGSourceJob")
self.build_dependency = {}
What does this build_dependency do?
This is leftover code from development and will be removed in the next commit.
cli/stream.py
Outdated
output_types = list(set(parse_output_types))
if source_types is None and source_path.startswith("postgresql://"):
    source_types = "block,transaction,log"
if source_types:
Should we add this extra step, or should we require users to list all the source types explicitly?
Instead of this step, I set a default value for --source-types.
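A minimal sketch of the default-value approach, using stdlib argparse for self-containment (the actual CLI may use a different framework, and the default string here is taken from the inference branch above):

```python
import argparse

# Giving --source-types a default means users no longer have to list
# every entity type, and the startswith("postgresql://") inference
# step becomes unnecessary.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--source-types",
    default="block,transaction,log",
    help="Comma-separated entity types to read from the source.",
)

args = parser.parse_args([])  # no flag given -> default applies
source_types = args.source_types.split(",")
```

Users who want a subset still pass `--source-types block,log` explicitly, overriding the default.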
@@ -375,6 +400,9 @@ def stream(
    batch_web3_provider=ThreadLocalProxy(lambda: get_provider_from_uri(provider_uri, batch=False)),
    job_scheduler=job_scheduler,
    sync_recorder=create_recorder(sync_recorder, config),
    limit_reader=create_limit_reader(
What if the user passes some other value in source_path?
In unexpected cases, limit_reader will request the latest block from RPC.
If source_path specifies a CSV source, parameter checking ensures that the rpc_limit_reader does not interfere with the program.
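The fallback behaviour described above could be dispatched on the source_path scheme, as in this sketch. The class and function names are hypothetical stand-ins for the PR's create_limit_reader and readers:

```python
# Hypothetical readers; the real implementations talk to an RPC node
# or to PostgreSQL respectively.
class RpcLimitReader:
    def describe(self):
        return "latest block from rpc"

class PgLimitReader:
    def describe(self):
        return "max block in postgres"

def create_limit_reader(source_path):
    if source_path and source_path.startswith("postgresql://"):
        return PgLimitReader()
    # Any unexpected value (including a csv source) falls back to the
    # RPC reader; for a csv source, parameter checking elsewhere keeps
    # this reader from interfering with the run.
    return RpcLimitReader()

reader = create_limit_reader("file:///data/blocks.csv")
```

The key property is that the fallback is safe by construction: the RPC reader only reports a ceiling, so an unused instance costs nothing.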
No description provided.