
Switch the agent over to UDP #90

Closed · wants to merge 13 commits into from

Conversation

geoffxy (Member) commented Feb 10, 2018

Along with these changes, I also switched the interagent serialization format to binary in an effort to reduce the size of messages. The new operations messages are still quite large, unfortunately, because we just serialize the operations to JSON directly: a one-character change usually produces a ~300 byte message.

I added logic to split and reassemble large new operations messages. I tested this by pasting large amounts of text into the editor, and it seemed to work fine.
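For context, here's a minimal sketch of the split/reassemble idea (not the actual helpers in this PR; it assumes each fragment carries a small (index, total) header):

import struct

HEADER = struct.Struct("!HH")  # (fragment_index, total_fragments)

def split_payload(payload, max_fragment_size=512):
    # Slice the payload and prefix each chunk with its index and the total count.
    chunks = [
        payload[i:i + max_fragment_size]
        for i in range(0, len(payload), max_fragment_size)
    ]
    return [
        HEADER.pack(index, len(chunks)) + chunk
        for index, chunk in enumerate(chunks)
    ]

def reassemble(fragments):
    # Once every fragment has arrived, sort by index and strip the headers.
    ordered = sorted(fragments, key=lambda f: HEADER.unpack(f[:HEADER.size])[0])
    return b"".join(f[HEADER.size:] for f in ordered)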

geoffxy self-assigned this Feb 10, 2018
geoffxy added this to the Milestone 3 milestone Feb 10, 2018
geoffxy (Member Author) commented Feb 10, 2018

#61

return b"".join(self._buffer)


class Peer:
Member

I think it would be nice to have a models/ folder for Peer, and maybe a generalized FragmentUtils class in a utils/ folder that handles the frag/defragmentation of things (the above doesn't seem specific to NewOperations and could possibly be used later on).

Member Author

Good point, will do. 👍

import tandem.protocol.interagent.messages as im


class PeerManager:
Member

Can we split this up into a PeerStore to handle the storing/getting part and a PeerConnectionManager (or something similar) to handle the broadcasting/sending of messages, plus a utility file/class for converting the operation_list to messages (or maybe bundle the frag/defrag + operation list <-> messages conversion into one OperationUtils)? I feel like there's a lot of different functionality going on in this file.

Member Author

Yeah, agreed. I'll create a PeerStore to hold the peers. Instead of a PeerConnectionManager, I think what would work better is something like an InteragentMessageProxy under protocol/interagent/: essentially a utility class that provides a high level interface for sending messages to peers. It can hold the methods used to send messages for different high level reasons (e.g. broadcasting new operations, sending operations to one agent, announcing that you're shutting down, eventually the hole punching messages, etc.).

I think the frag/defrag can be generalized to any binary payload and can go into its own utilities file. But how we create the NewOperations messages from the fragmented binary payload depends on the message's serialization format. So since this class provides the high level interface for sending the NewOperations messages, I think _operations_list_to_messages() should stay here (or be placed with the messages).
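As a rough sketch of the shape I'm imagining (class and method names like write_data and get_address are assumptions, not a final API; _operations_list_to_messages is the existing helper):

class InteragentMessageProxy:
    def __init__(self, peer_store, gateway):
        self._peer_store = peer_store
        self._gateway = gateway

    def broadcast_new_operations(self, operations_list):
        # Serialize (and fragment, if needed), then send to every known peer.
        messages = self._operations_list_to_messages(operations_list)
        for peer in self._peer_store.get_peers():
            for message in messages:
                self._gateway.write_data(message, peer.get_address())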

from threading import Thread


class UDPGateway:
Member

Definitely going to want this in our shared folder, to be used by the rendezvous server.

Member Author

Yeah, that makes sense. I'm going to be busy with midterms for the first half of this week, so I think your stuff will likely make it in first. I can rebase on top of your changes once that happens, move this into shared/, and do any other refactoring that's needed.

return self._handle_assembled_operations(message.operations_binary)

peer = self._peer_manager.get_peer(sender_address)
operations_binary = peer.integrate_new_operations_message(message)
Member

I think we should have more logic here in the handler, or put more logic in utility files, rather than delegating the work to models. That way we can keep our models simple and use them mainly for grouping up data.

Member Author

Yeah, that's fair. I think it'll make more sense to generalize the fragmentation stuff you mentioned into utilities, and then call them here to defrag the operations and store them in the peer.

jamiboym force-pushed the gy-udp branch 3 times, most recently from 8dac814 to 5e02578 on February 14, 2018 07:01
geoffxy assigned jamiboym and unassigned geoffxy Feb 14, 2018
jamiboym assigned geoffxy and unassigned geoffxy Feb 14, 2018
geoffxy (Member Author) left a comment

To avoid running the frag/defrag in the reader thread, I think we should introduce a middle layer between the gateway and the handler. This can also help reduce the coupling of the UDPGateway and protocol concepts. I think we should keep the UDPGateway as low level as possible and just have it as a utility we use to read/write bytes from the socket.

So something like InteragentMessageProxy could be used to handle this. It would contain the defrag code (i.e. the calls to FragmentUtils) and forward the defragmented messages to the handler. The proxy could also hold the high level methods used to send interagent messages and perform the fragmentation before writing it to UDPGateway.

The reader thread's handler_function will submit to the main executor and have the proxy handle the raw data.
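Concretely, the hand-off could look something like this (a sketch; _main_executor, _on_raw_data, and handle_raw_data are illustrative names):

def _on_raw_data(self, raw_data, sender_address):
    # Called on the UDPGateway reader thread: do no defrag work here,
    # just hop over to the single main executor thread.
    self._main_executor.submit(
        self._interagent_message_proxy.handle_raw_data,
        raw_data,
        sender_address,
    )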

@@ -1,58 +1,60 @@
import logging
import json
import tandem.protocol.editor.messages as em
import tandem.protocol.interagent.messages as im
# import tandem.protocol.interagent.messages as im
Member Author

Extra comment left over

@staticmethod
def fragment(payload, message_length=512):
    if type(payload) is str:
        payload = payload.encode('utf-8')
Member Author

Use "

return messages

@staticmethod
def defragment(raw_data, sender_address, handler):
Member Author

I think we should just return the defragmented data if it exists instead of coupling this with the handler.

sequence_number,
)

defragmented_data = None
Member Author

Then this can be:

defragmented_data = fragment_group.defragment()
if fragment_group.is_complete():
    fragment_store.remove_fragment_group(
        sender_address,
        sequence_number,
    )
return defragmented_data

return Bye.from_payload(payload)

@staticvalue
def _payload_keys(self):
    return ['operations_binary']
Member Author

I think we should rename this back to operations_list since it's no longer a binary string


class FragmentStore(StoreBase):
    def __init__(self):
        self._peers = {}
Member Author

I think this should be named self._peer_fragment_groups for clarity


def get_peers(self, addresses=None):
    if addresses is None:
        addresses = [address for address in self._peers]
Member Author

Hmm, why not just do

return [peer for _, peer in self._peers.items()]

    for i in range(0, len(binary_payload), max_payload_length)
]
if type(message) is str:
    message = message.encode('utf-8')
Member Author

Use " here


def _read_datagrams(self):
    try:
        while True:
            raw_data, address = self._socket.recvfrom(4096)
            host, port = address
            logging.debug("Received data from {}:{}.".format(host, port))
            self._handler_function(raw_data, address)

            if FragmentUtils.is_fragment(raw_data):
Member Author

I don't think this should be done here, because it runs in the reader thread. FragmentUtils manipulates the stores, which I think we should only do on the main thread to avoid potential concurrency issues.

Right now I saw that it only accesses the FragmentStore, which is only updated by this thread, but that may change in the future. Also, the sequence number counter in FragmentUtils is modified by the main thread, and this thread also uses FragmentUtils. Even though this thread doesn't modify that counter, I think it's best to just avoid this if possible. It could cause really subtle concurrency bugs later on if we aren't careful.


if type(addresses) is not list:
    addresses = [addresses]

Member Author

I think the fragmentation should be handled in one pass here before we loop through the addresses and messages when sending. This avoids the recursive call and also avoids doing the fragmentation more than once for each recipient. We can use the same sequence number since each sequence number is unique for the sending peer.
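As a sketch of what the one-pass version could look like (MAX_PAYLOAD_LENGTH is a placeholder name):

def write_data(self, message, addresses):
    if type(addresses) is not list:
        addresses = [addresses]
    # Fragment once, up front; every recipient then gets the same
    # fragments (and therefore the same sequence number).
    if len(message) > MAX_PAYLOAD_LENGTH:
        messages = FragmentUtils.fragment(message)
    else:
        messages = [message]
    for address in addresses:
        for fragment in messages:
            self._socket.sendto(fragment, address)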

geoffxy (Member Author) commented Feb 14, 2018

Everything got marked as outdated by GitHub... 😞

geoffxy (Member Author) commented Feb 14, 2018

So I thought about this a bit more: adding a specific InteragentMessageProxy would work, but I guess it won't generalize as well as what you're trying to achieve with the changes in UDPGateway.

I think we should still add a middle layer, but maybe just name it something that has to do with fragmentation (e.g. maybe FragmentingMessageProxy). It can hold a gateway internally and perform the defrag before passing it on to the handler. It can also contain a method to do the fragmentation before writing to the gateway. That way this can be reused on the server if needed. Then UDPGateway stays low level and is just responsible for binary read/write.

The important thing though is that the defrag code should run on the main thread. So the UDPGateway's handler_function() should be some function in TandemAgent that submits the data to the executor. I think it will be easier to reason about/modify the code later on if all state manipulation happens on one thread.
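A possible shape for this middle layer (a sketch using illustrative names; it also assumes defragment() is changed to return the data, as suggested above):

class FragmentingMessageProxy:
    def __init__(self, gateway, handler_function):
        self._gateway = gateway
        self._handler_function = handler_function

    def write_data(self, payload, addresses):
        # Fragment once here; the gateway stays a low level byte writer.
        for message in FragmentUtils.fragment(payload):
            self._gateway.write_data(message, addresses)

    def handle_raw_data(self, raw_data, sender_address):
        # Runs on the main thread, so it's safe to touch the FragmentStore.
        if FragmentUtils.is_fragment(raw_data):
            raw_data = FragmentUtils.defragment(raw_data, sender_address)
            if raw_data is None:
                return  # still waiting on the remaining fragments
        self._handler_function(raw_data, sender_address)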

jamiboym (Member) commented

@geoffxy Sounds good, I'll move the fragmentation code to a separate, generalized middle layer.

geoffxy (Member Author) commented Feb 15, 2018

Thanks @jamiboym! I'll close this PR so you can open a new one and merge 😃

geoffxy closed this Feb 15, 2018
jamiboym deleted the gy-udp branch February 20, 2018 01:45