-
Notifications
You must be signed in to change notification settings - Fork 0
/
mpmc_lf_queue.c
153 lines (122 loc) · 2.93 KB
/
mpmc_lf_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
* Implementation of Multiple Producer Multiple Consumer Lock Free queue
* Reference: http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf
*
* Author: Rytis Karpuška
* 2015
*
*/
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include "mpmc_lf_queue.h"
/*
 * CAS(ptr, expected, desired) - strong, sequentially consistent
 * compare-and-swap built on the GCC/Clang __atomic_compare_exchange builtin.
 *
 * All three arguments are pointers to objects of the same type.
 * If *ptr equals *expected, *desired is stored into *ptr and the macro
 * evaluates to true. Otherwise the value currently in *ptr is copied into
 * *expected and the macro evaluates to false.
 */
#define CAS(ptr, expected, desired)               \
    __atomic_compare_exchange((ptr),              \
                              (expected),         \
                              (desired),          \
                              0,                  \
                              __ATOMIC_SEQ_CST,   \
                              __ATOMIC_SEQ_CST)
/*
 * MPMCQ_create - allocate and initialise an empty queue.
 *
 * A fresh queue owns exactly one dummy node (no payload, no successor).
 * Both head and tail refer to that node, their counters start at 0 and
 * the element count is 0.
 *
 * Returns the new queue, or NULL when either allocation fails (nothing is
 * leaked in that case).
 */
struct mpmcq *MPMCQ_create(void)
{
    struct mpmcq_elem *dummy = malloc(sizeof *dummy);
    // Only attempt the second allocation when the first one succeeded
    struct mpmcq *queue = (dummy != NULL) ? malloc(sizeof *queue) : NULL;

    if (queue == NULL) {
        free(dummy);    // no-op when dummy itself is NULL
        return NULL;
    }

    // Dummy node: empty payload, whole next word cleared
    dummy->next.blk = 0;
    dummy->data = NULL;

    // Head and tail both start on the dummy node
    queue->head.ptr.ptr = dummy;
    queue->head.ptr.cnt = 0;
    queue->tail.ptr.ptr = dummy;
    queue->tail.ptr.cnt = 0;

    queue->elem_cnt = 0;
    return queue;
}
/*
 * MPMCQ_destroy - release a queue and every node it still holds.
 *
 * q may be NULL, in which case the call does nothing (same convention as
 * free(NULL)).
 *
 * Elements still queued are dequeued and their payload pointers are
 * discarded; the payloads themselves are NOT freed here, so the caller
 * has to drain the queue first if it owns them.
 *
 * NOTE(review): q->head is read without any synchronisation after the
 * drain loop, so this presumably must only be called once no other thread
 * is using q - confirm against callers.
 */
void MPMCQ_destroy(struct mpmcq *q)
{
    if (q == NULL)
        return;

    // Drain the queue; every successful dequeue frees one node
    while (MPMCQ_dequeue(q) != NULL) {
        // payload pointer intentionally dropped
    }

    // The node left at head is the dummy element
    free(q->head.ptr.ptr);

    // Finally the queue struct itself
    free(q);
}
int MPMCQ_enqueue(struct mpmcq *q, void *elem)
{
union ptr_with_tag tmp;
//Check if elem is valid
if(elem == NULL)
return -EINVAL;
//Allocate node
struct mpmcq_elem *node = malloc(sizeof(*node));
if(node == NULL)
return -ENOMEM;
//Initiate elements
node->data = elem;
node->next.blk = 0;
//Try to insert nodes until it has happened
union ptr_with_tag tail, next;
while(1){
tail = q->tail;
next = tail.ptr.ptr->next;
//If other thread has inserted, try again
if(tail.blk != q->tail.blk)
continue;
//try to insert new node
if(next.ptr.ptr == NULL){
tmp.ptr.ptr = node;
tmp.ptr.cnt = next.ptr.cnt + 1;
if(CAS(&tail.ptr.ptr->next.blk, &next.blk, &tmp.blk))
break;
} else {
tmp.ptr.ptr = next.ptr.ptr;
tmp.ptr.cnt = tail.ptr.cnt + 1;
CAS(&q->tail.blk, &tail.blk, &tmp.blk);
}
}
//Update tail pointer
tmp.ptr.ptr = node;
tmp.ptr.cnt = tail.ptr.cnt + 1;
CAS(&q->tail.blk, &tail.blk, &tmp.blk);
__atomic_add_fetch(&q->elem_cnt, 1, __ATOMIC_SEQ_CST);
return 0;
}
void *MPMCQ_dequeue(struct mpmcq *q)
{
void *data = NULL;
union ptr_with_tag head, tail, next, tmp;
while(1){
head = q->head;
tail = q->tail;
next = head.ptr.ptr->next;
//If someone else has removed - try again
if(head.blk != q->head.blk)
continue;
//Check for empty and new additions
//if ok remove element and return its data
if(head.ptr.ptr == tail.ptr.ptr){
if(next.ptr.ptr == NULL)
return NULL;
tmp.ptr.ptr = next.ptr.ptr;
tmp.ptr.cnt = tail.ptr.cnt + 1;
CAS(&q->tail.blk, &tail.blk, &tmp.blk);
} else {
data = next.ptr.ptr->data;
tmp.ptr.ptr = next.ptr.ptr;
tmp.ptr.cnt = head.ptr.cnt + 1;
if(CAS(&q->head.blk, &head.blk, &tmp.blk))
break;
}
}
__atomic_sub_fetch(&q->elem_cnt, 1, __ATOMIC_SEQ_CST);
free(head.ptr.ptr);
return data;
}