implement goroutine_pool package and integrate #1766

Merged: Intizar-T merged 10 commits into master from feat/aggregator-goroutine-pooling on Jul 10, 2024

Conversation

Intizar-T
Contributor

Description

  • implement goroutine_pool package
  • integrate it into the Accumulator and raft's becomeLeader function

Fixes # (issue)

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

Checklist before requesting a review

  • I have performed a self-review of my code.
  • If it is a core feature, I have added thorough tests.

Deployment

  • Should publish npm package
  • Should publish Docker image

@Intizar-T Intizar-T self-assigned this Jul 9, 2024
@Intizar-T Intizar-T requested a review from a team as a code owner July 9, 2024 06:55
Contributor

coderabbitai bot commented Jul 9, 2024

Warning

Rate limit exceeded

@Intizar-T has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 18 minutes and 18 seconds before requesting another review.


Commits

Files that changed from the base of the PR and between bbf2362 and 895ba69.

Walkthrough

The update introduces a worker pool pattern to the codebase, optimizing parallel job execution across various modules. A new Pool struct in the utilities package handles worker management. Functions in aggregator, fetcher, raft, and reporter modules are modified to utilize this worker pool for improved concurrency and resource management.

Changes

File/Path | Change Summary
.../aggregator/aggregator.go | NewAggregator function now includes WORKER_COUNT parameter, passing it to raft.NewRaftNode.
.../fetcher/accumulator.go | Removed ACCUMULATOR_WORKER_COUNT constant, restructured to use a goroutine pool for job scheduling with the utility package.
.../raft/raft.go, .../types.go | Added import for pool package, included a workers parameter in NewRaftNode, and initialized a pool in becomeLeader. Updated Raft struct with a workers field.
.../reporter/reporter.go | NewReporter function now accepts WORKER_COUNT and uses it in raft.NewRaftNode.
.../utils/pool/pool.go | Introduced Pool struct managing a pool of workers with methods for job execution based on context, including NewPool, Run, worker, and AddJob.
.../utils/tests/pool_test.go | Added tests for the Pool utility covering creation, job handling, context cancellation, worker count verification, and concurrent execution.
.../script/test_raft/main.go | Introduced WORKER_COUNT constant and updated the NewRaftNode call to include this worker count.
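
For readers who have not opened the diff, the worker pool pattern summarized above can be sketched roughly as follows. This is a minimal illustration, not the merged pool.go: the method names NewPool, Run, worker, and AddJob come from the summary, while the field names, the unbuffered channel, and the sumSquares helper are assumptions made for the example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Pool is a hypothetical reconstruction based on the summary above;
// the field names are assumptions.
type Pool struct {
	jobChannel  chan func()
	workerCount int
}

// NewPool allocates the job channel but does not start any goroutines.
func NewPool(workerCount int) *Pool {
	return &Pool{
		jobChannel:  make(chan func()),
		workerCount: workerCount,
	}
}

// Run starts workerCount goroutines; each one exits when ctx is cancelled.
func (p *Pool) Run(ctx context.Context) {
	for i := 0; i < p.workerCount; i++ {
		go p.worker(ctx)
	}
}

// worker executes jobs until the context is done.
func (p *Pool) worker(ctx context.Context) {
	for {
		select {
		case job := <-p.jobChannel:
			job()
		case <-ctx.Done():
			return
		}
	}
}

// AddJob hands a job to the next free worker. With an unbuffered channel
// it blocks until a worker is ready, so it must not be called after the
// workers have exited.
func (p *Pool) AddJob(job func()) {
	p.jobChannel <- job
}

// sumSquares (illustrative helper) runs n small jobs on the pool and
// returns 1*1 + 2*2 + ... + n*n.
func sumSquares(n, workers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // the caller owns the pool's lifetime

	p := NewPool(workers)
	p.Run(ctx)

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for i := 1; i <= n; i++ {
		i := i // per-iteration copy for Go versions before 1.22
		wg.Add(1)
		p.AddJob(func() {
			defer wg.Done()
			mu.Lock()
			total += i * i
			mu.Unlock()
		})
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(sumSquares(10, 3)) // 385
}
```

In this sketch the number of goroutines is bounded by workerCount no matter how many jobs are submitted, which is the resource-management benefit the walkthrough refers to.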

Poem

In code, we weave our tales today,
With worker pools in bright array,
Jobs dance and prance, in sync they flow,
A rabbit's joy in lines we sow.
🐇🌟💻


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between cccac8e and 3f86256.

Files selected for processing (4)
  • node/pkg/fetcher/accumulator.go (2 hunks)
  • node/pkg/raft/raft.go (4 hunks)
  • node/pkg/utils/goroutine-pool/pool.go (1 hunks)
  • node/pkg/utils/tests/pool_test.go (1 hunks)
Additional comments not posted (9)
node/pkg/utils/goroutine-pool/pool.go (5)

7-13: LGTM! Type definitions and constants are correct.

The type definitions for PoolJob and Pool, along with the constant POOL_WORKER_COUNT, are well-defined.


15-19: LGTM! Pool initialization is correct.

The NewPool function correctly initializes a new Pool with a job channel.


21-25: LGTM! Worker goroutines are started correctly.

The Run function correctly starts worker goroutines and handles context cancellation.


27-35: LGTM! Worker function is implemented correctly.

The worker function correctly processes jobs from the job channel and handles context cancellation.


38-40: LGTM! Job addition is implemented correctly.

The AddJob function correctly adds a job to the job channel.

node/pkg/utils/tests/pool_test.go (1)

11-16: LGTM! Pool creation test is correct.

The TestNewPool function correctly tests the creation of a new pool.

node/pkg/fetcher/accumulator.go (2)

28-33: LGTM! Ticker and context handling are correct.

The ticker and context handling in the Run method are correctly implemented.


34-36: LGTM! Job addition to the pool is correct.

The accumulator job is correctly added to the pool.

node/pkg/raft/raft.go (1)

Line range hint 323-341:
LGTM! Ticker and context handling are correct.

The ticker and context handling in the becomeLeader method are correctly implemented.

node/pkg/utils/tests/pool_test.go (outdated, resolved)
node/pkg/fetcher/accumulator.go (outdated, resolved)
node/pkg/raft/raft.go (outdated, resolved)
Collaborator

@nick-bisonai nick-bisonai left a comment

left some comments

node/pkg/utils/goroutine-pool/pool.go (outdated, resolved)
node/pkg/utils/goroutine-pool/pool.go (outdated, resolved)
node/pkg/utils/goroutine-pool/pool.go (outdated, resolved)
node/pkg/utils/tests/pool_test.go (resolved)
@Intizar-T Intizar-T force-pushed the feat/aggregator-goroutine-pooling branch from 3f86256 to 6e7f64c Compare July 9, 2024 09:56
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Outside diff range and nitpick comments (2)
node/pkg/utils/pool/pool.go (2)

25-33: Consider adding logging for job processing and context cancellation.

Adding logging would help in debugging and monitoring the worker's activity.

-		select {
+		select {
+		case job := <-p.jobChannel:
+			log.Println("Processing job")
+			job()
+		case <-ctx.Done():
+			log.Println("Worker exiting")
+			return

36-43: Consider adding logging for adding jobs and context cancellation.

Adding logging would help in debugging and monitoring job addition.

-		select {
+		select {
+		case p.jobChannel <- job:
+			log.Println("Job added to the pool")
+			return
+		case <-ctx.Done():
+			log.Println("Add job operation cancelled")
+			return
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 3f86256 and 6e7f64c.

Files selected for processing (4)
  • node/pkg/fetcher/accumulator.go (2 hunks)
  • node/pkg/raft/raft.go (4 hunks)
  • node/pkg/utils/pool/pool.go (1 hunks)
  • node/pkg/utils/tests/pool_test.go (1 hunks)
Files skipped from review as they are similar to previous changes (3)
  • node/pkg/fetcher/accumulator.go
  • node/pkg/raft/raft.go
  • node/pkg/utils/tests/pool_test.go
Additional comments not posted (2)
node/pkg/utils/pool/pool.go (2)

7-10: Struct definition looks good.

The Pool struct is well-defined with a job channel and worker count.


19-23: Method looks good.

The Run method correctly spawns the worker goroutines.

node/pkg/utils/pool/pool.go (resolved)
@nick-bisonai
Collaborator

Can you also include a Stop() func in the pool package?

I also think it should be integrated with the other packages' start and stop. To do so, I recommend including the pool in the accumulator and raft structs. Rather than initializing the pool from Run, it would be better aligned if the pools were initialized from the init funcs of both the accumulator and raft.

@nick-bisonai
Collaborator

nick-bisonai commented Jul 9, 2024

In raft, the pool stop should be included in raft.ResignLeader.
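
The structure requested here (a pool with an explicit Stop, held as a field of its owner and created in the owner's constructor) might look roughly like the sketch below. Everything in it is an assumption for illustration: the Node type stands in for the accumulator or raft node, and the field and method names are not taken from the PR.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Pool is a hypothetical pool with an explicit Stop method.
type Pool struct {
	jobs    chan func()
	workers int
	cancel  context.CancelFunc
	wg      sync.WaitGroup
}

func NewPool(workers int) *Pool {
	return &Pool{jobs: make(chan func()), workers: workers}
}

// Run derives a child context so that Stop can end the workers
// without cancelling the parent context.
func (p *Pool) Run(parent context.Context) {
	ctx, cancel := context.WithCancel(parent)
	p.cancel = cancel
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case job := <-p.jobs:
					job()
				case <-ctx.Done():
					return
				}
			}
		}()
	}
}

// Stop cancels the workers and waits until all of them have returned.
func (p *Pool) Stop() {
	if p.cancel != nil {
		p.cancel()
	}
	p.wg.Wait()
}

// Node stands in for an owner such as the accumulator or the raft node.
// The pool is created in the constructor, not inside a Run function.
type Node struct {
	pool *Pool
}

func NewNode(workers int) *Node {
	return &Node{pool: NewPool(workers)}
}

// Start and Stop tie the pool's lifetime to the owner's lifetime.
func (n *Node) Start(ctx context.Context) { n.pool.Run(ctx) }
func (n *Node) Stop()                     { n.pool.Stop() }

// runOnce starts a node, runs one job on its pool, stops the node,
// and reports whether the job ran.
func runOnce() bool {
	n := NewNode(2)
	n.Start(context.Background())

	ran := false
	done := make(chan struct{})
	n.pool.jobs <- func() {
		ran = true
		close(done)
	}
	<-done // the close happens after the write to ran

	n.Stop() // returns only after every worker goroutine has exited
	return ran
}

func main() {
	fmt.Println(runOnce()) // true
}
```

Because Stop waits on the WaitGroup, a caller such as a resign handler knows that no worker goroutine is left running once Stop returns.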

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 6e7f64c and 18e5401.

Files selected for processing (1)
  • node/pkg/utils/tests/pool_test.go (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • node/pkg/utils/tests/pool_test.go

@Intizar-T
Contributor Author

@nick-bisonai I intentionally didn't implement context cancellation in the pool package. Rather than being a standalone feature that can be started and stopped independently, I thought the pool should be a sub-feature, meaning some other feature should be responsible for starting and stopping it. In raft, becomeLeader is responsible for managing the pool state, and in fetcher it's the Accumulator's job. The pool shouldn't run as a separate feature. In that sense, I think this design makes pool management easier. Pls let me know if I missed anything.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 18e5401 and 24307ca.

Files selected for processing (1)
  • node/pkg/utils/tests/pool_test.go (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • node/pkg/utils/tests/pool_test.go

@nick-bisonai
Collaborator

nick-bisonai commented Jul 9, 2024

> @nick-bisonai i intentionally didn't implement context cancel in the pool package. Rather than being a standalone feature that can be started and stopped independently, I thought pool should be a sub-feature. Meaning some other feature should be responsible for starting and stopping it. In raft, becomeLeader is responsible for managing the pool state and in fetcher it's Accumulator's job. pool shouldn't be running as a separate feature. In that sense, I think this design makes pool management easier. Pls let me know if I missed anything

I understand, but from my perspective, using a temporarily declared variable inside the Run function seems less structured and harder to manage. If you want to keep the code, please add tests that check there are no wasted resources after a leader resigns or the accumulator stops. There shouldn't be any running pools after those steps. Since the pool runs in a separate goroutine, I think it should be signaled somehow for cleanup once it is no longer used.
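
One way to check for the wasted resources mentioned here is to compare the goroutine count before starting the workers with the count after cancellation. The sketch below assumes that approach; startWorkers, settlesTo, and leakFree are hypothetical helpers, not code from the PR, and the polling loop exists only because goroutines take a moment to exit after the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

// startWorkers launches n goroutines that idle on the jobs channel
// until ctx is cancelled.
func startWorkers(ctx context.Context, n int, jobs <-chan func()) {
	for i := 0; i < n; i++ {
		go func() {
			for {
				select {
				case job := <-jobs:
					job()
				case <-ctx.Done():
					return
				}
			}
		}()
	}
}

// settlesTo polls runtime.NumGoroutine until it drops to at most want
// or the timeout elapses.
func settlesTo(want int, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if runtime.NumGoroutine() <= want {
			return true
		}
		time.Sleep(5 * time.Millisecond)
	}
	return runtime.NumGoroutine() <= want
}

// leakFree reports whether all workers have exited after cancellation,
// judged by the goroutine count returning to its earlier baseline.
func leakFree(workers int) bool {
	before := runtime.NumGoroutine()

	ctx, cancel := context.WithCancel(context.Background())
	jobs := make(chan func())
	startWorkers(ctx, workers, jobs)

	cancel() // simulate a leader resign or accumulator stop
	return settlesTo(before, time.Second)
}

func main() {
	fmt.Println(leakFree(4))
}
```

A count-based check is coarse, since unrelated goroutines can start or stop during the test, so in a real test suite it would need a quiet baseline or a dedicated leak-detection helper.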

@Intizar-T
Contributor Author

> please add test codes to check if

@nick-bisonai could you pls elaborate more on what you meant by temporarily declared variable inside run function? It doesn't seem like you are referring to pool.Run() function, right?

And yes, there is a test that ensures Pool workers exit after context is canceled. I think the current implementation should be safe since pool.Run() requires a ctx to be passed which gets passed to workers. So whoever uses the Pool will be responsible for the cleanup (canceling the context).

@nick-bisonai
Collaborator

> > please add test codes to check if
>
> @nick-bisonai could you pls elaborate more on what you meant by temporarily declared variable inside run function? It doesn't seem like you are referring to pool.Run() function, right?
>
> And yes, there is a test that ensures Pool workers exit after context is canceled. I think the current implementation should be safe since pool.Run() requires a ctx to be passed which gets passed to workers. So whoever uses the Pool will be responsible for the cleanup (canceling the context).

By "temporarily declared variable inside run function" I meant this pattern inside the Run functions:

p := pool.NewPool(POOL_WORKER_COUNT)
p.Run(ctx)

resignLeader doesn't cancel the context of the pool object you mentioned, but the pool should be canceled somehow.

For the accumulator it might work as you expect, since it uses the cancel that is defined in the init func.

@nick-bisonai
Collaborator

nick-bisonai commented Jul 9, 2024

If you want to keep the pattern, you should add something that cancels the pool context from the ResignLeader function or from this part:

case <-r.Resign:
	log.Debug().Msg("resigning as leader")
	r.HeartbeatTicker.Stop()
	r.LeaderJobTicker.Stop()

	return

But it still looks more organized to me to have an internal pool property in both the accumulator and raft, and to call stop and start explicitly from outside, rather than having it done internally and automatically 😅

As for the tests, I meant tests from the accumulator side and the raft side.
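
To make the resign-path suggestion concrete, the sketch below shows a leader-style loop that gives the pool its own cancellable context and cancels it when a resign signal arrives. It is an illustration under assumptions, not the raft code: leaderLoop, resignAfterMillis, the resign channel, and the 5 ms ticker are all hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// leaderLoop is a hypothetical stand-in for a becomeLeader-style loop.
// It returns the number of jobs that ran before the loop ended.
func leaderLoop(parent context.Context, resign <-chan struct{}, workers int) int {
	// The pool gets its own context so the resign path can cancel it
	// even though the parent context stays alive.
	poolCtx, cancelPool := context.WithCancel(parent)
	defer cancelPool()

	jobs := make(chan func())
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case job := <-jobs:
					job()
				case <-poolCtx.Done():
					return
				}
			}
		}()
	}

	ticker := time.NewTicker(5 * time.Millisecond)
	defer ticker.Stop()

	var ran int64
	for {
		select {
		case <-ticker.C:
			// Submit a job, but give up if the pool is shutting down.
			select {
			case jobs <- func() { atomic.AddInt64(&ran, 1) }:
			case <-poolCtx.Done():
			}
		case <-resign:
			cancelPool() // stop the workers as part of resigning
			wg.Wait()    // no pool goroutine survives the resign
			return int(atomic.LoadInt64(&ran))
		case <-parent.Done():
			cancelPool()
			wg.Wait()
			return int(atomic.LoadInt64(&ran))
		}
	}
}

// resignAfterMillis runs the loop and sends the resign signal after ms milliseconds.
func resignAfterMillis(ms int) int {
	resign := make(chan struct{})
	go func() {
		time.Sleep(time.Duration(ms) * time.Millisecond)
		close(resign)
	}()
	return leaderLoop(context.Background(), resign, 2)
}

func main() {
	fmt.Println("jobs run before resign:", resignAfterMillis(100))
}
```

The deferred cancelPool covers any return path that is added later, while the explicit call in the resign case makes the cleanup visible at the point the reviewer was asking about.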

node/pkg/raft/raft.go (outdated, resolved)
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 24307ca and 03ed717.

Files selected for processing (8)
  • node/pkg/aggregator/aggregator.go (2 hunks)
  • node/pkg/fetcher/accumulator.go (2 hunks)
  • node/pkg/raft/raft.go (6 hunks)
  • node/pkg/raft/types.go (1 hunks)
  • node/pkg/reporter/reporter.go (2 hunks)
  • node/pkg/utils/pool/pool.go (1 hunks)
  • node/pkg/utils/tests/pool_test.go (1 hunks)
  • node/script/test_raft/main.go (2 hunks)
Files skipped from review due to trivial changes (1)
  • node/pkg/raft/types.go
Files skipped from review as they are similar to previous changes (4)
  • node/pkg/fetcher/accumulator.go
  • node/pkg/raft/raft.go
  • node/pkg/utils/pool/pool.go
  • node/pkg/utils/tests/pool_test.go
Additional comments not posted (6)
node/script/test_raft/main.go (2)

14-14: Define WORKER_COUNT constant.

The constant WORKER_COUNT is defined correctly and will be used to set the number of workers for the goroutine pool.


49-49: Update NewRaftNode call to include WORKER_COUNT.

The function call is correctly updated to pass WORKER_COUNT, initializing the worker pool with the specified number of workers.

node/pkg/reporter/reporter.go (2)

20-20: Define WORKER_COUNT constant.

The constant WORKER_COUNT is defined correctly and will be used to set the number of workers for the goroutine pool.


51-51: Update NewRaftNode call to include WORKER_COUNT.

The function call is correctly updated to pass WORKER_COUNT, initializing the worker pool with the specified number of workers.

node/pkg/aggregator/aggregator.go (2)

20-20: Define WORKER_COUNT constant.

The constant WORKER_COUNT is defined correctly and will be used to set the number of workers for the goroutine pool.


37-37: Update NewRaftNode call to include WORKER_COUNT.

The function call is correctly updated to pass WORKER_COUNT, initializing the worker pool with the specified number of workers.

@Intizar-T
Copy link
Contributor Author

@nick-bisonai Pool now has its own context management. It gets cancelled from raft in the r.Resign case in becomeLeader, and from the accumulator when the parent context is cancelled. Pls let me know if I missed anything 🙏

}

func (p *Pool) AddJob(job func()) {
select {
Collaborator

@nick-bisonai nick-bisonai Jul 10, 2024

if !isRunning {
return
}
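
The guard suggested above could be combined with the pool's own context roughly as in the sketch below. The isRunning name comes from the review comment; the mutex, the boolean return value of AddJob, and the demo helper are assumptions added so the behavior can be observed, and the merged pool.go may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Pool is a hypothetical pool that tracks whether it is running.
type Pool struct {
	mu        sync.Mutex
	isRunning bool
	ctx       context.Context
	cancel    context.CancelFunc
	jobs      chan func()
	workers   int
}

func NewPool(workers int) *Pool {
	return &Pool{jobs: make(chan func()), workers: workers}
}

// Run starts the workers once; a second call while running is a no-op.
func (p *Pool) Run(parent context.Context) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.isRunning {
		return
	}
	p.ctx, p.cancel = context.WithCancel(parent)
	p.isRunning = true
	for i := 0; i < p.workers; i++ {
		go p.worker(p.ctx)
	}
}

func (p *Pool) worker(ctx context.Context) {
	for {
		select {
		case job := <-p.jobs:
			job()
		case <-ctx.Done():
			return
		}
	}
}

// Stop cancels the pool's context and marks the pool as stopped.
func (p *Pool) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.isRunning {
		return
	}
	p.cancel()
	p.isRunning = false
}

// AddJob returns false instead of blocking when the pool is not running.
func (p *Pool) AddJob(job func()) bool {
	p.mu.Lock()
	running, ctx := p.isRunning, p.ctx
	p.mu.Unlock()
	if !running {
		return false
	}
	select {
	case p.jobs <- job:
		return true
	case <-ctx.Done(): // the pool was stopped while we were waiting
		return false
	}
}

// demo reports whether AddJob succeeded before Run, while running,
// and after Stop.
func demo() (before, during, after bool) {
	p := NewPool(1)
	before = p.AddJob(func() {})

	p.Run(context.Background())
	done := make(chan struct{})
	during = p.AddJob(func() { close(done) })
	<-done

	p.Stop()
	after = p.AddJob(func() {})
	return
}

func main() {
	fmt.Println(demo()) // false true false
}
```

Without the guard, a send on the unbuffered job channel would block forever once every worker has exited, which is the failure mode the check avoids.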

Collaborator

@nick-bisonai nick-bisonai left a comment

lgtm!

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 03ed717 and e472b54.

Files selected for processing (1)
  • node/pkg/utils/pool/pool.go (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • node/pkg/utils/pool/pool.go

@Intizar-T Intizar-T force-pushed the feat/aggregator-goroutine-pooling branch from e472b54 to bbf2362 Compare July 10, 2024 05:14
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between e472b54 and bbf2362.

Files selected for processing (8)
  • node/pkg/aggregator/aggregator.go (2 hunks)
  • node/pkg/fetcher/accumulator.go (2 hunks)
  • node/pkg/raft/raft.go (6 hunks)
  • node/pkg/raft/types.go (1 hunks)
  • node/pkg/reporter/reporter.go (2 hunks)
  • node/pkg/utils/pool/pool.go (1 hunks)
  • node/pkg/utils/tests/pool_test.go (1 hunks)
  • node/script/test_raft/main.go (2 hunks)
Files skipped from review as they are similar to previous changes (8)
  • node/pkg/aggregator/aggregator.go
  • node/pkg/fetcher/accumulator.go
  • node/pkg/raft/raft.go
  • node/pkg/raft/types.go
  • node/pkg/reporter/reporter.go
  • node/pkg/utils/pool/pool.go
  • node/pkg/utils/tests/pool_test.go
  • node/script/test_raft/main.go

@Intizar-T Intizar-T merged commit 98648d0 into master Jul 10, 2024
1 check passed
@Intizar-T Intizar-T deleted the feat/aggregator-goroutine-pooling branch July 10, 2024 05:20

2 participants