use crate::env::basic::MapBindings;
use crate::error::EvaluationError;
use crate::eval::expr::EvalExpr;
use crate::eval::{EvalContext, EvalPlan};
use itertools::Itertools;
use partiql_value::Value::{Boolean, Missing, Null};
use partiql_value::{
bag, list, tuple, Bag, List, NullSortedValue, Tuple, Value, ValueIntoIterator,
};
use rustc_hash::FxHashMap;
use std::borrow::{Borrow, Cow};
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::rc::Rc;
#[macro_export]
macro_rules! take_input {
($expr:expr, $ctx:expr) => {
match $expr {
None => {
$ctx.add_error(EvaluationError::IllegalState(
"Error in retrieving input value".to_string(),
));
return Missing;
}
Some(val) => val,
}
};
}
pub enum EvalType {
SelfManaged,
GraphManaged,
}
pub trait Evaluable: Debug {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value;
fn update_input(&mut self, input: Value, branch_num: u8, ctx: &dyn EvalContext);
fn get_vars(&self) -> Option<&[String]> {
None
}
fn eval_type(&self) -> EvalType {
EvalType::GraphManaged
}
}
pub(crate) struct EvalScan {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) as_key: String,
pub(crate) at_key: Option<String>,
pub(crate) input: Option<Value>,
attrs: Vec<String>,
}
impl Debug for EvalScan {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SCAN ")?;
self.expr.fmt(f)?;
write!(f, " AS {}", self.as_key)?;
if let Some(at_key) = &self.at_key {
write!(f, " AT {}", at_key)?;
}
Ok(())
}
}
impl EvalScan {
pub(crate) fn new(expr: Box<dyn EvalExpr>, as_key: &str) -> Self {
let attrs = vec![as_key.to_string()];
EvalScan {
expr,
as_key: as_key.to_string(),
at_key: None,
input: None,
attrs,
}
}
pub(crate) fn new_with_at_key(expr: Box<dyn EvalExpr>, as_key: &str, at_key: &str) -> Self {
let attrs = vec![as_key.to_string(), at_key.to_string()];
EvalScan {
expr,
as_key: as_key.to_string(),
at_key: Some(at_key.to_string()),
input: None,
attrs,
}
}
}
impl Evaluable for EvalScan {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = self.input.take().unwrap_or(Missing);
let bindings = match input_value {
Value::Bag(t) => *t,
Value::Tuple(t) => bag![*t],
_ => bag![tuple![]],
};
let mut value = bag![];
bindings.iter().for_each(|binding| {
let binding_tuple = binding.as_tuple_ref();
let v = self.expr.evaluate(&binding_tuple, ctx).into_owned();
let ordered = &v.is_ordered();
let mut at_index_counter: i64 = 0;
if let Some(at_key) = &self.at_key {
for t in v.into_iter() {
let mut out = Tuple::from([(self.as_key.as_str(), t)]);
let at_id = if *ordered {
at_index_counter.into()
} else {
Missing
};
out.insert(at_key, at_id);
value.push(Value::Tuple(Box::new(out)));
at_index_counter += 1;
}
} else {
for t in v.into_iter() {
let out = Tuple::from([(self.as_key.as_str(), t)]);
value.push(Value::Tuple(Box::new(out)));
}
}
});
Value::Bag(Box::new(value))
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
fn get_vars(&self) -> Option<&[String]> {
Some(&self.attrs)
}
}
pub(crate) struct EvalJoin {
pub(crate) kind: EvalJoinKind,
pub(crate) on: Option<Box<dyn EvalExpr>>,
pub(crate) input: Option<Value>,
pub(crate) left: Box<dyn Evaluable>,
pub(crate) right: Box<dyn Evaluable>,
}
#[derive(Debug)]
pub(crate) enum EvalJoinKind {
Inner,
Left,
Right,
Full,
}
impl Debug for EvalJoin {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:#?} JOIN", &self.kind)?;
if let Some(on) = &self.on {
write!(f, "ON ")?;
on.fmt(f)?;
}
Ok(())
}
}
impl EvalJoin {
pub(crate) fn new(
kind: EvalJoinKind,
left: Box<dyn Evaluable>,
right: Box<dyn Evaluable>,
on: Option<Box<dyn EvalExpr>>,
) -> Self {
EvalJoin {
kind,
on,
input: None,
left,
right,
}
}
}
impl Evaluable for EvalJoin {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
#[inline]
fn tuple_with_null_vals<I, S>(attrs: I) -> Tuple
where
S: Into<String>,
I: IntoIterator<Item = S>,
{
attrs.into_iter().map(|k| (k.into(), Null)).collect()
}
let mut output_bag = bag![];
let input_env = self.input.take().unwrap_or_else(|| Value::from(tuple![]));
self.left.update_input(input_env.clone(), 0, ctx);
let lhs_values = self.left.evaluate(ctx);
let left_bindings = match lhs_values {
Value::Bag(t) => *t,
_ => {
ctx.add_error(EvaluationError::IllegalState(
"Left side of FROM source should result in a bag of bindings".to_string(),
));
return Missing;
}
};
match self.kind {
EvalJoinKind::Inner => {
left_bindings.iter().for_each(|b_l| {
let env_b_l = input_env
.as_tuple_ref()
.as_ref()
.tuple_concat(b_l.as_tuple_ref().borrow());
self.right.update_input(Value::from(env_b_l), 0, ctx);
let rhs_values = self.right.evaluate(ctx);
let right_bindings = match rhs_values {
Value::Bag(t) => *t,
_ => bag![tuple![]],
};
for b_r in right_bindings.iter() {
match &self.on {
None => {
let b_l_b_r = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(b_r.as_tuple_ref().borrow());
output_bag.push(Value::from(b_l_b_r));
}
Some(condition) => {
let b_l_b_r = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(b_r.as_tuple_ref().borrow());
let env_b_l_b_r =
&input_env.as_tuple_ref().as_ref().tuple_concat(&b_l_b_r);
let cond = condition.evaluate(env_b_l_b_r, ctx);
if cond.as_ref() == &Value::Boolean(true) {
output_bag.push(Value::Tuple(Box::new(b_l_b_r)));
}
}
}
}
});
}
EvalJoinKind::Left => {
left_bindings.iter().for_each(|b_l| {
let mut output_bag_left = bag![];
let env_b_l = input_env
.as_tuple_ref()
.as_ref()
.tuple_concat(b_l.as_tuple_ref().borrow());
self.right.update_input(Value::from(env_b_l), 0, ctx);
let rhs_values = self.right.evaluate(ctx);
let right_bindings = match rhs_values {
Value::Bag(t) => *t,
_ => bag![tuple![]],
};
for b_r in right_bindings.iter() {
match &self.on {
None => {
let b_l_b_r = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(b_r.as_tuple_ref().borrow());
output_bag_left.push(Value::from(b_l_b_r));
}
Some(condition) => {
let b_l_b_r = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(b_r.as_tuple_ref().borrow());
let env_b_l_b_r =
&input_env.as_tuple_ref().as_ref().tuple_concat(&b_l_b_r);
let cond = condition.evaluate(env_b_l_b_r, ctx);
if cond.as_ref() == &Value::Boolean(true) {
output_bag_left.push(Value::Tuple(Box::new(b_l_b_r)));
}
}
}
}
if output_bag_left.is_empty() {
let attrs = self.right.get_vars().unwrap_or(&[]);
let new_binding = b_l
.as_tuple_ref()
.as_ref()
.tuple_concat(&tuple_with_null_vals(attrs));
output_bag.push(Value::from(new_binding));
} else {
for elem in output_bag_left.into_iter() {
output_bag.push(elem)
}
}
});
}
EvalJoinKind::Full | EvalJoinKind::Right => {
ctx.add_error(EvaluationError::NotYetImplemented(
"FULL and RIGHT JOIN".to_string(),
));
return Missing;
}
};
Value::Bag(Box::new(output_bag))
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
fn eval_type(&self) -> EvalType {
EvalType::SelfManaged
}
}
#[derive(Debug)]
pub(crate) struct AggregateExpression {
pub(crate) name: String,
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) func: Box<dyn AggregateFunction>,
}
impl AggregateFunction for AggregateExpression {
#[inline]
fn next_distinct(
&self,
input_value: &Value,
state: &mut Option<Value>,
seen: &mut FxHashMap<Value, ()>,
) {
if input_value.is_present() {
self.func.next_distinct(input_value, state, seen);
}
}
#[inline]
fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
if input_value.is_present() {
self.func.next_value(input_value, state);
}
}
#[inline]
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
self.func.finalize(state)
}
}
pub trait AggregateFunction: Debug {
#[inline]
fn next_distinct(
&self,
input_value: &Value,
state: &mut Option<Value>,
seen: &mut FxHashMap<Value, ()>,
) {
match seen.entry(input_value.clone()) {
Entry::Occupied(_) => {}
Entry::Vacant(v) => {
v.insert(());
self.next_value(input_value, state);
}
}
}
fn next_value(&self, input_value: &Value, state: &mut Option<Value>);
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError>;
}
#[derive(Debug)]
pub(crate) struct Avg {}
impl AggregateFunction for Avg {
fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
match state {
None => *state = Some(Value::from(list![Value::from(1), input_value.clone()])),
Some(Value::List(list)) => {
if let Some(count) = list.get_mut(0) {
*count += &Value::from(1);
}
if let Some(sum) = list.get_mut(1) {
*sum += input_value;
}
}
_ => unreachable!(),
};
}
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
match state {
None => Ok(Null),
Some(Value::List(list)) => {
let vals = list.to_vec();
if let [count, sum] = &vals[..] {
if let Value::Integer(n) = sum {
let sum = Value::from(rust_decimal::Decimal::from(*n));
Ok(&sum / count)
} else {
Ok(sum / count)
}
} else {
Err(EvaluationError::IllegalState(
"Bad finalize state for Avg".to_string(),
))
}
}
_ => unreachable!(),
}
}
}
#[derive(Debug)]
pub(crate) struct Count {}
impl AggregateFunction for Count {
fn next_value(&self, _: &Value, state: &mut Option<Value>) {
match state {
None => *state = Some(Value::from(1)),
Some(Value::Integer(i)) => {
*i += 1;
}
_ => unreachable!(),
};
}
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
Ok(state.unwrap_or_else(|| Value::from(0)))
}
}
#[derive(Debug)]
pub(crate) struct Max {}
impl AggregateFunction for Max {
fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
match state {
None => *state = Some(input_value.clone()),
Some(max) => {
if &*max < input_value {
*max = input_value.clone();
}
}
};
}
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
Ok(state.unwrap_or_else(|| Null))
}
}
#[derive(Debug)]
pub(crate) struct Min {}
impl AggregateFunction for Min {
fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
match state {
None => *state = Some(input_value.clone()),
Some(min) => {
if &*min > input_value {
*min = input_value.clone();
}
}
};
}
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
Ok(state.unwrap_or_else(|| Null))
}
}
#[derive(Debug)]
pub(crate) struct Sum {}
impl AggregateFunction for Sum {
fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
match state {
None => *state = Some(input_value.clone()),
Some(ref mut sum) => *sum += input_value,
};
}
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
Ok(state.unwrap_or_else(|| Null))
}
}
#[derive(Debug)]
pub(crate) struct Any {}
impl AggregateFunction for Any {
fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
match state {
None => {
*state = Some(match input_value {
Boolean(b) => Value::Boolean(*b),
_ => Missing,
})
}
Some(ref mut acc) => {
*acc = match (&acc, input_value) {
(Boolean(acc), Boolean(new)) => Boolean(*acc || *new),
_ => Missing,
}
}
};
}
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
Ok(state.unwrap_or_else(|| Null))
}
}
#[derive(Debug)]
pub(crate) struct Every {}
impl AggregateFunction for Every {
fn next_value(&self, input_value: &Value, state: &mut Option<Value>) {
match state {
None => {
*state = Some(match input_value {
Boolean(b) => Value::Boolean(*b),
_ => Missing,
})
}
Some(ref mut acc) => {
*acc = match (&acc, input_value) {
(Boolean(acc), Boolean(new)) => Boolean(*acc && *new),
_ => Missing,
}
}
};
}
fn finalize(&self, state: Option<Value>) -> Result<Value, EvaluationError> {
Ok(state.unwrap_or_else(|| Null))
}
}
#[derive(Debug)]
pub(crate) struct EvalGroupBy {
pub(crate) strategy: EvalGroupingStrategy,
pub(crate) group: Vec<Box<dyn EvalExpr>>,
pub(crate) aliases: Vec<String>,
pub(crate) aggs: Vec<AggregateExpression>,
pub(crate) distinct_aggs: Vec<AggregateExpression>,
pub(crate) group_as_alias: Option<String>,
pub(crate) input: Option<Value>,
}
type GroupKey = Vec<Value>;
type AggState = Vec<Option<Value>>;
type DAggState = Vec<(Option<Value>, FxHashMap<Value, ()>)>;
#[derive(Clone)]
struct CombinedState(AggState, DAggState, Option<Vec<Value>>);
#[derive(Debug)]
pub(crate) enum EvalGroupingStrategy {
GroupFull,
GroupPartial,
}
impl EvalGroupBy {
#[inline]
pub(crate) fn new(
strategy: EvalGroupingStrategy,
group: Vec<Box<dyn EvalExpr>>,
aliases: Vec<String>,
aggs: Vec<AggregateExpression>,
distinct_aggs: Vec<AggregateExpression>,
group_as_alias: Option<String>,
) -> Self {
Self {
strategy,
group,
aliases,
aggs,
distinct_aggs,
group_as_alias,
input: None,
}
}
#[inline]
fn group_key(&self, bindings: &Tuple, ctx: &dyn EvalContext) -> GroupKey {
self.group
.iter()
.map(|expr| match expr.evaluate(bindings, ctx).as_ref() {
Missing => Value::Null,
val => val.clone(),
})
.collect()
}
}
impl Evaluable for EvalGroupBy {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let group_as_alias = &self.group_as_alias;
let input_value = take_input!(self.input.take(), ctx);
match self.strategy {
EvalGroupingStrategy::GroupPartial => {
ctx.add_error(EvaluationError::NotYetImplemented(
"GROUP PARTIAL".to_string(),
));
Missing
}
EvalGroupingStrategy::GroupFull => {
let mut grouped: FxHashMap<GroupKey, CombinedState> = FxHashMap::default();
let state = std::iter::repeat(None).take(self.aggs.len()).collect_vec();
let distinct_state = std::iter::repeat_with(|| (None, FxHashMap::default()))
.take(self.distinct_aggs.len())
.collect_vec();
let group_as = group_as_alias.as_ref().map(|_| vec![]);
let combined = CombinedState(state, distinct_state, group_as);
for v in input_value.into_iter() {
let v_as_tuple = v.coerce_into_tuple();
let group_key = self.group_key(&v_as_tuple, ctx);
let CombinedState(state, distinct_state, group_as) =
grouped.entry(group_key).or_insert_with(|| combined.clone());
for (agg_expr, state) in self.aggs.iter().zip(state.iter_mut()) {
let evaluated = agg_expr.expr.evaluate(&v_as_tuple, ctx);
agg_expr.next_value(evaluated.as_ref(), state);
}
for (distinct_expr, (state, seen)) in
self.distinct_aggs.iter().zip(distinct_state.iter_mut())
{
let evaluated = distinct_expr.expr.evaluate(&v_as_tuple, ctx);
distinct_expr.next_distinct(evaluated.as_ref(), state, seen);
}
if let Some(ref mut tuples) = group_as {
tuples.push(Value::from(v_as_tuple));
}
}
let vals = grouped
.into_iter()
.map(|(group_key, state)| {
let CombinedState(agg_state, distinct_state, group_as) = state;
let group = self.aliases.iter().cloned().zip(group_key);
let aggs_with_state = self.aggs.iter().zip(agg_state);
let daggs_with_state = self
.distinct_aggs
.iter()
.zip(distinct_state.into_iter().map(|(state, _)| state));
let agg_data = aggs_with_state.chain(daggs_with_state).map(
|(aggregate_expr, state)| {
let val = match aggregate_expr.finalize(state) {
Ok(agg_result) => agg_result,
Err(err) => {
ctx.add_error(err);
Missing
}
};
(aggregate_expr.name.to_string(), val)
},
);
let mut tuple = Tuple::from_iter(group.chain(agg_data));
if let Some(tuples) = group_as {
tuple.insert(
group_as_alias.as_ref().unwrap(),
Value::from(Bag::from(tuples)),
);
}
Value::from(tuple)
})
.collect_vec();
Value::from(Bag::from(vals))
}
}
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalPivot {
pub(crate) input: Option<Value>,
pub(crate) key: Box<dyn EvalExpr>,
pub(crate) value: Box<dyn EvalExpr>,
}
impl EvalPivot {
pub(crate) fn new(key: Box<dyn EvalExpr>, value: Box<dyn EvalExpr>) -> Self {
EvalPivot {
input: None,
key,
value,
}
}
}
impl Evaluable for EvalPivot {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let tuple: Tuple = input_value
.into_iter()
.filter_map(|binding| {
let binding = binding.coerce_into_tuple();
let key = self.key.evaluate(&binding, ctx);
if let Value::String(s) = key.as_ref() {
let value = self.value.evaluate(&binding, ctx);
Some((s.to_string(), value.into_owned()))
} else {
None
}
})
.collect();
Value::from(tuple)
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalUnpivot {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) as_key: String,
pub(crate) at_key: Option<String>,
pub(crate) input: Option<Value>,
attrs: Vec<String>,
}
impl EvalUnpivot {
pub(crate) fn new(expr: Box<dyn EvalExpr>, as_key: &str, at_key: Option<String>) -> Self {
let attrs = if let Some(at_key) = &at_key {
vec![as_key.to_string(), at_key.clone()]
} else {
vec![as_key.to_string()]
};
EvalUnpivot {
expr,
as_key: as_key.to_string(),
at_key,
input: None,
attrs,
}
}
}
impl Evaluable for EvalUnpivot {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let tuple = match self.expr.evaluate(&Tuple::new(), ctx).into_owned() {
Value::Tuple(tuple) => *tuple,
other => other.coerce_into_tuple(),
};
let as_key = self.as_key.as_str();
let pairs = tuple;
let unpivoted = if let Some(at_key) = &self.at_key {
pairs
.map(|(k, v)| Tuple::from([(as_key, v), (at_key.as_str(), k.into())]))
.collect::<Bag>()
} else {
pairs
.map(|(_, v)| Tuple::from([(as_key, v)]))
.collect::<Bag>()
};
Value::from(unpivoted)
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
fn get_vars(&self) -> Option<&[String]> {
Some(&self.attrs)
}
}
#[derive(Debug)]
pub(crate) struct EvalFilter {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) input: Option<Value>,
}
impl EvalFilter {
pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
EvalFilter { expr, input: None }
}
#[inline]
fn eval_filter(&self, bindings: &Tuple, ctx: &dyn EvalContext) -> bool {
let result = self.expr.evaluate(bindings, ctx);
match result.as_ref() {
Boolean(bool_val) => *bool_val,
_ => false,
}
}
}
impl Evaluable for EvalFilter {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let filtered = input_value
.into_iter()
.map(Value::coerce_into_tuple)
.filter_map(|v| self.eval_filter(&v, ctx).then_some(v));
Value::from(filtered.collect::<Bag>())
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalHaving {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) input: Option<Value>,
}
impl EvalHaving {
pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
EvalHaving { expr, input: None }
}
#[inline]
fn eval_having(&self, bindings: &Tuple, ctx: &dyn EvalContext) -> bool {
let result = self.expr.evaluate(bindings, ctx);
match result.as_ref() {
Boolean(bool_val) => *bool_val,
_ => false,
}
}
}
impl Evaluable for EvalHaving {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let filtered = input_value
.into_iter()
.map(Value::coerce_into_tuple)
.filter_map(|v| self.eval_having(&v, ctx).then_some(v));
Value::from(filtered.collect::<Bag>())
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalOrderBySortCondition {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) spec: EvalOrderBySortSpec,
}
#[derive(Debug)]
pub(crate) enum EvalOrderBySortSpec {
AscNullsFirst,
AscNullsLast,
DescNullsFirst,
DescNullsLast,
}
#[derive(Debug)]
pub(crate) struct EvalOrderBy {
pub(crate) cmp: Vec<EvalOrderBySortCondition>,
pub(crate) input: Option<Value>,
}
impl EvalOrderBy {
#[inline]
fn compare(&self, l: &Value, r: &Value, ctx: &dyn EvalContext) -> Ordering {
let l = l.as_tuple_ref();
let r = r.as_tuple_ref();
self.cmp
.iter()
.map(|spec| {
let l = spec.expr.evaluate(&l, ctx);
let r = spec.expr.evaluate(&r, ctx);
match spec.spec {
EvalOrderBySortSpec::AscNullsFirst => {
let wrap = NullSortedValue::<true, Value>;
let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
l.cmp(&r)
}
EvalOrderBySortSpec::AscNullsLast => {
let wrap = NullSortedValue::<false, Value>;
let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
l.cmp(&r)
}
EvalOrderBySortSpec::DescNullsFirst => {
let wrap = NullSortedValue::<false, Value>;
let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
r.cmp(&l)
}
EvalOrderBySortSpec::DescNullsLast => {
let wrap = NullSortedValue::<true, Value>;
let (l, r) = (wrap(l.as_ref()), wrap(r.as_ref()));
r.cmp(&l)
}
}
})
.find_or_last(|o| o != &Ordering::Equal)
.unwrap_or(Ordering::Equal)
}
}
impl Evaluable for EvalOrderBy {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let mut values = input_value.into_iter().collect_vec();
values.sort_by(|l, r| self.compare(l, r, ctx));
Value::from(List::from(values))
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalLimitOffset {
pub(crate) limit: Option<Box<dyn EvalExpr>>,
pub(crate) offset: Option<Box<dyn EvalExpr>>,
pub(crate) input: Option<Value>,
}
impl Evaluable for EvalLimitOffset {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let empty_bindings = Tuple::new();
let offset = match &self.offset {
None => 0,
Some(expr) => match expr.evaluate(&empty_bindings, ctx).as_ref() {
Value::Integer(i) => {
if *i >= 0 {
*i as usize
} else {
0
}
}
_ => 0,
},
};
let limit = match &self.limit {
None => None,
Some(expr) => match expr.evaluate(&empty_bindings, ctx).as_ref() {
Value::Integer(i) => {
if *i >= 0 {
Some(*i as usize)
} else {
None
}
}
_ => None,
},
};
let ordered = input_value.is_ordered();
fn collect(values: impl Iterator<Item = Value>, ordered: bool) -> Value {
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
let offsetted = input_value.into_iter().skip(offset);
match limit {
Some(n) => collect(offsetted.take(n), ordered),
None => collect(offsetted, ordered),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalSelectValue {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) input: Option<Value>,
}
impl EvalSelectValue {
pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
EvalSelectValue { expr, input: None }
}
}
impl Evaluable for EvalSelectValue {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let ordered = input_value.is_ordered();
let values = input_value.into_iter().map(|v| {
let v_as_tuple = v.coerce_into_tuple();
self.expr.evaluate(&v_as_tuple, ctx).into_owned()
});
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
pub(crate) struct EvalSelect {
pub(crate) exprs: Vec<(String, Box<dyn EvalExpr>)>,
pub(crate) input: Option<Value>,
}
impl EvalSelect {
pub(crate) fn new(exprs: Vec<(String, Box<dyn EvalExpr>)>) -> Self {
EvalSelect { exprs, input: None }
}
}
impl Debug for EvalSelect {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SELECT ")?;
let mut sep = "";
for (alias, expr) in &self.exprs {
write!(f, "{sep}")?;
expr.fmt(f)?;
write!(f, " AS {alias}")?;
sep = ", ";
}
Ok(())
}
}
impl Evaluable for EvalSelect {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let ordered = input_value.is_ordered();
let values = input_value.into_iter().map(|v| {
let v_as_tuple = v.coerce_into_tuple();
let tuple_pairs = self.exprs.iter().filter_map(|(alias, expr)| {
let evaluated_val = expr.evaluate(&v_as_tuple, ctx);
match evaluated_val.as_ref() {
Missing => None,
_ => Some((alias.as_str(), evaluated_val.into_owned())),
}
});
tuple_pairs.collect::<Tuple>()
});
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug, Default)]
pub(crate) struct EvalSelectAll {
pub(crate) input: Option<Value>,
}
impl EvalSelectAll {
pub(crate) fn new() -> Self {
Self::default()
}
}
impl Evaluable for EvalSelectAll {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let ordered = input_value.is_ordered();
let values = input_value.into_iter().map(|val| {
val.coerce_into_tuple()
.into_values()
.flat_map(|v| v.coerce_into_tuple().into_pairs())
.collect::<Tuple>()
});
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug)]
pub(crate) struct EvalExprQuery {
pub(crate) expr: Box<dyn EvalExpr>,
pub(crate) input: Option<Value>,
}
impl EvalExprQuery {
pub(crate) fn new(expr: Box<dyn EvalExpr>) -> Self {
EvalExprQuery { expr, input: None }
}
}
impl Evaluable for EvalExprQuery {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = self.input.take().unwrap_or(Value::Null).coerce_into_tuple();
self.expr.evaluate(&input_value, ctx).into_owned()
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
#[derive(Debug, Default)]
pub(crate) struct EvalDistinct {
pub(crate) input: Option<Value>,
}
impl EvalDistinct {
pub(crate) fn new() -> Self {
Self::default()
}
}
impl Evaluable for EvalDistinct {
fn evaluate(&mut self, ctx: &dyn EvalContext) -> Value {
let input_value = take_input!(self.input.take(), ctx);
let ordered = input_value.is_ordered();
let values = input_value.into_iter().unique();
match ordered {
true => Value::from(values.collect::<List>()),
false => Value::from(values.collect::<Bag>()),
}
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
pub(crate) struct EvalSink {
pub(crate) input: Option<Value>,
}
impl Evaluable for EvalSink {
fn evaluate(&mut self, _ctx: &dyn EvalContext) -> Value {
self.input.take().unwrap_or_else(|| Missing)
}
fn update_input(&mut self, input: Value, _branch_num: u8, _ctx: &dyn EvalContext) {
self.input = Some(input);
}
}
impl Debug for EvalSink {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "SINK")
}
}
#[derive(Debug)]
pub(crate) struct EvalSubQueryExpr {
pub(crate) plan: Rc<RefCell<EvalPlan>>,
}
impl EvalSubQueryExpr {
pub(crate) fn new(plan: EvalPlan) -> Self {
EvalSubQueryExpr {
plan: Rc::new(RefCell::new(plan)),
}
}
}
impl EvalExpr for EvalSubQueryExpr {
fn evaluate<'a>(&'a self, bindings: &'a Tuple, _ctx: &'a dyn EvalContext) -> Cow<'a, Value> {
let value = if let Ok(evaluated) = self
.plan
.borrow_mut()
.execute_mut(MapBindings::from(bindings))
{
evaluated.result
} else {
Missing
};
Cow::Owned(value)
}
}
#[inline]
fn bagop_iter(v: Value) -> ValueIntoIterator {
match v {
Value::Null | Value::Missing => ValueIntoIterator::Single(None),
other => other.into_iter(),
}
}
#[derive(Debug, PartialEq)]
pub(crate) struct EvalOuterUnion {
pub(crate) setq: SetQuantifier,
pub(crate) l_input: Option<Value>,
pub(crate) r_input: Option<Value>,
}
impl EvalOuterUnion {
pub(crate) fn new(setq: SetQuantifier) -> Self {
EvalOuterUnion {
setq,
l_input: None,
r_input: None,
}
}
}
impl Evaluable for EvalOuterUnion {
fn evaluate(&mut self, _ctx: &dyn EvalContext) -> Value {
let lhs = bagop_iter(self.l_input.take().unwrap_or(Missing));
let rhs = bagop_iter(self.r_input.take().unwrap_or(Missing));
let chained = lhs.chain(rhs);
let vals = match self.setq {
SetQuantifier::All => chained.collect_vec(),
SetQuantifier::Distinct => chained.unique().collect_vec(),
};
Value::from(Bag::from(vals))
}
fn update_input(&mut self, input: Value, branch_num: u8, ctx: &dyn EvalContext) {
match branch_num {
0 => self.l_input = Some(input),
1 => self.r_input = Some(input),
_ => ctx.add_error(EvaluationError::IllegalState(
"Invalid branch number".to_string(),
)),
}
}
}
#[derive(Debug, PartialEq)]
pub(crate) struct EvalOuterIntersect {
pub(crate) setq: SetQuantifier,
pub(crate) l_input: Option<Value>,
pub(crate) r_input: Option<Value>,
}
impl EvalOuterIntersect {
pub(crate) fn new(setq: SetQuantifier) -> Self {
EvalOuterIntersect {
setq,
l_input: None,
r_input: None,
}
}
}
impl Evaluable for EvalOuterIntersect {
fn evaluate(&mut self, _ctx: &dyn EvalContext) -> Value {
let lhs = bagop_iter(self.l_input.take().unwrap_or(Missing));
let rhs = bagop_iter(self.r_input.take().unwrap_or(Missing));
let bag: Bag = match self.setq {
SetQuantifier::All => {
let mut lhs = lhs.counts();
Bag::from_iter(rhs.filter(|elem| match lhs.get_mut(elem) {
Some(count) if *count > 0 => {
*count -= 1;
true
}
_ => false,
}))
}
SetQuantifier::Distinct => {
let lhs: HashSet<Value> = lhs.collect();
Bag::from_iter(
rhs.filter(|elem| lhs.contains(elem))
.collect::<HashSet<_>>(),
)
}
};
Value::from(bag)
}
fn update_input(&mut self, input: Value, branch_num: u8, ctx: &dyn EvalContext) {
match branch_num {
0 => self.l_input = Some(input),
1 => self.r_input = Some(input),
_ => ctx.add_error(EvaluationError::IllegalState(
"Invalid branch number".to_string(),
)),
}
}
}
#[derive(Debug, PartialEq)]
pub(crate) struct EvalOuterExcept {
pub(crate) setq: SetQuantifier,
pub(crate) l_input: Option<Value>,
pub(crate) r_input: Option<Value>,
}
impl EvalOuterExcept {
pub(crate) fn new(setq: SetQuantifier) -> Self {
EvalOuterExcept {
setq,
l_input: None,
r_input: None,
}
}
}
impl Evaluable for EvalOuterExcept {
fn evaluate(&mut self, _ctx: &dyn EvalContext) -> Value {
let lhs = bagop_iter(self.l_input.take().unwrap_or(Missing));
let rhs = bagop_iter(self.r_input.take().unwrap_or(Missing));
let mut exclude = rhs.counts();
let excepted = lhs.filter(|elem| match exclude.get_mut(elem) {
Some(count) if *count > 0 => {
*count -= 1;
false
}
_ => true,
});
let vals = match self.setq {
SetQuantifier::All => excepted.collect_vec(),
SetQuantifier::Distinct => excepted.unique().collect_vec(),
};
Value::from(Bag::from(vals))
}
fn update_input(&mut self, input: Value, branch_num: u8, ctx: &dyn EvalContext) {
match branch_num {
0 => self.l_input = Some(input),
1 => self.r_input = Some(input),
_ => ctx.add_error(EvaluationError::IllegalState(
"Invalid branch number".to_string(),
)),
}
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum SetQuantifier {
All,
Distinct,
}