MOTION  0.01
Framework for mixed-protocol multi-party computation
synchronized_queue.h
Go to the documentation of this file.
1 // MIT License
2 //
3 // Copyright (c) 2020 Lennart Braun
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in all
13 // copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 // SOFTWARE.
22 
23 #pragma once
24 
25 #include <boost/fiber/condition_variable.hpp>
26 #include <boost/fiber/mutex.hpp>
27 
28 // Undefine Windows macros that collide with function names in MOTION.
29 #ifdef SendMessage
30 #undef SendMessage
31 #endif
32 
33 #ifdef GetMessage
34 #undef GetMessage
35 #endif
36 
37 #include <future>
38 #include <iostream>
39 #include <mutex>
40 #include <optional>
41 #include <queue>
42 
43 namespace encrypto::motion {
44 
/// \brief FIFO queue whose operations are serialized by a mutex, with blocking
///        dequeue operations built on a condition variable.
///
/// Producers add items with enqueue(); consumers remove them with dequeue() or
/// BatchDequeue(), which block while the queue is empty and still open.
/// close() marks the queue as closed and wakes up all blocked consumers.
///
/// \tparam T                     type of the stored items
/// \tparam MutexType             mutex type usable with std::scoped_lock and std::unique_lock
/// \tparam ConditionVariableType condition variable type providing notify_one(),
///                               notify_all() and a predicate overload of wait()
///                               that accepts std::unique_lock<MutexType>
template <typename T, typename MutexType, typename ConditionVariableType>
class BasicSynchronizedQueue {
 public:
  BasicSynchronizedQueue() = default;

  /// \brief Checks whether the queue currently holds no items.
  bool empty() const noexcept {
    std::scoped_lock lock(mutex_);
    return queue_.empty();
  }

  /// \brief Checks whether close() has been called.
  bool IsClosed() const noexcept {
    std::scoped_lock lock(mutex_);
    return closed_;
  }

  /// \brief Checks whether the queue is closed and holds no items.
  bool IsClosedAndEmpty() const noexcept {
    std::scoped_lock lock(mutex_);
    return closed_ && queue_.empty();
  }

  /// \brief Marks the queue as closed and wakes up every waiting consumer.
  ///
  /// Items that are already stored stay in the queue and can still be dequeued.
  void close() noexcept {
    {
      std::scoped_lock lock(mutex_);
      closed_ = true;
    }
    // Notify after releasing the mutex so that woken waiters can acquire it.
    condition_variable_.notify_all();
  }

  /// \brief Appends a copy of `item` and wakes up one waiting consumer.
  /// \throws std::logic_error if the queue is closed
  void enqueue(const T& item) {
    {
      std::scoped_lock lock(mutex_);
      // The closed check and the push share one critical section: closed_ is
      // written under mutex_ in close(), and a close() must not be able to slip
      // in between the check and the insertion.
      if (closed_) {
        throw std::logic_error("Tried to enqueue in closed BasicSynchronizedQueue");
      }
      queue_.push(item);
    }
    condition_variable_.notify_one();
  }

  /// \brief Moves `item` into the queue and wakes up one waiting consumer.
  /// \throws std::logic_error if the queue is closed
  void enqueue(T&& item) {
    {
      std::scoped_lock lock(mutex_);
      // Same reasoning as in the copying overload: check and push atomically.
      if (closed_) {
        throw std::logic_error("Tried to enqueue in closed BasicSynchronizedQueue");
      }
      queue_.push(std::move(item));
    }
    condition_variable_.notify_one();
  }

  /// \brief Removes and returns the oldest item, blocking while the queue is
  ///        empty and not closed.
  /// \return the item, or std::nullopt if the queue is closed and empty
  ///
  /// NOTE: the function is noexcept, so an exception thrown while moving a T
  /// terminates the program.
  std::optional<T> dequeue() noexcept {
    std::unique_lock lock(mutex_);
    // The predicate is evaluated before blocking, so this returns immediately
    // if an item is available or the queue is already closed.
    condition_variable_.wait(lock, [this] { return !queue_.empty() || closed_; });
    if (queue_.empty()) {
      // The wait only ends with an empty queue if the queue has been closed.
      return std::nullopt;
    }
    auto item = std::move(queue_.front());
    queue_.pop();
    lock.unlock();
    return std::optional<T>(std::move(item));
  }

  /// \brief Removes and returns all stored items at once, blocking while the
  ///        queue is empty and not closed.
  /// \return a queue holding the items in their original order, or std::nullopt
  ///         if the queue is closed and empty
  std::optional<std::queue<T>> BatchDequeue() noexcept {
    std::queue<T> output;
    std::unique_lock lock(mutex_);
    // The predicate is evaluated before blocking, so this returns immediately
    // if an item is available or the queue is already closed.
    condition_variable_.wait(lock, [this] { return !queue_.empty() || closed_; });
    if (queue_.empty()) {
      // The wait only ends with an empty queue if the queue has been closed.
      return std::nullopt;
    }
    // Swapping with the empty local queue hands over all items and leaves the
    // member queue empty.
    std::swap(queue_, output);
    lock.unlock();
    return std::optional<std::queue<T>>(std::move(output));
  }

 private:
  bool closed_ = false;                       // set by close(); accessed under mutex_
  std::queue<T> queue_;                       // stored items; accessed under mutex_
  mutable MutexType mutex_;                   // mutable so const observers can lock it
  ConditionVariableType condition_variable_;  // signalled by enqueue() and close()
};
169 
170 template <typename T>
172 
173 template <typename T>
176 
177 } // namespace encrypto::motion
encrypto::motion::BasicSynchronizedQueue::BatchDequeue
std::optional< std::queue< T > > BatchDequeue() noexcept
Definition: synchronized_queue.h:146
encrypto::motion::BasicSynchronizedQueue::enqueue
void enqueue(T &&item)
Definition: synchronized_queue.h:111
encrypto::motion::BasicSynchronizedQueue::IsClosedAndEmpty
bool IsClosedAndEmpty() const noexcept
Definition: synchronized_queue.h:81
encrypto::motion::BasicSynchronizedQueue::dequeue
std::optional< T > dequeue() noexcept
Definition: synchronized_queue.h:125
encrypto::motion
Definition: algorithm_description.cpp:35
encrypto::motion::BasicSynchronizedQueue::IsClosed
bool IsClosed() const noexcept
Definition: synchronized_queue.h:73
encrypto::motion::BasicSynchronizedQueue
Definition: synchronized_queue.h:57
encrypto::motion::swap
void swap(ReusablePromise< R, MutexType, ConditionVariableType > &lhs, ReusablePromise< R, MutexType, ConditionVariableType > &rhs) noexcept
Definition: reusable_future.h:270
encrypto::motion::BasicSynchronizedQueue::empty
bool empty() const noexcept
Definition: synchronized_queue.h:65
encrypto::motion::BasicSynchronizedQueue::enqueue
void enqueue(const T &item)
Definition: synchronized_queue.h:100
encrypto::motion::BasicSynchronizedQueue::BasicSynchronizedQueue
BasicSynchronizedQueue()=default
encrypto::motion::BasicSynchronizedQueue::close
void close() noexcept
Definition: synchronized_queue.h:89