
Re: Does Calcite hold all records output from a node before passing them to a higher node?

Enumerable processes one record at a time and is therefore inherently slow. So, Sink is a step in the right direction towards improving this, because it collects batches of records.

The next step is to improve the scheduling, so that a node is invoked to process a bounded amount of work - say for X nanoseconds, or until its input is empty, or until its output has at least N rows or B bytes - rather than, as at present, processing all of its input in one invocation.
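To make the idea concrete, here is a minimal sketch of budget-limited invocation. All names in it (BatchNode, CopyNode, drain, turnsToCopy) are hypothetical illustrations, not Calcite's actual interpreter API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of budget-limited scheduling: each invocation hands a node a quota
// of rows instead of asking it to drain its entire input. All names here
// are hypothetical; this is not Calcite's API.
public class BudgetedScheduler {
  interface BatchNode {
    // Process up to rowBudget rows; return the number actually processed.
    int run(int rowBudget);
  }

  // Toy node that moves rows from an input deque to an output deque.
  static class CopyNode implements BatchNode {
    final Deque<Object[]> input;
    final Deque<Object[]> output = new ArrayDeque<>();
    CopyNode(Deque<Object[]> input) {
      this.input = input;
    }
    @Override public int run(int rowBudget) {
      int n = 0;
      while (n < rowBudget && !input.isEmpty()) {
        output.add(input.poll());
        n++;
      }
      return n;
    }
  }

  // Invoke the node repeatedly with a small budget per turn; returns the
  // number of turns taken. No single turn processes more than the budget.
  static int drain(BatchNode node, int budgetPerTurn) {
    int turns = 0;
    while (node.run(budgetPerTurn) > 0) {
      turns++;
    }
    return turns;
  }

  // Convenience for demonstration: copy rowCount dummy rows with the given
  // per-turn budget and report how many turns it took.
  static int turnsToCopy(int rowCount, int budgetPerTurn) {
    Deque<Object[]> input = new ArrayDeque<>();
    for (int i = 0; i < rowCount; i++) {
      input.add(new Object[] {i});
    }
    return drain(new CopyNode(input), budgetPerTurn);
  }
}
```

Copying 10 rows with a budget of 4 takes 3 turns; the point is that the work done per invocation is bounded by the budget rather than by the size of the input.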

Finding the right balance between one-row-at-a-time and all-rows-at-once gives the ideal compromise between efficiency and memory usage.

As I said, the Arrow adapter will take a further big step in that direction.

> On May 30, 2018, at 1:08 PM, Muhammad Gelbana <m.gelbana@xxxxxxxxx> wrote:
> Is it correct to think that the Interpreter can be improved by removing the
> "sink" mechanism, since buffering is only reasonable within the context of
> the receiving node, not the sending one? In other words, a node acting as
> an input to another one should always send an enumerable, and the
> receiving/parent node is free to buffer the obtained rows or process them
> right away.
> Thanks,
> Gelbana
> On Wed, May 30, 2018 at 12:37 AM, Julian Hyde <jhyde@xxxxxxxxxx> wrote:
>> Yes, when you use a Sink there is an assumption that there is a Node
>> running that is consuming from the deque. Currently the Interpreter only
>> runs one Node at a time, which means that the full output of that Node sits
>> in a deque for a while.
>> Clearly the Interpreter has much room for improvement.
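The one-Node-at-a-time behaviour described above can be illustrated with a small sketch (not Calcite code; the class and method names are invented for illustration). The producer pushes its entire output into a deque before the consumer is ever invoked, so the deque's peak size equals the full intermediate result:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration (not Calcite code) of running one node at a time: the
// producer fills a deque completely before the consumer starts, so peak
// memory is proportional to the producer's full output.
public class OneNodeAtATime {
  static Deque<Integer> runProducer(int rowCount) {
    Deque<Integer> sink = new ArrayDeque<>();
    for (int i = 0; i < rowCount; i++) {
      sink.add(i);           // every row is buffered before the consumer runs
    }
    return sink;             // at this point the deque holds rowCount rows
  }

  static List<Integer> runConsumer(Deque<Integer> source) {
    List<Integer> out = new ArrayList<>();
    Integer row;
    while ((row = source.poll()) != null) {
      out.add(row);          // only now does the next node see the rows
    }
    return out;
  }

  // Peak number of rows sitting in the deque between the two phases.
  static int peakBufferedFor(int rowCount) {
    return runProducer(rowCount).size();
  }
}
```

With concurrent (or interleaved) execution of the two nodes, the deque would instead hold only a small in-flight window of rows.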
>>> On May 29, 2018, at 3:22 PM, Muhammad Gelbana <m.gelbana@xxxxxxxxx>
>> wrote:
>>> I found out what was consuming the memory and delaying the results at the
>>> same time. I was pushing all obtained rows from the datasource into a sink
>>> created by this method
>>> <https://github.com/apache/calcite/blob/27a190ff303700b4329384e05c39bc40c893048e/core/src/main/java/org/apache/calcite/interpreter/Compiler.java#L50>.
>>> Pushing rows into the sink halts the execution of further nodes until all
>>> rows are fully loaded. I thought that, since the sink is backed by an
>>> "ArrayDeque", the rows would be consumed while being pushed to the sink.
>>> The other approach I applied was to use the "enumerable" method instead.
>>> This way, rows returned from my nodes are available to successive nodes
>>> without delay.
>>> Thank you all, and thank you Julian for the Arrow adapter code.
>>> Thank you all and thank you Julian for the Arrow adapter code.
>>> Thanks,
>>> Gelbana
>>> On Tue, May 29, 2018 at 5:50 PM, Julian Hyde <jhyde@xxxxxxxxxx> wrote:
>>>> I believe that scan, filter, project do not buffer; aggregate, join and
>>>> sort do buffer; join perhaps buffers a little more than it should.
>>>> Read methods in EnumerableDefaults, for example EnumerableDefaults.join,
>>>> to see where a blocking collection is created and from which input.
>>>> Ideally the operators would exploit sorted input (e.g. we could have an
>>>> aggregate that assumes input is sorted by the GROUP BY key and only
>>>> buffers records that have the same key), but Enumerable does not aim to
>>>> be a high-performance, scalable engine, so this never got prioritized.
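The sorted-input aggregate sketched in the quoted paragraph could look roughly like this. It is an illustrative sketch, not Calcite's implementation; the class and method names are invented:

```java
import java.util.ArrayList;
import java.util.List;

// Streaming aggregate over input sorted on the GROUP BY key: only the
// accumulator for the current key is held, so memory does not grow with
// the total number of rows or groups. Illustrative, not Calcite code.
public class SortedAggregate {
  // Each row is {groupKey, value}; rows must arrive sorted by groupKey.
  // Returns {groupKey, sum} pairs, one per group, in key order.
  static int[][] sumByKey(int[][] sortedRows) {
    List<int[]> result = new ArrayList<>();
    boolean haveKey = false;
    int currentKey = 0;
    int sum = 0;
    for (int[] row : sortedRows) {
      if (haveKey && row[0] != currentKey) {
        result.add(new int[] {currentKey, sum});  // emit the finished group
        sum = 0;
      }
      haveKey = true;
      currentKey = row[0];
      sum += row[1];
    }
    if (haveKey) {
      result.add(new int[] {currentKey, sum});    // emit the final group
    }
    return result.toArray(new int[0][]);
  }
}
```

Because the input is sorted, a group is known to be complete as soon as the key changes, so each group can be emitted immediately instead of holding a hash table of all groups.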
>>>> On a related note, I was pleased to see progress on an Arrow adapter and
>>>> convention in https://issues.apache.org/jira/browse/CALCITE-2173. If we
>>>> were to write a high-performance engine that scales across many threads,
>>>> it would be based on Arrow. So anyone with complaints about the
>>>> performance of the Enumerable convention should start contributing to
>>>> the Arrow convention!
>>>> Julian
>>>>> On May 29, 2018, at 7:20 AM, Michael Mior <mmior@xxxxxxxxxx> wrote:
>>>>> In theory it certainly should be possible to stream the results. This
>>>>> isn't guaranteed, however. You would have to look at the entire query
>>>>> pipeline to see where things are being materialized. A full stack trace
>>>>> without elements removed would be a good start.
>>>>> --
>>>>> Michael Mior
>>>>> mmior@xxxxxxxxxx
>>>>> On Mon, May 28, 2018 at 19:05, Muhammad Gelbana <m.gelbana@xxxxxxxxx>
>>>>> wrote:
>>>>>> I'm not sure if I phrased my question correctly, so let me explain
>>>>>> more.
>>>>>> I'm running a (SELECT * FROM TABLE) query against a 50-million-record
>>>>>> table (following the BINDABLE convention, so it sends its rows through
>>>>>> a "sink"). Since the extracted rows aren't processed in any way, I was
>>>>>> expecting that the output JDBC resultset would be able to enumerate
>>>>>> through all the results in a matter of seconds, but instead, my
>>>>>> machine didn't print anything. What exactly happens is that
>>>>>> (PreparedStatement.executeQuery) doesn't return a resultset promptly,
>>>>>> even after a few minutes have passed.
>>>>>> I tried a table with hundreds of rows and my testing code printed
>>>>>> those results right away, so it's not something I missed there, but
>>>>>> probably a configuration I didn't set? Or maybe that's just how it is?
>>>>>> Does anyone else believe that the behaviour I expected is reasonable?
>>>>>> It would also lower the amount of memory consumed to hold the complete
>>>>>> results before bursting them to their final destination, if that's the
>>>>>> case in the first place.
>>>>>> Thanks,
>>>>>> Gelbana