Skip to content

Commit

Permalink
Wait for previous message to be drained before sending next
Browse files Browse the repository at this point in the history
  • Loading branch information
Abestanis committed Sep 3, 2024
1 parent cbd4379 commit 20c6ccd
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions ports/tcpport.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TcpPort extends EventEmitter {
/** @type {net.Socket?} - Optional custom socket */
this._externalSocket = null;

if(typeof ip === "object") {
if (typeof ip === "object") {
options = ip;
ip = undefined;
}
Expand All @@ -62,7 +62,7 @@ class TcpPort extends EventEmitter {
}

/** @type {net.TcpSocketConnectOpts} - Options for net.connect(). */
this.connectOptions = {
this.connectOptions = {
// Default options
...{
host: ip || options.ip,
Expand All @@ -72,8 +72,8 @@ class TcpPort extends EventEmitter {
...options
};

if(options.socket) {
if(options.socket instanceof net.Socket) {
if (options.socket) {
if (options.socket instanceof net.Socket) {
this._externalSocket = options.socket;
this.openFlag = this._externalSocket.readyState === "opening" || this._externalSocket.readyState === "open";
} else {
Expand All @@ -92,6 +92,7 @@ class TcpPort extends EventEmitter {

// init a socket
this._client = this._externalSocket || new net.Socket(this.socketOpts);
this._writeCompleted = Promise.resolve();

if (options.timeout) this._client.setTimeout(options.timeout);

Expand Down Expand Up @@ -174,10 +175,10 @@ class TcpPort extends EventEmitter {
* @param {(err?: Error) => void} callback
*/
open(callback) {
if(this._externalSocket === null) {
if (this._externalSocket === null) {
this.callback = callback;
this._client.connect(this.connectOptions);
} else if(this.openFlag) {
} else if (this.openFlag) {
modbusSerialDebug("TCP port: external socket is opened");
callback(); // go ahead to setup existing socket
} else {
Expand Down Expand Up @@ -214,7 +215,7 @@ class TcpPort extends EventEmitter {
* @param {Buffer} data
*/
write(data) {
if(data.length < MIN_DATA_LENGTH) {
if (data.length < MIN_DATA_LENGTH) {
modbusSerialDebug("expected length of data is to small - minimum is " + MIN_DATA_LENGTH);
return;
}
Expand All @@ -240,7 +241,15 @@ class TcpPort extends EventEmitter {
});

// send buffer to slave
this._client.write(buffer);
this._writeCompleted = new Promise((resolve, _) => {
this._writeCompleted.finally(() => {
if (this._client.write(buffer)) {
resolve();
} else {
this._client.once("drain", resolve);
}
});
});

// set next transaction id
this._transactionIdWrite = (this._transactionIdWrite + 1) % MAX_TRANSACTIONS;
Expand Down

0 comments on commit 20c6ccd

Please sign in to comment.