diff --git a/jasmin/managers/listeners.py b/jasmin/managers/listeners.py index 2595b6b8..e7e93b20 100644 --- a/jasmin/managers/listeners.py +++ b/jasmin/managers/listeners.py @@ -10,7 +10,7 @@ from twisted.internet import reactor from txamqp.queue import Closed from smpp.pdu.operations import SubmitSM, DeliverSM -from smpp.pdu.pdu_types import CommandStatus, DataCodingScheme, DataCodingGsmMsgClass, EsmClassGsmFeatures +from smpp.pdu.pdu_types import CommandStatus, DataCodingScheme, DataCodingGsmMsgClass, EsmClassGsmFeatures, CommandId from smpp.twisted.protocol import DataHandlerResponse from smpp.pdu.error import SMPPRequestTimoutError @@ -120,6 +120,9 @@ def rejectAndRequeueMessage(self, message, delay=True): @defer.inlineCallbacks def rejectMessage(self, message, requeue=0): yield self.amqpBroker.chan.basic_reject(delivery_tag=message.delivery_tag, requeue=requeue) + if requeue == 0: + dlr = DLR(pdu_type=CommandId.submit_sm_resp, status=CommandStatus.ESME_RSYSERR, msgid=message.content.properties['message-id']) + self.amqpBroker.publish(exchange='messaging', routing_key='dlr.submit_sm_resp', content=dlr) @defer.inlineCallbacks def ackMessage(self, message): diff --git a/jasmin/protocols/smpp/protocol.py b/jasmin/protocols/smpp/protocol.py index 0ab3bd96..c6cb767a 100644 --- a/jasmin/protocols/smpp/protocol.py +++ b/jasmin/protocols/smpp/protocol.py @@ -169,7 +169,7 @@ def cancelLongSubmitSmTransactions(self, err): # Do errback txn.ackDeferred.errback(err) - def startLongSubmitSmTransaction(self, reqPDU, timeout): + def startLongSubmitSmTransaction(self, reqPDU, timeout, maxSeqNum): if reqPDU.LongSubmitSm['msg_ref_num'] in self.longSubmitSmTxns: self.log.error( 'Transaction with msg_ref_num [%s] is already in progress, open longSubmitSmTxns count: %s', @@ -185,7 +185,8 @@ def startLongSubmitSmTransaction(self, reqPDU, timeout): # Save transaction self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']] = { 'txn': SMPPOutboundTxn(reqPDU, timer, ackDeferred), - 'nack_count': reqPDU.LongSubmitSm['total_segments']} + 'nack_count': reqPDU.LongSubmitSm['total_segments'], + 'max_sequence': maxSeqNum} self.log.debug("Long submit_sm transaction started with msg_ref_num %s", reqPDU.LongSubmitSm['msg_ref_num']) return ackDeferred @@ -225,8 +226,12 @@ def endLongSubmitSmTransaction(self, _SMPPOutboundTxnResult): reqPDU.LongSubmitSm['msg_ref_num'], self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['nack_count']) + if respPDU.seqNum == self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['max_sequence']: + self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['end_resp_pdu'] = respPDU + # End the transaction if no more pending ACKs if self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['nack_count'] == 0: + respPDU = self.longSubmitSmTxns[reqPDU.LongSubmitSm['msg_ref_num']]['end_resp_pdu'] txn = self.closeLongSubmitSmTransaction(reqPDU.LongSubmitSm['msg_ref_num']) # Do callback @@ -295,10 +300,12 @@ def doSendRequest(self, pdu, timeout): if splitMethod is not None and hasattr(pdu, 'nextPdu'): partedSmPdu = pdu first = True + maxSeqNum = 0 # Iterate through parted PDUs while True: partedSmPdu.seqNum = self.claimSeqNum() + maxSeqNum = partedSmPdu.seqNum # Set LongSubmitSm tracking flags in pdu: partedSmPdu.LongSubmitSm = {'msg_ref_num': None, 'total_segments': None, @@ -325,7 +332,7 @@ def doSendRequest(self, pdu, timeout): # Start a transaction using the first parted PDU if first: first = False - txn = self.startLongSubmitSmTransaction(partedSmPdu, timeout) + firstPartedSmPdu = partedSmPdu try: # There still another PDU to go for @@ -333,6 +340,8 @@ def doSendRequest(self, pdu, timeout): except AttributeError: break + txn = self.startLongSubmitSmTransaction(firstPartedSmPdu, timeout, maxSeqNum) + return txn else: self.preSubmitSm(pdu)