1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
use std::future::Future;
use std::marker::PhantomData;
use crate::cursor::{CursorError, CursorResult, CursorSuccess};
/// Prevent users from implementing private trait.
mod private {
use crate::cursor::{Cursor, KeyValueCursor};
use crate::raw_record::RawRecordCursor;
use super::{CursorFilter, CursorMap};
pub trait Sealed {}
impl<T, C, F> Sealed for CursorMap<T, C, F>
where
T: Send,
C: Cursor<T>,
{
}
impl<T, C, F> Sealed for CursorFilter<T, C, F>
where
T: Send,
C: Cursor<T>,
{
}
impl Sealed for KeyValueCursor {}
impl Sealed for RawRecordCursor {}
}
/// An asynchronous iterator that supports [`Continuation`].
///
/// A continuation is an opaque token that represents the position of
/// the cursor. A continuation can be at the *begin marker* which is
/// the position before any items are returned. It can be somewhere in
/// the middle or it can be at the *end marker* position. End marker
/// would be the position after all the items are returned.
///
/// When a [`Cursor::next`] stops producing values and assuming there
/// was no [`FdbError`], then the reason for not producing the values
/// is reported using [`NoNextReason`]. This is returned as part of
/// [`CursorResult`].
///
/// No next reasons are fundamentally distinguished between those that
/// are due to the data itself (in-band) and those that are due to the
/// environment / context (out-of-band). For example, running out of
/// data or having returned the maximum number of values requested are
/// in-band, while reaching a limit on the number of key-value pairs
/// scanned by the transaction or the time that a transaction has been
/// open are out-of-band.
///
/// [`Continuation`]: crate::cursor::Continuation
/// [`FdbError`]: fdb::error::FdbError
/// [`NoNextReason`]: crate::cursor::NoNextReason
//
// Unlike Java RecordLayer, we do not have monadic abstractions (i.e.,
// methods such as `flat_map`, `flatten` etc.,). This is because in
// the `next` method, `CursorResult` returns a continuation. When
// cursors are composed, we have to reason about how the continuations
// will get composed, and if the composition of the continuations is
// correct. A related issue is that when we need throughput, we need
// to use pipelining, which can interact with continuations and
// parallel cursors in subtle ways. Once we have a good handle on
// these issues, we can explore how to add methods for cursor
// composition.
pub trait Cursor<T>: private::Sealed
where
T: Send,
{
/// Asynchronously return the next result from this cursor.
fn next(&mut self) -> impl Future<Output = CursorResult<T>> + Send;
/// Drain the cursor pushing all emitted values into a collection.
fn collect(mut self) -> impl Future<Output = (Vec<T>, CursorError)> + Send
where
Self: Sized + Send,
{
async move {
let mut v = Vec::new();
let iter = &mut self;
loop {
match iter.next().await {
Ok(t) => v.push(t.into_value()),
Err(err) => return (v, err),
}
}
}
}
/// Filters the values produced by this cursor according to the
/// provided predicate.
///
/// # Note
///
/// Unlike a general iterator, the computation that you can do
/// inside the async closure is limited. Specifically the closure
/// `FnMut(&T) -> impl Future<Output = bool>` does not provide a
/// mechanism to return an error that might possibly occur within
/// the closure.
///
/// If you need this feature, you will need to build the loop
/// yourself and handle the issue of how and which continuation to
/// return that would be useful to the caller.
///
/// Also, since [`Cursor`] is a sealed class, this method is
/// primarily meant for types defined in this crate.
//
// *Note:* The reason for not providing the capability for
// returning an error from within the closure is because
// it is not clear if the API user would want the
// continuation at the point of the error or the
// continuation *before* the error occurred.
//
// Providing the latter, would require maintaining
// additional state when creating a value of
// `CursorFilter` type. We will also need a generic way of
// extracting a continuation from a `Cursor`.
//
// This means we will need to introduce another API such
// as `fn get_continuation(&mut self) ->
// CursorResultContinuation` on the `Cursor` trait. Till
// the semantics of the `Cursor` is properly understood we
// want to keep the `Cursor` API as minimal as possible.
//
// We can always roll the feature we need in the `next`
// method implementation and in the builder type for the
// `Cursor`. It won't be generic, but will get the job
// done.
fn filter<F, Fut>(self, f: F) -> impl Future<Output = CursorFilter<T, Self, F>> + Send
where
Self: Sized + Send,
F: FnMut(&T) -> Fut + Send,
Fut: Future<Output = bool>,
{
async {
CursorFilter {
cursor: self,
f,
phantom: PhantomData,
}
}
}
/// Map this cursor's items to a different type, returning a new
/// cursor of the resulting type.
///
/// # Note
///
/// Unlike a general iterator, the computation that you can do
/// inside the async closure is limited. Specifically the closure
/// `FnMut(T) -> impl Future<Output = U>` does not provide a
/// mechanism to return an error that might possibly occur within
/// the closure.
///
/// If you need this feature, you will need to build the loop
/// yourself and handle the issue of how and which continuation to
/// return that would be useful to the caller.
///
/// Also, since [`Cursor`] is a sealed class, this method is
/// primarily meant for types defined in this crate.
//
// *Note:* See the comment mentioned in `filter`.
fn map<U, F, Fut>(self, f: F) -> impl Future<Output = CursorMap<T, Self, F>> + Send
where
U: Send,
Self: Sized + Send,
F: FnMut(T) -> Fut + Send,
Fut: Future<Output = U>,
{
async {
CursorMap {
cursor: self,
f,
phantom: PhantomData,
}
}
}
}
/// Cursor returned by [`Cursor::filter`] method.
#[derive(Debug)]
pub struct CursorFilter<T, C, F>
where
T: Send,
C: Cursor<T>,
{
cursor: C,
f: F,
phantom: PhantomData<T>,
}
impl<T, C, F, Fut> Cursor<T> for CursorFilter<T, C, F>
where
T: Send,
C: Cursor<T> + Send,
F: FnMut(&T) -> Fut + Send,
Fut: Future<Output = bool> + Send,
{
async fn next(&mut self) -> CursorResult<T> {
loop {
let item = self.cursor.next().await;
match item {
Ok(cursor_success) => {
let (value, continuation) = cursor_success.into_parts();
if ((self.f)(&value)).await {
return Ok(CursorSuccess::new(value, continuation));
}
}
Err(e) => return Err(e),
}
}
}
}
/// Cursor returned by [`Cursor::map`] method.
#[derive(Debug)]
pub struct CursorMap<T, C, F>
where
T: Send,
C: Cursor<T>,
{
cursor: C,
f: F,
phantom: PhantomData<T>,
}
impl<U, T, C, F, Fut> Cursor<U> for CursorMap<T, C, F>
where
U: Send,
T: Send,
C: Cursor<T> + Send,
F: FnMut(T) -> Fut + Send,
Fut: Future<Output = U> + Send,
{
async fn next(&mut self) -> CursorResult<U> {
let item = self.cursor.next().await;
match item {
Ok(cursor_success) => Ok({
let (value, continuation) = cursor_success.into_parts();
CursorSuccess::new(((self.f)(value)).await, continuation)
}),
Err(e) => Err(e),
}
}
}
#[cfg(test)]
mod tests {
// No tests here as we are just defining traits.
}