
JDBCInputFormat and SplitDataProperties

Hi everyone,

I have the following scenario: I have a database table with 3 columns: a host (string), a timestamp, and an integer ID. Conceptually, what I'd like to do is:

group by host and timestamp -> based on all the IDs in each group, create a mapping to n new tuples -> for each unique tuple, count how many times it appears across the resulting data

Each new tuple has 3 fields: the host, a new ID, and an Integer with value 1.
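A plain-Scala sketch of the conceptual pipeline on in-memory collections (not Flink code). The `Row` case class and the `expand` function are hypothetical: `expand` stands in for the unspecified per-group mapping from a group's IDs to n new IDs, and here it simply multiplies each distinct ID by 10.

```scala
object PipelineSketch {
  // Hypothetical row shape mirroring the table: host, timestamp, integer ID.
  final case class Row(host: String, timestamp: Long, id: Int)

  // Hypothetical placeholder for the real per-group mapping:
  // each distinct ID in the group becomes one new ID (id * 10).
  def expand(ids: Seq[Int]): Seq[Int] = ids.distinct.map(_ * 10)

  def run(rows: Seq[Row]): Map[(String, Int), Int] = {
    // 1. Group by (host, timestamp).
    val groups = rows.groupBy(r => (r.host, r.timestamp))
    // 2. Map each group to n new tuples of the form (host, newId, 1).
    val tuples = groups.toSeq.flatMap { case ((host, _), rs) =>
      expand(rs.map(_.id)).map(newId => (host, newId, 1))
    }
    // 3. Count each unique (host, newId) by summing the third field.
    tuples.groupBy(t => (t._1, t._2)).map { case (key, ts) => key -> ts.map(_._3).sum }
  }
}
```

The summation in step 3 mirrors the `groupBy(0, 1).aggregate(SUM, 2)` step of the DataSet program.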

What I'm currently doing is roughly:

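import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.api.scala._

val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
val source = environment.createInput(input)
source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)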
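To illustrate why an existing ordering could matter, here is a plain-Scala sketch outside Flink: if rows arrive already sorted by (host, timestamp), every group is a contiguous run, so groups can be formed in a single streaming pass without a hash table or a re-sort. `Row` and `groupsOfSorted` are hypothetical names, and the code assumes (does not check) that the input is sorted.

```scala
object SortedGroupingSketch {
  final case class Row(host: String, timestamp: Long, id: Int)

  // Assumes `rows` is ordered by (host, timestamp). Equal keys are then
  // adjacent, so each group is one consecutive run of rows.
  def groupsOfSorted(rows: Iterator[Row]): Iterator[((String, Long), Vector[Int])] =
    new Iterator[((String, Long), Vector[Int])] {
      private val in = rows.buffered
      def hasNext: Boolean = in.hasNext
      def next(): ((String, Long), Vector[Int]) = {
        val first = in.next()
        val key = (first.host, first.timestamp)
        val ids = Vector.newBuilder[Int]
        ids += first.id
        // Consume the rest of the current run; stop at the first new key.
        while (in.hasNext && (in.head.host, in.head.timestamp) == key) ids += in.next().id
        (key, ids.result())
      }
    }
}
```

Only the current group is buffered at any time, which is the kind of saving a sort-based grouping can get when the input order is known in advance.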

The query given to JDBCInputFormat returns results ordered by host and timestamp, and I was wondering whether performance can be improved by declaring this ordering in the code. I've looked at http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html and http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html, but I still have some questions:

- If a split is a subset of a partition, what is the meaning of SplitDataProperties#splitsPartitionedBy? The wording makes me think that a split is divided into partitions, meaning that a partition would be a subset of a split.
- At which point, if at all, can I retrieve and adjust a SplitDataProperties instance?
- If I wanted a coarser parallelization where each slot gets all the data for the same host, would I have to manually create the sub-groups based on timestamp?
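A plain-Scala illustration (not Flink API code, and not a statement about what Flink does) of what "manually creating the sub-groups" would mean for the last question: rows are hash-partitioned on host only, so each partition holds all rows of its hosts, and the (host, timestamp) groups must then be formed locally, as a mapPartition body might do. `partitionByHost` and `subGroups` are hypothetical helpers.

```scala
object HostPartitionSketch {
  final case class Row(host: String, timestamp: Long, id: Int)

  // Hypothetical stand-in for hash partitioning on the host field only:
  // every row of a given host lands in the same one of n partitions.
  def partitionByHost(rows: Seq[Row], n: Int): Vector[Seq[Row]] = {
    val byPartition = rows.groupBy(r => Math.floorMod(r.host.hashCode, n))
    Vector.tabulate(n)(p => byPartition.getOrElse(p, Seq.empty))
  }

  // Inside one partition, the (host, timestamp) sub-groups still have to be
  // built explicitly, here with a local groupBy that keeps input order.
  def subGroups(partition: Seq[Row]): Map[(String, Long), Seq[Int]] =
    partition.groupBy(r => (r.host, r.timestamp)).map { case (k, rs) => k -> rs.map(_.id) }
}
```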