1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
use std::future::Future;
use std::marker::PhantomData;

use crate::cursor::{CursorError, CursorResult, CursorSuccess};

/// Prevent users from implementing private trait.
mod private {
    use crate::cursor::{Cursor, KeyValueCursor};
    use crate::raw_record::RawRecordCursor;

    use super::{CursorFilter, CursorMap};

    pub trait Sealed {}

    impl<T, C, F> Sealed for CursorMap<T, C, F>
    where
        T: Send,
        C: Cursor<T>,
    {
    }

    impl<T, C, F> Sealed for CursorFilter<T, C, F>
    where
        T: Send,
        C: Cursor<T>,
    {
    }

    impl Sealed for KeyValueCursor {}

    impl Sealed for RawRecordCursor {}
}

/// An asynchronous iterator that supports [`Continuation`].
///
/// A continuation is an opaque token that represents the position of
/// the cursor. A continuation can be at the *begin marker* which is
/// the position before any items are returned. It can be somewhere in
/// the middle or it can be at the *end marker* position. End marker
/// would be the position after all the items are returned.
///
/// When a [`Cursor::next`] stops producing values and assuming there
/// was no [`FdbError`], then the reason for not producing the values
/// is reported using [`NoNextReason`]. This is returned as part of
/// [`CursorResult`].
///
/// No next reasons are fundamentally distinguished between those that
/// are due to the data itself (in-band) and those that are due to the
/// environment / context (out-of-band). For example, running out of
/// data or having returned the maximum number of values requested are
/// in-band, while reaching a limit on the number of key-value pairs
/// scanned by the transaction or the time that a transaction has been
/// open are out-of-band.
///
/// [`Continuation`]: crate::cursor::Continuation
/// [`FdbError`]: fdb::error::FdbError
/// [`NoNextReason`]: crate::cursor::NoNextReason
//
// Unlike Java RecordLayer, we do not have monadic abstractions (i.e.,
// methods such as `flat_map`, `flatten` etc.,). This is because in
// the `next` method, `CursorResult` returns a continuation. When
// cursors are composed, we have to reason about how the continuations
// will get composed, and if the composition of the continuations is
// correct. A related issue is that when we need throughput, we need
// to use pipelining, which can interact with continuations and
// parallel cursors in subtle ways. Once we have a good handle on
// these issues, we can explore how to add methods for cursor
// composition.
pub trait Cursor<T>: private::Sealed
where
    T: Send,
{
    /// Asynchronously return the next result from this cursor.
    fn next(&mut self) -> impl Future<Output = CursorResult<T>> + Send;

    /// Drain the cursor pushing all emitted values into a collection.
    fn collect(mut self) -> impl Future<Output = (Vec<T>, CursorError)> + Send
    where
        Self: Sized + Send,
    {
        async move {
            let mut v = Vec::new();

            let iter = &mut self;

            loop {
                match iter.next().await {
                    Ok(t) => v.push(t.into_value()),
                    Err(err) => return (v, err),
                }
            }
        }
    }

    /// Filters the values produced by this cursor according to the
    /// provided predicate.
    ///
    /// # Note
    ///
    /// Unlike a general iterator, the computation that you can do
    /// inside the async closure is limited. Specifically the closure
    /// `FnMut(&T) -> impl Future<Output = bool>` does not provide a
    /// mechanism to return an error that might possibly occur within
    /// the closure.
    ///
    /// If you need this feature, you will need to build the loop
    /// yourself and handle the issue of how and which continuation to
    /// return that would be useful to the caller.
    ///
    /// Also, since [`Cursor`] is a sealed class, this method is
    /// primarily meant for types defined in this crate.
    //
    // *Note:* The reason for not providing the capability for
    //         returning an error from within the closure is because
    //         it is not clear if the API user would want the
    //         continuation at the point of the error or the
    //         continuation *before* the error occurred.
    //
    //         Providing the latter, would require maintaining
    //         additional state when creating a value of
    //         `CursorFilter` type. We will also need a generic way of
    //         extracting a continuation from a `Cursor`.
    //
    //         This means we will need to introduce another API such
    //         as `fn get_continuation(&mut self) ->
    //         CursorResultContinuation` on the `Cursor` trait. Till
    //         the semantics of the `Cursor` is properly understood we
    //         want to keep the `Cursor` API as minimal as possible.
    //
    //         We can always roll the feature we need in the `next`
    //         method implementation and in the builder type for the
    //         `Cursor`. It won't be generic, but will get the job
    //         done.
    fn filter<F, Fut>(self, f: F) -> impl Future<Output = CursorFilter<T, Self, F>> + Send
    where
        Self: Sized + Send,
        F: FnMut(&T) -> Fut + Send,
        Fut: Future<Output = bool>,
    {
        async {
            CursorFilter {
                cursor: self,
                f,
                phantom: PhantomData,
            }
        }
    }

    /// Map this cursor's items to a different type, returning a new
    /// cursor of the resulting type.
    ///
    /// # Note
    ///
    /// Unlike a general iterator, the computation that you can do
    /// inside the async closure is limited. Specifically the closure
    /// `FnMut(T) -> impl Future<Output = U>` does not provide a
    /// mechanism to return an error that might possibly occur within
    /// the closure.
    ///
    /// If you need this feature, you will need to build the loop
    /// yourself and handle the issue of how and which continuation to
    /// return that would be useful to the caller.
    ///
    /// Also, since [`Cursor`] is a sealed class, this method is
    /// primarily meant for types defined in this crate.
    //
    // *Note:* See the comment mentioned in `filter`.
    fn map<U, F, Fut>(self, f: F) -> impl Future<Output = CursorMap<T, Self, F>> + Send
    where
        U: Send,
        Self: Sized + Send,
        F: FnMut(T) -> Fut + Send,
        Fut: Future<Output = U>,
    {
        async {
            CursorMap {
                cursor: self,
                f,
                phantom: PhantomData,
            }
        }
    }
}

/// Cursor returned by [`Cursor::filter`] method.
#[derive(Debug)]
pub struct CursorFilter<T, C, F>
where
    T: Send,
    C: Cursor<T>,
{
    cursor: C,
    f: F,
    phantom: PhantomData<T>,
}

impl<T, C, F, Fut> Cursor<T> for CursorFilter<T, C, F>
where
    T: Send,
    C: Cursor<T> + Send,
    F: FnMut(&T) -> Fut + Send,
    Fut: Future<Output = bool> + Send,
{
    async fn next(&mut self) -> CursorResult<T> {
        loop {
            let item = self.cursor.next().await;

            match item {
                Ok(cursor_success) => {
                    let (value, continuation) = cursor_success.into_parts();

                    if ((self.f)(&value)).await {
                        return Ok(CursorSuccess::new(value, continuation));
                    }
                }
                Err(e) => return Err(e),
            }
        }
    }
}

/// Cursor returned by [`Cursor::map`] method.
#[derive(Debug)]
pub struct CursorMap<T, C, F>
where
    T: Send,
    C: Cursor<T>,
{
    cursor: C,
    f: F,
    phantom: PhantomData<T>,
}

impl<U, T, C, F, Fut> Cursor<U> for CursorMap<T, C, F>
where
    U: Send,
    T: Send,
    C: Cursor<T> + Send,
    F: FnMut(T) -> Fut + Send,
    Fut: Future<Output = U> + Send,
{
    async fn next(&mut self) -> CursorResult<U> {
        let item = self.cursor.next().await;

        match item {
            Ok(cursor_success) => Ok({
                let (value, continuation) = cursor_success.into_parts();
                CursorSuccess::new(((self.f)(value)).await, continuation)
            }),
            Err(e) => Err(e),
        }
    }
}

#[cfg(test)]
mod tests {
    // No tests here as we are just defining traits.
}