Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject message dlr #1197

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion jasmin/managers/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from twisted.internet import reactor
from txamqp.queue import Closed
from smpp.pdu.operations import SubmitSM, DeliverSM
from smpp.pdu.pdu_types import CommandStatus, DataCodingScheme, DataCodingGsmMsgClass, EsmClassGsmFeatures
from smpp.pdu.pdu_types import CommandStatus, DataCodingScheme, DataCodingGsmMsgClass, EsmClassGsmFeatures, CommandId
from smpp.twisted.protocol import DataHandlerResponse
from smpp.pdu.error import SMPPRequestTimoutError

Expand Down Expand Up @@ -120,6 +120,9 @@ def rejectAndRequeueMessage(self, message, delay=True):
@defer.inlineCallbacks
def rejectMessage(self, message, requeue=0):
yield self.amqpBroker.chan.basic_reject(delivery_tag=message.delivery_tag, requeue=requeue)
if requeue == 0:
dlr = DLR(pdu_type=CommandId.submit_sm_resp, status=CommandStatus.ESME_RSYSERR, msgid=message.content.properties['message-id'])
self.amqpBroker.publish(exchange='messaging', routing_key='dlr.submit_sm_resp', content=dlr)

@defer.inlineCallbacks
def ackMessage(self, message):
Expand Down
15 changes: 12 additions & 3 deletions jasmin/protocols/smpp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def cancelLongSubmitSmTransactions(self, err):
# Do errback
txn.ackDeferred.errback(err)

def startLongSubmitSmTransaction(self, reqPDU, timeout):
def startLongSubmitSmTransaction(self, reqPDU, timeout, maxSeqNum):
if reqPDU.LongSubmitSm['msg_ref_num'] in self.longSubmitSmTxns:
self.log.error(
'Transaction with msg_ref_num [%s] is already in progress, open longSubmitSmTxns count: %s',
Expand All @@ -185,7 +185,8 @@ def startLongSubmitSmTransaction(self, reqPDU, timeout):
# Save transaction
self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']] = {
'txn': SMPPOutboundTxn(reqPDU, timer, ackDeferred),
'nack_count': reqPDU.LongSubmitSm['total_segments']}
'nack_count': reqPDU.LongSubmitSm['total_segments'],
'max_sequence': maxSeqNum}
self.log.debug("Long submit_sm transaction started with msg_ref_num %s",
reqPDU.LongSubmitSm['msg_ref_num'])
return ackDeferred
Expand Down Expand Up @@ -225,8 +226,12 @@ def endLongSubmitSmTransaction(self, _SMPPOutboundTxnResult):
reqPDU.LongSubmitSm['msg_ref_num'],
self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['nack_count'])

if respPDU.seqNum == self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['max_sequence']:
self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['end_resp_pdu'] = respPDU

# End the transaction if no more pending ACKs
if self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['nack_count'] == 0:
respPDU = self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['end_resp_pdu']
txn = self.closeLongSubmitSmTransaction(reqPDU.LongSubmitSm['msg_ref_num'])

# Do callback
Expand Down Expand Up @@ -295,10 +300,12 @@ def doSendRequest(self, pdu, timeout):
if splitMethod is not None and hasattr(pdu, 'nextPdu'):
partedSmPdu = pdu
first = True
maxSeqNum = 0

# Iterate through parted PDUs
while True:
partedSmPdu.seqNum = self.claimSeqNum()
maxSeqNum = partedSmPdu.seqNum

# Set LongSubmitSm tracking flags in pdu:
partedSmPdu.LongSubmitSm = {'msg_ref_num': None, 'total_segments': None,
Expand All @@ -325,14 +332,16 @@ def doSendRequest(self, pdu, timeout):
# Start a transaction using the first parted PDU
if first:
first = False
txn = self.startLongSubmitSmTransaction(partedSmPdu, timeout)
firstPartedSmPdu = partedSmPdu

try:
# There still another PDU to go for
partedSmPdu = partedSmPdu.nextPdu
except AttributeError:
break

txn = self.startLongSubmitSmTransaction(firstPartedSmPdu, timeout, maxSeqNum)

return txn
else:
self.preSubmitSm(pdu)
Expand Down