1 /*
2   ZynAddSubFX - a software synthesizer
3 
4   MultiPseudoStack.cpp - Multiple-Writer Lock Free Datastructure
5   Copyright (C) 2016 Mark McCurry
6 
7   This program is free software; you can redistribute it and/or
8   modify it under the terms of the GNU General Public License
9   as published by the Free Software Foundation; either version 2
10   of the License, or (at your option) any later version.
11 */
12 #include "MultiPseudoStack.h"
13 #include <cassert>
14 #include <cstdio>
15 
//Tag value of a slot that holds no queued element (all 32 bits set)
#define INVALID (static_cast<int32_t>(0xffffffff))
//Mask applied when a read/write tag is advanced; it keeps tags within
//0..0x7fffffff, so a live tag can never equal INVALID
#define MAX     (static_cast<int32_t>(0x7fffffff))
18 
19 namespace zyn {
20 
QueueListItem(void)21 QueueListItem::QueueListItem(void)
22     :memory(0), size(0)
23 {
24 }
25 
LockFreeQueue(qli_t * data_,int n)26 LockFreeQueue::LockFreeQueue(qli_t *data_, int n)
27     :data(data_), elms(n), next_r(0), next_w(0), avail(0)
28 {
29     tag  = new std::atomic<uint32_t>[n];
30     for(int i=0; i<n; ++i)
31         tag[i] = INVALID;
32 }
33 
~LockFreeQueue(void)34 LockFreeQueue::~LockFreeQueue(void)
35 {
36     delete [] tag;
37 }
38 
39 
read(void)40 qli_t *LockFreeQueue::read(void) {
41 retry:
42     int8_t free_elms = avail.load();
43     if(free_elms <= 0)
44         return 0;
45 
46     int32_t next_tag      = next_r.load();
47     int32_t next_next_tag = (next_tag+1)&MAX;
48 
49     assert(next_tag != INVALID);
50 
51     for(int i=0; i<elms; ++i) {
52         uint32_t elm_tag = tag[i].load();
53 
54         //attempt to remove tagged element
55         //if and only if it's next
56         if(((uint32_t)next_tag) == elm_tag) {
57             if(!tag[i].compare_exchange_strong(elm_tag, INVALID))
58                 goto retry;
59 
60             //Ok, now there is no element that can be removed from the list
61             //Effectively there's mutual exclusion over other readers here
62 
63             //Set the next element
64             int sane_read = next_r.compare_exchange_strong(next_tag, next_next_tag);
65             assert(sane_read && "No double read on a single tag");
66 
67             //Decrement available elements
68             int32_t free_elms_next = avail.load();
69             while(!avail.compare_exchange_strong(free_elms_next, free_elms_next-1));
70 
71             //printf("r%d ", free_elms_next-1);
72 
73             return &data[i];
74         }
75     }
76     goto retry;
77 }
78 
79 //Insert Node Q
write(qli_t * Q)80 void LockFreeQueue::write(qli_t *Q) {
81 retry:
82     if(!Q)
83         return;
84     int32_t write_tag = next_w.load();
85     int32_t next_write_tag = (write_tag+1)&MAX;
86     if(!next_w.compare_exchange_strong(write_tag, next_write_tag))
87         goto retry;
88 
89     uint32_t invalid_tag = INVALID;
90 
91     //Update tag
92     int sane_write = tag[Q-data].compare_exchange_strong(invalid_tag, write_tag);
93     assert(sane_write);
94 
95     //Increment available elements
96     int32_t free_elms = avail.load();
97     while(!avail.compare_exchange_strong(free_elms, free_elms+1))
98         assert(free_elms <= 32);
99     //printf("w%d ", free_elms+1);
100 }
101 
MultiQueue(void)102 MultiQueue::MultiQueue(void)
103     :pool(new qli_t[32]), m_free(pool, 32), m_msgs(pool, 32)
104 {
105     //32 instances of 2kBi memory chunks
106     for(int i=0; i<32; ++i) {
107         qli_t &ptr  = pool[i];
108         ptr.size   = 2048;
109         ptr.memory = new char[2048];
110         free(&ptr);
111     }
112 }
113 
~MultiQueue(void)114 MultiQueue::~MultiQueue(void)
115 {
116     for(int i=0; i<32; ++i)
117         delete [] pool[i].memory;
118     delete [] pool;
119 }
120 
121 }
122