1 /*
2 ZynAddSubFX - a software synthesizer
3
4 MultiPseudoStack.cpp - Multiple-Writer Lock Free Datastructure
5 Copyright (C) 2016 Mark McCurry
6
7 This program is free software; you can redistribute it and/or
8 modify it under the terms of the GNU General Public License
9 as published by the Free Software Foundation; either version 2
10 of the License, or (at your option) any later version.
11 */
12 #include "MultiPseudoStack.h"
13 #include <cassert>
14 #include <cstdio>
15
//Tag value marking a slot that currently holds no queued element
//(all bits set once stored in a uint32_t tag)
#define INVALID (static_cast<int32_t>(0xffffffff))
//Largest tag value; the read/write tag counters wrap using `& MAX`
#define MAX (static_cast<int32_t>(0x7fffffff))
18
19 namespace zyn {
20
QueueListItem(void)21 QueueListItem::QueueListItem(void)
22 :memory(0), size(0)
23 {
24 }
25
LockFreeQueue(qli_t * data_,int n)26 LockFreeQueue::LockFreeQueue(qli_t *data_, int n)
27 :data(data_), elms(n), next_r(0), next_w(0), avail(0)
28 {
29 tag = new std::atomic<uint32_t>[n];
30 for(int i=0; i<n; ++i)
31 tag[i] = INVALID;
32 }
33
~LockFreeQueue(void)34 LockFreeQueue::~LockFreeQueue(void)
35 {
36 delete [] tag;
37 }
38
39
read(void)40 qli_t *LockFreeQueue::read(void) {
41 retry:
42 int8_t free_elms = avail.load();
43 if(free_elms <= 0)
44 return 0;
45
46 int32_t next_tag = next_r.load();
47 int32_t next_next_tag = (next_tag+1)&MAX;
48
49 assert(next_tag != INVALID);
50
51 for(int i=0; i<elms; ++i) {
52 uint32_t elm_tag = tag[i].load();
53
54 //attempt to remove tagged element
55 //if and only if it's next
56 if(((uint32_t)next_tag) == elm_tag) {
57 if(!tag[i].compare_exchange_strong(elm_tag, INVALID))
58 goto retry;
59
60 //Ok, now there is no element that can be removed from the list
61 //Effectively there's mutual exclusion over other readers here
62
63 //Set the next element
64 int sane_read = next_r.compare_exchange_strong(next_tag, next_next_tag);
65 assert(sane_read && "No double read on a single tag");
66
67 //Decrement available elements
68 int32_t free_elms_next = avail.load();
69 while(!avail.compare_exchange_strong(free_elms_next, free_elms_next-1));
70
71 //printf("r%d ", free_elms_next-1);
72
73 return &data[i];
74 }
75 }
76 goto retry;
77 }
78
79 //Insert Node Q
write(qli_t * Q)80 void LockFreeQueue::write(qli_t *Q) {
81 retry:
82 if(!Q)
83 return;
84 int32_t write_tag = next_w.load();
85 int32_t next_write_tag = (write_tag+1)&MAX;
86 if(!next_w.compare_exchange_strong(write_tag, next_write_tag))
87 goto retry;
88
89 uint32_t invalid_tag = INVALID;
90
91 //Update tag
92 int sane_write = tag[Q-data].compare_exchange_strong(invalid_tag, write_tag);
93 assert(sane_write);
94
95 //Increment available elements
96 int32_t free_elms = avail.load();
97 while(!avail.compare_exchange_strong(free_elms, free_elms+1))
98 assert(free_elms <= 32);
99 //printf("w%d ", free_elms+1);
100 }
101
MultiQueue(void)102 MultiQueue::MultiQueue(void)
103 :pool(new qli_t[32]), m_free(pool, 32), m_msgs(pool, 32)
104 {
105 //32 instances of 2kBi memory chunks
106 for(int i=0; i<32; ++i) {
107 qli_t &ptr = pool[i];
108 ptr.size = 2048;
109 ptr.memory = new char[2048];
110 free(&ptr);
111 }
112 }
113
~MultiQueue(void)114 MultiQueue::~MultiQueue(void)
115 {
116 for(int i=0; i<32; ++i)
117 delete [] pool[i].memory;
118 delete [] pool;
119 }
120
121 }
122