Re: How to sleep for 1 sec and then call keyBy for partitioning


Hi,
This worked after I looked at https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1
Why can't I call setParallelism after keyBy? Is keyBy not an operator?
DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getCam() : new Object())
        //.setParallelism(parallelTasks) //??? Why cannot I use setParallelism after keyBy-is it not an operator
        //.setMaxParallelism(parallelTasks) // https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube cameraWithCube, Context context,
                                       Collector<CameraWithCube> collector) throws Exception {
                logger.info("before thread sleep");
                Thread.sleep(500);
                logger.info("after thread sleep");
                collector.collect(cameraWithCube);
            }
        })
        .setParallelism(parallelTasks) //???do I need to set this or will it take the parallelism from the earlier step ?
        .setMaxParallelism(parallelTasks)
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getTs() : new Object());
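
On the keyBy question above, a rough plain-Java sketch of the idea may help. It is not Flink code. As far as I understand it, keyBy only decides which parallel subtask of the next operator receives each record, so the parallelism is configured on that next operator (here, process) rather than on keyBy itself. The class name, the scramble function, and the numbers are assumptions made for illustration. Flink uses its own murmur hash, but the shape of the mapping (key, then key group, then subtask index) is believed to be the same.

```java
/** Simplified sketch of keyed routing; an illustration, not Flink's actual code. */
public class KeyRoutingSketch {

    /** Stand-in for Flink's hash scrambling (assumed; Flink uses a murmur hash). */
    static int scramble(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE; // force a non-negative value
    }

    /** Maps a key to one of maxParallelism key groups. */
    static int keyGroup(Object key, int maxParallelism) {
        return scramble(key.hashCode()) % maxParallelism;
    }

    /** Maps a key to a subtask of the downstream operator, which owns the parallelism. */
    static int subtaskIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical value
        int parallelism = 4;      // hypothetical value for the operator after keyBy

        // A key with a stable hashCode always lands on the same subtask.
        System.out.println("cam-1 -> subtask " + subtaskIndex("cam-1", maxParallelism, parallelism));
        System.out.println("cam-1 -> subtask " + subtaskIndex("cam-1", maxParallelism, parallelism));

        // new Object() uses the default identity-based hashCode, so each
        // instance is a distinct key and its routing is effectively arbitrary.
        System.out.println("new Object() -> subtask "
                + subtaskIndex(new Object(), maxParallelism, parallelism));
        System.out.println("new Object() -> subtask "
                + subtaskIndex(new Object(), maxParallelism, parallelism));
    }
}
```

If this picture is right, the `new Object()` fallback in the key selectors above never groups records with a null cameraKey together, so a fixed placeholder key may be a safer fallback.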

TIA,
Vijay

On Wed, May 16, 2018 at 1:41 PM Jörn Franke <jornfranke@xxxxxxxxx> wrote:
Just some advice - do not use sleep to simulate a heavy task. Use real or generated data to simulate the load instead. This sleep is garbage from a software quality point of view. Furthermore, such sleep calls are often forgotten and left in the code.
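
One possible way to act on this advice, as a minimal sketch only: replace the sleep with deterministic CPU work on the record itself. This swaps in repeated SHA-256 hashing, which the thread does not propose. The class name SimulatedWork, the method burnCpu, and the rounds parameter are hypothetical names chosen for illustration. How many rounds approximate a given latency depends on the machine and would need to be measured.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper that burns CPU deterministically instead of calling Thread.sleep. */
public class SimulatedWork {

    /**
     * Hashes the payload repeatedly with SHA-256 and returns the final digest.
     * 'rounds' is an assumed tuning knob, not a Flink setting.
     */
    public static byte[] burnCpu(String payload, int rounds) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] state = payload.getBytes(StandardCharsets.UTF_8);
            for (int i = 0; i < rounds; i++) {
                state = digest.digest(state); // digest() resets the instance after each call
            }
            return state;
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        byte[] result = burnCpu("cam-1@1526500000", 200_000); // made-up payload
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("digest bytes: " + result.length + ", elapsed ms: " + elapsedMs);
    }
}
```

Inside processElement, a call such as burnCpu(cameraWithCube.toString(), rounds) could stand in for Thread.sleep(500). Unlike sleep, it keeps a CPU core busy, which is closer to what a genuinely heavy task does.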

On 16. May 2018, at 22:32, Vijay Balakrishnan <bvijaykr@xxxxxxxxx> wrote:

Hi,
Newbie question - here is what I am trying to do:
The CameraWithCubeSource source emits tuples of (cameraNbr, TS).
1. Partition the data by cameraNbr.
2. Sleep for 1 sec to simulate a heavy process in the task.
3. Partition the data by TS, and finally get a DataStream to connect with another DataStream.

DataStream<CameraWithCube> cameraWithCubeDataStream = env
                .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
                .setParallelism(parallelTasks)
                .setMaxParallelism(parallelTasks)
                .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
                        cameraWithCube.cameraKey.getCam() : new Object());
//sleep for 1 sec ???? how
                ((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
                    .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
                        @Override
                        public void apply(String cameraKeyCam, TimeWindow timeWindow,
                                          Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
                                          Collector<CameraWithCube> collector) throws Exception {
                            Thread.sleep(1000);
                            cameraWithCubesAssignedToWindow.forEach(cameraWithCube -> collector.collect(cameraWithCube));

                        }
                    })//returning void here from apply ??
//partition by TS and return DataStream
                .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by TS
                        cameraWithCube.cameraKey.getTS() : new Object());
TIA,
Vijay