forked from iron-io/iron_mq_python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
iron_mq.py
169 lines (118 loc) · 4.39 KB
/
iron_mq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import iron_core
import urllib
try:
import json
except:
import simplejson as json
class Queue:
client = None
name = None
def __init__(self, mq, name):
"""Creates object for manipulating a queue.
Arguments:
mq -- An instance of IronMQ.
name -- The name of the queue.
"""
self.client = mq.client
self.name = name
def info(self):
"""Execute an HTTP request to get details on a queue, and
return it.
"""
url = "queues/%s" % (self.name,)
result = self.client.get(url)
return result["body"]
def size(self):
"""Queue size"""
return self.info()['size']
def id(self):
"""Queue ID"""
return self.info()['id']
def total_messages(self):
"""Queue total messages count"""
return self.info()['total_messages']
def clear(self):
"""Executes an HTTP request to clear all contents of a queue.
"""
url = "queues/%s/clear" % (self.name,)
result = self.client.post(url)
return result['body']
def delete(self, message_id):
"""Execute an HTTP request to delete a message from queue.
Arguments:
message_id -- The ID of the message to be deleted.
"""
url = "queues/%s/messages/%s" % (self.name, message_id)
result = self.client.delete(url)
return result["body"]
def post(self, *messages):
"""Executes an HTTP request to create message on the queue.
Creates queue if not existed.
Arguments:
messages -- An array of messages to be added to the queue.
"""
url = "queues/%s/messages" % (self.name,)
msgs = [{'body':msg} if isinstance(msg, basestring) else msg
for msg in messages]
data = json.dumps({"messages": msgs})
result = self.client.post(url=url, body=data,
headers={"Content-Type":"application/json"})
return result['body']
def get(self, max=None):
"""Executes an HTTP request to get a message off of a queue.
Keyword arguments:
max -- The maximum number of messages to pull. Defaults to 1.
"""
n = ""
if max is not None:
n = "&n=%s" % max
url = "queues/%s/messages?%s" % (self.name, n)
result = self.client.get(url)
return result['body']
class IronMQ:
NAME = "iron_mq_python"
VERSION = "0.3"
client = None
name = None
def __init__(self, name=None, **kwargs):
"""Prepare a configured instance of the API wrapper and return it.
Keyword arguments are passed directly to iron_core_python; consult its
documentation for a full list and possible values."""
if name is not None:
self.name = name
self.client = iron_core.IronClient(name=IronMQ.NAME,
version=IronMQ.VERSION, product="iron_mq", **kwargs)
def queues(self, page=None):
"""Execute an HTTP request to get a list of queues and return it.
Keyword arguments:
page -- The 0-based page to get queues from. Defaults to None, which
omits the parameter.
"""
options = {}
if page is not None:
options['page'] = page
query = urllib.urlencode(options)
url = "queues"
if query != "":
url = "%s?%s" % (url, query)
result = self.client.get(url)
return [queue["name"] for queue in result["body"]]
def queue(self, queue_name):
"""Returns Queue object.
Arguments:
queue_name -- The name of the queue.
"""
return Queue(self, queue_name)
# DEPRECATED
def getQueues(self, page=None, project_id=None):
return self.queues(page=page)
def getQueueDetails(self, queue_name, project_id=None):
return self.queue(queue_name).info()
def deleteMessage(self, queue_name, message_id, project_id=None):
return self.queue(queue_name).delete(message_id)
def postMessage(self, queue_name, messages=[], project_id=None):
return self.queue(queue_name).post(*messages)
def getMessage(self, queue_name, max=None, project_id=None):
return self.queue(queue_name).get(max=max)
def clearQueue(self, queue_name, project_id=None):
return self.queue(queue_name).clear()