libpqxx 7.8.1
pipeline.hxx
1/* Definition of the pqxx::pipeline class.
2 *
3 * Throughput-optimized mechanism for executing queries.
4 *
5 * DO NOT INCLUDE THIS FILE DIRECTLY; include pqxx/pipeline instead.
6 *
7 * Copyright (c) 2000-2023, Jeroen T. Vermeulen.
8 *
9 * See COPYING for copyright license. If you did not receive a file called
10 * COPYING with this source code, please notify the distributor of this
11 * mistake, or contact the author.
12 */
13#ifndef PQXX_H_PIPELINE
14#define PQXX_H_PIPELINE
15
16#if !defined(PQXX_HEADER_PRE)
17# error "Include libpqxx headers as <pqxx/header>, not <pqxx/header.hxx>."
18#endif
19
20#include <limits>
21#include <map>
22#include <string>
23
24#include "pqxx/transaction_base.hxx"
25
26
27namespace pqxx
28{
29// TODO: libpq 14 introduced a similar "pipeline mode." Can we use that?
30
32
50class PQXX_LIBEXPORT pipeline : public transaction_focus
51{
52public:
54 using query_id = long;
55
56 pipeline(pipeline const &) = delete;
57 pipeline &operator=(pipeline const &) = delete;
58 pipeline(pipeline &&) = delete;
60
61
63 explicit pipeline(transaction_base &t) : transaction_focus{t, s_classname}
64 {
65 init();
66 }
68 pipeline(transaction_base &t, std::string_view tname) :
69 transaction_focus{t, s_classname, tname}
70 {
71 init();
72 }
73
75 ~pipeline() noexcept;
76
78
84 query_id insert(std::string_view) &;
85
87
93 void complete();
94
96
105 void flush();
106
108
116 void cancel();
117
119 [[nodiscard]] bool is_finished(query_id) const;
120
122
128 result retrieve(query_id qid)
129 {
130 return retrieve(m_queries.find(qid)).second;
131 }
132
134
135 std::pair<query_id, result> retrieve();
136
137 [[nodiscard]] bool empty() const noexcept { return std::empty(m_queries); }
138
141
152 int retain(int retain_max = 2) &;
153
154
156 void resume() &;
157
158private:
159 struct PQXX_PRIVATE Query
160 {
161 explicit Query(std::string_view q) :
162 query{std::make_shared<std::string>(q)}
163 {}
164
165 std::shared_ptr<std::string> query;
166 result res;
167 };
168
169 using QueryMap = std::map<query_id, Query>;
170
171 void init();
172 void attach();
173 void detach();
174
176 static constexpr query_id qid_limit() noexcept
177 {
178 // Parenthesise this to work around an eternal Visual C++ problem:
179 // Without the extra parentheses, unless NOMINMAX is defined, the
180 // preprocessor will mistake this "max" for its annoying built-in macro
181 // of the same name.
182 return (std::numeric_limits<query_id>::max)();
183 }
184
186 PQXX_PRIVATE query_id generate_id();
187
188 bool have_pending() const noexcept
189 {
190 return m_issuedrange.second != m_issuedrange.first;
191 }
192
193 PQXX_PRIVATE void issue();
194
196 void set_error_at(query_id qid) noexcept
197 {
198 PQXX_UNLIKELY
199 if (qid < m_error)
200 m_error = qid;
201 }
202
204 [[noreturn]] PQXX_PRIVATE void internal_error(std::string const &err);
205
206 PQXX_PRIVATE bool obtain_result(bool expect_none = false);
207
208 PQXX_PRIVATE void obtain_dummy();
209 PQXX_PRIVATE void get_further_available_results();
210 PQXX_PRIVATE void check_end_results();
211
213 PQXX_PRIVATE void receive_if_available();
214
216 PQXX_PRIVATE void receive(pipeline::QueryMap::const_iterator stop);
217 std::pair<pipeline::query_id, result> retrieve(pipeline::QueryMap::iterator);
218
219 QueryMap m_queries;
220 std::pair<QueryMap::iterator, QueryMap::iterator> m_issuedrange;
221 int m_retain = 0;
222 int m_num_waiting = 0;
223 query_id m_q_id = 0;
224
226 bool m_dummy_pending = false;
227
229 query_id m_error = qid_limit();
230
232
235 internal::encoding_group m_encoding;
236
237 static constexpr std::string_view s_classname{"pipeline"};
238};
239} // namespace pqxx
240#endif
The home of all libpqxx classes, functions, templates, etc.
Definition array.hxx:33
Processes several queries in FIFO manner, optimized for high throughput.
Definition pipeline.hxx:51
pipeline(transaction_base &t)
Start a pipeline.
Definition pipeline.hxx:63
pipeline & operator=(pipeline &&)=delete
bool empty() const noexcept
Definition pipeline.hxx:137
pipeline & operator=(pipeline const &)=delete
pipeline(transaction_base &t, std::string_view tname)
Start a pipeline. Assign it a name, for more helpful error messages.
Definition pipeline.hxx:68
pipeline(pipeline &&)=delete
pipeline(pipeline const &)=delete
long query_id
Identifying numbers for queries.
Definition pipeline.hxx:54
Result set containing data returned by a query or command.
Definition result.hxx:73
Interface definition (and common code) for "transaction" classes.
Definition transaction_base.hxx:88
Base class for things that monopolise a transaction's attention.
Definition transaction_focus.hxx:29